Blocking Queue
-
Hi everybody, I need a multi-threaded blocking queue to implement the producer/consumer pattern in C#. I have found many BlockingQueue implementations around the internet, but I don't know which one is better. Which one would you recommend? Regards
It depends. How are you going to use it? Will you put value types in it? Would it be OK if the buffer had a fixed size? If the answer to both is yes, I would pick one that uses a circular array and two Semaphores. Which implementations are we allowed to choose from?
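For reference, here is a minimal sketch of what a "circular array and two Semaphores" queue could look like. The class name BoundedBlockingQueue and its members are made up for illustration and are not taken from any particular implementation: one semaphore counts free slots, the other counts filled slots.

```csharp
using System;
using System.Threading;

// Hypothetical sketch of a fixed-size blocking queue (names are illustrative).
public class BoundedBlockingQueue<T>
{
    private readonly T[] _buffer;                  // circular array
    private int _head;                             // index of the next item to dequeue
    private int _tail;                             // index of the next free slot
    private readonly Semaphore _freeSlots;         // counts empty slots
    private readonly Semaphore _usedSlots;         // counts filled slots
    private readonly object _sync = new object();  // protects _head, _tail and _buffer

    public BoundedBlockingQueue(int capacity)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException("capacity");
        _buffer = new T[capacity];
        _freeSlots = new Semaphore(capacity, capacity);
        _usedSlots = new Semaphore(0, capacity);
    }

    public void Enqueue(T item)
    {
        _freeSlots.WaitOne();                      // blocks while the buffer is full
        lock (_sync)
        {
            _buffer[_tail] = item;
            _tail = (_tail + 1) % _buffer.Length;
        }
        _usedSlots.Release();                      // wakes one waiting consumer
    }

    public T Dequeue()
    {
        _usedSlots.WaitOne();                      // blocks while the buffer is empty
        T item;
        lock (_sync)
        {
            item = _buffer[_head];
            _buffer[_head] = default(T);           // do not keep the reference alive
            _head = (_head + 1) % _buffer.Length;
        }
        _freeSlots.Release();                      // wakes one waiting producer
        return item;
    }
}
```

The lock in this sketch is there so that several producers or several consumers can share the queue; blocking on "full" and "empty" is handled entirely by the two semaphores.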
-
Well, what I'm trying to do is a protocol stack implementation with layer abstraction. So, the communication between layers should be a producer/consumer scenario. I don't know if I'm right. So, I'm going to exchange my own objects in these queues. I suppose the buffer shouldn't have a fixed size. I know it can be less efficient, but the size should grow when needed. The BlockingQueue implementation I have right now (I can't test it yet) is this:
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;

public class BlockingQueue<T> : IEnumerable<T>
{
    private int _count = 0;
    private Queue<T> _queue = new Queue<T>();

    public T Dequeue()
    {
        lock (_queue)
        {
            // Monitor.Wait releases the lock while waiting and reacquires it before returning.
            while (_count <= 0)
                Monitor.Wait(_queue);
            _count--;
            return _queue.Dequeue();
        }
    }

    public void Enqueue(T data)
    {
        if (data == null)
            throw new ArgumentNullException("data");
        lock (_queue)
        {
            _queue.Enqueue(data);
            _count++;
            Monitor.Pulse(_queue); // wake one waiting consumer
        }
    }

    // Enumerating never ends: each step blocks until an item is available.
    IEnumerator<T> IEnumerable<T>.GetEnumerator()
    {
        while (true)
            yield return Dequeue();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return ((IEnumerable<T>)this).GetEnumerator();
    }
}
Regards
-
Should it really be allowed to grow unbounded? At first your use of Pulse/Wait inside the lock looked completely wrong to me: I thought Dequeue would wait forever, because Enqueue could never acquire the lock to Pulse it out of its Wait. Silly moment, scratch that; Monitor.Wait releases the lock while it waits. So, how about this? (untested!)
public class BlockingQueue<T> : IEnumerable<T>
{
    private Queue<T> _queue = new Queue<T>();
    // The maximum count just needs to be "a lot"; int.MaxValue is used here.
    private Semaphore _semaphore = new Semaphore(0, int.MaxValue);

    public T Dequeue()
    {
        _semaphore.WaitOne(); // blocks until at least one item has been enqueued
        lock (_queue)
            return _queue.Dequeue();
    }

    public void Enqueue(T data)
    {
        lock (_queue)
            _queue.Enqueue(data);
        _semaphore.Release(); // signal that one more item is available
    }

    IEnumerator<T> IEnumerable<T>.GetEnumerator()
    {
        while (true)
            yield return Dequeue();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return ((IEnumerable<T>)this).GetEnumerator();
    }
}
That, of course, still requires locking, since operations on a Queue<T> are not thread safe. So it's dead slow. All the time, not only when there is too much stuff to put into the buffer (which is when it doesn't matter much if the producer gets slowed down a bit, since the consumer is the bottleneck anyway).
modified on Monday, January 11, 2010 11:48 AM
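On the unbounded-growth question, here is a rough sketch of how the Wait/Pulse version could be given a maximum size and a way to shut down. The name ClosableBlockingQueue and the TryEnqueue/TryDequeue/Close members are invented for this example and are not taken from any of the implementations mentioned in this thread.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch: a bounded, closable blocking queue built on Monitor.
public class ClosableBlockingQueue<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    private readonly object _sync = new object();
    private readonly int _maxSize;
    private bool _closed;

    public ClosableBlockingQueue(int maxSize)
    {
        if (maxSize <= 0) throw new ArgumentOutOfRangeException("maxSize");
        _maxSize = maxSize;
    }

    // Blocks while the queue is full. Returns false if the queue has been closed.
    public bool TryEnqueue(T item)
    {
        lock (_sync)
        {
            while (_queue.Count >= _maxSize && !_closed)
                Monitor.Wait(_sync);
            if (_closed) return false;
            _queue.Enqueue(item);
            Monitor.PulseAll(_sync);   // producers and consumers wait on the same object
            return true;
        }
    }

    // Blocks while the queue is empty. Returns false once it is closed and drained.
    public bool TryDequeue(out T item)
    {
        lock (_sync)
        {
            while (_queue.Count == 0 && !_closed)
                Monitor.Wait(_sync);
            if (_queue.Count == 0)
            {
                item = default(T);
                return false;
            }
            item = _queue.Dequeue();
            Monitor.PulseAll(_sync);   // a slot became free
            return true;
        }
    }

    // Wakes every waiting thread so it can observe the closed flag.
    public void Close()
    {
        lock (_sync)
        {
            _closed = true;
            Monitor.PulseAll(_sync);
        }
    }
}
```

PulseAll is used instead of Pulse because producers and consumers wait on the same lock object, so a single Pulse could wake a thread of the wrong kind.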
-
There is an example in this article: Thread synchronization: Wait and Pulse demystified :-O Your implementation looks correct, but I guess you want to limit the size of the queue at some point, not let it grow to infinity. The implementation in that article also allows you to terminate the queue cleanly. Or you might find something in the recent release of MS Reactive Extensions on DevLabs. It contains an (unsupported) back port of the new .NET 4.0 Task Parallel Library for .NET 3.5. I haven't explored it yet, but I would be surprised if it didn't contain the concurrent collections, including BlockingCollection.
Nick
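For anyone who can target .NET 4.0, here is a small sketch of the same producer/consumer hand-off using BlockingCollection<T> from System.Collections.Concurrent. The BlockingCollectionDemo class and its Run method are made up for illustration; whether the Rx back port exposes the type in the same way is not checked here.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical demo class; only BlockingCollection<T> itself is framework API.
public static class BlockingCollectionDemo
{
    // Sends itemCount integers through a bounded queue and returns what the consumer received.
    public static List<int> Run(int itemCount, int capacity)
    {
        var received = new List<int>();
        using (var queue = new BlockingCollection<int>(capacity))
        {
            var consumer = new Thread(() =>
            {
                // Blocks while the queue is empty; the loop ends once
                // CompleteAdding has been called and the queue is drained.
                foreach (int item in queue.GetConsumingEnumerable())
                    received.Add(item);
            });
            consumer.Start();

            for (int i = 0; i < itemCount; i++)
                queue.Add(i);          // blocks while the queue is full

            queue.CompleteAdding();    // clean termination: no more items will arrive
            consumer.Join();
        }
        return received;
    }
}
```

The bounded capacity covers the "don't let it grow to infinity" point, and CompleteAdding gives the consumer a clean way to finish.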