marko devcic

Software Engineer
  • github: deva666
  • email: madevcic {at} gmail.com
Priority producer consumer queue

Posted on 03/15/2015

A producer consumer queue is a classic multithreaded pattern in which one or more threads produce work items and one or more threads consume them. The usual implementation is based on a standard queue data structure, which means that the first work item added is the first one consumed. That works well if you are offloading a bunch of short-running work items. In a more concurrent scenario where work items are long-running, it would be cool if the queue could decide which items are more important (have higher priority) and process those first. To get that behavior, we can build our producer consumer queue around a priority queue. But since .NET doesn't have a built-in priority queue (at the time of writing), unlike Java, I'm going to show you my implementation of it.

The class is generic, and the only constraint is that type T implements the IComparable<T> interface. It's built around a heap data structure. A heap is a binary tree in which the value of every parent node is greater than or equal to the values of its children (for a max-heap). A heap can be implemented with pointers, but it can also be represented with an array. In the array, the children of the node at position p are at position (p*2 + 1) for the left child and (p*2 + 2) for the right child. And to get the parent of the child at position c, it's simply (c - 1) / 2, using integer division. With that in mind, enqueuing is done by inserting an item at the last position and shifting it up the tree, swapping it with its parent as long as the parent has a smaller value. This takes O(log n) time. Peeking at the item with the highest value takes O(1) time, as it is sitting at the top. But after the top item is extracted, to maintain the heap property we take the last item from the bottom, put it on top and shift it down, swapping it with its larger child, until it is greater than or equal to both of its children. Because of that, extraction takes O(log n) time.
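
To make the index arithmetic concrete, here is a minimal sketch of a max-heap over a List<int>. It is separate from the PriorityQueue<T> class shown in this post; the HeapSketch name and its Push and Pop methods are made up for this illustration only.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of the PriorityQueue<T> class in this post):
// a max-heap stored in a List<int>, to show the index arithmetic in isolation.
public static class HeapSketch
{
    public static int Parent(int child) { return (child - 1) / 2; }
    public static int Left(int parent) { return parent * 2 + 1; }
    public static int Right(int parent) { return parent * 2 + 2; }

    // Append the value at the end, then swap it with its parent
    // for as long as it is larger than the parent.
    public static void Push(List<int> heap, int value)
    {
        heap.Add(value);
        var index = heap.Count - 1;
        while (index > 0 && heap[index] > heap[Parent(index)])
        {
            var parent = Parent(index);
            Swap(heap, index, parent);
            index = parent;
        }
    }

    // Remove the root, move the last item to the top and shift it down,
    // always swapping with the larger child.
    public static int Pop(List<int> heap)
    {
        if (heap.Count == 0)
            throw new InvalidOperationException("Empty heap");

        var top = heap[0];
        var last = heap.Count - 1;
        heap[0] = heap[last];
        heap.RemoveAt(last);

        var root = 0;
        while (Left(root) < heap.Count)
        {
            var largest = root;
            if (heap[Left(root)] > heap[largest])
                largest = Left(root);
            if (Right(root) < heap.Count && heap[Right(root)] > heap[largest])
                largest = Right(root);
            if (largest == root)
                break;

            Swap(heap, root, largest);
            root = largest;
        }
        return top;
    }

    private static void Swap(List<int> heap, int first, int second)
    {
        var temp = heap[first];
        heap[first] = heap[second];
        heap[second] = temp;
    }
}
```

Pushing 3, 9, 5 and 7 into an empty list leaves it as 9, 7, 5, 3, and repeated calls to Pop then return 9, 7, 5 and 3.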

Here's the complete code of the priority queue:

using System;
using System.Collections.Generic;
using System.Linq;

public class PriorityQueue<T> where T : IComparable<T>
    {
        private const int DEFAULT_CAPACITY = 4;
        private T[] items;
        private int size = 0;

        private static T[] emptyArray = new T[0];

        public int Size { get { return size; } }

        public PriorityQueue()
        {
            items = emptyArray;
        }

        public PriorityQueue(int size)
        {
            if (size < 0)
                throw new ArgumentOutOfRangeException("size", "Size must be 0 or greater");

            items = new T[size];
        }

        public PriorityQueue(IEnumerable<T> sourceCollection)
        {
            if (sourceCollection == null)
                throw new ArgumentNullException("sourceCollection");

            // Pre-size the array, then walk the source once with foreach
            // (ElementAt in a loop is O(n) per call for non-list sources).
            // InsertAndShift increments size for every item it adds.
            items = new T[sourceCollection.Count()];
            foreach (var item in sourceCollection)
            {
                InsertAndShift(item);
            }
        }

        public void Clear()
        {
            items = emptyArray;
            size = 0;
        }

        public void Insert(T item)
        {
            InsertAndShift(item);
        }

        public void InsertRange(IEnumerable<T> collection)
        {
            if (collection == null)
                throw new ArgumentNullException("collection");

            // Enlarge(int) takes the number of slots to add to the current size,
            // so pass the count of new items rather than the total required size.
            var count = collection.Count();
            if (size + count > items.Length)
                Enlarge(count);

            foreach (var item in collection)
            {
                InsertAndShift(item);
            }
        }

        private void InsertAndShift(T item)
        {
            if (size == items.Length)
                Enlarge();

            var index = size++;
            items[index] = item;
            ShiftUp(index);
        }

        private void ShiftUp(int index)
        {
            var parent = GetParent(index);
            // Swap only while the item is strictly greater than its parent;
            // swapping equal items would just do extra work.
            while (index > 0 && items[index].CompareTo(items[parent]) > 0)
            {
                Swap(index, parent);
                index = parent;
                parent = GetParent(index);
            }
        }

        private int GetParent(int index)
        {
            var parent = (index - 1) >> 1;
            return parent;
        }

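        public T PeekTopItem()
        {
            if (size == 0)
                throw new InvalidOperationException("Empty queue");

            return items[0];
        }

        public T ExtractTopItem()
        {
            var topItem = PeekTopItem();

            // Move the last item to the root and clear the vacated slot
            // so the array does not keep a stale reference alive.
            items[0] = items[--size];
            items[size] = default(T);

            ShiftDown();

            if (size <= items.Length >> 1)
                Reduce();

            return topItem;
        }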

        private void ShiftDown()
        {
            var root = 0;

            while (root * 2 + 1 < size)
            {
                int next;
                var left = root * 2 + 1;

                if (items[left].CompareTo(items[root]) > 0)
                    next = left;
                else
                    next = root;

                var right = root * 2 + 2;
                if (right < size && items[right].CompareTo(items[next]) > 0)
                    next = right;
                if (next != root)
                {
                    Swap(next, root);
                    root = next;
                }
                else break;
            }
        }

        private void Reduce()
        {
            if (size >= DEFAULT_CAPACITY)
            {
                var newSize = items.Length >> 1;
                Resize(newSize, false);
            }
        }

        private void Enlarge()
        {
            var newSize = size == 0 ? DEFAULT_CAPACITY : size << 1;
            Resize(newSize, true);
        }

        private void Enlarge(int addToSize)
        {
            var newSize = size + addToSize;
            Resize(newSize, true);
        }

        private void Resize(int newSize, bool enlarge)
        {
            var newItems = new T[newSize];
            Array.Copy(items, 0, newItems, 0, enlarge ? size : newSize);
            items = newItems;
        }

        private void Swap(int first, int second)
        {
            var temp = items[first];
            items[first] = items[second];
            items[second] = temp;
        }
    }

Now to the producer consumer queue. A PC queue can be very useful when we need to offload some work: we just enqueue an Action delegate and the call returns to the caller immediately.
In the background, a dedicated thread picks the item with the highest priority from the queue and does the work.
The implementation is fairly simple. In the constructor we start a task with the LongRunning option, which hints to the scheduler that the work deserves its own thread rather than a thread pool thread. That thread starts the DoWork method immediately. DoWork enters a loop and tries to get a Worker item from the queue; if there is none, it blocks on the AutoResetEvent. Whenever we enqueue a Worker item, we also signal the AutoResetEvent so that DoWork unblocks and picks an item from the queue. Since the priority queue is not thread safe, we must lock when accessing it.
The queue's Enqueue method returns a Task, so we can monitor its completion or be notified if it fails.
It accepts an Action delegate, but it can easily be converted to support a Func delegate if the work is going to produce a value.
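
One possible way to get Func support without touching the queue itself is to wrap the Func in an Action that reports its result through a TaskCompletionSource<T>. The sketch below is an assumption about how that could look, not part of the queue shown in this post; FuncAdapter and Wrap are hypothetical names.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical adapter, not part of the ProducerConsumerQueue class in this post.
public static class FuncAdapter
{
    // Wraps a Func<T> in an Action that records the result (or the exception)
    // on a TaskCompletionSource<T>. The Action can be handed to any
    // Action-based enqueue method; the returned Task<T> carries the value.
    public static Task<T> Wrap<T>(Func<T> func, out Action work)
    {
        if (func == null)
            throw new ArgumentNullException("func");

        var completion = new TaskCompletionSource<T>();
        work = () =>
        {
            try
            {
                completion.SetResult(func());
            }
            catch (Exception fail)
            {
                // The failure is reported on the Task<T>, not rethrown.
                completion.SetException(fail);
            }
        };
        return completion.Task;
    }
}
```

With this in place, a caller would create the pair with FuncAdapter.Wrap(() => 6 * 7, out work), pass work to Enqueue, and read the value from the returned Task<int> once the worker thread has run it. Because the wrapper catches exceptions itself, the Task returned by Enqueue always completes successfully and any failure shows up on the Task<T> instead.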

Here's the code for the producer consumer queue:

using System;
using System.Threading;
using System.Threading.Tasks;

public enum Priority
    {
        Idle,
        BelowNormal,
        Normal,
        AboveNormal,
        TimeCritical
    }

    public class ProducerConsumerQueue
    {
        // volatile: written by Shutdown and read by Enqueue, possibly on different threads.
        private volatile bool isShutdown = false;
        private readonly object _lock = new object();
        private readonly CancellationTokenSource cancelToken;
        private readonly PriorityQueue<Worker> queue = new PriorityQueue<Worker>();
        private readonly AutoResetEvent waitHandle = new AutoResetEvent(false);

        public ProducerConsumerQueue()
        {
            cancelToken = new CancellationTokenSource();
            Task.Factory.StartNew(new Action(DoWork), TaskCreationOptions.LongRunning);
        }

        public Task Enqueue(Action _delegate, Priority priority)
        {
            if (isShutdown)
                throw new InvalidOperationException("Queue is shutdown");

            var worker = new Worker(_delegate, priority);
            lock (_lock)
            {
                queue.Insert(worker);
            }
            waitHandle.Set();
            return worker.TaskCompletionSrc.Task;
        }

        public Task Enqueue(Action _delegate)
        {
            return Enqueue(_delegate, Priority.Normal);
        }

        private void DoWork()
        {
            while (true)
            {
                if (cancelToken.IsCancellationRequested)
                {
                    break;
                }

                Worker worker = null;
                lock (_lock)
                {
                    if (queue.Size > 0)
                    {
                        worker = queue.ExtractTopItem();
                    }
                }

                if (worker != null)
                {
                    try
                    {
                        worker.Delegate.Invoke();
                        worker.TaskCompletionSrc.SetResult(null);
                    }
                    catch (Exception fail)
                    {
                        worker.TaskCompletionSrc.SetException(fail);
                    }
                }
                else
                {
                    waitHandle.WaitOne();
                }
            }
        }

        public void Shutdown()
        {
            isShutdown = true;
            cancelToken.Cancel();
            waitHandle.Set();
        }

        private class Worker : IComparable<Worker>
        {
            public Priority Priority { get; private set; }
            public TaskCompletionSource<object> TaskCompletionSrc { get; private set; }
            public Action Delegate { get; private set; }

            public Worker(Action _delegate, Priority priority)
            {
                if (_delegate == null)
                    throw new ArgumentNullException("_delegate");

                this.Delegate = _delegate;
                this.Priority = priority;
                this.TaskCompletionSrc = new TaskCompletionSource<object>();
            }

            public int CompareTo(Worker other)
            {
                if (this.Priority > other.Priority)
                    return 1;
                else if (this.Priority < other.Priority)
                    return -1;
                return 0;
            }
        }
    }

Testing the queue:

var queue = new ProducerConsumerQueue();
// Keep the worker thread busy so the items below pile up in the queue.
queue.Enqueue(() => { Thread.Sleep(500); });

Thread.Sleep(200);

var priorities = Enum.GetValues(typeof(Priority));
var random = new Random();
for (int i = 0; i < 10; i++)
{
    var p = random.Next(0, priorities.Length);
    var priority = (Priority)p;
    queue.Enqueue(() =>
    {
        Thread.Sleep(p * 50);
        Console.WriteLine("Priority: " + priority.ToString());
    }, priority);
}

Output (one sample run; the priorities are random):

Priority: TimeCritical
Priority: TimeCritical
Priority: TimeCritical
Priority: AboveNormal
Priority: Normal
Priority: Normal
Priority: BelowNormal
Priority: BelowNormal
Priority: Idle
Priority: Idle