Halo semuanya, nama saya Ivan, saya seorang pengembang Android. Hari ini saya ingin berbagi pengalaman saya dengan RxJava2 dan memberi tahu Anda bagaimana inisialisasi rantai berlangsung. Mengapa saya memutuskan untuk mengangkat ini sama sekali? Setelah berbicara dengan sesama pengembang, saya menyadari bahwa tidak semua orang yang menggunakan alat ini memahami cara kerjanya. Dan kemudian saya memutuskan untuk mencari tahu bagaimana langganan diatur di RxJava2 dan dalam urutan apa semua pekerjaan diinisialisasi. Saya belum menemukan satu artikel pun yang menjelaskan hal ini. Mengingat hal ini, saya masuk ke kode sumber untuk melihat bagaimana semuanya bekerja dan membuat sketsa untuk diri saya sendiri lembar contekan kecil, yang berkembang menjadi artikel ini.
Dalam artikel ini, saya tidak akan menjelaskan apa itu Observable
, Observer
dan semua entitas lain yang digunakan di RxJava2. Jika Anda memutuskan untuk membaca artikel ini, maka saya berasumsi bahwa Anda sudah familiar dengan informasi ini. Dan jika Anda masih belum terbiasa dengan konsep-konsep ini, maka saya sarankan Anda membiasakan diri dengan mereka sebelum membaca.
Berikut cara memulai:
Menjelajahi RxJava 2 untuk Android
Mari kita lihat cara kerja rantai paling sederhana:
Observable.just (1, 2, 3, 4, 5)
.map {β¦}
.filter {β¦}
.subscribe();
Di atas
Pertama, saya akan menjelaskan secara singkat setiap langkah yang kita lalui dalam rantai ini (langkah dimulai dari atas ke bawah):
Sebuah objek dibuat dalam pernyataan just
ObservableFromArray
.
Sebuah objek dibuat dalam pernyataan peta
ObservableMap
, yang mengambil konstruktor referensi ke objek yang dibuat sebelumnya dalam pernyataan adil.
filter
ObservableFilter
, map, just.
Observable
βObservable
subscribe()
(ObservableFilter
filter)Observer
, .
ObservableFilter.subscribe()
ObservableFilter.subscribeActual()
,Observer
, filter,FilterObserver
.Observer
Observer
ObservableFilter.subscribe()
.
ObservableMap.subscribe()
ObservableMap.subscribeActual()
Observer,
map,MapObserver
,FilterObserver
.
ObservableFromArray.subscribe()
ObservableFromArray.subscribeActual()
,onSubscribe()
ObservableFromArray.subscribeActual()
Observer
β.
onSubscribe()
Observer
β .
ObservableFromArray
onNext()
Observer
β.
, just()
null, fromArray(),
Observable
.
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5) {
ObjectHelper.requireNonNull(item1, "item1 is null");
ObjectHelper.requireNonNull(item2, "item2 is null");
ObjectHelper.requireNonNull(item3, "item3 is null");
ObjectHelper.requireNonNull(item4, "item4 is null");
ObjectHelper.requireNonNull(item5, "item5 is null");
return fromArray(item1, item2, item3, item4, item5);
}
fromArray()
, .
public static <T> Observable<T> fromArray(T... items) {
ObjectHelper.requireNonNull(items, "items is null");
if (items.length == 0) {
return empty();
}
if (items.length == 1) {
return just(items[0]);
}
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
ObservableFromArray
, .
onAssembly()
, - Observable
, , .
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
onAssembly()
Observable
- , :
RxJavaPlugins.setOnObservableAssembly(o -> {
if (o instanceof ObservableFromArray) {
return new ObservableFromArray<>(new Integer[] { 4, 5, 6 });
}
return o;
});
Observable.just(1, 2, 3)
.filter(v -> v > 3)
.test()
.assertResult(4, 5, 6);
map()
. , . null, ObservableMap
.
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
, ObservableMap
mapper, , this (source). this ObservableFromArray
. ObservableMap
AbstractObservableWithUpstream
, source.
AbstractObservableWithUpstream
, Observable
.
onAssembly()
Observable
.
filter()
. , , ObservableFilter
this ObservableMap
( ObservableFromArray
, ) .
public final Observable<T> filter(Predicate<? super T> predicate) {
ObjectHelper.requireNonNull(predicate, "predicate is null");
return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
}
subscribe()
, . onNext()
. subscribe()
ObservableFilter
, Observable
.
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
null, LambdaObserver
.
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
, .
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException e) {
......
}
}
subscribeActual()
LambdaObserver
. subscribeActual()
ObservableFilter
. .
public void subscribeActual(Observer<? super T> observer) {
source.subscribe(new FilterObserver<T>(observer, predicate));
}
FilterObserver
, LambdaObserver
, ObservableFilter
.
FilterObserver
BasicFuseableObserver
, onSubscribe()
. BasicFuseableObserver
, Observer
β. , 6 , FilterObserver
MapObserver
. BasicFuseableObserver.onSubscribe()
onSubscribe()
Observer
β, . :
public final void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
if (d instanceof QueueDisposable) {
this.qd = (QueueDisposable<T>)d;
}
if (beforeDownstream()) {
downstream.onSubscribe(this);
afterDownstream();
}
}
}
, ObservableFilter
FilterObserver
, source.subscribe()
. , source ObservableMap
, . ObservableMap
subscribe()
.
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException e) {
......
}
}
, subscribe()
subscribeActual()
, ObservableMap
. subscribeActual()
MapObserver
FilterObserver
mapper
β.
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
public void subscribeActual(Observer<? super T> observer) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);
observer.onSubscribe(d);
if (d.fusionMode) {
return;
}
d.run();
}
Observer
β BasicFuseableObserver
, onSubscribe()
, Observer
, onSubscribe()
.
subscribeActual()
run()
, Observer
β.
void run() {
T[] a = array;
int n = a.length;
for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[i];
if (value == null) {
downstream.onError(new NullPointerException("The element at index " + i + " is null"));
return;
}
downstream.onNext(value);
}
if (!isDisposed()) {
downstream.onComplete();
}
}
onNext()
Observer
β, onComplete()
onError()
, .
Observable
β callbackβ Observer
β, .
onSubscribe()
, doOnSubscribe()
.
3 :
Observable
Observer
Oleh karena itu, saat menggunakan operator, harus diingat bahwa setiap operator mengalokasikan memori untuk beberapa objek dan Anda tidak boleh menambahkan operator ke rantai, hanya karena "mungkin".
RxJava adalah alat yang hebat, tetapi Anda perlu memahami cara kerjanya dan untuk apa menggunakannya. Jika Anda hanya perlu menjalankan permintaan jaringan di utas latar belakang dan kemudian mengeksekusi hasilnya di utas utama, maka itu seperti "menembak burung pipit dengan meriam", Anda bisa tertangkap, tetapi konsekuensinya bisa serius.