Wrapping ManualResetEvent as awaitable task
Solution 1:
RegisterWaitForSingleObject will combine waits onto dedicated waiter threads, each of which can wait on multiple handles (specifically, 63 of them, which is MAXIMUM_WAIT_OBJECTS minus one for a "control" handle).
So you should be able to use something like this (warning: untested):
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WaitHandleExtensions
{
    public static Task AsTask(this WaitHandle handle)
    {
        return AsTask(handle, Timeout.InfiniteTimeSpan);
    }

    public static Task AsTask(this WaitHandle handle, TimeSpan timeout)
    {
        var tcs = new TaskCompletionSource<object>();
        var registration = ThreadPool.RegisterWaitForSingleObject(handle, (state, timedOut) =>
        {
            var localTcs = (TaskCompletionSource<object>)state;
            // A timeout is reported as cancellation; a signaled handle completes the task.
            if (timedOut)
                localTcs.TrySetCanceled();
            else
                localTcs.TrySetResult(null);
        }, tcs, timeout, executeOnlyOnce: true);
        // Release the wait registration once the task has completed either way.
        tcs.Task.ContinueWith((_, state) => ((RegisteredWaitHandle)state).Unregister(null), registration, TaskScheduler.Default);
        return tcs.Task;
    }
}
Solution 2:
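You can also use SemaphoreSlim.WaitAsync(), which gives you an awaitable wait. Note that it is not a drop-in replacement for ManualResetEvent: each successful wait consumes one count, so a semaphore behaves more like an AutoResetEvent, and Release() lets one waiter through rather than all of them.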
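A minimal sketch of this approach, assuming a semaphore created with an initial count of 0 and a maximum of 1 so that it acts as a one-shot signal. The class and method names (SemaphoreSignalDemo, RunAsync) are made up for illustration:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SemaphoreSignalDemo
{
    // Hypothetical demo: returns the results of two consecutive waits.
    public static async Task<(bool FirstWait, bool SecondWait)> RunAsync()
    {
        // Initial count 0: waiters do not complete until Release() is called.
        using (var signal = new SemaphoreSlim(0, 1))
        {
            // Start waiting asynchronously; no thread is blocked here.
            Task<bool> waiter = signal.WaitAsync(TimeSpan.FromSeconds(5));

            // "Set" the signal. In real code this would happen elsewhere.
            signal.Release();
            bool first = await waiter.ConfigureAwait(false);

            // The first wait consumed the count, so an immediate second
            // wait (timeout 0) does not succeed. A ManualResetEvent would
            // have stayed signaled here.
            bool second = await signal.WaitAsync(0).ConfigureAwait(false);

            return (first, second);
        }
    }

    public static async Task Main()
    {
        var result = await RunAsync();
        Console.WriteLine($"first: {result.FirstWait}, second: {result.SecondWait}");
    }
}
```

If several waiters must all be released by one signal, the semaphore is probably the wrong tool and one of the other solutions fits better.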
Solution 3:
You can give this one a shot: https://www.badflyer.com/asyncmanualresetevent . I tried to build upon the example at https://blogs.msdn.com/b/pfxteam/archive/2012/02/11/10266920.aspx to support timeouts and cancellation.
using System;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// An async manual reset event.
/// </summary>
public sealed class ManualResetEventAsync
{
    // Inspiration from https://devblogs.microsoft.com/pfxteam/building-async-coordination-primitives-part-1-asyncmanualresetevent/
    // and the .net implementation of SemaphoreSlim

    /// <summary>
    /// The timeout in milliseconds to wait indefinitely.
    /// </summary>
    private const int WaitIndefinitly = -1;

    /// <summary>
    /// True to run synchronous continuations on the thread which invoked Set. False to run them in the threadpool.
    /// </summary>
    private readonly bool runSynchronousContinuationsOnSetThread = true;

    /// <summary>
    /// The current task completion source.
    /// </summary>
    private volatile TaskCompletionSource<bool> completionSource = new TaskCompletionSource<bool>();

    /// <summary>
    /// Initializes a new instance of the <see cref="ManualResetEventAsync"/> class.
    /// </summary>
    /// <param name="isSet">True to set the task completion source on creation.</param>
    public ManualResetEventAsync(bool isSet)
        : this(isSet: isSet, runSynchronousContinuationsOnSetThread: true)
    {
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="ManualResetEventAsync"/> class.
    /// </summary>
    /// <param name="isSet">True to set the task completion source on creation.</param>
    /// <param name="runSynchronousContinuationsOnSetThread">If you have synchronous continuations, they will run on the thread which invokes Set, unless you set this to false.</param>
    public ManualResetEventAsync(bool isSet, bool runSynchronousContinuationsOnSetThread)
    {
        this.runSynchronousContinuationsOnSetThread = runSynchronousContinuationsOnSetThread;
        if (isSet)
        {
            this.completionSource.TrySetResult(true);
        }
    }

    /// <summary>
    /// Wait for the manual reset event.
    /// </summary>
    /// <returns>A task which completes when the event is set.</returns>
    public Task WaitAsync()
    {
        return this.AwaitCompletion(ManualResetEventAsync.WaitIndefinitly, default(CancellationToken));
    }

    /// <summary>
    /// Wait for the manual reset event.
    /// </summary>
    /// <param name="token">A cancellation token.</param>
    /// <returns>A task which waits for the manual reset event.</returns>
    public Task WaitAsync(CancellationToken token)
    {
        return this.AwaitCompletion(ManualResetEventAsync.WaitIndefinitly, token);
    }

    /// <summary>
    /// Wait for the manual reset event.
    /// </summary>
    /// <param name="timeout">A timeout.</param>
    /// <param name="token">A cancellation token.</param>
    /// <returns>A task which waits for the manual reset event. Returns true if the timeout has not expired. Returns false if the timeout expired.</returns>
    public Task<bool> WaitAsync(TimeSpan timeout, CancellationToken token)
    {
        return this.AwaitCompletion((int)timeout.TotalMilliseconds, token);
    }

    /// <summary>
    /// Wait for the manual reset event.
    /// </summary>
    /// <param name="timeout">A timeout.</param>
    /// <returns>A task which waits for the manual reset event. Returns true if the timeout has not expired. Returns false if the timeout expired.</returns>
    public Task<bool> WaitAsync(TimeSpan timeout)
    {
        return this.AwaitCompletion((int)timeout.TotalMilliseconds, default(CancellationToken));
    }

    /// <summary>
    /// Set the completion source.
    /// </summary>
    public void Set()
    {
        if (this.runSynchronousContinuationsOnSetThread)
        {
            this.completionSource.TrySetResult(true);
        }
        else
        {
            // Run synchronous completions in the thread pool.
            Task.Run(() => this.completionSource.TrySetResult(true));
        }
    }

    /// <summary>
    /// Reset the manual reset event.
    /// </summary>
    public void Reset()
    {
        // Grab a reference to the current completion source.
        var currentCompletionSource = this.completionSource;

        // If the event is not currently set, there is nothing to do.
        if (!currentCompletionSource.Task.IsCompleted)
        {
            return;
        }

        // Otherwise, try to replace it with a new completion source (if it is the same as the reference we took before).
        Interlocked.CompareExchange(ref this.completionSource, new TaskCompletionSource<bool>(), currentCompletionSource);
    }

    /// <summary>
    /// Await completion based on a timeout and a cancellation token.
    /// </summary>
    /// <param name="timeoutMS">The timeout in milliseconds.</param>
    /// <param name="token">The cancellation token.</param>
    /// <returns>A task (true if wait succeeded). (False on timeout).</returns>
    private async Task<bool> AwaitCompletion(int timeoutMS, CancellationToken token)
    {
        // Validate arguments. An int can never exceed int.MaxValue, so only the lower bound needs checking.
        if (timeoutMS < -1)
        {
            throw new ArgumentException("The timeout must be either -1ms (indefinitely) or a non-negative ms value <= int.MaxValue");
        }

        CancellationTokenSource timeoutToken = null;

        // If the token cannot be cancelled, then we don't need to create any sort of linked token source.
        if (false == token.CanBeCanceled)
        {
            // If the wait is indefinite, then we don't need to create a second task at all to wait on, just wait for set.
            if (timeoutMS == -1)
            {
                return await this.completionSource.Task;
            }

            timeoutToken = new CancellationTokenSource();
        }
        else
        {
            // A token source which will get canceled either when we cancel it, or when the linked token source is canceled.
            timeoutToken = CancellationTokenSource.CreateLinkedTokenSource(token);
        }

        using (timeoutToken)
        {
            // Create a task to account for our timeout. The continuation just eats the task cancelled exception, but makes sure to observe it.
            Task delayTask = Task.Delay(timeoutMS, timeoutToken.Token).ContinueWith((result) => { var e = result.Exception; }, TaskContinuationOptions.ExecuteSynchronously);
            var resultingTask = await Task.WhenAny(this.completionSource.Task, delayTask).ConfigureAwait(false);

            // The actual task finished, not the timeout, so we can cancel our cancellation token and return true.
            if (resultingTask != delayTask)
            {
                // Cancel the timeout token to cancel the delay if it is still going.
                timeoutToken.Cancel();
                return true;
            }

            // Otherwise, the delay task finished. So throw if it finished because it was canceled.
            token.ThrowIfCancellationRequested();
            return false;
        }
    }
}
Solution 4:
Alternative solution: wait for the handles of the task and the manual reset event
I was having memory leaks when using Task.WaitAny() with a Task (returned by SqlConnection.OpenAsync()) and a ManualResetEvent received as a parameter and wrapped in a Task with AsTask(). These objects were not being disposed: TaskCompletionSource<Object>, Task<Object>, StandardTaskContinuation, RegisteredWaitHandle, RegisteredWaitHandleSafe, ContinuationResultTaskFromResultTask<Object,bool>, _ThreadPoolWaitOrTimerCallback.
This is real production code, used in a Windows service, from a function that tries to open a connection to a database in a loop until the connection is opened, the operation fails, or the ManualResetEvent _finishRequest (received as a parameter by the function containing this code) is signaled by code in another thread.
To avoid the leak, I decided to do it the other way round: wait for the handles of _finishRequest and of the Task returned by OpenAsync():
Task asyncOpening = sqlConnection.OpenAsync();

// Wait for the async open to finish, or until _finishRequest is signaled
var waitHandles = new WaitHandle[]
{
    // index 0 in array: extract the AsyncWaitHandle from the Task
    ((IAsyncResult)asyncOpening).AsyncWaitHandle,
    // index 1:
    _finishRequest
};

// Check if finish was requested (index of signaled handle in the array = 1)
int whichFinished = WaitHandle.WaitAny(waitHandles);
finishRequested = whichFinished == 1;

// If so, break the loop to exit immediately
if (finishRequested)
    break;

// If not, check if OpenAsync finished with error (it's a Task)
if (asyncOpening.IsFaulted)
{
    // Extract the exception from the task, and throw it
    // NOTE: adapt it to your case. In mine, I'm interested in the inner exception,
    // but you can check the exception itself, for example to see if it was a timeout,
    // if you specified it in the call to the async function that returns the Task
    var ex = asyncOpening?.Exception?.InnerExceptions?[0];
    if (ex != null) throw ex;
}
else
{
    Log.Verbose("Connection to database {Database} on server {Server}", database, server);
    break;
}
If you also need a timeout, you can include it in the call to OpenAsync, or to your own async function, and then check whether the async operation was canceled because of the timeout: inspect the status of the Task when it finishes, as described in the NOTE in the code comment.
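As a rough sketch of that idea, the timeout can be passed in as a CancellationToken that cancels itself after a delay, and the task's status can be inspected once WaitHandle.WaitAny returns. Everything named here (WaitWithTimeoutDemo, Outcome, WaitForOperation, the startOperation delegate) is hypothetical; in the code above, startOperation would be something like ct => sqlConnection.OpenAsync(ct). The demo uses Task.Delay as a stand-in for a slow operation:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WaitWithTimeoutDemo
{
    public enum Outcome { Completed, FinishRequested, TimedOut, Faulted }

    // Waits for either the operation or the finish request, whichever comes first.
    public static Outcome WaitForOperation(
        Func<CancellationToken, Task> startOperation,
        WaitHandle finishRequest,
        TimeSpan timeout)
    {
        // The token source cancels itself once the timeout elapses.
        using (var cts = new CancellationTokenSource(timeout))
        {
            Task operation = startOperation(cts.Token);
            var handles = new[]
            {
                ((IAsyncResult)operation).AsyncWaitHandle, // index 0
                finishRequest                              // index 1
            };

            int index = WaitHandle.WaitAny(handles);
            if (index == 1)
            {
                // Stop the pending operation before abandoning it.
                cts.Cancel();
                return Outcome.FinishRequested;
            }

            // The task has finished: its status tells us why.
            if (operation.IsCanceled) return Outcome.TimedOut;
            if (operation.IsFaulted) return Outcome.Faulted;
            return Outcome.Completed;
        }
    }

    public static void Main()
    {
        using (var finish = new ManualResetEvent(false))
        {
            // Stand-in for a slow OpenAsync: never completes unless canceled.
            Outcome outcome = WaitForOperation(
                ct => Task.Delay(Timeout.Infinite, ct),
                finish,
                TimeSpan.FromMilliseconds(100));
            Console.WriteLine(outcome);
        }
    }
}
```

This sketch treats any cancellation of the task as a timeout, which only holds because the token has no other source of cancellation here.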
Solution 5:
Stephen Cleary's solution looks perfect, and Microsoft provides a similar one.
Since I haven't seen an example with cancellation logic, here is one:
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WaitHandleExtensions
{
    public static Task WaitOneAsync(this WaitHandle waitHandle, CancellationToken cancellationToken, int timeoutMilliseconds = Timeout.Infinite)
    {
        if (waitHandle == null)
            throw new ArgumentNullException(nameof(waitHandle));

        TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
        // Cancel the task when the caller's token is canceled.
        CancellationTokenRegistration ctr = cancellationToken.Register(() => tcs.TrySetCanceled());
        TimeSpan timeout = timeoutMilliseconds > Timeout.Infinite ? TimeSpan.FromMilliseconds(timeoutMilliseconds) : Timeout.InfiniteTimeSpan;

        RegisteredWaitHandle rwh = ThreadPool.RegisterWaitForSingleObject(waitHandle,
            (_, timedOut) =>
            {
                // A timeout is reported as cancellation; a signaled handle completes the task.
                if (timedOut)
                {
                    tcs.TrySetCanceled();
                }
                else
                {
                    tcs.TrySetResult(true);
                }
            },
            null, timeout, true);

        Task<bool> task = tcs.Task;
        // Clean up both registrations once the task has completed either way.
        _ = task.ContinueWith(_ =>
        {
            rwh.Unregister(null);
            return ctr.Unregister();
        }, CancellationToken.None);

        return task;
    }
}