embassy_sync/
zerocopy_channel.rs1use core::cell::RefCell;
18use core::future::{poll_fn, Future};
19use core::marker::PhantomData;
20use core::task::{Context, Poll};
21
22use crate::blocking_mutex::raw::RawMutex;
23use crate::blocking_mutex::Mutex;
24use crate::waitqueue::WakerRegistration;
25
26pub struct Channel<'a, M: RawMutex, T> {
38    buf: *mut T,
39    phantom: PhantomData<&'a mut T>,
40    state: Mutex<M, RefCell<State>>,
41}
42
43impl<'a, M: RawMutex, T> Channel<'a, M, T> {
44    pub fn new(buf: &'a mut [T]) -> Self {
49        let len = buf.len();
50        assert!(len != 0);
51
52        Self {
53            buf: buf.as_mut_ptr(),
54            phantom: PhantomData,
55            state: Mutex::new(RefCell::new(State {
56                capacity: len,
57                front: 0,
58                back: 0,
59                full: false,
60                send_waker: WakerRegistration::new(),
61                receive_waker: WakerRegistration::new(),
62            })),
63        }
64    }
65
66    pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) {
71        (Sender { channel: self }, Receiver { channel: self })
72    }
73
74    pub fn clear(&mut self) {
76        self.state.lock(|s| {
77            s.borrow_mut().clear();
78        });
79    }
80
81    pub fn len(&self) -> usize {
83        self.state.lock(|s| s.borrow().len())
84    }
85
86    pub fn is_empty(&self) -> bool {
88        self.state.lock(|s| s.borrow().is_empty())
89    }
90
91    pub fn is_full(&self) -> bool {
93        self.state.lock(|s| s.borrow().is_full())
94    }
95}
96
97pub struct Sender<'a, M: RawMutex, T> {
99    channel: &'a Channel<'a, M, T>,
100}
101
102impl<'a, M: RawMutex, T> Sender<'a, M, T> {
103    pub fn borrow(&mut self) -> Sender<'_, M, T> {
105        Sender { channel: self.channel }
106    }
107
108    pub fn try_send(&mut self) -> Option<&mut T> {
110        self.channel.state.lock(|s| {
111            let s = &mut *s.borrow_mut();
112            match s.push_index() {
113                Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
114                None => None,
115            }
116        })
117    }
118
119    pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> {
121        self.channel.state.lock(|s| {
122            let s = &mut *s.borrow_mut();
123            match s.push_index() {
124                Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
125                None => {
126                    s.receive_waker.register(cx.waker());
127                    Poll::Pending
128                }
129            }
130        })
131    }
132
133    pub fn send(&mut self) -> impl Future<Output = &mut T> {
135        poll_fn(|cx| {
136            self.channel.state.lock(|s| {
137                let s = &mut *s.borrow_mut();
138                match s.push_index() {
139                    Some(i) => {
140                        let r = unsafe { &mut *self.channel.buf.add(i) };
141                        Poll::Ready(r)
142                    }
143                    None => {
144                        s.receive_waker.register(cx.waker());
145                        Poll::Pending
146                    }
147                }
148            })
149        })
150    }
151
152    pub fn send_done(&mut self) {
154        self.channel.state.lock(|s| s.borrow_mut().push_done())
155    }
156
157    pub fn clear(&mut self) {
159        self.channel.state.lock(|s| {
160            s.borrow_mut().clear();
161        });
162    }
163
164    pub fn len(&self) -> usize {
166        self.channel.state.lock(|s| s.borrow().len())
167    }
168
169    pub fn is_empty(&self) -> bool {
171        self.channel.state.lock(|s| s.borrow().is_empty())
172    }
173
174    pub fn is_full(&self) -> bool {
176        self.channel.state.lock(|s| s.borrow().is_full())
177    }
178}
179
180pub struct Receiver<'a, M: RawMutex, T> {
182    channel: &'a Channel<'a, M, T>,
183}
184
185impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
186    pub fn borrow(&mut self) -> Receiver<'_, M, T> {
188        Receiver { channel: self.channel }
189    }
190
191    pub fn try_receive(&mut self) -> Option<&mut T> {
193        self.channel.state.lock(|s| {
194            let s = &mut *s.borrow_mut();
195            match s.pop_index() {
196                Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
197                None => None,
198            }
199        })
200    }
201
202    pub fn poll_receive(&mut self, cx: &mut Context) -> Poll<&mut T> {
204        self.channel.state.lock(|s| {
205            let s = &mut *s.borrow_mut();
206            match s.pop_index() {
207                Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
208                None => {
209                    s.send_waker.register(cx.waker());
210                    Poll::Pending
211                }
212            }
213        })
214    }
215
216    pub fn receive(&mut self) -> impl Future<Output = &mut T> {
218        poll_fn(|cx| {
219            self.channel.state.lock(|s| {
220                let s = &mut *s.borrow_mut();
221                match s.pop_index() {
222                    Some(i) => {
223                        let r = unsafe { &mut *self.channel.buf.add(i) };
224                        Poll::Ready(r)
225                    }
226                    None => {
227                        s.send_waker.register(cx.waker());
228                        Poll::Pending
229                    }
230                }
231            })
232        })
233    }
234
235    pub fn receive_done(&mut self) {
237        self.channel.state.lock(|s| s.borrow_mut().pop_done())
238    }
239
240    pub fn clear(&mut self) {
242        self.channel.state.lock(|s| {
243            s.borrow_mut().clear();
244        });
245    }
246
247    pub fn len(&self) -> usize {
249        self.channel.state.lock(|s| s.borrow().len())
250    }
251
252    pub fn is_empty(&self) -> bool {
254        self.channel.state.lock(|s| s.borrow().is_empty())
255    }
256
257    pub fn is_full(&self) -> bool {
259        self.channel.state.lock(|s| s.borrow().is_full())
260    }
261}
262
263struct State {
264    capacity: usize,
266
267    front: usize,
269    back: usize,
271
272    full: bool,
275
276    send_waker: WakerRegistration,
277    receive_waker: WakerRegistration,
278}
279
280impl State {
281    fn increment(&self, i: usize) -> usize {
282        if i + 1 == self.capacity {
283            0
284        } else {
285            i + 1
286        }
287    }
288
289    fn clear(&mut self) {
290        self.front = 0;
291        self.back = 0;
292        self.full = false;
293    }
294
295    fn len(&self) -> usize {
296        if !self.full {
297            if self.back >= self.front {
298                self.back - self.front
299            } else {
300                self.capacity + self.back - self.front
301            }
302        } else {
303            self.capacity
304        }
305    }
306
307    fn is_full(&self) -> bool {
308        self.full
309    }
310
311    fn is_empty(&self) -> bool {
312        self.front == self.back && !self.full
313    }
314
315    fn push_index(&mut self) -> Option<usize> {
316        match self.is_full() {
317            true => None,
318            false => Some(self.back),
319        }
320    }
321
322    fn push_done(&mut self) {
323        assert!(!self.is_full());
324        self.back = self.increment(self.back);
325        if self.back == self.front {
326            self.full = true;
327        }
328        self.send_waker.wake();
329    }
330
331    fn pop_index(&mut self) -> Option<usize> {
332        match self.is_empty() {
333            true => None,
334            false => Some(self.front),
335        }
336    }
337
338    fn pop_done(&mut self) {
339        assert!(!self.is_empty());
340        self.front = self.increment(self.front);
341        self.full = false;
342        self.receive_waker.wake();
343    }
344}