c# - Reactive Extensions: Split input, process, and concatenate back -


basically, have observable of input strings want process individually , result. if input string contains commas (as delimiter), want split string , process each substring individually , each sequence of strings. snippet below illustrates simplified version of trying do:

[fact] public void unitest1() {     var observable = new replaysubject<string>();     observable.onnext("a,b");     observable.onnext("c,d,e");     observable.oncompleted();      var result = new list<string[]>();     observable         .selectmany(x => x.split(','))         .select(x => x.toupper())         .toarray() // how collect ienumerable each item here?         .do(s => result.add(s))         .subscribe();      // here, result {{"a","b","c","d","e"}}, need {{"a","b"},{"c","d","e"}}     assert.equal(2, result.count);      assert.equal("a", result[0][0]);     assert.equal("b", result[0][1]);      assert.equal("c", result[1][0]);     assert.equal("d", result[1][1]);     assert.equal("e", result[1][2]); } 

as explained in comment, above not work. .toarray()-call concatenates entire observable single sequence.

however, have solved putting splitting , processing single action, such:

[fact] public void unitest2() {     var observable = new replaysubject<string>();     observable.onnext("a,b");     observable.onnext("c,d,e");     observable.oncompleted();      var result = new list<string[]>();     observable         .select(x => x.split(',').select(s => s.toupper()).toarray())         .do(s => result.add(s))         .subscribe();      // result expected: {{"a","b"},{"c","d","e"}}     assert.equal(2, result.count);     assert.equal("a", result[0][0]);     assert.equal("b", result[0][1]);     assert.equal("c", result[1][0]);     assert.equal("d", result[1][1]);     assert.equal("e", result[1][2]); } 

but there way, using rx, solve problem not putting splitting , processing in same action? recommended solution problem?

i should mention processing, i.e. toupper()-call, in reality web-service call. used toupper() in examples problem should easy explain. means want processing done in parallel , non-blocking.

there number of things you've ended raising in code worth mentioning.

to start with, .toarray() operator takes observable returns 0 or more single values , changes observable returns single array of 0 or more values. such observable must complete before can return 1 , value.

with in mind results of first query should make sense.

your second query x.split(',').select(s => s.toupper()).toarray() produces output wanted, wanted know "is there way, using rx, solve problem not putting splitting , processing in same action".

well, trivially, yes:

var result = new list<string[]>(); observable     .select(x => x.split(','))     .select(x => x.select(s => s.toupper()))     .select(x => x.toarray())     .do(s => result.add(s))     .subscribe(); 

however, doesn't process items in parallel. rx designed work in series unless invoke operation introduces parallelism.

often, easy way take long-running select, such .select(x => longrunningoperation(x)) , it:

.selectmany(x => observable.start(() => longrunningoperation(x))) 

in case begin doing this:

observable     .observeon(scheduler.default)     .selectmany(x => observable.start(() => x.split(',')))     .selectmany(x => observable.start(() => x.select(s => s.toupper())))     .selectmany(x => observable.start(() => x.toarray()))     .do(s => result.add(s))     .subscribe(); 

but parallelizing each original .onnext call, not processing within. need turn result of x.split(',') observable, , process in parallel.

    observable         .selectmany(x => observable.start(() => x.split(',').toobservable()))         .selectmany(x => observable.start(() => x.selectmany(s => observable.start(() => s.toupper()))))         .selectmany(x => observable.start(() => x.toarray()))         .do(s => s.do(t => result.add(t)))         .merge()         .subscribe(); 

but that's starting crazy , no longer runs on current thread, meaning test isn't going wait results.

let's relook @ query.

i've begun getting rid of .do call. these debugging, state changes bad. can run @ point on thread within query need make sure code in .do call thread-safe , calling result.add(s) not thread-safe.

i've introduced "webservice" call replace .toupper() 1 second processing delay can see how long query takes process , know if running in parallel or not. if final query takes 5 seconds run no parallism , if it's less we're winning.

so, if write query in basic way looks this:

func<string, string> webservice = x => {     thread.sleep(1000);     return x.toupper(); };  var query =     observable         .select(ls =>             p in ls.split(',')             select webservice(p))         .select(rs => rs.toarray())         .toarray()         .select(rss => new list<string[]>(rss));  var sw = stopwatch.startnew(); list<string[]> result = query.wait(); sw.stop(); 

when run results expected {{"a","b"},{"c","d","e"}}, takes on 5 seconds complete. no parallelism here expected.

let's introduce parallelism:

var query =     observable         .select(ls =>             p in ls.split(',').toobservable()             r in observable.start(() => webservice(p))             select r)         .select(rs => rs.toarray())         .merge()         .toarray()         .select(rss => new list<string[]>(rss)); 

i've applied "select selectmany/start" pattern described above. tricky part .select(rs => rs.toarray()) went being iobservable<string[]> iobservable<iobservable<string[]>> popped in .merge() flatten out. normal when introduce parallelism rx queries.

now when run query - boom - on 1 second. 5 of inputs running in parallel. problem order no longer determinant. can't when results performed in parallel.

one such run got result:

results

if running test sort results known order , compare expected result.


Comments

Popular posts from this blog

routing - AngularJS State management ->load multiple states in one page -

python - GRASS parser() error -

Swift game error message -