1use core::cell::RefCell;
4use core::future::{poll_fn, Future};
5use core::marker::PhantomData;
6use core::ops::{Deref, DerefMut};
7use core::task::{Context, Poll};
8
9use crate::blocking_mutex::raw::RawMutex;
10use crate::blocking_mutex::Mutex;
11use crate::waitqueue::MultiWakerRegistration;
12
/// The `Watch` channel itself: shared storage that sender handles write to and receiver
/// handles read from.
///
/// `M` is the raw mutex type guarding the state, `T` is the (cloneable) value type, and `N`
/// caps the number of counted receivers and sizes the waker registration.
pub struct Watch<M: RawMutex, T: Clone, const N: usize> {
    // All mutable state sits behind the blocking mutex; the `RefCell` hands out the
    // mutable borrow inside each `lock` closure.
    mutex: Mutex<M, RefCell<WatchState<T, N>>>,
}
71
/// Mutable state shared by every handle of one `Watch`.
struct WatchState<T: Clone, const N: usize> {
    /// The stored value, or `None` while the watch holds nothing.
    data: Option<T>,
    /// Message counter, incremented each time a sender publishes a value; receivers compare
    /// it against the id they last observed.
    current_id: u64,
    /// Wakers of receivers whose poll returned `Pending`; woken when a value is published.
    wakers: MultiWakerRegistration<N>,
    /// Number of counted receivers currently handed out (checked against `N`).
    receiver_count: usize,
}
78
/// Module-private part of the watch interface, used by the sender and receiver handles.
///
/// `id` is always the caller's "last seen" message id; implementations overwrite it with the
/// current id whenever they hand out a value.
trait SealedWatchBehavior<T> {
    /// Polls for the stored value, regardless of whether the caller has seen it before.
    fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;

    /// Like `poll_get`, but only becomes ready when the stored value satisfies `f`.
    fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T>;

    /// Polls for a value whose message id is newer than `id`.
    fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;

    /// Non-blocking variant of `poll_changed`; `None` when nothing newer than `id` exists.
    fn try_changed(&self, id: &mut u64) -> Option<T>;

    /// Polls for a value that is newer than `id` and also satisfies `f`.
    fn poll_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T>;

    /// Non-blocking variant of `poll_changed_and`.
    fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T>;

    /// Releases one counted receiver slot; called when a counted receiver is dropped.
    fn drop_receiver(&self);

    /// Removes the stored value.
    fn clear(&self);

    /// Stores `val` as the new value and notifies waiting receivers.
    fn send(&self, val: T);

    /// Lets `f` edit the stored value in place, then notifies waiting receivers.
    fn send_modify(&self, f: &mut dyn Fn(&mut Option<T>));

    /// Lets `f` edit the stored value in place; receivers are notified only when `f`
    /// returns `true`.
    fn send_if_modified(&self, f: &mut dyn Fn(&mut Option<T>) -> bool);
}
120
// The supertrait `SealedWatchBehavior` is private to this module, which trips the
// `private_bounds` lint on a public trait; the private bound is deliberate (presumably to
// keep the trait sealed), so the lint is silenced here.
#[allow(private_bounds)]
/// Public, object-safe view of a watch, used as the bound for the generic and `dyn` handles.
pub trait WatchBehavior<T: Clone>: SealedWatchBehavior<T> {
    /// Returns a clone of the stored value, if any. When `id` is provided it is updated
    /// to the current message id.
    fn try_get(&self, id: Option<&mut u64>) -> Option<T>;

    /// Returns a clone of the stored value if it exists and satisfies `f`. When `id` is
    /// provided it is updated to the current message id on success.
    fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option<T>;

    /// Reports whether a value is currently stored.
    fn contains_value(&self) -> bool;
}
134
135impl<M: RawMutex, T: Clone, const N: usize> SealedWatchBehavior<T> for Watch<M, T, N> {
136    fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> {
137        self.mutex.lock(|state| {
138            let mut s = state.borrow_mut();
139            match &s.data {
140                Some(data) => {
141                    *id = s.current_id;
142                    Poll::Ready(data.clone())
143                }
144                None => {
145                    s.wakers.register(cx.waker());
146                    Poll::Pending
147                }
148            }
149        })
150    }
151
152    fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> {
153        self.mutex.lock(|state| {
154            let mut s = state.borrow_mut();
155            match s.data {
156                Some(ref data) if f(data) => {
157                    *id = s.current_id;
158                    Poll::Ready(data.clone())
159                }
160                _ => {
161                    s.wakers.register(cx.waker());
162                    Poll::Pending
163                }
164            }
165        })
166    }
167
168    fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> {
169        self.mutex.lock(|state| {
170            let mut s = state.borrow_mut();
171            match (&s.data, s.current_id > *id) {
172                (Some(data), true) => {
173                    *id = s.current_id;
174                    Poll::Ready(data.clone())
175                }
176                _ => {
177                    s.wakers.register(cx.waker());
178                    Poll::Pending
179                }
180            }
181        })
182    }
183
184    fn try_changed(&self, id: &mut u64) -> Option<T> {
185        self.mutex.lock(|state| {
186            let s = state.borrow();
187            match s.current_id > *id {
188                true => {
189                    *id = s.current_id;
190                    s.data.clone()
191                }
192                false => None,
193            }
194        })
195    }
196
197    fn poll_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> {
198        self.mutex.lock(|state| {
199            let mut s = state.borrow_mut();
200            match (&s.data, s.current_id > *id) {
201                (Some(data), true) if f(data) => {
202                    *id = s.current_id;
203                    Poll::Ready(data.clone())
204                }
205                _ => {
206                    s.wakers.register(cx.waker());
207                    Poll::Pending
208                }
209            }
210        })
211    }
212
213    fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T> {
214        self.mutex.lock(|state| {
215            let s = state.borrow();
216            match (&s.data, s.current_id > *id) {
217                (Some(data), true) if f(data) => {
218                    *id = s.current_id;
219                    s.data.clone()
220                }
221                _ => None,
222            }
223        })
224    }
225
226    fn drop_receiver(&self) {
227        self.mutex.lock(|state| {
228            let mut s = state.borrow_mut();
229            s.receiver_count -= 1;
230        })
231    }
232
233    fn clear(&self) {
234        self.mutex.lock(|state| {
235            let mut s = state.borrow_mut();
236            s.data = None;
237        })
238    }
239
240    fn send(&self, val: T) {
241        self.mutex.lock(|state| {
242            let mut s = state.borrow_mut();
243            s.data = Some(val);
244            s.current_id += 1;
245            s.wakers.wake();
246        })
247    }
248
249    fn send_modify(&self, f: &mut dyn Fn(&mut Option<T>)) {
250        self.mutex.lock(|state| {
251            let mut s = state.borrow_mut();
252            f(&mut s.data);
253            s.current_id += 1;
254            s.wakers.wake();
255        })
256    }
257
258    fn send_if_modified(&self, f: &mut dyn Fn(&mut Option<T>) -> bool) {
259        self.mutex.lock(|state| {
260            let mut s = state.borrow_mut();
261            if f(&mut s.data) {
262                s.current_id += 1;
263                s.wakers.wake();
264            }
265        })
266    }
267}
268
269impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> {
270    fn try_get(&self, id: Option<&mut u64>) -> Option<T> {
271        self.mutex.lock(|state| {
272            let s = state.borrow();
273            if let Some(id) = id {
274                *id = s.current_id;
275            }
276            s.data.clone()
277        })
278    }
279
280    fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option<T> {
281        self.mutex.lock(|state| {
282            let s = state.borrow();
283            match s.data {
284                Some(ref data) if f(data) => {
285                    if let Some(id) = id {
286                        *id = s.current_id;
287                    }
288                    Some(data.clone())
289                }
290                _ => None,
291            }
292        })
293    }
294
295    fn contains_value(&self) -> bool {
296        self.mutex.lock(|state| state.borrow().data.is_some())
297    }
298}
299
300impl<M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> {
301    pub const fn new() -> Self {
303        Self {
304            mutex: Mutex::new(RefCell::new(WatchState {
305                data: None,
306                current_id: 0,
307                wakers: MultiWakerRegistration::new(),
308                receiver_count: 0,
309            })),
310        }
311    }
312
313    pub const fn new_with(data: T) -> Self {
315        Self {
316            mutex: Mutex::new(RefCell::new(WatchState {
317                data: Some(data),
318                current_id: 0,
319                wakers: MultiWakerRegistration::new(),
320                receiver_count: 0,
321            })),
322        }
323    }
324
325    pub fn sender(&self) -> Sender<'_, M, T, N> {
327        Sender(Snd::new(self))
328    }
329
330    pub fn dyn_sender(&self) -> DynSender<'_, T> {
332        DynSender(Snd::new(self))
333    }
334
335    pub fn receiver(&self) -> Option<Receiver<'_, M, T, N>> {
338        self.mutex.lock(|state| {
339            let mut s = state.borrow_mut();
340            if s.receiver_count < N {
341                s.receiver_count += 1;
342                Some(Receiver(Rcv::new(self, 0)))
343            } else {
344                None
345            }
346        })
347    }
348
349    pub fn dyn_receiver(&self) -> Option<DynReceiver<'_, T>> {
352        self.mutex.lock(|state| {
353            let mut s = state.borrow_mut();
354            if s.receiver_count < N {
355                s.receiver_count += 1;
356                Some(DynReceiver(Rcv::new(self, 0)))
357            } else {
358                None
359            }
360        })
361    }
362
363    pub fn anon_receiver(&self) -> AnonReceiver<'_, M, T, N> {
365        AnonReceiver(AnonRcv::new(self, 0))
366    }
367
368    pub fn dyn_anon_receiver(&self) -> DynAnonReceiver<'_, T> {
370        DynAnonReceiver(AnonRcv::new(self, 0))
371    }
372
373    pub fn get_msg_id(&self) -> u64 {
377        self.mutex.lock(|state| state.borrow().current_id)
378    }
379
380    pub fn try_get(&self) -> Option<T> {
382        WatchBehavior::try_get(self, None)
383    }
384
385    pub fn try_get_and<F>(&self, mut f: F) -> Option<T>
387    where
388        F: Fn(&T) -> bool,
389    {
390        WatchBehavior::try_get_and(self, None, &mut f)
391    }
392}
393
/// Sender handle over any `WatchBehavior` implementor (a concrete `Watch` or a trait object).
pub struct Snd<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
    /// The watch this handle publishes to.
    watch: &'a W,
    /// Ties the otherwise unused value type `T` to the handle.
    _phantom: PhantomData<T>,
}
399
400impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Clone for Snd<'a, T, W> {
401    fn clone(&self) -> Self {
402        Self {
403            watch: self.watch,
404            _phantom: PhantomData,
405        }
406    }
407}
408
409impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Snd<'a, T, W> {
410    fn new(watch: &'a W) -> Self {
412        Self {
413            watch,
414            _phantom: PhantomData,
415        }
416    }
417
418    pub fn send(&self, val: T) {
420        self.watch.send(val)
421    }
422
423    pub fn clear(&self) {
426        self.watch.clear()
427    }
428
429    pub fn try_get(&self) -> Option<T> {
431        self.watch.try_get(None)
432    }
433
434    pub fn try_get_and<F>(&self, mut f: F) -> Option<T>
437    where
438        F: Fn(&T) -> bool,
439    {
440        self.watch.try_get_and(None, &mut f)
441    }
442
443    pub fn contains_value(&self) -> bool {
445        self.watch.contains_value()
446    }
447
448    pub fn send_modify<F>(&self, mut f: F)
450    where
451        F: Fn(&mut Option<T>),
452    {
453        self.watch.send_modify(&mut f)
454    }
455
456    pub fn send_if_modified<F>(&self, mut f: F)
459    where
460        F: Fn(&mut Option<T>) -> bool,
461    {
462        self.watch.send_if_modified(&mut f)
463    }
464}
465
/// Sender handle bound to a concrete `Watch<M, T, N>`; a thin wrapper around `Snd`.
pub struct Sender<'a, M: RawMutex, T: Clone, const N: usize>(Snd<'a, T, Watch<M, T, N>>);
471
472impl<'a, M: RawMutex, T: Clone, const N: usize> Clone for Sender<'a, M, T, N> {
473    fn clone(&self) -> Self {
474        Self(self.0.clone())
475    }
476}
477
478impl<'a, M: RawMutex, T: Clone, const N: usize> Sender<'a, M, T, N> {
479    pub fn as_dyn(self) -> DynSender<'a, T> {
481        DynSender(Snd::new(self.watch))
482    }
483}
484
485impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynSender<'a, T>> for Sender<'a, M, T, N> {
486    fn into(self) -> DynSender<'a, T> {
487        self.as_dyn()
488    }
489}
490
491impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Sender<'a, M, T, N> {
492    type Target = Snd<'a, T, Watch<M, T, N>>;
493
494    fn deref(&self) -> &Self::Target {
495        &self.0
496    }
497}
498
499impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Sender<'a, M, T, N> {
500    fn deref_mut(&mut self) -> &mut Self::Target {
501        &mut self.0
502    }
503}
504
/// Sender handle that refers to the watch through `dyn WatchBehavior<T>`, so its type does
/// not mention the mutex type or the receiver limit.
pub struct DynSender<'a, T: Clone>(Snd<'a, T, dyn WatchBehavior<T> + 'a>);
509
510impl<'a, T: Clone> Clone for DynSender<'a, T> {
511    fn clone(&self) -> Self {
512        Self(self.0.clone())
513    }
514}
515
516impl<'a, T: Clone> Deref for DynSender<'a, T> {
517    type Target = Snd<'a, T, dyn WatchBehavior<T> + 'a>;
518
519    fn deref(&self) -> &Self::Target {
520        &self.0
521    }
522}
523
524impl<'a, T: Clone> DerefMut for DynSender<'a, T> {
525    fn deref_mut(&mut self) -> &mut Self::Target {
526        &mut self.0
527    }
528}
529
/// Counted receiver handle over any `WatchBehavior` implementor. Dropping it releases its
/// receiver slot on the watch.
pub struct Rcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
    /// The watch this handle reads from.
    watch: &'a W,
    /// Message id of the last value this receiver observed.
    at_id: u64,
    /// Ties the otherwise unused value type `T` to the handle.
    _phantom: PhantomData<T>,
}
536
537impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> {
538    fn new(watch: &'a W, at_id: u64) -> Self {
540        Self {
541            watch,
542            at_id,
543            _phantom: PhantomData,
544        }
545    }
546
547    pub fn get(&mut self) -> impl Future<Output = T> + '_ {
551        poll_fn(|cx| self.watch.poll_get(&mut self.at_id, cx))
552    }
553
554    pub fn try_get(&mut self) -> Option<T> {
556        self.watch.try_get(Some(&mut self.at_id))
557    }
558
559    pub async fn get_and<F>(&mut self, mut f: F) -> T
564    where
565        F: Fn(&T) -> bool,
566    {
567        poll_fn(|cx| self.watch.poll_get_and(&mut self.at_id, &mut f, cx)).await
568    }
569
570    pub fn try_get_and<F>(&mut self, mut f: F) -> Option<T>
573    where
574        F: Fn(&T) -> bool,
575    {
576        self.watch.try_get_and(Some(&mut self.at_id), &mut f)
577    }
578
579    pub async fn changed(&mut self) -> T {
583        poll_fn(|cx| self.watch.poll_changed(&mut self.at_id, cx)).await
584    }
585
586    pub fn try_changed(&mut self) -> Option<T> {
588        self.watch.try_changed(&mut self.at_id)
589    }
590
591    pub async fn changed_and<F>(&mut self, mut f: F) -> T
596    where
597        F: Fn(&T) -> bool,
598    {
599        poll_fn(|cx| self.watch.poll_changed_and(&mut self.at_id, &mut f, cx)).await
600    }
601
602    pub fn try_changed_and<F>(&mut self, mut f: F) -> Option<T>
605    where
606        F: Fn(&T) -> bool,
607    {
608        self.watch.try_changed_and(&mut self.at_id, &mut f)
609    }
610
611    pub fn contains_value(&self) -> bool {
614        self.watch.contains_value()
615    }
616}
617
618impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Drop for Rcv<'a, T, W> {
619    fn drop(&mut self) {
620        self.watch.drop_receiver();
621    }
622}
623
/// Anonymous receiver handle over any `WatchBehavior` implementor. It offers only the
/// non-waiting (`try_*`) operations and does not take up a counted receiver slot.
pub struct AnonRcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
    /// The watch this handle reads from.
    watch: &'a W,
    /// Message id of the last value this receiver observed.
    at_id: u64,
    /// Ties the otherwise unused value type `T` to the handle.
    _phantom: PhantomData<T>,
}
630
631impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> AnonRcv<'a, T, W> {
632    fn new(watch: &'a W, at_id: u64) -> Self {
634        Self {
635            watch,
636            at_id,
637            _phantom: PhantomData,
638        }
639    }
640
641    pub fn try_get(&mut self) -> Option<T> {
643        self.watch.try_get(Some(&mut self.at_id))
644    }
645
646    pub fn try_get_and<F>(&mut self, mut f: F) -> Option<T>
649    where
650        F: Fn(&T) -> bool,
651    {
652        self.watch.try_get_and(Some(&mut self.at_id), &mut f)
653    }
654
655    pub fn try_changed(&mut self) -> Option<T> {
657        self.watch.try_changed(&mut self.at_id)
658    }
659
660    pub fn try_changed_and<F>(&mut self, mut f: F) -> Option<T>
663    where
664        F: Fn(&T) -> bool,
665    {
666        self.watch.try_changed_and(&mut self.at_id, &mut f)
667    }
668
669    pub fn contains_value(&self) -> bool {
672        self.watch.contains_value()
673    }
674}
675
/// Counted receiver bound to a concrete `Watch<M, T, N>`; a thin wrapper around `Rcv`.
pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch<M, T, N>>);
678
679impl<'a, M: RawMutex, T: Clone, const N: usize> Receiver<'a, M, T, N> {
680    pub fn as_dyn(self) -> DynReceiver<'a, T> {
682        let rcv = DynReceiver(Rcv::new(self.0.watch, self.at_id));
683        core::mem::forget(self); rcv
685    }
686}
687
688impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynReceiver<'a, T>> for Receiver<'a, M, T, N> {
689    fn into(self) -> DynReceiver<'a, T> {
690        self.as_dyn()
691    }
692}
693
694impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> {
695    type Target = Rcv<'a, T, Watch<M, T, N>>;
696
697    fn deref(&self) -> &Self::Target {
698        &self.0
699    }
700}
701
702impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T, N> {
703    fn deref_mut(&mut self) -> &mut Self::Target {
704        &mut self.0
705    }
706}
707
/// Counted receiver that refers to the watch through `dyn WatchBehavior<T>`, so its type
/// does not mention the mutex type or the receiver limit.
pub struct DynReceiver<'a, T: Clone>(Rcv<'a, T, dyn WatchBehavior<T> + 'a>);
713
714impl<'a, T: Clone> Deref for DynReceiver<'a, T> {
715    type Target = Rcv<'a, T, dyn WatchBehavior<T> + 'a>;
716
717    fn deref(&self) -> &Self::Target {
718        &self.0
719    }
720}
721
722impl<'a, T: Clone> DerefMut for DynReceiver<'a, T> {
723    fn deref_mut(&mut self) -> &mut Self::Target {
724        &mut self.0
725    }
726}
727
/// Anonymous receiver bound to a concrete `Watch<M, T, N>`; a thin wrapper around `AnonRcv`.
pub struct AnonReceiver<'a, M: RawMutex, T: Clone, const N: usize>(AnonRcv<'a, T, Watch<M, T, N>>);
730
731impl<'a, M: RawMutex, T: Clone, const N: usize> AnonReceiver<'a, M, T, N> {
732    pub fn as_dyn(self) -> DynAnonReceiver<'a, T> {
734        let rcv = DynAnonReceiver(AnonRcv::new(self.0.watch, self.at_id));
735        core::mem::forget(self); rcv
737    }
738}
739
740impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynAnonReceiver<'a, T>> for AnonReceiver<'a, M, T, N> {
741    fn into(self) -> DynAnonReceiver<'a, T> {
742        self.as_dyn()
743    }
744}
745
746impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for AnonReceiver<'a, M, T, N> {
747    type Target = AnonRcv<'a, T, Watch<M, T, N>>;
748
749    fn deref(&self) -> &Self::Target {
750        &self.0
751    }
752}
753
754impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for AnonReceiver<'a, M, T, N> {
755    fn deref_mut(&mut self) -> &mut Self::Target {
756        &mut self.0
757    }
758}
759
/// Anonymous receiver that refers to the watch through `dyn WatchBehavior<T>`, so its type
/// does not mention the mutex type or the receiver limit.
pub struct DynAnonReceiver<'a, T: Clone>(AnonRcv<'a, T, dyn WatchBehavior<T> + 'a>);
765
766impl<'a, T: Clone> Deref for DynAnonReceiver<'a, T> {
767    type Target = AnonRcv<'a, T, dyn WatchBehavior<T> + 'a>;
768
769    fn deref(&self) -> &Self::Target {
770        &self.0
771    }
772}
773
774impl<'a, T: Clone> DerefMut for DynAnonReceiver<'a, T> {
775    fn deref_mut(&mut self) -> &mut Self::Target {
776        &mut self.0
777    }
778}
779
#[cfg(test)]
mod tests {
    use futures_executor::block_on;

    use super::Watch;
    use crate::blocking_mutex::raw::CriticalSectionRawMutex;

    /// Every send is observed exactly once through `changed` / `try_changed`.
    #[test]
    fn multiple_sends() {
        let f = async {
            static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();

            // Obtain receiver and sender
            let mut rcv = WATCH.receiver().unwrap();
            let snd = WATCH.sender();

            // Not initialized
            assert_eq!(rcv.try_changed(), None);

            // Receive the new value
            snd.send(10);
            assert_eq!(rcv.changed().await, 10);

            // Receive another value
            snd.send(20);
            assert_eq!(rcv.try_changed(), Some(20));

            // No update
            assert_eq!(rcv.try_changed(), None);
        };
        block_on(f);
    }

    /// `try_get` / `try_get_and` agree across the watch, the receiver and the sender.
    #[test]
    fn all_try_get() {
        let f = async {
            static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();

            // Obtain receiver and sender
            let mut rcv = WATCH.receiver().unwrap();
            let snd = WATCH.sender();

            // Not initialized
            assert_eq!(WATCH.try_get(), None);
            assert_eq!(rcv.try_get(), None);
            assert_eq!(snd.try_get(), None);

            // Receive the new value
            snd.send(10);
            assert_eq!(WATCH.try_get(), Some(10));
            assert_eq!(rcv.try_get(), Some(10));
            assert_eq!(snd.try_get(), Some(10));

            // Predicate accepts the stored value
            assert_eq!(WATCH.try_get_and(|x| x > &5), Some(10));
            assert_eq!(rcv.try_get_and(|x| x > &5), Some(10));
            assert_eq!(snd.try_get_and(|x| x > &5), Some(10));

            // Predicate rejects the stored value
            assert_eq!(WATCH.try_get_and(|x| x < &5), None);
            assert_eq!(rcv.try_get_and(|x| x < &5), None);
            assert_eq!(snd.try_get_and(|x| x < &5), None);
        };
        block_on(f);
    }

    /// A watch of `&'static` references hands out the references that were sent.
    #[test]
    fn once_lock_like() {
        let f = async {
            static CONFIG0: u8 = 10;
            static CONFIG1: u8 = 20;

            static WATCH: Watch<CriticalSectionRawMutex, &'static u8, 1> = Watch::new();

            // Obtain receiver and sender
            let mut rcv = WATCH.receiver().unwrap();
            let snd = WATCH.sender();

            // Not initialized
            assert_eq!(rcv.try_changed(), None);

            // Receive the new value
            snd.send(&CONFIG0);
            let rcv0 = rcv.changed().await;
            assert_eq!(rcv0, &10);

            // Receive another value
            snd.send(&CONFIG1);
            let rcv1 = rcv.try_changed();
            assert_eq!(rcv1, Some(&20));

            // No update
            assert_eq!(rcv.try_changed(), None);

            // The previously received references are still valid
            assert_eq!(rcv0, &CONFIG0);
            assert_eq!(rcv1, Some(&CONFIG1));
        };
        block_on(f);
    }

    /// `send_modify` edits the stored value in place and counts as a new message.
    #[test]
    fn sender_modify() {
        let f = async {
            static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();

            // Obtain receiver and sender
            let mut rcv = WATCH.receiver().unwrap();
            let snd = WATCH.sender();

            // Receive the new value
            snd.send(10);
            assert_eq!(rcv.try_changed(), Some(10));

            // Modify the value inplace
            snd.send_modify(|opt| {
                if let Some(inner) = opt {
                    *inner += 5;
                }
            });

            // Get the modified value
            assert_eq!(rcv.try_changed(), Some(15));
            assert_eq!(rcv.try_changed(), None);
        };
        block_on(f);
    }

    /// The predicate variants only return (and mark as seen) matching values.
    #[test]
    fn predicate_fn() {
        let f = async {
            static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();

            // Obtain receiver and sender
            let mut rcv = WATCH.receiver().unwrap();
            let snd = WATCH.sender();

            snd.send(15);
            assert_eq!(rcv.try_get_and(|x| x > &5), Some(15));
            assert_eq!(rcv.try_get_and(|x| x < &5), None);
            assert!(rcv.try_changed().is_none());

            snd.send(20);
            assert_eq!(rcv.try_changed_and(|x| x > &5), Some(20));
            assert_eq!(rcv.try_changed_and(|x| x > &5), None);

            // A rejected value stays unseen and is still reported by `try_changed`
            snd.send(25);
            assert_eq!(rcv.try_changed_and(|x| x < &5), None);
            assert_eq!(rcv.try_changed(), Some(25));

            snd.send(30);
            assert_eq!(rcv.changed_and(|x| x > &5).await, 30);
            assert_eq!(rcv.get_and(|x| x > &5).await, 30);
        };
        block_on(f);
    }

    /// A receiver created after a send still sees that value as a change.
    #[test]
    fn receive_after_create() {
        let f = async {
            static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();

            // Obtain sender and send value
            let snd = WATCH.sender();
            snd.send(10);

            // Obtain receiver and receive value
            let mut rcv = WATCH.receiver().unwrap();
            assert_eq!(rcv.try_changed(), Some(10));
        };
        block_on(f);
    }

    /// Dropping a receiver frees its slot for a new one.
    #[test]
    fn max_receivers_drop() {
        let f = async {
            static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();

            // Try to create 3 receivers (only 2 can exist at once)
            let rcv0 = WATCH.receiver();
            let rcv1 = WATCH.receiver();
            let rcv2 = WATCH.receiver();

            // Ensure the first two are successful and the third is not
            assert!(rcv0.is_some());
            assert!(rcv1.is_some());
            assert!(rcv2.is_none());

            // Drop the first receiver
            drop(rcv0);

            // Create another receiver and ensure it is successful
            let rcv3 = WATCH.receiver();
            assert!(rcv3.is_some());
        };
        block_on(f);
    }

    /// A counted and an anonymous receiver each observe the same send independently.
    #[test]
    fn multiple_receivers() {
        let f = async {
            static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();

            // Obtain receivers and sender
            let mut rcv0 = WATCH.receiver().unwrap();
            let mut rcv1 = WATCH.anon_receiver();
            let snd = WATCH.sender();

            // No update for both
            assert_eq!(rcv0.try_changed(), None);
            assert_eq!(rcv1.try_changed(), None);

            // Send a new value
            snd.send(0);

            // Both receivers receive the new value
            assert_eq!(rcv0.try_changed(), Some(0));
            assert_eq!(rcv1.try_changed(), Some(0));
        };
        block_on(f);
    }

    /// Cloned senders publish to the same watch.
    #[test]
    fn clone_senders() {
        let f = async {
            // Obtain different ways to send
            static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
            let snd0 = WATCH.sender();
            let snd1 = snd0.clone();

            // Obtain receiver
            let mut rcv = WATCH.receiver().unwrap().as_dyn();

            // Send a value from first sender
            snd0.send(10);
            assert_eq!(rcv.try_changed(), Some(10));

            // Send a value from second sender
            snd1.send(20);
            assert_eq!(rcv.try_changed(), Some(20));
        };
        block_on(f);
    }

    /// The `dyn_*` constructors yield working handles.
    #[test]
    fn use_dynamics() {
        let f = async {
            static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();

            // Obtain receiver and sender
            let mut anon_rcv = WATCH.dyn_anon_receiver();
            let mut dyn_rcv = WATCH.dyn_receiver().unwrap();
            let dyn_snd = WATCH.dyn_sender();

            // Send a value
            dyn_snd.send(10);

            // Ensure the dynamic receivers receive the value
            assert_eq!(anon_rcv.try_changed(), Some(10));
            assert_eq!(dyn_rcv.try_changed(), Some(10));
            assert_eq!(dyn_rcv.try_changed(), None);
        };
        block_on(f);
    }

    /// Statically typed handles keep working after `as_dyn`.
    #[test]
    fn convert_to_dyn() {
        let f = async {
            static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();

            // Obtain receiver and sender
            let anon_rcv = WATCH.anon_receiver();
            let rcv = WATCH.receiver().unwrap();
            let snd = WATCH.sender();

            // Convert to dynamic
            let mut dyn_anon_rcv = anon_rcv.as_dyn();
            let mut dyn_rcv = rcv.as_dyn();
            let dyn_snd = snd.as_dyn();

            // Send a value
            dyn_snd.send(10);

            // Ensure the dynamic receivers receive the value
            assert_eq!(dyn_anon_rcv.try_changed(), Some(10));
            assert_eq!(dyn_rcv.try_changed(), Some(10));
            assert_eq!(dyn_rcv.try_changed(), None);
        };
        block_on(f);
    }

    /// Converting to `dyn` neither leaks nor double-frees a receiver slot.
    #[test]
    fn dynamic_receiver_count() {
        let f = async {
            static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();

            // Try to create 3 receivers (only 2 can exist at once)
            let rcv0 = WATCH.receiver();
            let rcv1 = WATCH.receiver();
            let rcv2 = WATCH.receiver();

            // Ensure the first two are successful and the third is not
            assert!(rcv0.is_some());
            assert!(rcv1.is_some());
            assert!(rcv2.is_none());

            // Convert to dynamic
            let dyn_rcv0 = rcv0.unwrap().as_dyn();

            // Drop the (now dynamic) receiver
            drop(dyn_rcv0);

            // Exactly one slot was freed: one new receiver fits, a second does not
            let rcv3 = WATCH.receiver();
            let rcv4 = WATCH.receiver();
            assert!(rcv3.is_some());
            assert!(rcv4.is_none());
        };
        block_on(f);
    }

    /// `contains_value` flips from `false` to `true` once a value has been sent.
    #[test]
    fn contains_value() {
        let f = async {
            static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();

            // Obtain receiver and sender
            let rcv = WATCH.receiver().unwrap();
            let snd = WATCH.sender();

            // Check if the watch contains a value
            assert!(!rcv.contains_value());
            assert!(!snd.contains_value());

            // Send a value
            snd.send(10);

            // Check if the watch contains a value
            assert!(rcv.contains_value());
            assert!(snd.contains_value());
        };
        block_on(f);
    }
}