What does MaxDegreeOfParallelism do?
Solution 1:
The answer is that it is the upper limit for the entire parallel operation, irrespective of the number of cores.
So even if your code is not using the CPU because it is waiting on IO or on a lock, no extra tasks will run in parallel; you only ever get the maximum that you specify.
To find this out, I wrote this piece of test code. There is an artificial lock in there to stimulate the TPL to use more threads. The same will happen when your code is waiting for IO or a database.
using System;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
var locker = new Object();
int count = 0;
Parallel.For
(0
, 1000
, new ParallelOptions { MaxDegreeOfParallelism = 2 }
, (i) =>
{
Interlocked.Increment(ref count);
lock (locker)
{
Console.WriteLine("Number of active threads:" + count);
Thread.Sleep(10);
}
Interlocked.Decrement(ref count);
}
);
}
}
If I don't specify MaxDegreeOfParallelism, the console logging shows that up to around 8 tasks are running at the same time. Like this:
Number of active threads:6
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:6
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
It starts lower, increases over time and at the end it is trying to run 8 at the same time.
If I limit it to some arbitrary value (say 2), I get:
Number of active threads:2
Number of active threads:1
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Oh, and this is on a quad-core machine.
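If you would rather check the cap in code than read console output, something like the following should work. It is only a minimal sketch: `ParallelismProbe` and `MeasurePeak` are names I made up for illustration, and the `Thread.Sleep` stands in for whatever blocking call your real code makes.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the test program above.
static class ParallelismProbe
{
    // Runs a blocking workload and returns the highest number of loop bodies
    // that were observed executing at the same moment.
    public static int MeasurePeak(int maxDegreeOfParallelism, int iterations = 200)
    {
        int active = 0; // loop bodies currently running
        int peak = 0;   // highest value of 'active' seen so far

        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };
        Parallel.For(0, iterations, options, i =>
        {
            int now = Interlocked.Increment(ref active);

            // Lock-free "store the maximum": retry until peak >= now.
            int seen;
            while (now > (seen = Volatile.Read(ref peak)))
            {
                if (Interlocked.CompareExchange(ref peak, now, seen) == seen)
                {
                    break;
                }
            }

            Thread.Sleep(5); // simulate waiting on IO or a lock
            Interlocked.Decrement(ref active);
        });

        return peak;
    }
}
```

Calling `ParallelismProbe.MeasurePeak(2)` should never return more than 2, however many cores the machine has. Passing -1 removes the cap, so the result then depends on the machine and on how the thread pool grows.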
Solution 2:
"For example, is there any way to do something like: if the CPU has two cores, then use 20; if the CPU has four cores, then 40?"
You can do this to make parallelism dependent on the number of CPU cores:
var options = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount * 10 };
Parallel.ForEach(sourceCollection, options, sourceItem =>
{
// do something
});
However, newer CPUs tend to use hyper-threading to simulate extra cores. So if you have a quad-core processor, then Environment.ProcessorCount will probably report this as 8 cores. I've found that if you set the parallelism to account for the simulated cores, it actually slows down other threads such as UI threads.
So although the operation may finish a bit faster, an application UI may experience significant lag during this time. Dividing Environment.ProcessorCount by 2 seems to achieve much the same processing speed while still keeping the CPU available for UI threads.
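Here is a rough sketch of the "divide by 2" idea. The helper names (`ParallelismSettings`, `HalfOfLogicalProcessors`, `SumInParallel`) are hypothetical and the summing workload is just a placeholder. The one detail worth copying is the `Math.Max(1, ...)` guard: on a single-core machine the integer division gives 0, and `ParallelOptions` throws if you set `MaxDegreeOfParallelism` to 0.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration only.
static class ParallelismSettings
{
    // Half of the logical processors, but never below 1,
    // because ParallelOptions rejects a MaxDegreeOfParallelism of 0.
    public static int HalfOfLogicalProcessors(int logicalProcessors)
    {
        return Math.Max(1, logicalProcessors / 2);
    }

    // Placeholder workload: sums the items using the capped degree of parallelism.
    public static long SumInParallel(IEnumerable<int> items)
    {
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = HalfOfLogicalProcessors(Environment.ProcessorCount)
        };

        long total = 0;
        Parallel.ForEach(items, options, item =>
        {
            Interlocked.Add(ref total, item); // atomic, so no updates are lost
        });
        return total;
    }
}
```

Whether half is the right fraction for your application is something to measure; the helper just makes the choice explicit and keeps it in one place.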
Solution 3:
It sounds like the code that you're running in parallel is deadlocking, which means that unless you can find and fix the issue that's causing that, you shouldn't parallelize it at all.
Solution 4:
Something else to consider, especially for those finding this many years later: depending on your situation, it's usually best to collect all the data in a DataTable and then use SqlBulkCopy toward the end of each major task.
For example, I wrote a process that runs through millions of files, and I ran into the same errors when each file transaction made a DB query to insert its record. I moved instead to storing everything in an in-memory DataTable for each share I iterated through, dumping the DataTable into SQL Server and clearing it between shares. The bulk insert takes a split second and has the benefit of not opening thousands of connections at once.
EDIT: Here's a quick & dirty working example. The SqlBulkCopy method:
private static void updateDatabase(DataTable targetTable)
{
try
{
DataSet ds = new DataSet("FileFolderAttribute");
ds.Tables.Add(targetTable);
writeToLog(targetTable.TableName + " - Rows: " + targetTable.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
writeToLog(@"Opening SQL connection", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
Console.WriteLine(@"Opening SQL connection");
// The using blocks dispose the connection and the bulk copy even if WriteToServer throws.
using (SqlConnection sqlConnection = new SqlConnection(sqlConnectionString))
{
sqlConnection.Open();
using (SqlBulkCopy bulkCopy = new SqlBulkCopy(sqlConnection, SqlBulkCopyOptions.TableLock | SqlBulkCopyOptions.FireTriggers | SqlBulkCopyOptions.UseInternalTransaction, null))
{
bulkCopy.DestinationTableName = "FileFolderAttribute";
writeToLog(@"Copying data to SQL Server table", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
Console.WriteLine(@"Copying data to SQL Server table");
foreach (var table in ds.Tables)
{
writeToLog(table.ToString(), logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
Console.WriteLine(table.ToString());
}
bulkCopy.WriteToServer(ds.Tables[0]);
}
}
writeToLog(@"Closing SQL connection", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
writeToLog(@"Clearing local DataTable...", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
Console.WriteLine(@"Closing SQL connection");
Console.WriteLine(@"Clearing local DataTable...");
targetTable.Clear();
ds.Tables.Remove(targetTable);
ds.Clear();
ds.Dispose();
}
catch (Exception error)
{
errorLogging(error, getCurrentMethod(), logDatabaseFile);
}
}
...and for dumping it into the datatable:
private static void writeToDataTable(string ServerHostname, string RootDirectory, string RecordType, string Path, string PathDirectory, string PathFileName, string PathFileExtension, decimal SizeBytes, decimal SizeMB, DateTime DateCreated, DateTime DateModified, DateTime DateLastAccessed, string Owner, int PathLength, DateTime RecordWriteDateTime)
{
try
{
// Both branches filled identical rows, so pick the active table once.
DataTable target = tableToggle ? results_1 : results_2;
// DataTable is not safe for concurrent writes, and this method is called from Parallel.ForEach,
// so row creation and insertion are serialized per table.
lock (target)
{
DataRow toInsert = target.NewRow();
toInsert[0] = ServerHostname;
toInsert[1] = RootDirectory;
toInsert[2] = RecordType;
toInsert[3] = Path;
toInsert[4] = PathDirectory;
toInsert[5] = PathFileName;
toInsert[6] = PathFileExtension;
toInsert[7] = SizeBytes;
toInsert[8] = SizeMB;
toInsert[9] = DateCreated;
toInsert[10] = DateModified;
toInsert[11] = DateLastAccessed;
toInsert[12] = Owner;
toInsert[13] = PathLength;
toInsert[14] = RecordWriteDateTime;
target.Rows.Add(toInsert);
}
}
catch (Exception error)
{
errorLogging(error, getCurrentMethod(), logFile);
}
}
...and here's the context, the looping piece itself:
private static void processTargetDirectory(DirectoryInfo rootDirectory, string targetPathRoot)
{
DateTime StartTime = DateTime.Now;
int directoryCount = 0;
int fileCount = 0;
try
{
manageDataTables();
Console.WriteLine(rootDirectory.FullName);
writeToLog(@"Working in Directory: " + rootDirectory.FullName, logFile, getLineNumber(), getCurrentMethod(), true);
applicationsDirectoryCount++;
// REPORT DIRECTORY INFO //
string directoryOwner = "";
try
{
directoryOwner = File.GetAccessControl(rootDirectory.FullName).GetOwner(typeof(System.Security.Principal.NTAccount)).ToString();
}
catch (Exception error)
{
//writeToLog("\t" + rootDirectory.FullName, logExceptionsFile, getLineNumber(), getCurrentMethod(), true);
writeToLog("[" + error.Message + "] - " + rootDirectory.FullName, logExceptionsFile, getLineNumber(), getCurrentMethod(), true);
errorLogging(error, getCurrentMethod(), logFile);
directoryOwner = "SeparatedUser";
}
writeToRawLog(serverHostname + "," + targetPathRoot + "," + "Directory" + "," + rootDirectory.Name + "," + rootDirectory.Extension + "," + 0 + "," + 0 + "," + rootDirectory.CreationTime + "," + rootDirectory.LastWriteTime + "," + rootDirectory.LastAccessTime + "," + directoryOwner + "," + rootDirectory.FullName.Length + "," + DateTime.Now + "," + rootDirectory.FullName + "," + "", logResultsFile, true, logFile);
//writeToDBLog(serverHostname, targetPathRoot, "Directory", rootDirectory.FullName, "", rootDirectory.Name, rootDirectory.Extension, 0, 0, rootDirectory.CreationTime, rootDirectory.LastWriteTime, rootDirectory.LastAccessTime, directoryOwner, rootDirectory.FullName.Length, DateTime.Now);
writeToDataTable(serverHostname, targetPathRoot, "Directory", rootDirectory.FullName, "", rootDirectory.Name, rootDirectory.Extension, 0, 0, rootDirectory.CreationTime, rootDirectory.LastWriteTime, rootDirectory.LastAccessTime, directoryOwner, rootDirectory.FullName.Length, DateTime.Now);
if (rootDirectory.GetDirectories().Length > 0)
{
Parallel.ForEach(rootDirectory.GetDirectories(), new ParallelOptions { MaxDegreeOfParallelism = directoryDegreeOfParallelism }, dir =>
{
// A plain ++ is not atomic inside a parallel loop and can lose updates.
Interlocked.Increment(ref directoryCount);
Interlocked.Increment(ref threadCount);
processTargetDirectory(dir, targetPathRoot);
});
}
// REPORT FILE INFO //
Parallel.ForEach(rootDirectory.GetFiles(), new ParallelOptions { MaxDegreeOfParallelism = fileDegreeOfParallelism }, file =>
{
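// A plain ++ can lose updates here. This assumes applicationsFileCount is an int or long field.
Interlocked.Increment(ref applicationsFileCount);
Interlocked.Increment(ref fileCount);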
Interlocked.Increment(ref threadCount);
processTargetFile(file, targetPathRoot);
});
}
catch (Exception error)
{
writeToLog(error.Message, logExceptionsFile, getLineNumber(), getCurrentMethod(), true);
errorLogging(error, getCurrentMethod(), logFile);
}
finally
{
Interlocked.Decrement(ref threadCount);
}
DateTime EndTime = DateTime.Now;
writeToLog(@"Run time for " + rootDirectory.FullName + @" is: " + (EndTime - StartTime).ToString() + @" | File Count: " + fileCount + @", Directory Count: " + directoryCount, logTimingFile, getLineNumber(), getCurrentMethod(), true);
}
As noted above, this is quick & dirty, but it works very well.
Once I got to around 2,000,000 records I ran into memory-related issues, so I had to create a second DataTable and alternate between the two, dumping the records to SQL Server at each switch. As a result, I open one SQL connection per 100,000 records.
I managed that like this:
private static void manageDataTables()
{
try
{
Console.WriteLine(@"[Checking datatable size] toggleValue: " + tableToggle + " | " + @"r1: " + results_1.Rows.Count + " - " + @"r2: " + results_2.Rows.Count);
if (tableToggle)
{
int rowCount = 0;
if (results_1.Rows.Count > datatableRecordCountThreshhold)
{
tableToggle ^= true;
writeToLog(@"results_1 row count > 100000 @ " + results_1.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
rowCount = results_1.Rows.Count;
logResultsFile = "FileServerReport_Results_" + DateTime.Now.ToString("yyyyMMdd-HHmmss") + ".txt";
Thread.Sleep(5000);
if (results_1.Rows.Count != rowCount)
{
writeToLog(@"results_1 row count increased, @ " + results_1.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
rowCount = results_1.Rows.Count;
Thread.Sleep(15000);
}
writeToLog(@"results_1 row count stopped increasing, updating database...", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
updateDatabase(results_1);
results_1.Clear();
writeToLog(@"results_1 cleared, count: " + results_1.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
}
}
else
{
int rowCount = 0;
if (results_2.Rows.Count > datatableRecordCountThreshhold)
{
tableToggle ^= true;
writeToLog(@"results_2 row count > 100000 @ " + results_2.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
rowCount = results_2.Rows.Count;
logResultsFile = "FileServerReport_Results_" + DateTime.Now.ToString("yyyyMMdd-HHmmss") + ".txt";
Thread.Sleep(5000);
if (results_2.Rows.Count != rowCount)
{
writeToLog(@"results_2 row count increased, @ " + results_2.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
rowCount = results_2.Rows.Count;
Thread.Sleep(15000);
}
writeToLog(@"results_2 row count stopped increasing, updating database...", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
updateDatabase(results_2);
results_2.Clear();
writeToLog(@"results_2 cleared, count: " + results_2.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
}
}
}
catch (Exception error)
{
errorLogging(error, getCurrentMethod(), logDatabaseFile);
}
}
Here datatableRecordCountThreshhold is set to 100000.
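For anyone who wants the same "fill a buffer, flush at a threshold, keep filling a fresh one" behaviour without the sleeps and the toggle flag, here is a minimal sketch of one alternative. It is not the code I actually run: `BatchBuffer<T>` is a hypothetical helper, and the flush callback is where a bulk insert such as updateDatabase would go.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of the code above.
sealed class BatchBuffer<T>
{
    private readonly object _gate = new object();
    private readonly int _threshold;
    private readonly Action<IReadOnlyList<T>> _flush;
    private List<T> _items = new List<T>();

    public BatchBuffer(int threshold, Action<IReadOnlyList<T>> flush)
    {
        if (threshold < 1) throw new ArgumentOutOfRangeException(nameof(threshold));
        _threshold = threshold;
        _flush = flush ?? throw new ArgumentNullException(nameof(flush));
    }

    // Safe to call from many threads, e.g. from inside Parallel.ForEach.
    public void Add(T item)
    {
        List<T> full = null;
        lock (_gate)
        {
            _items.Add(item);
            if (_items.Count >= _threshold)
            {
                full = _items;          // this caller takes ownership of the full batch
                _items = new List<T>(); // other threads keep filling a fresh list
            }
        }

        // Flush outside the lock so other threads are not blocked during the write.
        if (full != null) _flush(full);
    }

    // Call once after the parallel loop to write the final partial batch.
    public void Complete()
    {
        List<T> rest;
        lock (_gate)
        {
            rest = _items;
            _items = new List<T>();
        }
        if (rest.Count > 0) _flush(rest);
    }
}
```

Swapping the list under the lock plays the same role as alternating between results_1 and results_2: a full batch is never written to again once it has been handed off. Note that two batches can fill in quick succession, so the flush callback may run on more than one thread at a time and has to tolerate that.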