tokio/runtime/scheduler/multi_thread/worker.rs
1//! A scheduler is initialized with a fixed number of workers. Each worker is
2//! driven by a thread. Each worker has a "core" which contains data such as the
3//! run queue and other state. When `block_in_place` is called, the worker's
4//! "core" is handed off to a new thread allowing the scheduler to continue to
5//! make progress while the originating thread blocks.
6//!
7//! # Shutdown
8//!
9//! Shutting down the runtime involves the following steps:
10//!
11//! 1. The Shared::close method is called. This closes the inject queue and
12//! `OwnedTasks` instance and wakes up all worker threads.
13//!
14//! 2. Each worker thread observes the close signal next time it runs
15//! Core::maintenance by checking whether the inject queue is closed.
16//! The `Core::is_shutdown` flag is set to true.
17//!
18//! 3. The worker thread calls `pre_shutdown` in parallel. Here, the worker
19//! will keep removing tasks from `OwnedTasks` until it is empty. No new
20//! tasks can be pushed to the `OwnedTasks` during or after this step as it
21//! was closed in step 1.
22//!
23//! 5. The workers call Shared::shutdown to enter the single-threaded phase of
24//! shutdown. These calls will push their core to `Shared::shutdown_cores`,
25//! and the last thread to push its core will finish the shutdown procedure.
26//!
27//! 6. The local run queue of each core is emptied, then the inject queue is
28//! emptied.
29//!
30//! At this point, shutdown has completed. It is not possible for any of the
31//! collections to contain any tasks at this point, as each collection was
32//! closed first, then emptied afterwards.
33//!
34//! ## Spawns during shutdown
35//!
36//! When spawning tasks during shutdown, there are two cases:
37//!
38//! * The spawner observes the `OwnedTasks` being open, and the inject queue is
39//! closed.
40//! * The spawner observes the `OwnedTasks` being closed and doesn't check the
41//! inject queue.
42//!
43//! The first case can only happen if the `OwnedTasks::bind` call happens before
44//! or during step 1 of shutdown. In this case, the runtime will clean up the
45//! task in step 3 of shutdown.
46//!
47//! In the latter case, the task was not spawned and the task is immediately
48//! cancelled by the spawner.
49//!
50//! The correctness of shutdown requires both the inject queue and `OwnedTasks`
51//! collection to have a closed bit. With a close bit on only the inject queue,
52//! spawning could run into a situation where a task is successfully bound long
53//! after the runtime has shut down. With a close bit on only the `OwnedTasks`,
54//! the first spawning situation could result in the notification being pushed
55//! to the inject queue after step 6 of shutdown, which would leave a task in
56//! the inject queue indefinitely. This would be a ref-count cycle and a memory
57//! leak.
58
59use crate::loom::sync::{Arc, Mutex};
60use crate::runtime;
61use crate::runtime::scheduler::multi_thread::{
62 idle, queue, Counters, Handle, Idle, Overflow, Parker, Stats, TraceStatus, Unparker,
63};
64use crate::runtime::scheduler::{inject, Defer, Lock};
65use crate::runtime::task::{OwnedTasks, TaskHarnessScheduleHooks};
66use crate::runtime::{
67 blocking, coop, driver, scheduler, task, Config, SchedulerMetrics, WorkerMetrics,
68};
69use crate::runtime::{context, TaskHooks};
70use crate::util::atomic_cell::AtomicCell;
71use crate::util::rand::{FastRand, RngSeedGenerator};
72
73use std::cell::RefCell;
74use std::task::Waker;
75use std::thread;
76use std::time::Duration;
77
78mod metrics;
79
80cfg_taskdump! {
81 mod taskdump;
82}
83
84cfg_not_taskdump! {
85 mod taskdump_mock;
86}
87
/// A scheduler worker.
///
/// Pairs the shared scheduler handle with this worker's index and the slot
/// through which the worker's `Core` is handed from one thread to another.
pub(super) struct Worker {
    /// Reference to scheduler's handle
    handle: Arc<Handle>,

    /// Index holding this worker's remote state. Also used in this file to
    /// index `Shared::worker_metrics`.
    index: usize,

    /// Used to hand-off a worker's core to another thread. A thread that
    /// wants to drive this worker calls `take()` on this slot; an empty slot
    /// means another thread is currently running the worker.
    core: AtomicCell<Core>,
}
99
/// Core data
///
/// The per-worker scheduling state (run queue, LIFO slot, parker, stats, ...).
/// A `Core` is owned by exactly one thread at a time and is moved between
/// threads as a `Box<Core>`.
struct Core {
    /// Used to schedule bookkeeping tasks every so often.
    tick: u32,

    /// When a task is scheduled from a worker, it is stored in this slot. The
    /// worker will check this slot for a task **before** checking the run
    /// queue. This effectively results in the **last** scheduled task to be run
    /// next (LIFO). This is an optimization for improving locality which
    /// benefits message passing patterns and helps to reduce latency.
    lifo_slot: Option<Notified>,

    /// When `true`, locally scheduled tasks go to the LIFO slot. When `false`,
    /// they go to the back of the `run_queue`.
    lifo_enabled: bool,

    /// The worker-local run queue.
    run_queue: queue::Local<Arc<Handle>>,

    /// True if the worker is currently searching for more work. Searching
    /// involves attempting to steal from other workers.
    is_searching: bool,

    /// True if the scheduler is being shutdown
    is_shutdown: bool,

    /// True if the scheduler is being traced
    is_traced: bool,

    /// Parker
    ///
    /// Stored in an `Option` as the parker is added / removed to make the
    /// borrow checker happy.
    park: Option<Parker>,

    /// Per-worker runtime stats
    stats: Stats,

    /// How often to check the global queue, measured in ticks: the global
    /// queue is checked first whenever `tick % global_queue_interval == 0`.
    global_queue_interval: u32,

    /// Fast random number generator.
    rand: FastRand,
}
144
/// State shared across all workers
pub(crate) struct Shared {
    /// Per-worker remote state. All other workers have access to this and is
    /// how they communicate between each other.
    remotes: Box<[Remote]>,

    /// Global task queue used for:
    /// 1. Submit work to the scheduler while **not** currently on a worker thread.
    /// 2. Submit work to the scheduler when a worker run queue is saturated
    pub(super) inject: inject::Shared<Arc<Handle>>,

    /// Coordinates idle workers
    idle: Idle,

    /// Collection of all active tasks spawned onto this executor.
    pub(crate) owned: OwnedTasks<Arc<Handle>>,

    /// Data synchronized by the scheduler mutex
    pub(super) synced: Mutex<Synced>,

    /// Cores that have observed the shutdown signal
    ///
    /// The core is **not** placed back in the worker to avoid it from being
    /// stolen by a thread that was spawned as part of `block_in_place`.
    #[allow(clippy::vec_box)] // we're moving an already-boxed value
    shutdown_cores: Mutex<Vec<Box<Core>>>,

    /// The number of cores that have observed the trace signal.
    pub(super) trace_status: TraceStatus,

    /// Scheduler configuration options
    config: Config,

    /// Collects metrics from the runtime.
    pub(super) scheduler_metrics: SchedulerMetrics,

    /// Per-worker metrics, indexed by `Worker::index`.
    pub(super) worker_metrics: Box<[WorkerMetrics]>,

    /// Only held to trigger some code on drop. This is used to get internal
    /// runtime metrics that can be useful when doing performance
    /// investigations. This does nothing (empty struct, no drop impl) unless
    /// the `tokio_internal_mt_counters` `cfg` flag is set.
    _counters: Counters,
}
189
/// Data synchronized by the scheduler mutex
///
/// Stored as `Shared::synced`; callers lock that mutex and pass the relevant
/// field to the `Idle` / inject-queue methods that require it.
pub(crate) struct Synced {
    /// Synchronized state for `Idle`.
    pub(super) idle: idle::Synced,

    /// Synchronized state for `Inject`.
    pub(crate) inject: inject::Synced,
}
198
/// Used to communicate with a worker from other threads.
///
/// One `Remote` exists per worker, stored in `Shared::remotes`.
struct Remote {
    /// Steals tasks from this worker.
    pub(super) steal: queue::Steal<Arc<Handle>>,

    /// Unparks the associated worker thread
    unpark: Unparker,
}
207
/// Thread-local context
///
/// Created by `run` for the thread that is driving a worker.
pub(crate) struct Context {
    /// Worker
    worker: Arc<Worker>,

    /// Core data. `None` while the core is held directly by the run loop or
    /// after it has been handed off to another thread (see `block_in_place`).
    core: RefCell<Option<Box<Core>>>,

    /// Tasks to wake after resource drivers are polled. This is mostly to
    /// handle yielded tasks.
    pub(crate) defer: Defer,
}
220
/// Starts the workers
///
/// Holds the workers built by `create` until `Launch::launch` is called.
pub(crate) struct Launch(Vec<Arc<Worker>>);
223
/// Running a task may consume the core. If the core is still available when
/// running the task completes, it is returned. Otherwise, the worker will need
/// to stop processing.
///
/// `Err(())` therefore means "this thread no longer owns a core", not a failure.
type RunResult = Result<Box<Core>, ()>;

/// A task handle
type Task = task::Task<Arc<Handle>>;

/// A notified task handle
type Notified = task::Notified<Arc<Handle>>;
234
/// Maximum number of consecutive LIFO-slot polls within one `run_task` call
/// before the LIFO slot is disabled for the remainder of that call.
///
/// Value picked out of thin-air. Running the LIFO slot a handful of times
/// seems sufficient to benefit from locality. More than 3 times probably is
/// overweighing. The value can be tuned in the future with data that shows
/// improvements.
const MAX_LIFO_POLLS_PER_TICK: usize = 3;
240
241pub(super) fn create(
242 size: usize,
243 park: Parker,
244 driver_handle: driver::Handle,
245 blocking_spawner: blocking::Spawner,
246 seed_generator: RngSeedGenerator,
247 config: Config,
248) -> (Arc<Handle>, Launch) {
249 let mut cores = Vec::with_capacity(size);
250 let mut remotes = Vec::with_capacity(size);
251 let mut worker_metrics = Vec::with_capacity(size);
252
253 // Create the local queues
254 for _ in 0..size {
255 let (steal, run_queue) = queue::local();
256
257 let park = park.clone();
258 let unpark = park.unpark();
259 let metrics = WorkerMetrics::from_config(&config);
260 let stats = Stats::new(&metrics);
261
262 cores.push(Box::new(Core {
263 tick: 0,
264 lifo_slot: None,
265 lifo_enabled: !config.disable_lifo_slot,
266 run_queue,
267 is_searching: false,
268 is_shutdown: false,
269 is_traced: false,
270 park: Some(park),
271 global_queue_interval: stats.tuned_global_queue_interval(&config),
272 stats,
273 rand: FastRand::from_seed(config.seed_generator.next_seed()),
274 }));
275
276 remotes.push(Remote { steal, unpark });
277 worker_metrics.push(metrics);
278 }
279
280 let (idle, idle_synced) = Idle::new(size);
281 let (inject, inject_synced) = inject::Shared::new();
282
283 let remotes_len = remotes.len();
284 let handle = Arc::new(Handle {
285 task_hooks: TaskHooks::from_config(&config),
286 shared: Shared {
287 remotes: remotes.into_boxed_slice(),
288 inject,
289 idle,
290 owned: OwnedTasks::new(size),
291 synced: Mutex::new(Synced {
292 idle: idle_synced,
293 inject: inject_synced,
294 }),
295 shutdown_cores: Mutex::new(vec![]),
296 trace_status: TraceStatus::new(remotes_len),
297 config,
298 scheduler_metrics: SchedulerMetrics::new(),
299 worker_metrics: worker_metrics.into_boxed_slice(),
300 _counters: Counters,
301 },
302 driver: driver_handle,
303 blocking_spawner,
304 seed_generator,
305 });
306
307 let mut launch = Launch(vec![]);
308
309 for (index, core) in cores.drain(..).enumerate() {
310 launch.0.push(Arc::new(Worker {
311 handle: handle.clone(),
312 index,
313 core: AtomicCell::new(Some(core)),
314 }));
315 }
316
317 (handle, launch)
318}
319
/// Runs the blocking closure `f` on the current thread and returns its result.
///
/// When called on a worker thread that currently holds a `Core`, the core is
/// moved back into the worker's shared slot and `run(worker)` is submitted via
/// `runtime::spawn_blocking`, so another thread can keep driving the worker
/// while `f` blocks. After `f` returns, the `Reset` guard tries to take the
/// core back and restores the task budget.
///
/// # Panics
///
/// Panics when called from inside a runtime context that has no multi-thread
/// worker context and does not allow `block_in_place` (the
/// `allow_block_in_place == false` branch below).
#[track_caller]
pub(crate) fn block_in_place<F, R>(f: F) -> R
where
    F: FnOnce() -> R,
{
    // Try to steal the worker core back
    struct Reset {
        /// Whether this call handed the core off and should try to reclaim it.
        take_core: bool,
        /// The task budget to restore once the blocking section ends.
        budget: coop::Budget,
    }

    impl Drop for Reset {
        fn drop(&mut self) {
            with_current(|maybe_cx| {
                if let Some(cx) = maybe_cx {
                    if self.take_core {
                        // `None` here means another thread currently holds
                        // the core; this thread then continues without one.
                        let core = cx.worker.core.take();

                        if core.is_some() {
                            cx.worker.handle.shared.worker_metrics[cx.worker.index]
                                .set_thread_id(thread::current().id());
                        }

                        let mut cx_core = cx.core.borrow_mut();
                        assert!(cx_core.is_none());
                        *cx_core = core;
                    }

                    // Reset the task budget as we are re-entering the
                    // runtime.
                    coop::set(self.budget);
                }
            });
        }
    }

    // `had_entered`: we are inside a runtime context and must exit it around `f`.
    // `take_core`: the core was handed off and `Reset` should try to reclaim it.
    let mut had_entered = false;
    let mut take_core = false;

    let setup_result = with_current(|maybe_cx| {
        match (
            crate::runtime::context::current_enter_context(),
            maybe_cx.is_some(),
        ) {
            (context::EnterRuntime::Entered { .. }, true) => {
                // We are on a thread pool runtime thread, so we just need to
                // set up blocking.
                had_entered = true;
            }
            (
                context::EnterRuntime::Entered {
                    allow_block_in_place,
                },
                false,
            ) => {
                // We are on an executor, but _not_ on the thread pool. That is
                // _only_ okay if we are in a thread pool runtime's block_on
                // method:
                if allow_block_in_place {
                    had_entered = true;
                    return Ok(());
                } else {
                    // This probably means we are on the current_thread runtime or in a
                    // LocalSet, where it is _not_ okay to block.
                    return Err(
                        "can call blocking only when running on the multi-threaded runtime",
                    );
                }
            }
            (context::EnterRuntime::NotEntered, true) => {
                // This is a nested call to block_in_place (we already exited).
                // All the necessary setup has already been done.
                return Ok(());
            }
            (context::EnterRuntime::NotEntered, false) => {
                // We are outside of the tokio runtime, so blocking is fine.
                // We can also skip all of the thread pool blocking setup steps.
                return Ok(());
            }
        }

        // Only the first match arm falls through to here, so a worker context
        // is present.
        let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");

        // Get the worker core. If none is set, then blocking is fine!
        let mut core = match cx.core.borrow_mut().take() {
            Some(core) => core,
            None => return Ok(()),
        };

        // If we heavily call `spawn_blocking`, there might be no available thread to
        // run this core. Except for the task in the lifo_slot, all tasks can be
        // stolen, so we move the task out of the lifo_slot to the run_queue.
        if let Some(task) = core.lifo_slot.take() {
            core.run_queue
                .push_back_or_overflow(task, &*cx.worker.handle, &mut core.stats);
        }

        // We are taking the core from the context and sending it to another
        // thread.
        take_core = true;

        // The parker should be set here
        assert!(core.park.is_some());

        // In order to block, the core must be sent to another thread for
        // execution.
        //
        // First, move the core back into the worker's shared core slot.
        cx.worker.core.set(core);

        // Next, clone the worker handle and send it to a new thread for
        // processing.
        //
        // Once the blocking task is done executing, we will attempt to
        // steal the core back.
        let worker = cx.worker.clone();
        runtime::spawn_blocking(move || run(worker));
        Ok(())
    });

    // Panic outside of the `with_current` closure.
    if let Err(panic_message) = setup_result {
        panic!("{}", panic_message);
    }

    if had_entered {
        // Unset the current task's budget. Blocking sections are not
        // constrained by task budgets.
        let _reset = Reset {
            take_core,
            budget: coop::stop(),
        };

        crate::runtime::context::exit_runtime(f)
    } else {
        f()
    }
}
457
458impl Launch {
459 pub(crate) fn launch(mut self) {
460 for worker in self.0.drain(..) {
461 runtime::spawn_blocking(move || run(worker));
462 }
463 }
464}
465
466fn run(worker: Arc<Worker>) {
467 #[allow(dead_code)]
468 struct AbortOnPanic;
469
470 impl Drop for AbortOnPanic {
471 fn drop(&mut self) {
472 if std::thread::panicking() {
473 eprintln!("worker thread panicking; aborting process");
474 std::process::abort();
475 }
476 }
477 }
478
479 // Catching panics on worker threads in tests is quite tricky. Instead, when
480 // debug assertions are enabled, we just abort the process.
481 #[cfg(debug_assertions)]
482 let _abort_on_panic = AbortOnPanic;
483
484 // Acquire a core. If this fails, then another thread is running this
485 // worker and there is nothing further to do.
486 let core = match worker.core.take() {
487 Some(core) => core,
488 None => return,
489 };
490
491 worker.handle.shared.worker_metrics[worker.index].set_thread_id(thread::current().id());
492
493 let handle = scheduler::Handle::MultiThread(worker.handle.clone());
494
495 crate::runtime::context::enter_runtime(&handle, true, |_| {
496 // Set the worker context.
497 let cx = scheduler::Context::MultiThread(Context {
498 worker,
499 core: RefCell::new(None),
500 defer: Defer::new(),
501 });
502
503 context::set_scheduler(&cx, || {
504 let cx = cx.expect_multi_thread();
505
506 // This should always be an error. It only returns a `Result` to support
507 // using `?` to short circuit.
508 assert!(cx.run(core).is_err());
509
510 // Check if there are any deferred tasks to notify. This can happen when
511 // the worker core is lost due to `block_in_place()` being called from
512 // within the task.
513 cx.defer.wake();
514 });
515 });
516}
517
518impl Context {
519 fn run(&self, mut core: Box<Core>) -> RunResult {
520 // Reset `lifo_enabled` here in case the core was previously stolen from
521 // a task that had the LIFO slot disabled.
522 self.reset_lifo_enabled(&mut core);
523
524 // Start as "processing" tasks as polling tasks from the local queue
525 // will be one of the first things we do.
526 core.stats.start_processing_scheduled_tasks();
527
528 while !core.is_shutdown {
529 self.assert_lifo_enabled_is_correct(&core);
530
531 if core.is_traced {
532 core = self.worker.handle.trace_core(core);
533 }
534
535 // Increment the tick
536 core.tick();
537
538 // Run maintenance, if needed
539 core = self.maintenance(core);
540
541 // First, check work available to the current worker.
542 if let Some(task) = core.next_task(&self.worker) {
543 core = self.run_task(task, core)?;
544 continue;
545 }
546
547 // We consumed all work in the queues and will start searching for work.
548 core.stats.end_processing_scheduled_tasks();
549
550 // There is no more **local** work to process, try to steal work
551 // from other workers.
552 if let Some(task) = core.steal_work(&self.worker) {
553 // Found work, switch back to processing
554 core.stats.start_processing_scheduled_tasks();
555 core = self.run_task(task, core)?;
556 } else {
557 // Wait for work
558 core = if !self.defer.is_empty() {
559 self.park_timeout(core, Some(Duration::from_millis(0)))
560 } else {
561 self.park(core)
562 };
563 core.stats.start_processing_scheduled_tasks();
564 }
565 }
566
567 core.pre_shutdown(&self.worker);
568 // Signal shutdown
569 self.worker.handle.shutdown_core(core);
570 Err(())
571 }
572
    /// Polls `task`, then keeps polling whatever lands in the LIFO slot while
    /// budget remains.
    ///
    /// The core is parked in `self.core` while a task runs so that code
    /// executed by the task can reach it. Returns `Ok(core)` when the core is
    /// still there afterwards, or `Err(())` when it was taken during the poll.
    fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
        #[cfg(tokio_unstable)]
        let task_id = task.task_id();

        let task = self.worker.handle.shared.owned.assert_owner(task);

        // Make sure the worker is not in the **searching** state. This enables
        // another idle worker to try to steal work.
        core.transition_from_searching(&self.worker);

        self.assert_lifo_enabled_is_correct(&core);

        // Measure the poll start time. Note that we may end up polling other
        // tasks under this measurement. In this case, the tasks came from the
        // LIFO slot and are considered part of the current task for scheduling
        // purposes. These tasks inherit the "parent"'s limits.
        core.stats.start_poll();

        // Make the core available to the runtime context
        *self.core.borrow_mut() = Some(core);

        // Run the task
        coop::budget(|| {
            // Unlike the poll time above, poll start callback is attached to the task id,
            // so it is tightly associated with the actual poll invocation.
            #[cfg(tokio_unstable)]
            self.worker.handle.task_hooks.poll_start_callback(task_id);

            task.run();

            #[cfg(tokio_unstable)]
            self.worker.handle.task_hooks.poll_stop_callback(task_id);

            // Number of LIFO-slot tasks polled so far within this call.
            let mut lifo_polls = 0;

            // As long as there is budget remaining and a task exists in the
            // `lifo_slot`, then keep running.
            loop {
                // Check if we still have the core. If not, the core was stolen
                // by another worker.
                let mut core = match self.core.borrow_mut().take() {
                    Some(core) => core,
                    None => {
                        // In this case, we cannot call `reset_lifo_enabled()`
                        // because the core was stolen. The stealer will handle
                        // that at the top of `Context::run`
                        return Err(());
                    }
                };

                // Check for a task in the LIFO slot
                let task = match core.lifo_slot.take() {
                    Some(task) => task,
                    None => {
                        self.reset_lifo_enabled(&mut core);
                        core.stats.end_poll();
                        return Ok(core);
                    }
                };

                if !coop::has_budget_remaining() {
                    core.stats.end_poll();

                    // Not enough budget left to run the LIFO task, push it to
                    // the back of the queue and return.
                    core.run_queue.push_back_or_overflow(
                        task,
                        &*self.worker.handle,
                        &mut core.stats,
                    );
                    // If we hit this point, the LIFO slot should be enabled.
                    // There is no need to reset it.
                    debug_assert!(core.lifo_enabled);
                    return Ok(core);
                }

                // Track that we are about to run a task from the LIFO slot.
                lifo_polls += 1;
                super::counters::inc_lifo_schedules();

                // Disable the LIFO slot if we reach our limit
                //
                // In ping-pong style workloads where task A notifies task B,
                // which notifies task A again, continuously prioritizing the
                // LIFO slot can cause starvation as these two tasks will
                // repeatedly schedule the other. To mitigate this, we limit the
                // number of times the LIFO slot is prioritized.
                if lifo_polls >= MAX_LIFO_POLLS_PER_TICK {
                    core.lifo_enabled = false;
                    super::counters::inc_lifo_capped();
                }

                // Run the LIFO task, then loop
                *self.core.borrow_mut() = Some(core);
                let task = self.worker.handle.shared.owned.assert_owner(task);

                #[cfg(tokio_unstable)]
                let task_id = task.task_id();

                #[cfg(tokio_unstable)]
                self.worker.handle.task_hooks.poll_start_callback(task_id);

                task.run();

                #[cfg(tokio_unstable)]
                self.worker.handle.task_hooks.poll_stop_callback(task_id);
            }
        })
    }
682
683 fn reset_lifo_enabled(&self, core: &mut Core) {
684 core.lifo_enabled = !self.worker.handle.shared.config.disable_lifo_slot;
685 }
686
687 fn assert_lifo_enabled_is_correct(&self, core: &Core) {
688 debug_assert_eq!(
689 core.lifo_enabled,
690 !self.worker.handle.shared.config.disable_lifo_slot
691 );
692 }
693
694 fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
695 if core.tick % self.worker.handle.shared.config.event_interval == 0 {
696 super::counters::inc_num_maintenance();
697
698 core.stats.end_processing_scheduled_tasks();
699
700 // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
701 // to run without actually putting the thread to sleep.
702 core = self.park_timeout(core, Some(Duration::from_millis(0)));
703
704 // Run regularly scheduled maintenance
705 core.maintenance(&self.worker);
706
707 core.stats.start_processing_scheduled_tasks();
708 }
709
710 core
711 }
712
713 /// Parks the worker thread while waiting for tasks to execute.
714 ///
715 /// This function checks if indeed there's no more work left to be done before parking.
716 /// Also important to notice that, before parking, the worker thread will try to take
717 /// ownership of the Driver (IO/Time) and dispatch any events that might have fired.
718 /// Whenever a worker thread executes the Driver loop, all waken tasks are scheduled
719 /// in its own local queue until the queue saturates (ntasks > `LOCAL_QUEUE_CAPACITY`).
720 /// When the local queue is saturated, the overflow tasks are added to the injection queue
721 /// from where other workers can pick them up.
722 /// Also, we rely on the workstealing algorithm to spread the tasks amongst workers
723 /// after all the IOs get dispatched
724 fn park(&self, mut core: Box<Core>) -> Box<Core> {
725 if let Some(f) = &self.worker.handle.shared.config.before_park {
726 f();
727 }
728
729 if core.transition_to_parked(&self.worker) {
730 while !core.is_shutdown && !core.is_traced {
731 core.stats.about_to_park();
732 core.stats
733 .submit(&self.worker.handle.shared.worker_metrics[self.worker.index]);
734
735 core = self.park_timeout(core, None);
736
737 core.stats.unparked();
738
739 // Run regularly scheduled maintenance
740 core.maintenance(&self.worker);
741
742 if core.transition_from_parked(&self.worker) {
743 break;
744 }
745 }
746 }
747
748 if let Some(f) = &self.worker.handle.shared.config.after_unpark {
749 f();
750 }
751 core
752 }
753
754 fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
755 self.assert_lifo_enabled_is_correct(&core);
756
757 // Take the parker out of core
758 let mut park = core.park.take().expect("park missing");
759
760 // Store `core` in context
761 *self.core.borrow_mut() = Some(core);
762
763 // Park thread
764 if let Some(timeout) = duration {
765 park.park_timeout(&self.worker.handle.driver, timeout);
766 } else {
767 park.park(&self.worker.handle.driver);
768 }
769
770 self.defer.wake();
771
772 // Remove `core` from context
773 core = self.core.borrow_mut().take().expect("core missing");
774
775 // Place `park` back in `core`
776 core.park = Some(park);
777
778 if core.should_notify_others() {
779 self.worker.handle.notify_parked_local();
780 }
781
782 core
783 }
784
    /// Registers `waker` with this context's `Defer` list. The list is woken
    /// by `park_timeout` after parking and by `run` once the run loop exits.
    pub(crate) fn defer(&self, waker: &Waker) {
        self.defer.defer(waker);
    }
788
    /// Returns the index of the worker this context belongs to.
    #[allow(dead_code)]
    pub(crate) fn get_worker_index(&self) -> usize {
        self.worker.index
    }
793}
794
795impl Core {
    /// Increment the tick
    ///
    /// Uses wrapping arithmetic, so the counter rolls over to 0 after
    /// `u32::MAX` instead of overflowing.
    fn tick(&mut self) {
        self.tick = self.tick.wrapping_add(1);
    }
800
801 /// Return the next notified task available to this worker.
802 fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
803 if self.tick % self.global_queue_interval == 0 {
804 // Update the global queue interval, if needed
805 self.tune_global_queue_interval(worker);
806
807 worker
808 .handle
809 .next_remote_task()
810 .or_else(|| self.next_local_task())
811 } else {
812 let maybe_task = self.next_local_task();
813
814 if maybe_task.is_some() {
815 return maybe_task;
816 }
817
818 if worker.inject().is_empty() {
819 return None;
820 }
821
822 // Other threads can only **remove** tasks from the current worker's
823 // `run_queue`. So, we can be confident that by the time we call
824 // `run_queue.push_back` below, there will be *at least* `cap`
825 // available slots in the queue.
826 let cap = usize::min(
827 self.run_queue.remaining_slots(),
828 self.run_queue.max_capacity() / 2,
829 );
830
831 // The worker is currently idle, pull a batch of work from the
832 // injection queue. We don't want to pull *all* the work so other
833 // workers can also get some.
834 let n = usize::min(
835 worker.inject().len() / worker.handle.shared.remotes.len() + 1,
836 cap,
837 );
838
839 // Take at least one task since the first task is returned directly
840 // and not pushed onto the local queue.
841 let n = usize::max(1, n);
842
843 let mut synced = worker.handle.shared.synced.lock();
844 // safety: passing in the correct `inject::Synced`.
845 let mut tasks = unsafe { worker.inject().pop_n(&mut synced.inject, n) };
846
847 // Pop the first task to return immediately
848 let ret = tasks.next();
849
850 // Push the rest of the on the run queue
851 self.run_queue.push_back(tasks);
852
853 ret
854 }
855 }
856
857 fn next_local_task(&mut self) -> Option<Notified> {
858 self.lifo_slot.take().or_else(|| self.run_queue.pop())
859 }
860
861 /// Function responsible for stealing tasks from another worker
862 ///
863 /// Note: Only if less than half the workers are searching for tasks to steal
864 /// a new worker will actually try to steal. The idea is to make sure not all
865 /// workers will be trying to steal at the same time.
866 fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
867 if !self.transition_to_searching(worker) {
868 return None;
869 }
870
871 let num = worker.handle.shared.remotes.len();
872 // Start from a random worker
873 let start = self.rand.fastrand_n(num as u32) as usize;
874
875 for i in 0..num {
876 let i = (start + i) % num;
877
878 // Don't steal from ourself! We know we don't have work.
879 if i == worker.index {
880 continue;
881 }
882
883 let target = &worker.handle.shared.remotes[i];
884 if let Some(task) = target
885 .steal
886 .steal_into(&mut self.run_queue, &mut self.stats)
887 {
888 return Some(task);
889 }
890 }
891
892 // Fallback on checking the global queue
893 worker.handle.next_remote_task()
894 }
895
896 fn transition_to_searching(&mut self, worker: &Worker) -> bool {
897 if !self.is_searching {
898 self.is_searching = worker.handle.shared.idle.transition_worker_to_searching();
899 }
900
901 self.is_searching
902 }
903
904 fn transition_from_searching(&mut self, worker: &Worker) {
905 if !self.is_searching {
906 return;
907 }
908
909 self.is_searching = false;
910 worker.handle.transition_worker_from_searching();
911 }
912
913 fn has_tasks(&self) -> bool {
914 self.lifo_slot.is_some() || self.run_queue.has_tasks()
915 }
916
917 fn should_notify_others(&self) -> bool {
918 // If there are tasks available to steal, but this worker is not
919 // looking for tasks to steal, notify another worker.
920 if self.is_searching {
921 return false;
922 }
923 self.lifo_slot.is_some() as usize + self.run_queue.len() > 1
924 }
925
926 /// Prepares the worker state for parking.
927 ///
928 /// Returns true if the transition happened, false if there is work to do first.
929 fn transition_to_parked(&mut self, worker: &Worker) -> bool {
930 // Workers should not park if they have work to do
931 if self.has_tasks() || self.is_traced {
932 return false;
933 }
934
935 // When the final worker transitions **out** of searching to parked, it
936 // must check all the queues one last time in case work materialized
937 // between the last work scan and transitioning out of searching.
938 let is_last_searcher = worker.handle.shared.idle.transition_worker_to_parked(
939 &worker.handle.shared,
940 worker.index,
941 self.is_searching,
942 );
943
944 // The worker is no longer searching. Setting this is the local cache
945 // only.
946 self.is_searching = false;
947
948 if is_last_searcher {
949 worker.handle.notify_if_work_pending();
950 }
951
952 true
953 }
954
955 /// Returns `true` if the transition happened.
956 fn transition_from_parked(&mut self, worker: &Worker) -> bool {
957 // If a task is in the lifo slot/run queue, then we must unpark regardless of
958 // being notified
959 if self.has_tasks() {
960 // When a worker wakes, it should only transition to the "searching"
961 // state when the wake originates from another worker *or* a new task
962 // is pushed. We do *not* want the worker to transition to "searching"
963 // when it wakes when the I/O driver receives new events.
964 self.is_searching = !worker
965 .handle
966 .shared
967 .idle
968 .unpark_worker_by_id(&worker.handle.shared, worker.index);
969 return true;
970 }
971
972 if worker
973 .handle
974 .shared
975 .idle
976 .is_parked(&worker.handle.shared, worker.index)
977 {
978 return false;
979 }
980
981 // When unparked, the worker is in the searching state.
982 self.is_searching = true;
983 true
984 }
985
986 /// Runs maintenance work such as checking the pool's state.
987 fn maintenance(&mut self, worker: &Worker) {
988 self.stats
989 .submit(&worker.handle.shared.worker_metrics[worker.index]);
990
991 if !self.is_shutdown {
992 // Check if the scheduler has been shutdown
993 let synced = worker.handle.shared.synced.lock();
994 self.is_shutdown = worker.inject().is_closed(&synced.inject);
995 }
996
997 if !self.is_traced {
998 // Check if the worker should be tracing.
999 self.is_traced = worker.handle.shared.trace_status.trace_requested();
1000 }
1001 }
1002
1003 /// Signals all tasks to shut down, and waits for them to complete. Must run
1004 /// before we enter the single-threaded phase of shutdown processing.
1005 fn pre_shutdown(&mut self, worker: &Worker) {
1006 // Start from a random inner list
1007 let start = self
1008 .rand
1009 .fastrand_n(worker.handle.shared.owned.get_shard_size() as u32);
1010 // Signal to all tasks to shut down.
1011 worker
1012 .handle
1013 .shared
1014 .owned
1015 .close_and_shutdown_all(start as usize);
1016
1017 self.stats
1018 .submit(&worker.handle.shared.worker_metrics[worker.index]);
1019 }
1020
1021 /// Shuts down the core.
1022 fn shutdown(&mut self, handle: &Handle) {
1023 // Take the core
1024 let mut park = self.park.take().expect("park missing");
1025
1026 // Drain the queue
1027 while self.next_local_task().is_some() {}
1028
1029 park.shutdown(&handle.driver);
1030 }
1031
1032 fn tune_global_queue_interval(&mut self, worker: &Worker) {
1033 let next = self
1034 .stats
1035 .tuned_global_queue_interval(&worker.handle.shared.config);
1036
1037 // Smooth out jitter
1038 if u32::abs_diff(self.global_queue_interval, next) > 2 {
1039 self.global_queue_interval = next;
1040 }
1041 }
1042}
1043
1044impl Worker {
1045 /// Returns a reference to the scheduler's injection queue.
1046 fn inject(&self) -> &inject::Shared<Arc<Handle>> {
1047 &self.handle.shared.inject
1048 }
1049}
1050
1051// TODO: Move `Handle` impls into handle.rs
1052impl task::Schedule for Arc<Handle> {
1053 fn release(&self, task: &Task) -> Option<Task> {
1054 self.shared.owned.remove(task)
1055 }
1056
1057 fn schedule(&self, task: Notified) {
1058 self.schedule_task(task, false);
1059 }
1060
1061 fn hooks(&self) -> TaskHarnessScheduleHooks {
1062 TaskHarnessScheduleHooks {
1063 task_terminate_callback: self.task_hooks.task_terminate_callback.clone(),
1064 }
1065 }
1066
1067 fn yield_now(&self, task: Notified) {
1068 self.schedule_task(task, true);
1069 }
1070}
1071
1072impl Handle {
1073 pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) {
1074 with_current(|maybe_cx| {
1075 if let Some(cx) = maybe_cx {
1076 // Make sure the task is part of the **current** scheduler.
1077 if self.ptr_eq(&cx.worker.handle) {
1078 // And the current thread still holds a core
1079 if let Some(core) = cx.core.borrow_mut().as_mut() {
1080 self.schedule_local(core, task, is_yield);
1081 return;
1082 }
1083 }
1084 }
1085
1086 // Otherwise, use the inject queue.
1087 self.push_remote_task(task);
1088 self.notify_parked_remote();
1089 });
1090 }
1091
1092 pub(super) fn schedule_option_task_without_yield(&self, task: Option<Notified>) {
1093 if let Some(task) = task {
1094 self.schedule_task(task, false);
1095 }
1096 }
1097
1098 fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
1099 core.stats.inc_local_schedule_count();
1100
1101 // Spawning from the worker thread. If scheduling a "yield" then the
1102 // task must always be pushed to the back of the queue, enabling other
1103 // tasks to be executed. If **not** a yield, then there is more
1104 // flexibility and the task may go to the front of the queue.
1105 let should_notify = if is_yield || !core.lifo_enabled {
1106 core.run_queue
1107 .push_back_or_overflow(task, self, &mut core.stats);
1108 true
1109 } else {
1110 // Push to the LIFO slot
1111 let prev = core.lifo_slot.take();
1112 let ret = prev.is_some();
1113
1114 if let Some(prev) = prev {
1115 core.run_queue
1116 .push_back_or_overflow(prev, self, &mut core.stats);
1117 }
1118
1119 core.lifo_slot = Some(task);
1120
1121 ret
1122 };
1123
1124 // Only notify if not currently parked. If `park` is `None`, then the
1125 // scheduling is from a resource driver. As notifications often come in
1126 // batches, the notification is delayed until the park is complete.
1127 if should_notify && core.park.is_some() {
1128 self.notify_parked_local();
1129 }
1130 }
1131
1132 fn next_remote_task(&self) -> Option<Notified> {
1133 if self.shared.inject.is_empty() {
1134 return None;
1135 }
1136
1137 let mut synced = self.shared.synced.lock();
1138 // safety: passing in correct `idle::Synced`
1139 unsafe { self.shared.inject.pop(&mut synced.inject) }
1140 }
1141
1142 fn push_remote_task(&self, task: Notified) {
1143 self.shared.scheduler_metrics.inc_remote_schedule_count();
1144
1145 let mut synced = self.shared.synced.lock();
1146 // safety: passing in correct `idle::Synced`
1147 unsafe {
1148 self.shared.inject.push(&mut synced.inject, task);
1149 }
1150 }
1151
1152 pub(super) fn close(&self) {
1153 if self
1154 .shared
1155 .inject
1156 .close(&mut self.shared.synced.lock().inject)
1157 {
1158 self.notify_all();
1159 }
1160 }
1161
1162 fn notify_parked_local(&self) {
1163 super::counters::inc_num_inc_notify_local();
1164
1165 if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) {
1166 super::counters::inc_num_unparks_local();
1167 self.shared.remotes[index].unpark.unpark(&self.driver);
1168 }
1169 }
1170
1171 fn notify_parked_remote(&self) {
1172 if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) {
1173 self.shared.remotes[index].unpark.unpark(&self.driver);
1174 }
1175 }
1176
1177 pub(super) fn notify_all(&self) {
1178 for remote in &self.shared.remotes[..] {
1179 remote.unpark.unpark(&self.driver);
1180 }
1181 }
1182
1183 fn notify_if_work_pending(&self) {
1184 for remote in &self.shared.remotes[..] {
1185 if !remote.steal.is_empty() {
1186 self.notify_parked_local();
1187 return;
1188 }
1189 }
1190
1191 if !self.shared.inject.is_empty() {
1192 self.notify_parked_local();
1193 }
1194 }
1195
1196 fn transition_worker_from_searching(&self) {
1197 if self.shared.idle.transition_worker_from_searching() {
1198 // We are the final searching worker. Because work was found, we
1199 // need to notify another worker.
1200 self.notify_parked_local();
1201 }
1202 }
1203
1204 /// Signals that a worker has observed the shutdown signal and has replaced
1205 /// its core back into its handle.
1206 ///
1207 /// If all workers have reached this point, the final cleanup is performed.
1208 fn shutdown_core(&self, core: Box<Core>) {
1209 let mut cores = self.shared.shutdown_cores.lock();
1210 cores.push(core);
1211
1212 if cores.len() != self.shared.remotes.len() {
1213 return;
1214 }
1215
1216 debug_assert!(self.shared.owned.is_empty());
1217
1218 for mut core in cores.drain(..) {
1219 core.shutdown(self);
1220 }
1221
1222 // Drain the injection queue
1223 //
1224 // We already shut down every task, so we can simply drop the tasks.
1225 while let Some(task) = self.next_remote_task() {
1226 drop(task);
1227 }
1228 }
1229
1230 fn ptr_eq(&self, other: &Handle) -> bool {
1231 std::ptr::eq(self, other)
1232 }
1233}
1234
1235impl Overflow<Arc<Handle>> for Handle {
1236 fn push(&self, task: task::Notified<Arc<Handle>>) {
1237 self.push_remote_task(task);
1238 }
1239
1240 fn push_batch<I>(&self, iter: I)
1241 where
1242 I: Iterator<Item = task::Notified<Arc<Handle>>>,
1243 {
1244 unsafe {
1245 self.shared.inject.push_batch(self, iter);
1246 }
1247 }
1248}
1249
1250pub(crate) struct InjectGuard<'a> {
1251 lock: crate::loom::sync::MutexGuard<'a, Synced>,
1252}
1253
1254impl<'a> AsMut<inject::Synced> for InjectGuard<'a> {
1255 fn as_mut(&mut self) -> &mut inject::Synced {
1256 &mut self.lock.inject
1257 }
1258}
1259
1260impl<'a> Lock<inject::Synced> for &'a Handle {
1261 type Handle = InjectGuard<'a>;
1262
1263 fn lock(self) -> Self::Handle {
1264 InjectGuard {
1265 lock: self.shared.synced.lock(),
1266 }
1267 }
1268}
1269
1270#[track_caller]
1271fn with_current<R>(f: impl FnOnce(Option<&Context>) -> R) -> R {
1272 use scheduler::Context::MultiThread;
1273
1274 context::with_scheduler(|ctx| match ctx {
1275 Some(MultiThread(ctx)) => f(Some(ctx)),
1276 _ => f(None),
1277 })
1278}