A thread-safe generic first in first out (FIFO) collection with support for priority queuing.
Nuget: https://www.nuget.org/packages/ConcurrentPriorityQueue
- Thread-Safe.
- Manages items according to a
First in first out
policy andpriority
on top of that. - Implements
IProducerConsumerCollection<T>
interface. - Extends to a
BlockingCollection<T>
. - Supports multi-frameworks, includes
net48
netstandard2.0
net6.0
net8.0
Items in the collection must implement the generic interface IHavePriority<T>
where T: implements IEquatable<T>
, IComparable<T>
and also overrides Object.GetHashCode()
:
// Simplest implementation of IHavePriority<T>
public class SomeClass : IHavePriority<int> {
int Priority {get; set;}
}
Simple flow for creating a Priority-By-Integer
queue and adding an item:
// Create a new prioritized item.
var itemWithPriority = new SomeClass { Priority = 0 };
// Initialize an unbounded priority-by-integer queue.
var priorityQueue = new ConcurrentPriorityQueue<IHavePriority<int>, int>();
// Enqueue item and handle result.
Result result = priorityQueue.Enqueue(itemWithPriority);
Use the ConcurrentPriorityByIntegerQueue
implementation to simplify the above example:
var priorityQueue = new ConcurrentPriorityByIntegerQueue<IHavePriority<int>>();
Consume items by priority/first-in-first-out policy, using Dequeue()
and Peek()
:
// Lower value -> Higher priority.
var item1 = new SomeClass { Priority = 1 };
var item2 = new SomeClass { Priority = 0 };
var item3 = new SomeClass { Priority = 0 };
priorityQueue.Enqueue(item1);
priorityQueue.Enqueue(item2);
priorityQueue.Enqueue(item3);
var result = priority.Dequeue(); // item2
var result = priority.Dequeue(); // item3
var result = priority.Dequeue(); // item1
Iterating over the collection will yield items according to their priority and position (FIFO):
var item1 = new SomeClass { Priority = 1 };
var item2 = new SomeClass { Priority = 0 };
var item3 = new SomeClass { Priority = 0 };
priorityQueue.Enqueue(item1);
priorityQueue.Enqueue(item2);
priorityQueue.Enqueue(item3);
foreach(var item in priorityQueue) {
// Iteration 1 -> item2
// Iteration 2 -> item3
// Iteration 3 -> item1
}
ConcurrentPriorityQueue
supports Generic Priorities.
Implement your own Business Priority object and configure the queue to handle it:
// TimeToProcess class implements IEquatable<T>, IComparable<T> and overrides Object.GetHashCode().
public class TimeToProcess : IEquatable<TimeToProcess>, IComparable<TimeToProcess> {
public decimal TimeInMilliseconds { get; set;}
public int CompareTo(TimeToProcess other) =>
TimeInMilliseconds.CompareTo(other.TimeInMilliseconds);
public bool Equals(TimeToProcess other) =>
TimeInMilliseconds.Equals(other.TimeInMilliseconds);
public override int GetHashCode() => TimeInMilliseconds.GetHashCode();
}
// BusinessPriorityItem implements IHavePriority<T>
public class BusinessPriorityItem : IHavePriority<TimeToProcess> {
TimeToProcess Priority {get; set;}
}
// Create a new prioritized item.
var item = new BusinessPriorityItem { Priority = new TimeToProcess { TimeInMilliseconds = 0.25M } };
// Initialize an unbounded priority-by-TimeToProcess queue.
var priorityQueue = new ConcurrentPriorityQueue<IHavePriority<TimeToProcess>, TimeToProcess>();
// Enqueue item and handle result.
Result result = priorityQueue.Enqueue(item);
ConcurrentPriorityQueue<T>
can be bounded to a fixed amount of priorities:
// Create a bounded ConcurrentPriorityQueue to support a fixed amount of priorities.
var maxAmountOfPriorities = 2;
var priorityQueue = new ConcurrentPriorityByIntegerQueue<IHavePriority<int>>(maxAmountOfPriorities);
Result result = PriorityQueue.Enqueue(new SomeClass {Priority = 0}); // result.OK
Result result = PriorityQueue.Enqueue(new SomeClass {Priority = 1}); // result.OK
Result result = PriorityQueue.Enqueue(new SomeClass {Priority = 2}); // result.Fail -> Queue supports [0, 1]
Result result = PriorityQueue.Enqueue(new SomeClass {Priority = 0}); // result.OK
ConcurrentPriorityQueue<T>
can be extended to a BlockingCollection<T>
using the ToBlockingCollection<T>
extension method:
var blockingPriorityQueue = new ConcurrentPriorityByIntegerQueue<IHavePriority<int>>()
.ToBlockingCollection();
foreach(var item in blockingPriorityQueue.GetConsumingEnumerable()) {
// Do something...
// Blocks until signaled on completion.
}