Even though the Event Aggregator could be termed one of the lesser-known patterns, its usefulness cannot be ignored. In fact, it makes me wonder why it is not better known.
Let us recap the Observer pattern. One of its problems is that it can grow out of hand when the system has multiple event sources and multiple event subscribers. The subscribers could themselves be the source of other events.
The Event Aggregator, as Martin Fowler describes it, is a single source of truth, or rather, in this case, a single point of event indirection. It gathers events from multiple sources and propagates them to interested parties. The Event Aggregator is loosely coupled with both the observers and the subjects, sitting between them as a centralized message broker and providing a single point of registration for events.
I have been using Caliburn Micro for my WPF applications for a while, and it provides quite a useful implementation of the Event Aggregator. The following section discusses a simplified version of Caliburn Micro's implementation.
public interface IEventAggregator
{
    void Subscribe(object subscriber);
    void Unsubscribe(object subscriber);
    void PublishMessage(object message);
}
The Event Aggregator exposes three major methods: one each for subscribing and unsubscribing, and another for subjects to publish events. The Subscribe() method could be defined as
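private readonly List<WeakReference<Handler>> _handlers = new List<WeakReference<Handler>>();

public void Subscribe(object subscriber)
{
    lock (_handlers)
    {
        // Ignore the call if this instance is already subscribed.
        // The weak reference must be unwrapped before comparing subscribers.
        if (_handlers.Any(x => x.TryGetTarget(out var target) && ReferenceEquals(target.Subscriber, subscriber)))
        {
            return;
        }
        _handlers.Add(new WeakReference<Handler>(new Handler(subscriber)));
    }
}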
The Subscribe() method implementation is pretty straightforward. Each time it is invoked, it adds the subscriber instance (passed as a parameter) to a collection. The collection holds its entries as WeakReference instances: being a distant cousin of the Observer pattern, the Event Aggregator also inherits some of the memory leak issues that are evident in the Observer pattern. The WeakReference allows us to work around the problem to an extent.
As seen in the code above, the collection is of type Handler (or rather WeakReference<Handler>, to be precise). The Handler type maintains a reference to the subscriber, along with the list of message types the subscriber is interested in and the corresponding event handlers. We will look at the Handler definition before discussing further.
// Requires: using System.Linq; using System.Reflection;
private class Handler
{
    private readonly object _subscriber;
    // Maps each handled message type to the matching HandleAsync method.
    private readonly Dictionary<Type, MethodInfo> _handlers = new Dictionary<Type, MethodInfo>();

    public Handler(object subscriber)
    {
        _subscriber = subscriber;
        // Find every closed IHandleAsync<T> interface the subscriber implements.
        var interfaces = _subscriber.GetType()
            .GetTypeInfo()
            .ImplementedInterfaces
            .Where(x => x.GetTypeInfo().IsGenericType && x.GetGenericTypeDefinition() == typeof(IHandleAsync<>));
        foreach (var @interface in interfaces)
        {
            var genericTypeArg = @interface.GetTypeInfo().GenericTypeArguments[0];
            var method = @interface.GetRuntimeMethod("HandleAsync", new[] { genericTypeArg });
            _handlers.Add(genericTypeArg, method);
        }
    }

    public object Subscriber => _subscriber;

    public void Handle(Type messageType, object message)
    {
        if (!_handlers.TryGetValue(messageType, out var method))
        {
            throw new Exception($"Message type {messageType} not registered");
        }
        method.Invoke(_subscriber, new[] { message });
    }
}
As you can observe in the code above, the Handler type uses reflection to iterate over the interfaces implemented by the subscriber. In particular, the Handler is interested in the generic interface IHandleAsync<>. Let us look at the interface first.
public interface IHandleAsync<T> where T : EventMessageBase
{
    Task HandleAsync(T message);
}
As seen in the code above, the interface has only a single method, HandleAsync(), which accepts a single parameter of type T. This interface would be implemented by subscribers who are interested in messages of type T.
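To make the reflection step concrete, here is a small, self-contained sketch of how the implemented IHandleAsync<> interfaces can be discovered for a subscriber that handles more than one message type. The Ping, Pong, DualSubscriber and HandlerDiscovery names are made up for this illustration, and the interface is redeclared without the EventMessageBase constraint only so that the snippet compiles on its own.

```csharp
using System;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;

// Redeclared here (without the EventMessageBase constraint) so the sketch is standalone.
public interface IHandleAsync<T>
{
    Task HandleAsync(T message);
}

// Hypothetical message types, used only for this illustration.
public class Ping { }
public class Pong { }

// A subscriber interested in two message types.
public class DualSubscriber : IHandleAsync<Ping>, IHandleAsync<Pong>
{
    public Task HandleAsync(Ping message) => Task.CompletedTask;
    public Task HandleAsync(Pong message) => Task.CompletedTask;
}

public static class HandlerDiscovery
{
    // Returns the T of every IHandleAsync<T> the subscriber implements,
    // using the same reflection query as the Handler constructor.
    public static Type[] HandledMessageTypes(object subscriber) =>
        subscriber.GetType()
            .GetTypeInfo()
            .ImplementedInterfaces
            .Where(x => x.GetTypeInfo().IsGenericType
                        && x.GetGenericTypeDefinition() == typeof(IHandleAsync<>))
            .Select(x => x.GetTypeInfo().GenericTypeArguments[0])
            .ToArray();
}
```

Calling HandlerDiscovery.HandledMessageTypes(new DualSubscriber()) should yield both Ping and Pong, which is why a single Subscribe() call is enough to register a subscriber for every message type it handles.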
Let us now look at the Unsubscribe method.
public void Unsubscribe(object subscriber)
{
    lock (_handlers)
    {
        // Remove the matching handler, along with any entries
        // whose Handler has already been reclaimed by the GC.
        _handlers.RemoveAll(x => !x.TryGetTarget(out var target)
                                 || ReferenceEquals(target.Subscriber, subscriber));
    }
}
Not too much to explain there, other than being conscious of thread safety. One other point to note in the Unsubscribe() method is that it also removes any Handler instances that have been reclaimed by the GC.
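A related caveat about the simplified listing: the WeakReference wraps the Handler object itself, and nothing else holds a strong reference to that Handler, so the GC is free to reclaim it even while the subscriber is still alive. One possible alternative, sketched here as an assumption rather than as the implementation used in this post, is to keep the handler strongly in the list and hold only the subscriber weakly. The WeakHandler name and its IsDead and Matches members are hypothetical.

```csharp
using System;

// Hypothetical variant of Handler: the aggregator would keep a List<WeakHandler>
// (strong references to handlers), while each handler tracks its subscriber weakly.
public sealed class WeakHandler
{
    private readonly WeakReference<object> _subscriber;

    public WeakHandler(object subscriber)
    {
        if (subscriber == null) throw new ArgumentNullException(nameof(subscriber));
        _subscriber = new WeakReference<object>(subscriber);
    }

    // True once the subscriber has been garbage collected;
    // such entries can be pruned from the list on the next call.
    public bool IsDead => !_subscriber.TryGetTarget(out _);

    // True when this handler wraps exactly the given instance.
    public bool Matches(object instance) =>
        _subscriber.TryGetTarget(out var target) && ReferenceEquals(target, instance);
}
```

With this shape, the handler lives as long as the aggregator keeps it in the list, and the subscriber can still be collected because only a weak reference points at it.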
The last bit of code we need to look at in the implementation of EventAggregator is the PublishMessage() method.
public void PublishMessage(object message)
{
    if (message == null) throw new ArgumentNullException(nameof(message));
    var handlersToNotify = new List<Handler>();
    // Take a snapshot of the live handlers while holding the lock...
    lock (_handlers)
    {
        foreach (var handler in _handlers)
        {
            if (handler.TryGetTarget(out var target))
            {
                handlersToNotify.Add(target);
            }
        }
    }
    // ...and invoke them outside the lock.
    foreach (var handler in handlersToNotify)
    {
        handler.Handle(message.GetType(), message);
    }
}
The core functionality of the PublishMessage() method is to accept a message and iterate over the active subscribers. It in turn asks each Handler instance to check whether its internal dictionary has a handler registered for the type of the message that was passed in. This is accomplished using the Handler.Handle() method, which retrieves the associated event handler and invokes it, as seen in the code below (the implementation of Handler.Handle(), which we have already seen above).
public void Handle(Type messageType, object message)
{
    if (!_handlers.TryGetValue(messageType, out var method))
    {
        throw new Exception($"Message type {messageType} not registered");
    }
    method.Invoke(_subscriber, new[] { message });
}
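Two details of Handle() as shown are worth keeping in mind: it throws when the subscriber has no handler registered for the message type, and it discards the Task returned by HandleAsync(). The following is a minimal sketch of one way to address both, assuming we would rather skip uninterested subscribers and let the caller await the result. HandlerDispatch is a hypothetical helper and not part of the code in this post.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Threading.Tasks;

// Hypothetical helper illustrating an awaitable, non-throwing dispatch.
public static class HandlerDispatch
{
    // Invokes the handler registered for the message's runtime type and returns its Task.
    // Returns an already completed Task when no handler is registered for that type.
    public static Task HandleAsync(
        object subscriber,
        IReadOnlyDictionary<Type, MethodInfo> handlers,
        object message)
    {
        if (message == null) throw new ArgumentNullException(nameof(message));

        if (!handlers.TryGetValue(message.GetType(), out var method))
        {
            return Task.CompletedTask; // this subscriber is not interested in the message
        }

        // MethodInfo.Invoke returns object; HandleAsync is declared to return Task.
        return method.Invoke(subscriber, new[] { message }) as Task ?? Task.CompletedTask;
    }
}
```

A publisher could then collect the returned tasks and await them with Task.WhenAll, so that exceptions raised inside asynchronous handlers are observed instead of being lost.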
That completes our EventAggregator. Now it is time to see the EventAggregator in action and how subscribers subscribe themselves.
public class SubscriberAlpha : IHandleAsync<UserSaysHelloMessage>
{
    public SubscriberAlpha(IEventAggregator eventAggregator)
    {
        eventAggregator.Subscribe(this);
    }

    public Task HandleAsync(UserSaysHelloMessage message)
    {
        Console.WriteLine($"Message : {message.Message}");
        return Task.CompletedTask;
    }
}
As seen in the code above, the subscriber has done a couple of things here. First, it has implemented the IHandleAsync<> interface with the generic parameter UserSaysHelloMessage. The generic parameter type, in this case UserSaysHelloMessage, is the type of message this particular subscriber is interested in.
It also uses the instance of IEventAggregator to subscribe to the centralized EventAggregator. The HandleAsync(UserSaysHelloMessage message) method would be called by the Event Aggregator when any publisher publishes a message of the same type.
Let us write some client code to see how our little Event Aggregator example works.
var eventAggregator = new EventAggregator();
var subscriber = new SubscriberAlpha(eventAggregator);
var publisher = new PublisherAlpha(eventAggregator);
publisher.PublishMessage("John says hello");
eventAggregator.Unsubscribe(subscriber);
publisher.PublishMessage("John says hello");
Where PublisherAlpha is defined as
public class PublisherAlpha
{
    private readonly IEventAggregator _aggregator;

    public PublisherAlpha(IEventAggregator eventAggregator)
    {
        _aggregator = eventAggregator;
    }

    public void PublishMessage(string message)
    {
        _aggregator.PublishMessage(new UserSaysHelloMessage(this, message));
    }
}
As expected, our output would contain only a single message.
Message : John says hello
By the time the publisher sends the second message, the subscriber has already unsubscribed from the central message broker. Hence, the subscriber would not be notified of the second message.
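The listings above use two types that are not shown in this post: EventMessageBase and UserSaysHelloMessage. For readers who want to compile the example, here is a minimal sketch of what they might look like. The shapes are inferred only from how the types are used above (a constructor taking the sender and the text, and a Message property); the Sender property name is an assumption.

```csharp
using System;

// Assumed base type for all messages; only its name appears in the post.
public abstract class EventMessageBase
{
    protected EventMessageBase(object sender)
    {
        Sender = sender;
    }

    // Hypothetical property: the object that published the message.
    public object Sender { get; }
}

// Message published by PublisherAlpha and handled by SubscriberAlpha.
public class UserSaysHelloMessage : EventMessageBase
{
    public UserSaysHelloMessage(object sender, string message) : base(sender)
    {
        Message = message;
    }

    // The text printed by SubscriberAlpha.HandleAsync().
    public string Message { get; }
}
```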
While I like Caliburn Micro's implementation of the Event Aggregator, one part I would like to tweak is to give the subscriber the ability to subscribe to and unsubscribe from individual message types at runtime. In the next blog post, we will look at another example of an Event Aggregator implementation, which addresses this problem (or rather, wish).
If you would like to look at the complete code sample shown here, you can find it on my GitHub.