Chaining adapters with custom `futures::Stream` breaks trait bounds

I needed to implement a custom Stream that yields items in a sliding window (ie. [1, 2, 3] => [(1, 2), (2, 3)]). So I implemented and gave it an adapter called .tuple_windows(). Allowing the following code

let iter = stream::iter(0..=3);
assert_eq!(
    iter.tuple_windows().collect::<Vec<_>>().await,
    vec![(0, 1), (1, 2), (2, 3)]
)

I ran into a weird situation when chaining other adapters with it where the final type doesn't implement the Stream trait.

code (playground):

use anyhow; // 1.0.52
use futures; // 0.3.19
use futures::{stream, Stream, StreamExt};
use pin_project_lite;
use pin_project_lite::pin_project;
use std::{
    pin::Pin,
    task::{Context, Poll},
};

use tokio; // 1.15.0 // 0.2.8
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut stream = stream::iter(0..20)
        .map(|_| stream::iter(2..10)) // this works with the custom Stream
        //      .map(|_| stream::iter(2..10).enumerate())   // but this doesn't
        .enumerate(); // this works regardless what happens in `map`
                      //     .tuple_windows(); // this only works with the first map

    while let Some(_) = stream.next().await {}
    Ok(())
}

impl<T: Stream> TupleWindowsExt for T {}
pub trait TupleWindowsExt: Stream + Sized {
    fn tuple_windows(self) -> TupleWindows<Self> {
        TupleWindows::new(self)
    }
}

pin_project! {
    #[derive(Debug)]
    pub struct TupleWindows<S: Stream> {
        #[pin]
        stream: S,
        previous: Option<S::Item>,
    }
}

impl<S: Stream> TupleWindows<S> {
    pub fn new(stream: S) -> Self {
        Self {
            stream,
            previous: None,
        }
    }
}

impl<S: Stream> Stream for TupleWindows<S>
where
    S::Item: Clone,
{
    type Item = (S::Item, S::Item);

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        let current = match futures::ready!(this.stream.as_mut().poll_next(cx)) {
            Some(next) => next,
            None => return Poll::Ready(None),
        };

        if let Some(previous) = this.previous {
            let res = (previous.clone(), current.clone());
            *this.previous = Some(current);
            Poll::Ready(Some(res))
        } else {
            let next = match this.stream.poll_next(cx) {
                Poll::Ready(next) => next,
                Poll::Pending => {
                    *this.previous = Some(current);
                    return Poll::Pending;
                }
            };
            *this.previous = next.clone();
            Poll::Ready(next.map(|next| (current, next)))
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        let (lower, upper) = self.stream.size_hint();
        (
            lower.saturating_mul(2),
            upper.and_then(|upper| upper.checked_mul(2)),
        )
    }
}

The compiler error is not helpful either as it only tells me that Stream is not implemented for the newly created type:

error[E0599]: the method `next` exists for struct `TupleWindows<futures::stream::Map<futures::stream::Iter<std::ops::Range<{integer}>>, [closure@src/main.rs:16:11: 16:46]>>`, but its trait bounds were not satisfied

What am I missing here ?


Your Stream implementation requires that items from the inner stream be cloneable:

impl<S: Stream> Stream for TupleWindows<S>
where
    S::Item: Clone,

In the working case, stream::iter(0..20).map(|_| stream::iter(2..10)).tuple_windows(), you're passing an stream of futures::stream::Iter<std::ops::Range<i32>> items to tuple_windows(). Iter implements Clone when the inner iterator type implements Clone. The inner iterator type here is std::ops::Range<i32>, which does implement Clone.

When you change the code to add a call to enumerate() within the map(), you're now passing an stream of futures::stream::Enumerate<futures::stream::Iter<std::ops::Range<i32>>> items (i.e. a stream of streams) to tuple_windows(). Enumerate doesn't implement Clone at all (as of futures 0.3.19).

I can't see any reason why Enumerate couldn't implement Clone (with the appropriate trait bounds); I suppose it wasn't implemented because nobody asked for it.