.NET Asynchronous stream read/write

Even though it goes against the grain to help people with their homework, given that this is more than a year old, here's the proper way to accomplish this. All you need to do is overlap your read/write operations; no spawning of additional threads or anything else is required.

public static class StreamExtensions
{
    private const int DEFAULT_BUFFER_SIZE = short.MaxValue ; // +32767
    public static void CopyTo( this Stream input , Stream output )
    {
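        // Call the static overload directly: on .NET 4.0 and later the built-in
        // instance method Stream.CopyTo(Stream, int) would otherwise be chosen
        // over this extension method.
        CopyTo( input , output , DEFAULT_BUFFER_SIZE ) ;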
        return ;
    }
    public static void CopyTo( this Stream input , Stream output , int bufferSize )
    {
        if ( !input.CanRead ) throw new InvalidOperationException(   "input must be open for reading"  );
        if ( !output.CanWrite ) throw new InvalidOperationException( "output must be open for writing" );

        byte[][]     buf   = { new byte[bufferSize] , new byte[bufferSize] } ;
        int[]        bufl  = { 0 , 0 }                                       ;
        int          bufno = 0 ;
        IAsyncResult read  = input.BeginRead( buf[bufno] , 0 , buf[bufno].Length , null , null ) ;
        IAsyncResult write = null ;

        while ( true )
        {

            // wait for the read operation to complete
            read.AsyncWaitHandle.WaitOne() ; 
            bufl[bufno] = input.EndRead(read) ;

            // if zero bytes read, the copy is complete
            if ( bufl[bufno] == 0 )
            {
                break ;
            }

            // wait for the in-flight write operation, if one exists, to complete
            // the only time one won't exist is after the very first read operation completes
            if ( write != null )
            {
                write.AsyncWaitHandle.WaitOne() ;
                output.EndWrite(write) ;
            }

            // start the new write operation
            write = output.BeginWrite( buf[bufno] , 0 , bufl[bufno] , null , null ) ;

            // toggle the current, in-use buffer
            // and start the read operation on the new buffer.
            //
            // Changed to use XOR to toggle between 0 and 1.
            // A little speedier than using a ternary expression.
            bufno ^= 1 ; // bufno = ( bufno == 0 ? 1 : 0 ) ;
            read = input.BeginRead( buf[bufno] , 0 , buf[bufno].Length , null , null ) ;

        }

        // wait for the final in-flight write operation, if one exists, to complete
        // the only time one won't exist is if the input stream is empty.
        if ( write != null )
        {
            write.AsyncWaitHandle.WaitOne() ;
            output.EndWrite(write) ;
        }

        output.Flush() ;

        // return to the caller ;
        return ;
    }


    public static async Task CopyToAsync( this Stream input , Stream output )
    {
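        // Call the static overload directly: on .NET 4.5 and later the built-in
        // instance method Stream.CopyToAsync(Stream, int) would otherwise be
        // chosen over this extension method.
        await CopyToAsync( input , output , DEFAULT_BUFFER_SIZE ) ;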
        return;
    }

    public static async Task CopyToAsync( this Stream input , Stream output , int bufferSize )
    {
        if ( !input.CanRead ) throw new InvalidOperationException( "input must be open for reading" );
        if ( !output.CanWrite ) throw new InvalidOperationException( "output must be open for writing" );

        byte[][]     buf   = { new byte[bufferSize] , new byte[bufferSize] } ;
        int[]        bufl  = { 0 , 0 } ;
        int          bufno = 0 ;
        Task<int>    read  = input.ReadAsync( buf[bufno] , 0 , buf[bufno].Length ) ;
        Task         write = null ;

        while ( true )
        {

            // wait for the read operation to complete
            bufl[bufno] = await read ;

            // if zero bytes read, the copy is complete
            if ( bufl[bufno] == 0 )
            {
                break;
            }

            // wait for the in-flight write operation, if one exists, to complete
            // the only time one won't exist is after the very first read operation completes
            if ( write != null )
            {
                await write ;
            }

            // start the new write operation
            write = output.WriteAsync( buf[bufno] , 0 , bufl[bufno] ) ;

            // toggle the current, in-use buffer
            // and start the read operation on the new buffer.
            //
            // Changed to use XOR to toggle between 0 and 1.
            // A little speedier than using a ternary expression.
            bufno ^= 1; // bufno = ( bufno == 0 ? 1 : 0 ) ;
            read = input.ReadAsync( buf[bufno] , 0 , buf[bufno].Length );

        }

        // wait for the final in-flight write operation, if one exists, to complete
        // the only time one won't exist is if the input stream is empty.
        if ( write != null )
        {
            await write;
        }

        await output.FlushAsync();

        // return to the caller ;
        return;
    }

}

Cheers.


I doubt this is the fastest code (there's some overhead from the .NET Task abstraction) but I do think it's a cleaner approach to the whole async copy thing.

I needed a CopyTransformAsync where I could pass a delegate to do something as chunks were passed through the copy operation, e.g. to compute a message digest while copying. That's why I got interested in rolling my own option.
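
To make the digest use case concrete, here is a rough sketch of a delegate with the same shape as the transform parameter of the CopyTransformAsync listed further down (Func<ArraySegment<byte>, ArraySegment<byte>>). The names DigestTransformSketch, HashingTransform and Finish are made up for illustration; only the HashAlgorithm calls come from the framework.

```csharp
using System;
using System.Security.Cryptography;

static class DigestTransformSketch
{
    // Builds a pass-through transform: each chunk is fed to `hash` and then
    // returned unchanged, so the bytes being copied are not altered.
    public static Func<ArraySegment<byte>, ArraySegment<byte>> HashingTransform(HashAlgorithm hash)
    {
        return chunk =>
        {
            // A null output buffer means "update the hash state only".
            hash.TransformBlock(chunk.Array, chunk.Offset, chunk.Count, null, 0);
            return chunk;
        };
    }

    // Call once, after the last chunk has passed through, to get the digest.
    public static byte[] Finish(HashAlgorithm hash)
    {
        hash.TransformFinalBlock(new byte[0], 0, 0);
        return hash.Hash;
    }
}
```

The idea would be to create e.g. a SHA256 instance, pass HashingTransform(sha) as the transform argument, and call Finish(sha) once the copy has completed. Since the chunks arrive in order, the result should match hashing the whole stream in one go.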

Findings:

  • CopyToAsync is sensitive to bufferSize (a large buffer is required)
  • FileOptions.Asynchronous -> makes it horrendously slow (not sure exactly why that is)
  • The bufferSize of the FileStream objects can be smaller (it's not that important)
  • The Serial test is the fastest at buffer sizes up to 80K, and the most resource intensive
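
For anyone unsure which knobs the findings refer to: there are two separate buffer sizes (the copy buffer and the FileStream's own internal buffer) plus the FileOptions.Asynchronous flag. A minimal sketch of opening the streams, assuming hypothetical helper names (FileStreamOptionsSketch, OpenRead, OpenWrite) that are not from the test program:

```csharp
using System;
using System.IO;

static class FileStreamOptionsSketch
{
    // `fileBufferSize` is the FileStream's internal buffer, not the copy buffer.
    // `useAsync` toggles FileOptions.Asynchronous (the "(Asynchronous)" variants).
    public static FileStream OpenRead(string path, int fileBufferSize, bool useAsync)
    {
        FileOptions options = useAsync ? FileOptions.Asynchronous : FileOptions.None;
        return new FileStream(path, FileMode.Open, FileAccess.Read,
                              FileShare.Read, fileBufferSize, options);
    }

    public static FileStream OpenWrite(string path, int fileBufferSize, bool useAsync)
    {
        FileOptions options = useAsync ? FileOptions.Asynchronous : FileOptions.None;
        return new FileStream(path, FileMode.Create, FileAccess.Write,
                              FileShare.None, fileBufferSize, options);
    }
}
```

The copy buffer is then whatever gets passed to CopyToAsync (or allocated inside a hand-rolled copy loop), independent of the value given here.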

Here's what I've found, along with the complete source code for the program I used to test this. On my machine, these tests were run on an SSD, so each one is the equivalent of a file copy. Normally you wouldn't use this just to copy files; it's when you have a network stream (which is my use case) that you'd want something like this.

4K buffer

Serial...                                in 0.474s
CopyToAsync...                           timed out
CopyToAsync (Asynchronous)...            timed out
CopyTransformAsync...                    timed out
CopyTransformAsync (Asynchronous)...     timed out

8K buffer

Serial...                                in 0.344s
CopyToAsync...                           timed out
CopyToAsync (Asynchronous)...            timed out
CopyTransformAsync...                    in 1.116s
CopyTransformAsync (Asynchronous)...     timed out

40K buffer

Serial...                                in 0.195s
CopyToAsync...                           in 0.624s
CopyToAsync (Asynchronous)...            timed out
CopyTransformAsync...                    in 0.378s
CopyTransformAsync (Asynchronous)...     timed out

80K buffer

Serial...                                in 0.190s
CopyToAsync...                           in 0.355s
CopyToAsync (Asynchronous)...            in 1.196s
CopyTransformAsync...                    in 0.300s
CopyTransformAsync (Asynchronous)...     in 0.886s

160K buffer

Serial...                                in 0.432s
CopyToAsync...                           in 0.252s
CopyToAsync (Asynchronous)...            in 0.454s
CopyTransformAsync...                    in 0.447s
CopyTransformAsync (Asynchronous)...     in 0.555s
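
The result lines above are either an elapsed time or "timed out". The actual harness isn't reproduced here, so the following is only a guess at how such a measurement could be taken; TimingSketch and TimeAsync are invented names, and the timeout value is up to the caller.

```csharp
using System;
using System.Diagnostics;
using System.Globalization;
using System.Threading.Tasks;

static class TimingSketch
{
    // Runs `test` and returns "in 0.123s", or "timed out" if it takes longer
    // than `timeout`. Task.Run keeps a test that blocks synchronously from
    // holding up the timer.
    public static async Task<string> TimeAsync(Func<Task> test, TimeSpan timeout)
    {
        Stopwatch sw = Stopwatch.StartNew();
        Task work = Task.Run(test);
        Task winner = await Task.WhenAny(work, Task.Delay(timeout)).ConfigureAwait(false);
        sw.Stop();

        if (winner != work)
        {
            // The work itself is not cancelled here; it keeps running.
            return "timed out";
        }

        await work.ConfigureAwait(false); // surface any exception from the test
        return string.Format(CultureInfo.InvariantCulture,
                             "in {0:0.000}s", sw.Elapsed.TotalSeconds);
    }
}
```

Note that a timed-out copy still runs in the background in this sketch, so a real harness would probably want a CancellationToken or a fresh process per test to keep one run from skewing the next.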

Here you can see the Process Explorer performance graph as the test is run. Each peak (in the lowest of the three graphs) marks the start of the serial test. You can clearly see how the throughput increases dramatically as the buffer size grows. It appears to plateau somewhere around 80K, which is what the .NET Framework CopyToAsync method uses internally.

Performance Graph

The nice thing here is that the final implementation wasn't that complicated:

static readonly Task CompletedTask = Task.FromResult(0);
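static async Task CopyTransformAsync(Stream inputStream
    , Stream outputStream
    , Func<ArraySegment<byte>, ArraySegment<byte>> transform = null
    , int bufferSize = 81920 // defined outside this snippet originally; the 80K default is an assumption
    )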
{
    var temp = new byte[bufferSize];
    var temp2 = new byte[bufferSize];

    int i = 0;

    var readTask = inputStream
        .ReadAsync(temp, 0, bufferSize)
        .ConfigureAwait(false);

    var writeTask = CompletedTask.ConfigureAwait(false);

    for (; ; )
    {
        // synchronize read
        int read = await readTask;
        if (read == 0)
        {
            break;
        }

        if (i++ > 0)
        {
            // synchronize write
            await writeTask;
        }

        var chunk = new ArraySegment<byte>(temp, 0, read);

        // do transform (if any)
        if (transform != null)
        {
            chunk = transform(chunk);
        }

        // queue write
        writeTask = outputStream
            .WriteAsync(chunk.Array, chunk.Offset, chunk.Count)
            .ConfigureAwait(false);

        // queue read
        readTask = inputStream
            .ReadAsync(temp2, 0, bufferSize)
            .ConfigureAwait(false);

        // swap buffer
        var temp3 = temp;
        temp = temp2;
        temp2 = temp3;
    }

    await writeTask; // complete any lingering write task
}

This method of interleaving the read/write, despite the huge buffers, is somewhere around 18% faster than the BCL CopyToAsync.

Out of curiosity, I did change the async calls to typical Begin/End async pattern calls, and that did not improve the situation one bit; it made it worse. For all I like to bash on the Task abstraction overhead, they do some nifty things when you write your code with the async/await keywords, and it is much nicer to read that code!
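
For reference, one common way to use the Begin/End calls while keeping the awaitable loop is to wrap them with Task.Factory.FromAsync. This is not necessarily how the experiment above was wired up; it's just a sketch of the pattern, and BeginEndSketch with its two methods are illustrative names.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

static class BeginEndSketch
{
    // Wraps Stream.BeginRead/EndRead in a Task<int> so it can be awaited.
    public static Task<int> ReadAsync(Stream s, byte[] buffer, int offset, int count)
    {
        return Task<int>.Factory.FromAsync(s.BeginRead, s.EndRead, buffer, offset, count, null);
    }

    // Wraps Stream.BeginWrite/EndWrite in a Task so it can be awaited.
    public static Task WriteAsync(Stream s, byte[] buffer, int offset, int count)
    {
        return Task.Factory.FromAsync(s.BeginWrite, s.EndWrite, buffer, offset, count, null);
    }
}
```

Swapping these in for ReadAsync/WriteAsync leaves the rest of the copy loop untouched, which makes it easy to compare the two styles side by side.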