Published

Mon 06 May 2019

←Home

Multiple Condition Queues For Better Concurrency

I had been revisiting concurrent libraries that I had worked upon earlier and just wanted to highlight the importance of using separate wait sets and condition queues for your library implementations. The performance of these has been benchmarked using JMH.

Let me just list down the advantages of using separate condition queues per lock.

  • It avoids spurious wake ups and context switching. For example in case of traditional waiting if you use notifyAll you end up waking threads that were waiting on a different condition.
  • As you are waiting on separate condition queues, you can use signal instead of signalAll to further enhance the performance.

Here are the two classic implementations of a bounded BlockingQueue on top of an unbounded queue.

With separate wait sets per lock

  public class BlockingQueue<T> {

    private final Queue<T> queue;
    private final Lock lockObj = new ReentrantLock();
    private final Condition empty = lockObj.newCondition();
    private final Condition full = lockObj.newCondition();
    private int maxLength;
    private int currentSize = 0;

    public BlockingQueue(int maxLength) {
      this.queue = new ArrayDeque<T>();
      this.maxLength = maxLength;
    }

    public void offer(T elem) throws InterruptedException {
      lockObj.lock();
      try {
        while (currentSize == maxLength) {
          full.await();
        }
        queue.offer(elem);
        currentSize++;
        empty.signal();
      } finally {
        lockObj.unlock();
      }
    }

    public T poll() throws InterruptedException {
      lockObj.lock();
      try {
        while (currentSize == 0) {
          empty.await();
        }
        T elem = queue.poll();
        currentSize--;
        full.signal();
        return elem;
      } finally {
        lockObj.unlock();
      }
    }
  }

Performance test using JMH (measuring throughput)

 Benchmark                                       Mode  Cnt         Score        Error  Units
 BenchmarkBlockingDequeu.testProduceAndConsume  thrpt   25  12500542.933 ± 374127.076  ops/s

The old way of doing things (A single lock and waitset).

  public class BlockingQueueWait<T> {


    private final Queue<T> queue;
    private final Object lockObj = new Object();
    private int maxLength;
    private int currentSize = 0;

    public BlockingQueueWait(int maxLength) {
      this.queue = new ArrayDeque<T>();
      this.maxLength = maxLength;
    }

    public void offer(T elem) throws InterruptedException {
      synchronized (lockObj) {
        while (currentSize == maxLength) {
          lockObj.wait();
        }
        queue.offer(elem);
        currentSize++;
        lockObj.notifyAll();
      }
    }

    public T poll() throws InterruptedException {
      synchronized (lockObj) {
        while (currentSize == 0) {
          lockObj.wait();
        }
        T elem = queue.poll();
        currentSize--;
        lockObj.notifyAll();
        return elem;
      }
    }
  }

Performance test using JMH (measuring throughput)

  Benchmark                                     Mode  Cnt        Score       Error  Units
   BenchMarkBlockingWait.testProduceAndConsume  thrpt   25  2702842.067 ± 24534.073  ops/s

Conclusion

If you look carefully at the above implementations, the difference in ops/seconds is significant and most of that is being caused by spurious wake ups and the fact that without using explicit condition queues and wait sets per lock, you end up wasting precious cpu cycles. So, if you are programming a concurrent library implementation, just remember that you have better concurrency support and you can create multiple condition queues per lock to avoid spurious wake ups.

Edit: A user from reddit pointed out that I should explain why signal is safe here. So here, goes the explanation signal wakes up a thread, which in this case will be the correct thread, since it is waiting on the correct condition. Unlike notify() which is unsafe because there is a single condition queue, In this example it might wake a thread which was waiting on the queue to be empty, when the queue was already empty or vice versa.

Go Top
comments powered by Disqus