Wednesday, August 28, 2013

More on Counters

If the previous two posts were not enough to get you bored on counters, then here are a few more techniques to implement counters in multi-threaded applications:

Chapter 4 of the book "Is Parallel Programming Hard, And, If So, What Can You Do About It?" is devoted solely to counters. The book itself is freely available at this address (and I recommend it):
https://www.kernel.org/pub/linux/kernel/people/paulmck/perfbook/perfbook.2013.01.13a.pdf
It covers several implementations, so it is worth a read if you need a fast counter in your multi-threaded code.


If that's still not enough, then check out this recent paper on "Scalable Statistics Counters" by David Dice, Yossi Lev, and Mark Moir from Oracle Labs:
https://blogs.oracle.com/dave/resource/spaa13-dice-ScalableStatsCounters.pdf
Pretty cool stuff!

Sunday, August 25, 2013

Concurrency Pattern: Distributed Cache-Line Counter

In the previous post we showed a counter that can be used for single-Writer scenarios. This time we have one that works for multiple Writers and multiple Readers, which we named the Distributed Cache-Line Counter.

The trick behind this counter is that it distributes the Writers among different cache lines, and the Readers then have to sum up the counters on all the cache lines.
It is sequentially consistent for get() as long as only increment() operations are done. If a clear() is done, then get() will no longer be sequentially consistent.

Usage scenario:
- Multiple Writers that increment often.
- Preferably not too many Reads.
- Any thread can do writes.
- Any thread can do reads.

It has one array of "counters" where each entry is separated from the next one such that they end up on different cache lines.
There is a hashing function that takes the id of the thread and generates an index in the array of counters, so that statistically there is only a small chance of two Writer threads doing an increment() on the same memory location (cache line).
The Reader will have to sum up all the values of all the counters in the array, which means it can be slow compared to a Reader acting on a single atomic variable.
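
To make the layout a bit more concrete, here is a minimal sketch of the mapping from a thread id to a slot in the array. The class name SlotIndexSketch and the method slotIndex() are made up for this illustration, the 64-byte cache line is an assumption, and the Math.floorMod() call is a defensive addition of this sketch (it keeps the index non-negative even if the scrambled value turns out negative).

```java
// Hypothetical helper class, for illustration only.
class SlotIndexSketch {
    // Number of 8-byte longs that fit in one cache line (assumes 64-byte lines)
    static final int LONGS_PER_CACHE_LINE = 64 / 8;

    // Maps a thread id to the array index of the counter slot it should use.
    // numCounters is the number of slots, each slot on its own cache line.
    static int slotIndex(long threadId, int numCounters) {
        long x = threadId;
        // Cheap xor-shift scrambling so that consecutive ids spread out
        x ^= (x << 21);
        x ^= (x >>> 35);
        x ^= (x << 4);
        // floorMod (unlike %) never returns a negative value for a positive divisor
        final int slot = (int) Math.floorMod(x, (long) numCounters);
        // Consecutive slots are LONGS_PER_CACHE_LINE entries apart
        return slot * LONGS_PER_CACHE_LINE;
    }
}
```

Every index returned is a multiple of LONGS_PER_CACHE_LINE and smaller than numCounters * LONGS_PER_CACHE_LINE, so two different slots never share a cache line as long as the array itself starts on a cache-line boundary (which the JVM does not guarantee).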


The DistributedCacheLineCounter has three functions:
increment():
1. Hash the thread id, take it modulo the number of counters, and multiply by the number of array entries per cache line to get the index in the array
2. Do a fetch-and-add of +1 on that entry of the array


get():
1. Sum up all the entries in the array

clear():
1. Set each of the entries in the array to zero


Note that if sequential consistency is desired even when clear() operations are performed then you can always protect it with a rw-lock and do a sharedLock() for increment() and get(), and an exclusiveLock() for clear(), but it will damage performance which kind of defeats the purpose of using a "simple" counter.
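
Here is a rough sketch of what that rw-lock protected variant could look like. It is not part of the original counter: the class name LockedDistributedCounter is made up, the lock is a plain ReentrantReadWriteLock from java.util.concurrent (readLock() plays the role of sharedLock() and writeLock() the role of exclusiveLock()), and the thread id is used directly as the hash to keep the example short.

```java
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical class, for illustration only.
class LockedDistributedCounter {
    private static final int NUM_COUNTERS = Runtime.getRuntime().availableProcessors() * 3;
    private static final int STRIDE = 64 / 8; // longs per cache line, assuming 64-byte lines

    private final AtomicLongArray counters = new AtomicLongArray(NUM_COUNTERS * STRIDE);
    private final ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock();

    // Simplified hash: thread ids are positive, so plain % is enough here
    private static int index() {
        return (int) (Thread.currentThread().getId() % NUM_COUNTERS) * STRIDE;
    }

    // Shared lock: many increments may run at the same time
    void increment() {
        rwlock.readLock().lock();
        try {
            counters.getAndIncrement(index());
        } finally {
            rwlock.readLock().unlock();
        }
    }

    // Shared lock: can run together with increments, but never with clear()
    long get() {
        rwlock.readLock().lock();
        try {
            long sum = 0;
            for (int idx = 0; idx < NUM_COUNTERS * STRIDE; idx += STRIDE) {
                sum += counters.get(idx);
            }
            return sum;
        } finally {
            rwlock.readLock().unlock();
        }
    }

    // Exclusive lock: no increment() or get() can overlap with the reset
    void clear() {
        rwlock.writeLock().lock();
        try {
            for (int idx = 0; idx < NUM_COUNTERS * STRIDE; idx += STRIDE) {
                counters.set(idx, 0);
            }
        } finally {
            rwlock.writeLock().unlock();
        }
    }
}
```

Notice that every increment() now has to touch the shared state of the lock, which brings back the very contention the counter was trying to avoid.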

Disadvantages:

- It's not linearizable (but who cares about it)
- It's only sequentially consistent as long as clear() is not used (the same goes for a decrement(), which this implementation does not provide). This is usually not a problem because, from a logical point of view, there is not much sense in performing a clear() at the same time as a get(); if you do, you'll get an inconsistent state, which is fine for nearly all applications.
- Requires more memory than a simple atomic variable because it needs to reserve (at least) one cache line per core.
- Reads are slower than on an atomic variable.

Advantages:

- Performance for Writes can be much better than an atomic counter under high contention.

Performance Plots:

Here are some performance plots comparing an AtomicLong from java.util.concurrent with the Distributed Cache-Line Counter, and as you can see, this new counter scales with the number of threads doing increment() almost linearly (at least on our 32-core machine)

Code:



import java.util.concurrent.atomic.AtomicLongArray;

public class DistributedCacheLineCounter {
    // Size of the counters[] array (TODO: explain the magical number 3)
    private final static int kNumCounters = Runtime.getRuntime().availableProcessors()*3;

    // Size of a cache line in longs (assuming 64-byte cache lines and 8-byte longs)
    private final static int COUNTER_CACHE_LINE = 64/8;

    // One counter per cache line: only every COUNTER_CACHE_LINE-th entry is used
    private final AtomicLongArray counters = new AtomicLongArray(kNumCounters*COUNTER_CACHE_LINE);
   
    /**
     * An imprecise but fast hash function
     */
    private int tid2hash() {
        long x = Thread.currentThread().getId();
        x ^= (x << 21);
        x ^= (x >>> 35);
        x ^= (x << 4);
        final int idx = (int)((x % kNumCounters)*COUNTER_CACHE_LINE);
        return idx;
    }
   
    public void increment() {
        counters.getAndIncrement(tid2hash());
    }
       
    public long get() {
        long sum = 0;
        for (int idx = 0; idx < kNumCounters*COUNTER_CACHE_LINE; idx += COUNTER_CACHE_LINE) {
            sum += counters.get(idx);
        }
        return sum;
    }
   
    public void clear() {
        for (int idx = 0; idx < kNumCounters*COUNTER_CACHE_LINE; idx += COUNTER_CACHE_LINE) {
            counters.set(idx, 0);
        }
    }
}



Thursday, August 22, 2013

Concurrency Pattern: Off-by-X Counter

In this post we are going to talk about counters.
Counters are a widely used construct in many concurrent systems. They let you measure events such as the number of incoming packets on a network device or server.

When there isn't too much contention, the simplest approach is to use an atomic variable and increment it with a fetch-and-add.
In C11 we would use atomic_fetch_add(), in C++11 atomic<>.fetch_add(), and in Java AtomicLong.getAndIncrement(). Notice that in Java getAndIncrement() is actually doing a compare-and-swap and not a fetch-and-add (there is an open bug for that in the JDK). Moreover, not all CPUs support native fetch-and-add operations, so even C11 and C++11 may use some "trick".
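
As a baseline, here is a minimal sketch of this simple approach in Java. The class name SimpleAtomicCounter and the helper countWithThreads() are made up for this illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical class name, used only for this illustration.
class SimpleAtomicCounter {
    private final AtomicLong counter = new AtomicLong(0);

    // Atomically adds one; can be called from any thread
    void increment() {
        counter.getAndIncrement();
    }

    // Returns the current value (a volatile read)
    long get() {
        return counter.get();
    }

    // Hypothetical helper: starts numThreads threads that each call
    // increment() perThread times, waits for them, and returns the total.
    static long countWithThreads(int numThreads, int perThread) {
        final SimpleAtomicCounter c = new SimpleAtomicCounter();
        Thread[] threads = new Thread[numThreads];
        for (int i = 0; i < numThreads; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) c.increment();
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return c.get();
    }
}
```

Because every increment is atomic, no update is lost: the total returned by countWithThreads() is always numThreads * perThread. The price is that all the threads hammer on the same memory location.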

The problem with using an atomic variable is that under high contention you end up doing a large number of memory barriers just to read and write the variable atomically.
To fix that, we propose here a kind of counter that uses relaxed atomic operations which can improve performance, and is a nifty example of how to make something useful with relaxed atomic operations.
... by the way, if you don't know what relaxed atomics are then you can first look at this presentation from Herb Sutter: http://concurrencyfreaks.blogspot.com/2013/02/the-new-memory-model-in-c11-and-c11.html

I named this technique the "Off-by-X Counter". It is a counter that can be written only from a specific thread, but read from multiple threads. The trick is to use a relaxed atomic and do the release-barrier only once in a while, namely every X increments.

Usage scenario:
- Only one Writer that may increment often
- As many Readers as desired.
- Only a single thread can do writes and it must always be the same.
- Any thread can do reads.

It has three functions:
increment():
1. Increment the counter
2. Check if (counter % X) == 0 and if it is then do the release-barrier

get():
1. Do the acquire-barrier
2. Read the value of the counter

clear():
1. Set the counter to zero
2. Do the release-barrier

The goal of this is to be used in a high-priority thread where you don't want to spend all the time doing release barriers for a variable that is only going to be read once in a while, and when it is read, it doesn't need to be completely up-to-date.
Notice that even if the release barrier is not done, as long as in some other place in the code there is a release barrier (or lock) the value will be updated, which means that most of the time


Disadvantages:
- It has undefined behavior if multiple threads call the increment() or clear() functions.
- In x86 CPUs the difference in performance is not impressive because there is no explicit acquire-barrier at the CPU level.

Advantages:
- Writes should be faster than on an atomic variable.


Here's what the code looks like in Java (I'll try to make a C++ version of it soon):


import java.lang.reflect.Field;
import sun.misc.Unsafe;

public class OffByXCounter {
   
    private static final sun.misc.Unsafe UNSAFE;
   
    static {
        try {
            UNSAFE = getUnsafe();
        } catch (Exception e) {
            throw new Error(e);
        }  
    }

    private static Unsafe getUnsafe() {
        try {
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            return (Unsafe)f.get(null);
        } catch (Exception e) {
            throw new Error(e);
        }
    }

    // Member variables
    private long counter = 0;
    private final long theX;
   
    public OffByXCounter(long theX) {
        this.theX = theX;
    }
   
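    // Can be called from any thread
    public long get() {
        // Acquire-barrier, then read the (possibly slightly stale) value
        UNSAFE.loadFence();
        return counter;
    }

    // Must always be called from the same (single) Writer thread
    public void increment() {
        counter++;
        // Do the release-barrier only once every theX increments
        if ((counter % theX) == 0) UNSAFE.storeFence();
    }

    // Must always be called from the same (single) Writer thread
    public void clear() {
        counter = 0;
        UNSAFE.storeFence();
    }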
}
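
For those who prefer to stay away from sun.misc.Unsafe, below is a sketch of the same structure using the static fence methods of java.lang.invoke.VarHandle, which assumes Java 9 or later. The class name OffByXCounterSketch is made up, and the pairing of acquireFence() with loadFence() and of releaseFence() with storeFence() is my understanding of how these fences correspond, so take it as an illustration and not as a drop-in replacement. The check on theX is an addition of this sketch.

```java
import java.lang.invoke.VarHandle;

// Hypothetical variant of OffByXCounter, for illustration only (Java 9+).
class OffByXCounterSketch {
    // Plain (non-volatile) field, written only by the single Writer thread
    private long counter = 0;
    private final long theX;

    OffByXCounterSketch(long theX) {
        // Guard against a division by zero in increment()
        if (theX <= 0) throw new IllegalArgumentException("theX must be positive");
        this.theX = theX;
    }

    // Can be called from any thread; the value may lag behind the Writer
    long get() {
        VarHandle.acquireFence();
        return counter;
    }

    // Must always be called from the same (single) Writer thread
    void increment() {
        counter++;
        // Do the release-barrier only once every theX increments
        if ((counter % theX) == 0) VarHandle.releaseFence();
    }

    // Must always be called from the same (single) Writer thread
    void clear() {
        counter = 0;
        VarHandle.releaseFence();
    }
}
```

As with the Unsafe version, the read in get() is a plain read racing with a plain write, so the Java Memory Model makes no promise about how stale the returned value can be.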