futures_util/stream/stream/
take_until.rs1use core::fmt;
2use core::pin::Pin;
3use futures_core::future::Future;
4use futures_core::ready;
5use futures_core::stream::{FusedStream, Stream};
6use futures_core::task::{Context, Poll};
7#[cfg(feature = "sink")]
8use futures_sink::Sink;
9use pin_project_lite::pin_project;
10
pin_project! {
    /// Stream adapter that yields items from an inner stream until a
    /// "stopping" future completes.
    ///
    /// The stopping future is polled before the inner stream on every
    /// `poll_next`; once it resolves, its output is stored and the adapter
    /// reports end-of-stream.
    #[must_use = "streams do nothing unless polled"]
    pub struct TakeUntil<St: Stream, Fut: Future> {
        // The wrapped stream that items are pulled from.
        #[pin]
        stream: St,
        // The stopping future. `Some` on construction; becomes `None` once the
        // future resolves, the inner stream ends, or the user takes it out via
        // `take_future`.
        #[pin]
        fut: Option<Fut>,
        // Output of `fut`, stored when it resolves and handed out by
        // `take_result`.
        fut_result: Option<Fut::Output>,
        // Set to `true` when the user removed the future with `take_future`;
        // in that state an empty `fut` does not stop the stream.
        free: bool,
    }
}
29
30impl<St, Fut> fmt::Debug for TakeUntil<St, Fut>
31where
32    St: Stream + fmt::Debug,
33    St::Item: fmt::Debug,
34    Fut: Future + fmt::Debug,
35{
36    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
37        f.debug_struct("TakeUntil").field("stream", &self.stream).field("fut", &self.fut).finish()
38    }
39}
40
41impl<St, Fut> TakeUntil<St, Fut>
42where
43    St: Stream,
44    Fut: Future,
45{
46    pub(super) fn new(stream: St, fut: Fut) -> Self {
47        Self { stream, fut: Some(fut), fut_result: None, free: false }
48    }
49
50    delegate_access_inner!(stream, St, ());
51
52    pub fn take_future(&mut self) -> Option<Fut> {
57        if self.fut.is_some() {
58            self.free = true;
59        }
60
61        self.fut.take()
62    }
63
64    pub fn take_result(&mut self) -> Option<Fut::Output> {
101        self.fut_result.take()
102    }
103
104    pub fn is_stopped(&self) -> bool {
107        !self.free && self.fut.is_none()
108    }
109}
110
111impl<St, Fut> Stream for TakeUntil<St, Fut>
112where
113    St: Stream,
114    Fut: Future,
115{
116    type Item = St::Item;
117
118    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
119        let mut this = self.project();
120
121        if let Some(f) = this.fut.as_mut().as_pin_mut() {
122            if let Poll::Ready(result) = f.poll(cx) {
123                this.fut.set(None);
124                *this.fut_result = Some(result);
125            }
126        }
127
128        if !*this.free && this.fut.is_none() {
129            Poll::Ready(None)
131        } else {
132            let item = ready!(this.stream.poll_next(cx));
134            if item.is_none() {
135                this.fut.set(None);
136            }
137            Poll::Ready(item)
138        }
139    }
140
141    fn size_hint(&self) -> (usize, Option<usize>) {
142        if self.is_stopped() {
143            return (0, Some(0));
144        }
145
146        self.stream.size_hint()
147    }
148}
149
150impl<St, Fut> FusedStream for TakeUntil<St, Fut>
151where
152    St: Stream,
153    Fut: Future,
154{
155    fn is_terminated(&self) -> bool {
156        self.is_stopped()
157    }
158}
159
// `Sink` support for the adapter, compiled only with the `sink` feature.
// Available when the wrapped type `S` is itself a `Sink<Item>`.
#[cfg(feature = "sink")]
impl<S, Fut, Item> Sink<Item> for TakeUntil<S, Fut>
where
    S: Stream + Sink<Item>,
    Fut: Future,
{
    // Errors are those of the wrapped sink.
    type Error = S::Error;

    // NOTE(review): `delegate_sink!` is defined elsewhere; presumably it
    // forwards every `Sink` method to the pinned `stream` field — confirm.
    delegate_sink!(stream, Item);
}