Async method not executing when using System.Threading.Channels
Solution 1:
If you are new, I recommend reading Tutorial: Learn to debug C# code using Visual Studio. You should know how to put breakpoints to see your code running step-by-step.
Since this one involves async/Task, it may look confusing, but when you step into Consumer, you will see it stops at the await foreach (var item in ch.Reader.ReadAllAsync(cancellationToken)) line.
The consumer is waiting for items that the producer never writes, because the producer never starts: your first await suspends the calling code until Consumer finishes, so the second line never gets executed.
await Consumer(channel, cancel);
await Producer(channel, cancel);
This should fix the issue:
var consumerTask = Consumer(channel, cancel);
var producerTask = Producer(channel, cancel);
await Task.WhenAll(consumerTask, producerTask);
What the above code says is:
- Run the Consumer task, don't wait for it, but keep track of it in consumerTask.
- Run the Producer task, don't wait for it, but keep track of it in producerTask.
- Wait until both consumerTask and producerTask finish, using Task.WhenAll.
Note that you still seem to have a logical problem with Consumer: it never exits, so your ReadKey() will likely never be hit (your app would get stuck at the WhenAll line). If that is a bug you intend to fix, I think working it out yourself is good practice.
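A minimal, self-contained sketch of the fix follows. The Producer and Consumer bodies here are assumptions, not the question's originals, and the class name WhenAllDemo is hypothetical. The producer completes the writer when it is done, which is one possible way to let ReadAllAsync end so that Task.WhenAll can return.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class WhenAllDemo
{
    // Hypothetical producer: writes 0..count-1, then marks the channel complete.
    public static async Task Producer(Channel<int> ch, int count, CancellationToken cancellationToken)
    {
        try
        {
            for (int i = 0; i < count; i++)
            {
                await ch.Writer.WriteAsync(i, cancellationToken);
            }
        }
        finally
        {
            // Without this, ReadAllAsync never ends and the consumer waits forever.
            ch.Writer.TryComplete();
        }
    }

    // Hypothetical consumer: collects items until the channel is completed.
    public static async Task<List<int>> Consumer(Channel<int> ch, CancellationToken cancellationToken)
    {
        var received = new List<int>();
        await foreach (var item in ch.Reader.ReadAllAsync(cancellationToken))
        {
            received.Add(item);
        }
        return received;
    }

    public static async Task<List<int>> RunAsync(int count)
    {
        var channel = Channel.CreateUnbounded<int>();
        var cancel = CancellationToken.None;

        // Start both without awaiting either one, then wait for both to finish.
        var consumerTask = Consumer(channel, cancel);
        var producerTask = Producer(channel, count, cancel);
        await Task.WhenAll(consumerTask, producerTask);

        return await consumerTask;
    }
}
```

Calling await WhenAllDemo.RunAsync(5) returns the five items 0 through 4. If the TryComplete call is removed, the same call never returns, which is the hang described above.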
Solution 2:
Your code is trying to consume all messages in the channel before any are produced. While you can store the producer/consumer tasks instead of awaiting them, it's better to use idioms and patterns specific to channels.
Instead of using a Channel as some kind of container, only expose and share Readers to a channel created and owned by the method that writes to it. That's how channels are used in Go.
That's why you can only work with a ChannelReader and a ChannelWriter too:
- a ChannelReader is like a receive-only channel (<-chan T) in Go, the only way to read from a channel
- a ChannelWriter is like a send-only channel (chan<- T) in Go, the only way to write.
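As a small illustration of that split, the sketch below hands each method only the half of the channel it needs. The names ChannelSides, Fill and Sum are hypothetical and exist only for this example.

```csharp
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelSides
{
    // Hypothetical writer-only side: it can write and complete, but cannot read.
    public static async Task Fill(ChannelWriter<int> writer, int count)
    {
        for (int i = 1; i <= count; i++)
        {
            await writer.WriteAsync(i);
        }
        writer.Complete();
    }

    // Hypothetical reader-only side: it can read, but cannot write or complete.
    public static async Task<int> Sum(ChannelReader<int> reader)
    {
        int total = 0;
        await foreach (var item in reader.ReadAllAsync())
        {
            total += item;
        }
        return total;
    }

    public static async Task<int> RunAsync(int count)
    {
        var channel = Channel.CreateUnbounded<int>();
        var sumTask = Sum(channel.Reader);   // roughly a <-chan int parameter in Go
        await Fill(channel.Writer, count);   // roughly a chan<- int parameter in Go
        return await sumTask;
    }
}
```

Because Sum only ever sees a ChannelReader<int>, it cannot write to the channel or complete it by accident, and Fill cannot read from it.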
Using Owned channels
If you need to process data asynchronously, do this in a task inside the producer/consumer methods. This makes it a lot easier to control the channels and know when processing is finished or cancelled. It also allows you to construct pipelines from channels quite easily.
In your case, the producer could be :
public ChannelReader<int> Producer(CancellationToken cancellationToken)
{
    var channel = Channel.CreateUnbounded<int>();
    var writer = channel.Writer;
    // The lambda must be async because it awaits.
    _ = Task.Run(async () =>
    {
        for (int i = 0; i < 59; i++)
        {
            await Task.Delay(1000, cancellationToken);
            await writer.WriteAsync(i, cancellationToken);
        }
    }, cancellationToken)
    // Complete the channel when the loop ends, passing along any error.
    .ContinueWith(t => writer.TryComplete(t.Exception));
    return channel.Reader;
}
The consumer, if one is lazy, can be :
static async Task ConsumeNumbers(this ChannelReader<int> reader, CancellationToken cancellationToken)
{
    await foreach (var item in reader.ReadAllAsync(cancellationToken))
    {
        Console.WriteLine(item);
    }
}
Making this an extension method means both can be combined with :
await Producer(cancel)
.ConsumeNumbers(cancel);
In the more generic case, a pipeline block reads from a channel and returns a channel :
public static ChannelReader<int> RaiseTo(this ChannelReader<int> reader, double pow, CancellationToken cancellationToken)
{
    var channel = Channel.CreateUnbounded<int>();
    var writer = channel.Writer;
    _ = Task.Run(async () =>
    {
        await foreach (var item in reader.ReadAllAsync(cancellationToken))
        {
            // Math.Pow returns a double; the cast truncates it to fit the int channel.
            var newItem = (int)Math.Pow(item, pow);
            await writer.WriteAsync(newItem, cancellationToken);
        }
    }, cancellationToken)
    .ContinueWith(t => writer.TryComplete(t.Exception));
    return channel.Reader;
}
This would allow creating a pipeline of steps, eg :
await Producer(cancel)
.RaiseTo(0.3,cancel)
.RaiseTo(3,cancel)
.ConsumeNumbers(cancel);
Parallel processing
It's also possible to use multiple tasks per block, to speed up processing. In .NET 6 this can be done easily with Parallel.ForEachAsync :
public static ChannelReader<int> RaiseTo(this ChannelReader<int> reader, double pow, CancellationToken cancellationToken)
{
    var channel = Channel.CreateUnbounded<int>();
    var writer = channel.Writer;
    _ = Parallel.ForEachAsync(
            reader.ReadAllAsync(cancellationToken),
            cancellationToken,
            // The body receives both the item and a cancellation token.
            async (item, ct) =>
            {
                // Math.Pow returns a double; the cast truncates it to fit the int channel.
                var newItem = (int)Math.Pow(item, pow);
                await writer.WriteAsync(newItem, ct);
            })
        .ContinueWith(t => writer.TryComplete(t.Exception));
    return channel.Reader;
}
Beware the order
A Channel preserves the order of items and read requests. This means that a single-task step will always consume and produce messages in order. There's no such guarantee with Parallel.ForEachAsync though. If order is important you'd have to add code to ensure messages are emitted in order, or try to reorder them with another step.
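One possible shape for such a reordering step is sketched below. It is not part of the original answer: it assumes an earlier step has tagged every item with a sequence index, that indexes start at 0 and that none are missing or repeated. The names ReorderStep and Reorder are hypothetical.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ReorderStep
{
    // Hypothetical step: reads (Index, Value) pairs in any order and emits the
    // values in Index order. Assumes indexes start at 0 with no gaps or duplicates.
    public static ChannelReader<T> Reorder<T>(
        this ChannelReader<(long Index, T Value)> reader,
        CancellationToken cancellationToken = default)
    {
        var channel = Channel.CreateUnbounded<T>();
        var writer = channel.Writer;
        _ = Task.Run(async () =>
        {
            var pending = new Dictionary<long, T>(); // items that arrived early
            long next = 0;                           // index that must be emitted next
            await foreach (var (index, value) in reader.ReadAllAsync(cancellationToken))
            {
                pending[index] = value;
                // Flush every consecutive item that is now available.
                while (pending.Remove(next, out var ready))
                {
                    await writer.WriteAsync(ready, cancellationToken);
                    next++;
                }
            }
        }, cancellationToken)
        .ContinueWith(t => writer.TryComplete(t.Exception));
        return channel.Reader;
    }
}
```

The trade-off is memory: items that arrive early stay in the dictionary until their turn comes, so a single slow or missing index holds back everything after it.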