use crate::loom::sync::{Arc, Condvar, Mutex, MutexGuard};
use crate::runtime;
use crate::runtime::driver::Driver;
use crate::runtime::scheduler::multi_thread_alt::{
    idle, queue, stats, Counters, Handle, Idle, Overflow, Stats, TraceStatus,
};
use crate::runtime::scheduler::{self, inject, Lock};
use crate::runtime::task::{OwnedTasks, TaskHarnessScheduleHooks};
use crate::runtime::{blocking, coop, driver, task, Config, SchedulerMetrics, WorkerMetrics};
use crate::runtime::{context, TaskHooks};
use crate::util::atomic_cell::AtomicCell;
use crate::util::rand::{FastRand, RngSeedGenerator};

use std::cell::{Cell, RefCell};
use std::task::Waker;
use std::time::Duration;
use std::{cmp, thread};

cfg_unstable_metrics! {
    mod metrics;
}

mod taskdump_mock;

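/// A scheduler worker.
///
/// Owns the per-thread state used to drive tasks: shutdown/trace flags, poll
/// counters, and scratch buffers that are reused across scheduler iterations.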
pub(super) struct Worker {
    /// Used to schedule bookkeeping tasks every so often.
    tick: u32,

    /// True if the scheduler is being shut down.
    pub(super) is_shutdown: bool,

    /// True if the runtime is being traced (taskdump).
    is_traced: bool,

    /// Counts consecutive polls of the local queue, used to decide when to
    /// poll from the global (injection) queue instead.
    num_seq_local_queue_polls: u32,

    /// How often to check the global queue.
    global_queue_interval: u32,

    /// Scratch list of workers to notify, reused to avoid reallocating.
    workers_to_notify: Vec<usize>,

    /// Snapshot of the idle-core set, used to skip idle workers when stealing.
    idle_snapshot: idle::Snapshot,

    /// Per-iteration stats, folded into the core's stats when possible.
    stats: stats::Ephemeral,
}

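/// Core data owned by exactly one worker at a time.
///
/// Aligned to 128 bytes to reduce false sharing between cores on adjacent
/// cache lines.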
#[repr(align(128))]
pub(super) struct Core {
    /// Index of this core. Also the index of the paired `Remote` and
    /// `WorkerMetrics` entries.
    pub(super) index: usize,

    /// Holds the task most recently scheduled from this worker; polled
    /// before the run queue for locality.
    lifo_slot: Option<Notified>,

    /// The worker-local run queue.
    run_queue: queue::Local<Arc<Handle>>,

    /// True if the worker is currently searching for more work. Searching
    /// involves attempting to steal from other workers.
    pub(super) is_searching: bool,

    /// Per-worker runtime stats.
    stats: Stats,

    /// Fast random number generator.
    rand: FastRand,
}

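/// State shared across all workers.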
pub(crate) struct Shared {
    /// Per-core state used by other workers to steal tasks.
    remotes: Box<[Remote]>,

    /// Global (injection) task queue: tasks submitted from outside the
    /// workers, or overflowing a local queue, end up here.
    pub(super) inject: inject::Shared<Arc<Handle>>,

    /// Coordinates idle workers.
    idle: Idle,

    /// Collection of all active tasks spawned onto this executor.
    pub(super) owned: OwnedTasks<Arc<Handle>>,

    /// Data synchronized by the scheduler mutex.
    pub(super) synced: Mutex<Synced>,

    /// Powers Tokio's I/O, timers, etc.
    driver: AtomicCell<Driver>,

    /// Condition variables used to unblock worker threads; each worker
    /// parks on its own condvar.
    pub(super) condvars: Vec<Condvar>,

    /// Status used to coordinate taskdump tracing.
    pub(super) trace_status: TraceStatus,

    /// Scheduler configuration options.
    config: Config,

    /// Collects metrics from the runtime.
    pub(super) scheduler_metrics: SchedulerMetrics,

    pub(super) worker_metrics: Box<[WorkerMetrics]>,

    /// Internal scheduler counters (no-ops unless enabled).
    _counters: Counters,
}

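/// State guarded by the scheduler mutex.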
pub(crate) struct Synced {
    /// When a worker is notified, a core is assigned to it by placing the
    /// core in its slot here; the worker takes it when it wakes.
    pub(super) assigned_cores: Vec<Option<Box<Core>>>,

    /// Cores that have observed the shutdown signal and are waiting for the
    /// final cleanup pass.
    shutdown_cores: Vec<Box<Core>>,

    /// The driver, moved here during shutdown so it can be shut down last.
    shutdown_driver: Option<Box<Driver>>,

    /// Synchronized state for `Idle`.
    pub(super) idle: idle::Synced,

    /// Synchronized state for the injection queue.
    pub(crate) inject: inject::Synced,
}

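/// Handle used by other workers to steal tasks from a worker's local queue.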
struct Remote {
    /// Steals tasks from this worker's run queue.
    pub(super) steal: queue::Steal<Arc<Handle>>,
}

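/// Thread-local context set on each worker thread.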
pub(crate) struct Context {
    /// Handle to the current scheduler.
    handle: Arc<Handle>,

    /// This worker's index.
    index: usize,

    /// True when the LIFO slot is enabled.
    lifo_enabled: Cell<bool>,

    /// The core acquired by the worker; `None` while the worker does not
    /// hold a core.
    core: RefCell<Option<Box<Core>>>,

    /// Used to pass this worker's core to another thread when
    /// `block_in_place` is called.
    handoff_core: Arc<AtomicCell<Core>>,

    /// Tasks to reschedule after the resource drivers are polled; mostly
    /// tasks that yielded.
    pub(crate) defer: RefCell<Vec<Notified>>,
}

/// Running a task may consume the core. If the core is still available when
/// running the task completes, it is returned; otherwise the worker must
/// stop processing.
type RunResult = Result<Box<Core>, ()>;
type NextTaskResult = Result<(Option<Notified>, Box<Core>), ()>;

/// A task handle.
type Task = task::Task<Arc<Handle>>;

/// A notified task handle.
type Notified = task::Notified<Arc<Handle>>;

/// Maximum number of consecutive LIFO-slot polls allowed per tick. Past this
/// point the slot is temporarily disabled so it cannot starve the rest of
/// the run queue.
const MAX_LIFO_POLLS_PER_TICK: usize = 3;

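/// Creates the scheduler: one `Core` per requested worker, plus one extra
/// worker thread to park on the driver when it is enabled, then eagerly
/// spawns all worker threads.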
pub(super) fn create(
    num_cores: usize,
    driver: Driver,
    driver_handle: driver::Handle,
    blocking_spawner: blocking::Spawner,
    seed_generator: RngSeedGenerator,
    config: Config,
) -> runtime::Handle {
    let mut num_workers = num_cores;

    // If the driver is enabled, an additional worker thread is needed so
    // that one thread can park on the driver while all cores are busy.
    if driver.is_enabled() {
        num_workers += 1;
    }

    let mut cores = Vec::with_capacity(num_cores);
    let mut remotes = Vec::with_capacity(num_cores);
    let mut worker_metrics = Vec::with_capacity(num_cores);

    // Create the local queues.
    for i in 0..num_cores {
        let (steal, run_queue) = queue::local(config.local_queue_capacity);

        let metrics = WorkerMetrics::from_config(&config);
        let stats = Stats::new(&metrics);

        cores.push(Box::new(Core {
            index: i,
            lifo_slot: None,
            run_queue,
            is_searching: false,
            stats,
            rand: FastRand::from_seed(config.seed_generator.next_seed()),
        }));

        remotes.push(Remote { steal });
        worker_metrics.push(metrics);
    }

    let (idle, idle_synced) = Idle::new(cores, num_workers);
    let (inject, inject_synced) = inject::Shared::new();

    let handle = Arc::new(Handle {
        task_hooks: TaskHooks::from_config(&config),
        shared: Shared {
            remotes: remotes.into_boxed_slice(),
            inject,
            idle,
            owned: OwnedTasks::new(num_cores),
            synced: Mutex::new(Synced {
                assigned_cores: (0..num_workers).map(|_| None).collect(),
                shutdown_cores: Vec::with_capacity(num_cores),
                shutdown_driver: None,
                idle: idle_synced,
                inject: inject_synced,
            }),
            driver: AtomicCell::new(Some(Box::new(driver))),
            condvars: (0..num_workers).map(|_| Condvar::new()).collect(),
            trace_status: TraceStatus::new(num_cores),
            config,
            scheduler_metrics: SchedulerMetrics::new(),
            worker_metrics: worker_metrics.into_boxed_slice(),
            _counters: Counters,
        },
        driver: driver_handle,
        blocking_spawner,
        seed_generator,
    });

    let rt_handle = runtime::Handle {
        inner: scheduler::Handle::MultiThreadAlt(handle),
    };

    // Eagerly spawn the worker threads.
    for index in 0..num_workers {
        let handle = rt_handle.inner.expect_multi_thread_alt();
        let h2 = handle.clone();
        let handoff_core = Arc::new(AtomicCell::new(None));

        handle
            .blocking_spawner
            .spawn_blocking(&rt_handle, move || run(index, h2, handoff_core, false));
    }

    rt_handle
}

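/// Runs `f` as a blocking region. If called from a worker holding a core,
/// the core is handed off to a freshly spawned thread so the rest of the
/// scheduler keeps making progress while `f` blocks.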
#[track_caller]
pub(crate) fn block_in_place<F, R>(f: F) -> R
where
    F: FnOnce() -> R,
{
    // Restores the coop budget and, if another thread has not yet claimed
    // it, moves the handed-off core back into the worker context when the
    // blocking region completes.
    struct Reset(coop::Budget);

    impl Drop for Reset {
        fn drop(&mut self) {
            with_current(|maybe_cx| {
                if let Some(cx) = maybe_cx {
                    let core = cx.handoff_core.take();
                    let mut cx_core = cx.core.borrow_mut();
                    assert!(cx_core.is_none());
                    *cx_core = core;

                    coop::set(self.0);
                }
            });
        }
    }

    let mut had_entered = false;

    let setup_result = with_current(|maybe_cx| {
        match (
            crate::runtime::context::current_enter_context(),
            maybe_cx.is_some(),
        ) {
            (context::EnterRuntime::Entered { .. }, true) => {
                // We are on a worker thread inside the runtime; the core is
                // handed off below.
                had_entered = true;
            }
            (
                context::EnterRuntime::Entered {
                    allow_block_in_place,
                },
                false,
            ) => {
                // We are inside the runtime but not on a worker thread
                // (e.g. inside `block_on`); there is no core to hand off.
                if allow_block_in_place {
                    had_entered = true;
                    return Ok(());
                } else {
                    return Err(
                        "can call blocking only when running on the multi-threaded runtime",
                    );
                }
            }
            (context::EnterRuntime::NotEntered, true) => {
                // A nested call to `block_in_place` (we already exited the
                // runtime); all setup has already been done.
                return Ok(());
            }
            (context::EnterRuntime::NotEntered, false) => {
                // Outside the runtime entirely; blocking is fine as-is.
                return Ok(());
            }
        }

        let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");

        let core = match cx.core.borrow_mut().take() {
            Some(core) => core,
            None => return Ok(()),
        };

        // Hand the core off so another thread can resume driving the
        // scheduler while this thread blocks.
        cx.handoff_core.set(core);

        let index = cx.index;
        let handle = cx.handle.clone();
        let handoff_core = cx.handoff_core.clone();
        runtime::spawn_blocking(move || run(index, handle, handoff_core, true));
        Ok(())
    });

    if let Err(panic_message) = setup_result {
        panic!("{}", panic_message);
    }

    if had_entered {
        // Unset the current task's budget; blocking sections are not
        // constrained by task budgets.
        let _reset = Reset(coop::stop());

        crate::runtime::context::exit_runtime(f)
    } else {
        f()
    }
}

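/// Entry point for a worker thread. Builds the worker state and context,
/// enters the runtime, and drives the scheduling loop. When
/// `blocking_in_place` is set, the thread resumes a core handed off by
/// `block_in_place` instead of acquiring a fresh one.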
fn run(
    index: usize,
    handle: Arc<Handle>,
    handoff_core: Arc<AtomicCell<Core>>,
    blocking_in_place: bool,
) {
    struct AbortOnPanic;

    impl Drop for AbortOnPanic {
        fn drop(&mut self) {
            if std::thread::panicking() {
                eprintln!("worker thread panicking; aborting process");
                std::process::abort();
            }
        }
    }

    // Abort the process if a worker thread panics (debug builds only).
    #[cfg(debug_assertions)]
    let _abort_on_panic = AbortOnPanic;

    let num_workers = handle.shared.condvars.len();

    let mut worker = Worker {
        tick: 0,
        num_seq_local_queue_polls: 0,
        global_queue_interval: Stats::DEFAULT_GLOBAL_QUEUE_INTERVAL,
        is_shutdown: false,
        is_traced: false,
        workers_to_notify: Vec::with_capacity(num_workers - 1),
        idle_snapshot: idle::Snapshot::new(&handle.shared.idle),
        stats: stats::Ephemeral::new(),
    };

    let sched_handle = scheduler::Handle::MultiThreadAlt(handle.clone());

    crate::runtime::context::enter_runtime(&sched_handle, true, |_| {
        // Set the worker context.
        let cx = scheduler::Context::MultiThreadAlt(Context {
            index,
            lifo_enabled: Cell::new(!handle.shared.config.disable_lifo_slot),
            handle,
            core: RefCell::new(None),
            handoff_core,
            defer: RefCell::new(Vec::with_capacity(64)),
        });

        context::set_scheduler(&cx, || {
            let cx = cx.expect_multi_thread_alt();

            // Run the worker. `Err` signifies the core was lost (handoff or
            // shutdown), which is the expected way for the loop to end.
            let res = worker.run(&cx, blocking_in_place);
            debug_assert!(res.is_err());

            // If deferred tasks remain (e.g. the core was lost to
            // `block_in_place`), push them to the injection queue and
            // notify other workers.
            if !cx.defer.borrow().is_empty() {
                worker.schedule_deferred_without_core(&cx, &mut cx.shared().synced.lock());
            }
        });
    });
}

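// Helper macros for methods returning `NextTaskResult`: evaluate the
// expression, return early if it yielded a task, and otherwise hand the core
// back. `try_task_new_batch!` additionally records the start of a new batch
// of scheduled tasks in the worker stats.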
macro_rules! try_task {
    ($e:expr) => {{
        let (task, core) = $e?;
        if task.is_some() {
            return Ok((task, core));
        }
        core
    }};
}

macro_rules! try_task_new_batch {
    ($w:expr, $e:expr) => {{
        let (task, mut core) = $e?;
        if task.is_some() {
            core.stats.start_processing_scheduled_tasks(&mut $w.stats);
            return Ok((task, core));
        }
        core
    }};
}

impl Worker {
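    /// Body of the worker's scheduling loop. Returning `Err(())` signals
    /// that the core was relinquished (shutdown or `block_in_place`
    /// handoff), which is the normal way for a worker to stop.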
    fn run(&mut self, cx: &Context, blocking_in_place: bool) -> RunResult {
        let (maybe_task, mut core) = {
            if blocking_in_place {
                // Resuming after `block_in_place`: take the core from the
                // handoff cell. If it is gone, the original thread reclaimed
                // it and this thread has nothing to do.
                if let Some(core) = cx.handoff_core.take() {
                    (None, core)
                } else {
                    return Err(());
                }
            } else {
                let mut synced = cx.shared().synced.lock();

                // Try to acquire an available core so tasks can run
                // immediately; otherwise park until one is assigned.
                if let Some(core) = self.try_acquire_available_core(cx, &mut synced) {
                    let maybe_task = cx.shared().next_remote_task_synced(&mut synced);
                    (maybe_task, core)
                } else {
                    self.wait_for_core(cx, synced)?
                }
            }
        };

        cx.shared().worker_metrics[core.index].set_thread_id(thread::current().id());
        core.stats.start_processing_scheduled_tasks(&mut self.stats);

        if let Some(task) = maybe_task {
            core = self.run_task(cx, core, task)?;
        }

        while !self.is_shutdown {
            let (maybe_task, c) = self.next_task(cx, core)?;
            core = c;

            if let Some(task) = maybe_task {
                core = self.run_task(cx, core, task)?;
            } else {
                // The only way `next_task` returns without a task is
                // shutdown.
                assert!(self.is_shutdown);
                break;
            }
        }

        cx.shared().shutdown_core(&cx.handle, core);

        // Tasks may have been woken during shutdown; clear any deferred
        // tasks before exiting.
        self.shutdown_clear_defer(cx);

        Err(())
    }

    /// Attempts to take a core from the idle set, resetting its state for
    /// this worker if one is available.
    fn try_acquire_available_core(
        &mut self,
        cx: &Context,
        synced: &mut Synced,
    ) -> Option<Box<Core>> {
        if let Some(mut core) = cx
            .shared()
            .idle
            .try_acquire_available_core(&mut synced.idle)
        {
            self.reset_acquired_core(cx, synced, &mut core);
            Some(core)
        } else {
            None
        }
    }

    /// Blocks the worker thread until a core is assigned to it, returning
    /// `Err(())` if the runtime shuts down first.
    fn wait_for_core(
        &mut self,
        cx: &Context,
        mut synced: MutexGuard<'_, Synced>,
    ) -> NextTaskResult {
        // If the runtime wants more searching workers, try to grab a core
        // directly instead of parking.
        if cx.shared().idle.needs_searching() {
            if let Some(mut core) = self.try_acquire_available_core(cx, &mut synced) {
                cx.shared().idle.transition_worker_to_searching(&mut core);
                return Ok((None, core));
            }
        }

        cx.shared()
            .idle
            .transition_worker_to_parked(&mut synced, cx.index);

        // Wait until a core is assigned to this worker or the injection
        // queue is closed (shutdown).
        let mut core = loop {
            if let Some(core) = synced.assigned_cores[cx.index].take() {
                break core;
            }

            if cx.shared().inject.is_closed(&synced.inject) {
                self.shutdown_clear_defer(cx);
                return Err(());
            }

            synced = cx.shared().condvars[cx.index].wait(synced).unwrap();
        };

        self.reset_acquired_core(cx, &mut synced, &mut core);

        if self.is_shutdown {
            return Ok((None, core));
        }

        // Seed the local run queue with a batch of tasks from the injection
        // queue.
        let n = cmp::max(core.run_queue.remaining_slots() / 2, 1);
        let maybe_task = self.next_remote_task_batch_synced(cx, &mut synced, &mut core, n);

        core.stats.unparked();
        self.flush_metrics(cx, &mut core);

        Ok((maybe_task, core))
    }

    /// Resets a freshly acquired core's worker-local state: the global queue
    /// polling interval, the LIFO flag, and the shutdown/trace flags.
    fn reset_acquired_core(&mut self, cx: &Context, synced: &mut Synced, core: &mut Core) {
        self.global_queue_interval = core.stats.tuned_global_queue_interval(&cx.shared().config);

        self.reset_lifo_enabled(cx);

        // The local queue should be empty whenever a core changes hands.
        #[cfg(not(loom))]
        debug_assert!(core.run_queue.is_empty());

        self.update_global_flags(cx, synced);
    }

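    /// Finds the next task to run, checking (in order) maintenance work, the
    /// local queues, the injection queue, and finally work stealing; parks
    /// the worker when no work is found anywhere.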
    fn next_task(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
        self.assert_lifo_enabled_is_correct(cx);

        if self.is_traced {
            core = cx.handle.trace_core(core);
        }

        self.tick = self.tick.wrapping_add(1);

        // Run periodic maintenance (driver poll, metrics flush) first.
        core = try_task!(self.maybe_maintenance(&cx, core));

        // Check the LIFO slot, local run queue, and injection queue.
        core = try_task!(self.next_notified_task(cx, core));

        core.stats.end_processing_scheduled_tasks(&mut self.stats);

        super::counters::inc_num_no_local_work();

        if !cx.defer.borrow().is_empty() {
            // There are deferred (yielded) tasks; `park_yield` polls the
            // driver with a zero timeout and reschedules them.
            try_task_new_batch!(self, self.park_yield(cx, core));

            panic!("what happened to the deferred tasks? 🤔");
        }

        while !self.is_shutdown {
            // Search for more work: steal from peers, then park.
            core = try_task_new_batch!(self, self.search_for_work(cx, core));

            debug_assert!(cx.defer.borrow().is_empty());
            core = try_task_new_batch!(self, self.park(cx, core));
        }

        self.shutdown_clear_defer(cx);

        Ok((None, core))
    }

    fn next_notified_task(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
        self.num_seq_local_queue_polls += 1;

        // Every `global_queue_interval` polls, check the injection queue
        // first so remote tasks are not starved by local work.
        if self.num_seq_local_queue_polls % self.global_queue_interval == 0 {
            super::counters::inc_global_queue_interval();

            self.num_seq_local_queue_polls = 0;

            // Update the global queue interval, if needed.
            self.tune_global_queue_interval(cx, &mut core);

            if let Some(task) = self.next_remote_task(cx) {
                return Ok((Some(task), core));
            }
        }

        if let Some(task) = core.next_local_task() {
            return Ok((Some(task), core));
        }

        self.next_remote_task_batch(cx, core)
    }

    fn next_remote_task(&self, cx: &Context) -> Option<Notified> {
        if cx.shared().inject.is_empty() {
            return None;
        }

        let mut synced = cx.shared().synced.lock();
        cx.shared().next_remote_task_synced(&mut synced)
    }

    fn next_remote_task_batch(&self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
        if cx.shared().inject.is_empty() {
            return Ok((None, core));
        }

        // Other threads can only *remove* tasks from this worker's queue, so
        // at least `cap` slots will still be available when pushing below.
        let cap = usize::min(
            core.run_queue.remaining_slots(),
            usize::max(core.run_queue.max_capacity() / 2, 1),
        );

        let mut synced = cx.shared().synced.lock();
        let maybe_task = self.next_remote_task_batch_synced(cx, &mut synced, &mut core, cap);
        Ok((maybe_task, core))
    }

    fn next_remote_task_batch_synced(
        &self,
        cx: &Context,
        synced: &mut Synced,
        core: &mut Core,
        max: usize,
    ) -> Option<Notified> {
        super::counters::inc_num_remote_batch();

        // Pull a fair share of the injection queue: split it between the
        // searching workers when searching, otherwise between all workers.
        let n = if core.is_searching {
            cx.shared().inject.len() / cx.shared().idle.num_searching() + 1
        } else {
            cx.shared().inject.len() / cx.shared().remotes.len() + 1
        };

        // Take one extra task because the first one is returned directly
        // rather than pushed onto the local queue.
        let n = usize::min(n, max) + 1;

        // safety: `synced.inject` is the `inject::Synced` paired with this
        // injection queue.
        let mut tasks = unsafe { cx.shared().inject.pop_n(&mut synced.inject, n) };

        // Pop the first task to return immediately.
        let ret = tasks.next();

        // Push the rest onto the local run queue.
        core.run_queue.push_back(tasks);

        ret
    }

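    /// Attempts to steal work from peer workers, checking the injection
    /// queue between attempts, with a short stall between unsuccessful
    /// rounds.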
    fn search_for_work(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
        #[cfg(not(loom))]
        const ROUNDS: usize = 4;

        #[cfg(loom)]
        const ROUNDS: usize = 1;

        debug_assert!(core.lifo_slot.is_none());
        #[cfg(not(loom))]
        debug_assert!(core.run_queue.is_empty());

        if !core.run_queue.can_steal() {
            return Ok((None, core));
        }

        if !self.transition_to_searching(cx, &mut core) {
            return Ok((None, core));
        }

        // Snapshot which workers are idle so stealing can skip them.
        cx.shared().idle.snapshot(&mut self.idle_snapshot);

        let num = cx.shared().remotes.len();

        for i in 0..ROUNDS {
            // Start from a random worker to spread out contention.
            let start = core.rand.fastrand_n(num as u32) as usize;

            if let Some(task) = self.steal_one_round(cx, &mut core, start) {
                return Ok((Some(task), core));
            }

            core = try_task!(self.next_remote_task_batch(cx, core));

            if i > 0 {
                super::counters::inc_num_spin_stall();
                std::thread::sleep(std::time::Duration::from_micros(i as u64));
            }
        }

        Ok((None, core))
    }

    fn steal_one_round(&self, cx: &Context, core: &mut Core, start: usize) -> Option<Notified> {
        let num = cx.shared().remotes.len();

        for i in 0..num {
            let i = (start + i) % num;

            // Don't steal from ourself.
            if i == core.index {
                continue;
            }

            // Skip workers the idle snapshot says have no work.
            if self.idle_snapshot.is_idle(i) {
                continue;
            }

            let target = &cx.shared().remotes[i];

            if let Some(task) = target
                .steal
                .steal_into(&mut core.run_queue, &mut core.stats)
            {
                return Some(task);
            }
        }

        None
    }

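    /// Runs the given task, then keeps polling tasks out of the LIFO slot
    /// while coop budget remains.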
    fn run_task(&mut self, cx: &Context, mut core: Box<Core>, task: Notified) -> RunResult {
        let task = cx.shared().owned.assert_owner(task);

        // Leave the searching state. If this was the last searching worker,
        // notify another so stealing pressure is maintained.
        if self.transition_from_searching(cx, &mut core) {
            super::counters::inc_num_relay_search();
            cx.shared().notify_parked_local();
        }

        self.assert_lifo_enabled_is_correct(cx);

        core.stats.start_poll(&mut self.stats);

        // Store the core in the context so `block_in_place` can take it if
        // the task blocks.
        *cx.core.borrow_mut() = Some(core);

        coop::budget(|| {
            super::counters::inc_num_polls();
            task.run();
            let mut lifo_polls = 0;

            // Keep running tasks out of the LIFO slot while budget remains.
            loop {
                // Check if we still have the core. If not, it was stolen by
                // `block_in_place` and this thread stops driving the
                // scheduler.
                let mut core = match cx.core.borrow_mut().take() {
                    Some(core) => core,
                    None => {
                        return Err(());
                    }
                };

                // Check for a task in the LIFO slot.
                let task = match core.next_lifo_task() {
                    Some(task) => task,
                    None => {
                        self.reset_lifo_enabled(cx);
                        core.stats.end_poll();
                        return Ok(core);
                    }
                };

                if !coop::has_budget_remaining() {
                    core.stats.end_poll();

                    // Budget exhausted: push the LIFO task onto the run
                    // queue and return.
                    core.run_queue
                        .push_back_or_overflow(task, cx.shared(), &mut core.stats);
                    debug_assert!(cx.lifo_enabled.get());
                    return Ok(core);
                }

                lifo_polls += 1;
                super::counters::inc_lifo_schedules();

                // Cap consecutive LIFO polls so the slot cannot starve the
                // rest of the queue.
                if lifo_polls >= MAX_LIFO_POLLS_PER_TICK {
                    cx.lifo_enabled.set(false);
                    super::counters::inc_lifo_capped();
                }

                // Run the LIFO task, then loop.
                *cx.core.borrow_mut() = Some(core);
                let task = cx.shared().owned.assert_owner(task);
                super::counters::inc_num_lifo_polls();
                task.run();
            }
        })
    }

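    /// Reschedules deferred (yielded) tasks while holding a core: one task
    /// is returned to run immediately, a bounded number are fanned out to
    /// idle workers, and the rest land on the local run queue.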
    fn schedule_deferred_with_core<'a>(
        &mut self,
        cx: &'a Context,
        mut core: Box<Core>,
        synced: impl FnOnce() -> MutexGuard<'a, Synced>,
    ) -> NextTaskResult {
        let mut defer = cx.defer.borrow_mut();

        // Pop a task for this worker to run directly.
        let task = defer.pop();

        if task.is_none() {
            return Ok((None, core));
        }

        if !defer.is_empty() {
            let mut synced = synced();

            // Fan out some of the remaining tasks to idle workers via the
            // injection queue.
            let num_fanout = cmp::min(defer.len(), cx.shared().idle.num_idle(&synced.idle));

            // Cap the number of threads woken at one time to limit no-op
            // wakes and mutex contention.
            let num_fanout = cmp::min(2, num_fanout);

            if num_fanout > 0 {
                cx.shared()
                    .push_remote_task_batch_synced(&mut synced, defer.drain(..num_fanout));

                cx.shared()
                    .idle
                    .notify_mult(&mut synced, &mut self.workers_to_notify, num_fanout);
            }

            // Do not run the condvar notifications while holding the lock.
            drop(synced);
        }

        for worker in self.workers_to_notify.drain(..) {
            cx.shared().condvars[worker].notify_one()
        }

        // Anything left goes onto this worker's local run queue.
        if !defer.is_empty() {
            for task in defer.drain(..) {
                core.run_queue
                    .push_back_or_overflow(task, cx.shared(), &mut core.stats);
            }

            cx.shared().notify_parked_local();
        }

        Ok((task, core))
    }

    /// Reschedules deferred tasks when the worker no longer holds a core:
    /// everything goes to the injection queue and idle workers are notified.
    fn schedule_deferred_without_core<'a>(&mut self, cx: &Context, synced: &mut Synced) {
        let mut defer = cx.defer.borrow_mut();
        let num = defer.len();

        if num > 0 {
            // Push all tasks to the injection queue.
            cx.shared()
                .push_remote_task_batch_synced(synced, defer.drain(..));

            debug_assert!(self.workers_to_notify.is_empty());

            // Notify workers.
            cx.shared()
                .idle
                .notify_mult(synced, &mut self.workers_to_notify, num);

            for worker in self.workers_to_notify.drain(..) {
                cx.shared().condvars[worker].notify_one()
            }
        }
    }

    fn maybe_maintenance(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
        if self.tick % cx.shared().config.event_interval == 0 {
            super::counters::inc_num_maintenance();

            core.stats.end_processing_scheduled_tasks(&mut self.stats);

            // Run regularly scheduled maintenance.
            core = try_task_new_batch!(self, self.park_yield(cx, core));

            core.stats.start_processing_scheduled_tasks(&mut self.stats);
        }

        Ok((None, core))
    }

    fn flush_metrics(&self, cx: &Context, core: &mut Core) {
        core.stats.submit(&cx.shared().worker_metrics[core.index]);
    }

    fn update_global_flags(&mut self, cx: &Context, synced: &mut Synced) {
        if !self.is_shutdown {
            self.is_shutdown = cx.shared().inject.is_closed(&synced.inject);
        }

        if !self.is_traced {
            self.is_traced = cx.shared().trace_status.trace_requested();
        }
    }

    fn park_yield(&mut self, cx: &Context, core: Box<Core>) -> NextTaskResult {
        // Call `park` with a zero timeout. This lets the I/O driver and
        // timers run without actually putting the thread to sleep.
        if let Some(mut driver) = cx.shared().driver.take() {
            driver.park_timeout(&cx.handle.driver, Duration::from_millis(0));

            cx.shared().driver.set(driver);
        }

        // If there are deferred tasks, reschedule them.
        let (maybe_task, mut core) =
            self.schedule_deferred_with_core(cx, core, || cx.shared().synced.lock())?;

        self.flush_metrics(cx, &mut core);
        self.update_global_flags(cx, &mut cx.shared().synced.lock());

        Ok((maybe_task, core))
    }

    fn park(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
        if let Some(f) = &cx.shared().config.before_park {
            f();
        }

        if self.can_transition_to_parked(&mut core) {
            debug_assert!(!self.is_shutdown);
            debug_assert!(!self.is_traced);

            core = try_task!(self.do_park(cx, core));
        }

        if let Some(f) = &cx.shared().config.after_unpark {
            f();
        }

        Ok((None, core))
    }

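    /// Releases the core and parks the worker. The worker that takes the
    /// driver parks on it (driving I/O and timers); other workers park on
    /// their condvar until a core is assigned.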
    fn do_park(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
        let was_searching = core.is_searching;

        // Acquire the lock.
        let mut synced = cx.shared().synced.lock();

        // The local queue should be empty at this point.
        #[cfg(not(loom))]
        debug_assert!(core.run_queue.is_empty());

        // Try one last time to get tasks.
        let n = cmp::max(core.run_queue.remaining_slots() / 2, 1);
        if let Some(task) = self.next_remote_task_batch_synced(cx, &mut synced, &mut core, n) {
            return Ok((Some(task), core));
        }

        if !was_searching {
            if cx
                .shared()
                .idle
                .transition_worker_to_searching_if_needed(&mut synced.idle, &mut core)
            {
                // The worker was transitioned to searching; skip parking
                // and search for more work instead.
                return Ok((None, core));
            }
        }

        super::counters::inc_num_parks();
        core.stats.about_to_park();
        self.flush_metrics(cx, &mut core);

        self.update_global_flags(cx, &mut synced);

        if self.is_shutdown {
            return Ok((None, core));
        }

        // Release the core.
        core.is_searching = false;
        cx.shared().idle.release_core(&mut synced, core);

        drop(synced);

        if was_searching {
            if cx.shared().idle.transition_worker_from_searching() {
                // This was the final searching worker. Check the steal
                // queues one last time to avoid a lost wakeup.
                for i in 0..cx.shared().remotes.len() {
                    if !cx.shared().remotes[i].steal.is_empty() {
                        let mut synced = cx.shared().synced.lock();

                        // Try to get a core back.
                        if let Some(mut core) = self.try_acquire_available_core(cx, &mut synced) {
                            cx.shared().idle.transition_worker_to_searching(&mut core);
                            return Ok((None, core));
                        } else {
                            // Fall back to the park routine.
                            break;
                        }
                    }
                }
            }
        }

        if let Some(mut driver) = cx.shared().take_driver() {
            // Drive the runtime's I/O and timers.
            driver.park(&cx.handle.driver);

            synced = cx.shared().synced.lock();

            if cx.shared().inject.is_closed(&synced.inject) {
                // The runtime shut down while parked in the driver; hand
                // the driver to the shutdown path.
                synced.shutdown_driver = Some(driver);
                self.shutdown_clear_defer(cx);
                cx.shared().shutdown_finalize(&cx.handle, &mut synced);
                return Err(());
            }

            // Put the driver back.
            cx.shared().driver.set(driver);

            // Try to acquire a core so any newly ready I/O events can be
            // scheduled.
            if let Some(core) = self.try_acquire_available_core(cx, &mut synced) {
                // This may result in a task being run.
                self.schedule_deferred_with_core(cx, core, move || synced)
            } else {
                // Schedule any deferred tasks, then wait for a core.
                self.schedule_deferred_without_core(cx, &mut synced);

                self.wait_for_core(cx, synced)
            }
        } else {
            synced = cx.shared().synced.lock();

            // Wait for a core to be assigned.
            self.wait_for_core(cx, synced)
        }
    }

    fn transition_to_searching(&self, cx: &Context, core: &mut Core) -> bool {
        if !core.is_searching {
            cx.shared().idle.try_transition_worker_to_searching(core);
        }

        core.is_searching
    }

    /// Returns `true` if another worker must be notified.
    fn transition_from_searching(&self, cx: &Context, core: &mut Core) -> bool {
        if !core.is_searching {
            return false;
        }

        core.is_searching = false;
        cx.shared().idle.transition_worker_from_searching()
    }

    fn can_transition_to_parked(&self, core: &mut Core) -> bool {
        !self.has_tasks(core) && !self.is_shutdown && !self.is_traced
    }

    fn has_tasks(&self, core: &Core) -> bool {
        core.lifo_slot.is_some() || !core.run_queue.is_empty()
    }

    fn reset_lifo_enabled(&self, cx: &Context) {
        cx.lifo_enabled
            .set(!cx.handle.shared.config.disable_lifo_slot);
    }

    fn assert_lifo_enabled_is_correct(&self, cx: &Context) {
        debug_assert_eq!(
            cx.lifo_enabled.get(),
            !cx.handle.shared.config.disable_lifo_slot
        );
    }

    fn tune_global_queue_interval(&mut self, cx: &Context, core: &mut Core) {
        let next = core.stats.tuned_global_queue_interval(&cx.shared().config);

        // Smooth out jitter.
        if u32::abs_diff(self.global_queue_interval, next) > 2 {
            self.global_queue_interval = next;
        }
    }

    fn shutdown_clear_defer(&self, cx: &Context) {
        let mut defer = cx.defer.borrow_mut();

        for task in defer.drain(..) {
            drop(task);
        }
    }
}

impl Context {
    pub(crate) fn defer(&self, waker: &Waker) {
        // Waking the waker directly is sufficient here: yielded tasks are
        // pushed onto the `defer` list by `schedule_task`.
        waker.wake_by_ref();
    }

    fn shared(&self) -> &Shared {
        &self.handle.shared
    }

    #[cfg_attr(not(feature = "time"), allow(dead_code))]
    pub(crate) fn get_worker_index(&self) -> usize {
        self.index
    }
}

impl Core {
    /// Returns the next locally scheduled task: the LIFO slot first, then
    /// the run queue.
    fn next_local_task(&mut self) -> Option<Notified> {
        self.next_lifo_task().or_else(|| self.run_queue.pop())
    }

    fn next_lifo_task(&mut self) -> Option<Notified> {
        self.lifo_slot.take()
    }
}

impl Shared {
    fn next_remote_task_synced(&self, synced: &mut Synced) -> Option<Notified> {
        // safety: `synced.inject` is the `inject::Synced` paired with this
        // injection queue.
        unsafe { self.inject.pop(&mut synced.inject) }
    }

    pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) {
        use std::ptr;

        with_current(|maybe_cx| {
            if let Some(cx) = maybe_cx {
                // Make sure the task belongs to the **current** scheduler.
                if ptr::eq(self, &cx.handle.shared) {
                    // And the current thread still holds a core.
                    if let Some(core) = cx.core.borrow_mut().as_mut() {
                        if is_yield {
                            cx.defer.borrow_mut().push(task);
                        } else {
                            self.schedule_local(cx, core, task);
                        }
                    } else {
                        // The core was stolen (`block_in_place`) or the wake
                        // originated from the driver; defer the task.
                        cx.defer.borrow_mut().push(task);
                    }
                    return;
                }
            }

            // Otherwise, use the injection queue.
            self.schedule_remote(task);
        })
    }

    fn schedule_local(&self, cx: &Context, core: &mut Core, task: Notified) {
        core.stats.inc_local_schedule_count();

        if cx.lifo_enabled.get() {
            // Push to the LIFO slot; the displaced task (if any) goes onto
            // the run queue.
            let prev = std::mem::replace(&mut core.lifo_slot, Some(task));
            if let Some(prev) = prev {
                core.run_queue
                    .push_back_or_overflow(prev, self, &mut core.stats);
            } else {
                return;
            }
        } else {
            core.run_queue
                .push_back_or_overflow(task, self, &mut core.stats);
        }

        self.notify_parked_local();
    }

    fn notify_parked_local(&self) {
        super::counters::inc_num_inc_notify_local();
        self.idle.notify_local(self);
    }

    fn schedule_remote(&self, task: Notified) {
        super::counters::inc_num_notify_remote();
        self.scheduler_metrics.inc_remote_schedule_count();

        let mut synced = self.synced.lock();
        self.push_remote_task(&mut synced, task);

        self.idle.notify_remote(synced, self);
    }

    pub(super) fn close(&self, handle: &Handle) {
        {
            let mut synced = self.synced.lock();

            // Remove the driver so the shutdown path can take ownership of
            // it.
            if let Some(driver) = self.driver.take() {
                synced.shutdown_driver = Some(driver);
            }

            // Closing the injection queue initiates shutdown. If it was
            // already closed, there is nothing more to do.
            if !self.inject.close(&mut synced.inject) {
                return;
            }

            self.idle.shutdown(&mut synced, self);
        }

        self.idle.shutdown_unassigned_cores(handle, self);
    }

    fn push_remote_task(&self, synced: &mut Synced, task: Notified) {
        // safety: passing in the matching `inject::Synced`.
        unsafe {
            self.inject.push(&mut synced.inject, task);
        }
    }

    fn push_remote_task_batch<I>(&self, iter: I)
    where
        I: Iterator<Item = task::Notified<Arc<Handle>>>,
    {
        unsafe {
            self.inject.push_batch(self, iter);
        }
    }

    fn push_remote_task_batch_synced<I>(&self, synced: &mut Synced, iter: I)
    where
        I: Iterator<Item = task::Notified<Arc<Handle>>>,
    {
        unsafe {
            self.inject.push_batch(&mut synced.inject, iter);
        }
    }

    fn take_driver(&self) -> Option<Box<Driver>> {
        if !self.driver_enabled() {
            return None;
        }

        self.driver.take()
    }

    fn driver_enabled(&self) -> bool {
        // When the driver is enabled there is exactly one more worker
        // (condvar) than there are cores; the extra worker parks on the
        // driver.
        self.condvars.len() > self.remotes.len()
    }

    pub(super) fn shutdown_core(&self, handle: &Handle, mut core: Box<Core>) {
        // Start from a random shard to reduce contention.
        let start = core.rand.fastrand_n(self.owned.get_shard_size() as u32);
        self.owned.close_and_shutdown_all(start as usize);

        core.stats.submit(&self.worker_metrics[core.index]);

        let mut synced = self.synced.lock();
        synced.shutdown_cores.push(core);

        self.shutdown_finalize(handle, &mut synced);
    }

    pub(super) fn shutdown_finalize(&self, handle: &Handle, synced: &mut Synced) {
        // Wait for all cores.
        if synced.shutdown_cores.len() != self.remotes.len() {
            return;
        }

        let driver = synced.shutdown_driver.take();

        if self.driver_enabled() && driver.is_none() {
            return;
        }

        debug_assert!(self.owned.is_empty());

        for mut core in synced.shutdown_cores.drain(..) {
            // Drain tasks from the local queue.
            while core.next_local_task().is_some() {}
        }

        // Shutdown the driver.
        if let Some(mut driver) = driver {
            driver.shutdown(&handle.driver);
        }

        // Drain the injection queue. Every task has already been shut down,
        // so the tasks can simply be dropped.
        while let Some(task) = self.next_remote_task_synced(synced) {
            drop(task);
        }
    }
}

impl Overflow<Arc<Handle>> for Shared {
    fn push(&self, task: task::Notified<Arc<Handle>>) {
        self.push_remote_task(&mut self.synced.lock(), task);
    }

    fn push_batch<I>(&self, iter: I)
    where
        I: Iterator<Item = task::Notified<Arc<Handle>>>,
    {
        self.push_remote_task_batch(iter)
    }
}

impl<'a> Lock<inject::Synced> for &'a Shared {
    type Handle = SyncedGuard<'a>;

    fn lock(self) -> Self::Handle {
        SyncedGuard {
            lock: self.synced.lock(),
        }
    }
}

impl<'a> Lock<Synced> for &'a Shared {
    type Handle = SyncedGuard<'a>;

    fn lock(self) -> Self::Handle {
        SyncedGuard {
            lock: self.synced.lock(),
        }
    }
}

impl task::Schedule for Arc<Handle> {
    fn release(&self, task: &Task) -> Option<Task> {
        self.shared.owned.remove(task)
    }

    fn schedule(&self, task: Notified) {
        self.shared.schedule_task(task, false);
    }

    fn hooks(&self) -> TaskHarnessScheduleHooks {
        TaskHarnessScheduleHooks {
            task_terminate_callback: self.task_hooks.task_terminate_callback.clone(),
        }
    }

    fn yield_now(&self, task: Notified) {
        self.shared.schedule_task(task, true);
    }
}

impl AsMut<Synced> for Synced {
    fn as_mut(&mut self) -> &mut Synced {
        self
    }
}

pub(crate) struct SyncedGuard<'a> {
    lock: crate::loom::sync::MutexGuard<'a, Synced>,
}

impl<'a> AsMut<inject::Synced> for SyncedGuard<'a> {
    fn as_mut(&mut self) -> &mut inject::Synced {
        &mut self.lock.inject
    }
}

impl<'a> AsMut<Synced> for SyncedGuard<'a> {
    fn as_mut(&mut self) -> &mut Synced {
        &mut self.lock
    }
}

/// Runs `f` with a reference to the current worker's `Context`, or `None`
/// when called from a thread that is not a multi-thread-alt worker.
#[track_caller]
fn with_current<R>(f: impl FnOnce(Option<&Context>) -> R) -> R {
    use scheduler::Context::MultiThreadAlt;

    context::with_scheduler(|ctx| match ctx {
        Some(MultiThreadAlt(ctx)) => f(Some(ctx)),
        _ => f(None),
    })
}