Halo, Khabrovites. Hari ini kita akan berbicara tentang RxJava. Saya tahu bahwa sebuah gerobak dan gerobak kecil telah ditulis tentang dia, tetapi menurut saya saya memiliki beberapa poin menarik yang layak untuk dibagikan. Pertama, saya akan memberi tahu Anda bagaimana kami menggunakan RxJava bersama dengan arsitektur VIPER untuk aplikasi Android, bersama dengan melihat cara "klasik" untuk menggunakannya. Setelah itu, mari kita bahas fitur utama RxJava dan membahas lebih detail tentang cara kerja penjadwal. Jika Anda sudah membeli makanan ringan, selamat datang di bawah cat.
Arsitektur yang cocok untuk semua orang
RxJava adalah implementasi dari konsep ReactiveX dan dibuat oleh Netflix. Blog mereka memiliki serangkaian artikel tentang mengapa mereka melakukannya dan masalah apa yang mereka pecahkan. Tautan (1, 2) dapat ditemukan di akhir artikel. Netflix menggunakan RxJava di sisi server (backend) untuk memparalelkan pemrosesan satu permintaan besar. Meskipun mereka menyarankan cara untuk menggunakan RxJava pada backend, arsitektur ini cocok untuk menulis berbagai jenis aplikasi (seluler, desktop, backend, dan banyak lainnya). Pengembang Netflix menggunakan RxJava di lapisan layanan sedemikian rupa sehingga setiap metode lapisan layanan menampilkan Observable. Intinya adalah bahwa elemen dalam Observable dapat dikirimkan secara sinkron dan asinkron. Ini memungkinkan metode untuk memutuskan sendiri apakah akan mengembalikan nilai dengan segera secara sinkron, (misalnya,jika tersedia dalam cache) atau dapatkan nilai ini terlebih dahulu (misalnya, dari database atau layanan jarak jauh) dan kembalikan secara asinkron. Dalam kasus apa pun, kontrol akan kembali segera setelah memanggil metode (baik dengan atau tanpa data).
/**
* , ,
* , ,
* callback `onNext()`
*/
public Observable<T> getProduct(String name) {
if (productInCache(name)) {
// ,
return Observable.create(observer -> {
observer.onNext(getProductFromCache(name));
observer.onComplete();
});
} else {
//
return Observable.<T>create(observer -> {
try {
//
T product = getProductFromRemoteService(name);
//
observer.onNext(product);
observer.onComplete();
} catch (Exception e) {
observer.onError(e);
}
})
// Observable IO
// /
.subscribeOn(Schedulers.io());
}
}
Dengan pendekatan ini, kami mendapatkan satu API yang tidak dapat diubah untuk klien (dalam kasus kami, pengontrol) dan implementasi yang berbeda. Klien selalu berinteraksi dengan Observable dengan cara yang sama. Tidak masalah sama sekali apakah nilai-nilai tersebut diterima secara serempak atau tidak. Pada saat yang sama, implementasi API dapat berubah dari sinkron menjadi asinkron, tanpa memengaruhi interaksi dengan klien dengan cara apa pun. Dengan pendekatan ini, Anda tidak dapat sepenuhnya memikirkan tentang bagaimana mengatur multithreading, dan fokus pada pelaksanaan tugas bisnis.
Pendekatan ini dapat diterapkan tidak hanya di lapisan layanan di backend, tetapi juga di arsitektur MVC, MVP, MVVM, dll. Misalnya, untuk MVP, kita dapat membuat kelas Interactor yang akan bertanggung jawab untuk menerima dan menyimpan data ke berbagai sumber, dan membuat semuanya metodenya mengembalikan Observable. Mereka akan menjadi kontrak untuk interaksi dengan Model. Ini juga akan memungkinkan Presenter untuk memanfaatkan kekuatan penuh dari operator yang tersedia di RxJava.
Kita bisa melangkah lebih jauh dan membuat Presenter reactive API, tapi untuk ini kita perlu mengimplementasikan mekanisme unsubscribe dengan benar yang memungkinkan semua View untuk unsubscribe secara bersamaan dari Presenter.
Selanjutnya, mari kita lihat contoh bagaimana pendekatan ini diterapkan untuk arsitektur VIPER, yang merupakan MVP yang ditingkatkan. Perlu juga diingat bahwa Anda tidak dapat membuat objek tunggal Observable, karena langganan ke Observable tersebut akan menghasilkan kebocoran memori.
Pengalaman di Android dan VIPER
Di sebagian besar proyek Android saat ini dan baru, kami menggunakan arsitektur VIPER. Saya bertemu dengannya ketika saya bergabung dengan salah satu proyek di mana dia sudah digunakan. Saya ingat pernah terkejut ketika ditanya apakah saya mencari iOS. “IOS dalam proyek Android?” Pikir saya. Sementara itu, VIPER datang kepada kami dari dunia iOS dan sebenarnya adalah versi MVP yang lebih terstruktur dan modular. VIPER ditulis dengan sangat baik di artikel ini (3).
Pada awalnya semuanya tampak baik-baik saja: lapisan dibagi dengan benar, tidak kelebihan beban, setiap lapisan memiliki area tanggung jawabnya sendiri, logika yang jelas. Tetapi setelah beberapa waktu, satu kelemahan mulai muncul, dan seiring dengan pertumbuhan dan perubahan proyek, bahkan mulai ikut campur.
Faktanya adalah kami menggunakan Interactor dengan cara yang sama seperti kolega kami di artikel kami. Interactor mengimplementasikan kasus penggunaan kecil, misalnya, "mendownload produk dari jaringan" atau "mengambil produk dari database dengan id", dan melakukan tindakan dalam alur kerja. Secara internal, Interactor melakukan operasi menggunakan Observable. Untuk "menjalankan" Interactor dan mendapatkan hasilnya, pengguna mengimplementasikan antarmuka ObserverEntity bersama dengan metode onNext, onError, dan onComplete dan meneruskannya bersama dengan parameter ke metode eksekusi (params, ObserverEntity).
Anda mungkin sudah memperhatikan masalahnya - struktur antarmuka. Dalam praktiknya, kita jarang membutuhkan ketiga metode tersebut, seringkali menggunakan satu atau dua di antaranya. Karena itu, metode kosong mungkin muncul di kode Anda. Tentu saja, kita dapat menandai semua metode antarmuka sebagai default, tetapi metode seperti itu lebih mungkin untuk menambahkan fungsionalitas baru ke antarmuka. Selain itu, aneh rasanya memiliki antarmuka yang semua metodenya opsional. Kita juga bisa, misalnya, membuat kelas abstrak yang mewarisi antarmuka dan menimpa metode yang kita butuhkan. Atau, terakhir, buat versi metode eksekusi (params, ObserverEntity) yang kelebihan beban yang menerima satu hingga tiga antarmuka fungsional. Masalah ini buruk untuk keterbacaan kode, tetapi, untungnya, ini cukup mudah dipecahkan. Namun, dia bukan satu-satunya.
saveProductInteractor.execute(product, new ObserverEntity<Void>() {
@Override
public void onNext(Void aVoid) {
// ,
//
}
@Override
public void onError(Throwable throwable) {
//
// -
}
@Override
public void onComplete() {
//
// -
}
});
Selain metode kosong, ada masalah yang lebih mengganggu. Kami menggunakan Interactor untuk melakukan beberapa tindakan, tetapi hampir selalu tindakan ini bukan satu-satunya. Misalnya, kita bisa mengambil produk dari database, kemudian mendapatkan review dan gambaran tentangnya, lalu menyimpannya ke tempat lain dan terakhir ke layar lain. Di sini, setiap tindakan bergantung pada tindakan sebelumnya, dan saat menggunakan Interactors, kita mendapatkan rantai callback yang sangat besar, yang bisa sangat membosankan untuk dilacak.
private void checkProduct(int id, Locale locale) {
getProductByIdInteractor.execute(new TypesUtil.Pair<>(id, locale), new ObserverEntity<Product>() {
@Override
public void onNext(Product product) {
getProductInfo(product);
}
@Override
public void onError(Throwable throwable) {
// -
}
@Override
public void onComplete() {
}
});
}
private void getProductInfo(Product product) {
getReviewsByProductIdInteractor.execute(product.getId(), new ObserverEntity<List<Review>>() {
@Override
public void onNext(List<Review> reviews) {
product.setReviews(reviews);
saveProduct(productInfo);
}
@Override
public void onError(Throwable throwable) {
// -
}
@Override
public void onComplete() {
// -
}
});
getImageForProductInteractor.execute(product.getId(), new ObserverEntity<Image>() {
@Override
public void onNext(Image image) {
product.setImage(image);
saveProduct(product);
}
@Override
public void onError(Throwable throwable) {
// -
}
@Override
public void onComplete() {
}
});
}
private void saveProduct(Product product) {
saveProductInteractor.execute(product, new ObserverEntity<Void>() {
@Override
public void onNext(Void aVoid) {
}
@Override
public void onError(Throwable throwable) {
// -
}
@Override
public void onComplete() {
goToSomeScreen();
}
});
}
Nah, bagaimana Anda menyukai pasta ini? Pada saat yang sama, kami memiliki logika bisnis sederhana dan bersarang tunggal, tetapi bayangkan apa yang akan terjadi dengan kode yang lebih kompleks. Ini juga menyulitkan untuk menggunakan kembali metode dan menerapkan penjadwal yang berbeda untuk Interactor.
Solusinya sangat sederhana. Apakah Anda merasa pendekatan ini mencoba meniru perilaku yang Dapat Diamati, tetapi apakah itu salah dan menciptakan batasan aneh itu sendiri? Seperti yang saya katakan sebelumnya, kami mendapatkan kode ini dari proyek yang sudah ada. Saat memperbaiki kode warisan ini, kami akan menggunakan pendekatan yang diwariskan oleh orang-orang dari Netflix kepada kami. Alih-alih harus mengimplementasikan ObserverEntity setiap saat, mari buat Interactor mengembalikan Observable.
private Observable<Product> getProductById(int id, Locale locale) {
return getProductByIdInteractor.execute(new TypesUtil.Pair<>(id, locale));
}
private Observable<Product> getProductInfo(Product product) {
return getReviewsByProductIdInteractor.execute(product.getId())
.map(reviews -> {
product.set(reviews);
return product;
})
.flatMap(product -> {
getImageForProductInteractor.execute(product.getId())
.map(image -> {
product.set(image);
return product;
})
});
}
private Observable<Product> saveProduct(Product product) {
return saveProductInteractor.execute(product);
}
private doAll(int id, Locale locale) {
//
getProductById (id, locale)
//
.flatMap(product -> getProductInfo(product))
//
.flatMap(product -> saveProduct(product))
//
.ignoreElements()
//
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
//
.subscribe(() -> goToSomeScreen(), throwable -> handleError());
}
Voila! Jadi kami tidak hanya menyingkirkan kengerian yang rumit dan berat itu, tetapi juga membawa kekuatan RxJava ke Presenter.
Konsep di hati
Saya cukup sering melihat bagaimana mereka mencoba menjelaskan konsep RxJava menggunakan pemrograman reaktif fungsional (selanjutnya FRP). Sebenarnya, ini tidak ada hubungannya dengan perpustakaan ini. FRP lebih banyak tentang makna (perilaku) yang terus berubah secara dinamis, waktu berkelanjutan, dan semantik denotasional. Di akhir artikel, Anda dapat menemukan beberapa tautan menarik (4, 5, 6, 7).
RxJava menggunakan pemrograman reaktif dan pemrograman fungsional sebagai konsep intinya. Pemrograman reaktif dapat digambarkan sebagai transfer informasi berurutan dari objek yang diamati ke objek pengamat sedemikian rupa sehingga objek pengamat menerimanya secara otomatis (secara asinkron) saat informasi ini muncul.
Pemrograman fungsional menggunakan konsep fungsi murni, yaitu fungsi yang tidak menggunakan atau mengubah status eksternal; mereka sepenuhnya bergantung pada masukan mereka untuk mendapatkan keluarannya. Tidak adanya efek samping untuk fungsi murni memungkinkan untuk menggunakan hasil dari satu fungsi sebagai parameter input ke parameter lainnya. Ini memungkinkan untuk membuat rantai fungsi yang tidak terbatas.
Mengikat kedua konsep ini bersama-sama, bersama dengan GoF Observer dan pola Iterator, memungkinkan Anda membuat aliran data asinkron dan memprosesnya dengan banyak sekali fungsi yang sangat berguna. Ini juga memungkinkan untuk menggunakan multithreading dengan sangat sederhana, dan yang terpenting dengan aman, tanpa memikirkan masalahnya seperti sinkronisasi, ketidakkonsistenan memori, tumpang tindih utas, dll.
Tiga paus RxJava
Tiga komponen utama tempat dibangunnya RxJava adalah Observable, operator, dan penjadwal.
Dapat diamati di RxJava bertanggung jawab untuk menerapkan paradigma reaktif. Dapat diamati sering disebut sebagai aliran karena mereka menerapkan konsep aliran data dan penyebaran perubahan. Observable adalah jenis yang mencapai paradigma reaktif dengan menggabungkan dua pola dari Gang of Four: Observer dan Iterator. Observable menambahkan dua semantik yang hilang ke Observer, yang ada di Iterable:
- Kemampuan produsen untuk memberi sinyal kepada konsumen bahwa tidak ada lagi data yang tersedia (loop foreach di Iterable berakhir dan hanya kembali; Observable dalam hal ini memanggil metode onCompleate).
- Kemampuan produsen untuk memberi tahu konsumen bahwa telah terjadi kesalahan dan Observable tidak dapat lagi memancarkan elemen (Iterable melontarkan pengecualian jika terjadi kesalahan selama iterasi; Pengamatan memanggil onError pada pengamat dan keluarnya).
Jika Iterable menggunakan pendekatan "pull", yaitu, konsumen meminta nilai dari produsen, dan thread memblokir hingga nilai tersebut tiba, maka Observable adalah padanan "push" -nya. Artinya, produsen hanya mengirimkan nilai kepada konsumen saat sudah tersedia.
Dapat diamati hanyalah awal dari RxJava. Ini memungkinkan Anda untuk mengambil nilai secara asynchronous, tetapi kekuatan sebenarnya hadir dengan "ekstensi reaktif" (karenanya ReactiveX) - operatoryang memungkinkan Anda untuk mengubah, menggabungkan, dan membuat urutan elemen yang dipancarkan oleh Observable. Di sinilah paradigma fungsional mengedepankan fungsi murni. Operator memanfaatkan sepenuhnya konsep ini. Mereka memungkinkan Anda untuk bekerja dengan aman dengan urutan elemen yang dipancarkan Observable, tanpa takut efek samping, kecuali tentu saja Anda membuatnya sendiri. Operator mengizinkan multithreading tanpa mengkhawatirkan masalah seperti keamanan thread, kontrol thread level rendah, sinkronisasi, kesalahan ketidakkonsistenan memori, overlay thread, dll. Memiliki gudang fungsi yang besar, Anda dapat dengan mudah mengoperasikannya dengan berbagai data. Ini memberi kita alat yang sangat ampuh. Hal utama yang harus diingat adalah operator memodifikasi item yang dipancarkan oleh Observable, bukan Observable itu sendiri.Dapat diamati tidak pernah berubah sejak diciptakan. Saat memikirkan tentang utas dan operator, yang terbaik adalah berpikir dalam bagan. Jika Anda tidak tahu bagaimana menyelesaikan masalah, pikirkan, lihat seluruh daftar operator yang tersedia dan pikirkan lagi.
Sementara konsep pemrograman reaktif itu sendiri adalah asynchronous (jangan disamakan dengan multithreading), secara default semua item dalam sebuah Observable dikirim ke pelanggan secara sinkron, pada thread yang sama di mana metode subscribe () dipanggil. Untuk memperkenalkan sendiri asinkron itu, Anda perlu memanggil sendiri metode onNext (T), onError (Throwable), onComplete () di thread lain eksekusi, atau menggunakan penjadwal. Biasanya semua orang menganalisis perilakunya, jadi mari kita lihat strukturnya.
Perencanaabstrak pengguna dari sumber paralelisme di balik API mereka sendiri. Mereka menjamin bahwa mereka akan menyediakan properti tertentu, terlepas dari mekanisme konkurensi yang mendasarinya (implementasi), seperti Threads, event loop, atau Executor. Penjadwal menggunakan utas daemon. Ini berarti bahwa program akan berhenti dengan penghentian thread utama eksekusi, bahkan jika beberapa komputasi terjadi di dalam operator Observable.
RxJava memiliki beberapa penjadwal standar yang cocok untuk tujuan tertentu. Mereka semua memperluas kelas Penjadwal abstrak dan menerapkan logikanya sendiri untuk mengelola pekerja. Misalnya, ComputationScheduler, pada saat pembuatannya, membentuk kumpulan pekerja, yang jumlahnya sama dengan jumlah utas prosesor. ComputationScheduler kemudian menggunakan pekerja untuk melakukan tugas-tugas yang dapat dijalankan. Anda bisa meneruskan Runnable ke penjadwal menggunakan metode scheduleDirect () dan schedulePeriodicallyDirect (). Untuk kedua metode, penjadwal mengambil pekerja berikutnya dari kumpulan dan meneruskan Runnable ke sana.
Pekerja berada di dalam penjadwal dan merupakan entitas yang menjalankan objek Runnable (tugas) menggunakan salah satu dari beberapa skema konkurensi. Dengan kata lain, penjadwal menerima Runnable dan meneruskannya ke pekerja untuk dieksekusi. Anda juga dapat memperoleh pekerja secara mandiri dari penjadwal dan mentransfer satu atau lebih Dapat dijalankan kepadanya, secara independen dari pekerja lain dan penjadwal itu sendiri. Saat seorang pekerja menerima tugas, dia meletakkannya di antrian. Pekerja menjamin bahwa tugas dijalankan secara berurutan sesuai urutan pengirimannya, tetapi urutannya bisa terganggu oleh tugas yang tertunda. Misalnya, di ComputationScheduler, pekerja diimplementasikan menggunakan ScheduledExecutorService utas tunggal.
Jadi, kami memiliki pekerja abstrak yang dapat mengimplementasikan skema paralelisme apa pun. Pendekatan ini memberikan banyak keuntungan: modularitas, fleksibilitas, satu API, implementasi berbeda. Kami melihat pendekatan serupa di ExecutorService. Plus, kita bisa menggunakan penjadwal terpisah dari Observable.
Kesimpulan
RxJava adalah pustaka yang sangat kuat yang dapat digunakan dalam berbagai cara di banyak arsitektur. Cara penggunaannya tidak terbatas pada yang sudah ada, jadi usahakan selalu menyesuaikannya sendiri. Namun, ingatlah tentang PADAT, KERING dan prinsip desain lainnya, dan jangan lupa untuk berbagi pengalaman Anda dengan rekan kerja. Saya harap Anda dapat mempelajari sesuatu yang baru dan menarik dari artikel ini, sampai jumpa!