Saturday, December 17, 2016

A C++ implementation of Kogan-Petrank wait-free queue with memory reclamation

KP is one of the best-known MPMC wait-free queues. For a wait-free queue its design is simple, though not as simple as our Turn queue. You can see the original paper and code here: http://www.cs.technion.ac.il/~erez/Papers/wfquque-ppopp.pdf
(not to be confused with http://www.cs.technion.ac.il/~erez/Papers/wf-methodology-ppopp12.pdf)
and our implementation of it in Java here:
https://github.com/pramalhe/ConcurrencyFreaks/blob/master/Java/com/concurrencyfreaks/queues/KoganPetrankNoSLQueue.java
and the version with self-linking:
https://github.com/pramalhe/ConcurrencyFreaks/blob/master/Java/com/concurrencyfreaks/queues/KoganPetrankQueue.java

The major drawback of this queue is that it's meant for Java with a Garbage Collector. We had to implement it in C++ to compare it against our Turn queue, which meant adding Hazard Pointers to it. This turned out to be an incredibly difficult task, but we managed.

You can get the C++ code with Hazard Pointers here:
https://github.com/pramalhe/ConcurrencyFreaks/blob/master/CPP/queues/KoganPetrankQueueCHP.hpp


How hard can it be to add hazard pointers and still keep the queue wait-free?

Just to get an idea of the difficulty of the task, here is a comparison between the dequeue() method in the Java implementation (with a GC) and in the C++ implementation (with Hazard Pointers):

Java:

public E dequeue() {
    // We better have consecutive thread ids, otherwise this will blow up
    long phase = maxPhase() + 1;
    state.set(TID, new OpDesc<E>(phase, true, false, null));
    help(phase);
    help_finish_deq();
    final Node<E> node = state.get(TID).node;
    if (node == null) return null; // We return null instead of throwing an exception
    final E value = node.next.value;
    node.next = node;              // Self-link to help the GC
    return value;
}

C++:

T* dequeue(const int TID) {
    // We better have consecutive thread ids, otherwise this will blow up
    long long phase = maxPhase(TID) + 1;
    state[TID].store(new OpDesc(phase, true, false, nullptr));
    help(phase, TID);
    help_finish_deq(TID);
    OpDesc* curDesc = hpOpDesc.protect(kHpODCurr, state[TID], TID);
    Node* node = curDesc->node;
    if (node == nullptr) {
        hpOpDesc.clear(TID);
        hpNode.clear(TID);
        OpDesc* desc = state[TID].load();
        for (int i = 0; i < MAX_THREADS; i++) {
            if (state[TID].compare_exchange_strong(desc, OPDESC_END)) break;
            desc = state[TID].load();
            if (desc == OPDESC_END) break;
        }
        hpOpDesc.retire(desc, TID);
        return nullptr; // We return null instead of throwing an exception
    }
    Node* next = node->next;
    T* value = next->item.load();
    next->item.store(nullptr); // "next" can be deleted now
    hpOpDesc.clear(TID);
    hpNode.clear(TID);
    hpNode.retire(node, TID); // "node" will be deleted only when node.item == nullptr
    OpDesc* desc = state[TID].load();
    for (int i = 0; i < maxThreads*2; i++) { // Is maxThreads+1 enough?
        if (desc == OPDESC_END) break;
        if (state[TID].compare_exchange_strong(desc, OPDESC_END)) break;
        desc = state[TID].load();
    }
    hpOpDesc.retire(desc, TID);
    return value;
}

See the difference between the two?
Java has 9 lines of code while C++ has 30. This isn't because of the language; it's because in C++ we need to use Hazard Pointers (HP), and they have to be used in a wait-free way.
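For readers who haven't used hazard pointers, here is a minimal sketch of the three operations the C++ dequeue() relies on: protect, clear and retire. The class name SimpleHP, its single slot per thread (the real protect() also takes a slot index such as kHpODCurr) and the per-thread retired lists are assumptions for illustration; this is not the ConcurrencyFreaks HazardPointers API. Notice that protect() loops until validation succeeds, which is only lock-free: if the pointer keeps changing it can spin forever, and that loop is exactly what has to be bounded to keep the queue wait-free.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical minimal hazard pointers: one HP slot per thread,
// one retired list per thread. Not the ConcurrencyFreaks class.
template <typename T, int MAX_THREADS = 4>
class SimpleHP {
    std::atomic<T*> hp[MAX_THREADS];
    std::vector<T*> retired[MAX_THREADS];
public:
    SimpleHP() { for (auto& h : hp) h.store(nullptr); }
    ~SimpleHP() { for (auto& r : retired) for (T* p : r) delete p; }

    // Publish-and-validate. Only lock-free: if 'addr' keeps changing,
    // this loop never terminates.
    T* protect(const std::atomic<T*>& addr, int tid) {
        T* ptr = addr.load();
        while (true) {
            hp[tid].store(ptr);          // publish the hazard pointer
            T* cur = addr.load();        // re-read to validate
            if (cur == ptr) return ptr;  // unchanged: safe to dereference
            ptr = cur;                   // changed: try again
        }
    }

    void clear(int tid) { hp[tid].store(nullptr); }

    // Defer deletion until no thread's hazard pointer points at the object.
    void retire(T* ptr, int tid) {
        retired[tid].push_back(ptr);
        std::vector<T*> keep;
        for (T* p : retired[tid]) {
            bool inUse = false;
            for (int i = 0; i < MAX_THREADS; i++) {
                if (hp[i].load() == p) { inUse = true; break; }
            }
            if (inUse) keep.push_back(p); else delete p;
        }
        retired[tid].swap(keep);
    }

    std::size_t retiredCount(int tid) const { return retired[tid].size(); }
};
```

An object retired while another thread still protects it stays on the retired list, and is freed on a later retire() once that thread has called clear().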

Keep in mind that there are some very subtle details in this algorithm that we might have missed, so we can't give our usual guarantee that this implementation is correct. We didn't design this; we just slapped (wait-free) HP on it, so use it at your own risk. I think it's right, Andreia thinks it's right, and we never saw an issue in our stress tests, but that is as good as it gets for code that is several pages long and mostly not written by us  ;)


Let's look at some of the modifications we had to make to get this wait-free queue working in C++ with Hazard Pointers:

help():
- Line 38 requires a hazard pointer to read the other thread's state. The problem is that validating that hazard pointer may fail an unbounded number of times, because the instance in state[] may keep changing. However, there is a trick that lets us stop after a finite number of steps: if the hazard pointer still fails validation after NUM_THRDS*2 re-checks, we know for certain that that particular thread has completed at least one enqueue/dequeue request, so it is ok to "skip it" and try to help the next thread in the state[] array. Why this is so is non-trivial and left as an exercise to the reader.
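The bounded validation described for help() can be sketched like this. It assumes a simplified OpDesc (only phase and pending), a fixed NUM_THRDS and one hypothetical hazard slot per thread; protectBounded() and helpSketch() are illustrative names, not functions from KoganPetrankQueueCHP.hpp, and the real help_enq()/help_deq() calls are replaced by a counter. It also assumes state[] never holds nullptr, so nullptr can be used to mean "skip this thread".

```cpp
#include <atomic>
#include <cassert>

constexpr int NUM_THRDS = 4;  // hypothetical number of threads

struct OpDesc {               // simplified operation descriptor
    long long phase;
    bool pending;
};

std::atomic<OpDesc*> hp[NUM_THRDS];  // one hazard pointer slot per thread

// Publish and validate a hazard pointer on 'slot', giving up after
// maxTries attempts. Returns nullptr on give-up ("skip this thread").
OpDesc* protectBounded(std::atomic<OpDesc*>& slot, int tid, int maxTries) {
    OpDesc* ptr = slot.load();
    for (int i = 0; i < maxTries; i++) {
        hp[tid].store(ptr);          // publish
        OpDesc* cur = slot.load();   // re-read to validate
        if (cur == ptr) return ptr;  // unchanged: safe to dereference
        ptr = cur;                   // changed: retry with the new value
    }
    hp[tid].store(nullptr);
    return nullptr;
}

// Outline of the helping loop; returns how many threads would be helped.
int helpSketch(std::atomic<OpDesc*> state[], long long phase, int tid) {
    int helped = 0;
    for (int i = 0; i < NUM_THRDS; i++) {
        OpDesc* desc = protectBounded(state[i], tid, NUM_THRDS * 2);
        if (desc == nullptr) continue;  // thread i completed a request meanwhile
        if (desc->pending && desc->phase <= phase) {
            helped++;                   // real code: help_enq() or help_deq()
        }
    }
    hp[tid].store(nullptr);
    return helped;
}
```

The bound turns every iteration of the outer loop into a finite number of steps, which is what the wait-free guarantee needs.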

maxPhase():
- Line 51 requires a hazard pointer to read the other thread's phase, which raises a problem similar to the one in help().
This one is even trickier because it has implications for whether the Lamport Bakery-style ordering is still valid if you skip one of the participants.
The answer is yes, it's still valid. Explaining why would take a small blog post of its own, so I'm not going to. Conclusion: NUM_THRDS+1 attempts per hazard pointer are enough, and if the hazard pointer still isn't validated after that, just skip to the next entry of state[]. Notice that this means maxPhase() may return -1 even when all the other threads are actively participating with a high phase, in which case the current thread will "jump in front" of all of them... as surprising as this may seem, it's still wait-free.
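The "give up after NUM_THRDS+1 attempts and skip" rule for maxPhase() can be sketched as follows. This assumes a fixed NUM_THRDS, an OpDesc reduced to its phase and one hypothetical hazard slot per thread in place of the real HazardPointers class; it illustrates the rule described above, not the original code at line 51. Null entries are skipped as a defensive assumption.

```cpp
#include <atomic>
#include <cassert>

constexpr int NUM_THRDS = 4;  // hypothetical number of threads

struct OpDesc {               // simplified: only the field maxPhase() needs
    long long phase;
};

std::atomic<OpDesc*> state[NUM_THRDS];   // one descriptor per thread
std::atomic<OpDesc*> hazard[NUM_THRDS];  // one hazard pointer slot per thread

long long maxPhase(int tid) {
    long long maxPh = -1;
    for (int i = 0; i < NUM_THRDS; i++) {
        OpDesc* desc = state[i].load();
        bool validated = false;
        // At most NUM_THRDS+1 publish-and-validate attempts per entry
        for (int attempt = 0; attempt < NUM_THRDS + 1; attempt++) {
            hazard[tid].store(desc);
            OpDesc* again = state[i].load();
            if (again == desc) { validated = true; break; }
            desc = again;
        }
        if (!validated || desc == nullptr) continue;  // skip this entry
        if (desc->phase > maxPh) maxPh = desc->phase;
    }
    hazard[tid].store(nullptr);
    return maxPh;
}
```

Skipped entries don't contribute to the maximum, so a thread that can't validate any entry gets -1 and starts its request with phase maxPhase(tid) + 1 == 0, which is the "jump in front" case.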


Ok, I could go on and on, but I'm bored and have better things to do with my time, and this is enough to get the picture: deploying hazard pointers in code that wasn't designed for them can be tricky... and I didn't even mention that we had to come up with a new variant of hazard pointers to improve the performance of the KP queue, but that's a topic for a different post. If you're interested, you can look at the Conditional Hazard Pointers class on GitHub and read section 3.2 of our paper on the Turn queue:
https://github.com/pramalhe/ConcurrencyFreaks/blob/master/CPP/queues/HazardPointersConditional.hpp
https://github.com/pramalhe/ConcurrencyFreaks/blob/master/papers/crturnqueue-2016.pdf
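The only hint about Conditional Hazard Pointers in this post is the comment in dequeue(): a retired node "will be deleted only when node.item == nullptr". Here is a sketch of that idea, assuming a simplified Node with an atomic item, a fixed MAX_THREADS and one hypothetical hazard slot per thread; scanConditional() and retireConditional() are illustrative names, and the real HazardPointersConditional class may work differently. A retired node is freed only when no hazard pointer protects it and its condition (item == nullptr) holds.

```cpp
#include <atomic>
#include <cassert>
#include <vector>

constexpr int MAX_THREADS = 4;  // hypothetical number of threads

struct Node {
    std::atomic<int*> item;
    explicit Node(int* it) : item(it) {}
};

std::atomic<Node*> hazard[MAX_THREADS];    // one hazard pointer slot per thread
std::vector<Node*> retiredList[MAX_THREADS];

// Free retired nodes that are not protected AND whose item was consumed.
void scanConditional(int tid) {
    std::vector<Node*> keep;
    for (Node* n : retiredList[tid]) {
        bool protectedByHP = false;
        for (int i = 0; i < MAX_THREADS; i++) {
            if (hazard[i].load() == n) { protectedByHP = true; break; }
        }
        if (protectedByHP || n->item.load() != nullptr) keep.push_back(n);
        else delete n;
    }
    retiredList[tid].swap(keep);
}

void retireConditional(Node* n, int tid) {
    retiredList[tid].push_back(n);
    scanConditional(tid);
}
```

A node whose item hasn't been taken yet simply stays on the retired list until a later scan finds item == nullptr.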


In the next post we'll talk about another queue.
