Scala - How to conditionally pass a message to a specific next stage (among other stages)?
Consider a simple scenario in which, depending on an attribute of the message passing through, I want it to be processed by a specific next stage and then continue on.
    [Source[ActionMessage]] ~> [Flow[ActionMessage, EnrichedActionMessage]] ~> (eam: EnrichedActionMessage => eam.actionType match {
        case ActionA => eam ~> Flow[EnrichedActionMessage, ReactionA] ~> Sink[ReactionA]
        case ActionB => eam ~> Flow[EnrichedActionMessage, ReactionB] ~> Sink[ReactionB]
        case ActionC => eam ~> Flow[EnrichedActionMessage, ReactionC] ~> Sink[ReactionC]
    })
How can I achieve this kind of conditional routing from one stage to another graph stage?
This answer is based on akka-stream version 2.4.2-RC1. The API can be different in other versions. The dependency can be consumed by sbt:
    libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.4.2-RC1"
Use the Partition component:
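    import akka.actor.ActorSystem
    import akka.stream.{ActorMaterializer, SinkShape}
    import akka.stream.scaladsl.{GraphDSL, Partition, Sink, Source}

    // run() needs an implicit materializer in scope
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()

    val shape = GraphDSL.create() { implicit b ⇒
      import GraphDSL.Implicits._

      val first = b.add(Sink.foreach[Int](elem ⇒ println("first:\t" + elem)))
      val second = b.add(Sink.foreach[Int](elem ⇒ println("second:\t" + elem)))
      val third = b.add(Sink.foreach[Int](elem ⇒ println("third:\t" + elem)))

      // The function returns the index of the output port for each element
      val p = b.add(Partition[Int](3, elem ⇒ elem match {
        case 0 ⇒ 0
        case elem if elem < 0 ⇒ 1
        case elem if elem > 0 ⇒ 2
      }))

      p ~> first
      p ~> second
      p ~> third

      SinkShape(p.in)
    }

    Source(List(0, 1, 2, -1, 1, -5, 0)).to(shape).run()

    /* Output:
    first:  0
    third:  1
    third:  2
    second: -1
    third:  1
    second: -5
    first:  0
    */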
Instead of SinkShape you can return new FanOutShape3(p.in, p.out(0), p.out(1), p.out(2)) if you wish to do the processing of the elements at a later point.
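To relate this back to the question, the partitioning function for the action types could look roughly like the sketch below. It is plain Scala with no akka-stream dependency, so the routing decision can be checked on its own. The type names come from the question; the `payload` field, the `Routing` object and the `portFor` name are made up for illustration.

```scala
// Hypothetical domain model: the question only names these types,
// the fields are assumptions.
sealed trait ActionType
case object ActionA extends ActionType
case object ActionB extends ActionType
case object ActionC extends ActionType

final case class EnrichedActionMessage(actionType: ActionType, payload: String)

object Routing {
  // Number of output ports the Partition stage would need.
  val OutputPorts = 3

  // Maps each message to the index of the output port it should leave through.
  // The match is exhaustive because ActionType is sealed.
  def portFor(eam: EnrichedActionMessage): Int = eam.actionType match {
    case ActionA => 0
    case ActionB => 1
    case ActionC => 2
  }
}

// With akka-stream on the classpath, this would be passed to Partition
// inside the GraphDSL builder, for example:
//   val p = b.add(Partition[EnrichedActionMessage](Routing.OutputPorts, Routing.portFor))
```

Each output of the partition would then be connected to the flow and sink for the matching reaction type, in the same way the three sinks are connected in the answer above.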