1use core::future::Future;
4use core::marker::PhantomData;
5use core::ops::{Deref, DerefMut};
6use core::pin::Pin;
7use core::task::{Context, Poll};
8
9use super::{PubSubBehavior, PubSubChannel, WaitResult};
10use crate::blocking_mutex::raw::RawMutex;
11
12pub struct Sub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
14    next_message_id: u64,
16    channel: &'a PSB,
18    _phantom: PhantomData<T>,
19}
20
21impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> {
22    pub(super) fn new(next_message_id: u64, channel: &'a PSB) -> Self {
23        Self {
24            next_message_id,
25            channel,
26            _phantom: Default::default(),
27        }
28    }
29
30    pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, PSB, T> {
32        SubscriberWaitFuture { subscriber: self }
33    }
34
35    pub async fn next_message_pure(&mut self) -> T {
37        loop {
38            match self.next_message().await {
39                WaitResult::Lagged(_) => continue,
40                WaitResult::Message(message) => break message,
41            }
42        }
43    }
44
45    pub fn try_next_message(&mut self) -> Option<WaitResult<T>> {
49        match self.channel.get_message_with_context(&mut self.next_message_id, None) {
50            Poll::Ready(result) => Some(result),
51            Poll::Pending => None,
52        }
53    }
54
55    pub fn try_next_message_pure(&mut self) -> Option<T> {
59        loop {
60            match self.try_next_message() {
61                Some(WaitResult::Lagged(_)) => continue,
62                Some(WaitResult::Message(message)) => break Some(message),
63                None => break None,
64            }
65        }
66    }
67
68    pub fn available(&self) -> u64 {
71        self.channel.available(self.next_message_id)
72    }
73
74    pub fn capacity(&self) -> usize {
76        self.channel.capacity()
77    }
78
79    pub fn free_capacity(&self) -> usize {
83        self.channel.free_capacity()
84    }
85
86    pub fn clear(&self) {
88        self.channel.clear();
89    }
90
91    pub fn len(&self) -> usize {
94        self.channel.len()
95    }
96
97    pub fn is_empty(&self) -> bool {
99        self.channel.is_empty()
100    }
101
102    pub fn is_full(&self) -> bool {
104        self.channel.is_full()
105    }
106}
107
108impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> {
109    fn drop(&mut self) {
110        self.channel.unregister_subscriber(self.next_message_id)
111    }
112}
113
114impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for Sub<'a, PSB, T> {}
115
116impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> futures_util::Stream for Sub<'a, PSB, T> {
119    type Item = T;
120
121    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
122        match self
123            .channel
124            .get_message_with_context(&mut self.next_message_id, Some(cx))
125        {
126            Poll::Ready(WaitResult::Message(message)) => Poll::Ready(Some(message)),
127            Poll::Ready(WaitResult::Lagged(_)) => {
128                cx.waker().wake_by_ref();
129                Poll::Pending
130            }
131            Poll::Pending => Poll::Pending,
132        }
133    }
134}
135
136pub struct DynSubscriber<'a, T: Clone>(pub(super) Sub<'a, dyn PubSubBehavior<T> + 'a, T>);
138
139impl<'a, T: Clone> Deref for DynSubscriber<'a, T> {
140    type Target = Sub<'a, dyn PubSubBehavior<T> + 'a, T>;
141
142    fn deref(&self) -> &Self::Target {
143        &self.0
144    }
145}
146
147impl<'a, T: Clone> DerefMut for DynSubscriber<'a, T> {
148    fn deref_mut(&mut self) -> &mut Self::Target {
149        &mut self.0
150    }
151}
152
153pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
155    pub(super) Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
156);
157
158impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref
159    for Subscriber<'a, M, T, CAP, SUBS, PUBS>
160{
161    type Target = Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>;
162
163    fn deref(&self) -> &Self::Target {
164        &self.0
165    }
166}
167
168impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut
169    for Subscriber<'a, M, T, CAP, SUBS, PUBS>
170{
171    fn deref_mut(&mut self) -> &mut Self::Target {
172        &mut self.0
173    }
174}
175
176#[must_use = "futures do nothing unless you `.await` or poll them"]
178pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
179    subscriber: &'s mut Sub<'a, PSB, T>,
180}
181
182impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Future for SubscriberWaitFuture<'s, 'a, PSB, T> {
183    type Output = WaitResult<T>;
184
185    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
186        self.subscriber
187            .channel
188            .get_message_with_context(&mut self.subscriber.next_message_id, Some(cx))
189    }
190}
191
192impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for SubscriberWaitFuture<'s, 'a, PSB, T> {}