tokio/runtime/scheduler/multi_thread/worker.rs

//! A scheduler is initialized with a fixed number of workers. Each worker is
//! driven by a thread. Each worker has a "core" which contains data such as the
//! run queue and other state. When `block_in_place` is called, the worker's
//! "core" is handed off to a new thread allowing the scheduler to continue to
//! make progress while the originating thread blocks.
//!
//! # Shutdown
//!
//! Shutting down the runtime involves the following steps:
//!
//!  1. The `Shared::close` method is called. This closes the inject queue and
//!     `OwnedTasks` instance and wakes up all worker threads.
//!
//!  2. Each worker thread observes the close signal next time it runs
//!     `Core::maintenance` by checking whether the inject queue is closed.
//!     The `Core::is_shutdown` flag is set to true.
//!
//!  3. The worker thread calls `pre_shutdown` in parallel. Here, the worker
//!     will keep removing tasks from `OwnedTasks` until it is empty. No new
//!     tasks can be pushed to the `OwnedTasks` during or after this step as it
//!     was closed in step 1.
//!
//!  4. The workers call `Shared::shutdown` to enter the single-threaded phase of
//!     shutdown. These calls will push their core to `Shared::shutdown_cores`,
//!     and the last thread to push its core will finish the shutdown procedure.
//!
//!  5. The local run queue of each core is emptied, then the inject queue is
//!     emptied.
//!
//! At this point, shutdown has completed. It is not possible for any of the
//! collections to contain any tasks at this point, as each collection was
//! closed first, then emptied afterwards.
//!
//! ## Spawns during shutdown
//!
//! When spawning tasks during shutdown, there are two cases:
//!
//!  * The spawner observes the `OwnedTasks` being open and the inject queue
//!    being closed.
//!  * The spawner observes the `OwnedTasks` being closed and doesn't check the
//!    inject queue.
//!
//! The first case can only happen if the `OwnedTasks::bind` call happens before
//! or during step 1 of shutdown. In this case, the runtime will clean up the
//! task in step 3 of shutdown.
//!
//! In the second case, the task was not spawned and is immediately cancelled
//! by the spawner.
//!
//! The correctness of shutdown requires both the inject queue and `OwnedTasks`
//! collection to have a closed bit. With a close bit on only the inject queue,
//! spawning could run into a situation where a task is successfully bound long
//! after the runtime has shut down. With a close bit on only the `OwnedTasks`,
//! the first spawning situation could result in the notification being pushed
//! to the inject queue after step 5 of shutdown, which would leave a task in
//! the inject queue indefinitely. This would be a ref-count cycle and a memory
//! leak.
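//!
//! The following is a minimal, self-contained model of the dual close-bit
//! protocol described above. The types and helper names are hypothetical,
//! chosen only to illustrate the ordering of the two checks; they are not the
//! runtime's actual implementation.
//!
//! ```ignore
//! use std::sync::Mutex;
//!
//! struct Owned { closed: bool, tasks: Vec<u32> }
//! struct Inject { closed: bool, queue: Vec<u32> }
//!
//! fn spawn(owned: &Mutex<Owned>, inject: &Mutex<Inject>, task: u32) {
//!     let mut o = owned.lock().unwrap();
//!     if o.closed {
//!         // Second case: `OwnedTasks` is closed. The task is never bound
//!         // and is cancelled immediately by the spawner.
//!         return;
//!     }
//!     o.tasks.push(task); // bind the task
//!     drop(o);
//!
//!     let mut i = inject.lock().unwrap();
//!     if i.closed {
//!         // First case: the task is bound but the inject queue is closed;
//!         // step 3 of shutdown removes it from `OwnedTasks`.
//!         return;
//!     }
//!     i.queue.push(task);
//! }
//! ```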

use crate::loom::sync::{Arc, Mutex};
use crate::runtime;
use crate::runtime::scheduler::multi_thread::{
    idle, queue, Counters, Handle, Idle, Overflow, Parker, Stats, TraceStatus, Unparker,
};
use crate::runtime::scheduler::{inject, Defer, Lock};
use crate::runtime::task::{OwnedTasks, TaskHarnessScheduleHooks};
use crate::runtime::{
    blocking, coop, driver, scheduler, task, Config, SchedulerMetrics, WorkerMetrics,
};
use crate::runtime::{context, TaskHooks};
use crate::util::atomic_cell::AtomicCell;
use crate::util::rand::{FastRand, RngSeedGenerator};

use std::cell::RefCell;
use std::task::Waker;
use std::thread;
use std::time::Duration;

mod metrics;

cfg_taskdump! {
    mod taskdump;
}

cfg_not_taskdump! {
    mod taskdump_mock;
}

/// A scheduler worker
pub(super) struct Worker {
    /// Reference to scheduler's handle
    handle: Arc<Handle>,

    /// Index holding this worker's remote state
    index: usize,

    /// Used to hand off a worker's core to another thread.
    core: AtomicCell<Core>,
}

/// Core data
struct Core {
    /// Used to schedule bookkeeping tasks every so often.
    tick: u32,

    /// When a task is scheduled from a worker, it is stored in this slot. The
    /// worker will check this slot for a task **before** checking the run
    /// queue. This effectively results in the **last** scheduled task to be run
    /// next (LIFO). This is an optimization for improving locality which
    /// benefits message passing patterns and helps to reduce latency.
    lifo_slot: Option<Notified>,

    /// When `true`, locally scheduled tasks go to the LIFO slot. When `false`,
    /// they go to the back of the `run_queue`.
    lifo_enabled: bool,

    /// The worker-local run queue.
    run_queue: queue::Local<Arc<Handle>>,

    /// True if the worker is currently searching for more work. Searching
    /// involves attempting to steal from other workers.
    is_searching: bool,

    /// True if the scheduler is shutting down
    is_shutdown: bool,

    /// True if the scheduler is being traced
    is_traced: bool,

    /// Parker
    ///
    /// Stored in an `Option` as the parker is added / removed to make the
    /// borrow checker happy.
    park: Option<Parker>,

    /// Per-worker runtime stats
    stats: Stats,

    /// How often to check the global queue
    global_queue_interval: u32,

    /// Fast random number generator.
    rand: FastRand,
}

/// State shared across all workers
pub(crate) struct Shared {
    /// Per-worker remote state. All other workers have access to this, and
    /// this is how they communicate with each other.
    remotes: Box<[Remote]>,

    /// Global task queue used to:
    ///  1. Submit work to the scheduler while **not** currently on a worker thread.
    ///  2. Submit work to the scheduler when a worker run queue is saturated.
    pub(super) inject: inject::Shared<Arc<Handle>>,

    /// Coordinates idle workers
    idle: Idle,

    /// Collection of all active tasks spawned onto this executor.
    pub(crate) owned: OwnedTasks<Arc<Handle>>,

    /// Data synchronized by the scheduler mutex
    pub(super) synced: Mutex<Synced>,

    /// Cores that have observed the shutdown signal
    ///
    /// The core is **not** placed back in the worker to prevent it from being
    /// stolen by a thread that was spawned as part of `block_in_place`.
    #[allow(clippy::vec_box)] // we're moving an already-boxed value
    shutdown_cores: Mutex<Vec<Box<Core>>>,

    /// The number of cores that have observed the trace signal.
    pub(super) trace_status: TraceStatus,

    /// Scheduler configuration options
    config: Config,

    /// Collects metrics from the runtime.
    pub(super) scheduler_metrics: SchedulerMetrics,

    pub(super) worker_metrics: Box<[WorkerMetrics]>,

    /// Only held to trigger some code on drop. This is used to get internal
    /// runtime metrics that can be useful when doing performance
    /// investigations. This does nothing (empty struct, no drop impl) unless
    /// the `tokio_internal_mt_counters` `cfg` flag is set.
    _counters: Counters,
}

/// Data synchronized by the scheduler mutex
pub(crate) struct Synced {
    /// Synchronized state for `Idle`.
    pub(super) idle: idle::Synced,

    /// Synchronized state for `Inject`.
    pub(crate) inject: inject::Synced,
}

/// Used to communicate with a worker from other threads.
struct Remote {
    /// Steals tasks from this worker.
    pub(super) steal: queue::Steal<Arc<Handle>>,

    /// Unparks the associated worker thread
    unpark: Unparker,
}

/// Thread-local context
pub(crate) struct Context {
    /// Worker
    worker: Arc<Worker>,

    /// Core data
    core: RefCell<Option<Box<Core>>>,

    /// Tasks to wake after resource drivers are polled. This is mostly to
    /// handle yielded tasks.
    pub(crate) defer: Defer,
}

/// Starts the workers
pub(crate) struct Launch(Vec<Arc<Worker>>);

/// Running a task may consume the core. If the core is still available when
/// running the task completes, it is returned. Otherwise, the worker will need
/// to stop processing.
type RunResult = Result<Box<Core>, ()>;

/// A task handle
type Task = task::Task<Arc<Handle>>;

/// A notified task handle
type Notified = task::Notified<Arc<Handle>>;

/// Value picked out of thin air. Running the LIFO slot a handful of times
/// seems sufficient to benefit from locality. Running it more than 3 times is
/// probably counterproductive. The value can be tuned in the future with data
/// that shows improvements.
const MAX_LIFO_POLLS_PER_TICK: usize = 3;
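
// To illustrate the starvation hazard this limit guards against, here is a
// sketch of a ping-pong workload built from public APIs (illustrative only;
// nothing below is part of this module). Each `send` wakes the peer task and
// lands it in the LIFO slot, so without the cap the two tasks could bounce
// between each other indefinitely while the rest of the run queue starves:
//
// ```ignore
// let (tx_a, mut rx_a) = tokio::sync::mpsc::channel::<()>(1);
// let (tx_b, mut rx_b) = tokio::sync::mpsc::channel::<()>(1);
//
// tokio::spawn(async move {
//     while rx_a.recv().await.is_some() {
//         let _ = tx_b.send(()).await; // wakes task B into the LIFO slot
//     }
// });
// tokio::spawn(async move {
//     let _ = tx_a.send(()).await;     // start the ping-pong
//     while rx_b.recv().await.is_some() {
//         let _ = tx_a.send(()).await; // wakes task A into the LIFO slot
//     }
// });
// ```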

pub(super) fn create(
    size: usize,
    park: Parker,
    driver_handle: driver::Handle,
    blocking_spawner: blocking::Spawner,
    seed_generator: RngSeedGenerator,
    config: Config,
) -> (Arc<Handle>, Launch) {
    let mut cores = Vec::with_capacity(size);
    let mut remotes = Vec::with_capacity(size);
    let mut worker_metrics = Vec::with_capacity(size);

    // Create the local queues
    for _ in 0..size {
        let (steal, run_queue) = queue::local();

        let park = park.clone();
        let unpark = park.unpark();
        let metrics = WorkerMetrics::from_config(&config);
        let stats = Stats::new(&metrics);

        cores.push(Box::new(Core {
            tick: 0,
            lifo_slot: None,
            lifo_enabled: !config.disable_lifo_slot,
            run_queue,
            is_searching: false,
            is_shutdown: false,
            is_traced: false,
            park: Some(park),
            global_queue_interval: stats.tuned_global_queue_interval(&config),
            stats,
            rand: FastRand::from_seed(config.seed_generator.next_seed()),
        }));

        remotes.push(Remote { steal, unpark });
        worker_metrics.push(metrics);
    }

    let (idle, idle_synced) = Idle::new(size);
    let (inject, inject_synced) = inject::Shared::new();

    let remotes_len = remotes.len();
    let handle = Arc::new(Handle {
        task_hooks: TaskHooks::from_config(&config),
        shared: Shared {
            remotes: remotes.into_boxed_slice(),
            inject,
            idle,
            owned: OwnedTasks::new(size),
            synced: Mutex::new(Synced {
                idle: idle_synced,
                inject: inject_synced,
            }),
            shutdown_cores: Mutex::new(vec![]),
            trace_status: TraceStatus::new(remotes_len),
            config,
            scheduler_metrics: SchedulerMetrics::new(),
            worker_metrics: worker_metrics.into_boxed_slice(),
            _counters: Counters,
        },
        driver: driver_handle,
        blocking_spawner,
        seed_generator,
    });

    let mut launch = Launch(vec![]);

    for (index, core) in cores.drain(..).enumerate() {
        launch.0.push(Arc::new(Worker {
            handle: handle.clone(),
            index,
            core: AtomicCell::new(Some(core)),
        }));
    }

    (handle, launch)
}

#[track_caller]
pub(crate) fn block_in_place<F, R>(f: F) -> R
where
    F: FnOnce() -> R,
{
    // Try to steal the worker core back
    struct Reset {
        take_core: bool,
        budget: coop::Budget,
    }

    impl Drop for Reset {
        fn drop(&mut self) {
            with_current(|maybe_cx| {
                if let Some(cx) = maybe_cx {
                    if self.take_core {
                        let core = cx.worker.core.take();

                        if core.is_some() {
                            cx.worker.handle.shared.worker_metrics[cx.worker.index]
                                .set_thread_id(thread::current().id());
                        }

                        let mut cx_core = cx.core.borrow_mut();
                        assert!(cx_core.is_none());
                        *cx_core = core;
                    }

                    // Reset the task budget as we are re-entering the
                    // runtime.
                    coop::set(self.budget);
                }
            });
        }
    }

    let mut had_entered = false;
    let mut take_core = false;

    let setup_result = with_current(|maybe_cx| {
        match (
            crate::runtime::context::current_enter_context(),
            maybe_cx.is_some(),
        ) {
            (context::EnterRuntime::Entered { .. }, true) => {
                // We are on a thread pool runtime thread, so we just need to
                // set up blocking.
                had_entered = true;
            }
            (
                context::EnterRuntime::Entered {
                    allow_block_in_place,
                },
                false,
            ) => {
                // We are on an executor, but _not_ on the thread pool. That is
                // _only_ okay if we are in a thread pool runtime's block_on
                // method:
                if allow_block_in_place {
                    had_entered = true;
                    return Ok(());
                } else {
                    // This probably means we are on the current_thread runtime or in a
                    // LocalSet, where it is _not_ okay to block.
                    return Err(
                        "can call blocking only when running on the multi-threaded runtime",
                    );
                }
            }
            (context::EnterRuntime::NotEntered, true) => {
                // This is a nested call to block_in_place (we already exited).
                // All the necessary setup has already been done.
                return Ok(());
            }
            (context::EnterRuntime::NotEntered, false) => {
                // We are outside of the tokio runtime, so blocking is fine.
                // We can also skip all of the thread pool blocking setup steps.
                return Ok(());
            }
        }

        let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");

        // Get the worker core. If none is set, then blocking is fine!
        let mut core = match cx.core.borrow_mut().take() {
            Some(core) => core,
            None => return Ok(()),
        };

        // If we heavily call `spawn_blocking`, there might be no available thread to
        // run this core. Except for the task in the lifo_slot, all tasks can be
        // stolen, so we move the task out of the lifo_slot to the run_queue.
        if let Some(task) = core.lifo_slot.take() {
            core.run_queue
                .push_back_or_overflow(task, &*cx.worker.handle, &mut core.stats);
        }

        // We are taking the core from the context and sending it to another
        // thread.
        take_core = true;

        // The parker should be set here
        assert!(core.park.is_some());

        // In order to block, the core must be sent to another thread for
        // execution.
        //
        // First, move the core back into the worker's shared core slot.
        cx.worker.core.set(core);

        // Next, clone the worker handle and send it to a new thread for
        // processing.
        //
        // Once the blocking task is done executing, we will attempt to
        // steal the core back.
        let worker = cx.worker.clone();
        runtime::spawn_blocking(move || run(worker));
        Ok(())
    });

    if let Err(panic_message) = setup_result {
        panic!("{}", panic_message);
    }

    if had_entered {
        // Unset the current task's budget. Blocking sections are not
        // constrained by task budgets.
        let _reset = Reset {
            take_core,
            budget: coop::stop(),
        };

        crate::runtime::context::exit_runtime(f)
    } else {
        f()
    }
}
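
// Illustrative usage of the public wrapper around this function (a sketch,
// not part of this module). `tokio::task::block_in_place` forwards here on
// the multi-threaded runtime; the closure may block freely because the
// worker's core has already been handed off to another thread:
//
// ```ignore
// #[tokio::main(flavor = "multi_thread")]
// async fn main() {
//     tokio::task::block_in_place(|| {
//         // Synchronous, blocking work is fine here.
//         std::thread::sleep(std::time::Duration::from_millis(10));
//     });
// }
// ```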

impl Launch {
    pub(crate) fn launch(mut self) {
        for worker in self.0.drain(..) {
            runtime::spawn_blocking(move || run(worker));
        }
    }
}

fn run(worker: Arc<Worker>) {
    #[allow(dead_code)]
    struct AbortOnPanic;

    impl Drop for AbortOnPanic {
        fn drop(&mut self) {
            if std::thread::panicking() {
                eprintln!("worker thread panicking; aborting process");
                std::process::abort();
            }
        }
    }

    // Catching panics on worker threads in tests is quite tricky. Instead, when
    // debug assertions are enabled, we just abort the process.
    #[cfg(debug_assertions)]
    let _abort_on_panic = AbortOnPanic;

    // Acquire a core. If this fails, then another thread is running this
    // worker and there is nothing further to do.
    let core = match worker.core.take() {
        Some(core) => core,
        None => return,
    };

    worker.handle.shared.worker_metrics[worker.index].set_thread_id(thread::current().id());

    let handle = scheduler::Handle::MultiThread(worker.handle.clone());

    crate::runtime::context::enter_runtime(&handle, true, |_| {
        // Set the worker context.
        let cx = scheduler::Context::MultiThread(Context {
            worker,
            core: RefCell::new(None),
            defer: Defer::new(),
        });

        context::set_scheduler(&cx, || {
            let cx = cx.expect_multi_thread();

            // This should always be an error. It only returns a `Result` to support
            // using `?` to short circuit.
            assert!(cx.run(core).is_err());

            // Check if there are any deferred tasks to notify. This can happen when
            // the worker core is lost due to `block_in_place()` being called from
            // within the task.
            cx.defer.wake();
        });
    });
}

impl Context {
    fn run(&self, mut core: Box<Core>) -> RunResult {
        // Reset `lifo_enabled` here in case the core was previously stolen from
        // a task that had the LIFO slot disabled.
        self.reset_lifo_enabled(&mut core);

        // Start in the "processing scheduled tasks" state, as polling tasks
        // from the local queue will be one of the first things we do.
        core.stats.start_processing_scheduled_tasks();

        while !core.is_shutdown {
            self.assert_lifo_enabled_is_correct(&core);

            if core.is_traced {
                core = self.worker.handle.trace_core(core);
            }

            // Increment the tick
            core.tick();

            // Run maintenance, if needed
            core = self.maintenance(core);

            // First, check work available to the current worker.
            if let Some(task) = core.next_task(&self.worker) {
                core = self.run_task(task, core)?;
                continue;
            }

            // We consumed all work in the queues and will start searching for work.
            core.stats.end_processing_scheduled_tasks();

            // There is no more **local** work to process, try to steal work
            // from other workers.
            if let Some(task) = core.steal_work(&self.worker) {
                // Found work, switch back to processing
                core.stats.start_processing_scheduled_tasks();
                core = self.run_task(task, core)?;
            } else {
                // Wait for work
                core = if !self.defer.is_empty() {
                    self.park_timeout(core, Some(Duration::from_millis(0)))
                } else {
                    self.park(core)
                };
                core.stats.start_processing_scheduled_tasks();
            }
        }

        core.pre_shutdown(&self.worker);
        // Signal shutdown
        self.worker.handle.shutdown_core(core);
        Err(())
    }

    fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
        #[cfg(tokio_unstable)]
        let task_id = task.task_id();

        let task = self.worker.handle.shared.owned.assert_owner(task);

        // Make sure the worker is not in the **searching** state. This enables
        // another idle worker to try to steal work.
        core.transition_from_searching(&self.worker);

        self.assert_lifo_enabled_is_correct(&core);

        // Measure the poll start time. Note that we may end up polling other
        // tasks under this measurement. In this case, the tasks came from the
        // LIFO slot and are considered part of the current task for scheduling
        // purposes. These tasks inherit the "parent"'s limits.
        core.stats.start_poll();

        // Make the core available to the runtime context
        *self.core.borrow_mut() = Some(core);

        // Run the task
        coop::budget(|| {
            // Unlike the poll time above, the poll start callback is attached to the
            // task id, so it is tightly associated with the actual poll invocation.
            #[cfg(tokio_unstable)]
            self.worker.handle.task_hooks.poll_start_callback(task_id);

            task.run();

            #[cfg(tokio_unstable)]
            self.worker.handle.task_hooks.poll_stop_callback(task_id);

            let mut lifo_polls = 0;

            // As long as there is budget remaining and a task exists in the
            // `lifo_slot`, then keep running.
            loop {
                // Check if we still have the core. If not, the core was stolen
                // by another worker.
                let mut core = match self.core.borrow_mut().take() {
                    Some(core) => core,
                    None => {
                        // In this case, we cannot call `reset_lifo_enabled()`
                        // because the core was stolen. The stealer will handle
                        // that at the top of `Context::run`.
                        return Err(());
                    }
                };

                // Check for a task in the LIFO slot
                let task = match core.lifo_slot.take() {
                    Some(task) => task,
                    None => {
                        self.reset_lifo_enabled(&mut core);
                        core.stats.end_poll();
                        return Ok(core);
                    }
                };

                if !coop::has_budget_remaining() {
                    core.stats.end_poll();

                    // Not enough budget left to run the LIFO task, push it to
                    // the back of the queue and return.
                    core.run_queue.push_back_or_overflow(
                        task,
                        &*self.worker.handle,
                        &mut core.stats,
                    );
                    // If we hit this point, the LIFO slot should be enabled.
                    // There is no need to reset it.
                    debug_assert!(core.lifo_enabled);
                    return Ok(core);
                }

                // Track that we are about to run a task from the LIFO slot.
                lifo_polls += 1;
                super::counters::inc_lifo_schedules();

                // Disable the LIFO slot if we reach our limit
                //
                // In ping-pong style workloads where task A notifies task B,
                // which notifies task A again, continuously prioritizing the
                // LIFO slot can cause starvation as these two tasks will
                // repeatedly schedule the other. To mitigate this, we limit the
                // number of times the LIFO slot is prioritized.
                if lifo_polls >= MAX_LIFO_POLLS_PER_TICK {
                    core.lifo_enabled = false;
                    super::counters::inc_lifo_capped();
                }

                // Run the LIFO task, then loop
                *self.core.borrow_mut() = Some(core);
                let task = self.worker.handle.shared.owned.assert_owner(task);

                #[cfg(tokio_unstable)]
                let task_id = task.task_id();

                #[cfg(tokio_unstable)]
                self.worker.handle.task_hooks.poll_start_callback(task_id);

                task.run();

                #[cfg(tokio_unstable)]
                self.worker.handle.task_hooks.poll_stop_callback(task_id);
            }
        })
    }

    fn reset_lifo_enabled(&self, core: &mut Core) {
        core.lifo_enabled = !self.worker.handle.shared.config.disable_lifo_slot;
    }

    fn assert_lifo_enabled_is_correct(&self, core: &Core) {
        debug_assert_eq!(
            core.lifo_enabled,
            !self.worker.handle.shared.config.disable_lifo_slot
        );
    }

    fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
        if core.tick % self.worker.handle.shared.config.event_interval == 0 {
            super::counters::inc_num_maintenance();

            core.stats.end_processing_scheduled_tasks();

            // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
            // to run without actually putting the thread to sleep.
            core = self.park_timeout(core, Some(Duration::from_millis(0)));

            // Run regularly scheduled maintenance
            core.maintenance(&self.worker);

            core.stats.start_processing_scheduled_tasks();
        }

        core
    }

    /// Parks the worker thread while waiting for tasks to execute.
    ///
    /// This function checks that there is indeed no work left to be done before
    /// parking. Note also that, before parking, the worker thread will try to
    /// take ownership of the driver (I/O, time) and dispatch any events that
    /// might have fired. Whenever a worker thread executes the driver loop, all
    /// woken tasks are scheduled in its own local queue until the queue
    /// saturates (ntasks > `LOCAL_QUEUE_CAPACITY`). When the local queue is
    /// saturated, the overflow tasks are added to the injection queue from
    /// where other workers can pick them up. We also rely on the work-stealing
    /// algorithm to spread the tasks amongst workers after all the I/O events
    /// are dispatched.
    fn park(&self, mut core: Box<Core>) -> Box<Core> {
        if let Some(f) = &self.worker.handle.shared.config.before_park {
            f();
        }

        if core.transition_to_parked(&self.worker) {
            while !core.is_shutdown && !core.is_traced {
                core.stats.about_to_park();
                core.stats
                    .submit(&self.worker.handle.shared.worker_metrics[self.worker.index]);

                core = self.park_timeout(core, None);

                core.stats.unparked();

                // Run regularly scheduled maintenance
                core.maintenance(&self.worker);

                if core.transition_from_parked(&self.worker) {
                    break;
                }
            }
        }

        if let Some(f) = &self.worker.handle.shared.config.after_unpark {
            f();
        }
        core
    }

    fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
        self.assert_lifo_enabled_is_correct(&core);

        // Take the parker out of core
        let mut park = core.park.take().expect("park missing");

        // Store `core` in context
        *self.core.borrow_mut() = Some(core);

        // Park thread
        if let Some(timeout) = duration {
            park.park_timeout(&self.worker.handle.driver, timeout);
        } else {
            park.park(&self.worker.handle.driver);
        }

        self.defer.wake();

        // Remove `core` from context
        core = self.core.borrow_mut().take().expect("core missing");

        // Place `park` back in `core`
        core.park = Some(park);

        if core.should_notify_others() {
            self.worker.handle.notify_parked_local();
        }

        core
    }

    pub(crate) fn defer(&self, waker: &Waker) {
        self.defer.defer(waker);
    }

    #[allow(dead_code)]
    pub(crate) fn get_worker_index(&self) -> usize {
        self.worker.index
    }
}

impl Core {
    /// Increment the tick
    fn tick(&mut self) {
        self.tick = self.tick.wrapping_add(1);
    }

    /// Return the next notified task available to this worker.
    fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
        if self.tick % self.global_queue_interval == 0 {
            // Update the global queue interval, if needed
            self.tune_global_queue_interval(worker);

            worker
                .handle
                .next_remote_task()
                .or_else(|| self.next_local_task())
        } else {
            let maybe_task = self.next_local_task();

            if maybe_task.is_some() {
                return maybe_task;
            }

            if worker.inject().is_empty() {
                return None;
            }

            // Other threads can only **remove** tasks from the current worker's
            // `run_queue`. So, we can be confident that by the time we call
            // `run_queue.push_back` below, there will be *at least* `cap`
            // available slots in the queue.
            let cap = usize::min(
                self.run_queue.remaining_slots(),
                self.run_queue.max_capacity() / 2,
            );

            // The worker is currently idle, pull a batch of work from the
            // injection queue. We don't want to pull *all* the work so other
            // workers can also get some.
            let n = usize::min(
                worker.inject().len() / worker.handle.shared.remotes.len() + 1,
                cap,
            );

            // Take at least one task since the first task is returned directly
            // and not pushed onto the local queue.
            let n = usize::max(1, n);
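
            // A worked example with illustrative numbers (not taken from any
            // real configuration): with 4 workers, 61 tasks in the inject
            // queue, 100 remaining local slots, and a max capacity of 256, we
            // get cap = min(100, 128) = 100 and n = min(61 / 4 + 1, 100) = 16,
            // so 16 tasks are pulled: one is returned directly and the other
            // 15 are pushed onto the local run queue.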

            let mut synced = worker.handle.shared.synced.lock();
            // safety: passing in the correct `inject::Synced`.
            let mut tasks = unsafe { worker.inject().pop_n(&mut synced.inject, n) };

            // Pop the first task to return immediately
            let ret = tasks.next();

            // Push the rest of the tasks onto the run queue
            self.run_queue.push_back(tasks);

            ret
        }
    }

    fn next_local_task(&mut self) -> Option<Notified> {
        self.lifo_slot.take().or_else(|| self.run_queue.pop())
    }

    /// Function responsible for stealing tasks from another worker
    ///
    /// Note: a worker will only actually try to steal if fewer than half of
    /// the workers are already searching for tasks to steal. The idea is to
    /// make sure not all workers will be trying to steal at the same time.
    fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
        if !self.transition_to_searching(worker) {
            return None;
        }

        let num = worker.handle.shared.remotes.len();
        // Start from a random worker
        let start = self.rand.fastrand_n(num as u32) as usize;

        for i in 0..num {
            let i = (start + i) % num;

            // Don't steal from ourself! We know we don't have work.
            if i == worker.index {
                continue;
            }

            let target = &worker.handle.shared.remotes[i];
            if let Some(task) = target
                .steal
                .steal_into(&mut self.run_queue, &mut self.stats)
            {
                return Some(task);
            }
        }

        // Fall back to checking the global queue
        worker.handle.next_remote_task()
    }

    fn transition_to_searching(&mut self, worker: &Worker) -> bool {
        if !self.is_searching {
            self.is_searching = worker.handle.shared.idle.transition_worker_to_searching();
        }

        self.is_searching
    }

    fn transition_from_searching(&mut self, worker: &Worker) {
        if !self.is_searching {
            return;
        }

        self.is_searching = false;
        worker.handle.transition_worker_from_searching();
    }

    fn has_tasks(&self) -> bool {
        self.lifo_slot.is_some() || self.run_queue.has_tasks()
    }

    fn should_notify_others(&self) -> bool {
        // If there are tasks available to steal, but this worker is not
        // looking for tasks to steal, notify another worker.
        if self.is_searching {
            return false;
        }
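        // More than one task is available: this worker will run one itself,
        // leaving at least one for another worker to steal.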
        self.lifo_slot.is_some() as usize + self.run_queue.len() > 1
    }

    /// Prepares the worker state for parking.
    ///
    /// Returns true if the transition happened, false if there is work to do first.
    fn transition_to_parked(&mut self, worker: &Worker) -> bool {
        // Workers should not park if they have work to do
        if self.has_tasks() || self.is_traced {
            return false;
        }

        // When the final worker transitions **out** of searching to parked, it
        // must check all the queues one last time in case work materialized
        // between the last work scan and transitioning out of searching.
        let is_last_searcher = worker.handle.shared.idle.transition_worker_to_parked(
            &worker.handle.shared,
            worker.index,
            self.is_searching,
        );

        // The worker is no longer searching. This only updates the local
        // cache.
        self.is_searching = false;

        if is_last_searcher {
            worker.handle.notify_if_work_pending();
        }

        true
    }

    /// Returns `true` if the transition happened.
    fn transition_from_parked(&mut self, worker: &Worker) -> bool {
        // If a task is in the lifo slot/run queue, then we must unpark regardless of
        // being notified
        if self.has_tasks() {
            // When a worker wakes, it should only transition to the "searching"
            // state when the wake originates from another worker *or* a new task
            // is pushed. We do *not* want the worker to transition to "searching"
            // when it wakes when the I/O driver receives new events.
            self.is_searching = !worker
                .handle
                .shared
                .idle
                .unpark_worker_by_id(&worker.handle.shared, worker.index);
            return true;
        }

        if worker
            .handle
            .shared
            .idle
            .is_parked(&worker.handle.shared, worker.index)
        {
            return false;
        }

        // When unparked, the worker is in the searching state.
        self.is_searching = true;
        true
    }

    /// Runs maintenance work such as checking the pool's state.
    fn maintenance(&mut self, worker: &Worker) {
        self.stats
            .submit(&worker.handle.shared.worker_metrics[worker.index]);

        if !self.is_shutdown {
            // Check if the scheduler has been shutdown
            let synced = worker.handle.shared.synced.lock();
            self.is_shutdown = worker.inject().is_closed(&synced.inject);
        }

        if !self.is_traced {
            // Check if the worker should be tracing.
            self.is_traced = worker.handle.shared.trace_status.trace_requested();
        }
    }

    /// Signals all tasks to shut down, and waits for them to complete. Must run
    /// before we enter the single-threaded phase of shutdown processing.
    fn pre_shutdown(&mut self, worker: &Worker) {
        // Start from a random inner list
        let start = self
            .rand
            .fastrand_n(worker.handle.shared.owned.get_shard_size() as u32);
        // Signal all tasks to shut down.
        worker
            .handle
            .shared
            .owned
            .close_and_shutdown_all(start as usize);

        self.stats
            .submit(&worker.handle.shared.worker_metrics[worker.index]);
    }

    /// Shuts down the core.
    fn shutdown(&mut self, handle: &Handle) {
        // Take the parker out of core
        let mut park = self.park.take().expect("park missing");

        // Drain the queue
        while self.next_local_task().is_some() {}

        park.shutdown(&handle.driver);
    }

    fn tune_global_queue_interval(&mut self, worker: &Worker) {
        let next = self
            .stats
            .tuned_global_queue_interval(&worker.handle.shared.config);

        // Smooth out jitter: only apply the new value if it differs from the
        // current one by more than 2.
        if u32::abs_diff(self.global_queue_interval, next) > 2 {
            self.global_queue_interval = next;
        }
    }
}

impl Worker {
    /// Returns a reference to the scheduler's injection queue.
    fn inject(&self) -> &inject::Shared<Arc<Handle>> {
        &self.handle.shared.inject
    }
}

// TODO: Move `Handle` impls into handle.rs
impl task::Schedule for Arc<Handle> {
    fn release(&self, task: &Task) -> Option<Task> {
        self.shared.owned.remove(task)
    }

    fn schedule(&self, task: Notified) {
        self.schedule_task(task, false);
    }

    fn hooks(&self) -> TaskHarnessScheduleHooks {
        TaskHarnessScheduleHooks {
            task_terminate_callback: self.task_hooks.task_terminate_callback.clone(),
        }
    }

    fn yield_now(&self, task: Notified) {
        self.schedule_task(task, true);
    }
}

impl Handle {
    pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) {
        with_current(|maybe_cx| {
            if let Some(cx) = maybe_cx {
                // Make sure the task is part of the **current** scheduler.
                if self.ptr_eq(&cx.worker.handle) {
                    // And the current thread still holds a core
                    if let Some(core) = cx.core.borrow_mut().as_mut() {
                        self.schedule_local(core, task, is_yield);
                        return;
                    }
                }
            }

            // Otherwise, use the inject queue.
            self.push_remote_task(task);
            self.notify_parked_remote();
        });
    }

    pub(super) fn schedule_option_task_without_yield(&self, task: Option<Notified>) {
        if let Some(task) = task {
            self.schedule_task(task, false);
        }
    }

    fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
        core.stats.inc_local_schedule_count();

        // Spawning from the worker thread. If scheduling a "yield" then the
        // task must always be pushed to the back of the queue, enabling other
        // tasks to be executed. If **not** a yield, then there is more
        // flexibility and the task may go to the front of the queue.
        let should_notify = if is_yield || !core.lifo_enabled {
            core.run_queue
                .push_back_or_overflow(task, self, &mut core.stats);
            true
        } else {
            // Push to the LIFO slot
            let prev = core.lifo_slot.take();
            let ret = prev.is_some();

            if let Some(prev) = prev {
                core.run_queue
                    .push_back_or_overflow(prev, self, &mut core.stats);
            }

            core.lifo_slot = Some(task);

            ret
        };

        // Only notify if not currently parked. If `park` is `None`, then the
        // scheduling is from a resource driver. As notifications often come in
        // batches, the notification is delayed until the park is complete.
        if should_notify && core.park.is_some() {
            self.notify_parked_local();
        }
    }

    fn next_remote_task(&self) -> Option<Notified> {
        if self.shared.inject.is_empty() {
            return None;
        }

        let mut synced = self.shared.synced.lock();
        // safety: passing in the correct `inject::Synced`.
        unsafe { self.shared.inject.pop(&mut synced.inject) }
    }

    fn push_remote_task(&self, task: Notified) {
        self.shared.scheduler_metrics.inc_remote_schedule_count();

        let mut synced = self.shared.synced.lock();
        // safety: passing in the correct `inject::Synced`.
        unsafe {
            self.shared.inject.push(&mut synced.inject, task);
        }
    }

    pub(super) fn close(&self) {
        if self
            .shared
            .inject
            .close(&mut self.shared.synced.lock().inject)
        {
            self.notify_all();
        }
    }

    fn notify_parked_local(&self) {
        super::counters::inc_num_inc_notify_local();

        if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) {
            super::counters::inc_num_unparks_local();
            self.shared.remotes[index].unpark.unpark(&self.driver);
        }
    }

    fn notify_parked_remote(&self) {
        if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) {
            self.shared.remotes[index].unpark.unpark(&self.driver);
        }
    }

    pub(super) fn notify_all(&self) {
        for remote in &self.shared.remotes[..] {
            remote.unpark.unpark(&self.driver);
        }
    }

    fn notify_if_work_pending(&self) {
        for remote in &self.shared.remotes[..] {
            if !remote.steal.is_empty() {
                self.notify_parked_local();
                return;
            }
        }

        if !self.shared.inject.is_empty() {
            self.notify_parked_local();
        }
    }

    fn transition_worker_from_searching(&self) {
        if self.shared.idle.transition_worker_from_searching() {
            // We are the final searching worker. Because work was found, we
            // need to notify another worker.
            self.notify_parked_local();
        }
    }

    /// Signals that a worker has observed the shutdown signal and has replaced
    /// its core back into its handle.
    ///
    /// If all workers have reached this point, the final cleanup is performed.
    fn shutdown_core(&self, core: Box<Core>) {
        let mut cores = self.shared.shutdown_cores.lock();
        cores.push(core);

        if cores.len() != self.shared.remotes.len() {
            return;
        }

        debug_assert!(self.shared.owned.is_empty());

        for mut core in cores.drain(..) {
            core.shutdown(self);
        }

        // Drain the injection queue
        //
        // We already shut down every task, so we can simply drop the tasks.
        while let Some(task) = self.next_remote_task() {
            drop(task);
        }
    }

    fn ptr_eq(&self, other: &Handle) -> bool {
        std::ptr::eq(self, other)
    }
}

impl Overflow<Arc<Handle>> for Handle {
    fn push(&self, task: task::Notified<Arc<Handle>>) {
        self.push_remote_task(task);
    }

    fn push_batch<I>(&self, iter: I)
    where
        I: Iterator<Item = task::Notified<Arc<Handle>>>,
    {
        unsafe {
            self.shared.inject.push_batch(self, iter);
        }
    }
}

pub(crate) struct InjectGuard<'a> {
    lock: crate::loom::sync::MutexGuard<'a, Synced>,
}

impl<'a> AsMut<inject::Synced> for InjectGuard<'a> {
    fn as_mut(&mut self) -> &mut inject::Synced {
        &mut self.lock.inject
    }
}

impl<'a> Lock<inject::Synced> for &'a Handle {
    type Handle = InjectGuard<'a>;

    fn lock(self) -> Self::Handle {
        InjectGuard {
            lock: self.shared.synced.lock(),
        }
    }
}

#[track_caller]
fn with_current<R>(f: impl FnOnce(Option<&Context>) -> R) -> R {
    use scheduler::Context::MultiThread;

    context::with_scheduler(|ctx| match ctx {
        Some(MultiThread(ctx)) => f(Some(ctx)),
        _ => f(None),
    })
}