C# - Registering observers with IEventProcessor
This is part of an IEventProcessor implementation taken from here:
    public class SimpleEventProcessor : IEventProcessor
    {
        public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> events)
        {
            foreach (EventData eventData in events)
            {
            }
        }
    }
As new events are added to the Event Hub, the ProcessEventsAsync method is invoked, and the foreach loop can be used to process the events. I would like to add observers to SimpleEventProcessor using, for example, the ObserverRegistry discussed here. The proposed ObserverRegistry looks like this:
    public class ObserverRegistry : IObserverRegistry<IProjectionWriterFactory>
    {
        // Must be public to implement the interface member implicitly.
        public IEnumerable<object> GetObservers(IProjectionWriterFactory factory)
        {
            yield return new LoanApplicationObserver();
            yield return new OfferObserver();
            // more observers...
        }
    }
Unfortunately, there are a few things missing. How do I register several observers with SimpleEventProcessor so that the events passed to ProcessEventsAsync reach the observers and, finally, their When methods?
The full source code is here. A synopsis follows:
You define a static event on SimpleEventProcessor:
    public class SimpleEventProcessor : IEventProcessor
    {
        public static event EventHandler<MessageReceivedEventArgs> OnMessageReceived;

        public SimpleEventProcessor() { }
    }
Then raise the OnMessageReceived event in ProcessEventsAsync:
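    public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (EventData message in messages)
        {
            // The event is null while nobody is subscribed, so invoke it conditionally.
            OnMessageReceived?.Invoke(this, new MessageReceivedEventArgs
            {
                ReceivedOn = DateTimeOffset.UtcNow,
                Message = message
            });
        }

        // The method is not async, so it has to return a Task explicitly.
        return Task.CompletedTask;
    }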
Very important: ensure that all subscribers are removed when the processor closes. This matters because a static event with a missing unsubscription can cause memory leaks (see the article explaining this):
    public async Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        if (OnMessageReceived != null)
        {
            // GetInvocationList returns a copy, so handlers can be removed while iterating.
            foreach (EventHandler<MessageReceivedEventArgs> subscriber in OnMessageReceived.GetInvocationList())
            {
                OnMessageReceived -= subscriber;
            }
        }
    }
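To see the unsubscription step in isolation, here is a minimal sketch that does not depend on the Event Hubs SDK. The `Publisher` class, its `SubscriberCount` property, and the string `Message` payload are hypothetical stand-ins introduced only for this illustration; they are not part of the original source.

```csharp
using System;

public class MessageReceivedEventArgs : EventArgs
{
    public string Message { get; set; } // stand-in for the EventData payload
}

// Hypothetical stand-in for the processor that owns the static event.
public static class Publisher
{
    public static event EventHandler<MessageReceivedEventArgs> OnMessageReceived;

    // Number of handlers currently attached to the static event.
    public static int SubscriberCount =>
        OnMessageReceived?.GetInvocationList().Length ?? 0;

    // Detach every handler, mirroring the loop in CloseAsync.
    public static void ClearSubscribers()
    {
        if (OnMessageReceived == null) return;
        foreach (Delegate handler in OnMessageReceived.GetInvocationList())
        {
            OnMessageReceived -= (EventHandler<MessageReceivedEventArgs>)handler;
        }
    }
}

public static class Program
{
    public static void Main()
    {
        Publisher.OnMessageReceived += (s, e) => Console.WriteLine(e.Message);
        Publisher.OnMessageReceived += (s, e) => Console.WriteLine(e.Message.Length);
        Console.WriteLine(Publisher.SubscriberCount); // 2

        Publisher.ClearSubscribers();
        Console.WriteLine(Publisher.SubscriberCount); // 0
    }
}
```

Until `ClearSubscribers` runs, the static event holds a reference to every handler, and through the handler to whatever object the lambda captured, which is why a forgotten unsubscription keeps those objects alive.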
Finally, you can hook up the observers as part of the initialization logic:
    ObserverRegistry registry = new ObserverRegistry();
    foreach (IObserver observer in registry.GetObservers())
    {
        SimpleEventProcessor.OnMessageReceived +=
            new EventHandler<MessageReceivedEventArgs>((sender, e) => observer.When(e));
    }
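The pieces above can be put together in a small, self-contained sketch that runs without the Event Hubs SDK. Everything here is an assumption made for illustration: `FakeProcessor` stands in for SimpleEventProcessor, plain strings stand in for EventData, and the `IObserver` interface, `RecordingObserver`, and this parameterless `GetObservers` are hypothetical shapes, not the ones from the linked source.

```csharp
using System;
using System.Collections.Generic;

public class MessageReceivedEventArgs : EventArgs
{
    public DateTimeOffset ReceivedOn { get; set; }
    public string Message { get; set; } // stand-in for EventData
}

// Hypothetical observer contract with a single When method.
public interface IObserver
{
    void When(MessageReceivedEventArgs e);
}

// Observer that simply records every message it is handed.
public class RecordingObserver : IObserver
{
    public List<string> Seen { get; } = new List<string>();
    public void When(MessageReceivedEventArgs e) => Seen.Add(e.Message);
}

public class ObserverRegistry
{
    private readonly List<IObserver> observers = new List<IObserver>();
    public ObserverRegistry(params IObserver[] items) { observers.AddRange(items); }
    public IEnumerable<IObserver> GetObservers() => observers;
}

// Stand-in for SimpleEventProcessor: raises the static event once per message.
public class FakeProcessor
{
    public static event EventHandler<MessageReceivedEventArgs> OnMessageReceived;

    public void Process(IEnumerable<string> messages)
    {
        foreach (string message in messages)
        {
            OnMessageReceived?.Invoke(this, new MessageReceivedEventArgs
            {
                ReceivedOn = DateTimeOffset.UtcNow,
                Message = message
            });
        }
    }
}

public static class Demo
{
    public static void Main()
    {
        var first = new RecordingObserver();
        var second = new RecordingObserver();
        var registry = new ObserverRegistry(first, second);

        // Each observer gets its own handler on the static event.
        foreach (IObserver observer in registry.GetObservers())
        {
            FakeProcessor.OnMessageReceived += (sender, e) => observer.When(e);
        }

        new FakeProcessor().Process(new[] { "a", "b" });
        Console.WriteLine(string.Join(",", first.Seen));  // a,b
        Console.WriteLine(string.Join(",", second.Seen)); // a,b
    }
}
```

Because the event is static, every processor instance raises it, so each observer sees the messages from all partitions handled in the process.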
Example output from the console application:
    SimpleEventProcessor: bc84f0b9-6e2d-4c4e-9169-c1bd277d3c18
    Observer1: bc84f0b9-6e2d-4c4e-9169-c1bd277d3c18
    Observer2: bc84f0b9-6e2d-4c4e-9169-c1bd277d3c18
    SimpleEventProcessor: a29d5875-7c53-4a7c-8113-ef7c24c2851f
    Observer1: a29d5875-7c53-4a7c-8113-ef7c24c2851f
    Observer2: a29d5875-7c53-4a7c-8113-ef7c24c2851f
I would highlight the following:
- In your use case, registering an IEventProcessorFactory might be more effective, since you then have more control over processor instantiation and disposal.
- It is recommended to keep the ProcessEventsAsync method as light and fast as possible. Maybe creating separate consumer groups is a better option in your case?
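As a rough illustration of the factory idea, the sketch below hands each processor its observers through the constructor, so no static event (and no global unsubscription) is needed. It is only a sketch under stated assumptions: `IProcessor` and `IProcessorFactory` are simplified, synchronous stand-ins for the SDK's IEventProcessor and IEventProcessorFactory, whose real signatures differ, and the observers are modelled as plain `Action<string, string>` delegates.

```csharp
using System;
using System.Collections.Generic;

// Simplified stand-ins for the SDK interfaces; the real ones are async
// and take SDK-specific types rather than strings.
public interface IProcessor
{
    void Process(string message);
}

public interface IProcessorFactory
{
    IProcessor CreateProcessor(string partitionId);
}

// Processor that forwards each message to the observers it was given.
public class ObservingProcessor : IProcessor
{
    private readonly string partitionId;
    private readonly IReadOnlyList<Action<string, string>> observers;

    public ObservingProcessor(string partitionId, IReadOnlyList<Action<string, string>> observers)
    {
        this.partitionId = partitionId;
        this.observers = observers;
    }

    public void Process(string message)
    {
        foreach (Action<string, string> observer in observers)
        {
            observer(partitionId, message);
        }
    }
}

// Factory that controls how processors are created and what they receive.
public class ObservingProcessorFactory : IProcessorFactory
{
    private readonly IReadOnlyList<Action<string, string>> observers;

    public ObservingProcessorFactory(IReadOnlyList<Action<string, string>> observers)
    {
        this.observers = observers;
    }

    public IProcessor CreateProcessor(string partitionId)
    {
        return new ObservingProcessor(partitionId, observers);
    }
}

public static class FactoryDemo
{
    public static void Main()
    {
        var log = new List<string>();
        var observers = new List<Action<string, string>>
        {
            (partition, message) => log.Add("observer1:" + partition + ":" + message),
            (partition, message) => log.Add("observer2:" + partition + ":" + message)
        };

        var factory = new ObservingProcessorFactory(observers);
        factory.CreateProcessor("0").Process("hello");
        factory.CreateProcessor("1").Process("world");

        Console.WriteLine(string.Join("\n", log));
    }
}
```

With this shape, the lifetime of the observer references is tied to the processor instances the factory creates rather than to a static member.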
I hope the above answers your question.