Observable.forkJoin()で並列でObservableを購読しようとしたら上手く動かなかった。
調べていくと片方のObservableがBehaviorSubjectより発行されたConnectableObservableだった為、Completeを発行せずにObservable.forkJoin()がしないようです。
ColdなObservableをHotに変換するのにはpublish()などがあるけど逆は調べてもあまり見つからず。
調べていくとtake(1)を利用するとBehaviorSubjectの最新の値を取ってきてCompleteが発行されるのでObservable.forkJoin()で制御できるようなります。
2017年10月24日
2017年10月10日
RxJSの購読管理
RxJSでは Rx.Subscription() で購読情報を管理することができるオブジェクトを作成できる。
subscribe()の返り値をこのオブジェクトにadd()していくと、あとでまとめてunsubscribe()することができます。
subscribe()の返り値をこのオブジェクトにadd()していくと、あとでまとめてunsubscribe()することができます。
import Rx from 'rxjs/Rx';
const subscription = new Rx.Subscription();
const s1 = Rx.Observable.interval(1000).subscribe(x => console.log(1,x));
subscription.add(s1);
const s2 = Rx.Observable.interval(300).subscribe(x => console.log(2,x));
subscription.add(s2);
document.body.addEventListener('click', () => {
subscription.unsubscribe();
});
2017年09月27日
RxJSのオペレーター
.map(e => e.text) // ストリームを加工する
.delay(200) // ストリームを送れて配信
.throttleTime(200) // ストリームを間引く
.debounceTime(200) // 短い期間で実行されたストリームを間引く
.take(3) // n回だけストリームを流す
.takeUntil(myObserbal) // ストリーム(myObserbal)を受け取るまでストリームを流す
.pluck('target', 'value') //ストリームのオブジェクト( strem.target.vlaue)を流す
.pairwise() // 前回と今回のストリームを配列で流す
.distinct() // 重複するストリームを流さない
.distinctUntilChanged() // 連続するストリームが同じ値の場合はストリームを流さない
.scan(beforeValue => beforeValue + 1, defaultVAlue) // 前回のストリームを流す
タグ:RxJS
