java - Limit usage of a remote resource using Rx -
i using rxjava asynchronously handle servlet requests. during each request, number of async calls remote service api made using flatmap operator.
due resource constraints, need limit total number of concurrent requests against api. trivial single rx stream calls api in single flatmap, using concurrency parameter. have multiple independent streams in application (basically 1 each servletrequest) , each stream has multiple flatmap calls api.
so think have funnel remote requests singleton stream performs actual calls , can limit concurrency. seems not trivial api responses original stream. additionally seems complicated maintain backpressure across such construct.
the other option use traditional semaphore, i'm not sure if blocking behaviour fit rx.
so there established pattern implement this? or missing clever combination of operators avoids these complications altogether?
in rxjava can create own schedulers regular java executors:
executorservice exec= executors.newfixedthreadpool(2); //2 fixed threads schedulers.from(exec);
so create executor limited number of threads each of resources, , use specific scheduler whenever accessing resource. limited number of threads limit number of concurrent calls.
edit:
apparently misunderstood question. if calls asynchronous, can try , use rx's backpressure manage them. here's idea on how manage such calls using rx:
you create "resource permit observable" emits (some sort of token) whenever api can called. rate of token (permit) creation maximum rate of usage of api. whenever observable needs call api, zip call permit observable. zip operator block until permit available, limiting api calls rate of permit generation
here's trivial implementation of permit observable emits timestamps:
public class permitobservable extends observable<long> { private final long msbetweenemissions; public permitobservable(long msbetweenemissions) { super(new synconsubscribe<long, long>() { @override protected long generatestate() { return system.currenttimemillis(); } @override protected long next(long state, observer<? super long> observer) { long nextemissionat = state + msbetweenemissions; long timetowait = nextemissionat - system.currenttimemillis(); if (timetowait > 0) { try { thread.sleep(timetowait); } catch (interruptedexception e) { observer.onerror(e); } } long = system.currenttimemillis(); observer.onnext(long.valueof(now)); // permit emission return now; } }); this.msbetweenemissions = msbetweenemissions; } }
Comments
Post a Comment