Skip to content
  • Categories
  • Recent
  • Tags
  • Popular
  • World
  • Users
  • Groups
Skins
  • Light
  • Cerulean
  • Cosmo
  • Flatly
  • Journal
  • Litera
  • Lumen
  • Lux
  • Materia
  • Minty
  • Morph
  • Pulse
  • Sandstone
  • Simplex
  • Sketchy
  • Spacelab
  • United
  • Yeti
  • Zephyr
  • Dark
  • Cyborg
  • Darkly
  • Quartz
  • Slate
  • Solar
  • Superhero
  • Vapor

  • Default (No Skin)
  • No Skin
Collapse
Code Project
  1. Home
  2. General Programming
  3. C#
  4. Will this work? (VERY short code review) :)

Will this work? (VERY short code review) :)

Scheduled Pinned Locked Moved C#
data-structuresdebuggingquestionannouncementcode-review
1 Posts 1 Posters 0 Views 1 Watching
  • Oldest to Newest
  • Newest to Oldest
  • Most Votes
Reply
  • Reply as topic
Log in to reply
This topic has been deleted. Only users with topic management privileges can see it.
  • S Offline
    S Offline
    SledgeHammer01
    wrote on last edited by
    #1

    This is sort of a spin off of another thread (no pun intended)... I need a cancellable, thread-safe, blocking queue. I know there is BlockingCollectionT. That is what I am using now. However, people / customers are complaining about all the exceptions BlockingCollectionT throws (yeah, I know its "normal" and yeah, I catch 'em, its just causing people issues -- don't ask :( ). Anyways... I slapped together this quick substitute that is cancellable without throwing a hissy fit. It's going to be used in a 1 consumer / multiple producer scenario. Take a quick look... am I missing anything (threading / race conditions / bugs / etc), or will this do the job? Yes, the test code is kinda dumb, lol, but it looks like its working and cancellable in a clean way.

    public class BlockingQueue<T>
    {
    	private SemaphoreSlim \_semaphore = new SemaphoreSlim(0);
    	private ConcurrentQueue<T> \_queue = new ConcurrentQueue<T>();
    
    	public BlockingQueue()
    	{
    	}
    
    	public void Add(T t)
    	{
    		\_queue.Enqueue(t);
    		\_semaphore.Release();
    	}
    
    	public T Take(CancellationTokenSource cts)
    	{
    		return \_Take(cts).Result;
    	}
    
    	private async Task<T> \_Take(CancellationTokenSource cts)
    	{
    		T result = default(T);
    
    		await \_semaphore.WaitAsync(cts.Token);
    
    		if (!cts.IsCancellationRequested)
    			\_queue.TryDequeue(out result);
    
    		return result;
    	}
    };
    
    class Program
    {
    	private static CancellationTokenSource \_cts = new CancellationTokenSource();
    
    	static void Main(string\[\] args)
    	{
    		BlockingQueue<int> q = new BlockingQueue<int>();
    
    		Task.Run(() =>
    		{
    			System.Diagnostics.Debug.WriteLine("ENTER");
    
    			while (true)
    			{
    				int i = q.Take(\_cts);
    
    				if (\_cts.IsCancellationRequested)
    				{
    					System.Diagnostics.Debug.WriteLine("CANCELLED");
    					break;
    				}
    
    				System.Diagnostics.Debug.WriteLine("GOT ONE: " + i);
    			}
    		});
    
    		for (int i = 0; i < 10; i++)
    		{
    			q.Add(i);
    
    			if (i == 5)
    				\_cts.Cancel();
    			Thread.Sleep(500);
    		}
    	}
    }
    
    1 Reply Last reply
    0
    Reply
    • Reply as topic
    Log in to reply
    • Oldest to Newest
    • Newest to Oldest
    • Most Votes


    • Login

    • Don't have an account? Register

    • Login or register to search.
    • First post
      Last post
    0
    • Categories
    • Recent
    • Tags
    • Popular
    • World
    • Users
    • Groups