Inisialisasi rantai Rx

Halo semuanya, nama saya Ivan, saya seorang pengembang Android. Hari ini saya ingin berbagi pengalaman saya dengan RxJava2 dan memberi tahu Anda bagaimana inisialisasi rantai berlangsung. Mengapa saya memutuskan untuk mengangkat ini sama sekali? Setelah berbicara dengan sesama pengembang, saya menyadari bahwa tidak semua orang yang menggunakan alat ini memahami cara kerjanya. Dan kemudian saya memutuskan untuk mencari tahu bagaimana langganan diatur di RxJava2 dan dalam urutan apa semua pekerjaan diinisialisasi. Saya belum menemukan satu artikel pun yang menjelaskan hal ini. Mengingat hal ini, saya masuk ke kode sumber untuk melihat bagaimana semuanya bekerja dan membuat sketsa untuk diri saya sendiri lembar contekan kecil, yang berkembang menjadi artikel ini.





Dalam artikel ini, saya tidak akan menjelaskan apa itu Observable



, Observer



dan semua entitas lain yang digunakan di RxJava2. Jika Anda memutuskan untuk membaca artikel ini, maka saya berasumsi bahwa Anda sudah familiar dengan informasi ini. Dan jika Anda masih belum terbiasa dengan konsep-konsep ini, maka saya sarankan Anda membiasakan diri dengan mereka sebelum membaca.





Berikut cara memulai:





Grock * RxJava





Menjelajahi RxJava 2 untuk Android





Mari kita lihat cara kerja rantai paling sederhana:





Observable.just (1, 2, 3, 4, 5)
.map {…}
.filter {…}
.subscribe();
      
      



Di atas

Pertama, saya akan menjelaskan secara singkat setiap langkah yang kita lalui dalam rantai ini (langkah dimulai dari atas ke bawah):





  • Sebuah objek dibuat dalam pernyataan just ObservableFromArray



    .





  • Sebuah objek dibuat dalam pernyataan peta ObservableMap



    , yang mengambil konstruktor referensi ke objek yang dibuat sebelumnya dalam pernyataan adil.





  • filter ObservableFilter



    , map, just.





  • Observable



    ’ Observable



    subscribe()



    ( ObservableFilter



    filter) Observer



    , .





  • ObservableFilter.subscribe()



    ObservableFilter.subscribeActual()



    , Observer



    , filter, FilterObserver



    . Observer



    Observer



    ObservableFilter.subscribe()



    .





  • ObservableMap.subscribe()



    ObservableMap.subscribeActual()



    Observer,



    map, MapObserver



    , FilterObserver



    .





  • ObservableFromArray.subscribe()



    ObservableFromArray.subscribeActual()



    , onSubscribe()



    ObservableFromArray.subscribeActual()



    Observer



    ’.





  • onSubscribe()



    Observer



    ’ .





  • ObservableFromArray



    onNext()



    Observer



    ’.





Sebuah representasi visual dari diagram di atas.
.

, just()



null, fromArray(),



Observable



.





public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5) {
   ObjectHelper.requireNonNull(item1, "item1 is null");
   ObjectHelper.requireNonNull(item2, "item2 is null");
   ObjectHelper.requireNonNull(item3, "item3 is null");
   ObjectHelper.requireNonNull(item4, "item4 is null");
   ObjectHelper.requireNonNull(item5, "item5 is null");

   return fromArray(item1, item2, item3, item4, item5);
}
      
      



fromArray()



, .





public static <T> Observable<T> fromArray(T... items) {
   ObjectHelper.requireNonNull(items, "items is null");
   if (items.length == 0) {
       return empty();
   }
   if (items.length == 1) {
       return just(items[0]);
   }
   return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
      
      



ObservableFromArray



, .





onAssembly()



, - Observable



, , .





public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
   Function<? super Observable, ? extends Observable> f = onObservableAssembly;
   if (f != null) {
       return apply(f, source);
   }
   return source;
}
      
      



onAssembly()



Observable



- , :





RxJavaPlugins.setOnObservableAssembly(o -> {
	if (o instanceof ObservableFromArray) {
    	return new ObservableFromArray<>(new Integer[] { 4, 5, 6 });
	}
	return o;
});
 
Observable.just(1, 2, 3)
.filter(v -> v > 3)
.test()
.assertResult(4, 5, 6);
      
      



ObservableFromArray yang baru saja dibuat
ObservableFromArray

map()



. , . null, ObservableMap



.





public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
   ObjectHelper.requireNonNull(mapper, "mapper is null");
   return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
      
      



, ObservableMap



mapper, , this (source). this ObservableFromArray



. ObservableMap



AbstractObservableWithUpstream



, source.





AbstractObservableWithUpstream



, Observable



.





onAssembly()



Observable







Skema yang diperbarui dengan ObservableMap yang dihasilkan
ObservableMap

filter()



. , , ObservableFilter



this ObservableMap



( ObservableFromArray



, ) .





public final Observable<T> filter(Predicate<? super T> predicate) {
   ObjectHelper.requireNonNull(predicate, "predicate is null");
   return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
}
      
      



Skema yang diperbarui dengan ObservableFilter yang dihasilkan
ObservableFilter

subscribe()



, . onNext()



. subscribe()



ObservableFilter



, Observable



.





public final Disposable subscribe(Consumer<? super T> onNext) {
   return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
      
      



null, LambdaObserver



.





public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
       Action onComplete, Consumer<? super Disposable> onSubscribe) {
   ObjectHelper.requireNonNull(onNext, "onNext is null");
   ObjectHelper.requireNonNull(onError, "onError is null");
   ObjectHelper.requireNonNull(onComplete, "onComplete is null");
   ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

   LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

   subscribe(ls);

   return ls;
}
      
      



, .





public final void subscribe(Observer<? super T> observer) {
   ObjectHelper.requireNonNull(observer, "observer is null");
   try {
       observer = RxJavaPlugins.onSubscribe(this, observer);

       ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

       subscribeActual(observer);
   } catch (NullPointerException e) { 
     ......
 }
}
      
      



subscribeActual()



LambdaObserver



. subscribeActual()



ObservableFilter



. .





public void subscribeActual(Observer<? super T> observer) {
   source.subscribe(new FilterObserver<T>(observer, predicate));
}
      
      



FilterObserver



, LambdaObserver



, ObservableFilter



.





FilterObserver



BasicFuseableObserver



, onSubscribe()



. BasicFuseableObserver



, Observer



’. , 6 , FilterObserver



MapObserver



. BasicFuseableObserver.onSubscribe()



onSubscribe()



Observer



’, . :





public final void onSubscribe(Disposable d) {
   if (DisposableHelper.validate(this.upstream, d)) {
       this.upstream = d;
       if (d instanceof QueueDisposable) {
           this.qd = (QueueDisposable<T>)d;
       }
       if (beforeDownstream()) {

           downstream.onSubscribe(this);

           afterDownstream();
       }
   }
}
      
      



, ObservableFilter



FilterObserver



, source.subscribe()



. , source ObservableMap



, . ObservableMap



subscribe()



.





public final void subscribe(Observer<? super T> observer) {
   ObjectHelper.requireNonNull(observer, "observer is null");
   try {
       observer = RxJavaPlugins.onSubscribe(this, observer);

       ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

       subscribeActual(observer);
   } catch (NullPointerException e) { 
     ......
 }
}
      
      



, subscribe()



subscribeActual()



, ObservableMap



. subscribeActual()



MapObserver



FilterObserver



mapper



’. 





public void subscribeActual(Observer<? super U> t) {
   source.subscribe(new MapObserver<T, U>(t, function));
}
      
      



public void subscribeActual(Observer<? super T> observer) {
   FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);

   observer.onSubscribe(d);

   if (d.fusionMode) {
       return;
   }
   d.run();
}
      
      



Observer



’ BasicFuseableObserver



, onSubscribe()



, Observer



, onSubscribe()



.





subscribeActual()



run()



, Observer



’.





void run() {
   T[] a = array;
   int n = a.length;

   for (int i = 0; i < n && !isDisposed(); i++) {
       T value = a[i];
       if (value == null) {
           downstream.onError(new NullPointerException("The element at index " + i + " is null"));
           return;
       }
       downstream.onNext(value);
   }
   if (!isDisposed()) {
       downstream.onComplete();
   }
}
      
      



onNext()



Observer



’, onComplete()



onError()



, .





Representasi visual dari proses pembuatan dan berlangganan





Observable



’ callback’ Observer



’, .





onSubscribe()



, doOnSubscribe()



.





3 :









  • Observable







  • Observer







Oleh karena itu, saat menggunakan operator, harus diingat bahwa setiap operator mengalokasikan memori untuk beberapa objek dan Anda tidak boleh menambahkan operator ke rantai, hanya karena "mungkin".





RxJava adalah alat yang hebat, tetapi Anda perlu memahami cara kerjanya dan untuk apa menggunakannya. Jika Anda hanya perlu menjalankan permintaan jaringan di utas latar belakang dan kemudian mengeksekusi hasilnya di utas utama, maka itu seperti "menembak burung pipit dengan meriam", Anda bisa tertangkap, tetapi konsekuensinya bisa serius.








All Articles