tokio/sync/broadcast.rs
1//! A multi-producer, multi-consumer broadcast queue. Each sent value is seen by
2//! all consumers.
3//!
4//! A [`Sender`] is used to broadcast values to **all** connected [`Receiver`]
5//! values. [`Sender`] handles are clone-able, allowing concurrent send and
6//! receive actions. [`Sender`] and [`Receiver`] are both `Send` and `Sync` as
7//! long as `T` is `Send`.
8//!
9//! When a value is sent, **all** [`Receiver`] handles are notified and will
10//! receive the value. The value is stored once inside the channel and cloned on
11//! demand for each receiver. Once all receivers have received a clone of the
12//! value, the value is released from the channel.
13//!
14//! A channel is created by calling [`channel`], specifying the maximum number
15//! of messages the channel can retain at any given time.
16//!
17//! New [`Receiver`] handles are created by calling [`Sender::subscribe`]. The
18//! returned [`Receiver`] will receive values sent **after** the call to
19//! `subscribe`.
20//!
21//! This channel is also suitable for the single-producer multi-consumer
22//! use-case, where a single sender broadcasts values to many receivers.
23//!
24//! ## Lagging
25//!
26//! As sent messages must be retained until **all** [`Receiver`] handles receive
27//! a clone, broadcast channels are susceptible to the "slow receiver" problem.
28//! In this case, all but one receiver are able to receive values at the rate
29//! they are sent. Because one receiver is stalled, the channel starts to fill
30//! up.
31//!
32//! This broadcast channel implementation handles this case by setting a hard
33//! upper bound on the number of values the channel may retain at any given
34//! time. This upper bound is passed to the [`channel`] function as an argument.
35//!
36//! If a value is sent when the channel is at capacity, the oldest value
37//! currently held by the channel is released. This frees up space for the new
38//! value. Any receiver that has not yet seen the released value will return
39//! [`RecvError::Lagged`] the next time [`recv`] is called.
40//!
41//! Once [`RecvError::Lagged`] is returned, the lagging receiver's position is
42//! updated to the oldest value contained by the channel. The next call to
43//! [`recv`] will return this value.
44//!
45//! This behavior enables a receiver to detect when it has lagged so far behind
46//! that data has been dropped. The caller may decide how to respond to this:
47//! either by aborting its task or by tolerating lost messages and resuming
48//! consumption of the channel.
49//!
50//! ## Closing
51//!
52//! When **all** [`Sender`] handles have been dropped, no new values may be
53//! sent. At this point, the channel is "closed". Once a receiver has received
54//! all values retained by the channel, the next call to [`recv`] will return
55//! with [`RecvError::Closed`].
56//!
57//! When a [`Receiver`] handle is dropped, any messages not read by the receiver
58//! will be marked as read. If this receiver was the only one not to have read
59//! that message, the message will be dropped at this point.
60//!
61//! [`Sender`]: crate::sync::broadcast::Sender
62//! [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
63//! [`Receiver`]: crate::sync::broadcast::Receiver
64//! [`channel`]: crate::sync::broadcast::channel
65//! [`RecvError::Lagged`]: crate::sync::broadcast::error::RecvError::Lagged
66//! [`RecvError::Closed`]: crate::sync::broadcast::error::RecvError::Closed
67//! [`recv`]: crate::sync::broadcast::Receiver::recv
68//!
69//! # Examples
70//!
71//! Basic usage
72//!
73//! ```
74//! use tokio::sync::broadcast;
75//!
76//! #[tokio::main]
77//! async fn main() {
78//! let (tx, mut rx1) = broadcast::channel(16);
79//! let mut rx2 = tx.subscribe();
80//!
81//! tokio::spawn(async move {
82//! assert_eq!(rx1.recv().await.unwrap(), 10);
83//! assert_eq!(rx1.recv().await.unwrap(), 20);
84//! });
85//!
86//! tokio::spawn(async move {
87//! assert_eq!(rx2.recv().await.unwrap(), 10);
88//! assert_eq!(rx2.recv().await.unwrap(), 20);
89//! });
90//!
91//! tx.send(10).unwrap();
92//! tx.send(20).unwrap();
93//! }
94//! ```
95//!
96//! Handling lag
97//!
98//! ```
99//! use tokio::sync::broadcast;
100//!
101//! #[tokio::main]
102//! async fn main() {
103//! let (tx, mut rx) = broadcast::channel(2);
104//!
105//! tx.send(10).unwrap();
106//! tx.send(20).unwrap();
107//! tx.send(30).unwrap();
108//!
109//! // The receiver lagged behind
110//! assert!(rx.recv().await.is_err());
111//!
112//! // At this point, we can abort or continue with lost messages
113//!
114//! assert_eq!(20, rx.recv().await.unwrap());
115//! assert_eq!(30, rx.recv().await.unwrap());
116//! }
117//! ```
118
119use crate::loom::cell::UnsafeCell;
120use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
121use crate::loom::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
122use crate::runtime::coop::cooperative;
123use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
124use crate::util::WakeList;
125
126use std::fmt;
127use std::future::Future;
128use std::marker::PhantomPinned;
129use std::pin::Pin;
130use std::ptr::NonNull;
131use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst};
132use std::task::{ready, Context, Poll, Waker};
133
134/// Sending-half of the [`broadcast`] channel.
135///
136/// May be used from many threads. Messages can be sent with
137/// [`send`][Sender::send].
138///
139/// # Examples
140///
141/// ```
142/// use tokio::sync::broadcast;
143///
144/// #[tokio::main]
145/// async fn main() {
146/// let (tx, mut rx1) = broadcast::channel(16);
147/// let mut rx2 = tx.subscribe();
148///
149/// tokio::spawn(async move {
150/// assert_eq!(rx1.recv().await.unwrap(), 10);
151/// assert_eq!(rx1.recv().await.unwrap(), 20);
152/// });
153///
154/// tokio::spawn(async move {
155/// assert_eq!(rx2.recv().await.unwrap(), 10);
156/// assert_eq!(rx2.recv().await.unwrap(), 20);
157/// });
158///
159/// tx.send(10).unwrap();
160/// tx.send(20).unwrap();
161/// }
162/// ```
163///
164/// [`broadcast`]: crate::sync::broadcast
165pub struct Sender<T> {
166 shared: Arc<Shared<T>>,
167}
168
169/// Receiving-half of the [`broadcast`] channel.
170///
171/// Must not be used concurrently. Messages may be retrieved using
172/// [`recv`][Receiver::recv].
173///
174/// To turn this receiver into a `Stream`, you can use the [`BroadcastStream`]
175/// wrapper.
176///
177/// [`BroadcastStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.BroadcastStream.html
178///
179/// # Examples
180///
181/// ```
182/// use tokio::sync::broadcast;
183///
184/// #[tokio::main]
185/// async fn main() {
186/// let (tx, mut rx1) = broadcast::channel(16);
187/// let mut rx2 = tx.subscribe();
188///
189/// tokio::spawn(async move {
190/// assert_eq!(rx1.recv().await.unwrap(), 10);
191/// assert_eq!(rx1.recv().await.unwrap(), 20);
192/// });
193///
194/// tokio::spawn(async move {
195/// assert_eq!(rx2.recv().await.unwrap(), 10);
196/// assert_eq!(rx2.recv().await.unwrap(), 20);
197/// });
198///
199/// tx.send(10).unwrap();
200/// tx.send(20).unwrap();
201/// }
202/// ```
203///
204/// [`broadcast`]: crate::sync::broadcast
205pub struct Receiver<T> {
206 /// State shared with all receivers and senders.
207 shared: Arc<Shared<T>>,
208
209 /// Next position to read from
210 next: u64,
211}
212
213pub mod error {
214 //! Broadcast error types
215
216 use std::fmt;
217
218 /// Error returned by the [`send`] function on a [`Sender`].
219 ///
220 /// A **send** operation can only fail if there are no active receivers,
221 /// implying that the message could never be received. The error contains the
222 /// message being sent as a payload so it can be recovered.
223 ///
224 /// [`send`]: crate::sync::broadcast::Sender::send
225 /// [`Sender`]: crate::sync::broadcast::Sender
226 #[derive(Debug)]
227 pub struct SendError<T>(pub T);
228
229 impl<T> fmt::Display for SendError<T> {
230 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
231 write!(f, "channel closed")
232 }
233 }
234
235 impl<T: fmt::Debug> std::error::Error for SendError<T> {}
236
237 /// An error returned from the [`recv`] function on a [`Receiver`].
238 ///
239 /// [`recv`]: crate::sync::broadcast::Receiver::recv
240 /// [`Receiver`]: crate::sync::broadcast::Receiver
241 #[derive(Debug, PartialEq, Eq, Clone)]
242 pub enum RecvError {
243 /// There are no more active senders implying no further messages will ever
244 /// be sent.
245 Closed,
246
247 /// The receiver lagged too far behind. Attempting to receive again will
248 /// return the oldest message still retained by the channel.
249 ///
250 /// Includes the number of skipped messages.
251 Lagged(u64),
252 }
253
254 impl fmt::Display for RecvError {
255 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
256 match self {
257 RecvError::Closed => write!(f, "channel closed"),
258 RecvError::Lagged(amt) => write!(f, "channel lagged by {amt}"),
259 }
260 }
261 }
262
263 impl std::error::Error for RecvError {}
264
265 /// An error returned from the [`try_recv`] function on a [`Receiver`].
266 ///
267 /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
268 /// [`Receiver`]: crate::sync::broadcast::Receiver
269 #[derive(Debug, PartialEq, Eq, Clone)]
270 pub enum TryRecvError {
271 /// The channel is currently empty. There are still active
272 /// [`Sender`] handles, so data may yet become available.
273 ///
274 /// [`Sender`]: crate::sync::broadcast::Sender
275 Empty,
276
277 /// There are no more active senders implying no further messages will ever
278 /// be sent.
279 Closed,
280
281 /// The receiver lagged too far behind and has been forcibly disconnected.
282 /// Attempting to receive again will return the oldest message still
283 /// retained by the channel.
284 ///
285 /// Includes the number of skipped messages.
286 Lagged(u64),
287 }
288
289 impl fmt::Display for TryRecvError {
290 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
291 match self {
292 TryRecvError::Empty => write!(f, "channel empty"),
293 TryRecvError::Closed => write!(f, "channel closed"),
294 TryRecvError::Lagged(amt) => write!(f, "channel lagged by {amt}"),
295 }
296 }
297 }
298
299 impl std::error::Error for TryRecvError {}
300}
301
302use self::error::{RecvError, SendError, TryRecvError};
303
304use super::Notify;
305
306/// Data shared between senders and receivers.
307struct Shared<T> {
308 /// slots in the channel.
309 buffer: Box<[RwLock<Slot<T>>]>,
310
311 /// Mask a position -> index.
312 mask: usize,
313
314 /// Tail of the queue. Includes the rx wait list.
315 tail: Mutex<Tail>,
316
317 /// Number of outstanding Sender handles.
318 num_tx: AtomicUsize,
319
320 /// Notify when the last subscribed [`Receiver`] drops.
321 notify_last_rx_drop: Notify,
322}
323
324/// Next position to write a value.
325struct Tail {
326 /// Next position to write to.
327 pos: u64,
328
329 /// Number of active receivers.
330 rx_cnt: usize,
331
332 /// True if the channel is closed.
333 closed: bool,
334
335 /// Receivers waiting for a value.
336 waiters: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
337}
338
339/// Slot in the buffer.
340struct Slot<T> {
341 /// Remaining number of receivers that are expected to see this value.
342 ///
343 /// When this goes to zero, the value is released.
344 ///
345 /// An atomic is used as it is mutated concurrently with the slot read lock
346 /// acquired.
347 rem: AtomicUsize,
348
349 /// Uniquely identifies the `send` stored in the slot.
350 pos: u64,
351
352 /// The value being broadcast.
353 ///
354 /// The value is set by `send` when the write lock is held. When a reader
355 /// drops, `rem` is decremented. When it hits zero, the value is dropped.
356 val: UnsafeCell<Option<T>>,
357}
358
359/// An entry in the wait queue.
360struct Waiter {
361 /// True if queued.
362 queued: AtomicBool,
363
364 /// Task waiting on the broadcast channel.
365 waker: Option<Waker>,
366
367 /// Intrusive linked-list pointers.
368 pointers: linked_list::Pointers<Waiter>,
369
370 /// Should not be `Unpin`.
371 _p: PhantomPinned,
372}
373
374impl Waiter {
375 fn new() -> Self {
376 Self {
377 queued: AtomicBool::new(false),
378 waker: None,
379 pointers: linked_list::Pointers::new(),
380 _p: PhantomPinned,
381 }
382 }
383}
384
385generate_addr_of_methods! {
386 impl<> Waiter {
387 unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
388 &self.pointers
389 }
390 }
391}
392
393struct RecvGuard<'a, T> {
394 slot: RwLockReadGuard<'a, Slot<T>>,
395}
396
397/// Receive a value future.
398struct Recv<'a, T> {
399 /// Receiver being waited on.
400 receiver: &'a mut Receiver<T>,
401
402 /// Entry in the waiter `LinkedList`.
403 waiter: UnsafeCell<Waiter>,
404}
405
406unsafe impl<'a, T: Send> Send for Recv<'a, T> {}
407unsafe impl<'a, T: Send> Sync for Recv<'a, T> {}
408
409/// Max number of receivers. Reserve space to lock.
410const MAX_RECEIVERS: usize = usize::MAX >> 2;
411
412/// Create a bounded, multi-producer, multi-consumer channel where each sent
413/// value is broadcasted to all active receivers.
414///
415/// **Note:** The actual capacity may be greater than the provided `capacity`.
416///
417/// All data sent on [`Sender`] will become available on every active
418/// [`Receiver`] in the same order as it was sent.
419///
420/// The `Sender` can be cloned to `send` to the same channel from multiple
421/// points in the process or it can be used concurrently from an `Arc`. New
422/// `Receiver` handles are created by calling [`Sender::subscribe`].
423///
424/// If all [`Receiver`] handles are dropped, the `send` method will return a
425/// [`SendError`]. Similarly, if all [`Sender`] handles are dropped, the [`recv`]
426/// method will return a [`RecvError`].
427///
428/// [`Sender`]: crate::sync::broadcast::Sender
429/// [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
430/// [`Receiver`]: crate::sync::broadcast::Receiver
431/// [`recv`]: crate::sync::broadcast::Receiver::recv
432/// [`SendError`]: crate::sync::broadcast::error::SendError
433/// [`RecvError`]: crate::sync::broadcast::error::RecvError
434///
435/// # Examples
436///
437/// ```
438/// use tokio::sync::broadcast;
439///
440/// #[tokio::main]
441/// async fn main() {
442/// let (tx, mut rx1) = broadcast::channel(16);
443/// let mut rx2 = tx.subscribe();
444///
445/// tokio::spawn(async move {
446/// assert_eq!(rx1.recv().await.unwrap(), 10);
447/// assert_eq!(rx1.recv().await.unwrap(), 20);
448/// });
449///
450/// tokio::spawn(async move {
451/// assert_eq!(rx2.recv().await.unwrap(), 10);
452/// assert_eq!(rx2.recv().await.unwrap(), 20);
453/// });
454///
455/// tx.send(10).unwrap();
456/// tx.send(20).unwrap();
457/// }
458/// ```
459///
460/// # Panics
461///
462/// This will panic if `capacity` is equal to `0` or larger
463/// than `usize::MAX / 2`.
464#[track_caller]
465pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
466 // SAFETY: In the line below we are creating one extra receiver, so there will be 1 in total.
467 let tx = unsafe { Sender::new_with_receiver_count(1, capacity) };
468 let rx = Receiver {
469 shared: tx.shared.clone(),
470 next: 0,
471 };
472 (tx, rx)
473}
474
475unsafe impl<T: Send> Send for Sender<T> {}
476unsafe impl<T: Send> Sync for Sender<T> {}
477
478unsafe impl<T: Send> Send for Receiver<T> {}
479unsafe impl<T: Send> Sync for Receiver<T> {}
480
481impl<T> Sender<T> {
482 /// Creates the sending-half of the [`broadcast`] channel.
483 ///
484 /// See the documentation of [`broadcast::channel`] for more information on this method.
485 ///
486 /// [`broadcast`]: crate::sync::broadcast
487 /// [`broadcast::channel`]: crate::sync::broadcast::channel
488 #[track_caller]
489 pub fn new(capacity: usize) -> Self {
490 // SAFETY: We don't create extra receivers, so there are 0.
491 unsafe { Self::new_with_receiver_count(0, capacity) }
492 }
493
494 /// Creates the sending-half of the [`broadcast`](self) channel, and provide the receiver
495 /// count.
496 ///
497 /// See the documentation of [`broadcast::channel`](self::channel) for more errors when
498 /// calling this function.
499 ///
500 /// # Safety:
501 ///
502 /// The caller must ensure that the amount of receivers for this Sender is correct before
503 /// the channel functionalities are used, the count is zero by default, as this function
504 /// does not create any receivers by itself.
505 #[track_caller]
506 unsafe fn new_with_receiver_count(receiver_count: usize, mut capacity: usize) -> Self {
507 assert!(capacity > 0, "broadcast channel capacity cannot be zero");
508 assert!(
509 capacity <= usize::MAX >> 1,
510 "broadcast channel capacity exceeded `usize::MAX / 2`"
511 );
512
513 // Round to a power of two
514 capacity = capacity.next_power_of_two();
515
516 let mut buffer = Vec::with_capacity(capacity);
517
518 for i in 0..capacity {
519 buffer.push(RwLock::new(Slot {
520 rem: AtomicUsize::new(0),
521 pos: (i as u64).wrapping_sub(capacity as u64),
522 val: UnsafeCell::new(None),
523 }));
524 }
525
526 let shared = Arc::new(Shared {
527 buffer: buffer.into_boxed_slice(),
528 mask: capacity - 1,
529 tail: Mutex::new(Tail {
530 pos: 0,
531 rx_cnt: receiver_count,
532 closed: false,
533 waiters: LinkedList::new(),
534 }),
535 num_tx: AtomicUsize::new(1),
536 notify_last_rx_drop: Notify::new(),
537 });
538
539 Sender { shared }
540 }
541
542 /// Attempts to send a value to all active [`Receiver`] handles, returning
543 /// it back if it could not be sent.
544 ///
545 /// A successful send occurs when there is at least one active [`Receiver`]
546 /// handle. An unsuccessful send would be one where all associated
547 /// [`Receiver`] handles have already been dropped.
548 ///
549 /// # Return
550 ///
551 /// On success, the number of subscribed [`Receiver`] handles is returned.
552 /// This does not mean that this number of receivers will see the message as
553 /// a receiver may drop or lag ([see lagging](self#lagging)) before receiving
554 /// the message.
555 ///
556 /// # Note
557 ///
558 /// A return value of `Ok` **does not** mean that the sent value will be
559 /// observed by all or any of the active [`Receiver`] handles. [`Receiver`]
560 /// handles may be dropped before receiving the sent message.
561 ///
562 /// A return value of `Err` **does not** mean that future calls to `send`
563 /// will fail. New [`Receiver`] handles may be created by calling
564 /// [`subscribe`].
565 ///
566 /// [`Receiver`]: crate::sync::broadcast::Receiver
567 /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
568 ///
569 /// # Examples
570 ///
571 /// ```
572 /// use tokio::sync::broadcast;
573 ///
574 /// #[tokio::main]
575 /// async fn main() {
576 /// let (tx, mut rx1) = broadcast::channel(16);
577 /// let mut rx2 = tx.subscribe();
578 ///
579 /// tokio::spawn(async move {
580 /// assert_eq!(rx1.recv().await.unwrap(), 10);
581 /// assert_eq!(rx1.recv().await.unwrap(), 20);
582 /// });
583 ///
584 /// tokio::spawn(async move {
585 /// assert_eq!(rx2.recv().await.unwrap(), 10);
586 /// assert_eq!(rx2.recv().await.unwrap(), 20);
587 /// });
588 ///
589 /// tx.send(10).unwrap();
590 /// tx.send(20).unwrap();
591 /// }
592 /// ```
593 pub fn send(&self, value: T) -> Result<usize, SendError<T>> {
594 let mut tail = self.shared.tail.lock();
595
596 if tail.rx_cnt == 0 {
597 return Err(SendError(value));
598 }
599
600 // Position to write into
601 let pos = tail.pos;
602 let rem = tail.rx_cnt;
603 let idx = (pos & self.shared.mask as u64) as usize;
604
605 // Update the tail position
606 tail.pos = tail.pos.wrapping_add(1);
607
608 // Get the slot
609 let mut slot = self.shared.buffer[idx].write();
610
611 // Track the position
612 slot.pos = pos;
613
614 // Set remaining receivers
615 slot.rem.with_mut(|v| *v = rem);
616
617 // Write the value
618 slot.val = UnsafeCell::new(Some(value));
619
620 // Release the slot lock before notifying the receivers.
621 drop(slot);
622
623 // Notify and release the mutex. This must happen after the slot lock is
624 // released, otherwise the writer lock bit could be cleared while another
625 // thread is in the critical section.
626 self.shared.notify_rx(tail);
627
628 Ok(rem)
629 }
630
631 /// Creates a new [`Receiver`] handle that will receive values sent **after**
632 /// this call to `subscribe`.
633 ///
634 /// # Examples
635 ///
636 /// ```
637 /// use tokio::sync::broadcast;
638 ///
639 /// #[tokio::main]
640 /// async fn main() {
641 /// let (tx, _rx) = broadcast::channel(16);
642 ///
643 /// // Will not be seen
644 /// tx.send(10).unwrap();
645 ///
646 /// let mut rx = tx.subscribe();
647 ///
648 /// tx.send(20).unwrap();
649 ///
650 /// let value = rx.recv().await.unwrap();
651 /// assert_eq!(20, value);
652 /// }
653 /// ```
654 pub fn subscribe(&self) -> Receiver<T> {
655 let shared = self.shared.clone();
656 new_receiver(shared)
657 }
658
659 /// Returns the number of queued values.
660 ///
661 /// A value is queued until it has either been seen by all receivers that were alive at the time
662 /// it was sent, or has been evicted from the queue by subsequent sends that exceeded the
663 /// queue's capacity.
664 ///
665 /// # Note
666 ///
667 /// In contrast to [`Receiver::len`], this method only reports queued values and not values that
668 /// have been evicted from the queue before being seen by all receivers.
669 ///
670 /// # Examples
671 ///
672 /// ```
673 /// use tokio::sync::broadcast;
674 ///
675 /// #[tokio::main]
676 /// async fn main() {
677 /// let (tx, mut rx1) = broadcast::channel(16);
678 /// let mut rx2 = tx.subscribe();
679 ///
680 /// tx.send(10).unwrap();
681 /// tx.send(20).unwrap();
682 /// tx.send(30).unwrap();
683 ///
684 /// assert_eq!(tx.len(), 3);
685 ///
686 /// rx1.recv().await.unwrap();
687 ///
688 /// // The len is still 3 since rx2 hasn't seen the first value yet.
689 /// assert_eq!(tx.len(), 3);
690 ///
691 /// rx2.recv().await.unwrap();
692 ///
693 /// assert_eq!(tx.len(), 2);
694 /// }
695 /// ```
696 pub fn len(&self) -> usize {
697 let tail = self.shared.tail.lock();
698
699 let base_idx = (tail.pos & self.shared.mask as u64) as usize;
700 let mut low = 0;
701 let mut high = self.shared.buffer.len();
702 while low < high {
703 let mid = low + (high - low) / 2;
704 let idx = base_idx.wrapping_add(mid) & self.shared.mask;
705 if self.shared.buffer[idx].read().rem.load(SeqCst) == 0 {
706 low = mid + 1;
707 } else {
708 high = mid;
709 }
710 }
711
712 self.shared.buffer.len() - low
713 }
714
715 /// Returns true if there are no queued values.
716 ///
717 /// # Examples
718 ///
719 /// ```
720 /// use tokio::sync::broadcast;
721 ///
722 /// #[tokio::main]
723 /// async fn main() {
724 /// let (tx, mut rx1) = broadcast::channel(16);
725 /// let mut rx2 = tx.subscribe();
726 ///
727 /// assert!(tx.is_empty());
728 ///
729 /// tx.send(10).unwrap();
730 ///
731 /// assert!(!tx.is_empty());
732 ///
733 /// rx1.recv().await.unwrap();
734 ///
735 /// // The queue is still not empty since rx2 hasn't seen the value.
736 /// assert!(!tx.is_empty());
737 ///
738 /// rx2.recv().await.unwrap();
739 ///
740 /// assert!(tx.is_empty());
741 /// }
742 /// ```
743 pub fn is_empty(&self) -> bool {
744 let tail = self.shared.tail.lock();
745
746 let idx = (tail.pos.wrapping_sub(1) & self.shared.mask as u64) as usize;
747 self.shared.buffer[idx].read().rem.load(SeqCst) == 0
748 }
749
750 /// Returns the number of active receivers.
751 ///
752 /// An active receiver is a [`Receiver`] handle returned from [`channel`] or
753 /// [`subscribe`]. These are the handles that will receive values sent on
754 /// this [`Sender`].
755 ///
756 /// # Note
757 ///
758 /// It is not guaranteed that a sent message will reach this number of
759 /// receivers. Active receivers may never call [`recv`] again before
760 /// dropping.
761 ///
762 /// [`recv`]: crate::sync::broadcast::Receiver::recv
763 /// [`Receiver`]: crate::sync::broadcast::Receiver
764 /// [`Sender`]: crate::sync::broadcast::Sender
765 /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
766 /// [`channel`]: crate::sync::broadcast::channel
767 ///
768 /// # Examples
769 ///
770 /// ```
771 /// use tokio::sync::broadcast;
772 ///
773 /// #[tokio::main]
774 /// async fn main() {
775 /// let (tx, _rx1) = broadcast::channel(16);
776 ///
777 /// assert_eq!(1, tx.receiver_count());
778 ///
779 /// let mut _rx2 = tx.subscribe();
780 ///
781 /// assert_eq!(2, tx.receiver_count());
782 ///
783 /// tx.send(10).unwrap();
784 /// }
785 /// ```
786 pub fn receiver_count(&self) -> usize {
787 let tail = self.shared.tail.lock();
788 tail.rx_cnt
789 }
790
791 /// Returns `true` if senders belong to the same channel.
792 ///
793 /// # Examples
794 ///
795 /// ```
796 /// use tokio::sync::broadcast;
797 ///
798 /// #[tokio::main]
799 /// async fn main() {
800 /// let (tx, _rx) = broadcast::channel::<()>(16);
801 /// let tx2 = tx.clone();
802 ///
803 /// assert!(tx.same_channel(&tx2));
804 ///
805 /// let (tx3, _rx3) = broadcast::channel::<()>(16);
806 ///
807 /// assert!(!tx3.same_channel(&tx2));
808 /// }
809 /// ```
810 pub fn same_channel(&self, other: &Self) -> bool {
811 Arc::ptr_eq(&self.shared, &other.shared)
812 }
813
814 /// A future which completes when the number of [Receiver]s subscribed to this `Sender` reaches
815 /// zero.
816 ///
817 /// # Examples
818 ///
819 /// ```
820 /// use futures::FutureExt;
821 /// use tokio::sync::broadcast;
822 ///
823 /// #[tokio::main]
824 /// async fn main() {
825 /// let (tx, mut rx1) = broadcast::channel::<u32>(16);
826 /// let mut rx2 = tx.subscribe();
827 ///
828 /// let _ = tx.send(10);
829 ///
830 /// assert_eq!(rx1.recv().await.unwrap(), 10);
831 /// drop(rx1);
832 /// assert!(tx.closed().now_or_never().is_none());
833 ///
834 /// assert_eq!(rx2.recv().await.unwrap(), 10);
835 /// drop(rx2);
836 /// assert!(tx.closed().now_or_never().is_some());
837 /// }
838 /// ```
839 pub async fn closed(&self) {
840 loop {
841 let notified = self.shared.notify_last_rx_drop.notified();
842
843 {
844 // Ensure the lock drops if the channel isn't closed
845 let tail = self.shared.tail.lock();
846 if tail.closed {
847 return;
848 }
849 }
850
851 notified.await;
852 }
853 }
854
855 fn close_channel(&self) {
856 let mut tail = self.shared.tail.lock();
857 tail.closed = true;
858
859 self.shared.notify_rx(tail);
860 }
861}
862
863/// Create a new `Receiver` which reads starting from the tail.
864fn new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T> {
865 let mut tail = shared.tail.lock();
866
867 assert!(tail.rx_cnt != MAX_RECEIVERS, "max receivers");
868
869 if tail.rx_cnt == 0 {
870 // Potentially need to re-open the channel, if a new receiver has been added between calls
871 // to poll(). Note that we use rx_cnt == 0 instead of is_closed since is_closed also
872 // applies if the sender has been dropped
873 tail.closed = false;
874 }
875
876 tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow");
877 let next = tail.pos;
878
879 drop(tail);
880
881 Receiver { shared, next }
882}
883
884/// List used in `Shared::notify_rx`. It wraps a guarded linked list
885/// and gates the access to it on the `Shared.tail` mutex. It also empties
886/// the list on drop.
887struct WaitersList<'a, T> {
888 list: GuardedLinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
889 is_empty: bool,
890 shared: &'a Shared<T>,
891}
892
893impl<'a, T> Drop for WaitersList<'a, T> {
894 fn drop(&mut self) {
895 // If the list is not empty, we unlink all waiters from it.
896 // We do not wake the waiters to avoid double panics.
897 if !self.is_empty {
898 let _lock_guard = self.shared.tail.lock();
899 while self.list.pop_back().is_some() {}
900 }
901 }
902}
903
904impl<'a, T> WaitersList<'a, T> {
905 fn new(
906 unguarded_list: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
907 guard: Pin<&'a Waiter>,
908 shared: &'a Shared<T>,
909 ) -> Self {
910 let guard_ptr = NonNull::from(guard.get_ref());
911 let list = unguarded_list.into_guarded(guard_ptr);
912 WaitersList {
913 list,
914 is_empty: false,
915 shared,
916 }
917 }
918
919 /// Removes the last element from the guarded list. Modifying this list
920 /// requires an exclusive access to the main list in `Notify`.
921 fn pop_back_locked(&mut self, _tail: &mut Tail) -> Option<NonNull<Waiter>> {
922 let result = self.list.pop_back();
923 if result.is_none() {
924 // Save information about emptiness to avoid waiting for lock
925 // in the destructor.
926 self.is_empty = true;
927 }
928 result
929 }
930}
931
932impl<T> Shared<T> {
933 fn notify_rx<'a, 'b: 'a>(&'b self, mut tail: MutexGuard<'a, Tail>) {
934 // It is critical for `GuardedLinkedList` safety that the guard node is
935 // pinned in memory and is not dropped until the guarded list is dropped.
936 let guard = Waiter::new();
937 pin!(guard);
938
939 // We move all waiters to a secondary list. It uses a `GuardedLinkedList`
940 // underneath to allow every waiter to safely remove itself from it.
941 //
942 // * This list will be still guarded by the `waiters` lock.
943 // `NotifyWaitersList` wrapper makes sure we hold the lock to modify it.
944 // * This wrapper will empty the list on drop. It is critical for safety
945 // that we will not leave any list entry with a pointer to the local
946 // guard node after this function returns / panics.
947 let mut list = WaitersList::new(std::mem::take(&mut tail.waiters), guard.as_ref(), self);
948
949 let mut wakers = WakeList::new();
950 'outer: loop {
951 while wakers.can_push() {
952 match list.pop_back_locked(&mut tail) {
953 Some(waiter) => {
954 unsafe {
955 // Safety: accessing `waker` is safe because
956 // the tail lock is held.
957 if let Some(waker) = (*waiter.as_ptr()).waker.take() {
958 wakers.push(waker);
959 }
960
961 // Safety: `queued` is atomic.
962 let queued = &(*waiter.as_ptr()).queued;
963 // `Relaxed` suffices because the tail lock is held.
964 assert!(queued.load(Relaxed));
965 // `Release` is needed to synchronize with `Recv::drop`.
966 // It is critical to set this variable **after** waker
967 // is extracted, otherwise we may data race with `Recv::drop`.
968 queued.store(false, Release);
969 }
970 }
971 None => {
972 break 'outer;
973 }
974 }
975 }
976
977 // Release the lock before waking.
978 drop(tail);
979
980 // Before we acquire the lock again all sorts of things can happen:
981 // some waiters may remove themselves from the list and new waiters
982 // may be added. This is fine since at worst we will unnecessarily
983 // wake up waiters which will then queue themselves again.
984
985 wakers.wake_all();
986
987 // Acquire the lock again.
988 tail = self.tail.lock();
989 }
990
991 // Release the lock before waking.
992 drop(tail);
993
994 wakers.wake_all();
995 }
996}
997
998impl<T> Clone for Sender<T> {
999 fn clone(&self) -> Sender<T> {
1000 let shared = self.shared.clone();
1001 shared.num_tx.fetch_add(1, SeqCst);
1002
1003 Sender { shared }
1004 }
1005}
1006
1007impl<T> Drop for Sender<T> {
1008 fn drop(&mut self) {
1009 if 1 == self.shared.num_tx.fetch_sub(1, SeqCst) {
1010 self.close_channel();
1011 }
1012 }
1013}
1014
1015impl<T> Receiver<T> {
1016 /// Returns the number of messages that were sent into the channel and that
1017 /// this [`Receiver`] has yet to receive.
1018 ///
1019 /// If the returned value from `len` is larger than the next largest power of 2
1020 /// of the capacity of the channel any call to [`recv`] will return an
1021 /// `Err(RecvError::Lagged)` and any call to [`try_recv`] will return an
1022 /// `Err(TryRecvError::Lagged)`, e.g. if the capacity of the channel is 10,
1023 /// [`recv`] will start to return `Err(RecvError::Lagged)` once `len` returns
1024 /// values larger than 16.
1025 ///
1026 /// [`Receiver`]: crate::sync::broadcast::Receiver
1027 /// [`recv`]: crate::sync::broadcast::Receiver::recv
1028 /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
1029 ///
1030 /// # Examples
1031 ///
1032 /// ```
1033 /// use tokio::sync::broadcast;
1034 ///
1035 /// #[tokio::main]
1036 /// async fn main() {
1037 /// let (tx, mut rx1) = broadcast::channel(16);
1038 ///
1039 /// tx.send(10).unwrap();
1040 /// tx.send(20).unwrap();
1041 ///
1042 /// assert_eq!(rx1.len(), 2);
1043 /// assert_eq!(rx1.recv().await.unwrap(), 10);
1044 /// assert_eq!(rx1.len(), 1);
1045 /// assert_eq!(rx1.recv().await.unwrap(), 20);
1046 /// assert_eq!(rx1.len(), 0);
1047 /// }
1048 /// ```
1049 pub fn len(&self) -> usize {
1050 let next_send_pos = self.shared.tail.lock().pos;
1051 (next_send_pos - self.next) as usize
1052 }
1053
1054 /// Returns true if there aren't any messages in the channel that the [`Receiver`]
1055 /// has yet to receive.
1056 ///
1057 /// [`Receiver]: create::sync::broadcast::Receiver
1058 ///
1059 /// # Examples
1060 ///
1061 /// ```
1062 /// use tokio::sync::broadcast;
1063 ///
1064 /// #[tokio::main]
1065 /// async fn main() {
1066 /// let (tx, mut rx1) = broadcast::channel(16);
1067 ///
1068 /// assert!(rx1.is_empty());
1069 ///
1070 /// tx.send(10).unwrap();
1071 /// tx.send(20).unwrap();
1072 ///
1073 /// assert!(!rx1.is_empty());
1074 /// assert_eq!(rx1.recv().await.unwrap(), 10);
1075 /// assert_eq!(rx1.recv().await.unwrap(), 20);
1076 /// assert!(rx1.is_empty());
1077 /// }
1078 /// ```
1079 pub fn is_empty(&self) -> bool {
1080 self.len() == 0
1081 }
1082
1083 /// Returns `true` if receivers belong to the same channel.
1084 ///
1085 /// # Examples
1086 ///
1087 /// ```
1088 /// use tokio::sync::broadcast;
1089 ///
1090 /// #[tokio::main]
1091 /// async fn main() {
1092 /// let (tx, rx) = broadcast::channel::<()>(16);
1093 /// let rx2 = tx.subscribe();
1094 ///
1095 /// assert!(rx.same_channel(&rx2));
1096 ///
1097 /// let (_tx3, rx3) = broadcast::channel::<()>(16);
1098 ///
1099 /// assert!(!rx3.same_channel(&rx2));
1100 /// }
1101 /// ```
1102 pub fn same_channel(&self, other: &Self) -> bool {
1103 Arc::ptr_eq(&self.shared, &other.shared)
1104 }
1105
1106 /// Locks the next value if there is one.
1107 fn recv_ref(
1108 &mut self,
1109 waiter: Option<(&UnsafeCell<Waiter>, &Waker)>,
1110 ) -> Result<RecvGuard<'_, T>, TryRecvError> {
1111 let idx = (self.next & self.shared.mask as u64) as usize;
1112
1113 // The slot holding the next value to read
1114 let mut slot = self.shared.buffer[idx].read();
1115
1116 if slot.pos != self.next {
1117 // Release the `slot` lock before attempting to acquire the `tail`
1118 // lock. This is required because `send2` acquires the tail lock
1119 // first followed by the slot lock. Acquiring the locks in reverse
1120 // order here would result in a potential deadlock: `recv_ref`
1121 // acquires the `slot` lock and attempts to acquire the `tail` lock
1122 // while `send2` acquired the `tail` lock and attempts to acquire
1123 // the slot lock.
1124 drop(slot);
1125
1126 let mut old_waker = None;
1127
1128 let mut tail = self.shared.tail.lock();
1129
1130 // Acquire slot lock again
1131 slot = self.shared.buffer[idx].read();
1132
1133 // Make sure the position did not change. This could happen in the
1134 // unlikely event that the buffer is wrapped between dropping the
1135 // read lock and acquiring the tail lock.
1136 if slot.pos != self.next {
1137 let next_pos = slot.pos.wrapping_add(self.shared.buffer.len() as u64);
1138
1139 if next_pos == self.next {
1140 // At this point the channel is empty for *this* receiver. If
1141 // it's been closed, then that's what we return, otherwise we
1142 // set a waker and return empty.
1143 if tail.closed {
1144 return Err(TryRecvError::Closed);
1145 }
1146
1147 // Store the waker
1148 if let Some((waiter, waker)) = waiter {
1149 // Safety: called while locked.
1150 unsafe {
1151 // Only queue if not already queued
1152 waiter.with_mut(|ptr| {
1153 // If there is no waker **or** if the currently
1154 // stored waker references a **different** task,
1155 // track the tasks' waker to be notified on
1156 // receipt of a new value.
1157 match (*ptr).waker {
1158 Some(ref w) if w.will_wake(waker) => {}
1159 _ => {
1160 old_waker = std::mem::replace(
1161 &mut (*ptr).waker,
1162 Some(waker.clone()),
1163 );
1164 }
1165 }
1166
1167 // If the waiter is not already queued, enqueue it.
1168 // `Relaxed` order suffices: we have synchronized with
1169 // all writers through the tail lock that we hold.
1170 if !(*ptr).queued.load(Relaxed) {
1171 // `Relaxed` order suffices: all the readers will
1172 // synchronize with this write through the tail lock.
1173 (*ptr).queued.store(true, Relaxed);
1174 tail.waiters.push_front(NonNull::new_unchecked(&mut *ptr));
1175 }
1176 });
1177 }
1178 }
1179
1180 // Drop the old waker after releasing the locks.
1181 drop(slot);
1182 drop(tail);
1183 drop(old_waker);
1184
1185 return Err(TryRecvError::Empty);
1186 }
1187
1188 // At this point, the receiver has lagged behind the sender by
1189 // more than the channel capacity. The receiver will attempt to
1190 // catch up by skipping dropped messages and setting the
1191 // internal cursor to the **oldest** message stored by the
1192 // channel.
1193 let next = tail.pos.wrapping_sub(self.shared.buffer.len() as u64);
1194
1195 let missed = next.wrapping_sub(self.next);
1196
1197 drop(tail);
1198
1199 // The receiver is slow but no values have been missed
1200 if missed == 0 {
1201 self.next = self.next.wrapping_add(1);
1202
1203 return Ok(RecvGuard { slot });
1204 }
1205
1206 self.next = next;
1207
1208 return Err(TryRecvError::Lagged(missed));
1209 }
1210 }
1211
1212 self.next = self.next.wrapping_add(1);
1213
1214 Ok(RecvGuard { slot })
1215 }
1216}
1217
1218impl<T: Clone> Receiver<T> {
1219 /// Re-subscribes to the channel starting from the current tail element.
1220 ///
1221 /// This [`Receiver`] handle will receive a clone of all values sent
1222 /// **after** it has resubscribed. This will not include elements that are
1223 /// in the queue of the current receiver. Consider the following example.
1224 ///
1225 /// # Examples
1226 ///
1227 /// ```
1228 /// use tokio::sync::broadcast;
1229 ///
1230 /// #[tokio::main]
1231 /// async fn main() {
1232 /// let (tx, mut rx) = broadcast::channel(2);
1233 ///
1234 /// tx.send(1).unwrap();
1235 /// let mut rx2 = rx.resubscribe();
1236 /// tx.send(2).unwrap();
1237 ///
1238 /// assert_eq!(rx2.recv().await.unwrap(), 2);
1239 /// assert_eq!(rx.recv().await.unwrap(), 1);
1240 /// }
1241 /// ```
1242 pub fn resubscribe(&self) -> Self {
1243 let shared = self.shared.clone();
1244 new_receiver(shared)
1245 }
1246 /// Receives the next value for this receiver.
1247 ///
1248 /// Each [`Receiver`] handle will receive a clone of all values sent
1249 /// **after** it has subscribed.
1250 ///
1251 /// `Err(RecvError::Closed)` is returned when all `Sender` halves have
1252 /// dropped, indicating that no further values can be sent on the channel.
1253 ///
1254 /// If the [`Receiver`] handle falls behind, once the channel is full, newly
1255 /// sent values will overwrite old values. At this point, a call to [`recv`]
1256 /// will return with `Err(RecvError::Lagged)` and the [`Receiver`]'s
1257 /// internal cursor is updated to point to the oldest value still held by
1258 /// the channel. A subsequent call to [`recv`] will return this value
1259 /// **unless** it has been since overwritten.
1260 ///
1261 /// # Cancel safety
1262 ///
1263 /// This method is cancel safe. If `recv` is used as the event in a
1264 /// [`tokio::select!`](crate::select) statement and some other branch
1265 /// completes first, it is guaranteed that no messages were received on this
1266 /// channel.
1267 ///
1268 /// [`Receiver`]: crate::sync::broadcast::Receiver
1269 /// [`recv`]: crate::sync::broadcast::Receiver::recv
1270 ///
1271 /// # Examples
1272 ///
1273 /// ```
1274 /// use tokio::sync::broadcast;
1275 ///
1276 /// #[tokio::main]
1277 /// async fn main() {
1278 /// let (tx, mut rx1) = broadcast::channel(16);
1279 /// let mut rx2 = tx.subscribe();
1280 ///
1281 /// tokio::spawn(async move {
1282 /// assert_eq!(rx1.recv().await.unwrap(), 10);
1283 /// assert_eq!(rx1.recv().await.unwrap(), 20);
1284 /// });
1285 ///
1286 /// tokio::spawn(async move {
1287 /// assert_eq!(rx2.recv().await.unwrap(), 10);
1288 /// assert_eq!(rx2.recv().await.unwrap(), 20);
1289 /// });
1290 ///
1291 /// tx.send(10).unwrap();
1292 /// tx.send(20).unwrap();
1293 /// }
1294 /// ```
1295 ///
1296 /// Handling lag
1297 ///
1298 /// ```
1299 /// use tokio::sync::broadcast;
1300 ///
1301 /// #[tokio::main]
1302 /// async fn main() {
1303 /// let (tx, mut rx) = broadcast::channel(2);
1304 ///
1305 /// tx.send(10).unwrap();
1306 /// tx.send(20).unwrap();
1307 /// tx.send(30).unwrap();
1308 ///
1309 /// // The receiver lagged behind
1310 /// assert!(rx.recv().await.is_err());
1311 ///
1312 /// // At this point, we can abort or continue with lost messages
1313 ///
1314 /// assert_eq!(20, rx.recv().await.unwrap());
1315 /// assert_eq!(30, rx.recv().await.unwrap());
1316 /// }
1317 /// ```
1318 pub async fn recv(&mut self) -> Result<T, RecvError> {
1319 cooperative(Recv::new(self)).await
1320 }
1321
1322 /// Attempts to return a pending value on this receiver without awaiting.
1323 ///
1324 /// This is useful for a flavor of "optimistic check" before deciding to
1325 /// await on a receiver.
1326 ///
1327 /// Compared with [`recv`], this function has three failure cases instead of two
1328 /// (one for closed, one for an empty buffer, one for a lagging receiver).
1329 ///
1330 /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have
1331 /// dropped, indicating that no further values can be sent on the channel.
1332 ///
1333 /// If the [`Receiver`] handle falls behind, once the channel is full, newly
1334 /// sent values will overwrite old values. At this point, a call to [`recv`]
1335 /// will return with `Err(TryRecvError::Lagged)` and the [`Receiver`]'s
1336 /// internal cursor is updated to point to the oldest value still held by
1337 /// the channel. A subsequent call to [`try_recv`] will return this value
1338 /// **unless** it has been since overwritten. If there are no values to
1339 /// receive, `Err(TryRecvError::Empty)` is returned.
1340 ///
1341 /// [`recv`]: crate::sync::broadcast::Receiver::recv
1342 /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
1343 /// [`Receiver`]: crate::sync::broadcast::Receiver
1344 ///
1345 /// # Examples
1346 ///
1347 /// ```
1348 /// use tokio::sync::broadcast;
1349 ///
1350 /// #[tokio::main]
1351 /// async fn main() {
1352 /// let (tx, mut rx) = broadcast::channel(16);
1353 ///
1354 /// assert!(rx.try_recv().is_err());
1355 ///
1356 /// tx.send(10).unwrap();
1357 ///
1358 /// let value = rx.try_recv().unwrap();
1359 /// assert_eq!(10, value);
1360 /// }
1361 /// ```
1362 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1363 let guard = self.recv_ref(None)?;
1364 guard.clone_value().ok_or(TryRecvError::Closed)
1365 }
1366
1367 /// Blocking receive to call outside of asynchronous contexts.
1368 ///
1369 /// # Panics
1370 ///
1371 /// This function panics if called within an asynchronous execution
1372 /// context.
1373 ///
1374 /// # Examples
1375 /// ```
1376 /// use std::thread;
1377 /// use tokio::sync::broadcast;
1378 ///
1379 /// #[tokio::main]
1380 /// async fn main() {
1381 /// let (tx, mut rx) = broadcast::channel(16);
1382 ///
1383 /// let sync_code = thread::spawn(move || {
1384 /// assert_eq!(rx.blocking_recv(), Ok(10));
1385 /// });
1386 ///
1387 /// let _ = tx.send(10);
1388 /// sync_code.join().unwrap();
1389 /// }
1390 /// ```
1391 pub fn blocking_recv(&mut self) -> Result<T, RecvError> {
1392 crate::future::block_on(self.recv())
1393 }
1394}
1395
1396impl<T> Drop for Receiver<T> {
1397 fn drop(&mut self) {
1398 let mut tail = self.shared.tail.lock();
1399
1400 tail.rx_cnt -= 1;
1401 let until = tail.pos;
1402 let remaining_rx = tail.rx_cnt;
1403
1404 if remaining_rx == 0 {
1405 self.shared.notify_last_rx_drop.notify_waiters();
1406 tail.closed = true;
1407 }
1408
1409 drop(tail);
1410
1411 while self.next < until {
1412 match self.recv_ref(None) {
1413 Ok(_) => {}
1414 // The channel is closed
1415 Err(TryRecvError::Closed) => break,
1416 // Ignore lagging, we will catch up
1417 Err(TryRecvError::Lagged(..)) => {}
1418 // Can't be empty
1419 Err(TryRecvError::Empty) => panic!("unexpected empty broadcast channel"),
1420 }
1421 }
1422 }
1423}
1424
1425impl<'a, T> Recv<'a, T> {
1426 fn new(receiver: &'a mut Receiver<T>) -> Recv<'a, T> {
1427 Recv {
1428 receiver,
1429 waiter: UnsafeCell::new(Waiter {
1430 queued: AtomicBool::new(false),
1431 waker: None,
1432 pointers: linked_list::Pointers::new(),
1433 _p: PhantomPinned,
1434 }),
1435 }
1436 }
1437
1438 /// A custom `project` implementation is used in place of `pin-project-lite`
1439 /// as a custom drop implementation is needed.
1440 fn project(self: Pin<&mut Self>) -> (&mut Receiver<T>, &UnsafeCell<Waiter>) {
1441 unsafe {
1442 // Safety: Receiver is Unpin
1443 is_unpin::<&mut Receiver<T>>();
1444
1445 let me = self.get_unchecked_mut();
1446 (me.receiver, &me.waiter)
1447 }
1448 }
1449}
1450
1451impl<'a, T> Future for Recv<'a, T>
1452where
1453 T: Clone,
1454{
1455 type Output = Result<T, RecvError>;
1456
1457 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
1458 ready!(crate::trace::trace_leaf(cx));
1459
1460 let (receiver, waiter) = self.project();
1461
1462 let guard = match receiver.recv_ref(Some((waiter, cx.waker()))) {
1463 Ok(value) => value,
1464 Err(TryRecvError::Empty) => return Poll::Pending,
1465 Err(TryRecvError::Lagged(n)) => return Poll::Ready(Err(RecvError::Lagged(n))),
1466 Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError::Closed)),
1467 };
1468
1469 Poll::Ready(guard.clone_value().ok_or(RecvError::Closed))
1470 }
1471}
1472
1473impl<'a, T> Drop for Recv<'a, T> {
1474 fn drop(&mut self) {
1475 // Safety: `waiter.queued` is atomic.
1476 // Acquire ordering is required to synchronize with
1477 // `Shared::notify_rx` before we drop the object.
1478 let queued = self
1479 .waiter
1480 .with(|ptr| unsafe { (*ptr).queued.load(Acquire) });
1481
1482 // If the waiter is queued, we need to unlink it from the waiters list.
1483 // If not, no further synchronization is required, since the waiter
1484 // is not in the list and, as such, is not shared with any other threads.
1485 if queued {
1486 // Acquire the tail lock. This is required for safety before accessing
1487 // the waiter node.
1488 let mut tail = self.receiver.shared.tail.lock();
1489
1490 // Safety: tail lock is held.
1491 // `Relaxed` order suffices because we hold the tail lock.
1492 let queued = self
1493 .waiter
1494 .with_mut(|ptr| unsafe { (*ptr).queued.load(Relaxed) });
1495
1496 if queued {
1497 // Remove the node
1498 //
1499 // safety: tail lock is held and the wait node is verified to be in
1500 // the list.
1501 unsafe {
1502 self.waiter.with_mut(|ptr| {
1503 tail.waiters.remove((&mut *ptr).into());
1504 });
1505 }
1506 }
1507 }
1508 }
1509}
1510
1511/// # Safety
1512///
1513/// `Waiter` is forced to be !Unpin.
1514unsafe impl linked_list::Link for Waiter {
1515 type Handle = NonNull<Waiter>;
1516 type Target = Waiter;
1517
1518 fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
1519 *handle
1520 }
1521
1522 unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
1523 ptr
1524 }
1525
1526 unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
1527 Waiter::addr_of_pointers(target)
1528 }
1529}
1530
1531impl<T> fmt::Debug for Sender<T> {
1532 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1533 write!(fmt, "broadcast::Sender")
1534 }
1535}
1536
1537impl<T> fmt::Debug for Receiver<T> {
1538 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1539 write!(fmt, "broadcast::Receiver")
1540 }
1541}
1542
1543impl<'a, T> RecvGuard<'a, T> {
1544 fn clone_value(&self) -> Option<T>
1545 where
1546 T: Clone,
1547 {
1548 self.slot.val.with(|ptr| unsafe { (*ptr).clone() })
1549 }
1550}
1551
1552impl<'a, T> Drop for RecvGuard<'a, T> {
1553 fn drop(&mut self) {
1554 // Decrement the remaining counter
1555 if 1 == self.slot.rem.fetch_sub(1, SeqCst) {
1556 // Safety: Last receiver, drop the value
1557 self.slot.val.with_mut(|ptr| unsafe { *ptr = None });
1558 }
1559 }
1560}
1561
1562fn is_unpin<T: Unpin>() {}
1563
1564#[cfg(not(loom))]
1565#[cfg(test)]
1566mod tests {
1567 use super::*;
1568
1569 #[test]
1570 fn receiver_count_on_sender_constructor() {
1571 let sender = Sender::<i32>::new(16);
1572 assert_eq!(sender.receiver_count(), 0);
1573
1574 let rx_1 = sender.subscribe();
1575 assert_eq!(sender.receiver_count(), 1);
1576
1577 let rx_2 = rx_1.resubscribe();
1578 assert_eq!(sender.receiver_count(), 2);
1579
1580 let rx_3 = sender.subscribe();
1581 assert_eq!(sender.receiver_count(), 3);
1582
1583 drop(rx_3);
1584 drop(rx_1);
1585 assert_eq!(sender.receiver_count(), 1);
1586
1587 drop(rx_2);
1588 assert_eq!(sender.receiver_count(), 0);
1589 }
1590
1591 #[cfg(not(loom))]
1592 #[test]
1593 fn receiver_count_on_channel_constructor() {
1594 let (sender, rx) = channel::<i32>(16);
1595 assert_eq!(sender.receiver_count(), 1);
1596
1597 let _rx_2 = rx.resubscribe();
1598 assert_eq!(sender.receiver_count(), 2);
1599 }
1600}