1//! A zero-copy queue for sending values between asynchronous tasks.
2//!
3//! It can be used concurrently by a producer (sender) and a
4//! consumer (receiver), i.e. it is an "SPSC channel".
5//!
6//! This queue takes a Mutex type so that various
7//! targets can be attained. For example, a ThreadModeMutex can be used
8//! for single-core Cortex-M targets where messages are only passed
9//! between tasks running in thread mode. Similarly, a CriticalSectionMutex
10//! can also be used for single-core targets where messages are to be
11//! passed from exception mode e.g. out of an interrupt handler.
12//!
13//! This module provides a bounded channel that has a limit on the number of
14//! messages that it can store, and if this limit is reached, trying to send
15//! another message will result in an error being returned.
1617use core::cell::RefCell;
18use core::future::{poll_fn, Future};
19use core::marker::PhantomData;
20use core::task::{Context, Poll};
2122use crate::blocking_mutex::raw::RawMutex;
23use crate::blocking_mutex::Mutex;
24use crate::waitqueue::WakerRegistration;
2526/// A bounded zero-copy channel for communicating between asynchronous tasks
27/// with backpressure.
28///
29/// The channel will buffer up to the provided number of messages. Once the
30/// buffer is full, attempts to `send` new messages will wait until a message is
31/// received from the channel.
32///
33/// All data sent will become available in the same order as it was sent.
34///
35/// The channel requires a buffer of recyclable elements. Writing to the channel is done through
36/// an `&mut T`.
37pub struct Channel<'a, M: RawMutex, T> {
38 buf: *mut T,
39 phantom: PhantomData<&'a mut T>,
40 state: Mutex<M, RefCell<State>>,
41}
4243impl<'a, M: RawMutex, T> Channel<'a, M, T> {
44/// Initialize a new [`Channel`].
45 ///
46 /// The provided buffer will be used and reused by the channel's logic, and thus dictates the
47 /// channel's capacity.
48pub fn new(buf: &'a mut [T]) -> Self {
49let len = buf.len();
50assert!(len != 0);
5152Self {
53 buf: buf.as_mut_ptr(),
54 phantom: PhantomData,
55 state: Mutex::new(RefCell::new(State {
56 capacity: len,
57 front: 0,
58 back: 0,
59 full: false,
60 send_waker: WakerRegistration::new(),
61 receive_waker: WakerRegistration::new(),
62 })),
63 }
64 }
6566/// Creates a [`Sender`] and [`Receiver`] from an existing channel.
67 ///
68 /// Further Senders and Receivers can be created through [`Sender::borrow`] and
69 /// [`Receiver::borrow`] respectively.
70pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) {
71 (Sender { channel: self }, Receiver { channel: self })
72 }
7374/// Clears all elements in the channel.
75pub fn clear(&mut self) {
76self.state.lock(|s| {
77 s.borrow_mut().clear();
78 });
79 }
8081/// Returns the number of elements currently in the channel.
82pub fn len(&self) -> usize {
83self.state.lock(|s| s.borrow().len())
84 }
8586/// Returns whether the channel is empty.
87pub fn is_empty(&self) -> bool {
88self.state.lock(|s| s.borrow().is_empty())
89 }
9091/// Returns whether the channel is full.
92pub fn is_full(&self) -> bool {
93self.state.lock(|s| s.borrow().is_full())
94 }
95}
9697/// Send-only access to a [`Channel`].
98pub struct Sender<'a, M: RawMutex, T> {
99 channel: &'a Channel<'a, M, T>,
100}
101102impl<'a, M: RawMutex, T> Sender<'a, M, T> {
103/// Creates one further [`Sender`] over the same channel.
104pub fn borrow(&mut self) -> Sender<'_, M, T> {
105 Sender { channel: self.channel }
106 }
107108/// Attempts to send a value over the channel.
109pub fn try_send(&mut self) -> Option<&mut T> {
110self.channel.state.lock(|s| {
111let s = &mut *s.borrow_mut();
112match s.push_index() {
113Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
114None => None,
115 }
116 })
117 }
118119/// Attempts to send a value over the channel.
120pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> {
121self.channel.state.lock(|s| {
122let s = &mut *s.borrow_mut();
123match s.push_index() {
124Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
125None => {
126 s.receive_waker.register(cx.waker());
127 Poll::Pending
128 }
129 }
130 })
131 }
132133/// Asynchronously send a value over the channel.
134pub fn send(&mut self) -> impl Future<Output = &mut T> {
135 poll_fn(|cx| {
136self.channel.state.lock(|s| {
137let s = &mut *s.borrow_mut();
138match s.push_index() {
139Some(i) => {
140let r = unsafe { &mut *self.channel.buf.add(i) };
141 Poll::Ready(r)
142 }
143None => {
144 s.receive_waker.register(cx.waker());
145 Poll::Pending
146 }
147 }
148 })
149 })
150 }
151152/// Notify the channel that the sending of the value has been finalized.
153pub fn send_done(&mut self) {
154self.channel.state.lock(|s| s.borrow_mut().push_done())
155 }
156157/// Clears all elements in the channel.
158pub fn clear(&mut self) {
159self.channel.state.lock(|s| {
160 s.borrow_mut().clear();
161 });
162 }
163164/// Returns the number of elements currently in the channel.
165pub fn len(&self) -> usize {
166self.channel.state.lock(|s| s.borrow().len())
167 }
168169/// Returns whether the channel is empty.
170pub fn is_empty(&self) -> bool {
171self.channel.state.lock(|s| s.borrow().is_empty())
172 }
173174/// Returns whether the channel is full.
175pub fn is_full(&self) -> bool {
176self.channel.state.lock(|s| s.borrow().is_full())
177 }
178}
179180/// Receive-only access to a [`Channel`].
181pub struct Receiver<'a, M: RawMutex, T> {
182 channel: &'a Channel<'a, M, T>,
183}
184185impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
186/// Creates one further [`Sender`] over the same channel.
187pub fn borrow(&mut self) -> Receiver<'_, M, T> {
188 Receiver { channel: self.channel }
189 }
190191/// Attempts to receive a value over the channel.
192pub fn try_receive(&mut self) -> Option<&mut T> {
193self.channel.state.lock(|s| {
194let s = &mut *s.borrow_mut();
195match s.pop_index() {
196Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
197None => None,
198 }
199 })
200 }
201202/// Attempts to asynchronously receive a value over the channel.
203pub fn poll_receive(&mut self, cx: &mut Context) -> Poll<&mut T> {
204self.channel.state.lock(|s| {
205let s = &mut *s.borrow_mut();
206match s.pop_index() {
207Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
208None => {
209 s.send_waker.register(cx.waker());
210 Poll::Pending
211 }
212 }
213 })
214 }
215216/// Asynchronously receive a value over the channel.
217pub fn receive(&mut self) -> impl Future<Output = &mut T> {
218 poll_fn(|cx| {
219self.channel.state.lock(|s| {
220let s = &mut *s.borrow_mut();
221match s.pop_index() {
222Some(i) => {
223let r = unsafe { &mut *self.channel.buf.add(i) };
224 Poll::Ready(r)
225 }
226None => {
227 s.send_waker.register(cx.waker());
228 Poll::Pending
229 }
230 }
231 })
232 })
233 }
234235/// Notify the channel that the receiving of the value has been finalized.
236pub fn receive_done(&mut self) {
237self.channel.state.lock(|s| s.borrow_mut().pop_done())
238 }
239240/// Clears all elements in the channel.
241pub fn clear(&mut self) {
242self.channel.state.lock(|s| {
243 s.borrow_mut().clear();
244 });
245 }
246247/// Returns the number of elements currently in the channel.
248pub fn len(&self) -> usize {
249self.channel.state.lock(|s| s.borrow().len())
250 }
251252/// Returns whether the channel is empty.
253pub fn is_empty(&self) -> bool {
254self.channel.state.lock(|s| s.borrow().is_empty())
255 }
256257/// Returns whether the channel is full.
258pub fn is_full(&self) -> bool {
259self.channel.state.lock(|s| s.borrow().is_full())
260 }
261}
262263struct State {
264/// Maximum number of elements the channel can hold.
265capacity: usize,
266267/// Front index. Always 0..=(N-1)
268front: usize,
269/// Back index. Always 0..=(N-1).
270back: usize,
271272/// Used to distinguish "empty" and "full" cases when `front == back`.
273 /// May only be `true` if `front == back`, always `false` otherwise.
274full: bool,
275276 send_waker: WakerRegistration,
277 receive_waker: WakerRegistration,
278}
279280impl State {
281fn increment(&self, i: usize) -> usize {
282if i + 1 == self.capacity {
2830
284} else {
285 i + 1
286}
287 }
288289fn clear(&mut self) {
290self.front = 0;
291self.back = 0;
292self.full = false;
293 }
294295fn len(&self) -> usize {
296if !self.full {
297if self.back >= self.front {
298self.back - self.front
299 } else {
300self.capacity + self.back - self.front
301 }
302 } else {
303self.capacity
304 }
305 }
306307fn is_full(&self) -> bool {
308self.full
309 }
310311fn is_empty(&self) -> bool {
312self.front == self.back && !self.full
313 }
314315fn push_index(&mut self) -> Option<usize> {
316match self.is_full() {
317true => None,
318false => Some(self.back),
319 }
320 }
321322fn push_done(&mut self) {
323assert!(!self.is_full());
324self.back = self.increment(self.back);
325if self.back == self.front {
326self.full = true;
327 }
328self.send_waker.wake();
329 }
330331fn pop_index(&mut self) -> Option<usize> {
332match self.is_empty() {
333true => None,
334false => Some(self.front),
335 }
336 }
337338fn pop_done(&mut self) {
339assert!(!self.is_empty());
340self.front = self.increment(self.front);
341self.full = false;
342self.receive_waker.wake();
343 }
344}