How to write Asynchronous LINQ query?

After I read a bunch of LINQ related stuff, I suddenly realized that no articles introduce how to write asynchronous LINQ query.

Suppose we use LINQ to SQL, below statement is clear. However, if the SQL database responds slowly, then the thread using this block of code would be hindered.

var result = from item in Products where item.Price > 3 select item.Name;
foreach (var name in result)
{
    Console.WriteLine(name);
}

Seems that current LINQ query spec doesn't provide support to this.

Is there any way to do asynchronous programming LINQ? It works like there is a callback notification when results are ready to use without any blocking delay on I/O.


While LINQ doesn't really have this per se, the framework itself does... You can easily roll your own asynchronous query executor in 30 lines or so... In fact, I just threw this together for you :)

EDIT: Through writing this, I've discovered why they didn't implement it. It cannot handle anonymous types since they are scoped local. Thus, you have no way of defining your callback function. This is a pretty major thing since a lot of linq to sql stuff creates them in the select clause. Any of the below suggestions suffer the same fate, so I still think this one is the easiest to use!

EDIT: The only solution is to not use anonymous types. You can declare the callback as just taking IEnumerable (no type args), and use reflection to access the fields (ICK!!). Another way would be to declare the callback as "dynamic"... oh... wait... That's not out yet. :) This is another decent example of how dynamic could be used. Some may call it abuse.

Throw this in your utilities library:

public static class AsynchronousQueryExecutor
{
    public static void Call<T>(IEnumerable<T> query, Action<IEnumerable<T>> callback, Action<Exception> errorCallback)
    {
        Func<IEnumerable<T>, IEnumerable<T>> func =
            new Func<IEnumerable<T>, IEnumerable<T>>(InnerEnumerate<T>);
        IEnumerable<T> result = null;
        IAsyncResult ar = func.BeginInvoke(
                            query,
                            new AsyncCallback(delegate(IAsyncResult arr)
                            {
                                try
                                {
                                    result = ((Func<IEnumerable<T>, IEnumerable<T>>)((AsyncResult)arr).AsyncDelegate).EndInvoke(arr);
                                }
                                catch (Exception ex)
                                {
                                    if (errorCallback != null)
                                    {
                                        errorCallback(ex);
                                    }
                                    return;
                                }
                                //errors from inside here are the callbacks problem
                                //I think it would be confusing to report them
                                callback(result);
                            }),
                            null);
    }
    private static IEnumerable<T> InnerEnumerate<T>(IEnumerable<T> query)
    {
        foreach (var item in query) //the method hangs here while the query executes
        {
            yield return item;
        }
    }
}

And you could use it like this:

class Program
{

    public static void Main(string[] args)
    {
        //this could be your linq query
        var qry = TestSlowLoadingEnumerable();

        //We begin the call and give it our callback delegate
        //and a delegate to an error handler
        AsynchronousQueryExecutor.Call(qry, HandleResults, HandleError);

        Console.WriteLine("Call began on seperate thread, execution continued");
        Console.ReadLine();
    }

    public static void HandleResults(IEnumerable<int> results)
    {
        //the results are available in here
        foreach (var item in results)
        {
            Console.WriteLine(item);
        }
    }

    public static void HandleError(Exception ex)
    {
        Console.WriteLine("error");
    }

    //just a sample lazy loading enumerable
    public static IEnumerable<int> TestSlowLoadingEnumerable()
    {
        Thread.Sleep(5000);
        foreach (var i in new int[] { 1, 2, 3, 4, 5, 6 })
        {
            yield return i;
        }
    }

}

Going to go put this up on my blog now, pretty handy.


TheSoftwareJedi's and ulrikb's(aka user316318) solutions are good for any LINQ type, but (as pointed by Chris Moschini) do NOT delegating to underlying asynchronous calls that leverage Windows I/O Completion Ports.

Wesley Bakker's Asynchronous DataContext post (triggered by a blog post of Scott Hanselman ) describe class for LINQ to SQL that uses sqlCommand.BeginExecuteReader/sqlCommand.EndExecuteReader, which leverage Windows I/O Completion Ports.

I/O completion ports provide an efficient threading model for processing multiple asynchronous I/O requests on a multiprocessor system.


Based on Michael Freidgeim's answer and mentioned blog post from Scott Hansellman and fact that you can use async/await, you can implement reusable ExecuteAsync<T>(...) method, which executes underlying SqlCommand asynchronously:

protected static async Task<IEnumerable<T>> ExecuteAsync<T>(IQueryable<T> query,
    DataContext ctx,
    CancellationToken token = default(CancellationToken))
{
    var cmd = (SqlCommand)ctx.GetCommand(query);

    if (cmd.Connection.State == ConnectionState.Closed)
        await cmd.Connection.OpenAsync(token);
    var reader = await cmd.ExecuteReaderAsync(token);

    return ctx.Translate<T>(reader);
}

And then you can (re)use it like this:

public async Task WriteNamesToConsoleAsync(string connectionString, CancellationToken token = default(CancellationToken))
{
    using (var ctx = new DataContext(connectionString))
    {
        var query = from item in Products where item.Price > 3 select item.Name;
        var result = await ExecuteAsync(query, ctx, token);
        foreach (var name in result)
        {
            Console.WriteLine(name);
        }
    }
}