scala - how to conditionally pass a message to a specific next stage (among other stages)? -


consider simple scenario when depending on attribute of message passing through, want processed specific next stage , continue on.

[source[actionmessage]] ~> [flow[actionmessage, enrichedactionmessage]]  ~> (eam: enrichedactionmessage => eam.actiontype match {       case actiona => eam ~> flow[enrichedactionmessage, reactiona] ~> sink[reactiona]       case actionb => eam ~> flow[enrichedactionmessage, reactionb] ~> sink[reactionb]       case actionc => eam ~> flow[enrichedactionmessage, reactionc] ~> sink[reactionc]     }) 

how achieve conditional routing stage graph stage ?

this answer based on akka-stream version 2.4.2-rc1. api can different in other versions. dependency can consumed sbt:

librarydependencies += "com.typesafe.akka" %% "akka-stream" % "2.4.2-rc1" 

use partition component:

val shape = graphdsl.create() { implicit b ⇒   import graphdsl.implicits._    val first = b.add(sink.foreach[int](elem ⇒ println("first:\t" + elem)))   val second = b.add(sink.foreach[int](elem ⇒ println("second:\t" + elem)))   val third = b.add(sink.foreach[int](elem ⇒ println("third:\t" + elem)))   val p = b.add(partition[int](3, elem ⇒ elem match {     case 0                ⇒ 0     case elem if elem < 0 ⇒ 1     case elem if elem > 0 ⇒ 2   }))    p ~> first   p ~> second   p ~> third    sinkshape(p.in) } source(list(0, 1, 2, -1, 1, -5, 0)).to(shape).run()  /* output: first: 0 third: 1 third: 2 second: -1 third: 1 second: -5 first: 0 */ 

instead of sinkshape can return new fanoutshape3(p.in, p.out(0), p.out(1), p.out(2)) if wish processing of elements @ later point.


Comments

Popular posts from this blog

sublimetext3 - what keyboard shortcut is to comment/uncomment for this script tag in sublime -

java - No use of nillable="0" in SOAP Webservice -

ubuntu - Laravel 5.2 quickstart guide gives Not Found Error -