.NET Asynchronous stream read/write
Even though it goes against the grain to help people with their homework, given that this is more than a year old, here's the proper way to accomplish this. All you need is to overlap your read and write operations; no additional threads or anything else are required.
public static class StreamExtensions
{
private const int DEFAULT_BUFFER_SIZE = short.MaxValue ; // +32767
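public static void CopyTo( this Stream input , Stream output )
{
// Call the static overload by name: on .NET 4 and later, input.CopyTo( output , n )
// binds to the built-in Stream.CopyTo instance method rather than the overload below.
CopyTo( input , output , DEFAULT_BUFFER_SIZE ) ;
}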
public static void CopyTo( this Stream input , Stream output , int bufferSize )
{
if ( !input.CanRead ) throw new InvalidOperationException( "input must be open for reading" );
if ( !output.CanWrite ) throw new InvalidOperationException( "output must be open for writing" );
byte[][] buf = { new byte[bufferSize] , new byte[bufferSize] } ;
int[] bufl = { 0 , 0 } ;
int bufno = 0 ;
IAsyncResult read = input.BeginRead( buf[bufno] , 0 , buf[bufno].Length , null , null ) ;
IAsyncResult write = null ;
while ( true )
{
// wait for the read operation to complete
read.AsyncWaitHandle.WaitOne() ;
bufl[bufno] = input.EndRead(read) ;
// if zero bytes read, the copy is complete
if ( bufl[bufno] == 0 )
{
break ;
}
// wait for the in-flight write operation, if one exists, to complete
// the only time one won't exist is after the very first read operation completes
if ( write != null )
{
write.AsyncWaitHandle.WaitOne() ;
output.EndWrite(write) ;
}
// start the new write operation
write = output.BeginWrite( buf[bufno] , 0 , bufl[bufno] , null , null ) ;
// toggle the current, in-use buffer
// and start the read operation on the new buffer.
//
// Changed to use XOR to toggle between 0 and 1.
// A little speedier than using a ternary expression.
bufno ^= 1 ; // bufno = ( bufno == 0 ? 1 : 0 ) ;
read = input.BeginRead( buf[bufno] , 0 , buf[bufno].Length , null , null ) ;
}
// wait for the final in-flight write operation, if one exists, to complete
// the only time one won't exist is if the input stream is empty.
if ( write != null )
{
write.AsyncWaitHandle.WaitOne() ;
output.EndWrite(write) ;
}
output.Flush() ;
// return to the caller ;
return ;
}
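public static async Task CopyToAsync( this Stream input , Stream output )
{
// Call the static overload by name: input.CopyToAsync( output , n ) binds to the
// built-in Stream.CopyToAsync instance method rather than the overload below.
await CopyToAsync( input , output , DEFAULT_BUFFER_SIZE ) ;
}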
public static async Task CopyToAsync( this Stream input , Stream output , int bufferSize )
{
if ( !input.CanRead ) throw new InvalidOperationException( "input must be open for reading" );
if ( !output.CanWrite ) throw new InvalidOperationException( "output must be open for writing" );
byte[][] buf = { new byte[bufferSize] , new byte[bufferSize] } ;
int[] bufl = { 0 , 0 } ;
int bufno = 0 ;
Task<int> read = input.ReadAsync( buf[bufno] , 0 , buf[bufno].Length ) ;
Task write = null ;
while ( true )
{
// wait for the read operation to complete
bufl[bufno] = await read ;
// if zero bytes read, the copy is complete
if ( bufl[bufno] == 0 )
{
break;
}
// wait for the in-flight write operation, if one exists, to complete
// the only time one won't exist is after the very first read operation completes
if ( write != null )
{
await write ;
}
// start the new write operation
write = output.WriteAsync( buf[bufno] , 0 , bufl[bufno] ) ;
// toggle the current, in-use buffer
// and start the read operation on the new buffer.
//
// Changed to use XOR to toggle between 0 and 1.
// A little speedier than using a ternary expression.
bufno ^= 1; // bufno = ( bufno == 0 ? 1 : 0 ) ;
read = input.ReadAsync( buf[bufno] , 0 , buf[bufno].Length );
}
// wait for the final in-flight write operation, if one exists, to complete
// the only time one won't exist is if the input stream is empty.
if ( write != null )
{
await write;
}
await output.FlushAsync() ;
// return to the caller ;
return;
}
}
Cheers.
I doubt this is the fastest code (there's some overhead from the .NET Task abstraction) but I do think it's a cleaner approach to the whole async copy thing.
I needed a CopyTransformAsync where I could pass a delegate to do something as chunks were passed through the copy operation, e.g. compute a message digest while copying. That's why I got interested in rolling my own option.
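To make the "digest while copying" idea concrete, here is a minimal sketch of such a delegate. The names DigestWhileCopying, HashingTransform, CopyAsync and CopyAndHashAsync are made up for illustration, and the plain sequential copy loop only stands in for CopyTransformAsync so that the example is self-contained. It assumes IncrementalHash is available (.NET Framework 4.6.2 or later, or .NET Core).

```csharp
using System;
using System.IO;
using System.Security.Cryptography;
using System.Threading.Tasks;

static class DigestWhileCopying
{
    // Hypothetical helper: a pass-through transform that feeds every chunk to the hash.
    public static Func<ArraySegment<byte>, ArraySegment<byte>> HashingTransform(IncrementalHash hash)
    {
        return chunk =>
        {
            hash.AppendData(chunk.Array, chunk.Offset, chunk.Count);
            return chunk; // the bytes are written out unchanged
        };
    }

    // Plain sequential copy loop, used here only to drive the transform.
    public static async Task CopyAsync(Stream input, Stream output,
        Func<ArraySegment<byte>, ArraySegment<byte>> transform, int bufferSize = 81920)
    {
        var buffer = new byte[bufferSize];
        int read;
        while ((read = await input.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false)) > 0)
        {
            var chunk = transform(new ArraySegment<byte>(buffer, 0, read));
            await output.WriteAsync(chunk.Array, chunk.Offset, chunk.Count).ConfigureAwait(false);
        }
    }

    // Copies input to output and returns the SHA-256 digest of the copied bytes.
    public static async Task<byte[]> CopyAndHashAsync(Stream input, Stream output)
    {
        using (var hash = IncrementalHash.CreateHash(HashAlgorithmName.SHA256))
        {
            await CopyAsync(input, output, HashingTransform(hash)).ConfigureAwait(false);
            return hash.GetHashAndReset();
        }
    }
}
```

Because the transform returns the same segment it was given, the copy itself is untouched; the digest is just a side effect of the data passing through.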
Findings:
- CopyToAsync bufferSize is sensitive (a large buffer is required)
- FileOptions.Asynchronous -> makes it horrendously slow (not sure exactly why that is)
- The bufferSize of the FileStream objects can be smaller (it's not that important)
- The Serial test is clearly the fastest and most resource intensive
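For reference, the knobs mentioned in these findings can be sketched like this. It is only an illustration of where each setting goes, not the test program itself; FileCopyConfigurations and CopyFileAsync are hypothetical names, and the built-in Stream.CopyToAsync is used for the copy.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

static class FileCopyConfigurations
{
    // fileBufferSize : internal buffer of each FileStream (the setting that mattered little)
    // copyBufferSize : buffer handed to CopyToAsync (the sensitive one)
    // asynchronous   : whether the files are opened with FileOptions.Asynchronous
    public static async Task CopyFileAsync(string sourcePath, string destPath,
        int fileBufferSize, int copyBufferSize, bool asynchronous)
    {
        var options = asynchronous ? FileOptions.Asynchronous : FileOptions.None;

        using (var input = new FileStream(sourcePath, FileMode.Open, FileAccess.Read,
                                          FileShare.Read, fileBufferSize, options))
        using (var output = new FileStream(destPath, FileMode.Create, FileAccess.Write,
                                           FileShare.None, fileBufferSize, options))
        {
            await input.CopyToAsync(output, copyBufferSize).ConfigureAwait(false);
        }
    }
}
```

Something like CopyFileAsync(src, dst, 4096, 81920, false) would correspond to a small FileStream buffer, an 80K copy buffer, and no FileOptions.Asynchronous.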
Here's what I found, along with the complete source code for the program I used to test this. On my machine these tests were run against an SSD, so each test is the equivalent of a file copy. Normally you wouldn't use this just to copy files; it's when you have a network stream (which is my use case) that you'd want something like this.
4K buffer
Serial... in 0.474s
CopyToAsync... timed out
CopyToAsync (Asynchronous)... timed out
CopyTransformAsync... timed out
CopyTransformAsync (Asynchronous)... timed out
8K buffer
Serial... in 0.344s
CopyToAsync... timed out
CopyToAsync (Asynchronous)... timed out
CopyTransformAsync... in 1.116s
CopyTransformAsync (Asynchronous)... timed out
40K buffer
Serial... in 0.195s
CopyToAsync... in 0.624s
CopyToAsync (Asynchronous)... timed out
CopyTransformAsync... in 0.378s
CopyTransformAsync (Asynchronous)... timed out
80K buffer
Serial... in 0.190s
CopyToAsync... in 0.355s
CopyToAsync (Asynchronous)... in 1.196s
CopyTransformAsync... in 0.300s
CopyTransformAsync (Asynchronous)... in 0.886s
160K buffer
Serial... in 0.432s
CopyToAsync... in 0.252s
CopyToAsync (Asynchronous)... in 0.454s
CopyTransformAsync... in 0.447s
CopyTransformAsync (Asynchronous)... in 0.555s
Here you can see the Process Explorer performance graph as the test is run. Each peak (in the lowest of the three graphs) marks the start of the serial test. You can clearly see how the throughput increases dramatically as the buffer size grows. It appears to plateau somewhere around 80K, which is what the .NET Framework CopyToAsync method uses internally.
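The "in 0.474s" / "timed out" lines above could be produced by a small harness along these lines. This is a guess at the general shape, not the actual test program; TimedTest and RunAsync are hypothetical names, and the timeout value is up to the caller.

```csharp
using System;
using System.Diagnostics;
using System.Globalization;
using System.Threading.Tasks;

static class TimedTest
{
    // Runs the test and reports either the elapsed time or "timed out".
    public static async Task<string> RunAsync(Func<Task> test, TimeSpan timeout)
    {
        var stopwatch = Stopwatch.StartNew();
        Task work = test();
        Task winner = await Task.WhenAny(work, Task.Delay(timeout)).ConfigureAwait(false);
        if (winner != work)
        {
            // The work keeps running in the background; this sketch does not cancel it.
            return "timed out";
        }
        await work.ConfigureAwait(false); // rethrows if the test failed
        stopwatch.Stop();
        return string.Format(CultureInfo.InvariantCulture, "in {0:0.000}s", stopwatch.Elapsed.TotalSeconds);
    }
}
```

Note that the timeout can only fire if test() actually returns an incomplete task; a test that blocks synchronously will simply run to completion and report its time.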
The nice thing here is that the final implementation wasn't that complicated:
static readonly Task CompletedTask = Task.FromResult(0);
static async Task CopyTransformAsync(Stream inputStream
, Stream outputStream
, Func<ArraySegment<byte>, ArraySegment<byte>> transform = null
)
{
// bufferSize is not declared in this snippet; it is assumed to be a field of the test program
var temp = new byte[bufferSize];
var temp2 = new byte[bufferSize];
int i = 0;
var readTask = inputStream
.ReadAsync(temp, 0, bufferSize)
.ConfigureAwait(false);
var writeTask = CompletedTask.ConfigureAwait(false);
for (; ; )
{
// synchronize read
int read = await readTask;
if (read == 0)
{
break;
}
if (i++ > 0)
{
// synchronize write
await writeTask;
}
var chunk = new ArraySegment<byte>(temp, 0, read);
// do transform (if any)
if (transform != null)
{
chunk = transform(chunk);
}
// queue write
writeTask = outputStream
.WriteAsync(chunk.Array, chunk.Offset, chunk.Count)
.ConfigureAwait(false);
// queue read
readTask = inputStream
.ReadAsync(temp2, 0, bufferSize)
.ConfigureAwait(false);
// swap buffer
var temp3 = temp;
temp = temp2;
temp2 = temp3;
}
await writeTask; // complete any lingering write task
}
Despite the huge buffers, this method of interleaving the reads and writes is around 18% faster than the BCL CopyToAsync.
Out of curiosity, I changed the async calls to the typical Begin/End async pattern, and that did not improve the situation one bit; it made it worse. For all I like to bash the Task abstraction's overhead, it does some nifty things when you write your code with the async/await keywords, and that code is much nicer to read!