Thread safety for DataTable
DataTable is simply not designed or intended for concurrent usage (in particular where there is any form of mutation involved). The advisable "wrapper" here would, in my view, be either:
- remove the need to work on the DataTable concurrently (when involving mutation), or:
- remove the DataTable, instead using a data structure that either directly supports what you need (for example a concurrent collection), or which is much simpler and can be trivially synchronized (either exclusive or reader/writer)
Basically: change the problem.
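As a minimal sketch of the second option (a much simpler structure that can be trivially synchronized), assuming each row can be held as a plain object[], a List<object[]> guarded by a ReaderWriterLockSlim could look like this. The SyncRowStore name is hypothetical; whether a concurrent collection such as ConcurrentQueue<T> suits you better depends on your access pattern:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical replacement for a shared DataTable: a plain list of rows
// guarded by a reader/writer lock (many concurrent readers, one writer at a time)
public sealed class SyncRowStore : IDisposable
{
    private readonly List<object[]> rows = new List<object[]>();
    private readonly ReaderWriterLockSlim gate = new ReaderWriterLockSlim();

    public void Add(object[] row)
    {
        gate.EnterWriteLock(); // exclusive: blocks readers and other writers
        try { rows.Add(row); }
        finally { gate.ExitWriteLock(); }
    }

    public int Count
    {
        get
        {
            gate.EnterReadLock(); // shared with other readers
            try { return rows.Count; }
            finally { gate.ExitReadLock(); }
        }
    }

    // Returns a copy, so callers can enumerate without holding the lock
    public List<object[]> Snapshot()
    {
        gate.EnterReadLock();
        try { return new List<object[]>(rows); }
        finally { gate.ExitReadLock(); }
    }

    public void Dispose() => gate.Dispose();
}
```

Parallel parsers call Add; anything that needs the whole set works from Snapshot(), so no caller ever enumerates the live list while another thread mutates it.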
From comments:
The code looks like:
Parallel.ForEach(strings, str=> {
    DataRow row;
    lock(table){
        row = table.NewRow();
    }
    MyParser.Parse(str, out row);
    lock(table){
        table.Rows.Add(row);
    }
});
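I can only hope that out row is a typo here, as that won't actually lead to it populating the row created via NewRow() (an out argument replaces the reference held in row rather than filling in the existing object). But if you absolutely have to use that approach, you can't use NewRow, as the pending row is effectively shared: even a row that hasn't been added yet keeps its values in the table's internal column storage, so populating it outside the lock still mutates the table. Your best bet would be: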
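Parallel.ForEach(strings, str=> {
    // parse outside the lock: no DataTable involvement here
    object[] values = MyParser.Parse(str);
    lock(table) {
        // create and add the row in a single locked step
        table.Rows.Add(values);
    }
});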
The important change in the above is that the lock covers the entire new-row process. Note that you will have no guarantee of order when using Parallel.ForEach like this, so it is important that the final order does not need to match exactly (which shouldn't be a problem if the data includes a time component).
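If order does matter, one option (a sketch, not the only approach) is to have each iteration write into its own array slot, which needs no lock because no two iterations share a slot, and then add the rows to the DataTable on a single thread. Here parse is a stand-in for the hypothetical MyParser.Parse, and OrderedLoad is an illustrative name:

```csharp
using System;
using System.Data;
using System.Threading.Tasks;

public static class OrderedLoad
{
    // parse: hypothetical stand-in for MyParser.Parse, returning one row's values
    public static DataTable Load(string[] lines, Func<string, object[]> parse, DataTable table)
    {
        var parsed = new object[lines.Length][];

        // Parallel parse: iteration i only ever writes slot i, so no locking is needed
        Parallel.For(0, lines.Length, i => parsed[i] = parse(lines[i]));

        // Single-threaded mutation of the DataTable, in the original order
        foreach (var values in parsed)
            table.Rows.Add(values);
        return table;
    }
}
```

The trade-off is that every parsed row is held in memory before the table is populated.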
However! I still think you are approaching this the wrong way: for parallelism to be relevant, the data must be non-trivial. If you have non-trivial data, you really don't want to have to buffer it all in memory. I strongly suggest doing something like the following, which will work fine on a single thread:
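// needs: System.Collections.Generic, System.IO, System.Data.SqlClient
// (or Microsoft.Data.SqlClient), and FastMember (for ObjectReader)
using(var bcp = new SqlBulkCopy(connectionString)) // no parameterless constructor; connectionString is your own
using(var reader = ObjectReader.Create(ParseFile(path)))
{
    bcp.DestinationTableName = "MyLog";
    bcp.WriteToServer(reader);
}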
...
static IEnumerable<LogRow> ParseFile(string path)
{
    using(var reader = File.OpenText(path))
    {
        string line;
        while((line = reader.ReadLine()) != null)
        {
            yield return new LogRow {
                // TODO: populate the row from line here
            };
        }
    }
}
...
public sealed class LogRow {
    /* define your schema here */
}
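To make the TODO concrete, here is a sketch assuming a hypothetical tab-separated format of timestamp, level and message. The LogRow members, the format and the LogParser name are all invented for illustration. It reads from a TextReader rather than a path so it can be exercised without a file:

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;

// Hypothetical schema: the real LogRow depends on your log format
public sealed class LogRow
{
    public DateTime Timestamp { get; set; }
    public string Level { get; set; }
    public string Message { get; set; }
}

public static class LogParser
{
    // Streams rows one at a time; nothing is collected into a list
    public static IEnumerable<LogRow> Parse(TextReader reader)
    {
        string line;
        while ((line = reader.ReadLine()) != null)
        {
            if (line.Length == 0) continue; // skip blank lines
            // Assumed format: timestamp<TAB>level<TAB>message (the message may contain tabs)
            var parts = line.Split(new[] { '\t' }, 3);
            yield return new LogRow
            {
                Timestamp = DateTime.Parse(parts[0], CultureInfo.InvariantCulture),
                Level = parts[1],
                Message = parts[2],
            };
        }
    }
}
```

ParseFile(path) could then wrap File.OpenText(path) in a using block and yield return each row from LogParser.Parse, keeping the pipeline fully streaming.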
Advantages:
- no buffering: this is a fully streaming operation (yield return does not put things into a list or similar), so the rows can start streaming immediately without needing to wait for the entire file to be pre-processed first
- no memory saturation issues
- no threading complications / overheads
- you get to preserve the original order (not usually critical, but nice)
- you are only constrained by how fast you can read the original file, which is typically faster on a single thread than from multiple threads (contention on a single IO device is just overhead)
- avoids all the overheads of DataTable, which is overkill here: because it is so flexible, it has significant overheads
- read (from the log file) and write (to the database) are now concurrent rather than sequential
I do a lot of things like the above in my own work, and from experience it is usually at least twice as fast as populating a DataTable in memory first.
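The streaming claim in the first advantage can be checked directly: an iterator method only runs up to each yield return when the consumer asks for the next item, so production and consumption interleave. A small self-contained demonstration (the names are illustrative):

```csharp
using System.Collections.Generic;

public static class StreamingDemo
{
    // Records the order in which items are produced and consumed
    public static List<string> Run()
    {
        var log = new List<string>();
        foreach (var item in Produce(log, 3))
            log.Add("consume " + item);
        return log;
    }

    private static IEnumerable<int> Produce(List<string> log, int count)
    {
        for (int i = 0; i < count; i++)
        {
            log.Add("produce " + i);
            yield return i; // execution pauses here until the consumer asks for more
        }
    }
}
```

Run() returns produce 0, consume 0, produce 1, consume 1, produce 2, consume 2. With a buffered List<int> in place of the iterator, all three "produce" entries would appear before any "consume".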
And finally, here's an example of an IEnumerable<T> implementation that accepts concurrent readers and writers without requiring everything to be buffered in memory. That would allow multiple threads to parse the data (calling Add and finally Close) with a single thread for SqlBulkCopy via the IEnumerable<T> API:
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Acts as a container for concurrent read/write flushing (for example, parsing a
/// file while concurrently uploading the contents); supports any number of concurrent
/// writers and readers, but note that each item will only be returned once (and once
/// fetched, is discarded). It is necessary to Close() the bucket after adding the last
/// of the data, otherwise any iterators will never finish
/// </summary>
class ThreadSafeBucket<T> : IEnumerable<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private bool closed; // guarded by lock (queue)

    public void Add(T value)
    {
        lock (queue)
        {
            if (closed) // no more data once closed
                throw new InvalidOperationException("The bucket has been marked as closed");
            queue.Enqueue(value);
            if (queue.Count == 1)
            { // someone may be waiting for data
                Monitor.PulseAll(queue);
            }
        }
    }

    public void Close()
    {
        lock (queue)
        {
            closed = true;
            Monitor.PulseAll(queue);
        }
    }

    public IEnumerator<T> GetEnumerator()
    {
        while (true)
        {
            T value;
            lock (queue)
            {
                if (queue.Count == 0)
                {
                    // no data; should we expect any?
                    if (closed) yield break; // nothing more ever coming
                    // else wait to be woken, and redo from start
                    Monitor.Wait(queue);
                    continue;
                }
                value = queue.Dequeue();
            }
            // yield it **outside** of the lock
            yield return value;
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}
static class Program
{
    static void Main()
    {
        var bucket = new ThreadSafeBucket<int>();
        int expectedTotal = 0;
        // run the single consumer as a Task so that Main can wait for it to finish
        var consumer = Task.Run(() =>
        {
            int count = 0, sum = 0;
            foreach (var item in bucket)
            {
                count++;
                sum += item;
                if ((count % 100) == 0)
                    Console.WriteLine("After {0}: {1}", count, sum);
            }
            Console.WriteLine("Total over {0}: {1}", count, sum);
        });
        Parallel.For(0, 5000,
            new ParallelOptions { MaxDegreeOfParallelism = 3 },
            i => {
                bucket.Add(i);
                Interlocked.Add(ref expectedTotal, i);
            }
        );
        Console.WriteLine("all data added; closing bucket");
        bucket.Close();
        consumer.Wait(); // rather than Thread.Sleep(100), which can race the consumer
        Console.WriteLine("expecting total: {0}",
            Interlocked.CompareExchange(ref expectedTotal, 0, 0));
        Console.ReadLine();
    }
}
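For what it's worth, the BCL has a built-in type with much the same semantics: BlockingCollection<T> (backed by a ConcurrentQueue<T> by default), where Add, CompleteAdding and GetConsumingEnumerable play roughly the roles of Add, Close and GetEnumerator above. A sketch of the same producer/consumer shape using it; this swaps in a different type rather than reproducing the bucket, and BlockingCollectionDemo is an illustrative name:

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class BlockingCollectionDemo
{
    // Several producers add items in parallel; a single consumer drains them.
    // Returns (count, sum) as seen by the consumer.
    public static (int Count, long Sum) Run(int items)
    {
        using (var bucket = new BlockingCollection<int>())
        {
            var consumer = Task.Run(() =>
            {
                int count = 0;
                long sum = 0;
                // Blocks while empty; ends once CompleteAdding has been called and the collection is drained
                foreach (var item in bucket.GetConsumingEnumerable())
                {
                    count++;
                    sum += item;
                }
                return (count, sum);
            });

            Parallel.For(0, items,
                new ParallelOptions { MaxDegreeOfParallelism = 3 },
                i => bucket.Add(i));

            bucket.CompleteAdding(); // the equivalent of Close()
            return consumer.Result;  // wait for the consumer to finish draining
        }
    }
}
```

GetConsumingEnumerable() can also be handed to ObjectReader.Create in place of a custom IEnumerable<T>, since it is just an IEnumerable<T> that ends when adding is complete.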
Faced with the same problem, I decided to implement nested ConcurrentDictionaries.
It is generic, but could be changed to use defined types instead. An example method to convert it to a DataTable is included.
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;

/// <summary>
/// A thread safe data table
/// </summary>
/// <typeparam name="TX">The X axis type</typeparam>
/// <typeparam name="TY">The Y axis type</typeparam>
/// <typeparam name="TZ">The value type</typeparam>
public class HeatMap<TX,TY,TZ>
{
    public ConcurrentDictionary<TX, ConcurrentDictionary<TY, TZ>> Table { get; set; } = new ConcurrentDictionary<TX, ConcurrentDictionary<TY, TZ>>();

    public void SetValue(TX x, TY y, TZ val)
    {
        var row = Table.GetOrAdd(x, u => new ConcurrentDictionary<TY, TZ>());
        row.AddOrUpdate(y, v => val, (ty, v) => val);
    }

    public TZ GetValue(TX x, TY y)
    {
        // TryGetValue rather than GetOrAdd, so that reads don't add empty rows
        if (!Table.TryGetValue(x, out var row) || !row.TryGetValue(y, out TZ val))
            return default;
        return val;
    }

    public DataTable GetDataTable()
    {
        // Snapshot first: ConcurrentDictionary.ToArray() is atomic, whereas enumerating
        // the live dictionaries twice could see a new column appear between the two passes
        var rows = new List<KeyValuePair<TX, KeyValuePair<TY, TZ>[]>>();
        foreach (var row in Table.ToArray())
            rows.Add(new KeyValuePair<TX, KeyValuePair<TY, TZ>[]>(row.Key, row.Value.ToArray()));

        var dataTable = new DataTable();
        dataTable.Columns.Add("");
        var columnList = new List<string>();
        foreach (var row in rows)
        {
            foreach (var cell in row.Value)
            {
                var columnName = cell.Key.ToString();
                if (!columnList.Contains(columnName))
                    columnList.Add(columnName);
            }
        }
        foreach (var s in columnList)
            dataTable.Columns.Add(s);
        foreach (var row in rows)
        {
            var dataRow = dataTable.NewRow();
            dataRow[0] = row.Key.ToString();
            foreach (var cell in row.Value)
                dataRow[cell.Key.ToString()] = cell.Value;
            dataTable.Rows.Add(dataRow);
        }
        return dataTable;
    }
}
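If per-row enumeration isn't needed, a flatter variant of the same idea is a single ConcurrentDictionary keyed by a value tuple. This is a different structure from the HeatMap above, sketched under that assumption; the TupleGrid name is hypothetical:

```csharp
using System.Collections.Concurrent;

public sealed class TupleGrid<TX, TY, TZ>
{
    // One dictionary, keyed by (x, y): no inner dictionaries to create or look up
    private readonly ConcurrentDictionary<(TX, TY), TZ> cells =
        new ConcurrentDictionary<(TX, TY), TZ>();

    // The indexer setter is an atomic add-or-overwrite
    public void SetValue(TX x, TY y, TZ value) => cells[(x, y)] = value;

    // Read-only lookup: does not create entries for missing keys
    public TZ GetValue(TX x, TY y) =>
        cells.TryGetValue((x, y), out var value) ? value : default(TZ);

    public int Count => cells.Count;
}
```

The trade-off is that building a DataTable from this means grouping by X yourself, which the nested form gives you directly.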