marko devcic

  • github:
    deva666
  • email:
    madevcic {at} gmail.com

Priority producer consumer queue

Posted on 15 March 2015

The producer consumer queue is a classic multithreaded pattern in which one or more threads produce work items and one or more threads consume them. The usual implementation is based on a standard queue data structure, which means that the first work item added is the first one consumed. That works well if you are offloading a bunch of short-running work items. In a more concurrent scenario where work items are long-running, it would be cool if the queue could decide which items are more important (have higher priority) and process those first. To get that behavior we just build our producer consumer queue around a priority queue. But since .NET, unlike Java, doesn't have a built-in priority queue at the time of writing, I'm going to show you my implementation of it.

The class is generic and the only constraint is that type T implements the IComparable<T> interface. It's built around the heap data structure. A heap is a binary tree in which the value of every parent node is higher than or equal to the values of its children (if it is a max-heap). A heap can be implemented with pointers, but it is also possible to represent it with an array. In the array, the children of the node at position p are at position (p*2 + 1) for the left child and (p*2 + 2) for the right child. To get the parent of the child at position c, it's simply (c - 1) / 2, using integer division. With that in mind, enqueuing is done by inserting an item at the last position and shifting it up the tree by swapping it with its parent as long as the parent has a smaller value. This takes O(log n) time. Peeking at the item with the highest value takes O(1) time, as it is sitting on the top. But after the top item is extracted, to maintain the heap property we take the last item from the bottom, put it on top and shift it down, swapping it with its larger child, until it is higher than or equal to both of its children. Because of that, extraction takes O(log n) time.
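
To make the index arithmetic concrete, here is a minimal sketch that stores a small max-heap of integers in an array and navigates it with the formulas above. The class name HeapIndexDemo and its helper methods are made up for this illustration and are not part of the queue itself.

```csharp
using System;

public static class HeapIndexDemo
{
    // Index helpers for a binary heap stored in a zero-based array.
    public static int Parent(int child) { return (child - 1) / 2; }
    public static int LeftChild(int parent) { return parent * 2 + 1; }
    public static int RightChild(int parent) { return parent * 2 + 2; }

    // Checks the max-heap property: no child is greater than its parent.
    public static bool IsMaxHeap(int[] heap)
    {
        for (int i = 1; i < heap.Length; i++)
        {
            if (heap[i] > heap[Parent(i)])
                return false;
        }
        return true;
    }

    public static void Main()
    {
        // Tree form:        9
        //                 /   \
        //                7     8
        //               / \   /
        //              3   5 6
        int[] heap = { 9, 7, 8, 3, 5, 6 };

        Console.WriteLine(LeftChild(1));    // 3 (the slot holding value 3)
        Console.WriteLine(RightChild(1));   // 4 (the slot holding value 5)
        Console.WriteLine(Parent(5));       // 2 (the slot holding value 8)
        Console.WriteLine(IsMaxHeap(heap)); // True
    }
}
```

Because the tree is always filled level by level, the array has no gaps, which is why the last used slot is the natural place to insert a new item.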

Here's the complete code:

    using System;
    using System.Collections.Generic;
    using System.Linq;

    public class PriorityQueue<T> where T : IComparable<T>
    {
        private const int DEFAULT_CAPACITY = 4;
        private T[] items;
        private int size = 0;

        private static T[] emptyArray = new T[0];

        public int Size { get { return size; } }

        public PriorityQueue()
        {
            items = emptyArray;
        }

        public PriorityQueue(int size)
        {
            if (size < 0)
                throw new ArgumentOutOfRangeException("size", "Size must be 0 or greater");

            items = new T[size];
        }

        public PriorityQueue(IEnumerable<T> sourceCollection)
        {
            if (sourceCollection == null)
                throw new ArgumentNullException("sourceCollection");

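            // Materialize once so the source is enumerated a single time
            // (ElementAt(i) in a loop re-enumerates sources that are not lists).
            var sourceItems = sourceCollection.ToList();
            items = new T[sourceItems.Count];
            foreach (var item in sourceItems)
            {
                InsertAndShift(item); // increments size as it inserts
            }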
        }

        public void Clear()
        {
            items = emptyArray;
            size = 0;
        }

        public void Insert(T item)
        {
            InsertAndShift(item);
        }

        public void InsertRange(IEnumerable<T> collection)
        {
            if (collection == null)
                throw new ArgumentNullException("collection");

            // Enlarge(int) adds its argument to the current size,
            // so pass only the number of items being added.
            var addedCount = collection.Count();
            if (size + addedCount > items.Length)
                Enlarge(addedCount);

            foreach (var item in collection)
            {
                InsertAndShift(item);
            }
        }

        private void InsertAndShift(T item)
        {
            if (size == items.Length)
                Enlarge();

            var index = size++;
            items[index] = item;
            ShiftUp(index);
        }

        private void ShiftUp(int index)
        {
            var parent = GetParent(index);
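            // Swap only while the parent is strictly smaller; items with
            // equal priority have no reason to move past each other.
            while (index > 0 && items[index].CompareTo(items[parent]) > 0)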
            {
                Swap(index, parent);
                index = parent;
                parent = GetParent(index);
            }
        }

        private int GetParent(int index)
        {
            var parent = (index - 1) >> 1;
            return parent;
        }

        public T PeekTopItem()
        {
            if (size == 0)
                throw new InvalidOperationException("Empty queue");

            return items[0];
        }

        public T ExtractTopItem()
        {
            var topItem = PeekTopItem();
  
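            // Move the last item to the top and clear the vacated slot
            // so the array does not keep a stale reference alive.
            items[0] = items[--size];
            items[size] = default(T);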

            ShiftDown();

            if (size <= items.Length >> 1)
                Reduce();

            return topItem;
        }

        private void ShiftDown()
        {
            var root = 0;

            while (root * 2 + 1 < size)
            {
                int next;
                var left = root * 2 + 1;

                if (items[left].CompareTo(items[root]) > 0)
                    next = left;
                else
                    next = root;

                var right = root * 2 + 2;
                if (right < size && items[right].CompareTo(items[next]) > 0)
                    next = right;
                if (next != root)
                {
                    Swap(next, root);
                    root = next;
                }
                else
                    break;
            }
        }

        private void Reduce()
        {
            if (size >= DEFAULT_CAPACITY)
            {
                var newSize = items.Length >> 1;
                Resize(newSize, false);
            }
        }

        private void Enlarge()
        {
            var newSize = size == 0 ? DEFAULT_CAPACITY : size << 1;
            Resize(newSize, true);
        }

        private void Enlarge(int addToSize)
        {
            var newSize = size + addToSize;
            Resize(newSize, true);
        }

        private void Resize(int newSize, bool enlarge)
        {
            var newItems = new T[newSize];
            Array.Copy(items, 0, newItems, 0, enlarge ?  size : newSize);
            items = newItems;
        }

        private void Swap(int first, int second)
        {
            var temp = items[first];
            items[first] = items[second];
            items[second] = temp;
        }
    }

Now to the producer consumer queue. A PC queue can be very useful when we need to offload some work: we just enqueue an Action delegate and the call returns to the caller immediately.
In the background, a dedicated thread picks the item with the highest priority from the queue and does the work.
The implementation is fairly simple. In the constructor we start a task with the LongRunning option, which hints to the scheduler that the work should get its own dedicated thread rather than tying up a thread pool thread. The thread starts the DoWork method immediately. DoWork enters a loop and tries to get a Worker item from the queue; if there is none, it blocks on the AutoResetEvent. Whenever we enqueue a Worker item, we also signal the AutoResetEvent, so DoWork unblocks and picks an item from the queue. Since the priority queue is not thread safe, we must lock when accessing it.
The queue's Enqueue method returns a Task, so we can monitor its completion or be notified if it fails.
It accepts an Action delegate, but if needed it can easily be converted to support a Func delegate when the work is going to produce a value.

Here's the code:

    using System;
    using System.Threading;
    using System.Threading.Tasks;

    public enum Priority
    {
        Idle,
        BelowNormal,
        Normal,
        AboveNormal,
        TimeCritical
    }

    public class ProducerConsumerQueue
    {
        // volatile: written by the thread calling Shutdown, read by producers.
        private volatile bool isShutdown = false;
        private readonly object _lock = new object();
        private readonly CancellationTokenSource cancelToken;
        private readonly PriorityQueue<Worker> queue = new PriorityQueue<Worker>();
        private readonly AutoResetEvent waitHandle = new AutoResetEvent(false);

        public ProducerConsumerQueue()
        {
            cancelToken = new CancellationTokenSource();
            Task.Factory.StartNew(new Action(DoWork), TaskCreationOptions.LongRunning);
        }

        public Task Enqueue(Action _delegate, Priority priority)
        {
            if (isShutdown)
                throw new InvalidOperationException("Queue is shutdown");

            var worker = new Worker(_delegate, priority);
            lock (_lock)
            {
                queue.Insert(worker);
            }
            waitHandle.Set();
            return worker.TaskCompletionSrc.Task;
        }

        public Task Enqueue(Action _delegate)
        {
            return Enqueue(_delegate, Priority.Normal);
        }

        private void DoWork()
        {
            while (true)
            {
                if (cancelToken.IsCancellationRequested)
                {
                    break;
                }

                Worker worker = null;
                lock (_lock)
                {
                    if (queue.Size > 0)
                    {
                        worker = queue.ExtractTopItem();
                    }
                }

                if (worker != null)
                {
                    try
                    {
                        worker.Delegate.Invoke();
                        worker.TaskCompletionSrc.SetResult(null);
                    }
                    catch (Exception fail)
                    {
                        worker.TaskCompletionSrc.SetException(fail);
                    }
                }
                else
                {
                    waitHandle.WaitOne();
                }
            }
        }

        public void Shutdown()
        {
            isShutdown = true;
            cancelToken.Cancel();
            waitHandle.Set();
        }

        private class Worker : IComparable<Worker>
        {
            public Priority Priority { get; private set; }
            public TaskCompletionSource<object> TaskCompletionSrc { get; private set; }
            public Action Delegate { get; private set; }

            public Worker(Action _delegate, Priority priority)
            {
                if (_delegate == null)
                    throw new ArgumentNullException("_delegate");

                this.Delegate = _delegate;
                this.Priority = priority;
                this.TaskCompletionSrc = new TaskCompletionSource<object>();
            }

            public int CompareTo(Worker other)
            {
                if (this.Priority > other.Priority)
                    return 1;
                else if (this.Priority < other.Priority)
                    return -1;
                return 0;
            }
        }
    }
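
As mentioned above, the queue could be adapted to work that returns a value. One possible way, sketched here under assumptions, is to wrap a Func in an Action that reports its outcome through a TaskCompletionSource, so the existing Enqueue(Action) could be reused unchanged. The FuncWorkItem class and its Wrap method are hypothetical names invented for this illustration. The sketch runs on its own, without the queue, so it also shows how a caller sees success and failure through the returned Task.

```csharp
using System;
using System.Threading.Tasks;

public static class FuncWorkItem
{
    // Hypothetical helper (not part of the queue above): wraps a Func<TResult>
    // in an Action and exposes the outcome through a Task<TResult>.
    public static Action Wrap<TResult>(Func<TResult> func, out Task<TResult> task)
    {
        if (func == null)
            throw new ArgumentNullException("func");

        var completion = new TaskCompletionSource<TResult>();
        task = completion.Task;

        return () =>
        {
            try
            {
                // Success: the task completes with the computed value.
                completion.SetResult(func());
            }
            catch (Exception fail)
            {
                // Failure: the task becomes faulted and carries the exception.
                completion.SetException(fail);
            }
        };
    }

    public static void Main()
    {
        Task<int> sum;
        Action work = Wrap<int>(() => 2 + 3, out sum);
        work(); // with the queue, the consumer thread would invoke this
        Console.WriteLine(sum.Result); // 5

        Task<int> failing;
        Action badWork = Wrap<int>(() => { throw new InvalidOperationException("boom"); }, out failing);
        badWork();
        Console.WriteLine(failing.IsFaulted); // True
        Console.WriteLine(failing.Exception.InnerException.Message); // boom
    }
}
```

With the queue, the returned Action would be passed to Enqueue and the caller would keep the Task<TResult> to read the result once the consumer thread has run the work item.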

Testing the queue:

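var queue = new ProducerConsumerQueue();
// Keep the consumer busy so the items enqueued below pile up in the queue.
queue.Enqueue(() => { Thread.Sleep(500); });

// Give the consumer thread time to pick up the first item.
Thread.Sleep(200);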

var priorities = Enum.GetValues(typeof(Priority));
var random = new Random();
for (int i = 0; i < 10; i++)
{
    var p = random.Next(0, priorities.Length);
    var priority = (Priority)p;
    queue.Enqueue(() =>
    {
        Thread.Sleep(p * 50);
        Console.WriteLine("Priority: " + priority);
    }, priority);
}

Output (the priorities are picked at random, so the mix varies between runs):

Priority: TimeCritical
Priority: TimeCritical
Priority: TimeCritical
Priority: AboveNormal
Priority: Normal
Priority: Normal
Priority: BelowNormal
Priority: BelowNormal
Priority: Idle
Priority: Idle