// Creates a fiber that runs `start`, configured by `attr`.
//
// The fiber is launched in the scheduling group chosen by
// `GetSchedulingGroup(attr.scheduling_group_attr, attr.scheduling_group)`.
// With `fiber::Launch::Post` its descriptor is handed to that group and run
// later; with any other launch policy (presumably `Dispatch`) the fiber entity
// is instantiated right away and the calling fiber switches to it.
Fiber::Fiber(const Attributes& attr, Function<void()>&& start) {
  auto group =
      GetSchedulingGroup(attr.scheduling_group_attr, attr.scheduling_group);
  FLARE_CHECK(group, "No scheduling group is available?");

  if (attr.execution_context) {
    // Run the user's procedure inside the requested execution context. The
    // wrapper holds its own reference to the context; that reference goes
    // away together with the wrapper once `start` has returned.
    start = [proc = std::move(start),
             ec = RefPtr(ref_ptr, attr.execution_context)] {
      ec->Execute(proc);
    };
  }

  // Describe the fiber. We don't own `fiber_desc`: it ceases to exist once
  // `start` returns, so it must not be touched after it is scheduled below.
  auto fiber_desc = fiber::detail::NewFiberDesc();
  // NOTE(review): the worker is looked up from `attr.scheduling_group` (the
  // raw requested index, which may be a sentinel) rather than from `group`;
  // confirm `GetFiberWorker` always yields a worker that belongs to `group`.
  fiber_desc->worker =
      fiber::detail::GetFiberWorker(attr.scheduling_group, attr.fiber_worker);
  fiber_desc->start_proc = std::move(start);
  fiber_desc->scheduling_group_local = attr.scheduling_group_local;
  fiber_desc->system_fiber = attr.system_fiber;

  // `join()` sleeps on this barrier; both the descriptor and this `Fiber`
  // keep a reference to it.
  fiber_desc->exit_barrier =
      object_pool::GetRefCounted<fiber::detail::ExitBarrier>();
  join_impl_ = fiber_desc->exit_barrier;

  if (attr.launch_policy != fiber::Launch::Post) {
    // Build the entity now and jump into it; the current fiber gets queued
    // again once the new one starts running.
    group->SwitchTo(fiber::detail::GetCurrentFiberEntity(),
                    fiber::detail::InstantiateFiberEntity(group, fiber_desc));
  } else {
    // Let the scheduling group pick it up later.
    group->StartFiber(fiber_desc);
  }
}
if (id == Fiber::kUnspecifiedSchedulingGroup) { // unspecified, get a random one return fiber::detail::GetSchedulingGroup( Random<std::size_t>(0, fiber::GetSchedulingGroupCount() - 1)); }
// specify other id, less likely to happen return fiber::detail::GetSchedulingGroup(id); }
case Fiber::SchedulingGroupAttr::ATTR_BACKGROUND: { if (FLARE_LIKELY(id == Fiber::kNearestSchedulingGroup)) { // specify nearest return fiber::detail::NearestSchedulingGroup(true); }
if (id == Fiber::kUnspecifiedSchedulingGroup) { // unspecified, get a random one return fiber::detail::GetBgSchedulingGroup( Random<std::size_t>(0, fiber::GetBgSchedulingGroupCount() - 1)); }
// specify other id, less likely to happen return fiber::detail::GetBgSchedulingGroup(id); }
case Fiber::SchedulingGroupAttr::ATTR_UNSPECIFIED: { // unspecified, check if the id is set to a special value if (id == Fiber::kBackgroundSchedulingGroup) { // id is set to be background return fiber::detail::GetBgSchedulingGroup( Random<std::size_t>(0, fiber::GetBgSchedulingGroupCount() - 1)); }
// go with a random one in foreground return fiber::detail::GetSchedulingGroup( Random<std::size_t>(0, fiber::GetSchedulingGroupCount() - 1)); }
// by default, return the nearest one in foreground groups default: return fiber::detail::NearestSchedulingGroup(); } }
// Get scheduling group "nearest" to the calling thread. // // - If calling thread is a fiber worker, it's current scheduling group is // returned. // // - Otherwise if NUMA aware is enabled, a randomly chosen scheduling group in // the same node is returned. // // - If no scheduling group is initialized in current node, or NUMA aware is not // enabled, a randomly chosen one is returned. // // - If no scheduling group is initialize at all, `nullptr` is returned instead. inline SchedulingGroup* NearestSchedulingGroup(constbool background = false){ FLARE_INTERNAL_TLS_MODEL thread_local SchedulingGroup* nearest{}; if (FLARE_LIKELY(nearest)) { return nearest; } returnNearestSchedulingGroupSlow(&nearest, background); }
SchedulingGroup* NearestSchedulingGroupSlow(SchedulingGroup** cache, constbool background){ if (auto rc = SchedulingGroup::Current()) { // Only if we indeed belong to the scheduling group (in which case the // "nearest" scheduling group never changes) we fill the cache. *cache = rc; return rc; }
// We don't pay for overhead of initialize `next` unless we're not in running // fiber worker. FLARE_INTERNAL_TLS_MODEL thread_local std::size_t next = Random();
// This structure stores information describing how to instantiate a // `FiberEntity`. The instantiation is deferred to first run of the fiber. // // This approach should help performance since: // // - Reduced memory footprint: We don't need to allocate a stack until actual // run. // // - Alleviated producer-consumer effect: The fiber stack is allocated in fiber // worker, where most (exited) fiber' stack is freed. This promotes more // thread-local-level reuse. If we keep allocating stack from thread X and // consume it in thread Y, we'd have a hard time in transfering fiber stack // between them (mostly because we can't afford a big transfer-size to avoid // excessive memory footprint.). structalignas(hardware_destructive_interference_size) FiberDesc : RunnableEntity { Function<void()> start_proc; RefPtr<ExitBarrier> exit_barrier; std::uint64_t last_ready_tsc; // std::uint32_t fiber_worker; bool scheduling_group_local; bool system_fiber;
FiberDesc(); };
In the `Fiber` constructor:
1 2 3 4 5 6 7
// `desc` will cease to exist once `start` returns. We don't own it. auto desc = fiber::detail::NewFiberDesc(); desc->worker = fiber::detail::GetFiberWorker(attr.scheduling_group, attr.fiber_worker); // 是否指定fiber worker来run, 如果没有绑定,则丢到公共队列,由后续的sg中的其他worker来run desc->start_proc = std::move(start); desc->scheduling_group_local = attr.scheduling_group_local; // 是否支持在不同的sg中运行 desc->system_fiber = attr.system_fiber; // system fiber 是fiber框架内部用的fiber
// We need scheduler lock here actually (at least to comfort TSan). But so // long as this check does not fiber, we're safe without the lock I think. FLARE_CHECK(to->state == FiberState::Ready, "Fiber `to` is not in ready state."); FLARE_CHECK_NE(self, to, "Switch to yourself results in U.B.");
// TODO(luobogao): Ensure neither `self->scheduler_lock` nor // `to->scheduler_lock` is currrently held (by someone else).
// We delay queuing `self` to run queue until `to` starts to run. // // It's possible that we first add `self` to run queue with its scheduler lock // locked, and unlock the lock when `to` runs. However, if `self` is grabbed // by some worker prior `to` starts to run, the worker will spin to wait for // `to`. This can be quite costly. to->ResumeOn([this, self]() { ReadyFiber(self, std::unique_lock(self->scheduler_lock), false); // 重点关注这行, 切换到目标fiber后,将原来的fiber重新入队 });
// When we're back, we should be in the same fiber. FLARE_CHECK_EQ(self, GetCurrentFiberEntity()); }
voidSchedulingGroup::ReadyFiber( // 入队 FiberEntity* fiber, std::unique_lock<Spinlock>&& scheduler_lock, bool normal_prio)noexcept{ FLARE_DCHECK_NE(fiber, GetMasterFiberEntity(), "Master fiber should not be added to run queue."); fiber->state = FiberState::Ready; fiber->scheduling_group = this; fiber->last_ready_tsc = ReadTsc(); if (scheduler_lock) { // TODO(zhangxingrui): 解锁 scheduler_lock.unlock(); } if (FLAGS_flare_fiber_schedule_policy_mode == SCHEDULE_BALANCE_POLICY || fiber->worker == nullptr) { QueueRunnableEntity(fiber, fiber->scheduling_group_local); // 重新入队 } else { SG_WORKER_PRIVATE_QUEUE_COUNTER_INCR; fiber->worker->QueueRunnableEntity(fiber, normal_prio); } }
voidFiberEntity::ResumeOn(Function<void()>&& cb)noexcept{ auto caller = GetCurrentFiberEntity(); FLARE_CHECK(!resume_proc, "You may not call `ResumeOn` on a fiber twice (before the first " "one has executed)."); FLARE_CHECK_NE(caller, this, "Calling `Resume()` on self is undefined.");
// This pending call will be performed and cleared immediately when we // switched to `*this` fiber (before calling user's continuation). resume_proc = std::move(cb); Resume(); }
// Switches execution from the calling fiber to `*this`.
//
// Returns only when some other fiber later switches back to the caller. At
// that point the caller is re-registered as the current fiber, and any
// callback left in the caller's `resume_proc` (see `ResumeOn`) is run.
inline void FiberEntity::Resume() noexcept {
  // Note that there are some inconsistencies. The stack we're running on is not
  // our stack. This should be easy to see, since we're actually running in
  // caller's context (including its stack).
  auto caller = GetCurrentFiberEntity();
  FLARE_DCHECK_NE(caller, this, "Calling `Resume()` on self is undefined.");

  // Argument `context` (i.e., `this`) is only used the first time the context
  // is jumped to (in `FiberProc`).
  jump_context(&caller->state_save_area, state_save_area, this);  // Switch to the target fiber.
  // ...
  SetCurrentFiberEntity(caller);  // The caller is back.

  // Check for pending `ResumeOn`.
  DestructiveRunCallbackOpt(&caller->resume_proc);
}
// Entry point for newly-started fibers. // // NOT put into anonymous namespace to simplify its displayed name in GDB. // // `extern "C"` doesn't help, unfortunately. (It does simplify mangled name, // though.) // // Do NOT mark this function as `noexcept`. We don't want to force stack being // unwound on exception. staticvoidFiberProc(void* context){ auto self = reinterpret_cast<FiberEntity*>(context); // We're running in `self`'s stack now.
// Hmmm, there is a pending resumption callback, even if we haven't completely // started.. // // We'll run it anyway. This, for now, is mostly used for `Dispatch` fiber // launch policy. DestructiveRunCallbackOpt(&self->resume_proc); // 入队原来的fiber DestructiveRunCallback(&self->start_proc); // 开始跑本start
// This fiber should not be waiting on anything (mutex / condition_variable // / ...), i.e., no one else should be referring this fiber (referring to its // `exit_barrier` is, since it's ref-counted, no problem), otherwise it's a // programming mistake.
// Let's see if there will be someone who will be waiting on us. if (!self->exit_barrier) { // Mark the fiber as dead. This prevent our GDB plugin from listing this // fiber out. self->state = FiberState::Dead;
// No one is waiting for us, this is easy. GetMasterFiberEntity()->ResumeOn([self] { FreeFiberEntity(self); }); } else { ... } FLARE_CHECK(0); // Can't be here. }
Re-queue the original fiber, then start running this fiber's own procedure:
1 2 3 4 5 6 7
// Hmmm, there is a pending resumption callback, even if we haven't completely // started.. // // We'll run it anyway. This, for now, is mostly used for `Dispatch` fiber // launch policy. DestructiveRunCallbackOpt(&self->resume_proc); // 入队原来的fiber DestructiveRunCallback(&self->start_proc); // 开始跑本fiber start
Once the fiber finishes, its memory is released:
1 2
// No one is waiting for us, this is easy. GetMasterFiberEntity()->ResumeOn([self] { FreeFiberEntity(self); }); // 切换到MasterFiberEntity 后,释放本fiber