Chaining adapters with custom `futures::Stream` breaks trait bounds
I needed to implement a custom `Stream` that yields items in a sliding window (i.e. [1, 2, 3] => [(1, 2), (2, 3)]). So I implemented one and exposed it through an adapter called `.tuple_windows()`, which allows the following code:
let iter = stream::iter(0..=3);
assert_eq!(
    iter.tuple_windows().collect::<Vec<_>>().await,
    vec![(0, 1), (1, 2), (2, 3)]
);
I ran into a weird situation when chaining other adapters with it: the final type doesn't implement the `Stream` trait.
Code (playground):
use anyhow; // 1.0.52
use futures; // 0.3.19
use futures::{stream, Stream, StreamExt};
use pin_project_lite; // 0.2.8
use pin_project_lite::pin_project;
use std::{
    pin::Pin,
    task::{Context, Poll},
};
use tokio; // 1.15.0
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut stream = stream::iter(0..20)
        .map(|_| stream::iter(2..10)) // this works with the custom Stream
        // .map(|_| stream::iter(2..10).enumerate()) // but this doesn't
        .enumerate(); // this works regardless what happens in `map`
        // .tuple_windows(); // this only works with the first map
    while let Some(_) = stream.next().await {}
    Ok(())
}
impl<T: Stream> TupleWindowsExt for T {}
pub trait TupleWindowsExt: Stream + Sized {
    fn tuple_windows(self) -> TupleWindows<Self> {
        TupleWindows::new(self)
    }
}
pin_project! {
    #[derive(Debug)]
    pub struct TupleWindows<S: Stream> {
        #[pin]
        stream: S,
        previous: Option<S::Item>,
    }
}
impl<S: Stream> TupleWindows<S> {
    pub fn new(stream: S) -> Self {
        Self {
            stream,
            previous: None,
        }
    }
}
impl<S: Stream> Stream for TupleWindows<S>
where
    S::Item: Clone,
{
    type Item = (S::Item, S::Item);
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        let current = match futures::ready!(this.stream.as_mut().poll_next(cx)) {
            Some(next) => next,
            None => return Poll::Ready(None),
        };
        if let Some(previous) = this.previous {
            let res = (previous.clone(), current.clone());
            *this.previous = Some(current);
            Poll::Ready(Some(res))
        } else {
            let next = match this.stream.poll_next(cx) {
                Poll::Ready(next) => next,
                Poll::Pending => {
                    *this.previous = Some(current);
                    return Poll::Pending;
                }
            };
            *this.previous = next.clone();
            Poll::Ready(next.map(|next| (current, next)))
        }
    }
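    fn size_hint(&self) -> (usize, Option<usize>) {
        // With nothing buffered, n remaining items produce n - 1 windows;
        // once `previous` is set, every remaining item completes a window.
        let (lower, upper) = self.stream.size_hint();
        if self.previous.is_some() {
            (lower, upper)
        } else {
            (
                lower.saturating_sub(1),
                upper.map(|upper| upper.saturating_sub(1)),
            )
        }
    }
}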
The compiler error is not helpful either, as it only tells me that `Stream` is not implemented for the newly created type:
error[E0599]: the method `next` exists for struct `TupleWindows<futures::stream::Map<futures::stream::Iter<std::ops::Range<{integer}>>, [closure@src/main.rs:16:11: 16:46]>>`, but its trait bounds were not satisfied
What am I missing here?
Your `Stream` implementation requires that items from the inner stream be cloneable:
impl<S: Stream> Stream for TupleWindows<S>
where
    S::Item: Clone,
In the working case, `stream::iter(0..20).map(|_| stream::iter(2..10)).tuple_windows()`, you're passing a stream of `futures::stream::Iter<std::ops::Range<i32>>` items to `tuple_windows()`. `Iter` implements `Clone` when the inner iterator type implements `Clone`. The inner iterator type here is `std::ops::Range<i32>`, which does implement `Clone`.
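When you change the code to add a call to `enumerate()` within the `map()`, you're now passing a stream of `futures::stream::Enumerate<futures::stream::Iter<std::ops::Range<i32>>>` items (i.e. a stream of streams) to `tuple_windows()`. `Enumerate` doesn't implement `Clone` at all (as of futures 0.3.19). Since the `S::Item: Clone` bound is then unsatisfied, `TupleWindows` over that stream does not implement `Stream`, so `next()` (which comes from `StreamExt`) is unavailable, and that is what the error reports.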
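The same mechanism can be reproduced without the futures crate. The following is a minimal sketch using std's `Iterator` in place of `Stream`; `Windows`, `NotClone`, and `assert_iterator` are hypothetical names introduced only for illustration, and the adapter is a simplified analogue of `TupleWindows`, not a replacement for it. It shows that the adapter can be constructed over any item type, while the trait implementation (and therefore `next()`) only exists when the items are `Clone`.

```rust
// An adapter that can be built for any iterator, but only implements
// `Iterator` when the items are `Clone`.
struct Windows<I: Iterator> {
    iter: I,
    previous: Option<I::Item>,
}

impl<I: Iterator> Windows<I> {
    fn new(iter: I) -> Self {
        Self { iter, previous: None }
    }
}

impl<I: Iterator> Iterator for Windows<I>
where
    I::Item: Clone, // same kind of bound as on `TupleWindows`
{
    type Item = (I::Item, I::Item);

    fn next(&mut self) -> Option<Self::Item> {
        // Reuse the buffered item, or pull the very first one.
        let previous = match self.previous.take() {
            Some(previous) => previous,
            None => self.iter.next()?,
        };
        let current = self.iter.next()?;
        self.previous = Some(current.clone());
        Some((previous, current))
    }
}

// Hypothetical item type that deliberately does not implement `Clone`.
struct NotClone;

// Helper with an explicit bound; passing a value that is not an iterator
// makes the compiler report which bound is missing.
fn assert_iterator<I: Iterator>(_: &I) {}

fn main() {
    // Items are `i32`, which is `Clone`, so `Windows` is an `Iterator`.
    let windows = Windows::new(0..=3);
    assert_iterator(&windows);
    let pairs: Vec<_> = windows.collect();
    assert_eq!(pairs, vec![(0, 1), (1, 2), (2, 3)]);

    // Constructing the adapter over non-`Clone` items is still accepted...
    let _not_iterable = Windows::new(vec![NotClone, NotClone].into_iter());
    // ...but neither of these lines compiles:
    // _not_iterable.next();            // E0599: trait bounds were not satisfied
    // assert_iterator(&_not_iterable); // error names the missing `NotClone: Clone`
}
```

Calling a helper such as `assert_iterator` (or an equivalent `fn assert_stream<S: Stream>(_: &S) {}` in the stream case, which is an assumption and not part of the original code) tends to produce a more specific diagnostic than the method call, because the compiler has to explain why the bound on that function is not met.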
I can't see any reason why `Enumerate` couldn't implement `Clone` (with the appropriate trait bounds); I suppose it wasn't implemented because nobody asked for it.
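To illustrate what "with the appropriate trait bounds" could look like, here is a small sketch. `MyEnumerate` is a hypothetical stand-in with an inner value and a counter; it is not the definition used by the futures crate, and `assert_clone` is likewise a helper made up for this example.

```rust
// Hypothetical stand-in for an enumerate-style adapter: an inner value
// plus a running counter. Not the actual futures definition.
struct MyEnumerate<S> {
    inner: S,
    count: usize,
}

// `Clone` is only provided when the wrapped type is itself `Clone`.
impl<S: Clone> Clone for MyEnumerate<S> {
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
            count: self.count,
        }
    }
}

// Compiles only for types that implement `Clone`.
fn assert_clone<T: Clone>(_: &T) {}

fn main() {
    // `std::ops::Range<i32>` is `Clone`, so the wrapper is too.
    let adapter = MyEnumerate { inner: 2..10, count: 0 };
    assert_clone(&adapter);

    let copy = adapter.clone();
    assert_eq!(copy.inner, 2..10);
    assert_eq!(copy.count, 0);
}
```

In this sketch, `#[derive(Clone)]` on the struct would generate an equivalent conditional implementation; the manual `impl` is spelled out only to make the bound visible.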