tokio/sync/
broadcast.rs

1//! A multi-producer, multi-consumer broadcast queue. Each sent value is seen by
2//! all consumers.
3//!
4//! A [`Sender`] is used to broadcast values to **all** connected [`Receiver`]
5//! values. [`Sender`] handles are clone-able, allowing concurrent send and
6//! receive actions. [`Sender`] and [`Receiver`] are both `Send` and `Sync` as
7//! long as `T` is `Send`.
8//!
9//! When a value is sent, **all** [`Receiver`] handles are notified and will
10//! receive the value. The value is stored once inside the channel and cloned on
11//! demand for each receiver. Once all receivers have received a clone of the
12//! value, the value is released from the channel.
13//!
14//! A channel is created by calling [`channel`], specifying the maximum number
15//! of messages the channel can retain at any given time.
16//!
17//! New [`Receiver`] handles are created by calling [`Sender::subscribe`]. The
18//! returned [`Receiver`] will receive values sent **after** the call to
19//! `subscribe`.
20//!
21//! This channel is also suitable for the single-producer multi-consumer
22//! use-case, where a single sender broadcasts values to many receivers.
23//!
24//! ## Lagging
25//!
26//! As sent messages must be retained until **all** [`Receiver`] handles receive
27//! a clone, broadcast channels are susceptible to the "slow receiver" problem.
28//! In this case, all but one receiver are able to receive values at the rate
29//! they are sent. Because one receiver is stalled, the channel starts to fill
30//! up.
31//!
32//! This broadcast channel implementation handles this case by setting a hard
33//! upper bound on the number of values the channel may retain at any given
34//! time. This upper bound is passed to the [`channel`] function as an argument.
35//!
36//! If a value is sent when the channel is at capacity, the oldest value
37//! currently held by the channel is released. This frees up space for the new
38//! value. Any receiver that has not yet seen the released value will return
39//! [`RecvError::Lagged`] the next time [`recv`] is called.
40//!
41//! Once [`RecvError::Lagged`] is returned, the lagging receiver's position is
42//! updated to the oldest value contained by the channel. The next call to
43//! [`recv`] will return this value.
44//!
45//! This behavior enables a receiver to detect when it has lagged so far behind
46//! that data has been dropped. The caller may decide how to respond to this:
47//! either by aborting its task or by tolerating lost messages and resuming
48//! consumption of the channel.
49//!
50//! ## Closing
51//!
52//! When **all** [`Sender`] handles have been dropped, no new values may be
53//! sent. At this point, the channel is "closed". Once a receiver has received
54//! all values retained by the channel, the next call to [`recv`] will return
55//! with [`RecvError::Closed`].
56//!
57//! When a [`Receiver`] handle is dropped, any messages not read by the receiver
58//! will be marked as read. If this receiver was the only one not to have read
59//! that message, the message will be dropped at this point.
60//!
61//! [`Sender`]: crate::sync::broadcast::Sender
62//! [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
63//! [`Receiver`]: crate::sync::broadcast::Receiver
64//! [`channel`]: crate::sync::broadcast::channel
65//! [`RecvError::Lagged`]: crate::sync::broadcast::error::RecvError::Lagged
66//! [`RecvError::Closed`]: crate::sync::broadcast::error::RecvError::Closed
67//! [`recv`]: crate::sync::broadcast::Receiver::recv
68//!
69//! # Examples
70//!
71//! Basic usage
72//!
73//! ```
74//! use tokio::sync::broadcast;
75//!
76//! #[tokio::main]
77//! async fn main() {
78//!     let (tx, mut rx1) = broadcast::channel(16);
79//!     let mut rx2 = tx.subscribe();
80//!
81//!     tokio::spawn(async move {
82//!         assert_eq!(rx1.recv().await.unwrap(), 10);
83//!         assert_eq!(rx1.recv().await.unwrap(), 20);
84//!     });
85//!
86//!     tokio::spawn(async move {
87//!         assert_eq!(rx2.recv().await.unwrap(), 10);
88//!         assert_eq!(rx2.recv().await.unwrap(), 20);
89//!     });
90//!
91//!     tx.send(10).unwrap();
92//!     tx.send(20).unwrap();
93//! }
94//! ```
95//!
96//! Handling lag
97//!
98//! ```
99//! use tokio::sync::broadcast;
100//!
101//! #[tokio::main]
102//! async fn main() {
103//!     let (tx, mut rx) = broadcast::channel(2);
104//!
105//!     tx.send(10).unwrap();
106//!     tx.send(20).unwrap();
107//!     tx.send(30).unwrap();
108//!
109//!     // The receiver lagged behind
110//!     assert!(rx.recv().await.is_err());
111//!
112//!     // At this point, we can abort or continue with lost messages
113//!
114//!     assert_eq!(20, rx.recv().await.unwrap());
115//!     assert_eq!(30, rx.recv().await.unwrap());
116//! }
117//! ```
118
119use crate::loom::cell::UnsafeCell;
120use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
121use crate::loom::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
122use crate::runtime::coop::cooperative;
123use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
124use crate::util::WakeList;
125
126use std::fmt;
127use std::future::Future;
128use std::marker::PhantomPinned;
129use std::pin::Pin;
130use std::ptr::NonNull;
131use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst};
132use std::task::{ready, Context, Poll, Waker};
133
134/// Sending-half of the [`broadcast`] channel.
135///
136/// May be used from many threads. Messages can be sent with
137/// [`send`][Sender::send].
138///
139/// # Examples
140///
141/// ```
142/// use tokio::sync::broadcast;
143///
144/// #[tokio::main]
145/// async fn main() {
146///     let (tx, mut rx1) = broadcast::channel(16);
147///     let mut rx2 = tx.subscribe();
148///
149///     tokio::spawn(async move {
150///         assert_eq!(rx1.recv().await.unwrap(), 10);
151///         assert_eq!(rx1.recv().await.unwrap(), 20);
152///     });
153///
154///     tokio::spawn(async move {
155///         assert_eq!(rx2.recv().await.unwrap(), 10);
156///         assert_eq!(rx2.recv().await.unwrap(), 20);
157///     });
158///
159///     tx.send(10).unwrap();
160///     tx.send(20).unwrap();
161/// }
162/// ```
163///
164/// [`broadcast`]: crate::sync::broadcast
pub struct Sender<T> {
    /// State shared with all receivers and senders.
    shared: Arc<Shared<T>>,
}
168
169/// Receiving-half of the [`broadcast`] channel.
170///
171/// Must not be used concurrently. Messages may be retrieved using
172/// [`recv`][Receiver::recv].
173///
174/// To turn this receiver into a `Stream`, you can use the [`BroadcastStream`]
175/// wrapper.
176///
177/// [`BroadcastStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.BroadcastStream.html
178///
179/// # Examples
180///
181/// ```
182/// use tokio::sync::broadcast;
183///
184/// #[tokio::main]
185/// async fn main() {
186///     let (tx, mut rx1) = broadcast::channel(16);
187///     let mut rx2 = tx.subscribe();
188///
189///     tokio::spawn(async move {
190///         assert_eq!(rx1.recv().await.unwrap(), 10);
191///         assert_eq!(rx1.recv().await.unwrap(), 20);
192///     });
193///
194///     tokio::spawn(async move {
195///         assert_eq!(rx2.recv().await.unwrap(), 10);
196///         assert_eq!(rx2.recv().await.unwrap(), 20);
197///     });
198///
199///     tx.send(10).unwrap();
200///     tx.send(20).unwrap();
201/// }
202/// ```
203///
204/// [`broadcast`]: crate::sync::broadcast
pub struct Receiver<T> {
    /// State shared with all receivers and senders.
    shared: Arc<Shared<T>>,

    /// Next position to read from.
    ///
    /// A receiver returned by `channel` starts at position 0; a receiver
    /// created through `new_receiver` starts at the tail position observed
    /// when it subscribed.
    next: u64,
}
212
pub mod error {
    //! Error types produced by the broadcast channel.

    use std::fmt;

    /// Error returned by the [`send`] function on a [`Sender`].
    ///
    /// Sending fails only when no receiver is currently active, which means
    /// nobody could ever observe the message. The rejected message is carried
    /// as the payload so the caller can recover it.
    ///
    /// [`send`]: crate::sync::broadcast::Sender::send
    /// [`Sender`]: crate::sync::broadcast::Sender
    #[derive(Debug)]
    pub struct SendError<T>(pub T);

    impl<T> fmt::Display for SendError<T> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            // The payload is intentionally not printed: `T` need not be `Display`.
            f.write_str("channel closed")
        }
    }

    impl<T: fmt::Debug> std::error::Error for SendError<T> {}

    /// An error returned from the [`recv`] function on a [`Receiver`].
    ///
    /// [`recv`]: crate::sync::broadcast::Receiver::recv
    /// [`Receiver`]: crate::sync::broadcast::Receiver
    #[derive(Debug, PartialEq, Eq, Clone)]
    pub enum RecvError {
        /// Every sender is gone, so no further messages will ever be sent.
        Closed,

        /// The receiver fell too far behind. Receiving again yields the oldest
        /// message the channel still retains.
        ///
        /// The payload is the number of messages that were skipped.
        Lagged(u64),
    }

    impl fmt::Display for RecvError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match *self {
                Self::Closed => f.write_str("channel closed"),
                Self::Lagged(skipped) => write!(f, "channel lagged by {skipped}"),
            }
        }
    }

    impl std::error::Error for RecvError {}

    /// An error returned from the [`try_recv`] function on a [`Receiver`].
    ///
    /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
    /// [`Receiver`]: crate::sync::broadcast::Receiver
    #[derive(Debug, PartialEq, Eq, Clone)]
    pub enum TryRecvError {
        /// Nothing is available right now, but at least one [`Sender`] is
        /// still alive, so data may yet arrive.
        ///
        /// [`Sender`]: crate::sync::broadcast::Sender
        Empty,

        /// Every sender is gone, so no further messages will ever be sent.
        Closed,

        /// The receiver fell too far behind. Receiving again yields the oldest
        /// message the channel still retains.
        ///
        /// The payload is the number of messages that were skipped.
        Lagged(u64),
    }

    impl fmt::Display for TryRecvError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match *self {
                Self::Empty => f.write_str("channel empty"),
                Self::Closed => f.write_str("channel closed"),
                Self::Lagged(skipped) => write!(f, "channel lagged by {skipped}"),
            }
        }
    }

    impl std::error::Error for TryRecvError {}
}
301
302use self::error::{RecvError, SendError, TryRecvError};
303
304use super::Notify;
305
306/// Data shared between senders and receivers.
struct Shared<T> {
    /// Slots in the channel. The length is a power of two: the requested
    /// capacity is rounded up in `Sender::new_with_receiver_count`.
    buffer: Box<[RwLock<Slot<T>>]>,

    /// Mask a position -> index. Equal to `buffer.len() - 1`, so a slot index
    /// is computed as `pos & mask`.
    mask: usize,

    /// Tail of the queue. Includes the rx wait list.
    tail: Mutex<Tail>,

    /// Number of outstanding Sender handles. Starts at 1 when the channel is
    /// created.
    num_tx: AtomicUsize,

    /// Notify when the last subscribed [`Receiver`] drops. Awaited by
    /// `Sender::closed`.
    notify_last_rx_drop: Notify,
}
323
324/// Next position to write a value.
struct Tail {
    /// Next position to write to. Advanced with `wrapping_add(1)` by every
    /// successful `send`.
    pos: u64,

    /// Number of active receivers. `send` copies this into the written slot's
    /// `rem` counter and fails when it is zero.
    rx_cnt: usize,

    /// True if the channel is closed. Set by `Sender::close_channel` and reset
    /// by `new_receiver` when a receiver subscribes while `rx_cnt` is zero.
    closed: bool,

    /// Receivers waiting for a value. Drained by `Shared::notify_rx`.
    waiters: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
}
338
339/// Slot in the buffer.
struct Slot<T> {
    /// Remaining number of receivers that are expected to see this value.
    ///
    /// When this goes to zero, the value is released.
    ///
    /// An atomic is used as it is mutated concurrently with the slot read lock
    /// acquired.
    rem: AtomicUsize,

    /// Uniquely identifies the `send` stored in the slot.
    ///
    /// Initialized to `index - capacity` (wrapping) when the channel is
    /// created and overwritten with the tail position by each `send`.
    pos: u64,

    /// The value being broadcast.
    ///
    /// The value is set by `send` when the write lock is held. When a reader
    /// drops, `rem` is decremented. When it hits zero, the value is dropped.
    val: UnsafeCell<Option<T>>,
}
358
359/// An entry in the wait queue.
struct Waiter {
    /// True if queued.
    ///
    /// `Shared::notify_rx` clears this with `Release` ordering after it has
    /// taken the waker out of the waiter.
    queued: AtomicBool,

    /// Task waiting on the broadcast channel. Taken by `Shared::notify_rx`
    /// while the tail lock is held.
    waker: Option<Waker>,

    /// Intrusive linked-list pointers.
    pointers: linked_list::Pointers<Waiter>,

    /// Should not be `Unpin`.
    _p: PhantomPinned,
}
373
374impl Waiter {
375    fn new() -> Self {
376        Self {
377            queued: AtomicBool::new(false),
378            waker: None,
379            pointers: linked_list::Pointers::new(),
380            _p: PhantomPinned,
381        }
382    }
383}
384
// Generates `Waiter::addr_of_pointers`, which maps a `NonNull<Waiter>` to a
// `NonNull` pointing at that waiter's `pointers` field.
// NOTE(review): presumably the macro projects through raw pointers so that no
// intermediate reference to the `Waiter` is created — confirm in the macro
// definition.
generate_addr_of_methods! {
    impl<> Waiter {
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
            &self.pointers
        }
    }
}
392
/// Guard wrapping a read lock on a single slot of the channel buffer.
struct RecvGuard<'a, T> {
    /// Read lock on the slot; held for as long as the guard is alive.
    slot: RwLockReadGuard<'a, Slot<T>>,
}
396
397/// Receive a value future.
struct Recv<'a, T> {
    /// Receiver being waited on.
    receiver: &'a mut Receiver<T>,

    /// Entry in the waiter `LinkedList`.
    ///
    /// Wrapped in an `UnsafeCell` because the waiter is also reached through
    /// raw pointers stored in the list (see `Shared::notify_rx`).
    waiter: UnsafeCell<Waiter>,
}
405
// `Recv` stores an `UnsafeCell<Waiter>`, so `Send` and `Sync` are implemented
// manually here.
// NOTE(review): soundness presumably relies on the waiter only being accessed
// while the `Shared.tail` lock is held (as `Shared::notify_rx` does) — confirm
// against the `Future` and `Drop` impls of `Recv`.
unsafe impl<'a, T: Send> Send for Recv<'a, T> {}
unsafe impl<'a, T: Send> Sync for Recv<'a, T> {}
408
/// Max number of receivers. Reserve space to lock.
///
/// `new_receiver` panics with "max receivers" when the receiver count has
/// already reached this value.
const MAX_RECEIVERS: usize = usize::MAX >> 2;
411
412/// Create a bounded, multi-producer, multi-consumer channel where each sent
413/// value is broadcasted to all active receivers.
414///
415/// **Note:** The actual capacity may be greater than the provided `capacity`.
416///
417/// All data sent on [`Sender`] will become available on every active
418/// [`Receiver`] in the same order as it was sent.
419///
420/// The `Sender` can be cloned to `send` to the same channel from multiple
421/// points in the process or it can be used concurrently from an `Arc`. New
422/// `Receiver` handles are created by calling [`Sender::subscribe`].
423///
424/// If all [`Receiver`] handles are dropped, the `send` method will return a
425/// [`SendError`]. Similarly, if all [`Sender`] handles are dropped, the [`recv`]
426/// method will return a [`RecvError`].
427///
428/// [`Sender`]: crate::sync::broadcast::Sender
429/// [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
430/// [`Receiver`]: crate::sync::broadcast::Receiver
431/// [`recv`]: crate::sync::broadcast::Receiver::recv
432/// [`SendError`]: crate::sync::broadcast::error::SendError
433/// [`RecvError`]: crate::sync::broadcast::error::RecvError
434///
435/// # Examples
436///
437/// ```
438/// use tokio::sync::broadcast;
439///
440/// #[tokio::main]
441/// async fn main() {
442///     let (tx, mut rx1) = broadcast::channel(16);
443///     let mut rx2 = tx.subscribe();
444///
445///     tokio::spawn(async move {
446///         assert_eq!(rx1.recv().await.unwrap(), 10);
447///         assert_eq!(rx1.recv().await.unwrap(), 20);
448///     });
449///
450///     tokio::spawn(async move {
451///         assert_eq!(rx2.recv().await.unwrap(), 10);
452///         assert_eq!(rx2.recv().await.unwrap(), 20);
453///     });
454///
455///     tx.send(10).unwrap();
456///     tx.send(20).unwrap();
457/// }
458/// ```
459///
460/// # Panics
461///
462/// This will panic if `capacity` is equal to `0` or larger
463/// than `usize::MAX / 2`.
464#[track_caller]
465pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
466    // SAFETY: In the line below we are creating one extra receiver, so there will be 1 in total.
467    let tx = unsafe { Sender::new_with_receiver_count(1, capacity) };
468    let rx = Receiver {
469        shared: tx.shared.clone(),
470        next: 0,
471    };
472    (tx, rx)
473}
474
// Both halves are `Send` and `Sync` whenever `T: Send`, as stated in the
// module documentation.
// NOTE(review): a sent value is stored once and cloned on demand by each
// receiver through a slot *read* lock, so several receivers may be able to
// call `T::clone` on the same `&T` concurrently. That normally requires
// `T: Sync`; confirm that `T: Send` alone is sufficient here (compare
// RUSTSEC-2025-0023).
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}

unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}
480
481impl<T> Sender<T> {
482    /// Creates the sending-half of the [`broadcast`] channel.
483    ///
484    /// See the documentation of [`broadcast::channel`] for more information on this method.
485    ///
486    /// [`broadcast`]: crate::sync::broadcast
487    /// [`broadcast::channel`]: crate::sync::broadcast::channel
488    #[track_caller]
489    pub fn new(capacity: usize) -> Self {
490        // SAFETY: We don't create extra receivers, so there are 0.
491        unsafe { Self::new_with_receiver_count(0, capacity) }
492    }
493
494    /// Creates the sending-half of the [`broadcast`](self) channel, and provide the receiver
495    /// count.
496    ///
497    /// See the documentation of [`broadcast::channel`](self::channel) for more errors when
498    /// calling this function.
499    ///
500    /// # Safety:
501    ///
502    /// The caller must ensure that the amount of receivers for this Sender is correct before
503    /// the channel functionalities are used, the count is zero by default, as this function
504    /// does not create any receivers by itself.
505    #[track_caller]
506    unsafe fn new_with_receiver_count(receiver_count: usize, mut capacity: usize) -> Self {
507        assert!(capacity > 0, "broadcast channel capacity cannot be zero");
508        assert!(
509            capacity <= usize::MAX >> 1,
510            "broadcast channel capacity exceeded `usize::MAX / 2`"
511        );
512
513        // Round to a power of two
514        capacity = capacity.next_power_of_two();
515
516        let mut buffer = Vec::with_capacity(capacity);
517
518        for i in 0..capacity {
519            buffer.push(RwLock::new(Slot {
520                rem: AtomicUsize::new(0),
521                pos: (i as u64).wrapping_sub(capacity as u64),
522                val: UnsafeCell::new(None),
523            }));
524        }
525
526        let shared = Arc::new(Shared {
527            buffer: buffer.into_boxed_slice(),
528            mask: capacity - 1,
529            tail: Mutex::new(Tail {
530                pos: 0,
531                rx_cnt: receiver_count,
532                closed: false,
533                waiters: LinkedList::new(),
534            }),
535            num_tx: AtomicUsize::new(1),
536            notify_last_rx_drop: Notify::new(),
537        });
538
539        Sender { shared }
540    }
541
542    /// Attempts to send a value to all active [`Receiver`] handles, returning
543    /// it back if it could not be sent.
544    ///
545    /// A successful send occurs when there is at least one active [`Receiver`]
546    /// handle. An unsuccessful send would be one where all associated
547    /// [`Receiver`] handles have already been dropped.
548    ///
549    /// # Return
550    ///
551    /// On success, the number of subscribed [`Receiver`] handles is returned.
552    /// This does not mean that this number of receivers will see the message as
553    /// a receiver may drop or lag ([see lagging](self#lagging)) before receiving
554    /// the message.
555    ///
556    /// # Note
557    ///
558    /// A return value of `Ok` **does not** mean that the sent value will be
559    /// observed by all or any of the active [`Receiver`] handles. [`Receiver`]
560    /// handles may be dropped before receiving the sent message.
561    ///
562    /// A return value of `Err` **does not** mean that future calls to `send`
563    /// will fail. New [`Receiver`] handles may be created by calling
564    /// [`subscribe`].
565    ///
566    /// [`Receiver`]: crate::sync::broadcast::Receiver
567    /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
568    ///
569    /// # Examples
570    ///
571    /// ```
572    /// use tokio::sync::broadcast;
573    ///
574    /// #[tokio::main]
575    /// async fn main() {
576    ///     let (tx, mut rx1) = broadcast::channel(16);
577    ///     let mut rx2 = tx.subscribe();
578    ///
579    ///     tokio::spawn(async move {
580    ///         assert_eq!(rx1.recv().await.unwrap(), 10);
581    ///         assert_eq!(rx1.recv().await.unwrap(), 20);
582    ///     });
583    ///
584    ///     tokio::spawn(async move {
585    ///         assert_eq!(rx2.recv().await.unwrap(), 10);
586    ///         assert_eq!(rx2.recv().await.unwrap(), 20);
587    ///     });
588    ///
589    ///     tx.send(10).unwrap();
590    ///     tx.send(20).unwrap();
591    /// }
592    /// ```
    pub fn send(&self, value: T) -> Result<usize, SendError<T>> {
        // The tail lock is taken first and is only released by `notify_rx` at
        // the end, so the whole send runs under it.
        let mut tail = self.shared.tail.lock();

        // With no receivers nobody could observe the value; hand it back to
        // the caller instead of storing it.
        if tail.rx_cnt == 0 {
            return Err(SendError(value));
        }

        // Position to write into
        let pos = tail.pos;
        // Snapshot of the receiver count: stored in the slot below and
        // returned to the caller on success.
        let rem = tail.rx_cnt;
        // `mask` is `capacity - 1` with a power-of-two capacity, so this maps
        // the position onto a slot index.
        let idx = (pos & self.shared.mask as u64) as usize;

        // Update the tail position
        tail.pos = tail.pos.wrapping_add(1);

        // Get the slot
        let mut slot = self.shared.buffer[idx].write();

        // Track the position
        slot.pos = pos;

        // Set remaining receivers
        slot.rem.with_mut(|v| *v = rem);

        // Write the value (replacing whatever the slot held before)
        slot.val = UnsafeCell::new(Some(value));

        // Release the slot lock before notifying the receivers.
        drop(slot);

        // Notify and release the mutex. This must happen after the slot lock is
        // released, otherwise the writer lock bit could be cleared while another
        // thread is in the critical section.
        self.shared.notify_rx(tail);

        Ok(rem)
    }
630
631    /// Creates a new [`Receiver`] handle that will receive values sent **after**
632    /// this call to `subscribe`.
633    ///
634    /// # Examples
635    ///
636    /// ```
637    /// use tokio::sync::broadcast;
638    ///
639    /// #[tokio::main]
640    /// async fn main() {
641    ///     let (tx, _rx) = broadcast::channel(16);
642    ///
643    ///     // Will not be seen
644    ///     tx.send(10).unwrap();
645    ///
646    ///     let mut rx = tx.subscribe();
647    ///
648    ///     tx.send(20).unwrap();
649    ///
650    ///     let value = rx.recv().await.unwrap();
651    ///     assert_eq!(20, value);
652    /// }
653    /// ```
654    pub fn subscribe(&self) -> Receiver<T> {
655        let shared = self.shared.clone();
656        new_receiver(shared)
657    }
658
659    /// Returns the number of queued values.
660    ///
661    /// A value is queued until it has either been seen by all receivers that were alive at the time
662    /// it was sent, or has been evicted from the queue by subsequent sends that exceeded the
663    /// queue's capacity.
664    ///
665    /// # Note
666    ///
667    /// In contrast to [`Receiver::len`], this method only reports queued values and not values that
668    /// have been evicted from the queue before being seen by all receivers.
669    ///
670    /// # Examples
671    ///
672    /// ```
673    /// use tokio::sync::broadcast;
674    ///
675    /// #[tokio::main]
676    /// async fn main() {
677    ///     let (tx, mut rx1) = broadcast::channel(16);
678    ///     let mut rx2 = tx.subscribe();
679    ///
680    ///     tx.send(10).unwrap();
681    ///     tx.send(20).unwrap();
682    ///     tx.send(30).unwrap();
683    ///
684    ///     assert_eq!(tx.len(), 3);
685    ///
686    ///     rx1.recv().await.unwrap();
687    ///
688    ///     // The len is still 3 since rx2 hasn't seen the first value yet.
689    ///     assert_eq!(tx.len(), 3);
690    ///
691    ///     rx2.recv().await.unwrap();
692    ///
693    ///     assert_eq!(tx.len(), 2);
694    /// }
695    /// ```
696    pub fn len(&self) -> usize {
697        let tail = self.shared.tail.lock();
698
699        let base_idx = (tail.pos & self.shared.mask as u64) as usize;
700        let mut low = 0;
701        let mut high = self.shared.buffer.len();
702        while low < high {
703            let mid = low + (high - low) / 2;
704            let idx = base_idx.wrapping_add(mid) & self.shared.mask;
705            if self.shared.buffer[idx].read().rem.load(SeqCst) == 0 {
706                low = mid + 1;
707            } else {
708                high = mid;
709            }
710        }
711
712        self.shared.buffer.len() - low
713    }
714
715    /// Returns true if there are no queued values.
716    ///
717    /// # Examples
718    ///
719    /// ```
720    /// use tokio::sync::broadcast;
721    ///
722    /// #[tokio::main]
723    /// async fn main() {
724    ///     let (tx, mut rx1) = broadcast::channel(16);
725    ///     let mut rx2 = tx.subscribe();
726    ///
727    ///     assert!(tx.is_empty());
728    ///
729    ///     tx.send(10).unwrap();
730    ///
731    ///     assert!(!tx.is_empty());
732    ///
733    ///     rx1.recv().await.unwrap();
734    ///
735    ///     // The queue is still not empty since rx2 hasn't seen the value.
736    ///     assert!(!tx.is_empty());
737    ///
738    ///     rx2.recv().await.unwrap();
739    ///
740    ///     assert!(tx.is_empty());
741    /// }
742    /// ```
743    pub fn is_empty(&self) -> bool {
744        let tail = self.shared.tail.lock();
745
746        let idx = (tail.pos.wrapping_sub(1) & self.shared.mask as u64) as usize;
747        self.shared.buffer[idx].read().rem.load(SeqCst) == 0
748    }
749
750    /// Returns the number of active receivers.
751    ///
752    /// An active receiver is a [`Receiver`] handle returned from [`channel`] or
753    /// [`subscribe`]. These are the handles that will receive values sent on
754    /// this [`Sender`].
755    ///
756    /// # Note
757    ///
758    /// It is not guaranteed that a sent message will reach this number of
759    /// receivers. Active receivers may never call [`recv`] again before
760    /// dropping.
761    ///
762    /// [`recv`]: crate::sync::broadcast::Receiver::recv
763    /// [`Receiver`]: crate::sync::broadcast::Receiver
764    /// [`Sender`]: crate::sync::broadcast::Sender
765    /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
766    /// [`channel`]: crate::sync::broadcast::channel
767    ///
768    /// # Examples
769    ///
770    /// ```
771    /// use tokio::sync::broadcast;
772    ///
773    /// #[tokio::main]
774    /// async fn main() {
775    ///     let (tx, _rx1) = broadcast::channel(16);
776    ///
777    ///     assert_eq!(1, tx.receiver_count());
778    ///
779    ///     let mut _rx2 = tx.subscribe();
780    ///
781    ///     assert_eq!(2, tx.receiver_count());
782    ///
783    ///     tx.send(10).unwrap();
784    /// }
785    /// ```
786    pub fn receiver_count(&self) -> usize {
787        let tail = self.shared.tail.lock();
788        tail.rx_cnt
789    }
790
791    /// Returns `true` if senders belong to the same channel.
792    ///
793    /// # Examples
794    ///
795    /// ```
796    /// use tokio::sync::broadcast;
797    ///
798    /// #[tokio::main]
799    /// async fn main() {
800    ///     let (tx, _rx) = broadcast::channel::<()>(16);
801    ///     let tx2 = tx.clone();
802    ///
803    ///     assert!(tx.same_channel(&tx2));
804    ///
805    ///     let (tx3, _rx3) = broadcast::channel::<()>(16);
806    ///
807    ///     assert!(!tx3.same_channel(&tx2));
808    /// }
809    /// ```
810    pub fn same_channel(&self, other: &Self) -> bool {
811        Arc::ptr_eq(&self.shared, &other.shared)
812    }
813
814    /// A future which completes when the number of [Receiver]s subscribed to this `Sender` reaches
815    /// zero.
816    ///
817    /// # Examples
818    ///
819    /// ```
820    /// use futures::FutureExt;
821    /// use tokio::sync::broadcast;
822    ///
823    /// #[tokio::main]
824    /// async fn main() {
825    ///     let (tx, mut rx1) = broadcast::channel::<u32>(16);
826    ///     let mut rx2 = tx.subscribe();
827    ///
828    ///     let _ = tx.send(10);
829    ///
830    ///     assert_eq!(rx1.recv().await.unwrap(), 10);
831    ///     drop(rx1);
832    ///     assert!(tx.closed().now_or_never().is_none());
833    ///
834    ///     assert_eq!(rx2.recv().await.unwrap(), 10);
835    ///     drop(rx2);
836    ///     assert!(tx.closed().now_or_never().is_some());
837    /// }
838    /// ```
    pub async fn closed(&self) {
        loop {
            // The `Notified` future is created before `closed` is inspected.
            // NOTE(review): presumably this ordering guarantees that a
            // notification issued between the check below and the `.await` is
            // not missed — confirm against the `Notify` documentation.
            let notified = self.shared.notify_last_rx_drop.notified();

            {
                // Ensure the lock drops if the channel isn't closed
                // (the guard's scope ends before the `.await` below).
                let tail = self.shared.tail.lock();
                if tail.closed {
                    return;
                }
            }

            // Wait to be notified, then loop around and re-check `closed`.
            notified.await;
        }
    }
854
855    fn close_channel(&self) {
856        let mut tail = self.shared.tail.lock();
857        tail.closed = true;
858
859        self.shared.notify_rx(tail);
860    }
861}
862
863/// Create a new `Receiver` which reads starting from the tail.
864fn new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T> {
865    let mut tail = shared.tail.lock();
866
867    assert!(tail.rx_cnt != MAX_RECEIVERS, "max receivers");
868
869    if tail.rx_cnt == 0 {
870        // Potentially need to re-open the channel, if a new receiver has been added between calls
871        // to poll(). Note that we use rx_cnt == 0 instead of is_closed since is_closed also
872        // applies if the sender has been dropped
873        tail.closed = false;
874    }
875
876    tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow");
877    let next = tail.pos;
878
879    drop(tail);
880
881    Receiver { shared, next }
882}
883
884/// List used in `Shared::notify_rx`. It wraps a guarded linked list
885/// and gates the access to it on the `Shared.tail` mutex. It also empties
886/// the list on drop.
struct WaitersList<'a, T> {
    /// The guarded list holding the waiters taken from `Tail::waiters`.
    list: GuardedLinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
    /// Set once `pop_back_locked` has observed the list to be empty, so that
    /// `drop` can skip taking the tail lock.
    is_empty: bool,
    /// Shared channel state; its `tail` mutex is locked in `drop` when the
    /// list still has to be emptied.
    shared: &'a Shared<T>,
}
892
893impl<'a, T> Drop for WaitersList<'a, T> {
894    fn drop(&mut self) {
895        // If the list is not empty, we unlink all waiters from it.
896        // We do not wake the waiters to avoid double panics.
897        if !self.is_empty {
898            let _lock_guard = self.shared.tail.lock();
899            while self.list.pop_back().is_some() {}
900        }
901    }
902}
903
904impl<'a, T> WaitersList<'a, T> {
905    fn new(
906        unguarded_list: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
907        guard: Pin<&'a Waiter>,
908        shared: &'a Shared<T>,
909    ) -> Self {
910        let guard_ptr = NonNull::from(guard.get_ref());
911        let list = unguarded_list.into_guarded(guard_ptr);
912        WaitersList {
913            list,
914            is_empty: false,
915            shared,
916        }
917    }
918
919    /// Removes the last element from the guarded list. Modifying this list
920    /// requires an exclusive access to the main list in `Notify`.
921    fn pop_back_locked(&mut self, _tail: &mut Tail) -> Option<NonNull<Waiter>> {
922        let result = self.list.pop_back();
923        if result.is_none() {
924            // Save information about emptiness to avoid waiting for lock
925            // in the destructor.
926            self.is_empty = true;
927        }
928        result
929    }
930}
931
impl<T> Shared<T> {
    /// Wakes every receiver that is queued in `tail.waiters`.
    ///
    /// The caller hands over the already-acquired tail lock. The guard is
    /// consumed: it is released whenever a batch of wakers is invoked,
    /// re-acquired to continue draining, and released for good before the
    /// final batch is woken.
    fn notify_rx<'a, 'b: 'a>(&'b self, mut tail: MutexGuard<'a, Tail>) {
        // It is critical for `GuardedLinkedList` safety that the guard node is
        // pinned in memory and is not dropped until the guarded list is dropped.
        let guard = Waiter::new();
        pin!(guard);

        // We move all waiters to a secondary list. It uses a `GuardedLinkedList`
        // underneath to allow every waiter to safely remove itself from it.
        //
        // * This list will be still guarded by the `tail` lock.
        //   `WaitersList` wrapper makes sure we hold the lock to modify it.
        // * This wrapper will empty the list on drop. It is critical for safety
        //   that we will not leave any list entry with a pointer to the local
        //   guard node after this function returns / panics.
        let mut list = WaitersList::new(std::mem::take(&mut tail.waiters), guard.as_ref(), self);

        let mut wakers = WakeList::new();
        'outer: loop {
            // Collect wakers for as long as `wakers` accepts more entries.
            while wakers.can_push() {
                match list.pop_back_locked(&mut tail) {
                    Some(waiter) => {
                        unsafe {
                            // Safety: accessing `waker` is safe because
                            // the tail lock is held.
                            if let Some(waker) = (*waiter.as_ptr()).waker.take() {
                                wakers.push(waker);
                            }

                            // Safety: `queued` is atomic.
                            let queued = &(*waiter.as_ptr()).queued;
                            // `Relaxed` suffices because the tail lock is held.
                            assert!(queued.load(Relaxed));
                            // `Release` is needed to synchronize with `Recv::drop`.
                            // It is critical to set this variable **after** waker
                            // is extracted, otherwise we may data race with `Recv::drop`.
                            queued.store(false, Release);
                        }
                    }
                    None => {
                        // The list is drained; the final wake-up happens
                        // after the loop.
                        break 'outer;
                    }
                }
            }

            // Release the lock before waking.
            drop(tail);

            // Before we acquire the lock again all sorts of things can happen:
            // some waiters may remove themselves from the list and new waiters
            // may be added. This is fine since at worst we will unnecessarily
            // wake up waiters which will then queue themselves again.

            wakers.wake_all();

            // Acquire the lock again.
            tail = self.tail.lock();
        }

        // Release the lock before waking.
        drop(tail);

        wakers.wake_all();
    }
}
997
998impl<T> Clone for Sender<T> {
999    fn clone(&self) -> Sender<T> {
1000        let shared = self.shared.clone();
1001        shared.num_tx.fetch_add(1, SeqCst);
1002
1003        Sender { shared }
1004    }
1005}
1006
1007impl<T> Drop for Sender<T> {
1008    fn drop(&mut self) {
1009        if 1 == self.shared.num_tx.fetch_sub(1, SeqCst) {
1010            self.close_channel();
1011        }
1012    }
1013}
1014
1015impl<T> Receiver<T> {
1016    /// Returns the number of messages that were sent into the channel and that
1017    /// this [`Receiver`] has yet to receive.
1018    ///
1019    /// If the returned value from `len` is larger than the next largest power of 2
1020    /// of the capacity of the channel any call to [`recv`] will return an
1021    /// `Err(RecvError::Lagged)` and any call to [`try_recv`] will return an
1022    /// `Err(TryRecvError::Lagged)`, e.g. if the capacity of the channel is 10,
1023    /// [`recv`] will start to return `Err(RecvError::Lagged)` once `len` returns
1024    /// values larger than 16.
1025    ///
1026    /// [`Receiver`]: crate::sync::broadcast::Receiver
1027    /// [`recv`]: crate::sync::broadcast::Receiver::recv
1028    /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
1029    ///
1030    /// # Examples
1031    ///
1032    /// ```
1033    /// use tokio::sync::broadcast;
1034    ///
1035    /// #[tokio::main]
1036    /// async fn main() {
1037    ///     let (tx, mut rx1) = broadcast::channel(16);
1038    ///
1039    ///     tx.send(10).unwrap();
1040    ///     tx.send(20).unwrap();
1041    ///
1042    ///     assert_eq!(rx1.len(), 2);
1043    ///     assert_eq!(rx1.recv().await.unwrap(), 10);
1044    ///     assert_eq!(rx1.len(), 1);
1045    ///     assert_eq!(rx1.recv().await.unwrap(), 20);
1046    ///     assert_eq!(rx1.len(), 0);
1047    /// }
1048    /// ```
1049    pub fn len(&self) -> usize {
1050        let next_send_pos = self.shared.tail.lock().pos;
1051        (next_send_pos - self.next) as usize
1052    }
1053
1054    /// Returns true if there aren't any messages in the channel that the [`Receiver`]
1055    /// has yet to receive.
1056    ///
1057    /// [`Receiver]: create::sync::broadcast::Receiver
1058    ///
1059    /// # Examples
1060    ///
1061    /// ```
1062    /// use tokio::sync::broadcast;
1063    ///
1064    /// #[tokio::main]
1065    /// async fn main() {
1066    ///     let (tx, mut rx1) = broadcast::channel(16);
1067    ///
1068    ///     assert!(rx1.is_empty());
1069    ///
1070    ///     tx.send(10).unwrap();
1071    ///     tx.send(20).unwrap();
1072    ///
1073    ///     assert!(!rx1.is_empty());
1074    ///     assert_eq!(rx1.recv().await.unwrap(), 10);
1075    ///     assert_eq!(rx1.recv().await.unwrap(), 20);
1076    ///     assert!(rx1.is_empty());
1077    /// }
1078    /// ```
1079    pub fn is_empty(&self) -> bool {
1080        self.len() == 0
1081    }
1082
1083    /// Returns `true` if receivers belong to the same channel.
1084    ///
1085    /// # Examples
1086    ///
1087    /// ```
1088    /// use tokio::sync::broadcast;
1089    ///
1090    /// #[tokio::main]
1091    /// async fn main() {
1092    ///     let (tx, rx) = broadcast::channel::<()>(16);
1093    ///     let rx2 = tx.subscribe();
1094    ///
1095    ///     assert!(rx.same_channel(&rx2));
1096    ///
1097    ///     let (_tx3, rx3) = broadcast::channel::<()>(16);
1098    ///
1099    ///     assert!(!rx3.same_channel(&rx2));
1100    /// }
1101    /// ```
1102    pub fn same_channel(&self, other: &Self) -> bool {
1103        Arc::ptr_eq(&self.shared, &other.shared)
1104    }
1105
    /// Locks the next value if there is one.
    ///
    /// `waiter` is `None` for non-blocking callers (`try_recv`, `Drop`) and
    /// `Some((node, waker))` when called from `Recv::poll`. When it is `Some`
    /// and the channel is empty for this receiver, the waker is stored in the
    /// node and the node is pushed onto `tail.waiters` unless it is already
    /// queued.
    ///
    /// On success the cursor `self.next` is advanced by one and a guard
    /// holding the slot's read lock is returned. Otherwise:
    ///
    /// * `TryRecvError::Closed` - nothing is left for this receiver and
    ///   `tail.closed` is set.
    /// * `TryRecvError::Empty` - nothing is left for this receiver and the
    ///   channel is not closed.
    /// * `TryRecvError::Lagged(n)` - `n` values were missed; `self.next` is
    ///   moved to `tail.pos` minus the buffer length.
    fn recv_ref(
        &mut self,
        waiter: Option<(&UnsafeCell<Waiter>, &Waker)>,
    ) -> Result<RecvGuard<'_, T>, TryRecvError> {
        // Index of the slot that corresponds to the cursor position.
        let idx = (self.next & self.shared.mask as u64) as usize;

        // The slot holding the next value to read
        let mut slot = self.shared.buffer[idx].read();

        if slot.pos != self.next {
            // Release the `slot` lock before attempting to acquire the `tail`
            // lock. This is required because `send2` acquires the tail lock
            // first followed by the slot lock. Acquiring the locks in reverse
            // order here would result in a potential deadlock: `recv_ref`
            // acquires the `slot` lock and attempts to acquire the `tail` lock
            // while `send2` acquired the `tail` lock and attempts to acquire
            // the slot lock.
            drop(slot);

            // Declared before the locks are taken so that a replaced waker
            // can be dropped only after both locks have been released.
            let mut old_waker = None;

            let mut tail = self.shared.tail.lock();

            // Acquire slot lock again
            slot = self.shared.buffer[idx].read();

            // Make sure the position did not change. This could happen in the
            // unlikely event that the buffer is wrapped between dropping the
            // read lock and acquiring the tail lock.
            if slot.pos != self.next {
                // Position this slot will hold after the next lap around the
                // buffer.
                let next_pos = slot.pos.wrapping_add(self.shared.buffer.len() as u64);

                if next_pos == self.next {
                    // At this point the channel is empty for *this* receiver. If
                    // it's been closed, then that's what we return, otherwise we
                    // set a waker and return empty.
                    if tail.closed {
                        return Err(TryRecvError::Closed);
                    }

                    // Store the waker
                    if let Some((waiter, waker)) = waiter {
                        // Safety: called while locked.
                        unsafe {
                            // Only queue if not already queued
                            waiter.with_mut(|ptr| {
                                // If there is no waker **or** if the currently
                                // stored waker references a **different** task,
                                // track the tasks' waker to be notified on
                                // receipt of a new value.
                                match (*ptr).waker {
                                    Some(ref w) if w.will_wake(waker) => {}
                                    _ => {
                                        old_waker = std::mem::replace(
                                            &mut (*ptr).waker,
                                            Some(waker.clone()),
                                        );
                                    }
                                }

                                // If the waiter is not already queued, enqueue it.
                                // `Relaxed` order suffices: we have synchronized with
                                // all writers through the tail lock that we hold.
                                if !(*ptr).queued.load(Relaxed) {
                                    // `Relaxed` order suffices: all the readers will
                                    // synchronize with this write through the tail lock.
                                    (*ptr).queued.store(true, Relaxed);
                                    tail.waiters.push_front(NonNull::new_unchecked(&mut *ptr));
                                }
                            });
                        }
                    }

                    // Drop the old waker after releasing the locks.
                    drop(slot);
                    drop(tail);
                    drop(old_waker);

                    return Err(TryRecvError::Empty);
                }

                // At this point, the receiver has lagged behind the sender by
                // more than the channel capacity. The receiver will attempt to
                // catch up by skipping dropped messages and setting the
                // internal cursor to the **oldest** message stored by the
                // channel.
                let next = tail.pos.wrapping_sub(self.shared.buffer.len() as u64);

                // Number of values between the cursor and the oldest retained
                // value.
                let missed = next.wrapping_sub(self.next);

                drop(tail);

                // The receiver is slow but no values have been missed
                if missed == 0 {
                    self.next = self.next.wrapping_add(1);

                    return Ok(RecvGuard { slot });
                }

                self.next = next;

                return Err(TryRecvError::Lagged(missed));
            }
        }

        // The slot holds exactly the value the cursor points at: consume it.
        self.next = self.next.wrapping_add(1);

        Ok(RecvGuard { slot })
    }
1216}
1217
1218impl<T: Clone> Receiver<T> {
1219    /// Re-subscribes to the channel starting from the current tail element.
1220    ///
1221    /// This [`Receiver`] handle will receive a clone of all values sent
1222    /// **after** it has resubscribed. This will not include elements that are
1223    /// in the queue of the current receiver. Consider the following example.
1224    ///
1225    /// # Examples
1226    ///
1227    /// ```
1228    /// use tokio::sync::broadcast;
1229    ///
1230    /// #[tokio::main]
1231    /// async fn main() {
1232    ///   let (tx, mut rx) = broadcast::channel(2);
1233    ///
1234    ///   tx.send(1).unwrap();
1235    ///   let mut rx2 = rx.resubscribe();
1236    ///   tx.send(2).unwrap();
1237    ///
1238    ///   assert_eq!(rx2.recv().await.unwrap(), 2);
1239    ///   assert_eq!(rx.recv().await.unwrap(), 1);
1240    /// }
1241    /// ```
1242    pub fn resubscribe(&self) -> Self {
1243        let shared = self.shared.clone();
1244        new_receiver(shared)
1245    }
1246    /// Receives the next value for this receiver.
1247    ///
1248    /// Each [`Receiver`] handle will receive a clone of all values sent
1249    /// **after** it has subscribed.
1250    ///
1251    /// `Err(RecvError::Closed)` is returned when all `Sender` halves have
1252    /// dropped, indicating that no further values can be sent on the channel.
1253    ///
1254    /// If the [`Receiver`] handle falls behind, once the channel is full, newly
1255    /// sent values will overwrite old values. At this point, a call to [`recv`]
1256    /// will return with `Err(RecvError::Lagged)` and the [`Receiver`]'s
1257    /// internal cursor is updated to point to the oldest value still held by
1258    /// the channel. A subsequent call to [`recv`] will return this value
1259    /// **unless** it has been since overwritten.
1260    ///
1261    /// # Cancel safety
1262    ///
1263    /// This method is cancel safe. If `recv` is used as the event in a
1264    /// [`tokio::select!`](crate::select) statement and some other branch
1265    /// completes first, it is guaranteed that no messages were received on this
1266    /// channel.
1267    ///
1268    /// [`Receiver`]: crate::sync::broadcast::Receiver
1269    /// [`recv`]: crate::sync::broadcast::Receiver::recv
1270    ///
1271    /// # Examples
1272    ///
1273    /// ```
1274    /// use tokio::sync::broadcast;
1275    ///
1276    /// #[tokio::main]
1277    /// async fn main() {
1278    ///     let (tx, mut rx1) = broadcast::channel(16);
1279    ///     let mut rx2 = tx.subscribe();
1280    ///
1281    ///     tokio::spawn(async move {
1282    ///         assert_eq!(rx1.recv().await.unwrap(), 10);
1283    ///         assert_eq!(rx1.recv().await.unwrap(), 20);
1284    ///     });
1285    ///
1286    ///     tokio::spawn(async move {
1287    ///         assert_eq!(rx2.recv().await.unwrap(), 10);
1288    ///         assert_eq!(rx2.recv().await.unwrap(), 20);
1289    ///     });
1290    ///
1291    ///     tx.send(10).unwrap();
1292    ///     tx.send(20).unwrap();
1293    /// }
1294    /// ```
1295    ///
1296    /// Handling lag
1297    ///
1298    /// ```
1299    /// use tokio::sync::broadcast;
1300    ///
1301    /// #[tokio::main]
1302    /// async fn main() {
1303    ///     let (tx, mut rx) = broadcast::channel(2);
1304    ///
1305    ///     tx.send(10).unwrap();
1306    ///     tx.send(20).unwrap();
1307    ///     tx.send(30).unwrap();
1308    ///
1309    ///     // The receiver lagged behind
1310    ///     assert!(rx.recv().await.is_err());
1311    ///
1312    ///     // At this point, we can abort or continue with lost messages
1313    ///
1314    ///     assert_eq!(20, rx.recv().await.unwrap());
1315    ///     assert_eq!(30, rx.recv().await.unwrap());
1316    /// }
1317    /// ```
1318    pub async fn recv(&mut self) -> Result<T, RecvError> {
1319        cooperative(Recv::new(self)).await
1320    }
1321
1322    /// Attempts to return a pending value on this receiver without awaiting.
1323    ///
1324    /// This is useful for a flavor of "optimistic check" before deciding to
1325    /// await on a receiver.
1326    ///
1327    /// Compared with [`recv`], this function has three failure cases instead of two
1328    /// (one for closed, one for an empty buffer, one for a lagging receiver).
1329    ///
1330    /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have
1331    /// dropped, indicating that no further values can be sent on the channel.
1332    ///
1333    /// If the [`Receiver`] handle falls behind, once the channel is full, newly
1334    /// sent values will overwrite old values. At this point, a call to [`recv`]
1335    /// will return with `Err(TryRecvError::Lagged)` and the [`Receiver`]'s
1336    /// internal cursor is updated to point to the oldest value still held by
1337    /// the channel. A subsequent call to [`try_recv`] will return this value
1338    /// **unless** it has been since overwritten. If there are no values to
1339    /// receive, `Err(TryRecvError::Empty)` is returned.
1340    ///
1341    /// [`recv`]: crate::sync::broadcast::Receiver::recv
1342    /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
1343    /// [`Receiver`]: crate::sync::broadcast::Receiver
1344    ///
1345    /// # Examples
1346    ///
1347    /// ```
1348    /// use tokio::sync::broadcast;
1349    ///
1350    /// #[tokio::main]
1351    /// async fn main() {
1352    ///     let (tx, mut rx) = broadcast::channel(16);
1353    ///
1354    ///     assert!(rx.try_recv().is_err());
1355    ///
1356    ///     tx.send(10).unwrap();
1357    ///
1358    ///     let value = rx.try_recv().unwrap();
1359    ///     assert_eq!(10, value);
1360    /// }
1361    /// ```
1362    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1363        let guard = self.recv_ref(None)?;
1364        guard.clone_value().ok_or(TryRecvError::Closed)
1365    }
1366
1367    /// Blocking receive to call outside of asynchronous contexts.
1368    ///
1369    /// # Panics
1370    ///
1371    /// This function panics if called within an asynchronous execution
1372    /// context.
1373    ///
1374    /// # Examples
1375    /// ```
1376    /// use std::thread;
1377    /// use tokio::sync::broadcast;
1378    ///
1379    /// #[tokio::main]
1380    /// async fn main() {
1381    ///     let (tx, mut rx) = broadcast::channel(16);
1382    ///
1383    ///     let sync_code = thread::spawn(move || {
1384    ///         assert_eq!(rx.blocking_recv(), Ok(10));
1385    ///     });
1386    ///
1387    ///     let _ = tx.send(10);
1388    ///     sync_code.join().unwrap();
1389    /// }
1390    /// ```
1391    pub fn blocking_recv(&mut self) -> Result<T, RecvError> {
1392        crate::future::block_on(self.recv())
1393    }
1394}
1395
1396impl<T> Drop for Receiver<T> {
1397    fn drop(&mut self) {
1398        let mut tail = self.shared.tail.lock();
1399
1400        tail.rx_cnt -= 1;
1401        let until = tail.pos;
1402        let remaining_rx = tail.rx_cnt;
1403
1404        if remaining_rx == 0 {
1405            self.shared.notify_last_rx_drop.notify_waiters();
1406            tail.closed = true;
1407        }
1408
1409        drop(tail);
1410
1411        while self.next < until {
1412            match self.recv_ref(None) {
1413                Ok(_) => {}
1414                // The channel is closed
1415                Err(TryRecvError::Closed) => break,
1416                // Ignore lagging, we will catch up
1417                Err(TryRecvError::Lagged(..)) => {}
1418                // Can't be empty
1419                Err(TryRecvError::Empty) => panic!("unexpected empty broadcast channel"),
1420            }
1421        }
1422    }
1423}
1424
1425impl<'a, T> Recv<'a, T> {
1426    fn new(receiver: &'a mut Receiver<T>) -> Recv<'a, T> {
1427        Recv {
1428            receiver,
1429            waiter: UnsafeCell::new(Waiter {
1430                queued: AtomicBool::new(false),
1431                waker: None,
1432                pointers: linked_list::Pointers::new(),
1433                _p: PhantomPinned,
1434            }),
1435        }
1436    }
1437
1438    /// A custom `project` implementation is used in place of `pin-project-lite`
1439    /// as a custom drop implementation is needed.
1440    fn project(self: Pin<&mut Self>) -> (&mut Receiver<T>, &UnsafeCell<Waiter>) {
1441        unsafe {
1442            // Safety: Receiver is Unpin
1443            is_unpin::<&mut Receiver<T>>();
1444
1445            let me = self.get_unchecked_mut();
1446            (me.receiver, &me.waiter)
1447        }
1448    }
1449}
1450
1451impl<'a, T> Future for Recv<'a, T>
1452where
1453    T: Clone,
1454{
1455    type Output = Result<T, RecvError>;
1456
1457    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
1458        ready!(crate::trace::trace_leaf(cx));
1459
1460        let (receiver, waiter) = self.project();
1461
1462        let guard = match receiver.recv_ref(Some((waiter, cx.waker()))) {
1463            Ok(value) => value,
1464            Err(TryRecvError::Empty) => return Poll::Pending,
1465            Err(TryRecvError::Lagged(n)) => return Poll::Ready(Err(RecvError::Lagged(n))),
1466            Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError::Closed)),
1467        };
1468
1469        Poll::Ready(guard.clone_value().ok_or(RecvError::Closed))
1470    }
1471}
1472
1473impl<'a, T> Drop for Recv<'a, T> {
1474    fn drop(&mut self) {
1475        // Safety: `waiter.queued` is atomic.
1476        // Acquire ordering is required to synchronize with
1477        // `Shared::notify_rx` before we drop the object.
1478        let queued = self
1479            .waiter
1480            .with(|ptr| unsafe { (*ptr).queued.load(Acquire) });
1481
1482        // If the waiter is queued, we need to unlink it from the waiters list.
1483        // If not, no further synchronization is required, since the waiter
1484        // is not in the list and, as such, is not shared with any other threads.
1485        if queued {
1486            // Acquire the tail lock. This is required for safety before accessing
1487            // the waiter node.
1488            let mut tail = self.receiver.shared.tail.lock();
1489
1490            // Safety: tail lock is held.
1491            // `Relaxed` order suffices because we hold the tail lock.
1492            let queued = self
1493                .waiter
1494                .with_mut(|ptr| unsafe { (*ptr).queued.load(Relaxed) });
1495
1496            if queued {
1497                // Remove the node
1498                //
1499                // safety: tail lock is held and the wait node is verified to be in
1500                // the list.
1501                unsafe {
1502                    self.waiter.with_mut(|ptr| {
1503                        tail.waiters.remove((&mut *ptr).into());
1504                    });
1505                }
1506            }
1507        }
1508    }
1509}
1510
/// Makes `Waiter` usable as a node of the intrusive linked list, with raw
/// `NonNull<Waiter>` pointers as handles.
///
/// # Safety
///
/// `Waiter` is forced to be !Unpin.
unsafe impl linked_list::Link for Waiter {
    type Handle = NonNull<Waiter>;
    type Target = Waiter;

    /// The handle already is the raw pointer, so it is returned as is.
    fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
        *handle
    }

    /// The raw pointer is handed back unchanged as the handle.
    unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
        ptr
    }

    /// Delegates to `Waiter::addr_of_pointers` to locate the node's list
    /// pointers.
    unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
        Waiter::addr_of_pointers(target)
    }
}
1530
1531impl<T> fmt::Debug for Sender<T> {
1532    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1533        write!(fmt, "broadcast::Sender")
1534    }
1535}
1536
1537impl<T> fmt::Debug for Receiver<T> {
1538    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1539        write!(fmt, "broadcast::Receiver")
1540    }
1541}
1542
1543impl<'a, T> RecvGuard<'a, T> {
1544    fn clone_value(&self) -> Option<T>
1545    where
1546        T: Clone,
1547    {
1548        self.slot.val.with(|ptr| unsafe { (*ptr).clone() })
1549    }
1550}
1551
1552impl<'a, T> Drop for RecvGuard<'a, T> {
1553    fn drop(&mut self) {
1554        // Decrement the remaining counter
1555        if 1 == self.slot.rem.fetch_sub(1, SeqCst) {
1556            // Safety: Last receiver, drop the value
1557            self.slot.val.with_mut(|ptr| unsafe { *ptr = None });
1558        }
1559    }
1560}
1561
/// Compile-time assertion that `T` is `Unpin`; calling it does nothing at
/// runtime.
fn is_unpin<T>()
where
    T: Unpin,
{
}
1563
#[cfg(not(loom))]
#[cfg(test)]
mod tests {
    use super::*;

    /// A channel built through `Sender::new` starts without receivers; the
    /// count follows every subscribe, resubscribe and drop.
    #[test]
    fn receiver_count_on_sender_constructor() {
        let sender = Sender::<i32>::new(16);
        assert_eq!(sender.receiver_count(), 0);

        let rx_1 = sender.subscribe();
        assert_eq!(sender.receiver_count(), 1);

        let rx_2 = rx_1.resubscribe();
        assert_eq!(sender.receiver_count(), 2);

        let rx_3 = sender.subscribe();
        assert_eq!(sender.receiver_count(), 3);

        drop(rx_3);
        drop(rx_1);
        assert_eq!(sender.receiver_count(), 1);

        drop(rx_2);
        assert_eq!(sender.receiver_count(), 0);
    }

    /// A channel built through `channel` starts with exactly one receiver.
    // No extra `#[cfg(not(loom))]` here: the enclosing module is already
    // gated on it.
    #[test]
    fn receiver_count_on_channel_constructor() {
        let (sender, rx) = channel::<i32>(16);
        assert_eq!(sender.receiver_count(), 1);

        let _rx_2 = rx.resubscribe();
        assert_eq!(sender.receiver_count(), 2);
    }
}