Will this work? (VERY short code review) :)
-
This is sort of a spin off of another thread (no pun intended)... I need a cancellable, thread-safe, blocking queue. I know there is BlockingCollectionT. That is what I am using now. However, people / customers are complaining about all the exceptions BlockingCollectionT throws (yeah, I know its "normal" and yeah, I catch 'em, its just causing people issues -- don't ask :( ). Anyways... I slapped together this quick substitute that is cancellable without throwing a hissy fit. It's going to be used in a 1 consumer / multiple producer scenario. Take a quick look... am I missing anything (threading / race conditions / bugs / etc), or will this do the job? Yes, the test code is kinda dumb, lol, but it looks like its working and cancellable in a clean way.
public class BlockingQueue<T> { private SemaphoreSlim \_semaphore = new SemaphoreSlim(0); private ConcurrentQueue<T> \_queue = new ConcurrentQueue<T>(); public BlockingQueue() { } public void Add(T t) { \_queue.Enqueue(t); \_semaphore.Release(); } public T Take(CancellationTokenSource cts) { return \_Take(cts).Result; } private async Task<T> \_Take(CancellationTokenSource cts) { T result = default(T); await \_semaphore.WaitAsync(cts.Token); if (!cts.IsCancellationRequested) \_queue.TryDequeue(out result); return result; } }; class Program { private static CancellationTokenSource \_cts = new CancellationTokenSource(); static void Main(string\[\] args) { BlockingQueue<int> q = new BlockingQueue<int>(); Task.Run(() => { System.Diagnostics.Debug.WriteLine("ENTER"); while (true) { int i = q.Take(\_cts); if (\_cts.IsCancellationRequested) { System.Diagnostics.Debug.WriteLine("CANCELLED"); break; } System.Diagnostics.Debug.WriteLine("GOT ONE: " + i); } }); for (int i = 0; i < 10; i++) { q.Add(i); if (i == 5) \_cts.Cancel(); Thread.Sleep(500); } } }