Monday, March 3, 2014

SIMD-ifying Google Code Jam "Shop Credit" puzzle - Part 5, Binary Search

Yes, 2 in a row for today.  If you just read my post on doing a sort & search you saw I broke the O(n^2)-ness of the previous attempts, but was still slower than the best (vector) version.

1. "Tree" Version

And one reason why is that the sort & search is a bit wasteful.  It does n*log(n) work to sort and then another log(n) to search.  But all that n*log(n) sorting work is 'tossed' at the end, since only one query is ever run against it.

So then it occurred to me to instead 'search as you go'.  If I were to do a binary tree, I can insert an item in log(n) time.  I still must do O(n*log(n)) work, but maybe I can skip some of the sorting work.  That is, maybe I can skip sorting things I never need to scan.

Here's the code I came up with.  First I defined a struct to hold the data for an item.

struct item {
    int price;
    int idx;
    item* right;  // larger-price child; must start out null
    item* left;   // smaller-price child; must start out null
};

The 'right' and 'left' pointers define our two child nodes - the left being smaller and the right being larger in price.  Next, as before, I do my min / max determination for later culling.

bool tree_version(int sz, item* items, int credit, int& ans1, int& ans2) {

    int min = items[0].price;
    int max = items[0].price;
    for (int i = 0; i < sz; i++) {
        min = items[i].price < min ? items[i].price : min;
        max = items[i].price > max ? items[i].price : max;
    }

Now it's time to build the binary tree.  I'm not being very fancy about it; it's not a balanced tree, for instance.

    for (int i = 1; i < sz; i++) {
        int goal = credit - items[i].price;
        if (goal > max) continue;
        if (goal < min) continue;

        item* it = &items[i];
        item* head = &items[0];

        while (true) {
            if (it->price < head->price) {
                if (head->left) {
                    head = head->left;
                } else {
                    head->left = it;
                    break;
                }
            } else {
                if (head->right) {
                    head = head->right;
                } else {
                    head->right = it;
                    break;
                }
            }
        }

        // look through the current tree
        if (scan(sz, &items[0], goal, ans1, ans2, i)) {
            return 1;
        }
    }

Note the 'scan' call sits inside the insert loop itself.  Since we are discarding the tree after one query, we can abort as soon as we find what we want.

The 'scan' implementation is pretty simple...except for one thing: in my first version I did the "did I find the answer" check first.  But then I realized this is the least common case - most of the time we don't find the answer.  By moving it to be the 3rd conditional clause I branch less often.  This showed up as a 6% gain.

bool scan(int sz, const item* items, int goal, int& ans1, int& ans2,
    int skip_idx) {
    // scan tree
    const item* head = items;
    while (head) {
        if (head->price > goal) {
            // traverse left
            head = head->left;
        } else if (head->price < goal) {
            // traverse right
            head = head->right;
        } else if (head->idx != skip_idx) {
            // have I found the answer?
            ans1 = skip_idx < head->idx ? skip_idx + 1 : head->idx + 1;
            ans2 = skip_idx > head->idx ? skip_idx + 1 : head->idx + 1;
            return 1;
        } else {
            // traverse left; this is the case where I have found an item
            //  which, added to itself, gives the goal - which is not desired
            head = head->left;
        }
    }
    return 0;
}
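To sanity-check the insert-and-scan idea end to end, here is a minimal self-contained sketch of my own (the names tree_two_sum and scan_tree are mine, and I've omitted the min / max culling for brevity):

```cpp
#include <cassert>
#include <cstddef>

// Same layout as the struct above; left/right must start out null.
struct item {
    int price;
    int idx;
    item* right;
    item* left;
};

// Walk the tree rooted at 'head' looking for a node priced exactly 'goal',
// skipping the node we would otherwise pair with itself.
const item* scan_tree(const item* head, int goal, int skip_idx) {
    while (head) {
        if (head->price > goal)         head = head->left;
        else if (head->price < goal)    head = head->right;
        else if (head->idx != skip_idx) return head;      // found a distinct item
        else                            head = head->left; // self-match; it is a fresh leaf
    }
    return nullptr;
}

// Insert items one at a time into an (unbalanced) BST rooted at items[0],
// querying the partial tree after each insert.
bool tree_two_sum(int sz, item* items, int credit, int& ans1, int& ans2) {
    for (int i = 1; i < sz; i++) {
        int goal = credit - items[i].price;
        item* head = &items[0];
        item* it = &items[i];
        for (;;) {  // descend to a free child slot
            item*& next = (it->price < head->price) ? head->left : head->right;
            if (next) head = next;
            else { next = it; break; }
        }
        if (const item* hit = scan_tree(&items[0], goal, i)) {
            ans1 = hit->idx < i ? hit->idx + 1 : i + 1;
            ans2 = hit->idx < i ? i + 1 : hit->idx + 1;
            return true;
        }
    }
    return false;
}
```

Note the sketch relies on every item's left / right pointers starting out null, just as the real version does.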
   
2. Results

Version                   Performance (abs)  Speedup  Time to Implement
Naive (scalar)            32M clocks         1x       15 mins
Less Naive (scalar)       16M clocks         2x       10 mins
Min / Max Cull (scalar)   9M clocks          3.5x     20 mins
Intrinsics (vector)       4M clocks          8x       540 mins
Sort + search (scalar)    5M clocks          6.4x     20 mins
Binary Tree (scalar)      3.4M clocks        9.4x     17 mins

Hooray!  A new record.  And again, there may be vector opportunities to add on top of this.



3. Conclusions

  1. In this case, at least so far, proper use of the scalar hardware has actually surpassed the best vector result.
  2. Moreover, recall that while all 'scalar' versions are plainly-compiled C++, the vector version required tricky, non-portable, intricate intrinsics work.
  3. It would be exciting to see what vector + the BST can do.  Then again, maybe the characteristics of the vector instruction set will preclude the techniques that got us this far.
  4. This is an incremental improvement over the 'sort' version; it takes advantage of 'early out' situations.


SIMD-ifying Google Code Jam "Shop Credit" puzzle - Part 4 - Sorting

Ok, time to take a break from SIMD, at least for now.

1. O(n^2)
One problem with all SIMD approaches thus far is that they have all been O(n^2).  Remember the super naive version?

bool naive(int sz, const int* data, int credit, int& ans1, int& ans2) {
    int min, max;
    for (int i = 0; i < sz; i++) {
        for (int j = 0; j < sz; j++) {
            if (i == j) continue;
            if (data[i] + data[j] == credit) {
                ans1 = i < j ? i + 1 : j + 1;
                ans2 = i < j ? j + 1 : i + 1;
                return 1;
            }
        }
    }
    return 0;
}

We see that for each search we do an 'i' and a 'j' loop each (potentially) as large as 'sz'.  Hence, our O(n^2).

In our SIMD version we had cut down that 'j' loop by the vector width (4 for SSE)....

    for (int i = 0; i < sz; i++) {
        // <...>
        int iters = (sz - i) / 4 + 1;
        for (int j = 0; j < iters; j++) {
            // <...>
        }
    }

But O(n^2) / 4 is - at least for non-trivial sizes of 'n' - still dominated by the n^2 term.

2. Sorting
Of the "2 n's" that multiply together, there isn't much we can do about the 'outer' one.  One way or another we will have to scan all the items once on average.  But what about the 'inner' n?

We know that a binary search can find an item in sorted data in log2(n) time.  But it also costs us n*log2(n) time to sort the data, for a total of n*log2(n) + log2(n).

So, how does n^2 compare with n*log2(n) + log2(n)?  The latter of course reduces to (n + 1)*log2(n), which is O(n*log2(n)) - eventually far smaller than n^2.  But in case you don't believe me, here's what Wolfram Alpha has to say about it.
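To put rough numbers on that comparison, here is a tiny helper (cost_ratio is my own, not part of the program):

```cpp
#include <cmath>

// Cost ratio of the quadratic scan vs. sort-then-search at size n:
//   n^2  versus  (n + 1) * log2(n)
double cost_ratio(long n) {
    double quad = double(n) * n;                   // the O(n^2) scan
    double srch = (n + 1) * std::log2(double(n));  // sort + one search
    return quad / srch;
}
```

At n = 1800 - roughly the working-set size used in these tests - the ratio already favors sort + search by more than a hundred to one, at least on paper.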

So I tried it out and implemented it like this:

bool sort_version(int sz, int* data, int credit, int& ans1, int& ans2) {
    int min = data[0];
    int max = data[0];
    int idxs[sz];  // original positions, co-sorted with the prices
    for (int i = 0; i < sz; i++) {
        idxs[i] = i;
    }

    // 'qsort' and 'range' here are my own helpers (not the C library's
    //  qsort): sort 'data' and apply the same permutation to 'idxs'
    qsort(data, idxs, range(0, sz - 1));

    for (int i = 0; i < sz; i++) {
        min = data[i] < min ? data[i] : min;
        max = data[i] > max ? data[i] : max;
    }

    for (int i = 0; i < sz; i++) {
        if (data[i] + max < credit) continue; // hardly helps
        if (data[i] + min > credit) continue; // helps; 2x

        // binary search ('find', another helper of mine) for the complement
        int j = find(data, credit - data[i], range(i + 1, sz));
        if (j == -1) continue;
        ans1 = idxs[i] + 1;
        ans2 = idxs[j] + 1;

        if (ans1 > ans2) {
            int tmp = ans1;
            ans1 = ans2;
            ans2 = tmp;
        }
        return 1;
    }
    return 0;
}

Here are our results along with our previous work:

Version                   Performance (abs)  Speedup  Time to Implement
Naive (scalar)            32M clocks         1x       15 mins
Less Naive (scalar)       16M clocks         2x       10 mins
Min / Max Cull (scalar)   9M clocks          3.5x     20 mins
Intrinsics (vector)       4M clocks          8x       540 mins
Sort + search (scalar)    5M clocks          6.4x     20 mins

3. Conclusions
Yes, we lost ground to the vector version.  However note:

  1. We are not using vector hardware...yet.  This result really compares against the Min / Max Cull row, which was the best non-vector result we had achieved.  So, we achieved 1.8x by doing sorting / searching.  Who knows?  With a vectorized sort / search we might achieve 12.8x...
  2. This is in some sense a worst-case scenario for sorting.  We are doing n*log(n) work to sort, then log(n) to search, and then throwing all that sorting work away.  If, say, we were tasked with finding lots of item combinations within the same inventory, we could expect O(log(n)) per query, which would crush all results so far.  In fact I project that if enough searching were done that the sorting cost became negligible, the result would be a gain of roughly 3500x.
  3. The sort / search version tracks O(n*log(n)) very nicely as you can see in the chart below.  The other versions 'get lucky' when they find answers close to the beginning.  The vector version is simply an attenuated result of the scalar version.
  4. Realize also we are dealing with only 1800 points.  For much larger working sets the sort / search version would crush it.
  5. Nonetheless, it is clear that if you have to pick where to invest your tuning time in workloads like this you likely want to do your vectorization work last.  I spent more time on the vectorized version than all the others combined.








Friday, February 7, 2014

SIMD-ifying Google Code Jam "Shop Credit" puzzle - Part 3

So last time I indulged in a reverie tuning the part of the 'Shop Credit' puzzle that doesn't affect performance - the min / max determination.  Let's get down to business and SIMD-ify the part that does matter.

Recall we are given a list of numbers and a desired sum.  We need to find which 2 numbers in the list add up to the sum.  So if we had:

sum: 50
list: 0, 1, 2, 3, 4, 5, 6, 20, 30

...we want the answer {20, 30}

With SIMD we are looking for a way to do 'n' checks at a time preferably on data that lives sequentially.

There are different strategies. Here's the one I used in my program.

1) Select the first item in the list.  Let's call this A.
2) Take 4 (say for SSE) more items from the list.  Let's call them B{1-4}
3) See if any of A + B{1-4} adds up to our sum
4) If so, determine which of the 4 did and leave
5) Else, pick 4 more items, goto 2
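Before diving into the intrinsics, the five steps can be emulated in plain scalar code.  This is a sketch of my own (find_partner is a made-up name) that glosses over the alignment and tail wrinkles handled below:

```cpp
#include <cassert>

// Scalar emulation of the 4-lane strategy: for a fixed A (at a_idx),
// test four B candidates per "vector" iteration and report which lane
// matched.  Returns the index of the matching B, or -1 if none.
int find_partner(int sz, const int* data, int credit, int a_idx) {
    int A = data[a_idx];                                // step 1: pick A
    for (int base = 0; base < sz; base += 4) {          // step 2: grab 4 items
        int lanes = 0;
        for (int lane = 0; lane < 4 && base + lane < sz; lane++)
            if (A + data[base + lane] == credit &&      // step 3: any sum match?
                base + lane != a_idx)                   // ...but never A itself
                lanes |= 1 << lane;
        if (lanes)                                      // step 4: which lane?
            return base + __builtin_ctz(lanes);
    }
    return -1;                                          // step 5 exhausted
}
```

The self-match test inside the loop stands in for the lane-masking the real SSE version has to do.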

First, here's the whole code.  Then I'll break it down to show how it handles these steps.

    __m128i credit_exp = _mm_set1_epi32(credit);

    for (int i = 0; i < sz; i++) {
        int A = data[i];
        if (A + max < credit) continue; // hardly helps
        if (A + min > credit) continue; // helps; 2x

        __m128i A_broadcast = _mm_set1_epi32(A);

        int start_at = (i / 4)*4;
        const int* B_ptr = &data[start_at];
        int iters = (sz - i) / 4 + 1;
        for (int j = 0; j <  iters; j++) {
            __m128i B_vals = _mm_load_si128((__m128i*)B_ptr);
            __m128i sum    = _mm_add_epi32(B_vals, A_broadcast);

            __m128i cmp = _mm_cmpeq_epi32(sum, credit_exp);
            int lanes = _mm_movemask_ps((__m128)cmp);
            if (lanes != 0) {
                if (j == 0) {
                    lanes &= ~(1 << (i % 4));
                }
                if (j ==  iters -1) {
                    int mask = 0x0f >> (3 - ((sz + 3)% 4));
                    lanes &= mask;
                }
            }

            if (lanes) {
                int answ = 4*j + start_at + _bit_scan_reverse(lanes);
                ans1 = i < answ ? i + 1 : answ + 1;
                ans2 = i < answ ? answ + 1 : i + 1;
                return 1;
            }
            B_ptr += 4;
        }
    }


1) Select the first item in the list.  Let's call this A.

 __m128i credit_exp = _mm_set1_epi32(credit);

We will want to compare multiple lanes to the credit (correct sum).  To do this we must have the sum in all lanes.  This takes the credit and broadcasts it to all lanes.

for (int i = 0; i < sz; i++) {
    int A = data[i];
    if (A + max < credit) continue; // hardly helps
    if (A + min > credit) continue; // helps; 2x

Here we are simply walking through our loop just as before.  We are doing the same "don't proceed if impossible" check as before (that is, if the target sum is 500, the max is 400 and this value is 5, skip).  It would be interesting to go back and have this check 4 at a time.

    __m128i A_broadcast = _mm_set1_epi32(A);

Just like 'credit', we broadcast 'A' to all lanes so it can be easily compared later.

2) Take 4 more items from the list.  Let's call them B{1-4}

    int start_at = (i / 4)*4;
    const int* B_ptr = &data[start_at];


Recall that if we are at the outer ('A') check of the 10th list element, we can skip checking elements 0-9 in our inner loop.  We are doing the same thing here.

Almost.  At least with SSE SIMD, one has to load an aligned chunk of data.  So when we are at outer element 5, we cannot start at 6; we must start at the smallest index that is at an alignment boundary.  Instead, as you see below, we must start at '4'

| 0 | 1 | 2 | 3 |
| 4 | 5 | 6 | 7 |
That our inner loop must start behind the outer loop in most cases will create another problem to solve later.
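The round-down-to-a-boundary computation is just integer division; pulled out as a (hypothetical) helper of my own, it looks like this:

```cpp
#include <cassert>

// Round an index down to its 4-element SIMD chunk boundary,
// exactly as the (i / 4) * 4 expression in the code does.
int aligned_start(int i) {
    return (i / 4) * 4;
}
```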

3) See if any of A + B{1-4} adds up to our sum

    int iters = (sz - i) / 4 + 1;
iters will tell us how many SIMD chunks we have to do.  If there are 10 elements like so....

| 0 | 1 | 2 | 3 |
| 4 | 5 | 6 | 7 |
| 8 | 9 |        

And if our 'outer' index is at '2', the inner loop starts back at the aligned chunk 0-3, so we have 3 SIMD groups to check (0-3, 4-7, and 8-9).  Iters computes to (10 - 2) / 4 + 1 = 3.  Let's do our 'iters'.

    for (int j = 0; j < iters; j++) {

        __m128i B_vals = _mm_load_si128((__m128i*)B_ptr);
        __m128i sum    = _mm_add_epi32(B_vals, A_broadcast);  
 

These 2 intrinsics are pretty simple; we are loading 4 'B' values into 'B_vals', then adding our (broadcast) 'A' value to them.  If 'A' is 25 we have something like this.  All seems so simple, no?

   A: | 25 | 25 | 25 | 25 |
   B: | 14 |  5 |  9 |  7 |
 sum: | 39 | 30 | 34 | 32 |


4) If so, determine which of the 4 sums is the one we want

This is where things get nasty.  Finding the lane that compares to the answer we want is pretty easy:

        __m128i cmp = _mm_cmpeq_epi32(sum, credit_exp); 

This intrinsic compares 'sum' to the broadcast credit, lane by lane.  If 'sum' was as above and the credit was 30, we'd have:

    sum: | 39 | 30 | 34 | 32 |

 credit: | 30 | 30 | 30 | 30 |

    cmp: | 0x0 | 0xffffffff | 0x0 | 0x0 |


SSE (and AVX for that matter) has a somewhat odd property for comparisons like this: the result in a lane is not '0' or '1' as a C programmer would expect, but all-zeros (0x0) or all-ones (0xffffffff).  This can create all kinds of puzzles if you are doing your own masking scheme, but I digress.

Luckily, pulling this 'cmp' out of lanes and into an integer is pretty easy:


        int lanes = _mm_movemask_ps((__m128)cmp);

In this case, 'lanes' would be 2.  The MOVMSKPS opcode this intrinsic triggers returns a bitfield with one bit set per matching lane.  If the answer had been:

   cmp:  | 0x0 | 0xffffffff | 0x0 | 0xffffffff |

...'lanes' would instead be 10 (8 + 2).  We'll see why this is tricky in a minute.
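If it helps, here is a scalar model of my own of what the MOVMSK operation does - it simply gathers the top bit of each 32-bit lane into the low bits of an ordinary integer:

```cpp
#include <cassert>
#include <cstdint>

// Scalar model of _mm_movemask_ps: collect the high bit of each of the
// four 32-bit lanes into bits 0-3 of an ordinary integer.
int movemask4(const uint32_t lanes[4]) {
    int out = 0;
    for (int i = 0; i < 4; i++)
        out |= int(lanes[i] >> 31) << i;
    return out;
}
```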

4a) Do nasty stuff to make sure we really have the lane we want

I didn't have this listed as a step before so as to not scare you off.  Ok, here goes!

        if (lanes != 0) {

Ok, no big deal - we only have work to do if we have at least one match.

            if (j == 0) {
                lanes &= ~(1 << (i % 4));
            }


What?!? Well recall the bit about how our 'inner' loop must overlap the outer loop.  This means sometimes we are checking an item against itself.  If the desired answer is 50, and 'A' is 25, then this overlap means we could get a false hit at 25 (A) + 25 (A) - that is, buying the same item twice.

This bit of bit twiddling says "if we are on the first SIMD chunk (which is the one that might overlap), mask out the lane that corresponds to 'A'".  Note we must preserve what might be a legitimate match in another lane.

It gets worse.

            if (j == iters - 1) {
                int mask = 0x0f >> (3 - ((sz + 3) % 4));
                lanes &= mask;
            }


Here's the other special case - when we get to the end of the line.  If we have a case like this:
| 0 | 1 | 2 | 3 |
| 4 | 5 | 6 | 7 |
| 8 | 9 |        


And we get to the third 'row', we must disregard the 2 slots after '9'.  They might be zero, or they might be garbage.  Either way they can trigger a false positive.  If the desired answer is 100 and we have 'A' = 100, then '0' would give a wrong answer.  The trickery on how this mask works is left as an exercise to the reader (gosh I've always wanted to say that).
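For those skipping the exercise, here is the mask expression pulled out on its own (a harness of my own, not part of the program) so you can see what it produces for each tail size:

```cpp
#include <cassert>

// The tail mask from the code above: keeps only the lanes of the final
// SIMD chunk that hold real elements.  (sz + 3) % 4 is (sz - 1) % 4,
// i.e. the lane index of the last valid element.
int tail_mask(int sz) {
    return 0x0f >> (3 - ((sz + 3) % 4));
}
```

So a 10-element array keeps lanes 0 and 1 of the last chunk (mask 0x3), and a multiple-of-4 size keeps all four (mask 0xf).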

Finally, we have the proper lane (if any).  Recall I said the lanes are arranged in a bit field - 1, 2, 4, 8 for lanes 0, 1, 2, 3.  But to get our index we want 0, 1, 2, 3.  One long-handed way is to do this:

        if (lanes) {
            int answ = 4*j + start_at;
            if (lanes & 0x01) {
                answ += 0;
            }
            else if (lanes & 0x02) {
                answ += 1;
            }
            else if (lanes & 0x04) {
                answ += 2;
            }
            else if (lanes & 0x08) {
                answ += 3;
            }
        }


Yuck.  Well, this is of course a find-first-set operation.  Again, intrinsics come to our rescue (not that we were in much danger - this is a small portion of the computation):

        if (lanes) {
            int answ = 4*j + start_at + _bit_scan_reverse(lanes);
        }
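As an aside of mine: on GCC the same conversion can be done with __builtin_ctz, which scans from the low end, while _bit_scan_reverse scans from the high end - the two agree whenever exactly one lane matched:

```cpp
#include <cassert>

// Convert a lane bitfield (1, 2, 4, 8) to a lane index (0, 1, 2, 3).
// _bit_scan_reverse takes the highest set bit; __builtin_ctz takes the
// lowest - identical when exactly one bit is set.
int lane_index(int lanes) {
    return __builtin_ctz(lanes);
}
```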
And finally we're done.  Was it worth it?  Let's see:


The red line is our baseline - the not-quite-totally-naive scalar version.  That is, it's the version that does the same tricks this version does, only with no intrinsic help.  As we'd expect, we see degradation at smaller datasets - the SIMD version is even worse in one case - but we see a nice 2x-ish average speedup as we grow larger.

But though this is great, the time investment for me was enormous vs. the non-SIMD versions.  No, I didn't code up those lane-conditioning tests perfectly in 5 minutes.  Truth be told, I probably invested a good day into shaping this up, vs. a matter of minutes - maybe an hour total - for the previous scalar work.  2x is probably worth it to most people, but the bang for your programming-time buck is not there.

And we have neglected the more dominant trend, which is this is still an O(N^2) solution.  This plot shows the clocks spent per point vs. how many points were checked.



This data is a bit skewed by the fact that often the code gets 'lucky' and finishes early.  Still, we would like to see this plot be constant, not with the clear trend toward increasing clocks-per-point for more points; obviously this will eventually blow up.

Till next time...

Friday, January 24, 2014

SIMD-ifying Google Code Jam "Shop Credit" puzzle - Part 2

Continuing where I left off last time...

I said I was going to try and SIMD-ify my work.  I'm going to do that, but I'm also going to break the cardinal rule of optimization and not optimize the thing that is most important.  Instead, I will invest unnecessary time in the min/max determination loop:

1. Scalar Version

The 'normal' code that will produce scalar ASM is as simple as can be...

    int min = data[0];
    int max = data[0];

    for (int i = 0; i < sz; i++) {
        min = data[i] < min ? data[i] : min;
        max = data[i] > max ? data[i] : max;
    }  

Let's look at the (scalar) ASM for this loop.  This is the Intel Compiler (ICC) ASM, though the G++ output is almost the same.

..B1.28:                  
  lea    (%rdx,%rdx), %esi    # compute offset for 'data[i]' 
                              #  (cute way to do %rdx*2)
  incl   %edx                 # incr loop counter
  movslq %esi, %rsi           # 'promote' offset
  movl   (%rbx,%rsi,4), %edi  # load data[i]
  cmpl   %r15d, %edi          # compare data[i] to min...
  movl   4(%rbx,%rsi,4), %r8d # load data[i+1]
  cmovl  %edi, %r15d          # ...store if data[i] < min
  cmpl   %r9d, %edi           # compare data[i] to max
  cmovge %edi, %r9d           # ...store if data[i] >= max
  cmpl   %r15d, %r8d          # compare data[i+1] to min
  cmovl  %r8d, %r15d          # ...store if data[i+1] < min
  cmpl   %r9d, %r8d           # compare data[i+1] to max
  cmovge %r8d, %r9d           # ...store if data[i+1] >= max
  cmpl   %eax, %edx           # done? 
  jb     ..B1.28              # ...if not, repeat

We see ICC has unrolled this loop for us; it is checking i and i+1 each loop iteration.  It has hoisted the loads up above most of the comparisons in an effort to hide the load latency via out-of-order magic.  I set up a test with more data than the Google puzzle; here's the time for 1,000,000 elements:

Test                      Time (clocks)                    Speedup
ICC Scalar (unrolled 2x)  2.9M (2.9 cycles per iteration)  1x

2. "Forced to not unroll" version

For the heck of it I hacked up the ASM to not unroll.  The new ASM is:

  sall $1, %eax  # somewhere, eax was set to sz / 2; I need to
                 #  double it again
..B1.28:
  movslq %esi, %rsi            # 'promote' offset
  movl   (%rbx,%rdx,4), %edi   # load data[i]
  incl   %edx                  # incr counter
  cmpl   %r15d, %edi           # compare data[i] to min
  cmovl  %edi, %r15d           # ...store if data[i] < min
  cmpl   %r9d, %edi            # compare data[i] to max
  cmovge %edi, %r9d            # ...store if data[i] >= max
  cmpl   %eax, %edx            # done?
  jb     ..B1.28               # ...if not, repeat

The ASM gets a bit simpler; I no longer need to do an LEA to compute an offset that is 2x my counter; I can just use the counter.  My shift-left-to-double-eax is hacky and would probably break in some cases, but there it is.  So, did performance worsen?

Test                       Time (clocks)  Speedup
ICC Scalar (unrolled 2x)   2.9M           1x
ICC Scalar (not unrolled)  2.8M           1.03x

Nope.  Practically speaking the unrolling isn't helping nor hurting by much.  On the other hand, if the data were not in a nice linear layout we can imagine how the hoisted load would be well worth this tiny cost.

3. Intrinsic (SSE) Version

This of course should be a perfect SIMD opportunity.  It turns out ICC and G++ do vectorize this code, but I wrote this intrinsic code as an experiment anyway.  Here it is:

    int min = data[0];  // cannot init to 0!
    int max = data[0];

    // how many 'bundles' (full 4-wide chunks) do I have?
    int iters_to_do = sz/4;

    // if I have at least one...
    if (iters_to_do > 0) {

    // load a bundle into min & max
        __m128i _max = _mm_load_si128((__m128i*)&data[0]);
        __m128i _min = _max;

    // compare additional bundles to this one, one at a time
    //  adjusting min / max for each.  Note however this is a 
    //  *per lane* comparison
        for (int j = 1; j < iters_to_do; j++) {
            __m128i these_vals =
               _mm_load_si128((__m128i*)&data[j*4]);

            _min = _mm_min_epu32(_min, these_vals);
            _max = _mm_max_epu32(_max, these_vals);
        }   

     // Now each lane has the min/max for all values that
     //  occupied that lane.  'reduce' those values down to 
     //  1 scalar value    
        unsigned int min1 = _mm_extract_epi32(_min, 0); 
        unsigned int min2 = _mm_extract_epi32(_min, 1); 
        unsigned int min3 = _mm_extract_epi32(_min, 2); 
        unsigned int min4 = _mm_extract_epi32(_min, 3); 
        min = min1 < min ? min1 : min;
        min = min2 < min ? min2 : min;
        min = min3 < min ? min3 : min;
        min = min4 < min ? min4 : min;

        int max1 = _mm_extract_epi32(_max, 0); 
        int max2 = _mm_extract_epi32(_max, 1); 
        int max3 = _mm_extract_epi32(_max, 2); 
        int max4 = _mm_extract_epi32(_max, 3); 
        max = max1 > max ? max1 : max;
        max = max2 > max ? max2 : max;
        max = max3 > max ? max3 : max;
        max = max4 > max ? max4 : max;
    }    

    // epilogue: if the amount of data is not a multiple of 4,
    //  do the cleanup case scalar-style
    for (int i = iters_to_do*4; 
           i < iters_to_do*4 + sz % 4; i++) {
        min = data[i] < min ? data[i] : min;
        max = data[i] > max ? data[i] : max;
    }   


How did we do?   It seems to have paid off.  We were doing 2.9 cycles per iteration; now we need just half a clock to do the same check.


Test                       Time (clocks)  Speedup
ICC Scalar (unrolled 2x)   2,900,000      1x
ICC Scalar (not unrolled)  2,800,000      1.03x
Intrinsics                 550,000        5.2x


So what kind of ASM did this generate?  The main loop is pretty straightforward:

..B5.2:                         
  movdqu    (%rsi), %xmm1  # load initial bundle
  movl      $1, %r10d      # initialize counter to 1
  movdqa    %xmm1, %xmm0   # copy data to xmm0
  movl      $4, %edx       # init offset into data
  cmpl      $1, %eax       # is there > 1 bundle?
  jle       ..B5.6         # ...no?  bail
                                
..B5.4:                      
  incl      %r10d                # incr counter    
  movdqu    (%rsi,%rdx,4), %xmm2 # load next bundle
  addq      $4, %rdx             # add to offset for next load
  pminud    %xmm2, %xmm0         # compute lane-wise min
  pmaxud    %xmm2, %xmm1         # compute lane-wise max
  cmpl      %eax, %r10d          # am I done?
  jl        ..B5.4               # ...no, do next bundle


(I do notice the loads are unaligned (movdqu) vs. aligned (movdqa).  I hand-modified them to be aligned and saw virtually no difference, which kind of surprised me as this is still SSE - in AVX there is no penalty for an unaligned load on aligned data)

If you read the comments you know that the pminud and pmaxud are lane-wise min / max.  So if you have

xmm2 = | 10 | 14 | 10 | 10 |
xmm0 = | 11 | 12 | 13 | 14 |

Then pmaxud %xmm2, %xmm0 will give you:
xmm0 = | 11 | 14 | 13 | 14 |

Thus we need to then peel out the lanes and reduce.  Horizontal operations are not the strong suit of SSE (nor any SIMD arch I've seen for that matter).  On the other hand, this code executes just once vs. (size / 4).

Lastly there is the epilogue, which will give you the same result as the scalar code.

4. Conclusions / Next time
We achieved some nice gains, but boy was that a lot of work, relatively speaking, to get the intrinsic code written and debugged.

And yes, the O(N^2) nature of the original code still eats this O(N) preamble for breakfast.  But ASM tweaking is a hobby of mine.

Next time I will SIMD-ify the main loop.


Thursday, January 23, 2014

SIMD-ifying Google Code Jam "Shop Credit" puzzle - Part 1

The other day I hit upon this Google Code Jam puzzle:

You receive a credit C at a local store and would like to buy two items. You first walk through the store and create a list L of all available items. From this list you would like to buy two items that add up to the entire value of the credit. The solution you provide will consist of the two integers indicating the positions of the items in your list (smaller number first).

I wrote a little harness that could call different implementations.  As usual, I started with a naive version: scan the list of item prices one at a time, then scan all the items again to see if the 'inner' and 'outer' scan values together add up to the goal.

1. Naive Version

bool naive(int sz, const int* data, int credit, int& ans1, int& ans2) {
    int min, max;
    for (int i = 0; i < sz; i++) {
        for (int j = 0; j < sz; j++) {
            if (i == j) continue;
            if (data[i] + data[j] == credit) {
                ans1 = i < j ? i + 1 : j + 1;
                ans2 = i < j ? j + 1 : i + 1;
                return 1;
            }
        }
    }
    return 0;

}

Results so far:
Version  Performance (abs)  Speedup  Time to Implement
Naive    32M clocks         1x       15 mins

If there's one tricky thing here, it's the need to make sure an item is not paired with itself.  That is, if we have $100 to spend, we cannot purchase the same $50 item twice.  Our 'continue' line guards against this.

This was fast to implement, but slow to execute (as we will see).

2. Slightly Less Naive Version

Every CS 101 student will see an obvious ~2x trick here.  The inner 'j' loop need not start at '0'; it can start at 'i + 1'.  This eliminates the need for the continue and cuts our execution time in half.  Now we simply have:

bool less_naive(int sz, const int* data, int credit, int& ans1, int& ans2) {
    int min, max;
    for (int i = 0; i < sz; i++) {
        for (int j = i + 1; j < sz; j++) {
            if (data[i] + data[j] == credit) {
                ans1 = i < j ? i + 1 : j + 1;
                ans2 = i < j ? j + 1 : i + 1;
                return 1;
            }
        }
    }
    return 0;
}

Results so far:
Version     Performance (abs)  Speedup  Time to Implement
Naive       32M clocks         1x       15 mins
Less Naive  16M clocks         2x       10 mins

And yes, I'll admit I spent 10 minutes thinking about this.  I spent a little more time thinking about this...

3. Min / Max Culling

So then it occurred to me that if I first know the min and max value of the items, I can abort checking an item  against others if I know there cannot be a combination that will work.  For instance if I must achieve a total of $100, and the max item is $70, then I can skip the outer loop for all items < $30.  Similarly if the minimum item value is $10 I can skip outer loop values that are > $90.  Here's the code:

bool min_max_cull(int sz, const int* data, int credit, int& ans1, int& ans2) {
    int min = data[0];
    int max = data[0];

    for (int i = 0; i < sz; i++) {
        min = data[i] < min ? data[i] : min;
        max = data[i] > max ? data[i] : max;
    }

    for (int i = 0; i < sz; i++) {
        if (data[i] + max < credit) continue; // hardly helps
        if (data[i] + min > credit) continue; // helps; 2x
        for (int j = i + 1; j < sz; j++) {
            if (data[i] + data[j] == credit) {
                ans1 = i < j ? i + 1 : j + 1;
                ans2 = i < j ? j + 1 : i + 1;
                return 1;
            }
        }
    }
    return 0;
}

Results so far:
Version         Performance (abs)  Speedup  Time to Implement
Naive           32M clocks         1x       15 mins
Less Naive      16M clocks         2x       10 mins
Min / Max Cull  9M clocks          3.5x     20 mins

Yes, I have to do an additional loop scan, but given this is an O(n^2) kind of thing, an additional 'n' is small potatoes.  And as it turns out, it is worth it: we are almost 2x faster than 'less naive' and 3.5x faster overall.  This one took a little longer to make work than I'd like to admit, and that's because I tried to get clever and meld the 'min / max determination' loop in with the main loop.  The logic gets a bit tangled, and really the SIMD stuff is what I came here for.

4. SIMD, take 1
So one would think you could get a SIMD speedup on this code.  After all, if the inner loop can check one combination of values, why can't it check, say, 4 values at a time (with SSE)?  But try as I might, I could not get the Intel Compiler or GCC to automatically vectorize this code.

The reason is in the 'return'...

for (int j = i + 1; j < sz; j++) {
   if (data[i] + data[j] == credit) {
       ans1 = i < j ? i + 1 : j + 1;
       ans2 = i < j ? j + 1 : i + 1;
       return 1;
   }
}

So you and I both know 'sz' is the array size of 'data'.  And you and I both know that therefore it is impossible for us to read beyond the end of the array.  Unfortunately, the compiler does not know this.  Our function simply says 'data' is a pointer...
  
bool naive(int sz, const int* data, int credit, int& ans1, int& ans2) {

Ok, you say, but surely the compiler can reason it out...'j' is a simple linear walk.  Obviously it is indexing 'data'.  What could go wrong?  Or put another way, if something did go wrong it's the programmer's fault for giving a wrong value for 'sz'.  That is, if the code crashes in scalar, then it should crash in vector. 

Not quite.  Let's say 'data' really is 1,000 elements.  If the answer is the last element but 'sz' is passed as 1,000,000, the scalar version would return at the right time and give the right answer.  The vector version would read into no-man's land and crash.  Not good.
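One common workaround - my own sketch, not something from this post - is to remove the early exit so the inner loop has a fixed trip count and becomes a plain min-reduction, a shape compilers can usually vectorize:

```cpp
#include <cassert>

// Inner loop with no early exit: compute the smallest j such that
// a_value + data[j] == credit, as a min-reduction over the whole range.
// (Self-pairing is ignored here; this only illustrates the loop shape.)
int find_match(int sz, const int* data, int a_value, int credit) {
    int best = sz;  // sentinel meaning "not found"
    for (int j = 0; j < sz; j++) {
        // branch-free select; the loop body is now a compare + min
        int candidate = (a_value + data[j] == credit) ? j : sz;
        best = candidate < best ? candidate : best;
    }
    return best == sz ? -1 : best;
}
```

The trade-off is that we always scan the full range instead of stopping at the first hit, so it only pays off when the vector win outweighs the lost early exit.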

So let's try SIMD-ifying this ourselves.  For that you will have to wait for tomorrow.