Counting Things (and optimizing 20-year-old code)

Some people like to watch, I like to count :-)

Recently, I revamped a vi-like editor that a buddy of mine wrote some 20+ years ago. Of course, in those days, far pointers and 64k limits were all the rage.

Deep in the guts of an editor is a line pointers table. This is an array that has one entry per line, and tells the editor where the data for each line can be found. In the editor I was working on, each entry in the line pointers table has two elements: a 32-bit address and an 8-bit flag. The address tells you where in the workfile the line is, and the flag is used for tagging lines (e.g., with g/pattern/) for later processing.

Since memory was scarce in those days, every line that the editor worked on (except for a few that were cached) was written to the workfile, and the address element was updated with the offset. If you changed a line, a new entry was created at the current end of the workfile, and the line's offset was updated.
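The append-on-change scheme described above can be sketched roughly as follows. This is only an illustration: workfile_append() and change_line() are hypothetical names, and the original editor's workfile code is not shown in this article.

```c
#include <stdint.h>
#include <stdio.h>

/* Hypothetical helper: append one line to the workfile and return the
 * offset at which it was written, or UINT32_MAX on error. */
uint32_t workfile_append(FILE *wf, const char *text, size_t len)
{
    long off;

    if (fseek(wf, 0L, SEEK_END) != 0)
        return UINT32_MAX;
    off = ftell(wf);
    if (off < 0 || (unsigned long) off >= UINT32_MAX)
        return UINT32_MAX;
    if (fwrite(text, 1, len, wf) != len)
        return UINT32_MAX;
    return (uint32_t) off;
}

/* Changing a line never rewrites the old bytes: the new text is
 * appended and only the table entry is redirected to it. */
int change_line(FILE *wf, uint32_t *addrs, int lineno,
                const char *text, size_t len)
{
    uint32_t off = workfile_append(wf, text, len);

    if (off == UINT32_MAX)
        return -1;
    addrs[lineno] = off;
    return 0;
}
```

The old copy of a changed line simply stays behind in the workfile as dead space, which is the price of never having to shuffle data around.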

Due to the scarcity of memory, the table itself was actually managed in a two-level manner. At the top level there was an array of pointers, and each entry of that array pointed to a separately allocated block of line entries, like so:

struct oneLine {
    long addr;
    char flag;
};

struct oneWindow {
...
    int    numLines;
    struct oneLine FAR *lines [BlocksPerFile];
...
};
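To make the two-level idea concrete, here is a rough sketch of how a lookup through such a table might work. LinesPerBlock, the value given to BlocksPerFile, and line_entry() are assumptions made for illustration (the original values and accessor are not shown), and FAR is dropped because it has no meaning in a flat memory model.

```c
#include <stddef.h>

#define BlocksPerFile  64     /* assumed value; the original isn't shown */
#define LinesPerBlock  1024   /* hypothetical: entries per allocated block */

struct oneLine {
    long addr;
    char flag;
};

struct oneWindow {
    int            numLines;
    struct oneLine *lines[BlocksPerFile];   /* FAR dropped for flat memory */
};

/* Return the entry for line n, or NULL if n is out of range or its
 * block has not been allocated yet. */
struct oneLine *line_entry(struct oneWindow *w, int n)
{
    struct oneLine *block;

    if (n < 0 || n >= w->numLines)
        return NULL;
    block = w->lines[n / LinesPerBlock];       /* top level: pick the block */
    return block ? &block[n % LinesPerBlock]   /* second level: the slot    */
                 : NULL;
}
```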

First thing I did was get rid of that structure (I didn't like the non-aligned, or space-wasting-if-aligned nature of it anyway), and replace it with:

typedef struct line_info_s {
    int       numLines;  // number of lines in use
    int       nalloc;    // how many addrs[] and flags[] are allocated
    uint32_t  *addrs;    // allocate addrs[] and flags[] as parallel data
    uint8_t   *flags;    // structures simultaneously
} line_info_t;

typedef struct one_window_s {
...
    line_info_t            lines;
...
} one_window_t;

The advantage of the new one is that because the addrs[] and flags[] arrays are allocated in parallel, there are no wasted space or alignment issues.
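The space argument can be checked with a couple of sizeof expressions. The sketch below uses fixed-width types so the numbers don't depend on how wide "long" is; the struct and function names are made up for the comparison. On a typical ABI the struct gets padded to 8 bytes, so 1.1M lines would cost about 8.8MB as an array of structs versus 5.5MB as parallel arrays, but the exact padding is up to the compiler.

```c
#include <stddef.h>
#include <stdint.h>

/* The old layout, written with fixed-width types. */
struct one_line_aos {
    uint32_t addr;
    uint8_t  flag;
};

/* Bytes needed for n lines as an array of structs (padding included). */
size_t bytes_array_of_structs(size_t n)
{
    return n * sizeof(struct one_line_aos);
}

/* Bytes needed for n lines as two parallel arrays: 4 + 1 per line. */
size_t bytes_parallel_arrays(size_t n)
{
    return n * sizeof(uint32_t) + n * sizeof(uint8_t);
}
```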

So what about this counting?

Well, given the structure above, if I wanted to add a new line, I'd have to realloc() the addrs[] and flags[] members. For efficiency, I realloc() them in chunks of 4k lines, but that's still not an efficient approach for loading a huge file. My testcase file is 530MB and has 1.1M lines.
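For reference, chunked growth of the two parallel arrays might look something like the sketch below. line_info_reserve(), line_info_append() and LINE_CHUNK are hypothetical names, and the struct is repeated only to keep the example self-contained.

```c
#include <stdint.h>
#include <stdlib.h>

#define LINE_CHUNK 4096   /* "4k lines" per growth step */

typedef struct line_info_s {
    int       numLines;
    int       nalloc;
    uint32_t  *addrs;
    uint8_t   *flags;
} line_info_t;

/* Make room for at least `want` lines; returns 0 on success, -1 on
 * allocation failure (the existing arrays stay valid either way). */
int line_info_reserve(line_info_t *li, int want)
{
    int       nalloc;
    uint32_t  *addrs;
    uint8_t   *flags;

    if (want <= li->nalloc)
        return 0;
    /* round up to the next multiple of LINE_CHUNK */
    nalloc = (want + LINE_CHUNK - 1) / LINE_CHUNK * LINE_CHUNK;

    addrs = realloc(li->addrs, (size_t) nalloc * sizeof *addrs);
    if (addrs == NULL)
        return -1;
    li->addrs = addrs;

    flags = realloc(li->flags, (size_t) nalloc * sizeof *flags);
    if (flags == NULL)
        return -1;
    li->flags = flags;

    li->nalloc = nalloc;
    return 0;
}

/* Append one line's address, growing the arrays when they are full. */
int line_info_append(line_info_t *li, uint32_t addr)
{
    if (line_info_reserve(li, li->numLines + 1) != 0)
        return -1;
    li->addrs[li->numLines] = addr;
    li->flags[li->numLines] = 0;
    li->numLines++;
    return 0;
}
```

With 1.1M lines this still means a couple of hundred realloc() calls on each array, any of which may have to copy everything allocated so far.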

In the loader, for speed, I count the number of lines. This means counting the number of 0x0a newline characters. Once I have this number, I do a single malloc() for the addrs[] and another for the flags[] to get that all in one contiguous chunk. Then I run through the file again, and populate the addrs[] array.
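The two-pass idea can be sketched like this. build_line_table() is a hypothetical name, and the sketch assumes that a final line without a trailing newline still counts as a line, which the loader described here may or may not do.

```c
#include <stdint.h>
#include <stdlib.h>

/* Pass 1 counts lines; pass 2 records the offset where each one starts.
 * Returns the line count and hands back the two arrays (caller frees
 * them), or returns -1 on allocation failure. */
int build_line_table(const uint8_t *buf, size_t nbytes,
                     uint32_t **addrs_out, uint8_t **flags_out)
{
    uint32_t  *addrs;
    uint8_t   *flags;
    size_t    i, n = 0, k = 0, start = 0;

    for (i = 0; i < nbytes; i++)          /* pass 1: count the newlines */
        n += buf[i] == 0x0a;
    if (nbytes > 0 && buf[nbytes - 1] != 0x0a)
        n++;                              /* unterminated last line */

    /* one allocation per array; never ask for zero bytes */
    addrs = malloc((n ? n : 1) * sizeof *addrs);
    flags = calloc(n ? n : 1, sizeof *flags);
    if (addrs == NULL || flags == NULL) {
        free(addrs);
        free(flags);
        return -1;
    }

    for (i = 0; i < nbytes; i++) {        /* pass 2: record line starts */
        if (buf[i] == 0x0a) {
            addrs[k++] = (uint32_t) start;
            start = i + 1;
        }
    }
    if (start < nbytes)
        addrs[k++] = (uint32_t) start;

    *addrs_out = addrs;
    *flags_out = flags;
    return (int) n;
}
```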

For speed, I simply mmap() the file into memory, and access it via pointer. This also means that I can use the file "in situ" — the addrs[] values are really just the pointers to the mmap()ed area.
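Mapping a file read-only takes only a few POSIX calls. The sketch below is one way to do it, not necessarily how the editor does; map_file_readonly() is a hypothetical name.

```c
#define _POSIX_C_SOURCE 200809L   /* ask for the POSIX declarations */
#include <fcntl.h>
#include <stddef.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

/* Map a whole file read-only. On success returns the mapping and stores
 * its length in *len_out; returns NULL on any failure (including an
 * empty file, which mmap() cannot map). Release with munmap(). */
void *map_file_readonly(const char *path, size_t *len_out)
{
    struct stat st;
    void        *p;
    int         fd;

    fd = open(path, O_RDONLY);
    if (fd < 0)
        return NULL;
    if (fstat(fd, &st) != 0 || st.st_size <= 0) {
        close(fd);
        return NULL;
    }
    p = mmap(NULL, (size_t) st.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
    close(fd);               /* the mapping stays valid after close() */
    if (p == MAP_FAILED)
        return NULL;
    *len_out = (size_t) st.st_size;
    return p;
}
```

The returned pointer can be handed straight to a counting function, and the kernel pages the file in on demand rather than reading it all up front.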

The naive way of counting newlines is:

int
nlines (void *ptr, int nbytes)
{
    int        count;
    uint8_t    *p8 = (uint8_t *) ptr;

    // add 1 for every byte that equals 0x0a (the comparison yields 0 or 1)
    for (count = 0; nbytes--; ) {
        count += *p8++ == 0x0a;
    }
    return (count);
}

which runs in 3.6 seconds for my testcase file.

Of course, the next obvious improvement is to use 32-bit integers, to get more memory bandwidth:

int
nlines (void *ptr, int nbytes)
{
    int         count;
    uint32_t    *p32 = (uint32_t *) ptr;

    // step one 32-bit word at a time; any 1 to 3 trailing bytes are skipped
    for (count = 0; nbytes >= 4; nbytes -= 4) {
        count += (*p32   & 0x000000ff) == 0x0000000a;
        count += (*p32   & 0x0000ff00) == 0x00000a00;
        count += (*p32   & 0x00ff0000) == 0x000a0000;
        count += (*p32++ & 0xff000000) == 0x0a000000;
    }
    return (count);
}

Technically, I really should be more careful about the last few bytes in the file, but there are several mitigating factors:

The above ran in about half the time, 1.7 seconds. Going to 64-bit integer types didn't improve the speed any further.
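For completeness, a version that is careful about the last few bytes might look like the sketch below. It is a variant written for illustration, not the code that was timed: nlines32() is a hypothetical name, it takes a size_t length, and it uses memcpy() to load each word so that the input does not have to be 4-byte aligned.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Count newlines four bytes at a time, then finish byte by byte.
 * The masks test each byte lane independently, so the result does not
 * depend on the machine's byte order. */
size_t nlines32(const void *ptr, size_t nbytes)
{
    const uint8_t *p = ptr;
    size_t        count = 0;
    uint32_t      w;

    while (nbytes >= 4) {
        memcpy(&w, p, sizeof w);   /* safe for any alignment */
        count += (w & 0x000000ffu) == 0x0000000au;
        count += (w & 0x0000ff00u) == 0x00000a00u;
        count += (w & 0x00ff0000u) == 0x000a0000u;
        count += (w & 0xff000000u) == 0x0a000000u;
        p += 4;
        nbytes -= 4;
    }
    while (nbytes--)               /* the 0 to 3 leftover bytes */
        count += *p++ == 0x0a;
    return count;
}
```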

And then things got nasty. I started investigating the MMX/SSE instructions available on my AMD64. There are some beautiful ones out there.

PCMPEQB is an SSE instruction that takes as arguments two 128-bit values and compares them byte-by-byte. It expresses the results as 0x00 or 0xff in the corresponding bytes of the 128-bit output register. For example, comparing "abcdefghijklmnop" vs "abcdxxxxxxxxxxxx" would give you the 128-bit value 0xff ff ff ff 00 00 00 00 00 00 00 00 00 00 00 00 (with spaces added for clarity).

Another magic instruction, PMOVMSKB, takes a 128-bit value as its argument and treats it as 16 x 8-bit values. It extracts the sign bit of each of those 16 values and packs them into 16 bits. So, the result above gets packed into 1111000000000000 (binary, written here with the first byte's bit on the left; as an integer the first byte lands in bit 0, so the value is 0x000f).
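The two instructions can be tried out in isolation through the standard SSE2 intrinsics in <emmintrin.h>, which compile to PCMPEQB and PMOVMSKB. This is just a small sketch; equal_byte_mask() is a hypothetical name, and it uses unaligned loads so that any 16-byte buffer will do.

```c
#include <emmintrin.h>   /* SSE2 intrinsics */

/* Compare two 16-byte blocks and return one bit per byte:
 * bit n is set when a[n] == b[n]. */
int equal_byte_mask(const void *a, const void *b)
{
    __m128i va = _mm_loadu_si128((const __m128i *) a);
    __m128i vb = _mm_loadu_si128((const __m128i *) b);
    __m128i eq = _mm_cmpeq_epi8(va, vb);   /* PCMPEQB: 0xff where equal */

    return _mm_movemask_epi8(eq);          /* PMOVMSKB: gather sign bits */
}
```

Called with "abcdefghijklmnop" and "abcdxxxxxxxxxxxx", this returns 0x000f: the four matching bytes are the first four, so the low four bits are set.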

Putting it all together, figuring out the GCC declarations and compiler flags, and making it into a generic "count the number of bytes that have a certain value", I came up with the ultimate version:

// compile with -Wno-pointer-sign -msse2 -flax-vector-conversions
typedef int __attribute__((mode(QI))) qi;
typedef qi __attribute__((vector_size (16))) v16qi;

int
fast_byte_popcount (void *ptr, int len, int byte)
{
    v16qi    search;
    v16qi    *p128;
    uint8_t  *p8 = (uint8_t *) ptr;
    int      count;
    int      i;
    int      v;

    count = 0;

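    // do we need to do a "pre count" to align to 16-byte boundary?
    // (cast via uintptr_t: a pointer doesn't fit in an int on AMD64)
    if ((uintptr_t) ptr & 15) {
        int head = 16 - (int) ((uintptr_t) ptr & 15);
        for (i = 0; i < head && len > 0; i++) {   // don't run past a short buffer
            count += byte == *p8++;
            len--;
        }
    }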

    // do the bulk of the work; set up the search mask
    ptr = (void *) p8;
    p8 = (char *) &search;
    for (i = 0; i < 16; i++) {
        *p8++ = byte;
    }

    p128 = (v16qi *) ptr;
    for (i = 0; i < (len & ~15); i += 16) {
        v = __builtin_ia32_pmovmskb128 (__builtin_ia32_pcmpeqb128 (search, *p128++));
        for (; v; count++) v &= v - 1;
    }

    // do we need to do a "post count" to gather the stragglers?
    p8 = (char *) p128;
    for (; i < len; i++) {
        count += byte == *p8++;
    }

    return (count);
}

Guess how fast?

0.53 seconds (6.7X faster than the original).
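The __builtin_ia32_* functions are specific to GCC. The same idea can also be written with the <emmintrin.h> intrinsics, as in the sketch below. This is a variant for illustration and not the code that was timed: count_byte_sse2() is a hypothetical name, and it uses unaligned loads (_mm_loadu_si128) so the "pre count" step disappears, which may or may not cost some speed on a given CPU.

```c
#include <emmintrin.h>
#include <stddef.h>
#include <stdint.h>

/* Count how many of the len bytes at ptr are equal to `byte`. */
size_t count_byte_sse2(const void *ptr, size_t len, uint8_t byte)
{
    const uint8_t *p = ptr;
    const __m128i search = _mm_set1_epi8((char) byte);  /* 16 copies */
    size_t        count = 0;
    unsigned      mask;

    while (len >= 16) {
        __m128i chunk = _mm_loadu_si128((const __m128i *) p);

        /* one bit per matching byte in this 16-byte chunk */
        mask = (unsigned) _mm_movemask_epi8(_mm_cmpeq_epi8(search, chunk));
        for (; mask; count++)
            mask &= mask - 1;
        p += 16;
        len -= 16;
    }
    while (len--)                  /* the 0 to 15 stragglers */
        count += *p++ == byte;
    return count;
}
```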

The heart of the algorithm is this:

    p128 = (v16qi *) ptr;
    for (i = 0; i < (len & ~15); i += 16) {
        v = __builtin_ia32_pmovmskb128 (__builtin_ia32_pcmpeqb128 (search, *p128++));
        for (; v; count++) v &= v - 1;
    }

The stuff around it is just there to deal with boundary conditions (in case the data isn't aligned on a 16-byte boundary, in case there's extra data at the end, and some management to fill in the search value).

The above code takes a 128-bit pointer (p128) and uses the GCC builtin version of PCMPEQB to compare the value at the pointer against the 128-bit pre-populated search variable. The builtin version of PMOVMSKB converts the results into a 16-bit bitmask, and then finally the magic line:

for (; v; count++) v &= v - 1;

counts the set bits in v, adding them to count. (The explanation of this line is left as an exercise for the reader. Hint: start with a simple case, like 0x09, and see how it works, step by step.)
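For anyone who would rather skip the exercise, here is the same loop pulled out into a standalone function with the trick spelled out in comments. count_set_bits() is a hypothetical name used only for this sketch.

```c
/* Count the set bits in v. Each pass through the loop clears the
 * lowest set bit, so the loop runs once per set bit. */
int count_set_bits(unsigned v)
{
    int count;

    /* v - 1 flips the lowest set bit to 0 and the zeros below it to 1;
     * ANDing with v therefore removes exactly that one bit. */
    for (count = 0; v; count++)
        v &= v - 1;
    return count;
}
```

Working through the hint: 0x09 is 1001 in binary. The first pass computes 1001 & 1000 = 1000, the second computes 1000 & 0111 = 0000, and the loop stops with a count of 2.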