C# - Reactive Extensions: Split input, process, and concatenate back
Basically, I have an observable of input strings that I want to process individually and then do something with the result. If an input string contains commas (as a delimiter), I want to split the string, process each substring individually, and then do something with each resulting sequence of strings. The snippet below illustrates a simplified version of what I'm trying to do:
    [Fact]
    public void UnitTest1()
    {
        var observable = new ReplaySubject<string>();
        observable.OnNext("a,b");
        observable.OnNext("c,d,e");
        observable.OnCompleted();

        var result = new List<string[]>();
        observable
            .SelectMany(x => x.Split(','))
            .Select(x => x.ToUpper())
            .ToArray() // How do I collect an IEnumerable for each item here?
            .Do(s => result.Add(s))
            .Subscribe();

        // Here, result is {{"A","B","C","D","E"}}, but I need {{"A","B"},{"C","D","E"}}
        Assert.Equal(2, result.Count);
        Assert.Equal("A", result[0][0]);
        Assert.Equal("B", result[0][1]);
        Assert.Equal("C", result[1][0]);
        Assert.Equal("D", result[1][1]);
        Assert.Equal("E", result[1][2]);
    }

As explained in the comment, the above does not work: the .ToArray() call concatenates the entire observable into a single sequence.
However, I have solved it by putting the splitting and processing into a single action, like this:
    [Fact]
    public void UnitTest2()
    {
        var observable = new ReplaySubject<string>();
        observable.OnNext("a,b");
        observable.OnNext("c,d,e");
        observable.OnCompleted();

        var result = new List<string[]>();
        observable
            .Select(x => x.Split(',').Select(s => s.ToUpper()).ToArray())
            .Do(s => result.Add(s))
            .Subscribe();

        // result is as expected: {{"A","B"},{"C","D","E"}}
        Assert.Equal(2, result.Count);
        Assert.Equal("A", result[0][0]);
        Assert.Equal("B", result[0][1]);
        Assert.Equal("C", result[1][0]);
        Assert.Equal("D", result[1][1]);
        Assert.Equal("E", result[1][2]);
    }

But is there a way, using Rx, to solve this problem without putting the splitting and processing in the same action? What is the recommended solution to this problem?
I should mention that the processing, i.e. the ToUpper() call, is in reality a web-service call. I used ToUpper() in my examples so that the problem would be easy to explain. This means I want the processing to be done in parallel and non-blocking.
There are a number of things you've ended up raising in your code that are worth mentioning.
To start with, the .ToArray() operator takes an observable that returns zero or more single values and changes it into an observable that returns a single array of zero or more values. Such an observable must complete before it can return its one and only value.
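To make that concrete, here is a minimal sketch, assuming the System.Reactive NuGet package and C# 9 top-level statements. It pushes two values through a plain Subject<string> and shows that .ToArray() emits nothing until OnCompleted is called, and then emits one array containing everything:

```csharp
// Sketch: assumes the System.Reactive NuGet package and C# 9+ top-level statements.
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var subject = new Subject<string>();
string[]? received = null;

// ToArray turns an IObservable<string> into an IObservable<string[]>
// that emits exactly one array, and only when the source completes.
using var subscription = subject.ToArray().Subscribe(arr => received = arr);

subject.OnNext("A");
subject.OnNext("B");
// The source has not completed yet, so no array has been emitted.
Console.WriteLine(received == null ? "nothing yet" : "got an array"); // nothing yet

subject.OnCompleted();
// Completion releases the single array holding every value seen so far.
Console.WriteLine(string.Join(",", received!)); // A,B
```

This is exactly what happens in the first query: SelectMany flattens every substring into one stream, so the single array only appears when the whole ReplaySubject completes.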
With that in mind, the results of your first query should make sense.
Your second query, x.Split(',').Select(s => s.ToUpper()).ToArray(), produces the output you wanted, but you wanted to know "is there a way, using Rx, to solve this problem without putting the splitting and processing in the same action".
Well, trivially, yes:
    var result = new List<string[]>();
    observable
        .Select(x => x.Split(','))
        .Select(x => x.Select(s => s.ToUpper()))
        .Select(x => x.ToArray())
        .Do(s => result.Add(s))
        .Subscribe();

However, this doesn't process the items in parallel. Rx is designed to work in series unless you invoke an operation that introduces parallelism.
Often, an easy way to do that is to take a long-running select, such as .Select(x => LongRunningOperation(x)), and do this instead:
    .SelectMany(x => Observable.Start(() => LongRunningOperation(x)))

In your case you could begin by doing this:
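A small sketch of that pattern follows, assuming the System.Reactive package and C# 9 top-level statements; LongRunningOperation here is a hypothetical stand-in that just sleeps for 500 ms. It times the plain Select version against the SelectMany/Observable.Start version. Actual timings depend on the machine and thread-pool state, so none are claimed; the parallel results are sorted before printing because they arrive in completion order.

```csharp
// Sketch: assumes System.Reactive; LongRunningOperation is a hypothetical stand-in.
using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;

static string LongRunningOperation(string x)
{
    Thread.Sleep(500); // simulate a slow call such as a web service
    return x.ToUpper();
}

var source = new[] { "a", "b", "c" }.ToObservable();

var sw = Stopwatch.StartNew();
// Select calls LongRunningOperation for one item at a time, in order.
string[] serial = source.Select(x => LongRunningOperation(x)).ToArray().Wait();
Console.WriteLine($"Select: {string.Join(",", serial)} in {sw.ElapsedMilliseconds} ms");

sw.Restart();
// SelectMany + Observable.Start schedules each call on the default scheduler,
// so the calls can overlap; results arrive in whatever order they finish.
string[] parallel = source
    .SelectMany(x => Observable.Start(() => LongRunningOperation(x)))
    .ToArray()
    .Wait();
Console.WriteLine(
    $"SelectMany/Start: {string.Join(",", parallel.OrderBy(s => s))} in {sw.ElapsedMilliseconds} ms");
```

On a machine with a few free thread-pool threads, the second timing should be noticeably lower than the first, which is the effect the pattern is meant to achieve.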
    observable
        .ObserveOn(Scheduler.Default)
        .SelectMany(x => Observable.Start(() => x.Split(',')))
        .SelectMany(x => Observable.Start(() => x.Select(s => s.ToUpper())))
        .SelectMany(x => Observable.Start(() => x.ToArray()))
        .Do(s => result.Add(s))
        .Subscribe();

But this only parallelizes each original .OnNext call, not the processing within it. To do that, you need to turn the result of x.Split(',') into an observable and process that in parallel.
    observable
        .SelectMany(x => Observable.Start(() => x.Split(',').ToObservable()))
        .SelectMany(x => Observable.Start(() => x.SelectMany(s => Observable.Start(() => s.ToUpper()))))
        .SelectMany(x => Observable.Start(() => x.ToArray()))
        .Merge()
        .Do(s => result.Add(s))
        .Subscribe();

But that's starting to get crazy, and it no longer runs on the current thread, which means your test isn't going to wait for the results.
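The threading point is easy to see in isolation. In this sketch (System.Reactive and C# 9 top-level statements assumed; the 300 ms sleep is a hypothetical stand-in for the web-service call), Subscribe hands the work to the thread pool and returns straight away, whereas Wait blocks the caller until the query completes. The query is cold, so the second subscription made through Wait runs all of the work again.

```csharp
// Sketch: assumes System.Reactive; the sleep is a hypothetical web-service stand-in.
using System;
using System.Collections.Concurrent;
using System.Reactive.Linq;
using System.Threading;

var collected = new ConcurrentQueue<string>();
var query = new[] { "a", "b" }.ToObservable()
    .SelectMany(x => Observable.Start(() => { Thread.Sleep(300); return x.ToUpper(); }));

// Subscribe returns immediately: the work runs on thread-pool threads,
// so a test asserting right after this line would most likely see nothing yet.
query.Subscribe(s => collected.Enqueue(s));
Console.WriteLine($"Right after Subscribe: {collected.Count} item(s)");

// Wait blocks until the query completes and returns its last value;
// ToArray makes that last value the complete set of results.
string[] all = query.ToArray().Wait();
Console.WriteLine($"After Wait: {all.Length} item(s)");
```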
Let's take another look at this query.
I've begun by getting rid of the .Do call. .Do calls are fine for debugging, but for state changes they are bad. They can run at any point, on any thread, within the query, so you need to make sure the code in your .Do call is thread-safe, and calling result.Add(s) is not thread-safe.
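One way to avoid the shared List<T> altogether is to let the query build the collection and hand it back once. This is a sketch assuming System.Reactive and C# 9 top-level statements; it uses Rx's ToList() followed by Wait(), and it keeps the splitting and processing in one selector purely for brevity:

```csharp
// Sketch: assumes System.Reactive; the input values mirror the question.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

var input = new[] { "a,b", "c,d,e" }.ToObservable();

// Instead of mutating a shared List<T> inside .Do (which may run on any thread),
// the query itself builds the collection and emits it once, on completion.
IList<string[]> result = input
    .Select(line => line.Split(',').Select(s => s.ToUpper()).ToArray())
    .ToList()   // IObservable<IList<string[]>>: a single list emitted on completion
    .Wait();    // block until that single list arrives

foreach (var group in result)
    Console.WriteLine(string.Join(",", group));
// A,B
// C,D,E
```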
I've also introduced a "webservice" call to replace .ToUpper(), with a one-second processing delay, so that we can see how long the query takes and therefore whether it is running in parallel. If the final query takes 5 seconds to run there's no parallelism, and if it takes less, we're winning.
So, if I write the query in the most basic way, it looks like this:
    Func<string, string> webservice = x =>
    {
        Thread.Sleep(1000);
        return x.ToUpper();
    };

    var query = observable
        .Select(ls => from p in ls.Split(',') select webservice(p))
        .Select(rs => rs.ToArray())
        .ToArray()
        .Select(rss => new List<string[]>(rss));

    var sw = Stopwatch.StartNew();
    List<string[]> result = query.Wait();
    sw.Stop();

When I run this I get the results as expected, {{"A","B"},{"C","D","E"}}, but it takes just over 5 seconds to complete. There's no parallelism here, as expected.
Let's introduce some parallelism:
    var query = observable
        .Select(ls =>
            from p in ls.Split(',').ToObservable()
            from r in Observable.Start(() => webservice(p))
            select r)
        .Select(rs => rs.ToArray())
        .Merge()
        .ToArray()
        .Select(rss => new List<string[]>(rss));

I've applied the "Select to SelectMany/Start" pattern described above. The only tricky part was that .Select(rs => rs.ToArray()) went from producing an IObservable<string[]> to producing an IObservable<IObservable<string[]>>, so I popped in .Merge() to flatten it back out. This is normal when you introduce parallelism into Rx queries.
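The nesting that .Merge() removes can be spelled out with explicit types. This sketch (System.Reactive and C# 9 top-level statements assumed; ProcessLine is a hypothetical helper name) also shows that SelectMany(f) and Select(f).Merge() produce the same flattened stream. Because completion order varies from run to run, the output is normalized before printing:

```csharp
// Sketch: assumes System.Reactive; ProcessLine is a hypothetical helper.
using System;
using System.Linq;
using System.Reactive.Linq;

var lines = new[] { "a,b", "c,d,e" }.ToObservable();

// The per-line work is itself an observable: split, then one Observable.Start per piece.
IObservable<string[]> ProcessLine(string line) =>
    line.Split(',').ToObservable()
        .SelectMany(p => Observable.Start(() => p.ToUpper()))
        .ToArray();

// Select with an observable-returning selector gives a nested observable...
IObservable<IObservable<string[]>> nested = lines.Select(line => ProcessLine(line));

// ...and Merge flattens it, subscribing to each inner observable as it arrives.
IObservable<string[]> flattened = nested.Merge();

// SelectMany(f) does the same job as Select(f).Merge() in one step.
IObservable<string[]> viaSelectMany = lines.SelectMany(line => ProcessLine(line));

string[][] a = flattened.ToArray().Wait();
string[][] b = viaSelectMany.ToArray().Wait();

// Sort inside each group and then sort the groups, so both runs print the same text.
static string Normalize(string[][] groups) =>
    string.Join(" | ", groups
        .Select(g => string.Join(",", g.OrderBy(s => s, StringComparer.Ordinal)))
        .OrderBy(s => s, StringComparer.Ordinal));

Console.WriteLine(Normalize(a)); // A,B | C,D,E
Console.WriteLine(Normalize(b)); // A,B | C,D,E
```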
Now when I run the query, boom: just over 1 second. All 5 of the inputs are running in parallel. The only problem now is that the order is no longer deterministic, but you can't help that when the results are computed in parallel.
In one such run I got this result:
If you're running this as a test, sort the results into a known order and compare that with the expected result.
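A minimal sketch of that normalization follows, assuming System.Reactive and C# 9 top-level statements, and reusing the webservice stand-in and the parallel query from above (minus the final List conversion). It sorts the items inside each group and then sorts the groups by their first item before comparing with the expected {{"A","B"},{"C","D","E"}}. This only works because the expected groups have distinct first items; it is one possible way to compare, not the only one.

```csharp
// Sketch: assumes System.Reactive; webservice is the hypothetical stand-in from the answer.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;

Func<string, string> webservice = x => { Thread.Sleep(1000); return x.ToUpper(); };
var observable = new[] { "a,b", "c,d,e" }.ToObservable();

// The parallel query: group and item order depend on which calls finish first.
var query = observable
    .Select(ls => from p in ls.Split(',').ToObservable()
                  from r in Observable.Start(() => webservice(p))
                  select r)
    .Select(rs => rs.ToArray())
    .Merge()
    .ToArray();

string[][] raw = query.Wait();

// Put the nondeterministic output into a known order before comparing:
// sort inside each group, then sort the groups by their first element.
List<string[]> normalized = raw
    .Select(g => g.OrderBy(s => s, StringComparer.Ordinal).ToArray())
    .OrderBy(g => g[0], StringComparer.Ordinal)
    .ToList();

var expected = new List<string[]> { new[] { "A", "B" }, new[] { "C", "D", "E" } };
bool matches = normalized.Count == expected.Count
    && normalized.Zip(expected, (actual, exp) => actual.SequenceEqual(exp)).All(ok => ok);
Console.WriteLine(matches); // True
```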
