java - RxJava: Find out if BehaviorSubject was a repeated value or not
I'm making an Android interface that shows data fetched from the network. I want it to show the latest available data and never be empty (unless no data has been fetched at all yet), so I'm using a BehaviorSubject to give subscribers (my UI) the latest available info while refreshing in the background to update it.
This works, but due to another requirement in my UI, I now have to know whether the published result was fetched fresh from the network. (In other words, I need to know whether the published result was the BehaviorSubject's saved item.)
How can I achieve this? If I need to split it into multiple Observables, that's fine, as long as I keep the caching behavior of BehaviorSubject (getting the last available result) while also being able to tell whether the result came from the cache. A hacky way I can think of is to check whether the timestamp of the response is relatively recent, but that'd be sloppy and I'd rather figure out a way to do it with RxJava.
As you mentioned in the question, this can be accomplished with multiple Observables. In essence, you have two Observables: "the fresh response can be observed" and "the cached response can be observed". If something can be "observed", you can express it as an Observable. Let's name the first one original and the second one replayed.
See this JSBin (it is JavaScript, but the concepts can be directly translated to Java. As far as I know there isn't a JavaBin for these purposes).
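    // Fresh responses: one shared stream, each event tagged as 'original'.
    var original = Rx.Observable.interval(1000)
      .map(function (x) { return {value: x, from: 'original'}; })
      .take(4)
      .publish().refCount();

    // Cached responses: same values re-tagged as 'replayed', last one kept.
    var replayed = original
      .map(function (x) { return {value: x.value, from: 'replayed'}; })
      .replay(null, 1).refCount();

    // What subscribers see: both streams merged, shared, with the last event
    // cached and immediate duplicates of the same value dropped.
    var merged = Rx.Observable.merge(original, replayed)
      .replay(null, 1).refCount()
      .distinctUntilChanged(function (obj) { return obj.value; });

    console.log('subscribe 1st');
    merged.subscribe(function (x) {
      console.log('subscriber1: value ' + x.value + ', from: ' + x.from);
    });

    // A late subscriber joins after two values have already been emitted.
    setTimeout(function () {
      console.log(' subscribe 2nd');
      merged.subscribe(function (x) {
        console.log(' subscriber2: value ' + x.value + ', from: ' + x.from);
      });
    }, 2500);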
The overall idea here is: annotate each event with a field from indicating its origin. If it's original, it's a fresh response. If it's replayed, it's a cached response. Observable original will only emit from: 'original', and Observable replayed will only emit from: 'replayed'. In Java we would require a bit more boilerplate, because you need to make a class to represent these annotated events. Otherwise, the same operators used here in RxJS can be found in RxJava.
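To make the "bit more boilerplate" concrete, here is a minimal sketch of what such an annotated-event class might look like. The names AnnotatedEvent, Source, original, replayed, asReplayed and isFresh are all hypothetical, chosen only to mirror the value and from fields of the JavaScript objects; nothing here comes from RxJava itself.

```java
import java.util.Objects;

/** Hypothetical wrapper pairing a value with where it came from. */
final class AnnotatedEvent<T> {

    /** Mirrors the `from` field of the JavaScript example. */
    enum Source { ORIGINAL, REPLAYED }

    private final T value;
    private final Source from;

    private AnnotatedEvent(T value, Source from) {
        this.value = Objects.requireNonNull(value, "value");
        this.from = Objects.requireNonNull(from, "from");
    }

    /** Tags a value as a fresh response. */
    static <T> AnnotatedEvent<T> original(T value) {
        return new AnnotatedEvent<>(value, Source.ORIGINAL);
    }

    /** Tags a value as a cached response. */
    static <T> AnnotatedEvent<T> replayed(T value) {
        return new AnnotatedEvent<>(value, Source.REPLAYED);
    }

    /** Returns a copy of this event re-tagged as cached, keeping the value. */
    AnnotatedEvent<T> asReplayed() {
        return replayed(value);
    }

    T value() { return value; }

    Source from() { return from; }

    /** True only for events that came straight from the network. */
    boolean isFresh() { return from == Source.ORIGINAL; }

    @Override
    public String toString() {
        return "value " + value + ", from: " + from;
    }
}
```

With a class like this, the two map steps would wrap a value with AnnotatedEvent.original and re-tag it with asReplayed, and the key passed to distinctUntilChanged would be the value accessor. The UI could then branch on isFresh().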
The original Observable is publish().refCount() because we want only one instance of this stream, shared with all observers. In fact, in RxJS and Rx.NET, share() is an alias for publish().refCount().
The replayed Observable is replay(1).refCount() because it is also shared, just like the original one is, but replay(1) gives us the caching behavior.
The merged Observable contains both original and replayed, and this is what you should expose to all subscribers. Since replayed will immediately emit whenever original does, we use distinctUntilChanged on the event's value to ignore immediate consecutive duplicates. The reason we also apply replay(1).refCount() to merged is that we want the merge of original and replayed to be one single instance of the stream, shared among all observers. We could have used publish().refCount() for this purpose, but we cannot lose the replay effect that replayed contains, hence it's replay(1).refCount() and not publish().refCount().
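The effect of keying distinctUntilChanged on the value can be illustrated without any Rx library. The sketch below is a plain-list stand-in, not RxJava: the class ConsecutiveDedup and its method distinctUntilChangedBy are hypothetical, events are written as "value:from" strings, and the emission order (each fresh item followed at once by its replayed copy, and a late subscriber receiving the cached copy first) is an assumption about how the merge behaves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

final class ConsecutiveDedup {

    /** Keeps an item only if its key differs from the key of the previous item. */
    static <T, K> List<T> distinctUntilChangedBy(List<T> items, Function<T, K> key) {
        List<T> kept = new ArrayList<>();
        boolean first = true;
        K previous = null;
        for (T item : items) {
            K current = key.apply(item);
            if (first || !Objects.equals(previous, current)) {
                kept.add(item);
            }
            previous = current;
            first = false;
        }
        return kept;
    }

    public static void main(String[] args) {
        // The key is the part before the colon, i.e. the event's value.
        Function<String, String> value = s -> s.split(":")[0];

        // Assumed sequence for an early subscriber: every fresh item is
        // followed immediately by its replayed copy, which gets dropped.
        List<String> early =
                List.of("0:original", "0:replayed", "1:original", "1:replayed");
        System.out.println(distinctUntilChangedBy(early, value));
        // prints [0:original, 1:original]

        // Assumed sequence for a late subscriber: the cached item arrives
        // first and is kept, because nothing with the same value preceded it.
        List<String> late = List.of("1:replayed", "2:original", "2:replayed");
        System.out.println(distinctUntilChangedBy(late, value));
        // prints [1:replayed, 2:original]
    }
}
```

Under these assumptions, an item tagged replayed only reaches a subscriber when it is the first thing that subscriber sees, which is exactly the case where the result came from the cache.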