tokio/runtime/scheduler/multi_thread_alt/worker.rs

1//! A scheduler is initialized with a fixed number of workers. Each worker is
2//! driven by a thread. Each worker has a "core" which contains data such as the
3//! run queue and other state. When `block_in_place` is called, the worker's
4//! "core" is handed off to a new thread allowing the scheduler to continue to
5//! make progress while the originating thread blocks.
6//!
7//! # Shutdown
8//!
9//! Shutting down the runtime involves the following steps:
10//!
11//!  1. The Shared::close method is called. This closes the inject queue and
12//!     `OwnedTasks` instance and wakes up all worker threads.
13//!
//!  2. Each worker thread observes the close signal the next time it runs
//!     maintenance by checking whether the inject queue is closed.
//!     The `Worker::is_shutdown` flag is set to true.
17//!
18//!  3. The worker thread calls `pre_shutdown` in parallel. Here, the worker
19//!     will keep removing tasks from `OwnedTasks` until it is empty. No new
20//!     tasks can be pushed to the `OwnedTasks` during or after this step as it
21//!     was closed in step 1.
22//!
//!  4. The workers call Shared::shutdown to enter the single-threaded phase of
//!     shutdown. These calls will push their core to `Shared::shutdown_cores`,
//!     and the last thread to push its core will finish the shutdown procedure.
//!
//!  5. The local run queue of each core is emptied, then the inject queue is
//!     emptied.
29//!
30//! At this point, shutdown has completed. It is not possible for any of the
31//! collections to contain any tasks at this point, as each collection was
32//! closed first, then emptied afterwards.
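//!
//! A rough sketch of the per-worker shutdown flow described above (illustrative
//! pseudocode, not the exact private API):
//!
//! ```text
//! loop {
//!     run_scheduled_tasks();
//!     if inject_queue.is_closed() {   // step 2: observe the close signal
//!         worker.is_shutdown = true;
//!         drain_owned_tasks();        // step 3: empty `OwnedTasks`
//!         shared.shutdown(core);      // step 4: push the core to `shutdown_cores`
//!         break;                      // the last core pushed triggers step 5
//!     }
//! }
//! ```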
33//!
34//! ## Spawns during shutdown
35//!
36//! When spawning tasks during shutdown, there are two cases:
37//!
38//!  * The spawner observes the `OwnedTasks` being open, and the inject queue is
39//!    closed.
40//!  * The spawner observes the `OwnedTasks` being closed and doesn't check the
41//!    inject queue.
42//!
43//! The first case can only happen if the `OwnedTasks::bind` call happens before
44//! or during step 1 of shutdown. In this case, the runtime will clean up the
45//! task in step 3 of shutdown.
46//!
47//! In the latter case, the task was not spawned and the task is immediately
48//! cancelled by the spawner.
49//!
50//! The correctness of shutdown requires both the inject queue and `OwnedTasks`
51//! collection to have a closed bit. With a close bit on only the inject queue,
//! spawning could run into a situation where a task is successfully bound long
53//! after the runtime has shut down. With a close bit on only the `OwnedTasks`,
54//! the first spawning situation could result in the notification being pushed
//!    to the inject queue after step 5 of shutdown, which would leave a task in
56//! the inject queue indefinitely. This would be a ref-count cycle and a memory
57//! leak.
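//!
//! A minimal sketch of the spawn-time checks that the reasoning above relies on
//! (the helper names are illustrative, not the exact internal API):
//!
//! ```text
//! match owned_tasks.bind(task) {       // checks the `OwnedTasks` closed bit
//!     Closed => cancel(task),          // never spawned; cancelled immediately
//!     Bound(notified) => {
//!         schedule(notified);          // may push to the inject queue, which
//!     }                                // performs its own closed check
//! }
//! ```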
58
59use crate::loom::sync::{Arc, Condvar, Mutex, MutexGuard};
60use crate::runtime;
61use crate::runtime::driver::Driver;
62use crate::runtime::scheduler::multi_thread_alt::{
63    idle, queue, stats, Counters, Handle, Idle, Overflow, Stats, TraceStatus,
64};
65use crate::runtime::scheduler::{self, inject, Lock};
66use crate::runtime::task::{OwnedTasks, TaskHarnessScheduleHooks};
67use crate::runtime::{blocking, coop, driver, task, Config, SchedulerMetrics, WorkerMetrics};
68use crate::runtime::{context, TaskHooks};
69use crate::util::atomic_cell::AtomicCell;
70use crate::util::rand::{FastRand, RngSeedGenerator};
71
72use std::cell::{Cell, RefCell};
73use std::task::Waker;
74use std::time::Duration;
75use std::{cmp, thread};
76
77cfg_unstable_metrics! {
78    mod metrics;
79}
80
81mod taskdump_mock;
82
83/// A scheduler worker
84///
85/// Data is stack-allocated and never migrates threads
86pub(super) struct Worker {
87    /// Used to schedule bookkeeping tasks every so often.
88    tick: u32,
89
    /// True if the scheduler is being shut down
91    pub(super) is_shutdown: bool,
92
93    /// True if the scheduler is being traced
94    is_traced: bool,
95
96    /// Counter used to track when to poll from the local queue vs. the
97    /// injection queue
98    num_seq_local_queue_polls: u32,
99
100    /// How often to check the global queue
101    global_queue_interval: u32,
102
103    /// Used to collect a list of workers to notify
104    workers_to_notify: Vec<usize>,
105
    /// Snapshot of the idle core list. This helps speed up stealing.
107    idle_snapshot: idle::Snapshot,
108
109    stats: stats::Ephemeral,
110}
111
112/// Core data
113///
114/// Data is heap-allocated and migrates threads.
115#[repr(align(128))]
116pub(super) struct Core {
117    /// Index holding this core's remote/shared state.
118    pub(super) index: usize,
119
120    lifo_slot: Option<Notified>,
121
122    /// The worker-local run queue.
123    run_queue: queue::Local<Arc<Handle>>,
124
125    /// True if the worker is currently searching for more work. Searching
126    /// involves attempting to steal from other workers.
127    pub(super) is_searching: bool,
128
129    /// Per-worker runtime stats
130    stats: Stats,
131
132    /// Fast random number generator.
133    rand: FastRand,
134}
135
136/// State shared across all workers
137pub(crate) struct Shared {
138    /// Per-core remote state.
139    remotes: Box<[Remote]>,
140
    /// Global task queue used to:
    ///  1. Submit work to the scheduler while **not** currently on a worker thread.
    ///  2. Submit work to the scheduler when a worker run queue is saturated.
144    pub(super) inject: inject::Shared<Arc<Handle>>,
145
146    /// Coordinates idle workers
147    idle: Idle,
148
149    /// Collection of all active tasks spawned onto this executor.
150    pub(super) owned: OwnedTasks<Arc<Handle>>,
151
152    /// Data synchronized by the scheduler mutex
153    pub(super) synced: Mutex<Synced>,
154
    /// Powers Tokio's I/O, timers, etc. The responsibility of polling the
    /// driver is shared across workers.
157    driver: AtomicCell<Driver>,
158
159    /// Condition variables used to unblock worker threads. Each worker thread
160    /// has its own `condvar` it waits on.
161    pub(super) condvars: Vec<Condvar>,
162
163    /// The number of cores that have observed the trace signal.
164    pub(super) trace_status: TraceStatus,
165
166    /// Scheduler configuration options
167    config: Config,
168
169    /// Collects metrics from the runtime.
170    pub(super) scheduler_metrics: SchedulerMetrics,
171
172    pub(super) worker_metrics: Box<[WorkerMetrics]>,
173
174    /// Only held to trigger some code on drop. This is used to get internal
175    /// runtime metrics that can be useful when doing performance
176    /// investigations. This does nothing (empty struct, no drop impl) unless
177    /// the `tokio_internal_mt_counters` `cfg` flag is set.
178    _counters: Counters,
179}
180
181/// Data synchronized by the scheduler mutex
182pub(crate) struct Synced {
    /// When a worker is notified, it is assigned a core. The core is placed here
184    /// until the worker wakes up to take it.
185    pub(super) assigned_cores: Vec<Option<Box<Core>>>,
186
187    /// Cores that have observed the shutdown signal
188    ///
    /// The core is **not** placed back in the worker to prevent it from being
    /// stolen by a thread that was spawned as part of `block_in_place`.
191    shutdown_cores: Vec<Box<Core>>,
192
193    /// The driver goes here when shutting down
194    shutdown_driver: Option<Box<Driver>>,
195
196    /// Synchronized state for `Idle`.
197    pub(super) idle: idle::Synced,
198
199    /// Synchronized state for `Inject`.
200    pub(crate) inject: inject::Synced,
201}
202
203/// Used to communicate with a worker from other threads.
204struct Remote {
    // When a task is scheduled from a worker, it is stored in this slot. The
    // worker will check this slot for a task **before** checking the run
    // queue. This effectively results in the **last** scheduled task being run
    // next (LIFO). This is an optimization for improving locality, which
    // benefits message passing patterns and helps to reduce latency.
    // lifo_slot: Lifo,
211
212    /// Steals tasks from this worker.
213    pub(super) steal: queue::Steal<Arc<Handle>>,
214}
215
216/// Thread-local context
217pub(crate) struct Context {
    /// Handle to the current scheduler
219    handle: Arc<Handle>,
220
221    /// Worker index
222    index: usize,
223
224    /// True when the LIFO slot is enabled
225    lifo_enabled: Cell<bool>,
226
227    /// Core data
228    core: RefCell<Option<Box<Core>>>,
229
230    /// Used to pass cores to other threads when `block_in_place` is called
231    handoff_core: Arc<AtomicCell<Core>>,
232
233    /// Tasks to wake after resource drivers are polled. This is mostly to
234    /// handle yielded tasks.
235    pub(crate) defer: RefCell<Vec<Notified>>,
236}
237
238/// Running a task may consume the core. If the core is still available when
239/// running the task completes, it is returned. Otherwise, the worker will need
240/// to stop processing.
241type RunResult = Result<Box<Core>, ()>;
242type NextTaskResult = Result<(Option<Notified>, Box<Core>), ()>;
243
244/// A task handle
245type Task = task::Task<Arc<Handle>>;
246
247/// A notified task handle
248type Notified = task::Notified<Arc<Handle>>;
249
/// Value picked out of thin air. Running the LIFO slot a handful of times
/// seems sufficient to benefit from locality. More than 3 times is probably
/// excessive. The value can be tuned in the future with data that shows
/// improvements.
254const MAX_LIFO_POLLS_PER_TICK: usize = 3;
255
256pub(super) fn create(
257    num_cores: usize,
258    driver: Driver,
259    driver_handle: driver::Handle,
260    blocking_spawner: blocking::Spawner,
261    seed_generator: RngSeedGenerator,
262    config: Config,
263) -> runtime::Handle {
264    let mut num_workers = num_cores;
265
266    // If the driver is enabled, we need an extra thread to handle polling the
267    // driver when all cores are busy.
268    if driver.is_enabled() {
269        num_workers += 1;
270    }
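    // For example (illustrative numbers): with 4 cores and the I/O driver
    // enabled, 5 worker threads are spawned but only 4 cores exist, so at any
    // point in time one thread is either parked or polling the driver.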
271
272    let mut cores = Vec::with_capacity(num_cores);
273    let mut remotes = Vec::with_capacity(num_cores);
274    // Worker metrics are actually core based
275    let mut worker_metrics = Vec::with_capacity(num_cores);
276
277    // Create the local queues
278    for i in 0..num_cores {
279        let (steal, run_queue) = queue::local(config.local_queue_capacity);
280
281        let metrics = WorkerMetrics::from_config(&config);
282        let stats = Stats::new(&metrics);
283
284        cores.push(Box::new(Core {
285            index: i,
286            lifo_slot: None,
287            run_queue,
288            is_searching: false,
289            stats,
290            rand: FastRand::from_seed(config.seed_generator.next_seed()),
291        }));
292
293        remotes.push(Remote {
294            steal,
295            // lifo_slot: Lifo::new(),
296        });
297        worker_metrics.push(metrics);
298    }
299
300    // Allocate num-cores + 1 workers, so one worker can handle the I/O driver,
301    // if needed.
302    let (idle, idle_synced) = Idle::new(cores, num_workers);
303    let (inject, inject_synced) = inject::Shared::new();
304
305    let handle = Arc::new(Handle {
306        task_hooks: TaskHooks::from_config(&config),
307        shared: Shared {
308            remotes: remotes.into_boxed_slice(),
309            inject,
310            idle,
311            owned: OwnedTasks::new(num_cores),
312            synced: Mutex::new(Synced {
313                assigned_cores: (0..num_workers).map(|_| None).collect(),
314                shutdown_cores: Vec::with_capacity(num_cores),
315                shutdown_driver: None,
316                idle: idle_synced,
317                inject: inject_synced,
318            }),
319            driver: AtomicCell::new(Some(Box::new(driver))),
320            condvars: (0..num_workers).map(|_| Condvar::new()).collect(),
321            trace_status: TraceStatus::new(num_cores),
322            config,
323            scheduler_metrics: SchedulerMetrics::new(),
324            worker_metrics: worker_metrics.into_boxed_slice(),
325            _counters: Counters,
326        },
327        driver: driver_handle,
328        blocking_spawner,
329        seed_generator,
330    });
331
332    let rt_handle = runtime::Handle {
333        inner: scheduler::Handle::MultiThreadAlt(handle),
334    };
335
336    // Eagerly start worker threads
337    for index in 0..num_workers {
338        let handle = rt_handle.inner.expect_multi_thread_alt();
339        let h2 = handle.clone();
340        let handoff_core = Arc::new(AtomicCell::new(None));
341
342        handle
343            .blocking_spawner
344            .spawn_blocking(&rt_handle, move || run(index, h2, handoff_core, false));
345    }
346
347    rt_handle
348}
349
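/// A hedged usage sketch: this internal function backs the public
/// `tokio::task::block_in_place` API, which may be called from async code
/// running on the multi-thread runtime.
///
/// ```ignore
/// // Inside a task running on the multi-thread runtime:
/// let contents = tokio::task::block_in_place(|| {
///     // Blocking work runs here. The worker's core is handed off to another
///     // thread so the rest of the scheduler keeps making progress.
///     std::fs::read_to_string("Cargo.toml").expect("read file")
/// });
/// assert!(!contents.is_empty());
/// ```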
350#[track_caller]
351pub(crate) fn block_in_place<F, R>(f: F) -> R
352where
353    F: FnOnce() -> R,
354{
355    // Try to steal the worker core back
356    struct Reset(coop::Budget);
357
358    impl Drop for Reset {
359        fn drop(&mut self) {
360            with_current(|maybe_cx| {
361                if let Some(cx) = maybe_cx {
362                    let core = cx.handoff_core.take();
363                    let mut cx_core = cx.core.borrow_mut();
364                    assert!(cx_core.is_none());
365                    *cx_core = core;
366
367                    // Reset the task budget as we are re-entering the
368                    // runtime.
369                    coop::set(self.0);
370                }
371            });
372        }
373    }
374
375    let mut had_entered = false;
376
377    let setup_result = with_current(|maybe_cx| {
378        match (
379            crate::runtime::context::current_enter_context(),
380            maybe_cx.is_some(),
381        ) {
382            (context::EnterRuntime::Entered { .. }, true) => {
383                // We are on a thread pool runtime thread, so we just need to
384                // set up blocking.
385                had_entered = true;
386            }
387            (
388                context::EnterRuntime::Entered {
389                    allow_block_in_place,
390                },
391                false,
392            ) => {
393                // We are on an executor, but _not_ on the thread pool.  That is
394                // _only_ okay if we are in a thread pool runtime's block_on
395                // method:
396                if allow_block_in_place {
397                    had_entered = true;
398                    return Ok(());
399                } else {
400                    // This probably means we are on the current_thread runtime or in a
401                    // LocalSet, where it is _not_ okay to block.
402                    return Err(
403                        "can call blocking only when running on the multi-threaded runtime",
404                    );
405                }
406            }
407            (context::EnterRuntime::NotEntered, true) => {
408                // This is a nested call to block_in_place (we already exited).
409                // All the necessary setup has already been done.
410                return Ok(());
411            }
412            (context::EnterRuntime::NotEntered, false) => {
413                // We are outside of the tokio runtime, so blocking is fine.
414                // We can also skip all of the thread pool blocking setup steps.
415                return Ok(());
416            }
417        }
418
419        let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");
420
421        // Get the worker core. If none is set, then blocking is fine!
422        let core = match cx.core.borrow_mut().take() {
423            Some(core) => core,
424            None => return Ok(()),
425        };
426
427        // In order to block, the core must be sent to another thread for
428        // execution.
429        //
430        // First, move the core back into the worker's shared core slot.
431        cx.handoff_core.set(core);
432
433        // Next, clone the worker handle and send it to a new thread for
434        // processing.
435        //
436        // Once the blocking task is done executing, we will attempt to
437        // steal the core back.
438        let index = cx.index;
439        let handle = cx.handle.clone();
440        let handoff_core = cx.handoff_core.clone();
441        runtime::spawn_blocking(move || run(index, handle, handoff_core, true));
442        Ok(())
443    });
444
445    if let Err(panic_message) = setup_result {
446        panic!("{}", panic_message);
447    }
448
449    if had_entered {
450        // Unset the current task's budget. Blocking sections are not
451        // constrained by task budgets.
452        let _reset = Reset(coop::stop());
453
454        crate::runtime::context::exit_runtime(f)
455    } else {
456        f()
457    }
458}
459
460fn run(
461    index: usize,
462    handle: Arc<Handle>,
463    handoff_core: Arc<AtomicCell<Core>>,
464    blocking_in_place: bool,
465) {
466    struct AbortOnPanic;
467
468    impl Drop for AbortOnPanic {
469        fn drop(&mut self) {
470            if std::thread::panicking() {
471                eprintln!("worker thread panicking; aborting process");
472                std::process::abort();
473            }
474        }
475    }
476
477    // Catching panics on worker threads in tests is quite tricky. Instead, when
478    // debug assertions are enabled, we just abort the process.
479    #[cfg(debug_assertions)]
480    let _abort_on_panic = AbortOnPanic;
481
482    let num_workers = handle.shared.condvars.len();
483
484    let mut worker = Worker {
485        tick: 0,
486        num_seq_local_queue_polls: 0,
487        global_queue_interval: Stats::DEFAULT_GLOBAL_QUEUE_INTERVAL,
488        is_shutdown: false,
489        is_traced: false,
490        workers_to_notify: Vec::with_capacity(num_workers - 1),
491        idle_snapshot: idle::Snapshot::new(&handle.shared.idle),
492        stats: stats::Ephemeral::new(),
493    };
494
495    let sched_handle = scheduler::Handle::MultiThreadAlt(handle.clone());
496
497    crate::runtime::context::enter_runtime(&sched_handle, true, |_| {
498        // Set the worker context.
499        let cx = scheduler::Context::MultiThreadAlt(Context {
500            index,
501            lifo_enabled: Cell::new(!handle.shared.config.disable_lifo_slot),
502            handle,
503            core: RefCell::new(None),
504            handoff_core,
505            defer: RefCell::new(Vec::with_capacity(64)),
506        });
507
508        context::set_scheduler(&cx, || {
509            let cx = cx.expect_multi_thread_alt();
510
511            // Run the worker
512            let res = worker.run(&cx, blocking_in_place);
            // `Err` here signifies that the core was lost; this is an expected
            // end state for a worker.
515            debug_assert!(res.is_err());
516
517            // Check if there are any deferred tasks to notify. This can happen when
518            // the worker core is lost due to `block_in_place()` being called from
519            // within the task.
520            if !cx.defer.borrow().is_empty() {
521                worker.schedule_deferred_without_core(&cx, &mut cx.shared().synced.lock());
522            }
523        });
524    });
525}
526
527macro_rules! try_task {
528    ($e:expr) => {{
529        let (task, core) = $e?;
530        if task.is_some() {
531            return Ok((task, core));
532        }
533        core
534    }};
535}
536
537macro_rules! try_task_new_batch {
538    ($w:expr, $e:expr) => {{
539        let (task, mut core) = $e?;
540        if task.is_some() {
541            core.stats.start_processing_scheduled_tasks(&mut $w.stats);
542            return Ok((task, core));
543        }
544        core
545    }};
546}
547
548impl Worker {
549    fn run(&mut self, cx: &Context, blocking_in_place: bool) -> RunResult {
550        let (maybe_task, mut core) = {
551            if blocking_in_place {
552                if let Some(core) = cx.handoff_core.take() {
553                    (None, core)
554                } else {
555                    // Just shutdown
556                    return Err(());
557                }
558            } else {
559                let mut synced = cx.shared().synced.lock();
560
561                // First try to acquire an available core
562                if let Some(core) = self.try_acquire_available_core(cx, &mut synced) {
563                    // Try to poll a task from the global queue
564                    let maybe_task = cx.shared().next_remote_task_synced(&mut synced);
565                    (maybe_task, core)
566                } else {
567                    // block the thread to wait for a core to be assigned to us
568                    self.wait_for_core(cx, synced)?
569                }
570            }
571        };
572
573        cx.shared().worker_metrics[core.index].set_thread_id(thread::current().id());
574        core.stats.start_processing_scheduled_tasks(&mut self.stats);
575
576        if let Some(task) = maybe_task {
577            core = self.run_task(cx, core, task)?;
578        }
579
580        while !self.is_shutdown {
581            let (maybe_task, c) = self.next_task(cx, core)?;
582            core = c;
583
584            if let Some(task) = maybe_task {
585                core = self.run_task(cx, core, task)?;
586            } else {
                // The only reason to get `None` from `next_task` is that we
                // have entered the shutdown phase.
589                assert!(self.is_shutdown);
590                break;
591            }
592        }
593
594        cx.shared().shutdown_core(&cx.handle, core);
595
596        // It is possible that tasks wake others during drop, so we need to
597        // clear the defer list.
598        self.shutdown_clear_defer(cx);
599
600        Err(())
601    }
602
603    // Try to acquire an available core, but do not block the thread
604    fn try_acquire_available_core(
605        &mut self,
606        cx: &Context,
607        synced: &mut Synced,
608    ) -> Option<Box<Core>> {
609        if let Some(mut core) = cx
610            .shared()
611            .idle
612            .try_acquire_available_core(&mut synced.idle)
613        {
614            self.reset_acquired_core(cx, synced, &mut core);
615            Some(core)
616        } else {
617            None
618        }
619    }
620
621    // Block the current thread, waiting for an available core
622    fn wait_for_core(
623        &mut self,
624        cx: &Context,
625        mut synced: MutexGuard<'_, Synced>,
626    ) -> NextTaskResult {
627        if cx.shared().idle.needs_searching() {
628            if let Some(mut core) = self.try_acquire_available_core(cx, &mut synced) {
629                cx.shared().idle.transition_worker_to_searching(&mut core);
630                return Ok((None, core));
631            }
632        }
633
634        cx.shared()
635            .idle
636            .transition_worker_to_parked(&mut synced, cx.index);
637
638        // Wait until a core is available, then exit the loop.
639        let mut core = loop {
640            if let Some(core) = synced.assigned_cores[cx.index].take() {
641                break core;
642            }
643
644            // If shutting down, abort
645            if cx.shared().inject.is_closed(&synced.inject) {
646                self.shutdown_clear_defer(cx);
647                return Err(());
648            }
649
650            synced = cx.shared().condvars[cx.index].wait(synced).unwrap();
651        };
652
653        self.reset_acquired_core(cx, &mut synced, &mut core);
654
655        if self.is_shutdown {
656            // Currently shutting down, don't do any more work
657            return Ok((None, core));
658        }
659
660        let n = cmp::max(core.run_queue.remaining_slots() / 2, 1);
661        let maybe_task = self.next_remote_task_batch_synced(cx, &mut synced, &mut core, n);
662
663        core.stats.unparked();
664        self.flush_metrics(cx, &mut core);
665
666        Ok((maybe_task, core))
667    }
668
    /// Ensure the core's state is set correctly for the worker to start using it.
670    fn reset_acquired_core(&mut self, cx: &Context, synced: &mut Synced, core: &mut Core) {
671        self.global_queue_interval = core.stats.tuned_global_queue_interval(&cx.shared().config);
672
673        // Reset `lifo_enabled` here in case the core was previously stolen from
674        // a task that had the LIFO slot disabled.
675        self.reset_lifo_enabled(cx);
676
677        // At this point, the local queue should be empty
678        #[cfg(not(loom))]
679        debug_assert!(core.run_queue.is_empty());
680
681        // Update shutdown state while locked
682        self.update_global_flags(cx, synced);
683    }
684
    /// Finds the next task to run. This could come from a queue or from
    /// stealing. If none are available, the thread sleeps and tries again.
687    fn next_task(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
688        self.assert_lifo_enabled_is_correct(cx);
689
690        if self.is_traced {
691            core = cx.handle.trace_core(core);
692        }
693
694        // Increment the tick
695        self.tick = self.tick.wrapping_add(1);
696
697        // Runs maintenance every so often. When maintenance is run, the
698        // driver is checked, which may result in a task being found.
699        core = try_task!(self.maybe_maintenance(&cx, core));
700
701        // Check the LIFO slot, local run queue, and the injection queue for
702        // a notified task.
703        core = try_task!(self.next_notified_task(cx, core));
704
705        // We consumed all work in the queues and will start searching for work.
706        core.stats.end_processing_scheduled_tasks(&mut self.stats);
707
708        super::counters::inc_num_no_local_work();
709
710        if !cx.defer.borrow().is_empty() {
711            // We are deferring tasks, so poll the resource driver and schedule
712            // the deferred tasks.
713            try_task_new_batch!(self, self.park_yield(cx, core));
714
715            panic!("what happened to the deferred tasks? 🤔");
716        }
717
718        while !self.is_shutdown {
719            // Search for more work, this involves trying to poll the resource
720            // driver, steal from other workers, and check the global queue
721            // again.
722            core = try_task_new_batch!(self, self.search_for_work(cx, core));
723
724            debug_assert!(cx.defer.borrow().is_empty());
725            core = try_task_new_batch!(self, self.park(cx, core));
726        }
727
728        // Shutting down, drop any deferred tasks
729        self.shutdown_clear_defer(cx);
730
731        Ok((None, core))
732    }
733
734    fn next_notified_task(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
735        self.num_seq_local_queue_polls += 1;
736
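        // For example, with an interval of 31 (an illustrative value), roughly
        // every 31st pass checks the global (inject) queue before the local
        // queue so that injected tasks are not starved.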
737        if self.num_seq_local_queue_polls % self.global_queue_interval == 0 {
738            super::counters::inc_global_queue_interval();
739
740            self.num_seq_local_queue_polls = 0;
741
742            // Update the global queue interval, if needed
743            self.tune_global_queue_interval(cx, &mut core);
744
745            if let Some(task) = self.next_remote_task(cx) {
746                return Ok((Some(task), core));
747            }
748        }
749
750        if let Some(task) = core.next_local_task() {
751            return Ok((Some(task), core));
752        }
753
754        self.next_remote_task_batch(cx, core)
755    }
756
757    fn next_remote_task(&self, cx: &Context) -> Option<Notified> {
758        if cx.shared().inject.is_empty() {
759            return None;
760        }
761
762        let mut synced = cx.shared().synced.lock();
763        cx.shared().next_remote_task_synced(&mut synced)
764    }
765
766    fn next_remote_task_batch(&self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
767        if cx.shared().inject.is_empty() {
768            return Ok((None, core));
769        }
770
771        // Other threads can only **remove** tasks from the current worker's
772        // `run_queue`. So, we can be confident that by the time we call
773        // `run_queue.push_back` below, there will be *at least* `cap`
774        // available slots in the queue.
775        let cap = usize::min(
776            core.run_queue.remaining_slots(),
777            usize::max(core.run_queue.max_capacity() / 2, 1),
778        );
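        // Worked example (illustrative numbers): with 200 remaining slots and a
        // max capacity of 256, cap = min(200, max(128, 1)) = 128.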
779
780        let mut synced = cx.shared().synced.lock();
781        let maybe_task = self.next_remote_task_batch_synced(cx, &mut synced, &mut core, cap);
782        Ok((maybe_task, core))
783    }
784
785    fn next_remote_task_batch_synced(
786        &self,
787        cx: &Context,
788        synced: &mut Synced,
789        core: &mut Core,
790        max: usize,
791    ) -> Option<Notified> {
792        super::counters::inc_num_remote_batch();
793
        // The worker is currently idle; pull a batch of work from the
        // injection queue. We don't want to pull *all* the work so that other
        // workers can also get some.
797        let n = if core.is_searching {
798            cx.shared().inject.len() / cx.shared().idle.num_searching() + 1
799        } else {
800            cx.shared().inject.len() / cx.shared().remotes.len() + 1
801        };
802
803        let n = usize::min(n, max) + 1;
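        // Worked example (illustrative numbers): a non-searching worker with 40
        // tasks in the inject queue and 4 remotes computes n = 40 / 4 + 1 = 11;
        // with max = 8, that becomes min(11, 8) + 1 = 9 tasks pulled in one batch.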
804
805        // safety: passing in the correct `inject::Synced`.
806        let mut tasks = unsafe { cx.shared().inject.pop_n(&mut synced.inject, n) };
807
808        // Pop the first task to return immediately
809        let ret = tasks.next();
810
        // Push the rest onto the run queue
812        core.run_queue.push_back(tasks);
813
814        ret
815    }
816
    /// Function responsible for stealing tasks from another worker.
    ///
    /// Note: a worker only attempts to steal if fewer than half of the workers
    /// are already searching for tasks. The idea is to make sure not all
    /// workers try to steal at the same time.
822    fn search_for_work(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
823        #[cfg(not(loom))]
824        const ROUNDS: usize = 4;
825
826        #[cfg(loom)]
827        const ROUNDS: usize = 1;
828
829        debug_assert!(core.lifo_slot.is_none());
830        #[cfg(not(loom))]
831        debug_assert!(core.run_queue.is_empty());
832
833        if !core.run_queue.can_steal() {
834            return Ok((None, core));
835        }
836
837        if !self.transition_to_searching(cx, &mut core) {
838            return Ok((None, core));
839        }
840
841        // core = try_task!(self, self.poll_driver(cx, core));
842
843        // Get a snapshot of which workers are idle
844        cx.shared().idle.snapshot(&mut self.idle_snapshot);
845
846        let num = cx.shared().remotes.len();
847
848        for i in 0..ROUNDS {
849            // Start from a random worker
850            let start = core.rand.fastrand_n(num as u32) as usize;
851
852            if let Some(task) = self.steal_one_round(cx, &mut core, start) {
853                return Ok((Some(task), core));
854            }
855
856            core = try_task!(self.next_remote_task_batch(cx, core));
857
858            if i > 0 {
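                // Progressive backoff: sleep `i` microseconds between rounds
                // (1µs, then 2µs, ...) to reduce contention while other workers
                // finish scheduling new work.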
859                super::counters::inc_num_spin_stall();
860                std::thread::sleep(std::time::Duration::from_micros(i as u64));
861            }
862        }
863
864        Ok((None, core))
865    }
866
867    fn steal_one_round(&self, cx: &Context, core: &mut Core, start: usize) -> Option<Notified> {
868        let num = cx.shared().remotes.len();
869
870        for i in 0..num {
871            let i = (start + i) % num;
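            // For example, with num = 4 workers and start = 2, the probe order
            // is 2, 3, 0, 1 (our own index is skipped below).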
872
873            // Don't steal from ourself! We know we don't have work.
874            if i == core.index {
875                continue;
876            }
877
878            // If the core is currently idle, then there is nothing to steal.
879            if self.idle_snapshot.is_idle(i) {
880                continue;
881            }
882
883            let target = &cx.shared().remotes[i];
884
885            if let Some(task) = target
886                .steal
887                .steal_into(&mut core.run_queue, &mut core.stats)
888            {
889                return Some(task);
890            }
891        }
892
893        None
894    }
895
896    fn run_task(&mut self, cx: &Context, mut core: Box<Core>, task: Notified) -> RunResult {
897        let task = cx.shared().owned.assert_owner(task);
898
899        // Make sure the worker is not in the **searching** state. This enables
900        // another idle worker to try to steal work.
901        if self.transition_from_searching(cx, &mut core) {
902            super::counters::inc_num_relay_search();
903            cx.shared().notify_parked_local();
904        }
905
906        self.assert_lifo_enabled_is_correct(cx);
907
        // Measure the poll start time. Note that we may end up polling other
        // tasks under this measurement. In this case, the tasks came from the
        // LIFO slot and are considered part of the current task for scheduling
        // purposes. These tasks inherit the "parent"'s limits.
912        core.stats.start_poll(&mut self.stats);
913
914        // Make the core available to the runtime context
915        *cx.core.borrow_mut() = Some(core);
916
917        // Run the task
918        coop::budget(|| {
919            super::counters::inc_num_polls();
920            task.run();
921            let mut lifo_polls = 0;
922
            // As long as there is budget remaining and a task exists in the
            // `lifo_slot`, keep running.
925            loop {
926                // Check if we still have the core. If not, the core was stolen
927                // by another worker.
928                let mut core = match cx.core.borrow_mut().take() {
929                    Some(core) => core,
930                    None => {
931                        // In this case, we cannot call `reset_lifo_enabled()`
932                        // because the core was stolen. The stealer will handle
933                        // that at the top of `Context::run`
934                        return Err(());
935                    }
936                };
937
938                // Check for a task in the LIFO slot
939                let task = match core.next_lifo_task() {
940                    Some(task) => task,
941                    None => {
942                        self.reset_lifo_enabled(cx);
943                        core.stats.end_poll();
944                        return Ok(core);
945                    }
946                };
947
948                if !coop::has_budget_remaining() {
949                    core.stats.end_poll();
950
951                    // Not enough budget left to run the LIFO task, push it to
952                    // the back of the queue and return.
953                    core.run_queue
954                        .push_back_or_overflow(task, cx.shared(), &mut core.stats);
955                    // If we hit this point, the LIFO slot should be enabled.
956                    // There is no need to reset it.
957                    debug_assert!(cx.lifo_enabled.get());
958                    return Ok(core);
959                }
960
961                // Track that we are about to run a task from the LIFO slot.
962                lifo_polls += 1;
963                super::counters::inc_lifo_schedules();
964
965                // Disable the LIFO slot if we reach our limit
966                //
                // In ping-pong style workloads where task A notifies task B,
968                // which notifies task A again, continuously prioritizing the
969                // LIFO slot can cause starvation as these two tasks will
970                // repeatedly schedule the other. To mitigate this, we limit the
971                // number of times the LIFO slot is prioritized.
972                if lifo_polls >= MAX_LIFO_POLLS_PER_TICK {
973                    cx.lifo_enabled.set(false);
974                    super::counters::inc_lifo_capped();
975                }
976
977                // Run the LIFO task, then loop
978                *cx.core.borrow_mut() = Some(core);
979                let task = cx.shared().owned.assert_owner(task);
980                super::counters::inc_num_lifo_polls();
981                task.run();
982            }
983        })
984    }
985
986    fn schedule_deferred_with_core<'a>(
987        &mut self,
988        cx: &'a Context,
989        mut core: Box<Core>,
990        synced: impl FnOnce() -> MutexGuard<'a, Synced>,
991    ) -> NextTaskResult {
992        let mut defer = cx.defer.borrow_mut();
993
994        // Grab a task to run next
995        let task = defer.pop();
996
997        if task.is_none() {
998            return Ok((None, core));
999        }
1000
1001        if !defer.is_empty() {
1002            let mut synced = synced();
1003
1004            // Number of tasks we want to try to spread across idle workers
1005            let num_fanout = cmp::min(defer.len(), cx.shared().idle.num_idle(&synced.idle));
1006
            // Cap the number of threads woken up at one time. This is to limit
            // the number of no-op wakes and reduce mutex contention.
1009            //
1010            // This number was picked after some basic benchmarks, but it can
1011            // probably be tuned using the mean poll time value (slower task
1012            // polls can leverage more woken workers).
1013            let num_fanout = cmp::min(2, num_fanout);
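            // For example (illustrative numbers): 5 deferred tasks with 3 idle
            // workers gives min(5, 3) = 3, which the cap above reduces to 2.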
1014
1015            if num_fanout > 0 {
1016                cx.shared()
1017                    .push_remote_task_batch_synced(&mut synced, defer.drain(..num_fanout));
1018
1019                cx.shared()
1020                    .idle
1021                    .notify_mult(&mut synced, &mut self.workers_to_notify, num_fanout);
1022            }
1023
1024            // Do not run the task while holding the lock...
1025            drop(synced);
1026        }
1027
1028        // Notify any workers
1029        for worker in self.workers_to_notify.drain(..) {
1030            cx.shared().condvars[worker].notify_one()
1031        }
1032
1033        if !defer.is_empty() {
1034            // Push the rest of the tasks on the local queue
1035            for task in defer.drain(..) {
1036                core.run_queue
1037                    .push_back_or_overflow(task, cx.shared(), &mut core.stats);
1038            }
1039
1040            cx.shared().notify_parked_local();
1041        }
1042
1043        Ok((task, core))
1044    }
1045
1046    fn schedule_deferred_without_core<'a>(&mut self, cx: &Context, synced: &mut Synced) {
1047        let mut defer = cx.defer.borrow_mut();
1048        let num = defer.len();
1049
1050        if num > 0 {
1051            // Push all tasks to the injection queue
1052            cx.shared()
1053                .push_remote_task_batch_synced(synced, defer.drain(..));
1054
1055            debug_assert!(self.workers_to_notify.is_empty());
1056
1057            // Notify workers
1058            cx.shared()
1059                .idle
1060                .notify_mult(synced, &mut self.workers_to_notify, num);
1061
1062            // Notify any workers
1063            for worker in self.workers_to_notify.drain(..) {
1064                cx.shared().condvars[worker].notify_one()
1065            }
1066        }
1067    }
1068
1069    fn maybe_maintenance(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
1070        if self.tick % cx.shared().config.event_interval == 0 {
1071            super::counters::inc_num_maintenance();
1072
1073            core.stats.end_processing_scheduled_tasks(&mut self.stats);
1074
1075            // Run regularly scheduled maintenance
1076            core = try_task_new_batch!(self, self.park_yield(cx, core));
1077
1078            core.stats.start_processing_scheduled_tasks(&mut self.stats);
1079        }
1080
1081        Ok((None, core))
1082    }
1083
1084    fn flush_metrics(&self, cx: &Context, core: &mut Core) {
1085        core.stats.submit(&cx.shared().worker_metrics[core.index]);
1086    }
1087
1088    fn update_global_flags(&mut self, cx: &Context, synced: &mut Synced) {
1089        if !self.is_shutdown {
1090            self.is_shutdown = cx.shared().inject.is_closed(&synced.inject);
1091        }
1092
1093        if !self.is_traced {
1094            self.is_traced = cx.shared().trace_status.trace_requested();
1095        }
1096    }
1097
1098    fn park_yield(&mut self, cx: &Context, core: Box<Core>) -> NextTaskResult {
1099        // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
1100        // to run without actually putting the thread to sleep.
1101        if let Some(mut driver) = cx.shared().driver.take() {
1102            driver.park_timeout(&cx.handle.driver, Duration::from_millis(0));
1103
1104            cx.shared().driver.set(driver);
1105        }
1106
1107        // If there are more I/O events, schedule them.
1108        let (maybe_task, mut core) =
1109            self.schedule_deferred_with_core(cx, core, || cx.shared().synced.lock())?;
1110
1111        self.flush_metrics(cx, &mut core);
1112        self.update_global_flags(cx, &mut cx.shared().synced.lock());
1113
1114        Ok((maybe_task, core))
1115    }
1116
1117    /*
1118    fn poll_driver(&mut self, cx: &Context, core: Box<Core>) -> NextTaskResult {
1119        // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
1120        // to run without actually putting the thread to sleep.
1121        if let Some(mut driver) = cx.shared().driver.take() {
1122            driver.park_timeout(&cx.handle.driver, Duration::from_millis(0));
1123
1124            cx.shared().driver.set(driver);
1125
1126            // If there are more I/O events, schedule them.
1127            self.schedule_deferred_with_core(cx, core, || cx.shared().synced.lock())
1128        } else {
1129            Ok((None, core))
1130        }
1131    }
1132    */
1133
1134    fn park(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
1135        if let Some(f) = &cx.shared().config.before_park {
1136            f();
1137        }
1138
1139        if self.can_transition_to_parked(&mut core) {
1140            debug_assert!(!self.is_shutdown);
1141            debug_assert!(!self.is_traced);
1142
1143            core = try_task!(self.do_park(cx, core));
1144        }
1145
1146        if let Some(f) = &cx.shared().config.after_unpark {
1147            f();
1148        }
1149
1150        Ok((None, core))
1151    }
1152
1153    fn do_park(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
1154        let was_searching = core.is_searching;
1155
1156        // Acquire the lock
1157        let mut synced = cx.shared().synced.lock();
1158
1159        // The local queue should be empty at this point
1160        #[cfg(not(loom))]
1161        debug_assert!(core.run_queue.is_empty());
1162
1163        // Try one last time to get tasks
1164        let n = cmp::max(core.run_queue.remaining_slots() / 2, 1);
1165        if let Some(task) = self.next_remote_task_batch_synced(cx, &mut synced, &mut core, n) {
1166            return Ok((Some(task), core));
1167        }
1168
1169        if !was_searching {
1170            if cx
1171                .shared()
1172                .idle
1173                .transition_worker_to_searching_if_needed(&mut synced.idle, &mut core)
1174            {
1175                // Skip parking, go back to searching
1176                return Ok((None, core));
1177            }
1178        }
1179
1180        super::counters::inc_num_parks();
1181        core.stats.about_to_park();
1182        // Flush metrics to the runtime metrics aggregator
1183        self.flush_metrics(cx, &mut core);
1184
1185        // If the runtime is shutdown, skip parking
1186        self.update_global_flags(cx, &mut synced);
1187
1188        if self.is_shutdown {
1189            return Ok((None, core));
1190        }
1191
1192        // Release the core
1193        core.is_searching = false;
1194        cx.shared().idle.release_core(&mut synced, core);
1195
1196        drop(synced);
1197
1198        if was_searching {
1199            if cx.shared().idle.transition_worker_from_searching() {
1200                // cx.shared().idle.snapshot(&mut self.idle_snapshot);
1201                // We were the last searching worker, we need to do one last check
1202                for i in 0..cx.shared().remotes.len() {
1203                    if !cx.shared().remotes[i].steal.is_empty() {
1204                        let mut synced = cx.shared().synced.lock();
1205
1206                        // Try to get a core
1207                        if let Some(mut core) = self.try_acquire_available_core(cx, &mut synced) {
1208                            cx.shared().idle.transition_worker_to_searching(&mut core);
1209                            return Ok((None, core));
1210                        } else {
1211                            // Fall back to the park routine
1212                            break;
1213                        }
1214                    }
1215                }
1216            }
1217        }
1218
1219        if let Some(mut driver) = cx.shared().take_driver() {
1220            // Wait for driver events
1221            driver.park(&cx.handle.driver);
1222
1223            synced = cx.shared().synced.lock();
1224
1225            if cx.shared().inject.is_closed(&mut synced.inject) {
1226                synced.shutdown_driver = Some(driver);
1227                self.shutdown_clear_defer(cx);
1228                cx.shared().shutdown_finalize(&cx.handle, &mut synced);
1229                return Err(());
1230            }
1231
1232            // Put the driver back
1233            cx.shared().driver.set(driver);
1234
1235            // Try to acquire an available core to schedule I/O events
1236            if let Some(core) = self.try_acquire_available_core(cx, &mut synced) {
1237                // This may result in a task being run
1238                self.schedule_deferred_with_core(cx, core, move || synced)
1239            } else {
1240                // Schedule any deferred tasks
1241                self.schedule_deferred_without_core(cx, &mut synced);
1242
1243                // Wait for a core.
1244                self.wait_for_core(cx, synced)
1245            }
1246        } else {
1247            synced = cx.shared().synced.lock();
1248
1249            // Wait for a core to be assigned to us
1250            self.wait_for_core(cx, synced)
1251        }
1252    }
1253
1254    fn transition_to_searching(&self, cx: &Context, core: &mut Core) -> bool {
1255        if !core.is_searching {
1256            cx.shared().idle.try_transition_worker_to_searching(core);
1257        }
1258
1259        core.is_searching
1260    }
1261
1262    /// Returns `true` if another worker must be notified
1263    fn transition_from_searching(&self, cx: &Context, core: &mut Core) -> bool {
1264        if !core.is_searching {
1265            return false;
1266        }
1267
1268        core.is_searching = false;
1269        cx.shared().idle.transition_worker_from_searching()
1270    }
1271
1272    fn can_transition_to_parked(&self, core: &mut Core) -> bool {
1273        !self.has_tasks(core) && !self.is_shutdown && !self.is_traced
1274    }
1275
1276    fn has_tasks(&self, core: &Core) -> bool {
1277        core.lifo_slot.is_some() || !core.run_queue.is_empty()
1278    }
1279
1280    fn reset_lifo_enabled(&self, cx: &Context) {
1281        cx.lifo_enabled
1282            .set(!cx.handle.shared.config.disable_lifo_slot);
1283    }
1284
1285    fn assert_lifo_enabled_is_correct(&self, cx: &Context) {
1286        debug_assert_eq!(
1287            cx.lifo_enabled.get(),
1288            !cx.handle.shared.config.disable_lifo_slot
1289        );
1290    }
1291
1292    fn tune_global_queue_interval(&mut self, cx: &Context, core: &mut Core) {
1293        let next = core.stats.tuned_global_queue_interval(&cx.shared().config);
1294
1295        // Smooth out jitter
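        // (for example, a tuned value of 33 against a current interval of 31 is
        // ignored, while a jump to 40 is applied)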
1296        if u32::abs_diff(self.global_queue_interval, next) > 2 {
1297            self.global_queue_interval = next;
1298        }
1299    }
1300
1301    fn shutdown_clear_defer(&self, cx: &Context) {
1302        let mut defer = cx.defer.borrow_mut();
1303
1304        for task in defer.drain(..) {
1305            drop(task);
1306        }
1307    }
1308}
1309
1310impl Context {
1311    pub(crate) fn defer(&self, waker: &Waker) {
1312        // TODO: refactor defer across all runtimes
1313        waker.wake_by_ref();
1314    }
1315
1316    fn shared(&self) -> &Shared {
1317        &self.handle.shared
1318    }
1319
1320    #[cfg_attr(not(feature = "time"), allow(dead_code))]
1321    pub(crate) fn get_worker_index(&self) -> usize {
1322        self.index
1323    }
1324}
1325
1326impl Core {
1327    fn next_local_task(&mut self) -> Option<Notified> {
1328        self.next_lifo_task().or_else(|| self.run_queue.pop())
1329    }
1330
1331    fn next_lifo_task(&mut self) -> Option<Notified> {
1332        self.lifo_slot.take()
1333    }
1334}
1335
1336impl Shared {
1337    fn next_remote_task_synced(&self, synced: &mut Synced) -> Option<Notified> {
1338        // safety: we only have access to a valid `Synced` in this file.
1339        unsafe { self.inject.pop(&mut synced.inject) }
1340    }
1341
1342    pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) {
1343        use std::ptr;
1344
1345        with_current(|maybe_cx| {
1346            if let Some(cx) = maybe_cx {
1347                // Make sure the task is part of the **current** scheduler.
1348                if ptr::eq(self, &cx.handle.shared) {
1349                    // And the current thread still holds a core
1350                    if let Some(core) = cx.core.borrow_mut().as_mut() {
1351                        if is_yield {
1352                            cx.defer.borrow_mut().push(task);
1353                        } else {
1354                            self.schedule_local(cx, core, task);
1355                        }
1356                    } else {
1357                        // This can happen if either the core was stolen
1358                        // (`block_in_place`) or the notification happens from
1359                        // the driver.
1360                        cx.defer.borrow_mut().push(task);
1361                    }
1362                    return;
1363                }
1364            }
1365
1366            // Otherwise, use the inject queue.
1367            self.schedule_remote(task);
1368        })
1369    }
1370
1371    fn schedule_local(&self, cx: &Context, core: &mut Core, task: Notified) {
1372        core.stats.inc_local_schedule_count();
1373
1374        if cx.lifo_enabled.get() {
1375            // Push to the LIFO slot
1376            let prev = std::mem::replace(&mut core.lifo_slot, Some(task));
1377            // let prev = cx.shared().remotes[core.index].lifo_slot.swap_local(task);
1378
1379            if let Some(prev) = prev {
1380                core.run_queue
1381                    .push_back_or_overflow(prev, self, &mut core.stats);
1382            } else {
1383                return;
1384            }
1385        } else {
1386            core.run_queue
1387                .push_back_or_overflow(task, self, &mut core.stats);
1388        }
1389
1390        self.notify_parked_local();
1391    }
1392
1393    fn notify_parked_local(&self) {
1394        super::counters::inc_num_inc_notify_local();
1395        self.idle.notify_local(self);
1396    }
1397
1398    fn schedule_remote(&self, task: Notified) {
1399        super::counters::inc_num_notify_remote();
1400        self.scheduler_metrics.inc_remote_schedule_count();
1401
1402        let mut synced = self.synced.lock();
        // Push the task into the inject queue.
1404        self.push_remote_task(&mut synced, task);
1405
1406        // Notify a worker. The mutex is passed in and will be released as part
1407        // of the method call.
1408        self.idle.notify_remote(synced, self);
1409    }
1410
1411    pub(super) fn close(&self, handle: &Handle) {
1412        {
1413            let mut synced = self.synced.lock();
1414
1415            if let Some(driver) = self.driver.take() {
1416                synced.shutdown_driver = Some(driver);
1417            }
1418
1419            if !self.inject.close(&mut synced.inject) {
1420                return;
1421            }
1422
1423            // Set the shutdown flag on all available cores
1424            self.idle.shutdown(&mut synced, self);
1425        }
1426
        // Any unassigned cores need to be shut down, but we must first drop
        // the lock.
1429        self.idle.shutdown_unassigned_cores(handle, self);
1430    }
1431
1432    fn push_remote_task(&self, synced: &mut Synced, task: Notified) {
        // safety: passing in the correct `inject::Synced`
1434        unsafe {
1435            self.inject.push(&mut synced.inject, task);
1436        }
1437    }
1438
1439    fn push_remote_task_batch<I>(&self, iter: I)
1440    where
1441        I: Iterator<Item = task::Notified<Arc<Handle>>>,
1442    {
1443        unsafe {
1444            self.inject.push_batch(self, iter);
1445        }
1446    }
1447
1448    fn push_remote_task_batch_synced<I>(&self, synced: &mut Synced, iter: I)
1449    where
1450        I: Iterator<Item = task::Notified<Arc<Handle>>>,
1451    {
1452        unsafe {
1453            self.inject.push_batch(&mut synced.inject, iter);
1454        }
1455    }
1456
1457    fn take_driver(&self) -> Option<Box<Driver>> {
1458        if !self.driver_enabled() {
1459            return None;
1460        }
1461
1462        self.driver.take()
1463    }
1464
1465    fn driver_enabled(&self) -> bool {
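        // `condvars` has one entry per worker thread while `remotes` has one per
        // core; the extra worker only exists when a driver was handed to
        // `create`, so the lengths differ exactly when the driver is enabled.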
1466        self.condvars.len() > self.remotes.len()
1467    }
1468
1469    pub(super) fn shutdown_core(&self, handle: &Handle, mut core: Box<Core>) {
1470        // Start from a random inner list
1471        let start = core.rand.fastrand_n(self.owned.get_shard_size() as u32);
1472        self.owned.close_and_shutdown_all(start as usize);
1473
1474        core.stats.submit(&self.worker_metrics[core.index]);
1475
1476        let mut synced = self.synced.lock();
1477        synced.shutdown_cores.push(core);
1478
1479        self.shutdown_finalize(handle, &mut synced);
1480    }
1481
1482    pub(super) fn shutdown_finalize(&self, handle: &Handle, synced: &mut Synced) {
1483        // Wait for all cores
1484        if synced.shutdown_cores.len() != self.remotes.len() {
1485            return;
1486        }
1487
1488        let driver = synced.shutdown_driver.take();
1489
1490        if self.driver_enabled() && driver.is_none() {
1491            return;
1492        }
1493
1494        debug_assert!(self.owned.is_empty());
1495
1496        for mut core in synced.shutdown_cores.drain(..) {
1497            // Drain tasks from the local queue
1498            while core.next_local_task().is_some() {}
1499        }
1500
1501        // Shutdown the driver
1502        if let Some(mut driver) = driver {
1503            driver.shutdown(&handle.driver);
1504        }
1505
1506        // Drain the injection queue
1507        //
1508        // We already shut down every task, so we can simply drop the tasks. We
1509        // cannot call `next_remote_task()` because we already hold the lock.
1510        //
        // safety: passing in the correct `inject::Synced`
1512        while let Some(task) = self.next_remote_task_synced(synced) {
1513            drop(task);
1514        }
1515    }
1516}
1517
1518impl Overflow<Arc<Handle>> for Shared {
1519    fn push(&self, task: task::Notified<Arc<Handle>>) {
1520        self.push_remote_task(&mut self.synced.lock(), task);
1521    }
1522
1523    fn push_batch<I>(&self, iter: I)
1524    where
1525        I: Iterator<Item = task::Notified<Arc<Handle>>>,
1526    {
1527        self.push_remote_task_batch(iter)
1528    }
1529}
1530
1531impl<'a> Lock<inject::Synced> for &'a Shared {
1532    type Handle = SyncedGuard<'a>;
1533
1534    fn lock(self) -> Self::Handle {
1535        SyncedGuard {
1536            lock: self.synced.lock(),
1537        }
1538    }
1539}
1540
1541impl<'a> Lock<Synced> for &'a Shared {
1542    type Handle = SyncedGuard<'a>;
1543
1544    fn lock(self) -> Self::Handle {
1545        SyncedGuard {
1546            lock: self.synced.lock(),
1547        }
1548    }
1549}
1550
1551impl task::Schedule for Arc<Handle> {
1552    fn release(&self, task: &Task) -> Option<Task> {
1553        self.shared.owned.remove(task)
1554    }
1555
1556    fn schedule(&self, task: Notified) {
1557        self.shared.schedule_task(task, false);
1558    }
1559
1560    fn hooks(&self) -> TaskHarnessScheduleHooks {
1561        TaskHarnessScheduleHooks {
1562            task_terminate_callback: self.task_hooks.task_terminate_callback.clone(),
1563        }
1564    }
1565
1566    fn yield_now(&self, task: Notified) {
1567        self.shared.schedule_task(task, true);
1568    }
1569}
1570
1571impl AsMut<Synced> for Synced {
1572    fn as_mut(&mut self) -> &mut Synced {
1573        self
1574    }
1575}
1576
1577pub(crate) struct SyncedGuard<'a> {
1578    lock: crate::loom::sync::MutexGuard<'a, Synced>,
1579}
1580
1581impl<'a> AsMut<inject::Synced> for SyncedGuard<'a> {
1582    fn as_mut(&mut self) -> &mut inject::Synced {
1583        &mut self.lock.inject
1584    }
1585}
1586
1587impl<'a> AsMut<Synced> for SyncedGuard<'a> {
1588    fn as_mut(&mut self) -> &mut Synced {
1589        &mut self.lock
1590    }
1591}
1592
1593#[track_caller]
1594fn with_current<R>(f: impl FnOnce(Option<&Context>) -> R) -> R {
1595    use scheduler::Context::MultiThreadAlt;
1596
1597    context::with_scheduler(|ctx| match ctx {
1598        Some(MultiThreadAlt(ctx)) => f(Some(ctx)),
1599        _ => f(None),
1600    })
1601}