Reductions in parallel in logarithmic time
Given n
partial sums it's possible to sum all the partial sums in log2 parallel steps. For example assume there are eight threads with eight partial sums: s0, s1, s2, s3, s4, s5, s6, s7
. This could be reduced in log2(8) = 3
sequential steps like this;
thread0 thread1 thread2 thread4
s0 += s1 s2 += s3 s4 += s5 s6 +=s7
s0 += s2 s4 += s6
s0 += s4
I would like to do this with OpenMP but I don't want to use OpenMP's reduction
clause. I have come up with a solution but I think a better solution can be found maybe using OpenMP's task
clause.
This is more general than scalar addition. Let me choose a more useful case: an array reduction (see here, here, and here for more about array reductions).
Let's say I want to do an array reduction on an array a
. Here is some code which fills private arrays in parallel for each thread.
int bins = 20;
int a[bins];
int **at; // array of pointers to arrays
for(int i = 0; i<bins; i++) a[i] = 0;
#pragma omp parallel
{
#pragma omp single
at = (int**)malloc(sizeof *at * omp_get_num_threads());
at[omp_get_thread_num()] = (int*)malloc(sizeof **at * bins);
int a_private[bins];
//arbitrary function to fill the arrays for each thread
for(int i = 0; i<bins; i++) at[omp_get_thread_num()][i] = i + omp_get_thread_num();
}
At this point I have have an array of pointers to arrays for each thread. Now I want to add all these arrays together and write the final sum to a
. Here is the solution I came up with.
#pragma omp parallel
{
int n = omp_get_num_threads();
for(int m=1; n>1; m*=2) {
int c = n%2;
n/=2;
#pragma omp for
for(int i = 0; i<n; i++) {
int *p1 = at[2*i*m], *p2 = at[2*i*m+m];
for(int j = 0; j<bins; j++) p1[j] += p2[j];
}
n+=c;
}
#pragma omp single
memcpy(a, at[0], sizeof *a*bins);
free(at[omp_get_thread_num()]);
#pragma omp single
free(at);
}
Let me try and explain what this code does. Let's assume there are eight threads. Let's define the +=
operator to mean to sum over the array. e.g. s0 += s1
is
for(int i=0; i<bins; i++) s0[i] += s1[i]
then this code would do
n thread0 thread1 thread2 thread4
4 s0 += s1 s2 += s3 s4 += s5 s6 +=s7
2 s0 += s2 s4 += s6
1 s0 += s4
But this code is not ideal as I would like it.
One problem is that there are a few implicit barriers which require all the threads to sync. These barriers should not be necessary. The first barrier is between filling the arrays and doing the reduction. The second barrier is in the #pragma omp for
declaration in the reduction. But I can't use the nowait
clause with this method to remove the barrier.
Another problem is that there are several threads that don't need to be used. For example with eight threads. The first step in the reduction only needs four threads, the second step two threads, and the last step only one thread. However, this method would involve all eight threads in the reduction. Although, the other threads don't do much anyway and should go right to the barrier and wait so it's probably not much of an issue.
My instinct is that a better method can be found using the omp task
clause. Unfortunately I have little experience with the task
clause and all my efforts so far with it do a reduction better than what I have now have failed.
Can someone suggest a better solution to do the reduction in logarithmic time using e.g. OpenMP's task
clause?
I found a method which solves the barrier problem. This reduces asynchronously. The only remaining problem is that it still puts threads which don't participate in the reduction into a busy loop. This method uses something like a stack to push pointers to the stack (but never pops them) in critical sections (this was one of the keys as critical sections don't have implicit barriers. The stack is operated on serially but the reduction in parallel.
Here is a working example.
#include <stdio.h>
#include <omp.h>
#include <stdlib.h>
#include <string.h>
void foo6() {
int nthreads = 13;
omp_set_num_threads(nthreads);
int bins= 21;
int a[bins];
int **at;
int m = 0;
int nsums = 0;
for(int i = 0; i<bins; i++) a[i] = 0;
#pragma omp parallel
{
int n = omp_get_num_threads();
int ithread = omp_get_thread_num();
#pragma omp single
at = (int**)malloc(sizeof *at * n * 2);
int* a_private = (int*)malloc(sizeof *a_private * bins);
//arbitrary fill function
for(int i = 0; i<bins; i++) a_private[i] = i + omp_get_thread_num();
#pragma omp critical (stack_section)
at[nsums++] = a_private;
while(nsums<2*n-2) {
int *p1, *p2;
char pop = 0;
#pragma omp critical (stack_section)
if((nsums-m)>1) p1 = at[m], p2 = at[m+1], m +=2, pop = 1;
if(pop) {
for(int i = 0; i<bins; i++) p1[i] += p2[i];
#pragma omp critical (stack_section)
at[nsums++] = p1;
}
}
#pragma omp barrier
#pragma omp single
memcpy(a, at[2*n-2], sizeof **at *bins);
free(a_private);
#pragma omp single
free(at);
}
for(int i = 0; i<bins; i++) printf("%d ", a[i]); puts("");
for(int i = 0; i<bins; i++) printf("%d ", (nthreads-1)*nthreads/2 +nthreads*i); puts("");
}
int main(void) {
foo6();
}
I sill feel a better method may be found using tasks which does not put the threads not being used in a busy loop.
Solution 1:
Actually, it is quite simple to implement that cleanly with tasks using a recursive divide-and-conquer approach. This is almost textbook code.
void operation(int* p1, int* p2, size_t bins)
{
for (int i = 0; i < bins; i++)
p1[i] += p2[i];
}
void reduce(int** arrs, size_t bins, int begin, int end)
{
assert(begin < end);
if (end - begin == 1) {
return;
}
int pivot = (begin + end) / 2;
/* Moving the termination condition here will avoid very short tasks,
* but make the code less nice. */
#pragma omp task
reduce(arrs, bins, begin, pivot);
#pragma omp task
reduce(arrs, bins, pivot, end);
#pragma omp taskwait
/* now begin and pivot contain the partial sums. */
operation(arrs[begin], arrs[pivot], bins);
}
/* call this within a parallel region */
#pragma omp single
reduce(at, bins, 0, n);
As far as i can tell, there are no unnecessary synchronizations and there is no weird polling on critical sections. It also works naturally with a data size different than your number of ranks. I find it very clean and easy to understand. So I do indeed think this is better than both of your solutions.
But let's look at how it performs in practice*. For that we can use Score-p and Vampir:
*bins=10000
so the reduction actually takes a little bit of time. Executed on a 24-core Haswell system w/o turbo. gcc 4.8.4, -O3
. I added some buffer around the actual execution to hide initialization/post-processing
The picture reveals what is happening at any thread within the application on a horizontal time-axis. The tree implementations from top to bottom:
-
omp for
loop -
omp critical
kind of tasking. omp task
This shows nicely how the specific implementations actually execute. Now it seems that the for loop is actually the fastest, despite the unnecessary synchronizations. But there are still a number of flaws in this performance analysis. For example, I didn't pin the threads. In practice NUMA (non-uniform memory access) matters a lot: Does the core does have this data in it's own cache / memory of it's own socket? This is where the task solution becomes non-deterministic. The very significant variance among repetitions is not considered in the simple comparison.
If the reduction operation becomes variable in runtime, then the task solution will become better than thy synchronized for loop.
The critical
solution has some interesting aspect, the passive threads are not continuously waiting, so they will more likely consume CPU resources. This can be bad for performance e.g. in case of turbo mode.
Remember that the task
solution has more optimization potential by avoiding spawning tasks that immediately return. How these solutions perform also highly depends on the specific OpenMP runtime. Intel's runtime seems to do much worse for tasks.
My recommendation is:
- Implement the most maintainable solution with optimal algorithmic complexity
- Measure which parts of the code actually matter for run-time
- Analyze based on actual measurements what is the bottleneck. In my experience it is more about NUMA and scheduling rather than some unnecessary barrier.
- Perform the micro-optimization based on your actual measurements
Linear solution
Here is the timeline for the linear proccess_data_v1
from this question.
OpenMP 4 Reduction
So I thought about OpenMP reduction. The tricky part seems to be getting the data from the at
array inside the loop without a copy. I do initialize the worker array with NULL
and simply move the pointer the first time:
void meta_op(int** pp1, int* p2, size_t bins)
{
if (*pp1 == NULL) {
*pp1 = p2;
return;
}
operation(*pp1, p2, bins);
}
// ...
// declare before parallel region as global
int* awork = NULL;
#pragma omp declare reduction(merge : int* : meta_op(&omp_out, omp_in, 100000)) initializer (omp_priv=NULL)
#pragma omp for reduction(merge : awork)
for (int t = 0; t < n; t++) {
meta_op(&awork, at[t], bins);
}
Surprisingly, this doesn't look too good:
top is icc 16.0.2
, bottom is gcc 5.3.0
, both with -O3
.
Both seem to implement the reduction serialized. I tried to look into gcc
/ libgomp
, but it's not immediately apparent to me what is happening. From intermediate code / disassembly, they seem to be wrapping the final merge in a GOMP_atomic_start
/end
- and that seems to be a global mutex. Similarly icc
wraps the call to the operation
in a kmpc_critical
. I suppose there wasn't much optimization going into costly custom reduction operations. A traditional reduction can be done with a hardware-supported atomic operation.
Notice how each operation
is faster because the input is cached locally, but due to the serialization it is overall slower. Again this is not a perfect comparison due to high variances, and earlier screenshots were with different gcc
version. But the trend is clear, and I also have data on the cache effects.