How do you implement a message queuing system?
-
Yes, now were down to the harder part. Okay so we have an EventQueue which has a number of events in it. Since the EventQueue itself does not do the dispatching of the messages there needs to be some mechinism for seeing if there are any queued meesages and if so dispatch them. This is the fuzzy part for me. That is why I orginally had mentioned threads/timers. I would prefer not to have something polling the message queue but it may be the only way. Are you going to share your EventQueue that you have been hammering out?
smesser wrote:
Okay so we have an EventQueue which has a number of events in it. Since the EventQueue itself does not do the dispatching of the messages there needs to be some mechinism for seeing if there are any queued meesages and if so dispatch them.
Oh, but the
EventQueue
does take care of that for us. :) It runs in its own thread. Actually, it uses myDelegateQueue
class, which runs in its own thread. The dispatching of the events is taken care of by theEventQueue
. Without that functionality, it wouldn't be of much use to us.smesser wrote:
Are you going to share your EventQueue that you have been hammering out?
Yes. I'll try to post a link to where you can download it within the hour or so. It will be an untested version, but at least you'll get to play around with the class to see if it's what you're looking for.
-
smesser wrote:
Okay so we have an EventQueue which has a number of events in it. Since the EventQueue itself does not do the dispatching of the messages there needs to be some mechinism for seeing if there are any queued meesages and if so dispatch them.
Oh, but the
EventQueue
does take care of that for us. :) It runs in its own thread. Actually, it uses myDelegateQueue
class, which runs in its own thread. The dispatching of the events is taken care of by theEventQueue
. Without that functionality, it wouldn't be of much use to us.smesser wrote:
Are you going to share your EventQueue that you have been hammering out?
Yes. I'll try to post a link to where you can download it within the hour or so. It will be an untested version, but at least you'll get to play around with the class to see if it's what you're looking for.
Cool on both parts. The EventQueue runing on it's own thread and the download I look forward to it.
-
Cool on both parts. The EventQueue runing on it's own thread and the download I look forward to it.
Here[^] is the download for the
EventQueue
class. In addition, you'll need to download the source code for my DelegateQueue[^] class. And the code for my Deque[^] class. You can compile all of this together into one assembly, though you'll want to change the namespace names so that they all match, and maybe move the using LSCollections directive. An alternative would be to use the assemblies that I have compiled. I can email you those if you'd like. Let me know how it goes and if you have any questions. Again, theEventQueue
class is completely untested. -
Here[^] is the download for the
EventQueue
class. In addition, you'll need to download the source code for my DelegateQueue[^] class. And the code for my Deque[^] class. You can compile all of this together into one assembly, though you'll want to change the namespace names so that they all match, and maybe move the using LSCollections directive. An alternative would be to use the assemblies that I have compiled. I can email you those if you'd like. Let me know how it goes and if you have any questions. Again, theEventQueue
class is completely untested.Thanks, I have downloaded your code and put all of the required files into one project so that I would at least build. It will take some time for all this to sink in and for me to understand the code. I don't understand invoke as I have not used it yet. At a quick glance I don't understand how the EventQueue is running in a thread but I only have 10 minutes vested thus far. Are you just banging this out for me or will you have an example to use the EventQueue class? Thanks much for your efforts and discussion.
-
Thanks, I have downloaded your code and put all of the required files into one project so that I would at least build. It will take some time for all this to sink in and for me to understand the code. I don't understand invoke as I have not used it yet. At a quick glance I don't understand how the EventQueue is running in a thread but I only have 10 minutes vested thus far. Are you just banging this out for me or will you have an example to use the EventQueue class? Thanks much for your efforts and discussion.
smesser wrote:
Are you just banging this out for me or will you have an example to use the EventQueue class?
I may devote an article to it at some point. I'll definitely put it in the next version of my state machine toolkit for others to use. Let's see if I can give you a quick example of using the
EventQueue
.public class MySystem
{
private EventQueue eventQueue = new EventQueue();private MusicPlugin musicPlugin; public MySystem() { // Create events. eventQueue.CreateEvent("Play"); eventQueue.CreateEvent("PlayingStopped"); // Create music plugin and give it the event queue. musicPlugin = new MusicPlugin(eventQueue); // Subscribe to the playing stop event. eventQueue.Subscribe("PlayingStopped", new EventQueueEventHandler(HandlePlayingStopped); } private void HandlePlayingStopped(object sender, EventQueueEventArgs e) { // Logic for handling playing stopped event. }
}
Here, the system object handles an event sent to it from the music plugin telling the system that it has stopped playing. Also, the system takes on the responsibility for creating all of the events before passing the event queue on to the plugin(s).
public class MusicPlugin
{
private EventQueue eventQueue;public MusicPlugin(EventQueue eventQueue) { this.eventQueue = eventQueue; eventQueue.Subscribe("Play", new EventQueueEventHandler(HandlePlayEvent)); } private void HandlePlayEvent(object sender, EventQueueEventArgs e) { // Logic for starting playback. } public void StopPlaying() { eventQueue.Send("PlayingStopped", this, null); }
}
Now with an example this small, there's not much of an advantage here over using C#'s built in events. However, if you have a lot of plugins that are sending events to each other as well as to the system, this approach could help keep the plugins decoupled. You would have one central event queue for handling all of the event notification.
-
smesser wrote:
Are you just banging this out for me or will you have an example to use the EventQueue class?
I may devote an article to it at some point. I'll definitely put it in the next version of my state machine toolkit for others to use. Let's see if I can give you a quick example of using the
EventQueue
.public class MySystem
{
private EventQueue eventQueue = new EventQueue();private MusicPlugin musicPlugin; public MySystem() { // Create events. eventQueue.CreateEvent("Play"); eventQueue.CreateEvent("PlayingStopped"); // Create music plugin and give it the event queue. musicPlugin = new MusicPlugin(eventQueue); // Subscribe to the playing stop event. eventQueue.Subscribe("PlayingStopped", new EventQueueEventHandler(HandlePlayingStopped); } private void HandlePlayingStopped(object sender, EventQueueEventArgs e) { // Logic for handling playing stopped event. }
}
Here, the system object handles an event sent to it from the music plugin telling the system that it has stopped playing. Also, the system takes on the responsibility for creating all of the events before passing the event queue on to the plugin(s).
public class MusicPlugin
{
private EventQueue eventQueue;public MusicPlugin(EventQueue eventQueue) { this.eventQueue = eventQueue; eventQueue.Subscribe("Play", new EventQueueEventHandler(HandlePlayEvent)); } private void HandlePlayEvent(object sender, EventQueueEventArgs e) { // Logic for starting playback. } public void StopPlaying() { eventQueue.Send("PlayingStopped", this, null); }
}
Now with an example this small, there's not much of an advantage here over using C#'s built in events. However, if you have a lot of plugins that are sending events to each other as well as to the system, this approach could help keep the plugins decoupled. You would have one central event queue for handling all of the event notification.
Thanks for the example. This is exaclty what I needed to get my project back on track again. I have been mulling over this issue for too long.Having the plugins loosely coupled has been one of my goals since the beginning of my project. I have many parts completely coded and now finally I have a way to tie it all together. Once again, I am very greatful for your discussion and your help.
-
Thanks for the example. This is exaclty what I needed to get my project back on track again. I have been mulling over this issue for too long.Having the plugins loosely coupled has been one of my goals since the beginning of my project. I have many parts completely coded and now finally I have a way to tie it all together. Once again, I am very greatful for your discussion and your help.
I've been testing out my
EventQueue
class, and I've noticed a bug:delegateQueue.Invoke(new SendOrPostCallback(delegate(object state)
{
List subscribers = (List)events[eventName];if(subscribers == null) { throw new InvalidOperationException("Event does not exist."); } else { subscribers.Add(new Subscriber(handler, context)); }
}), null);
Get rid of the null that's getting passed to the annonymous method. It confuses the runtime when it starts to invoke the delegate. So it should look like this:
delegateQueue.Invoke(new SendOrPostCallback(delegate(object state)
{
List subscribers = (List)events[eventName];if(subscribers == null) { throw new InvalidOperationException("Event does not exist."); } else { subscribers.Add(new Subscriber(handler, context)); }
}), new object());
[EDIT] I will have a new and improved version up later today as I test it more thoroughly. Also, that should be "new object()" getting passed to the anonymous method. [/EDIT] -- modified at 11:32 Monday 26th June, 2006
-
I've been testing out my
EventQueue
class, and I've noticed a bug:delegateQueue.Invoke(new SendOrPostCallback(delegate(object state)
{
List subscribers = (List)events[eventName];if(subscribers == null) { throw new InvalidOperationException("Event does not exist."); } else { subscribers.Add(new Subscriber(handler, context)); }
}), null);
Get rid of the null that's getting passed to the annonymous method. It confuses the runtime when it starts to invoke the delegate. So it should look like this:
delegateQueue.Invoke(new SendOrPostCallback(delegate(object state)
{
List subscribers = (List)events[eventName];if(subscribers == null) { throw new InvalidOperationException("Event does not exist."); } else { subscribers.Add(new Subscriber(handler, context)); }
}), new object());
[EDIT] I will have a new and improved version up later today as I test it more thoroughly. Also, that should be "new object()" getting passed to the anonymous method. [/EDIT] -- modified at 11:32 Monday 26th June, 2006
Thanks for the bug fix. I look forward to the new and improved version.
-
I am trying to implement a message queuing system in my applicaton. I have a main application which dynamically loads plugins that implement a required interface. In order to send out message to and from plugins I have an OnMessage() function that receives any message sent to it. Here is how it works. The main application has a central object called lets say System which the plugins have access to. One the the methods of system allows you to create a message. IMessage message = System.NewMessage() ... message.Send(); All messages are to be created, dispatched, and disposed of my the system object. How should I approach this problem. I'm sure I will need a message queue, but I am not sure about the rest. Any ideas welcome. PS: I don't want to use MS Message queue and all communication is only on the local machine.
-
Thanks you are most kind.
-
I've been testing out my
EventQueue
class, and I've noticed a bug:delegateQueue.Invoke(new SendOrPostCallback(delegate(object state)
{
List subscribers = (List)events[eventName];if(subscribers == null) { throw new InvalidOperationException("Event does not exist."); } else { subscribers.Add(new Subscriber(handler, context)); }
}), null);
Get rid of the null that's getting passed to the annonymous method. It confuses the runtime when it starts to invoke the delegate. So it should look like this:
delegateQueue.Invoke(new SendOrPostCallback(delegate(object state)
{
List subscribers = (List)events[eventName];if(subscribers == null) { throw new InvalidOperationException("Event does not exist."); } else { subscribers.Add(new Subscriber(handler, context)); }
}), new object());
[EDIT] I will have a new and improved version up later today as I test it more thoroughly. Also, that should be "new object()" getting passed to the anonymous method. [/EDIT] -- modified at 11:32 Monday 26th June, 2006
I finally had a chance to look at your code, I was on vacation. I really like it but I want to use it a little differently than your example. I don't want to expose the eventQueue directly to plugins. I don't want them to have to have that code in order to compile a plugin. Maybe you can help me out. I want my system object to be responsible for creating events and subscribing plugins to them. If you remember my setup Plugins have an
bool OnMessage( ISystem sys, IMessage msg );
The plugin can create a message using:IMessage m = sys.NewMessage( "music.play" ); // and then send it. m.Send();
My problem is that the message object gets created over and over again and therefore not persistent. The Send method in the Message class needs to be able to create and send the events. How can I accomplish this? Thanks -- modified at 14:44 Tuesday 4th July, 2006 -
Thanks you are most kind.
For some reason, I keep getting an error when trying to reply to your latest message, so I'm trying here instead. The System object could have a hash table where it keeps its messages. When a message is created, the name of the message is stored as a key in the hash table and the message object is stored as its value. Next time someone calls the
NewMessage
method asking for the same message, instead of creating a new message, the original message is retrieved from the hash table. So only one message of each type is created. When a message object is created, it can be passed theEventQueue
along with the name of the message. When itsSend
method is called, theMessage
sends the message using theEventQueue
. So theEventQueue
is hidden inside theMessage
class. The plugins don't have to know anything about it. -
For some reason, I keep getting an error when trying to reply to your latest message, so I'm trying here instead. The System object could have a hash table where it keeps its messages. When a message is created, the name of the message is stored as a key in the hash table and the message object is stored as its value. Next time someone calls the
NewMessage
method asking for the same message, instead of creating a new message, the original message is retrieved from the hash table. So only one message of each type is created. When a message object is created, it can be passed theEventQueue
along with the name of the message. When itsSend
method is called, theMessage
sends the message using theEventQueue
. So theEventQueue
is hidden inside theMessage
class. The plugins don't have to know anything about it.This part I understand
Leslie Sanford wrote:
The System object could have a hash table where it keeps its messages. When a message is created, the name of the message is stored as a key in the hash table and the message object is stored as its value. Next time someone calls the NewMessage method asking for the same message, instead of creating a new message, the original message is retrieved from the hash table. So only one message of each type is created.
This part I don't understand
Leslie Sanford wrote:
When a message object is created, it can be passed the EventQueue along with the name of the message. When its Send method is called, the Message sends the message using the EventQueue. So the EventQueue is hidden inside the Message class. The plugins don't have to know anything about it.
class System { Hashtable hashList = new Hashtable(); EventQueue eventQueue = new EventQueue(); public Message NewMessage( string sub ) { Message m = null; // = new Message( "music.play" ); if( !hashList.ContainsKey( sub ) ) { m = new Message( "music.play" ); hashList.Add( sub, m ); eventQueue.CreateEvent( sub ); } else { m = (Message)hashList[sub]; } return m; } } class Message { string subject; string data; public Message(string s) { this.subject = s; } public void Send() { string sub = this.subject; string datium = this.data; // what should happen here? } }
-
This part I understand
Leslie Sanford wrote:
The System object could have a hash table where it keeps its messages. When a message is created, the name of the message is stored as a key in the hash table and the message object is stored as its value. Next time someone calls the NewMessage method asking for the same message, instead of creating a new message, the original message is retrieved from the hash table. So only one message of each type is created.
This part I don't understand
Leslie Sanford wrote:
When a message object is created, it can be passed the EventQueue along with the name of the message. When its Send method is called, the Message sends the message using the EventQueue. So the EventQueue is hidden inside the Message class. The plugins don't have to know anything about it.
class System { Hashtable hashList = new Hashtable(); EventQueue eventQueue = new EventQueue(); public Message NewMessage( string sub ) { Message m = null; // = new Message( "music.play" ); if( !hashList.ContainsKey( sub ) ) { m = new Message( "music.play" ); hashList.Add( sub, m ); eventQueue.CreateEvent( sub ); } else { m = (Message)hashList[sub]; } return m; } } class Message { string subject; string data; public Message(string s) { this.subject = s; } public void Send() { string sub = this.subject; string datium = this.data; // what should happen here? } }
This is what I was getting at:
public class Message
{
string subject;
EventQueue queue;
string data = string.Empty;public Message(string s, EventQueue queue) { this.subject = s; this.queue = queue; } public Send() { queue.Send(this.subject, this, data); }
}
-
This is what I was getting at:
public class Message
{
string subject;
EventQueue queue;
string data = string.Empty;public Message(string s, EventQueue queue) { this.subject = s; this.queue = queue; } public Send() { queue.Send(this.subject, this, data); }
}
I thought that the eventQueue was part of the system object. Using your example the eventQueue will get disposed of each time a message is created and thereby losing any subscriptions. Also your still passing in the eventQueue which means the plugin will need that code to compile. Am I misunderstanding still? -- modified at 17:10 Tuesday 4th July, 2006
-
I thought that the eventQueue was part of the system object. Using your example the eventQueue will get disposed of each time a message is created and thereby losing any subscriptions. Also your still passing in the eventQueue which means the plugin will need that code to compile. Am I misunderstanding still? -- modified at 17:10 Tuesday 4th July, 2006
smesser wrote:
I thought that the eventQueue was part of the system object.
The System object owns an EventQueue object. It's the System's job to dispose of it. It's simply passing the EventQueue object along to the Message objects so that they can use it to send messages. Make sense?
-
smesser wrote:
I thought that the eventQueue was part of the system object.
The System object owns an EventQueue object. It's the System's job to dispose of it. It's simply passing the EventQueue object along to the Message objects so that they can use it to send messages. Make sense?
It makes perfect sense but it violates the coupling I mentioned several responses back when we were talking about hiding the eventqueue completely from the plugin. My intention is that the System object creates the eventqueue and any new messages and sends them to all plugins which have subscribed. The plugin would send reponses that the system object would turn into events and add them to the queue or send them. Thats what I thought we were trying to accomplish a few posts back.
-
It makes perfect sense but it violates the coupling I mentioned several responses back when we were talking about hiding the eventqueue completely from the plugin. My intention is that the System object creates the eventqueue and any new messages and sends them to all plugins which have subscribed. The plugin would send reponses that the system object would turn into events and add them to the queue or send them. Thats what I thought we were trying to accomplish a few posts back.
Hmm, I may not have a clear understanding of the architecture you're trying to implement. As far as coupling goes, if you have an
IMessage
interface that all message classes implement, then all the plugins have to know about is the interface. The concrete implementation will be hidden from them, and thus theEventQueue
. -
Hmm, I may not have a clear understanding of the architecture you're trying to implement. As far as coupling goes, if you have an
IMessage
interface that all message classes implement, then all the plugins have to know about is the interface. The concrete implementation will be hidden from them, and thus theEventQueue
.Yes, but then that brings me back to one of my orginal questions. If the System object creates and disposes of the event queue. Then where does the event queue inside of the message class fit it? Each instance of message would have it's own eventqueue that would get disposed of when the message was disposed of taking with it any subscriptions that may have been made to it. Simple overview: 1. System object creates messages and manages the global eventqueue 2. System object creates and sends events from plugins via the Message send method. This way there is one global event queue. 3. Plugin has no concept of event queue nor cares about it. Hope this makes sense.
-
Yes, but then that brings me back to one of my orginal questions. If the System object creates and disposes of the event queue. Then where does the event queue inside of the message class fit it? Each instance of message would have it's own eventqueue that would get disposed of when the message was disposed of taking with it any subscriptions that may have been made to it. Simple overview: 1. System object creates messages and manages the global eventqueue 2. System object creates and sends events from plugins via the Message send method. This way there is one global event queue. 3. Plugin has no concept of event queue nor cares about it. Hope this makes sense.
smesser wrote:
Each instance of message would have it's own eventqueue that would get disposed of when the message was disposed of taking with it any subscriptions that may have been made to it.
But the messages wouldn't dispose of the EventQueue given to them. It's all in how you implement the message class's Dispose method. Whatever the message does when it is disposed of, it knows not to dispose of the EventQueue because it doesn't belong to it.
smesser wrote:
Simple overview: 1. System object creates messages and manages the global eventqueue 2. System object creates and sends events from plugins via the Message send method. This way there is one global event queue. 3. Plugin has no concept of event queue nor cares about it.
I think it's number 2 that's giving me trouble. Let's run through a scenario to see if I understand: 1. The System sends a message to a plugin via the OnMessage method. 2. The plugin does something with the message. 3. The plugin wants to send a message telling the System and any other plugins listening that something has happened. 4. The plugin asks the System for a message object representing a specific message, e.g. "PlayCompleted". 5. The plugin gets the message back from the System and sends it calling the message's Send method. Is this close?