Saturday, December 24, 2016

Sim Queue in Java (part 1/2)

It's Christmas, time for gifts and sharing, so I wanted to share some really cool queues we've been working on. Unfortunately their papers aren't ready yet, so that will have to wait a few more posts. Instead, I'm going to talk about SimQueue, a Multi-Producer-Multi-Consumer wait-free queue.

SimQueue was published in 2011 and was invented by Panagiota Fatourou and Nikolaos Kallimanis, and the academic paper is here:
http://thalis.cs.uoi.gr/tech_reports/publications/TR2011-01.pdf
Jump directly to page 16 and skip the universal wait-free construct; Algorithms 4, 5, and 6 are the interesting ones.

Outside of academia, very few people have heard of SimQueue, and it's unfortunate because this is the highest-scaling MPMC wait-free queue currently known. Yes, it actually has better scalability than our Turn queue, and it has better throughput than any of the lock-free single-item-per-node queues.
Ohhhh yes, it's a wait-free queue that is better than the lock-free queues, you heard me right!


If it's so good, how come nobody is using it?

For several reasons:

First, the academic paper is hard to read, even for those fluent in "academese". SimQueue shares the paper with a universal wait-free construct named PSim, and although it is certainly possible to wrap a single-threaded queue in PSim to obtain a wait-free queue, such an approach would not be very good, certainly not as good as SimQueue.
SimQueue is a wait-free queue that uses some of the same tricks as PSim, but not all.
Don't confuse PSim and SimQueue, they're in the same paper, they share some of the same tricks, but they're different things.

Second, there is an implementation of SimQueue in C99 by the authors available on github, but it's not pretty (C99 code rarely is), and it has several bugs. We pointed them out to the authors, who fixed one of them, but not all... the others we fixed ourselves in our implementation, which you can get at the end of this post. Feel free to look at their implementation; at least it's available and it compiles, which is more than what most authors provide, but be aware that it has bugs:
https://github.com/nkallima/sim-universal-construction/blob/master/sim-synch1.4/simqueue.c

Third, there is no memory reclamation in SimQueue, and it's not easy to add in C/C++... well, at least it's not easy to add while keeping the wait-free progress of the dequeue() method.
Without memory reclamation this queue has no practical purpose, only an academic one, and even there I think most academics shy away from it.
Simplicity is really important when it comes to this stuff. That's why the Michael-Scott queue is so pervasive: it's simple to understand, simple to explain, and simple to use in academic papers and benchmarks. Go and try to understand SimQueue, then come back and see how successful that was.


If it's that complicated, why spend the time to learn how it works?

Well, because it's awesome!
Yeah, that's right, it's awesome, and not just for one single reason: it has several cool tricks that make it awesome, tricks that can be used in other stuff and that are vital to understanding lock-free and wait-free algorithms.
So let's dive into SimQueue and try to simplify it as much as possible.
Before we start, here are the differences between our Java implementation and the original code:
  •  We don't use Fetch-And-Add (FAA), but we could if we wanted to. Instead we use an array of integers (could be bools);
  •  There are no pointer_t types or pools; we just allocate what we need on the fly and let the GC handle the trash;
  •  There are no HalfEnq or HalfDeq types and corresponding objects;
  •  Our dequeue() uses the CRTurn consensus to be wait-free. The original SimQueue starts helping from zero, which is a bit unfair, but still wait-free;
  •  We don't use a pool of nodes. If the CAS on EnqState fails, we just let the GC clean up the sub-list and we make new nodes.


Enqueue():
Before we start explaining the enqueue() method, we need to define an EnqState object, which is used to keep a coherent snapshot of the enqueuers' state:

// All the members in this class and their contents are immutable (after becoming visible to other threads)
static class EnqState<E> {
    final Node<E> tail;       // The current tail
    final Node<E> nextNode;   // The next node to add to the tail (beginning of sublist)
    final Node<E> nextTail;   // The future tail, once tail.next becomes nextNode (end of sublist)
    final int[] applied;      // Enqueue requests to match enqueuers[]
}

It uses the immutable-after-visible trick that Copy-On-Write based algorithms use. Yes, it can be wasteful, but the GC will take care of it, and in the original algorithm instances can be re-used. We're not going to re-use instances, because then we would have to deal with ABA problems and the algorithm would just get too complicated (but faster).
The queue class itself has an array of enqueuers (seq-cst atomics), an array of items to be inserted, and a pointer to the current EnqState (seq-cst atomic):
    // Used by enqueuers
    private final AtomicIntegerArray enqueuers;
    private final E[] items;
    @sun.misc.Contended
    private volatile EnqState<E> enqState;
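The snippets in this post also reference a Node class. Here's a minimal sketch of what it could look like (the real one is in the CRSimQueue.java linked at the end of this post); the relaxedStoreNext() used below is emulated here with lazySet() on a field updater:

import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

static class Node<E> {
    final E item;
    volatile Node<E> next = null;
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<Node, Node> NEXT =
        AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "next");

    Node(E item) { this.item = item; }

    // Ordered (non-volatile) store, used when linking nodes of the private sublist
    void relaxedStoreNext(Node<E> n) { NEXT.lazySet(this, n); }
}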
Here's what the enqueue() looks like:     
public void enqueue(E item, final int tid) {
    if (item == null) throw new NullPointerException();
    // Publish enqueue request
    items[tid] = item;
    final int newrequest = (enqState.applied[tid]+1)%2;
    enqueuers.set(tid, newrequest);       
    for (int iter = 0; iter < 3; iter++) {
        final EnqState<E> lstate = enqState;
        // Advance the tail if needed
        if (lstate.tail.next != lstate.nextNode) lstate.tail.next = lstate.nextNode;
        // Check if my request has been done
        if (lstate.applied[tid] == newrequest) return;
        // Help other requests, starting from zero
        Node<E> first = null, node = null;
        int[] applied = lstate.applied.clone();
        for (int i = 0; i < maxThreads; i++) {
            // Check if it is an open request
            if (enqueuers.get(i) == applied[i]) continue;
            applied[i] = (applied[i]+1) % 2;
            Node<E> prev = node;
            node = new Node<E>(items[i]);
            if (first == null) {
                first = node;
            } else {
                prev.relaxedStoreNext(node);  // We don't want the volatile store here
            }
            if (lstate != enqState) break;
        }
        // Try to apply the new sublist
        if (lstate == enqState) casEnqState(lstate, new EnqState<E>(lstate.nextTail, first, node, applied));
    }
}

We start by publishing the item we want to enqueue, and then we open an enqueue request by toggling the bit in the current thread's entry of the enqueuers[] array. This atomic store indicates to the other enqueuers that we are attempting to insert "item" in the queue.


The next step is to link the last node of the queue with the first node of the current sublist, in case the previous enqueuer did not finish its job... yes, SimQueue inserts a whole sublist at a time instead of a single node, but we'll get to that.

Then, we start helping all the other open enqueue requests, by checking which threads have different enqueuers[] and applied[] entries. This is the first trick of SimQueue: signaling an open request by toggling a single bit, and keeping all the currently satisfied requests in the last EnqState instance.
Remember several posts ago, when I said that "The Encapsulator" queue was a different way to solve the problem of knowing whether a certain enqueue request has been completed or not? Well, it's a really hard problem, and SimQueue solves it elegantly in this way. The original code uses an XOR on the bits, but it's the same thing.


Afterwards, we scan all the open enqueue requests, and for each one we create a new node containing the corresponding item, thus building a sublist of nodes. Once one node has been created and linked for each open request, we create an EnqState with the start and end of the sublist, plus the new applied[] states recording which requests have been satisfied, and we try to replace the current EnqState with our own.
If the CAS fails, we try again starting from the new EnqState, but we know that we need to do this at most two times, a trick originally used by Maurice Herlihy in his universal wait-free construction. In our implementation we do it three times, because we need an extra iteration to make sure the sublist is properly linked.



One of the cool things is that the construction of the sublist is single-threaded code, and only after it is done do we try to insert the whole sublist at the tail of the current list, by doing the CAS on the EnqState, and not on tail.next.
The disadvantage is that when the CAS fails we need to throw away all the created nodes. In the original code there is a pool of nodes that can be re-used, and the same thing could be done here, but I didn't want to complicate the code too much.
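As for casEnqState(), it can be as simple as a compare-and-set on the volatile enqState field. A minimal sketch, assuming an AtomicReferenceFieldUpdater inside the CRSimQueue class (the real helper is in the CRSimQueue.java linked below):

import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<CRSimQueue, EnqState> ENQSTATE =
    AtomicReferenceFieldUpdater.newUpdater(CRSimQueue.class, EnqState.class, "enqState");

private boolean casEnqState(EnqState<E> cmp, EnqState<E> val) {
    return ENQSTATE.compareAndSet(this, cmp, val);
}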

Source code for an over-simplified SimQueue in Java can be found here:
https://github.com/pramalhe/ConcurrencyFreaks/blob/master/Java/com/concurrencyfreaks/queues/CRSimQueue.java
That's it for now; we'll look at the dequeue() method in the next post.

Merry Christmas!

Saturday, December 17, 2016

A C++ implementation of Kogan-Petrank wait-free queue with memory reclamation

KP is one of the best-known MPMC wait-free queues. Its design is simple for a wait-free queue, though not as simple as our Turn queue. You can see the original paper and code here: http://www.cs.technion.ac.il/~erez/Papers/wfquque-ppopp.pdf
(not to be confused with http://www.cs.technion.ac.il/~erez/Papers/wf-methodology-ppopp12.pdf)
and our implementation of it in Java here:
https://github.com/pramalhe/ConcurrencyFreaks/blob/master/Java/com/concurrencyfreaks/queues/KoganPetrankNoSLQueue.java
with self-linking:
https://github.com/pramalhe/ConcurrencyFreaks/blob/master/Java/com/concurrencyfreaks/queues/KoganPetrankQueue.java

The major drawback of this queue is that it's meant for Java, with a Garbage Collector. We had to implement it in C++ to compare it against our Turn queue, and so we had to add Hazard Pointers to it. This turned out to be an incredibly difficult task, but we managed.

You can get the C++ code here with Hazard Pointers:
https://github.com/pramalhe/ConcurrencyFreaks/blob/master/CPP/queues/KoganPetrankQueueCHP.hpp


How hard can it be to add hazard pointers and still keep the queue wait-free?

Just to get an idea of the difficulty of the task, here is a comparison between the dequeue() method in Java (with a GC) and the one in C++ (with Hazard Pointers):

Java:

public E dequeue() {
    // We better have consecutive thread ids, otherwise this will blow up
    long phase = maxPhase() + 1;
    state.set(TID, new OpDesc<E>(phase, true, false, null));
    help(phase);
    help_finish_deq();
    final Node<E> node = state.get(TID).node;
    if (node == null) return null; // We return null instead of throwing an exception
    final E value = node.next.value;
    node.next = node;              // Self-link to help the GC
    return value;
}

C++:

T* dequeue(const int TID) {
    // We better have consecutive thread ids, otherwise this will blow up
    long long phase = maxPhase(TID) + 1;
    state[TID].store(new OpDesc(phase, true, false, nullptr));
    help(phase, TID);
    help_finish_deq(TID);
    OpDesc* curDesc = hpOpDesc.protect(kHpODCurr, state[TID], TID);
    Node* node = curDesc->node;
    if (node == nullptr) {
        hpOpDesc.clear(TID);
        hpNode.clear(TID);
        OpDesc* desc = state[TID].load();
        for (int i = 0; i < MAX_THREADS; i++) {
            if (state[TID].compare_exchange_strong(desc, OPDESC_END)) break;
            desc = state[TID].load();
            if (desc == OPDESC_END) break;
        }
        hpOpDesc.retire(desc, TID);
        return nullptr; // We return null instead of throwing an exception
    }
    Node* next = node->next;
    T* value = next->item.load();
    next->item.store(nullptr); // "next" can be deleted now
    hpOpDesc.clear(TID);
    hpNode.clear(TID);
    hpNode.retire(node, TID); // "node" will be deleted only when node.item == nullptr
    OpDesc* desc = state[TID].load();
    for (int i = 0; i < maxThreads*2; i++) { // Is maxThreads+1 enough?
        if (desc == OPDESC_END) break;
        if (state[TID].compare_exchange_strong(desc, OPDESC_END)) break;
        desc = state[TID].load();
    }
    hpOpDesc.retire(desc, TID);
    return value;
}

See the difference between the two?
Java has 9 lines of code while C++ has 30 lines of code, and it's not about the language; it's just that in C++ we need to use Hazard Pointers (HP), and it has to be done in a wait-free way.

Keep in mind that there are some very subtle details in this algorithm that we might have missed, so we can't give our usual guarantee that this implementation is correct. We didn't design it, we just slapped (wait-free) HP on it, so use it at your own risk. I think it's right, and Andreia thinks it's right, and we never saw an issue in our stress tests, but that's as good as it gets for code that is several pages long and that mostly wasn't written by us ;)


Let's see some of the modifications we had to do to get this wait-free queue working in C++ with Hazard Pointers:

help():
- Line 38 requires a hazard pointer to read the other thread's state. The problem is that the validation of the hazard pointer may fail an infinite number of times, because the instance in state[] may change. However, there is a trick that allows us to stop after a finite number of steps: if we re-check the hp at most NUM_THRDS*2 times, we have the certainty that that particular thread has completed at least one enqueue/dequeue request, and it is ok to "skip it" and try to help the next thread in the array of states. Why this is so is non-trivial and left as an exercise to the reader (the bounded-retry pattern itself is sketched after the next item).

maxPhase():
- Line 51 requires an hp to read the other thread's phase. Again, a problem similar to the one in help().
This one is even trickier, because it has implications on whether or not the Lamport Bakery consensus is still valid if you skip one of the participants.
The answer is yes, it's still valid. Why it is so would take a small blog post to explain, so I'm not going to. Conclusion: waiting for NUM_THRDS+1 attempts per hazard pointer is enough, and if after that it's still not validated, just skip over to the next entry of state[]. Notice that this means that when computing the maxPhase it can happen that the return value is -1, even if all the other threads are actively participating with a high phase, and that the current thread will "jump in front" of all of them... as surprising as this may seem, it's still wait-free.
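Both of these fixes boil down to the same bounded-validation pattern for hazard pointers: publish, validate, and retry at most a fixed number of times, then give up and skip that thread. Here's an illustrative sketch of just that pattern (in Java, to keep the examples in these posts in one language; the names here are mine, and the real code is the C++ in KoganPetrankQueueCHP.hpp):

import java.util.concurrent.atomic.AtomicReference;

class BoundedProtect {
    // Try to publish-and-validate a hazard pointer at most maxAttempts times.
    // Returns the protected object, or null to signal "skip this thread":
    // each failed validation means src changed, i.e. that thread made progress.
    static <T> T protectBounded(AtomicReference<T> hp, AtomicReference<T> src, int maxAttempts) {
        for (int i = 0; i < maxAttempts; i++) {
            final T ptr = src.get();
            hp.set(ptr);                       // publish the hazard pointer
            if (src.get() == ptr) return ptr;  // validated: ptr is now protected
        }
        return null;  // bounded number of attempts keeps the caller wait-free
    }
}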


Ok, I could go on and on, but I'm bored and have better things to do with my time, and this is enough to get the picture: deploying hazard pointers in code that wasn't designed for them can be tricky... and I didn't even mention that we had to come up with a new variant of hazard pointers to improve the performance of the KP queue, but that's a topic for a different post. If you're interested, you can look at the Conditional Hazard Pointers class on github and read section 3.2 of our paper on the Turn queue:
https://github.com/pramalhe/ConcurrencyFreaks/blob/master/CPP/queues/HazardPointersConditional.hpp
https://github.com/pramalhe/ConcurrencyFreaks/blob/master/papers/crturnqueue-2016.pdf


Next post we'll talk about another queue.

Sunday, December 11, 2016

Two-Phase Grace Sharing Userspace RCU

This week we interrupt our regular topic of queues to talk about userspace RCU.

Andreia and I came up with two new algorithms for doing simple Userspace RCU (URCU) with "grace sharing", using only the C11/C++1x memory model and its atomics API.
Grace sharing is an important feature of URCU because without it the scalability of calls to synchronize_rcu() will be flat at best.
We wrote a brief announcement about the two algorithms, which you can get here:
https://github.com/pramalhe/ConcurrencyFreaks/blob/master/papers/gracesharingurcu-2016.pdf

The first and simplest is "Grace Version", which we called "Readers Version" in a previous post:
http://www.concurrencyfreaks.com/2016/09/a-simple-userspace-rcu-in-java.html
The second one, which we never talked about before, is called "Two-Phase".


Why have two algorithms for doing the same thing, Grace Version and Two-Phase?

Well, because Grace Version requires each thread to have a unique id, so that it has a unique entry in an array. This means that in your application you need to know how many threads are doing read-side critical sections, which isn't always easy in practice; in those cases, you can use the Two-Phase algorithm instead. The main advantage of the Two-Phase algorithm is that it can use any kind of ReadIndicator.
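To make the thread-registration requirement concrete, here is a minimal from-memory sketch of Grace Version in Java (see the linked post and the brief announcement for the real implementation); notice the readersVersion array with one entry per registered reader thread, and the CAS in synchronize_rcu() that lets grace periods be shared:

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

static class URCUGraceVersion {
    private static final long NOT_READING = Long.MAX_VALUE;
    private final AtomicLong updaterVersion = new AtomicLong(0);
    private final AtomicLongArray readersVersion;  // one entry per reader thread

    URCUGraceVersion(int maxThreads) {
        readersVersion = new AtomicLongArray(maxThreads);
        for (int i = 0; i < maxThreads; i++) readersVersion.set(i, NOT_READING);
    }

    public void rcu_read_lock(final int tid) {
        final long rv = updaterVersion.get();
        readersVersion.set(tid, rv);        // announce the version we've seen
        final long nrv = updaterVersion.get();
        if (rv != nrv) readersVersion.set(tid, nrv);  // re-announce if it advanced meanwhile
    }

    public void rcu_read_unlock(final int tid) {
        readersVersion.set(tid, NOT_READING);
    }

    public void synchronize_rcu() {
        final long waitForVersion = updaterVersion.get() + 1;
        // If this CAS fails, another updater advanced the version: the grace period is shared
        updaterVersion.compareAndSet(waitForVersion - 1, waitForVersion);
        for (int i = 0; i < readersVersion.length(); i++) {
            while (readersVersion.get(i) < waitForVersion) Thread.yield();
        }
    }
}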
I've blabbered about ReadIndicators before, and if you want to know more about them you can look at my CppCon talk from last year:
https://www.youtube.com/watch?v=FtaD0maxwec
and on this post:
http://www.concurrencyfreaks.com/2014/11/a-catalog-of-read-indicators.html

In the Two-Phase algorithm any ReadIndicator can be used; in particular one of our favourites, an array of atomic counters, where each reader does a fetch-and-add to increment the counter indexed by a hash of its thread's id, and decrements it when leaving the read-side critical section.
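Here's a minimal sketch of such a counters-array ReadIndicator in Java (the names are mine; a production version would also pad each counter to its own cache line to avoid false sharing):

import java.util.concurrent.atomic.AtomicLongArray;

static class RIAtomicCounterArray {
    private static final int NUM_COUNTERS = 64;
    private final AtomicLongArray counters = new AtomicLongArray(NUM_COUNTERS);

    private static int idx() {
        return (int)(Thread.currentThread().getId() % NUM_COUNTERS);
    }

    public void arrive() { counters.getAndAdd(idx(), +1); }   // enter read-side critical section
    public void depart() { counters.getAndAdd(idx(), -1); }   // leave read-side critical section

    public boolean isEmpty() {                                // used by synchronize_rcu()
        for (int i = 0; i < NUM_COUNTERS; i++) {
            if (counters.get(i) != 0) return false;
        }
        return true;
    }
}

Notice there is no thread registration: any thread can call arrive()/depart() at any time.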
This idea isn't new; two-phased protocols have been used by Bullet-Proof URCU, and even our Left-Right technique uses them:
https://github.com/urcu/userspace-rcu
http://www.concurrencyfreaks.com/2013/12/left-right-concurrency-control.html
For such ReadIndicators no thread registration is needed, which makes for a much more versatile and friendly API when deploying URCU in your application.
However, unlike those, in our Two-Phase algorithm the grace periods can be shared by multiple threads simultaneously calling synchronize_rcu(), which provides better scalability for the "updaters" (the threads calling synchronize_rcu()). We never thought about this approach before because for Left-Right it doesn't make sense to use an URCU like that, seeing that there can only be one updater (writer) at any given time.


How does the Two-Phase algorithm work?

The best thing is to read the paper, but the main idea is simple: start from the usual two-phase protocol using two ReadIndicators (like Classical Left-Right), and then exit each of the two waiting loops as soon as the version has advanced enough to guarantee that some other updater has seen the opposite ReadIndicator as being empty.
If none of this makes sense, then just go and take a look at the Left-Right paper to see where these ideas came from  ;)
https://github.com/pramalhe/ConcurrencyFreaks/blob/master/papers/left-right-2014.pdf


Benchmarking:

In the first plot we're comparing just pure readers: how fast can you do rcu_read_lock()/unlock(). The results show that both our Grace Version and our Two-Phase using one entry per thread have the same (nearly linear) scalability, with the Two-Phase using an array of counters coming in just below them. Notice that neither the Two-Phase CountersArray nor Bullet-Proof needs thread registration, while the other implementations do.



The second plot shows what happens when we're just calling synchronize_rcu() without any ongoing readers. This scenario is interesting because it shows the synchronization overhead.
Our three algorithms behave more or less the same, while the previous URCU implementations are way below.
This isn't a completely fair comparison, because URCU Bullet-Proof and URCU Default do other things, like handling call_rcu(), which incurs extra overhead; but anyway, if you don't need anything else besides rcu_read_lock()/unlock() and synchronize_rcu(), then our implementations will do great.


The third plot shows the effects of grace sharing: there are two reader threads and the rest are updaters. The read-side critical sections are long, at least long enough not to notice the difference in overhead between the different algorithms (seen in the plot above).
The main effect here is the sharing of the grace period, and it's easy to see that Bullet-Proof is the only one incapable of sharing the grace period.



In essence, if all you need is an URCU with the APIs rcu_read_lock(), rcu_read_unlock() and synchronize_rcu(), then our algorithms are the best in all the scenarios we tested, so feel free to copy them into your code base, or use one of the implementations we have on github:
https://github.com/pramalhe/ConcurrencyFreaks/blob/master/CPP/papers/gracesharingurcu/URCUTwoPhase.hpp
https://github.com/pramalhe/ConcurrencyFreaks/tree/master/CPP/papers/gracesharingurcu


And now, back to queues...