c# - registering observers with IEventProcessor -


this part of ieventprocessor implementation taken here:

public class simpleeventprocessor : ieventprocessor {     public async task processeventsasync(partitioncontext context, ienumerable<eventdata> events)     {         foreach (eventdata eventdata in events)         {          }     } } 

as new events added eventhub, processeventsasync method invoked , foreach loop can used process events. add observers simpleeventprocessor using, example, observerregistry discussed here. proposed observerregistry looks this:

public class observerregistry : iobserverregistry<iprojectionwriterfactory> {     ienumerable<object> getobservers(iprojectionwriterfactory factory)     {         yield return new loanapplicationobserver();         yield return new offerobserver();         // more observers...     } } 

unfortunately, there few things missing. how register several observers simpleeventprocessor events passed processeventsasync observers , when methods?

full source code here. synopsis follows:

you define static event on simpleeventprocessor:

public class simpleeventprocessor : ieventprocessor  {     public static event eventhandler<messagereceivedeventargs> onmessagereceived;              public simpleeventprocessor()     { } } 

then raise onmessagereceived event in processeventsasync:

public task processeventsasync(partitioncontext context, ienumerable<eventdata> messages) {     foreach (eventdata message in messages)     {         onmessagereceived(this, new messagereceivedeventargs() { receivedon = datetimeoffset.utcnow, message = message });     } } 

very important: ensure subscribers removed on processor close. important, because static event missing unsubscription can cause memory leaks article explaining this.:

public async task closeasync(partitioncontext context, closereason reason) {     if (onmessagereceived != null)     {         foreach (eventhandler<messagereceivedeventargs> subscriber in onmessagereceived.getinvocationlist())         {             onmessagereceived -= subscriber;         }     } } 

finally can hook observers part of initialization logic:

observerregistry registry = new observerregistry(); foreach (iobserver observer in registry.getobservers()) {     simpleeventprocessor.onmessagereceived += new eventhandler<messagereceivedeventargs>(     (sender, e) => observer.when(e)); } 

example output console application:

simpleeventprocessor: bc84f0b9-6e2d-4c4e-9169-c1bd277d3c18 observer1: bc84f0b9-6e2d-4c4e-9169-c1bd277d3c18 observer2: bc84f0b9-6e2d-4c4e-9169-c1bd277d3c18 simpleeventprocessor: a29d5875-7c53-4a7c-8113-ef7c24c2851f observer1: a29d5875-7c53-4a7c-8113-ef7c24c2851f observer2: a29d5875-7c53-4a7c-8113-ef7c24c2851f 

i highlight following:

  1. in use case registering ieventprocessorfactory might more effective have more control on processor instantiation , disposal.
  2. it recommended keep processeventsasync method light , fast possible. may creating separate consumer groups better option in case?

hope above answers question.


Comments

Popular posts from this blog

sublimetext3 - what keyboard shortcut is to comment/uncomment for this script tag in sublime -

java - No use of nillable="0" in SOAP Webservice -

ubuntu - Laravel 5.2 quickstart guide gives Not Found Error -