Concurrent Streams Does't Print Anything to Console

I'm trying to build an example concerning using the Stream.concurrently method in fs2. I'm developing the producer/consumer pattern, using a Queue as the shared state:

import cats.effect.std.{Queue, Random}

object Fs2Tutorial extends IOApp {
  val random: IO[Random[IO]] = Random.scalaUtilRandom[IO]
  val queue: IO[Queue[IO, Int]] = Queue.bounded[IO, Int](10)

  val producer: IO[Nothing] = for {
    r <- random
    q <- queue
    p <-
      r.betweenInt(1, 11)
      .flatMap(q.offer)
      .flatTap(_ => IO.sleep(1.second))
      .foreverM
  } yield p

  val consumer: IO[Nothing] = for {
    q <- queue
    c <- q.take.flatMap { n =>
      IO.println(s"Consumed $n")
    }.foreverM
  } yield c

  val concurrently: Stream[IO, Nothing] = Stream.eval(producer).concurrently(Stream.eval(consumer))

  override def run(args: List[String]): IO[ExitCode] = {
    concurrently.compile.drain.as(ExitCode.Success)
  }
}

I expect the program to print some "Consumed n", for some n. However, the program prints nothing to the console.

What's wrong with the above code?


Solution 1:

What's wrong with the above code?

You are not using the same Queue in the consumer and in the producer, rather each of them is creating its own new independent Queue (the same happens with Random BTW)

This is a common mistake made by newbies who don't grasp yet the main principles behind a data type like IO

When you do val queue: IO[Queue[IO, Int]] = Queue.bounded[IO, Int](10) you are saying that queue is a program that when evaluated will produce a value of type Queue[IO, Unit], that is the point of all this.
The program become a value, and as any value you can manipulate it in any ways to produce new values, for example using flatMap so when both consumer & producer crate a new program by flatMapping queue they both create new independent programs / values.

You can fix that code like this:

import cats.effect.{IO, IOApp}
import cats.effect.std.{Queue, Random}
import cats.syntax.all._
import fs2.Stream

import scala.concurrent.duration._

object Fs2Tutorial extends IOApp.Simple {  
  override final val run: IO[Unit] = {
    val resources =
      (
        Random.scalaUtilRandom[IO],
        Queue.bounded[IO, Int](10)
      ).tupled
    
    val concurrently =
      Stream.eval(resources).flatMap {
        case (random, queue) =>
          val producer = 
            Stream
              .fixedDelay[IO](1.second)
              .evalMap(_ => random.betweenInt(1, 11))
              .evalMap(queue.offer)

        val consumer =
          Stream.fromQueueUnterminated(queue).evalMap(n => IO.println(s"Consumed $n"))
        
        producer.concurrently(consumer)
      }
    
    concurrently.interruptAfter(10.seconds).compile.drain >> IO.println("Finished!")
  }
}

(You can see it running here).


PS: I would recommend you to look into the "Programs as Values" Series from Fabio Labella: https://systemfw.org/archive.html