// The class' name explains itself well.
//
// FIXME: Do we need TSan annotations here?
class Spinlock {
 public:
  void lock() noexcept {
    // Here we try to grab the lock first before falling back to TTAS.
    //
    // If the lock is not contended, this fast path should be quicker.
    // If the lock is contended and we have to fall back to the slow TTAS, this
    // single try shouldn't add too much overhead.
    //
    // What's more, by keeping this method small, chances are higher that this
    // method gets inlined by the compiler.
    if (FLARE_LIKELY(try_lock())) {
      return;
    }
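The excerpt stops at the fast path. The TTAS slow path it falls back to is, in essence, the classic "spin on plain loads, then exchange" loop; a minimal sketch under the assumption of a std::atomic<bool> flag (not flare's verbatim code):

#include <atomic>

// Sketch only: the test-and-test-and-set loop a spinlock of this shape
// typically falls back to once the single try_lock() above fails.
inline void TtasLockSlow(std::atomic<bool>& locked) noexcept {
  do {
    // Spin on plain loads first so the cache line stays shared and we don't
    // hammer the interconnect with failed read-modify-write operations.
    while (locked.load(std::memory_order_relaxed)) {
      // FLARE_CPU_RELAX() in flare; a pause/yield hint elsewhere.
    }
  } while (locked.exchange(true, std::memory_order_acquire));
}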
// copy from folly, but replace thread::yield with FLARE_CPU_RELAX
// https://github.com/facebook/folly/blob/main/folly/synchronization/RWSpinLock.h

/*
 * A simple, small (4-bytes), but unfair rwlock. Use it when you want
 * a nice writer and don't expect a lot of write/read contention, or
 * when you need small rwlocks since you are creating a large number
 * of them.
 *
 * Note that the unfairness here is extreme: if the lock is
 * continually accessed for read, writers will never get a chance. If
 * the lock can be that highly contended this class is probably not an
 * ideal choice anyway.
 *
 * It currently implements most of the Lockable, SharedLockable and
 * UpgradeLockable concepts except the TimedLockable related locking/unlocking
 * interfaces.
 */
class RWSpinLock {
  enum : int32_t { READER = 4, UPGRADED = 2, WRITER = 1 };
  // Lockable Concept
  void lock() {
    while (!FLARE_LIKELY(try_lock())) {
      FLARE_CPU_RELAX();
    }
  }
  // Writer is responsible for clearing up both the UPGRADED and WRITER bits.
  void unlock() {
    static_assert(READER > WRITER + UPGRADED, "wrong bits!");
    bits_.fetch_and(~(WRITER | UPGRADED), std::memory_order_release);
  }
  // SharedLockable Concept
  void lock_shared() {
    while (!FLARE_LIKELY(try_lock_shared())) {
      FLARE_CPU_RELAX();
    }
  }
  // Downgrade the lock from writer status to reader status.
  void unlock_and_lock_shared() {
    bits_.fetch_add(READER, std::memory_order_acquire);
    unlock();
  }
  // UpgradeLockable Concept
  void lock_upgrade() {
    while (!try_lock_upgrade()) {
      FLARE_CPU_RELAX();
    }
  }
  // Write unlock and upgrade lock atomically.
  void unlock_and_lock_upgrade() {
    // Need to do it in two steps here -- as the UPGRADED bit might be OR-ed
    // at the same time when other threads are trying to do try_lock_upgrade().
    bits_.fetch_or(UPGRADED, std::memory_order_acquire);
    bits_.fetch_add(-WRITER, std::memory_order_release);
  }
  // Attempt to acquire writer permission. Return false if we didn't get it.
  bool try_lock() {
    int32_t expect = 0;
    return bits_.compare_exchange_strong(expect, WRITER,
                                         std::memory_order_acq_rel);
  }
  // Try to get reader permission on the lock. This can fail if we
  // find out someone is a writer or upgrader.
  // Setting the UPGRADED bit would allow a writer-to-be to indicate
  // its intention to write and block any new readers while waiting
  // for existing readers to finish and release their read locks. This
  // helps avoid starving writers (promoted from upgraders).
  bool try_lock_shared() {
    // fetch_add is considerably (100%) faster than compare_exchange,
    // so here we are optimizing for the common (lock success) case.
    int32_t value = bits_.fetch_add(READER, std::memory_order_acquire);
    if (FLARE_UNLIKELY(value & (WRITER | UPGRADED))) {
      bits_.fetch_add(-READER, std::memory_order_release);
      return false;
    }
    return true;
  }
  // Try to unlock upgrade and write lock atomically.
  bool try_unlock_upgrade_and_lock() {
    int32_t expect = UPGRADED;
    return bits_.compare_exchange_strong(expect, WRITER,
                                         std::memory_order_acq_rel);
  }
  // Try to acquire an upgradable lock.
  bool try_lock_upgrade() {
    int32_t value = bits_.fetch_or(UPGRADED, std::memory_order_acquire);

    // Note: when failed, we cannot flip the UPGRADED bit back,
    // as in this case there is either another upgrade lock or a write lock.
    // If it's a write lock, the bit will get cleared up when that lock's done
    // with unlock().
    return ((value & (UPGRADED | WRITER)) == 0);
  }
  // Mainly for debugging purposes.
  int32_t bits() const { return bits_.load(std::memory_order_acquire); }
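To make the bit manipulation above concrete, here is a hypothetical usage sketch (not from flare) of the upgrade path:

// Hypothetical usage: take an upgrade lock, promote it to a write lock once
// existing readers drain, then downgrade back to a read lock.
void UpdateIfStale(RWSpinLock& lock) {
  lock.lock_upgrade();            // Sets UPGRADED; new readers are blocked.
  // ... inspect shared state and decide that we do need to write ...
  while (!lock.try_unlock_upgrade_and_lock()) {
    // Succeeds only once bits_ == UPGRADED, i.e. all existing readers left.
    FLARE_CPU_RELAX();
  }
  // ... exclusive section: bits_ == WRITER ...
  lock.unlock_and_lock_shared();  // Downgrade: add READER, clear WRITER/UPGRADED.
  // ... keep reading; an unlock_shared() (not part of the excerpt above)
  // would eventually drop the READER count ...
}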
// It's locked, take the slow path.
std::unique_lock splk(slow_path_lock_);

// Tell the owner that we're waiting for the lock.
if (count_.fetch_add(1, std::memory_order_acquire) == 0) {
  // The owner released the lock before we incremented `count_`.
  //
  // We're still kind of lucky.
  return;
}

// Bad luck then. First we add ourselves to the wait chain.
auto current = GetCurrentFiberEntity();
std::unique_lock slk(current->scheduler_lock);
WaitBlock wb = {.waiter = current};
FLARE_CHECK(impl_.AddWaiter(&wb));  // This can't fail as we never call
                                    // `SetPersistentAwakened()`.

// Now the slow path lock can be unlocked.
//
// Indeed it's possible that we're awakened even before we call `Halt()`,
// but this issue is already addressed by `scheduler_lock` (which we're
// holding).
splk.unlock();

// Wait until we're woken up by `unlock()`.
//
// Given that `scheduler_lock` is held by us, anyone else who concurrently
// tries to wake us up will block on it until `Halt()` has completed.
// Hence no race here.
current->scheduling_group->Halt(current, std::move(slk));

// The lock's owner has woken us up; the lock is in our hands now.
FLARE_DCHECK(!impl_.TryRemoveWaiter(&wb));
return;
}
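The code above is only the slow path of the mutex's lock(). For context, a fast path consistent with this count_ protocol (count_ == 0 means unlocked; the holder and every waiter each contribute 1) would be a single CAS. A sketch, with the exact field type an assumption rather than flare's verbatim code:

#include <atomic>
#include <cstdint>

// Sketch of the fast path implied by the `count_` protocol above.
struct MutexFastPathSketch {
  bool try_lock() noexcept {
    std::uint32_t expected = 0;
    return count_.compare_exchange_strong(expected, 1,
                                          std::memory_order_acquire);
  }

  void lock() noexcept {
    if (try_lock()) {
      return;    // Uncontended: one CAS and we own the mutex.
    }
    LockSlow();  // Falls through to the slow path excerpted above.
  }

  void LockSlow() noexcept;  // The code shown above.

  std::atomic<std::uint32_t> count_{0};
};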
void SchedulingGroup::Halt(
    FiberEntity* self, std::unique_lock<Spinlock>&& scheduler_lock) noexcept {
  if (!self->need_halt) {
    // This branch can be ignored; it is only taken once the whole scheduling
    // group has been stopped.
    self->need_halt = true;
    if (scheduler_lock) {
      scheduler_lock.unlock();
    }
    return;
  }

  FLARE_CHECK_EQ(self, GetCurrentFiberEntity(),
                 "`self` must be pointer to caller's `FiberEntity`.");
  FLARE_CHECK(
      scheduler_lock.owns_lock(),
      "Scheduler lock must be held by caller prior to call to this method.");
  FLARE_CHECK(
      self->state == FiberState::Running,
      "`Halt()` is only for running fiber's use. If you want to `ReadyFiber()` "
      "yourself and `Halt()`, what you really need is `Yield()`.");
  auto master = GetMasterFiberEntity();
  self->state = FiberState::Waiting;  // Transition from Running to Waiting.

  // We simply yield to the master fiber for now.
  //
  // TODO(luobogao): We can directly yield to the next ready fiber. This way we
  // can eliminate a context switch.
  //
  // Note that we need to hold `scheduler_lock` until we have finished the
  // context swap. Otherwise, if we're already in the ready queue, we could be
  // resumed again even before we stopped running. That would be disastrous.
  //
  // Do NOT pass `scheduler_lock` (or its pointer) to the callback. Instead,
  // play with the raw lock.
  //
  // The reason is that `std::unique_lock<...>::unlock` itself is not an atomic
  // operation (although `Spinlock` is).
  //
  // What this means is that, after unlocking the scheduler lock, once the
  // fiber starts to run again, `std::unique_lock<...>::owns_lock` is not
  // necessarily updated in time (before the fiber checks it), which can lead
  // to subtle bugs.
  master->ResumeOn(
      [self_lock = scheduler_lock.release()]() { self_lock->unlock(); });
  // Switch to the master fiber; as part of the switch, this fiber's scheduler
  // lock is unlocked by the callback above.

  // When we're back, we should be in the same fiber.
  FLARE_CHECK_EQ(self, GetCurrentFiberEntity());
}
if (auto was = count_.fetch_sub(1, std::memory_order_release); was == 1) {
  // Lucky day, no one is waiting on the mutex.
  //
  // Nothing to do.
} else {
  FLARE_CHECK_GT(was, 1);

  // We need this lock so as to see a consistent state between `count_` and
  // `impl_` ('s internal wait queue).
  std::unique_lock splk(slow_path_lock_);
  auto fiber = impl_.WakeOne();
  FLARE_CHECK(fiber);  // Otherwise `was` must be 1 (as there's no waiter).
  splk.unlock();
  fiber->scheduling_group->ReadyFiber(
      fiber, std::unique_lock(fiber->scheduler_lock), true);
}
}
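Since the type only needs lock()/unlock() to model BasicLockable, the standard RAII wrappers drive it as usual. A hypothetical usage (fiber::Mutex is an assumed name for the class these excerpts come from):

#include <mutex>

fiber::Mutex mutex;  // Assumed name for the mutex excerpted above.
int counter = 0;

void Touch() {
  std::scoped_lock lk(mutex);  // lock(): CAS fast path, or park the fiber.
  ++counter;
}                              // unlock(): fetch_sub; wakes one waiter if any.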
template <class LockType, class F>
void wait(std::unique_lock<LockType>& lock, F&& pred) {
  FLARE_DCHECK(IsFiberContextPresent());

  while (!std::forward<F>(pred)()) {
    wait(lock);
  }
  FLARE_DCHECK(lock.owns_lock());
}
// You can always assume this method returns as a result of `notify_xxx`, even
// if it actually results from timing out. This is conformant behavior -- it's
// just a spurious wake-up in the latter case.
//
// Returns `false` on timeout.
template <class LockType>
bool wait_until(std::unique_lock<LockType>& lock,
                std::chrono::steady_clock::time_point expires_at);
template <class LockType, class F>
bool wait_until(std::unique_lock<LockType>& lk,
                std::chrono::steady_clock::time_point timeout, F&& pred) {
  FLARE_DCHECK(IsFiberContextPresent());

  while (!std::forward<F>(pred)()) {
    wait_until(lk, timeout);
    if (ReadSteadyClock() >= timeout) {
      return std::forward<F>(pred)();
    }
  }
  FLARE_DCHECK(lk.owns_lock());
  return true;
}
auto current = GetCurrentFiberEntity();
auto sg = current->scheduling_group;
bool use_timeout = expires_at != std::chrono::steady_clock::time_point::max();
DelayedInit<AsyncWaker> awaker;

// Add us to the wait queue.
std::unique_lock slk(current->scheduler_lock);
WaitBlock wb = {.waiter = current};
FLARE_CHECK(impl_.AddWaiter(&wb));
if (use_timeout) {
  // Set a timeout if needed.
  ...
}

// Release user's lock.
lock.unlock();

// Block until we're woken up by either `notify_xxx` or the timer.
sg->Halt(current, std::move(slk));  // `slk` is released by `Halt()`.

...

// Grab the lock again and return.
lock.lock();
return !timeout;
}
The fiber adds itself to the wait queue, then switches to the master fiber (via Halt()), suspending itself.
Now let's look at the implementation of wait() with a predicate:
template <class LockType, class F>
void wait(std::unique_lock<LockType>& lock, F&& pred) {
  FLARE_DCHECK(IsFiberContextPresent());

  while (!std::forward<F>(pred)()) {
    wait(lock);
  }
  FLARE_DCHECK(lock.owns_lock());
}
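Putting the pieces together, a typical producer/consumer use of these predicate overloads looks like this (hypothetical example; fiber::Mutex and fiber::ConditionVariable are assumed class names for the primitives discussed above):

#include <chrono>
#include <mutex>
#include <queue>

fiber::Mutex mutex;
fiber::ConditionVariable cv;
std::queue<int> pending;

void Produce(int v) {
  std::scoped_lock lk(mutex);
  pending.push(v);
  cv.notify_one();  // Wakes (at most) one waiting consumer.
}

bool ConsumeOne(int* out) {
  std::unique_lock lk(mutex);
  // The predicate is re-evaluated after every wake-up; `wait_until` returns
  // false if the deadline passes while the predicate is still false.
  if (!cv.wait_until(
          lk, std::chrono::steady_clock::now() + std::chrono::seconds(1),
          [&] { return !pending.empty(); })) {
    return false;
  }
  *out = pending.front();
  pending.pop();
  return true;
}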
template <class Mutex, class ConditionVariable, std::ptrdiff_t kLeastMaxValue>
void BasicCountingSemaphore<Mutex, ConditionVariable, kLeastMaxValue>::release(
    std::ptrdiff_t count) {
  // `count` must be positive and no greater than kLeastMaxValue.
  FLARE_CHECK_LE(count, kLeastMaxValue);
  FLARE_CHECK_GT(count, 0);
  std::scoped_lock lk(lock_);
  // The internal counter must not exceed kLeastMaxValue after the release.
  FLARE_CHECK_LE(current_, kLeastMaxValue - count);
  current_ += count;
  if (count == 1) {
    cv_.notify_one();
  } else {
    cv_.notify_all();
  }
}
release() returns count tokens; if only one token is returned, a single fiber is woken up, otherwise all waiting fibers are.
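To see why some of the awakened fibers may go right back to waiting, consider the acquire side. A sketch consistent with the release() above (flare's actual code may differ): every waiter re-checks current_ under the lock, so if faster waiters already claimed the released tokens, the rest simply block again.

// Sketch of the acquiring side, consistent with release() above.
template <class Mutex, class ConditionVariable, std::ptrdiff_t kLeastMaxValue>
void BasicCountingSemaphore<Mutex, ConditionVariable, kLeastMaxValue>::acquire() {
  std::unique_lock lk(lock_);
  // Re-checked on every wake-up: another fiber may have consumed the token
  // between the notify and the moment we re-acquire `lock_`.
  cv_.wait(lk, [&] { return current_ != 0; });
  --current_;
}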
So why wake everyone here, knowing that some of the awakened fibers will just go back to waiting, instead of looping and calling notify_one once per token?
A comment in ConditionVariable::notify_all explains it:
// We cannot keep calling `notify_one` here. If a waiter immediately goes to
// sleep again after we wake it up, it's possible that we wake it again when
// we try to drain the wait chain.
//
// So we remove all waiters first, and schedule them afterwards.
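In other words, notify_all detaches the whole wait chain under the condition variable's internal lock first, and only then readies each fiber. A rough sketch of that pattern, reusing names from the earlier excerpts (the internal lock and the drain call are assumptions, not flare's actual API):

void notify_all() noexcept {
  std::unique_lock splk(slow_path_lock_);  // Hypothetical internal lock.
  auto fibers = impl_.DrainAllWaiters();   // Hypothetical: detach the chain.
  splk.unlock();
  for (auto* fiber : fibers) {
    // A waiter that immediately blocks again lands on a fresh chain, so this
    // loop cannot wake the same fiber twice.
    fiber->scheduling_group->ReadyFiber(
        fiber, std::unique_lock(fiber->scheduler_lock), true);
  }
}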