How do you implement a message queuing system?
-
smesser wrote:
Now this is really getting exciting... Programming is exciting right ????
Yeah, this is fun stuff. :-D Since posting, I've already banging out most of an
EventQueue
class for fun. I'll get into the details in a moment. :)smesser wrote:
One thing is starting to get a little fuzzy for me. That is the difference betweek a message and an event. I have been kind of picturing them as one in the same.
Same here. I use event and message interchangeably. I guess if we really wanted to nail down a definition, we could say that an event is something that happens, and a message is the information that accompanies the event. But honestly, I use them both to mean the same thing. Let me run through how the
EventQueue
class can be used, and we'll see where your approach and mine can meet. First, create anEventQueue
and also create an event.EventQueue eventQueue = new EventQueue();
int endOfSongEventID = eventQueue.CreateEvent("EndOfSong");Now, subscribers can subscribe to this event. The
EventQueue
can provide a list of all of the events available on demand. Next, in some other object, we subscribe to the "EndOfSong" event.eventQueue.Subscribe("EndOfSong", new EventQueueEventHandler(HandleEndOfSong));
The second argument is a delegate to the method that will handle the "EndOfSong" event. It looks like this:
private void HandleEndOfSong(object sender, EventQueueEventArgs e)
{
// Do stuff in response to EndOfSong event.
}The
EventQueueEventArgs
class is a class representing information about any event raised by theEventQueue
. So somewhere else in our code, whereever the EndOfSong event originates, we send an event to ourEventQueue
:eventQueue.Send(endOfSongEventID, this, null);
The
endOfSongEventID
is the integer value returned when we first created the event. So instead of passing the event name when we send an event, we use the integer event ID. This is more efficient for theEventQueue
to deal with. At some point later in time, theEventQueue
dequeues the event and notifies all of the subscribers that have subscribed to that event.I guess the difference for me is that event seems to be some action that has happened without any state information while my version of a message can be both. That is why I want to create a message to send that would get added to the EventerrMessageQueue. Since my message inherits from a data object it can be a message, while if you create a message with only a subject then it acts as an event. I like the idea of having an event id but I see a problem with it. As you mention you could request a list of all events available. Since the list would probably return human readable strings it would make sense to have a plugin face that used strings to refer to them and an id for internal house keeping. If I requested a list of music events and the resulting list was 100, 123, 235 then you would have to maintain another list as to what they mean. And if the id aren't constant each time the app runs it could break the plugin. For example if id 5 means play. My code looks for a 5 the next time but now it's a 6. Just a thought. I think there is definitely room for two approaches here. The Callback delegate would be a good approach for most applications but for mine I think it would be better to let the System object have access to the eventqueue and the plugin manager and send out the messages via that path. Each plugin has a function called bool OnMessage( ISystem SystemObject, IMessage Message); It could return true/false if consumed or not and use the system object to create message to send the replies or responses. Just rambling a bit but this is sounding great.
-
I guess the difference for me is that event seems to be some action that has happened without any state information while my version of a message can be both. That is why I want to create a message to send that would get added to the EventerrMessageQueue. Since my message inherits from a data object it can be a message, while if you create a message with only a subject then it acts as an event. I like the idea of having an event id but I see a problem with it. As you mention you could request a list of all events available. Since the list would probably return human readable strings it would make sense to have a plugin face that used strings to refer to them and an id for internal house keeping. If I requested a list of music events and the resulting list was 100, 123, 235 then you would have to maintain another list as to what they mean. And if the id aren't constant each time the app runs it could break the plugin. For example if id 5 means play. My code looks for a 5 the next time but now it's a 6. Just a thought. I think there is definitely room for two approaches here. The Callback delegate would be a good approach for most applications but for mine I think it would be better to let the System object have access to the eventqueue and the plugin manager and send out the messages via that path. Each plugin has a function called bool OnMessage( ISystem SystemObject, IMessage Message); It could return true/false if consumed or not and use the system object to create message to send the replies or responses. Just rambling a bit but this is sounding great.
smesser wrote:
I like the idea of having an event id but I see a problem with it. As you mention you could request a list of all events available. Since the list would probably return human readable strings it would make sense to have a plugin face that used strings to refer to them and an id for internal house keeping.
I agree. The reason I was using an integer event ID is that it makes it more efficient for the
EventQueue
than having to do a string look up. However, it would be easy to use strings instead and have theEventQueue
use a hash table with the event names as keys.smesser wrote:
If I requested a list of music events and the resulting list was 100, 123, 235 then you would have to maintain another list as to what they mean. And if the id aren't constant each time the app runs it could break the plugin. For example if id 5 means play. My code looks for a 5 the next time but now it's a 6. Just a thought.
Oh, this would be easy. You could retrieve the event ID for any event as long as you know the name. Something like this:
int eventID = eventQueue["EndOfSong"];
Then once you have this event ID, you can use it to send the EndOfSong event. But you're right, using strings for event IDs would be simpler and more consistent, if less efficient for the
EventQueue
.smesser wrote:
I think there is definitely room for two approaches here. The Callback delegate would be a good approach for most applications but for mine I think it would be better to let the System object have access to the eventqueue and the plugin manager and send out the messages via that path.
The system and well as plugin manager could have access to the
EventQueue
. They could create their own events that plugins could subscribe to. However, all of this may be overkill for what you want.smesser wrote:
Each plugin has a function called bool OnMessage( ISystem SystemObject, IMessage Message); It could return true/false if consumed or not and use the system object to create message to send the replies or responses.
That sounds good. I guess the question is how to route the messages to the specific plugins? What part of this approach are you fuzzy on as to how to implement it?
-
smesser wrote:
I like the idea of having an event id but I see a problem with it. As you mention you could request a list of all events available. Since the list would probably return human readable strings it would make sense to have a plugin face that used strings to refer to them and an id for internal house keeping.
I agree. The reason I was using an integer event ID is that it makes it more efficient for the
EventQueue
than having to do a string look up. However, it would be easy to use strings instead and have theEventQueue
use a hash table with the event names as keys.smesser wrote:
If I requested a list of music events and the resulting list was 100, 123, 235 then you would have to maintain another list as to what they mean. And if the id aren't constant each time the app runs it could break the plugin. For example if id 5 means play. My code looks for a 5 the next time but now it's a 6. Just a thought.
Oh, this would be easy. You could retrieve the event ID for any event as long as you know the name. Something like this:
int eventID = eventQueue["EndOfSong"];
Then once you have this event ID, you can use it to send the EndOfSong event. But you're right, using strings for event IDs would be simpler and more consistent, if less efficient for the
EventQueue
.smesser wrote:
I think there is definitely room for two approaches here. The Callback delegate would be a good approach for most applications but for mine I think it would be better to let the System object have access to the eventqueue and the plugin manager and send out the messages via that path.
The system and well as plugin manager could have access to the
EventQueue
. They could create their own events that plugins could subscribe to. However, all of this may be overkill for what you want.smesser wrote:
Each plugin has a function called bool OnMessage( ISystem SystemObject, IMessage Message); It could return true/false if consumed or not and use the system object to create message to send the replies or responses.
That sounds good. I guess the question is how to route the messages to the specific plugins? What part of this approach are you fuzzy on as to how to implement it?
Yes, now were down to the harder part. Okay so we have an EventQueue which has a number of events in it. Since the EventQueue itself does not do the dispatching of the messages there needs to be some mechinism for seeing if there are any queued meesages and if so dispatch them. This is the fuzzy part for me. That is why I orginally had mentioned threads/timers. I would prefer not to have something polling the message queue but it may be the only way. Are you going to share your EventQueue that you have been hammering out?
-
Yes, now were down to the harder part. Okay so we have an EventQueue which has a number of events in it. Since the EventQueue itself does not do the dispatching of the messages there needs to be some mechinism for seeing if there are any queued meesages and if so dispatch them. This is the fuzzy part for me. That is why I orginally had mentioned threads/timers. I would prefer not to have something polling the message queue but it may be the only way. Are you going to share your EventQueue that you have been hammering out?
smesser wrote:
Okay so we have an EventQueue which has a number of events in it. Since the EventQueue itself does not do the dispatching of the messages there needs to be some mechinism for seeing if there are any queued meesages and if so dispatch them.
Oh, but the
EventQueue
does take care of that for us. :) It runs in its own thread. Actually, it uses myDelegateQueue
class, which runs in its own thread. The dispatching of the events is taken care of by theEventQueue
. Without that functionality, it wouldn't be of much use to us.smesser wrote:
Are you going to share your EventQueue that you have been hammering out?
Yes. I'll try to post a link to where you can download it within the hour or so. It will be an untested version, but at least you'll get to play around with the class to see if it's what you're looking for.
-
smesser wrote:
Okay so we have an EventQueue which has a number of events in it. Since the EventQueue itself does not do the dispatching of the messages there needs to be some mechinism for seeing if there are any queued meesages and if so dispatch them.
Oh, but the
EventQueue
does take care of that for us. :) It runs in its own thread. Actually, it uses myDelegateQueue
class, which runs in its own thread. The dispatching of the events is taken care of by theEventQueue
. Without that functionality, it wouldn't be of much use to us.smesser wrote:
Are you going to share your EventQueue that you have been hammering out?
Yes. I'll try to post a link to where you can download it within the hour or so. It will be an untested version, but at least you'll get to play around with the class to see if it's what you're looking for.
Cool on both parts. The EventQueue runing on it's own thread and the download I look forward to it.
-
Cool on both parts. The EventQueue runing on it's own thread and the download I look forward to it.
Here[^] is the download for the
EventQueue
class. In addition, you'll need to download the source code for my DelegateQueue[^] class. And the code for my Deque[^] class. You can compile all of this together into one assembly, though you'll want to change the namespace names so that they all match, and maybe move the using LSCollections directive. An alternative would be to use the assemblies that I have compiled. I can email you those if you'd like. Let me know how it goes and if you have any questions. Again, theEventQueue
class is completely untested. -
Here[^] is the download for the
EventQueue
class. In addition, you'll need to download the source code for my DelegateQueue[^] class. And the code for my Deque[^] class. You can compile all of this together into one assembly, though you'll want to change the namespace names so that they all match, and maybe move the using LSCollections directive. An alternative would be to use the assemblies that I have compiled. I can email you those if you'd like. Let me know how it goes and if you have any questions. Again, theEventQueue
class is completely untested.Thanks, I have downloaded your code and put all of the required files into one project so that I would at least build. It will take some time for all this to sink in and for me to understand the code. I don't understand invoke as I have not used it yet. At a quick glance I don't understand how the EventQueue is running in a thread but I only have 10 minutes vested thus far. Are you just banging this out for me or will you have an example to use the EventQueue class? Thanks much for your efforts and discussion.
-
Thanks, I have downloaded your code and put all of the required files into one project so that I would at least build. It will take some time for all this to sink in and for me to understand the code. I don't understand invoke as I have not used it yet. At a quick glance I don't understand how the EventQueue is running in a thread but I only have 10 minutes vested thus far. Are you just banging this out for me or will you have an example to use the EventQueue class? Thanks much for your efforts and discussion.
smesser wrote:
Are you just banging this out for me or will you have an example to use the EventQueue class?
I may devote an article to it at some point. I'll definitely put it in the next version of my state machine toolkit for others to use. Let's see if I can give you a quick example of using the
EventQueue
.public class MySystem
{
private EventQueue eventQueue = new EventQueue();private MusicPlugin musicPlugin; public MySystem() { // Create events. eventQueue.CreateEvent("Play"); eventQueue.CreateEvent("PlayingStopped"); // Create music plugin and give it the event queue. musicPlugin = new MusicPlugin(eventQueue); // Subscribe to the playing stop event. eventQueue.Subscribe("PlayingStopped", new EventQueueEventHandler(HandlePlayingStopped); } private void HandlePlayingStopped(object sender, EventQueueEventArgs e) { // Logic for handling playing stopped event. }
}
Here, the system object handles an event sent to it from the music plugin telling the system that it has stopped playing. Also, the system takes on the responsibility for creating all of the events before passing the event queue on to the plugin(s).
public class MusicPlugin
{
private EventQueue eventQueue;public MusicPlugin(EventQueue eventQueue) { this.eventQueue = eventQueue; eventQueue.Subscribe("Play", new EventQueueEventHandler(HandlePlayEvent)); } private void HandlePlayEvent(object sender, EventQueueEventArgs e) { // Logic for starting playback. } public void StopPlaying() { eventQueue.Send("PlayingStopped", this, null); }
}
Now with an example this small, there's not much of an advantage here over using C#'s built in events. However, if you have a lot of plugins that are sending events to each other as well as to the system, this approach could help keep the plugins decoupled. You would have one central event queue for handling all of the event notification.
-
smesser wrote:
Are you just banging this out for me or will you have an example to use the EventQueue class?
I may devote an article to it at some point. I'll definitely put it in the next version of my state machine toolkit for others to use. Let's see if I can give you a quick example of using the
EventQueue
.public class MySystem
{
private EventQueue eventQueue = new EventQueue();private MusicPlugin musicPlugin; public MySystem() { // Create events. eventQueue.CreateEvent("Play"); eventQueue.CreateEvent("PlayingStopped"); // Create music plugin and give it the event queue. musicPlugin = new MusicPlugin(eventQueue); // Subscribe to the playing stop event. eventQueue.Subscribe("PlayingStopped", new EventQueueEventHandler(HandlePlayingStopped); } private void HandlePlayingStopped(object sender, EventQueueEventArgs e) { // Logic for handling playing stopped event. }
}
Here, the system object handles an event sent to it from the music plugin telling the system that it has stopped playing. Also, the system takes on the responsibility for creating all of the events before passing the event queue on to the plugin(s).
public class MusicPlugin
{
private EventQueue eventQueue;public MusicPlugin(EventQueue eventQueue) { this.eventQueue = eventQueue; eventQueue.Subscribe("Play", new EventQueueEventHandler(HandlePlayEvent)); } private void HandlePlayEvent(object sender, EventQueueEventArgs e) { // Logic for starting playback. } public void StopPlaying() { eventQueue.Send("PlayingStopped", this, null); }
}
Now with an example this small, there's not much of an advantage here over using C#'s built in events. However, if you have a lot of plugins that are sending events to each other as well as to the system, this approach could help keep the plugins decoupled. You would have one central event queue for handling all of the event notification.
Thanks for the example. This is exaclty what I needed to get my project back on track again. I have been mulling over this issue for too long.Having the plugins loosely coupled has been one of my goals since the beginning of my project. I have many parts completely coded and now finally I have a way to tie it all together. Once again, I am very greatful for your discussion and your help.
-
Thanks for the example. This is exaclty what I needed to get my project back on track again. I have been mulling over this issue for too long.Having the plugins loosely coupled has been one of my goals since the beginning of my project. I have many parts completely coded and now finally I have a way to tie it all together. Once again, I am very greatful for your discussion and your help.
I've been testing out my
EventQueue
class, and I've noticed a bug:delegateQueue.Invoke(new SendOrPostCallback(delegate(object state)
{
List subscribers = (List)events[eventName];if(subscribers == null) { throw new InvalidOperationException("Event does not exist."); } else { subscribers.Add(new Subscriber(handler, context)); }
}), null);
Get rid of the null that's getting passed to the annonymous method. It confuses the runtime when it starts to invoke the delegate. So it should look like this:
delegateQueue.Invoke(new SendOrPostCallback(delegate(object state)
{
List subscribers = (List)events[eventName];if(subscribers == null) { throw new InvalidOperationException("Event does not exist."); } else { subscribers.Add(new Subscriber(handler, context)); }
}), new object());
[EDIT] I will have a new and improved version up later today as I test it more thoroughly. Also, that should be "new object()" getting passed to the anonymous method. [/EDIT] -- modified at 11:32 Monday 26th June, 2006
-
I've been testing out my
EventQueue
class, and I've noticed a bug:delegateQueue.Invoke(new SendOrPostCallback(delegate(object state)
{
List subscribers = (List)events[eventName];if(subscribers == null) { throw new InvalidOperationException("Event does not exist."); } else { subscribers.Add(new Subscriber(handler, context)); }
}), null);
Get rid of the null that's getting passed to the annonymous method. It confuses the runtime when it starts to invoke the delegate. So it should look like this:
delegateQueue.Invoke(new SendOrPostCallback(delegate(object state)
{
List subscribers = (List)events[eventName];if(subscribers == null) { throw new InvalidOperationException("Event does not exist."); } else { subscribers.Add(new Subscriber(handler, context)); }
}), new object());
[EDIT] I will have a new and improved version up later today as I test it more thoroughly. Also, that should be "new object()" getting passed to the anonymous method. [/EDIT] -- modified at 11:32 Monday 26th June, 2006
Thanks for the bug fix. I look forward to the new and improved version.
-
I am trying to implement a message queuing system in my applicaton. I have a main application which dynamically loads plugins that implement a required interface. In order to send out message to and from plugins I have an OnMessage() function that receives any message sent to it. Here is how it works. The main application has a central object called lets say System which the plugins have access to. One the the methods of system allows you to create a message. IMessage message = System.NewMessage() ... message.Send(); All messages are to be created, dispatched, and disposed of my the system object. How should I approach this problem. I'm sure I will need a message queue, but I am not sure about the rest. Any ideas welcome. PS: I don't want to use MS Message queue and all communication is only on the local machine.
-
Thanks you are most kind.
-
I've been testing out my
EventQueue
class, and I've noticed a bug:delegateQueue.Invoke(new SendOrPostCallback(delegate(object state)
{
List subscribers = (List)events[eventName];if(subscribers == null) { throw new InvalidOperationException("Event does not exist."); } else { subscribers.Add(new Subscriber(handler, context)); }
}), null);
Get rid of the null that's getting passed to the annonymous method. It confuses the runtime when it starts to invoke the delegate. So it should look like this:
delegateQueue.Invoke(new SendOrPostCallback(delegate(object state)
{
List subscribers = (List)events[eventName];if(subscribers == null) { throw new InvalidOperationException("Event does not exist."); } else { subscribers.Add(new Subscriber(handler, context)); }
}), new object());
[EDIT] I will have a new and improved version up later today as I test it more thoroughly. Also, that should be "new object()" getting passed to the anonymous method. [/EDIT] -- modified at 11:32 Monday 26th June, 2006
I finally had a chance to look at your code, I was on vacation. I really like it but I want to use it a little differently than your example. I don't want to expose the eventQueue directly to plugins. I don't want them to have to have that code in order to compile a plugin. Maybe you can help me out. I want my system object to be responsible for creating events and subscribing plugins to them. If you remember my setup Plugins have an
bool OnMessage( ISystem sys, IMessage msg );
The plugin can create a message using:IMessage m = sys.NewMessage( "music.play" ); // and then send it. m.Send();
My problem is that the message object gets created over and over again and therefore not persistent. The Send method in the Message class needs to be able to create and send the events. How can I accomplish this? Thanks -- modified at 14:44 Tuesday 4th July, 2006 -
Thanks you are most kind.
For some reason, I keep getting an error when trying to reply to your latest message, so I'm trying here instead. The System object could have a hash table where it keeps its messages. When a message is created, the name of the message is stored as a key in the hash table and the message object is stored as its value. Next time someone calls the
NewMessage
method asking for the same message, instead of creating a new message, the original message is retrieved from the hash table. So only one message of each type is created. When a message object is created, it can be passed theEventQueue
along with the name of the message. When itsSend
method is called, theMessage
sends the message using theEventQueue
. So theEventQueue
is hidden inside theMessage
class. The plugins don't have to know anything about it. -
For some reason, I keep getting an error when trying to reply to your latest message, so I'm trying here instead. The System object could have a hash table where it keeps its messages. When a message is created, the name of the message is stored as a key in the hash table and the message object is stored as its value. Next time someone calls the
NewMessage
method asking for the same message, instead of creating a new message, the original message is retrieved from the hash table. So only one message of each type is created. When a message object is created, it can be passed theEventQueue
along with the name of the message. When itsSend
method is called, theMessage
sends the message using theEventQueue
. So theEventQueue
is hidden inside theMessage
class. The plugins don't have to know anything about it.This part I understand
Leslie Sanford wrote:
The System object could have a hash table where it keeps its messages. When a message is created, the name of the message is stored as a key in the hash table and the message object is stored as its value. Next time someone calls the NewMessage method asking for the same message, instead of creating a new message, the original message is retrieved from the hash table. So only one message of each type is created.
This part I don't understand
Leslie Sanford wrote:
When a message object is created, it can be passed the EventQueue along with the name of the message. When its Send method is called, the Message sends the message using the EventQueue. So the EventQueue is hidden inside the Message class. The plugins don't have to know anything about it.
class System { Hashtable hashList = new Hashtable(); EventQueue eventQueue = new EventQueue(); public Message NewMessage( string sub ) { Message m = null; // = new Message( "music.play" ); if( !hashList.ContainsKey( sub ) ) { m = new Message( "music.play" ); hashList.Add( sub, m ); eventQueue.CreateEvent( sub ); } else { m = (Message)hashList[sub]; } return m; } } class Message { string subject; string data; public Message(string s) { this.subject = s; } public void Send() { string sub = this.subject; string datium = this.data; // what should happen here? } }
-
This part I understand
Leslie Sanford wrote:
The System object could have a hash table where it keeps its messages. When a message is created, the name of the message is stored as a key in the hash table and the message object is stored as its value. Next time someone calls the NewMessage method asking for the same message, instead of creating a new message, the original message is retrieved from the hash table. So only one message of each type is created.
This part I don't understand
Leslie Sanford wrote:
When a message object is created, it can be passed the EventQueue along with the name of the message. When its Send method is called, the Message sends the message using the EventQueue. So the EventQueue is hidden inside the Message class. The plugins don't have to know anything about it.
class System { Hashtable hashList = new Hashtable(); EventQueue eventQueue = new EventQueue(); public Message NewMessage( string sub ) { Message m = null; // = new Message( "music.play" ); if( !hashList.ContainsKey( sub ) ) { m = new Message( "music.play" ); hashList.Add( sub, m ); eventQueue.CreateEvent( sub ); } else { m = (Message)hashList[sub]; } return m; } } class Message { string subject; string data; public Message(string s) { this.subject = s; } public void Send() { string sub = this.subject; string datium = this.data; // what should happen here? } }
This is what I was getting at:
public class Message
{
string subject;
EventQueue queue;
string data = string.Empty;public Message(string s, EventQueue queue) { this.subject = s; this.queue = queue; } public Send() { queue.Send(this.subject, this, data); }
}
-
This is what I was getting at:
public class Message
{
string subject;
EventQueue queue;
string data = string.Empty;public Message(string s, EventQueue queue) { this.subject = s; this.queue = queue; } public Send() { queue.Send(this.subject, this, data); }
}
I thought that the eventQueue was part of the system object. Using your example the eventQueue will get disposed of each time a message is created and thereby losing any subscriptions. Also your still passing in the eventQueue which means the plugin will need that code to compile. Am I misunderstanding still? -- modified at 17:10 Tuesday 4th July, 2006
-
I thought that the eventQueue was part of the system object. Using your example the eventQueue will get disposed of each time a message is created and thereby losing any subscriptions. Also your still passing in the eventQueue which means the plugin will need that code to compile. Am I misunderstanding still? -- modified at 17:10 Tuesday 4th July, 2006
smesser wrote:
I thought that the eventQueue was part of the system object.
The System object owns an EventQueue object. It's the System's job to dispose of it. It's simply passing the EventQueue object along to the Message objects so that they can use it to send messages. Make sense?
-
smesser wrote:
I thought that the eventQueue was part of the system object.
The System object owns an EventQueue object. It's the System's job to dispose of it. It's simply passing the EventQueue object along to the Message objects so that they can use it to send messages. Make sense?
It makes perfect sense but it violates the coupling I mentioned several responses back when we were talking about hiding the eventqueue completely from the plugin. My intention is that the System object creates the eventqueue and any new messages and sends them to all plugins which have subscribed. The plugin would send reponses that the system object would turn into events and add them to the queue or send them. Thats what I thought we were trying to accomplish a few posts back.