Branchless K-means (or other optimizations)
Too bad we can't use SSE4.1, but very well then, SSE2 it is. I haven't tested this, only compiled it to check for syntax errors and to see whether the assembly made sense (it's mostly alright, though GCC spills min_index even with some xmm registers unused; I'm not sure why that happens).
#include <emmintrin.h> // SSE2 intrinsics
#include <stdint.h>    // uint32_t

int find_closest(float *x, float *y, float *z,
                 float pt_x, float pt_y, float pt_z, int n) {
    // lane k of min_index holds the index of the best point seen so far in lane k
    __m128i min_index = _mm_set_epi32(3, 2, 1, 0);
    __m128 xdif = _mm_sub_ps(_mm_set1_ps(pt_x), _mm_load_ps(x));
    __m128 ydif = _mm_sub_ps(_mm_set1_ps(pt_y), _mm_load_ps(y));
    __m128 zdif = _mm_sub_ps(_mm_set1_ps(pt_z), _mm_load_ps(z));
    __m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif),
                                            _mm_mul_ps(ydif, ydif)),
                                 _mm_mul_ps(zdif, zdif));
    __m128i index = min_index;
    for (int i = 4; i < n; i += 4) {
        xdif = _mm_sub_ps(_mm_set1_ps(pt_x), _mm_load_ps(x + i));
        ydif = _mm_sub_ps(_mm_set1_ps(pt_y), _mm_load_ps(y + i));
        zdif = _mm_sub_ps(_mm_set1_ps(pt_z), _mm_load_ps(z + i));
        __m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif),
                                            _mm_mul_ps(ydif, ydif)),
                                 _mm_mul_ps(zdif, zdif));
        index = _mm_add_epi32(index, _mm_set1_epi32(4));
        // per lane: take the new index where dist < min_dist, otherwise keep the old one
        __m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
        min_dist = _mm_min_ps(min_dist, dist);
        min_index = _mm_or_si128(_mm_and_si128(index, mask),
                                 _mm_andnot_si128(mask, min_index));
    }
    // reduce the four lanes to a single result
    // (unaligned stores, because these local arrays are not guaranteed to be 16-byte aligned)
    float mdist[4];
    _mm_storeu_ps(mdist, min_dist);
    uint32_t mindex[4];
    _mm_storeu_si128((__m128i*)mindex, min_index);
    float closest = mdist[0];
    int closest_i = mindex[0];
    for (int i = 1; i < 4; i++) {
        if (mdist[i] < closest) {
            closest = mdist[i];
            closest_i = mindex[i];
        }
    }
    return closest_i;
}
As usual, it expects the pointers to be 16-byte aligned. It also processes four points per step, so the arrays should be padded to a multiple of 4, and the padding should consist of points at infinity (so they're never closest to the target).
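A minimal sketch of the padding step, assuming each coordinate lives in its own std::vector<float>. The helper name pad_to_multiple_of_4 is hypothetical and not part of the code above. It only covers the padding; 16-byte alignment of the storage still has to be arranged separately (for example with an aligned allocator).

```cpp
#include <cassert>
#include <cstddef>
#include <limits>
#include <vector>

// Hypothetical helper: grows one coordinate array to a multiple of 4 entries,
// filling the new slots with +infinity so that a padded point can never win
// the "closest" comparison. Returns the padded size.
std::size_t pad_to_multiple_of_4(std::vector<float>& coords)
{
    assert(std::numeric_limits<float>::has_infinity);
    const std::size_t padded = (coords.size() + 3) / 4 * 4; // round up
    coords.resize(padded, std::numeric_limits<float>::infinity());
    return padded;
}
```

Call it on the x, y and z arrays alike and pass the returned size as n. An empty input still needs separate handling, since the search assumes at least four points.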
SSE4.1 would let you replace this
min_index = _mm_or_si128(_mm_and_si128(index, mask),
                         _mm_andnot_si128(mask, min_index));
with this
min_index = _mm_blendv_epi8(min_index, index, mask);
Here's an asm version, made for vsyasm, tested a bit (seems to work)
bits 64
section .data
align 16
centroid_four:
dd 4, 4, 4, 4
centroid_index:
dd 0, 1, 2, 3
section .text
global find_closest
proc_frame find_closest
;
; arguments:
; ecx: number of points (multiple of 4 and at least 4)
; rdx -> array of 3 pointers to floats (x, y, z) (the points)
; r8 -> array of 3 floats (the reference point)
;
alloc_stack 0x58
save_xmm128 xmm6, 0
save_xmm128 xmm7, 16
save_xmm128 xmm8, 32
save_xmm128 xmm9, 48
[endprolog]
movss xmm0, [r8]
shufps xmm0, xmm0, 0
movss xmm1, [r8 + 4]
shufps xmm1, xmm1, 0
movss xmm2, [r8 + 8]
shufps xmm2, xmm2, 0
; pointers to x, y, z in r8, r9, r10
mov r8, [rdx]
mov r9, [rdx + 8]
mov r10, [rdx + 16]
; reference point is in xmm0, xmm1, xmm2 (x, y, z)
movdqa xmm3, [rel centroid_index] ; min_index
movdqa xmm4, xmm3 ; current index
movdqa xmm9, [rel centroid_four] ; index increment
paddd xmm4, xmm9
; calculate initial min_dist, xmm5
movaps xmm5, [r8]
subps xmm5, xmm0
movaps xmm7, [r9]
subps xmm7, xmm1
movaps xmm8, [r10]
subps xmm8, xmm2
mulps xmm5, xmm5
mulps xmm7, xmm7
mulps xmm8, xmm8
addps xmm5, xmm7
addps xmm5, xmm8
add r8, 16
add r9, 16
add r10, 16
sub ecx, 4
jna _tail
_loop:
movaps xmm6, [r8]
subps xmm6, xmm0
movaps xmm7, [r9]
subps xmm7, xmm1
movaps xmm8, [r10]
subps xmm8, xmm2
mulps xmm6, xmm6
mulps xmm7, xmm7
mulps xmm8, xmm8
addps xmm6, xmm7
addps xmm6, xmm8
add r8, 16
add r9, 16
add r10, 16
movaps xmm7, xmm6
cmpps xmm6, xmm5, 1
minps xmm5, xmm7
movdqa xmm7, xmm6
pand xmm6, xmm4
pandn xmm7, xmm3
por xmm6, xmm7
movdqa xmm3, xmm6
paddd xmm4, xmm9
sub ecx, 4
ja _loop
_tail:
; calculate horizontal minimum
pshufd xmm0, xmm5, 0xB1
minps xmm0, xmm5
pshufd xmm1, xmm0, 0x4E
minps xmm0, xmm1
; find index of the minimum
cmpps xmm0, xmm5, 0
movmskps eax, xmm0
bsf eax, eax
; index into xmm3, sort of
movaps [rsp + 64], xmm3
mov eax, [rsp + 64 + rax * 4]
movaps xmm9, [rsp + 48]
movaps xmm8, [rsp + 32]
movaps xmm7, [rsp + 16]
movaps xmm6, [rsp]
add rsp, 0x58
ret
endproc_frame
In C++:
extern "C" int find_closest(int n, float** points, float* reference_point);
You could use a branchless replacement for the ternary operator (condition ? a : b), sometimes called bitselect.
Use it for both members (min_index and min_dist), keeping the old value when the condition is false.
Don't worry about the extra operations, they are cheap compared to a mispredicted branch on the if statement.
bitselect implementation:
#include <cstring> // std::memcpy

inline static int bitselect(int condition, int truereturnvalue, int falsereturnvalue)
{
    //condition must be 0 or 1, so -condition is either all zero bits or all one bits
    return (truereturnvalue & -condition) | (falsereturnvalue & ~(-condition)); //a when TRUE and b when FALSE
}
inline static float bitselect(int condition, float truereturnvalue, float falsereturnvalue)
{
    //Reinterpret floats. Works because it's just a bit select, no matter the actual value.
    //memcpy replaces the reinterpret_cast<int&>, which is undefined behavior (assumes sizeof(int) == sizeof(float))
    int at, af;
    std::memcpy(&at, &truereturnvalue, sizeof at);
    std::memcpy(&af, &falsereturnvalue, sizeof af);
    int res = (at & -condition) | (af & ~(-condition)); //a when TRUE and b when FALSE
    float out;
    std::memcpy(&out, &res, sizeof out);
    return out;
}
And your loop should look like this:
for (int i=0; i < num_centroids; ++i)
{
const ClusterCentroid& cent = centroids[i];
const float dist = ...;
bool isSmaller = dist < pt.min_dist;
//use same value if not smaller
pt.min_index = bitselect(isSmaller, i, pt.min_index);
pt.min_dist = bitselect(isSmaller, dist, pt.min_dist);
}
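For reference, here is a self-contained sketch of that loop. The Point and ClusterCentroid structs are hypothetical stand-ins, since the question's real types aren't shown here, and the elided distance is filled in with a squared Euclidean distance as an assumption. Whether it actually beats a plain if depends on the compiler and the data, so measure before adopting it.

```cpp
#include <cassert>
#include <cfloat>
#include <cstring>
#include <vector>

// Hypothetical stand-ins for the question's types.
struct ClusterCentroid { float x, y, z; };
struct Point { float x, y, z; int min_index; float min_dist; };

// Mask-based select for ints; condition must be exactly 0 or 1.
static inline int bitselect(int condition, int a, int b)
{
    return (a & -condition) | (b & ~(-condition));
}

// Float variant: copies the bits into ints, selects, and copies them back.
static inline float bitselect(int condition, float a, float b)
{
    static_assert(sizeof(int) == sizeof(float), "bit select assumes equal sizes");
    int ia, ib;
    std::memcpy(&ia, &a, sizeof ia);
    std::memcpy(&ib, &b, sizeof ib);
    const int ir = (ia & -condition) | (ib & ~(-condition));
    float r;
    std::memcpy(&r, &ir, sizeof r);
    return r;
}

// Fills pt.min_index / pt.min_dist without an if statement in the loop body.
void assign_closest(Point& pt, const std::vector<ClusterCentroid>& centroids)
{
    assert(!centroids.empty());
    pt.min_index = -1;
    pt.min_dist = FLT_MAX;
    for (int i = 0; i < static_cast<int>(centroids.size()); ++i)
    {
        const ClusterCentroid& cent = centroids[i];
        const float dx = pt.x - cent.x, dy = pt.y - cent.y, dz = pt.z - cent.z;
        const float dist = dx * dx + dy * dy + dz * dz; // squared distance (assumed metric)
        const bool isSmaller = dist < pt.min_dist;
        // keep the old value when dist is not smaller
        pt.min_index = bitselect(isSmaller, i, pt.min_index);
        pt.min_dist = bitselect(isSmaller, dist, pt.min_dist);
    }
}
```

Overload resolution picks the int version for the index and the float version for the distance, because the second and third arguments match exactly in each call.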
C++ is a high-level language. Your assumption that control flow in the C++ source code translates into branching instructions is flawed. I don't have the definition of some types from your example, so I made a simple test program with similar conditional assignments:
int g(int, int);
int f(const int *arr)
{
    int min = 10000, minIndex = -1;
    for ( int i = 0; i < 1000; ++i )
    {
        if ( arr[i] < min )
        {
            min = arr[i];
            minIndex = i;
        }
    }
    return g(min, minIndex);
}
Note that the undefined function "g" is there only to prevent the optimizer from deleting everything. I compiled this with G++ 4.9.2 with -O3 and -S into x86_64 assembly (without even having to change the default for -march) and the (not overly surprising) result is that the loop body contains no branches:
movl (%rdi,%rax,4), %ecx
movl %edx, %r8d
cmpl %edx, %ecx
cmovle %ecx, %r8d
cmovl %eax, %esi
addq $1, %rax
Apart from that, the assumption that branchless is necessarily faster may also be flawed, because the probability that a new distance "beats" the old one decreases the more elements you have looked at. It's not a coin toss, so the branch becomes increasingly predictable. The "bitselect" trick was invented when compilers were much less aggressive at generating "as-if" assembly than they are today. I would much rather suggest looking at the assembly your compiler actually generates before either reworking the code so the compiler can optimize it better, or taking the result as a basis for hand-written assembly. If you want to look into SIMD, I would suggest trying a "minimum of minimums" approach with reduced data dependencies (in my example, the dependencies on "min" are probably a bottleneck).
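One way to read the "minimum of minimums" idea, as a scalar sketch: keep four independent (min, index) pairs so that consecutive comparisons don't all wait on the same variable, then reduce the four candidates at the end. The names MinResult and min_of_minimums are made up for illustration, and whether this helps (or gets vectorized) on a given compiler has to be checked in the generated assembly.

```cpp
#include <cassert>
#include <climits>

struct MinResult { int value; int index; };

// Lane k only sees elements k, k+4, k+8, ..., so the four compare/update
// chains are independent of each other.
MinResult min_of_minimums(const int* arr, int n)
{
    assert(n > 0);
    int best[4] = { INT_MAX, INT_MAX, INT_MAX, INT_MAX };
    int bestIndex[4] = { -1, -1, -1, -1 };

    int i = 0;
    for (; i + 3 < n; i += 4)
    {
        for (int k = 0; k < 4; ++k)
        {
            if (arr[i + k] < best[k])
            {
                best[k] = arr[i + k];
                bestIndex[k] = i + k;
            }
        }
    }
    // The leftover 0-3 elements go into lane 0.
    for (; i < n; ++i)
    {
        if (arr[i] < best[0]) { best[0] = arr[i]; bestIndex[0] = i; }
    }

    // Final reduction: the minimum of the per-lane minimums.
    // Ties go to the smaller index, matching a plain left-to-right scan.
    MinResult r = { arr[0], 0 };
    for (int k = 0; k < 4; ++k)
    {
        if (bestIndex[k] >= 0 &&
            (best[k] < r.value || (best[k] == r.value && bestIndex[k] < r.index)))
        {
            r.value = best[k];
            r.index = bestIndex[k];
        }
    }
    return r;
}
```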
Firstly, I'd suggest that before you try any code changes you look at the disassembly in an optimized build. Ideally you want to look at the profiler data at an assembly level. This can show up various things, for example:
- The compiler may not have generated an actual branch instruction.
- The line of code that has the bottleneck may have many more instructions associated with it than you might think - the dist calculation for example.
In addition, there's the standard trick for distances: computing them usually requires a square root, but comparing squared distances gives the same ordering. So compare squared values throughout and take the square root once, at the end, on the minimum squared value.
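A minimal sketch of the square-root trick, using a hypothetical Vec3 type and function names chosen for illustration. The loop compares squared distances only, and the single square root is taken after the minimum is known.

```cpp
#include <cassert>
#include <cfloat>
#include <cmath>

// Hypothetical point type for illustration.
struct Vec3 { float x, y, z; };

// Squared Euclidean distance: enough for comparisons, no square root needed.
inline float squared_distance(const Vec3& a, const Vec3& b)
{
    const float dx = a.x - b.x, dy = a.y - b.y, dz = a.z - b.z;
    return dx * dx + dy * dy + dz * dz;
}

// Returns the true distance from pt to its nearest candidate.
float nearest_distance(const Vec3& pt, const Vec3* candidates, int count)
{
    assert(count > 0);
    float best_sq = FLT_MAX;
    for (int i = 0; i < count; ++i)
    {
        const float d_sq = squared_distance(pt, candidates[i]);
        if (d_sq < best_sq) best_sq = d_sq;
    }
    return std::sqrt(best_sq); // the only square root in the whole search
}
```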
SSE can process four values at once, without any branches, using _mm_min_ps. If you really need speed then you want to be using SSE (or AVX) intrinsics. Here's a basic example:
#include <xmmintrin.h> // SSE intrinsics
#include <cfloat>      // FLT_MAX
#include <algorithm>   // std::min

float MinimumDistance(const float *values, int count)
{
    // Named vmin so the scalar tail below can still call std::min
    __m128 vmin = _mm_set_ps(FLT_MAX, FLT_MAX, FLT_MAX, FLT_MAX);
    int i=0;
    for (; i < count - 3; i+=4)
    {
        __m128 distances = _mm_loadu_ps(&values[i]);
        vmin = _mm_min_ps(vmin, distances);
    }
    // Combine the four separate minimums to a single value
    vmin = _mm_min_ps(vmin, _mm_shuffle_ps(vmin, vmin, _MM_SHUFFLE(2, 3, 0, 1)));
    vmin = _mm_min_ps(vmin, _mm_shuffle_ps(vmin, vmin, _MM_SHUFFLE(1, 0, 3, 2)));
    // Deal with the last 0-3 elements the slow way
    float result = FLT_MAX;
    if (count > 3) _mm_store_ss(&result, vmin);
    for (; i < count; i++)
    {
        result = std::min(values[i], result);
    }
    return result;
}
For best SSE performance you should make sure the loads happen at aligned addresses. You can handle the first few misaligned elements in the same way as the last few in the code above if necessary.
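A sketch of how the range could be split for that, assuming 4-byte floats and that the integer value of a pointer reflects its address (true on mainstream platforms, but implementation-defined). AlignedSplit and split_for_aligned_loads are hypothetical names. The head and tail would go through the scalar loop, and the body could then use _mm_load_ps instead of _mm_loadu_ps.

```cpp
#include <cassert>
#include <cstdint>

// Element counts for the three phases of the loop.
struct AlignedSplit
{
    int head; // leading elements to handle one at a time
    int body; // elements (a multiple of 4) starting on a 16-byte boundary
    int tail; // trailing 0-3 elements to handle one at a time
};

AlignedSplit split_for_aligned_loads(const float* values, int count)
{
    static_assert(sizeof(float) == 4, "sketch assumes 4-byte floats");
    assert(count >= 0);
    const std::uintptr_t address = reinterpret_cast<std::uintptr_t>(values);
    assert(address % sizeof(float) == 0); // float pointers should already be 4-byte aligned

    // Floats between the previous 16-byte boundary and the start of the array.
    const int past = static_cast<int>((address & 15u) / sizeof(float));
    int head = (4 - past) % 4;
    if (head > count) head = count;

    const int body = (count - head) / 4 * 4;
    return AlignedSplit{ head, body, count - head - body };
}
```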
The other thing to watch out for is memory bandwidth. If there are several members of the ClusterCentroid structure that you don't use during that loop, you'll be reading much more data from memory than you need to, because memory is read in cache-line-sized chunks, which are typically 64 bytes each.
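To make the bandwidth point concrete, here is a rough sketch that splits the "hot" position data from the rest. The members of ClusterCentroidFull are invented for illustration (the real structure isn't shown here), and the 64-byte line size is an assumption that holds on common x86 parts.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical "fat" centroid: everything except x, y, z is made up.
struct ClusterCentroidFull
{
    float x, y, z;             // read by the distance loop
    float sum_x, sum_y, sum_z; // only used when centroids are updated
    int   num_points;
    int   id;
    float other_data[8];
};

// Only what the distance loop reads.
struct CentroidPosition { float x, y, z; };

constexpr std::size_t kCacheLine = 64; // assumed cache line size

// Cache lines spanned by a packed array that starts on a line boundary.
constexpr std::size_t cache_lines_touched(std::size_t element_size, std::size_t count)
{
    return (element_size * count + kCacheLine - 1) / kCacheLine;
}

// Copies the positions into a compact array for the distance loop to scan.
std::vector<CentroidPosition> extract_positions(const std::vector<ClusterCentroidFull>& full)
{
    std::vector<CentroidPosition> hot;
    hot.reserve(full.size());
    for (const ClusterCentroidFull& c : full)
        hot.push_back(CentroidPosition{ c.x, c.y, c.z });
    return hot;
}
```

With these made-up sizes, scanning 16 fat centroids touches 16 cache lines, while the compact array fits in 3.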