- tekanan balik itu keren
- backpressure hanya tersedia di pustaka yang menerapkan spesifikasi aliran reaktif
- spesifikasi ini sangat kompleks sehingga Anda bahkan tidak boleh mencoba menerapkannya sendiri
Pada artikel ini saya akan mencoba menunjukkan bahwa:
- backpressure sangat sederhana
- untuk menerapkan tekanan balik asinkron, cukup dengan membuat versi asinkron dari semaphore
- jika Anda memiliki implementasi semaphore asinkron, antarmuka org.reactivestreams.Publisher diimplementasikan dalam beberapa lusin baris kode
Backpressure adalah umpan balik yang menyesuaikan kecepatan produsen data agar sesuai dengan kecepatan konsumen. Dengan tidak adanya koneksi seperti itu, produsen yang lebih cepat dapat meluap buffer konsumen atau, jika buffer tidak berdimensi, menghabiskan semua RAM.
Dalam pemrograman multithread, masalah ini diselesaikan oleh Dijkstroy, yang mengusulkan mekanisme sinkronisasi baru - semaphore. Semaphore dapat dianggap sebagai penghitung izin. Diasumsikan bahwa produsen meminta izin dari semaphore sebelum melakukan tindakan intensif sumber daya. Jika semaphore kosong, maka thread produsen diblokir.
Program asinkron tidak dapat memblokir utas, sehingga mereka tidak dapat mengakses semaphore kosong untuk mendapatkan izin (tetapi mereka dapat melakukan semua operasi semaphore lainnya). Mereka harus memblokir eksekusinya dengan cara lain. Cara lain ini adalah mereka hanya meninggalkan thread pekerja tempat mereka mengeksekusi, tetapi sebelum itu mereka mengatur untuk kembali bekerja segera setelah semaphore penuh.
Cara paling elegan untuk menjeda dan melanjutkan program asinkron adalah dengan menyusunnya sebagai aktor aliran data dengan port :
Model aliran data - aktor dengan port, koneksi terarah antara port mereka, dan token awal. Diambil dari: Deskripsi Terstruktur Aktor Dataflow Dan Aplikasinya
Ada port input dan output. Port input menerima token (pesan dan sinyal) dari port output aktor lain. Jika port input berisi token, dan port output memiliki tempat untuk menempatkan token, maka port tersebut dianggap aktif. Jika semua port aktor aktif, itu dikirim untuk dieksekusi. Jadi, saat melanjutkan pekerjaannya, program aktor dapat dengan aman membaca token dari port input dan menulis hingga akhir pekan. Mekanisme sederhana ini berisi semua kebijaksanaan pemrograman asynchronous. Mengalokasikan port sebagai subobjek aktor terpisah sangat menyederhanakan pengkodean program asinkron dan memungkinkan Anda meningkatkan keragamannya dengan menggabungkan port dari berbagai jenis.
Aktor Hewitt klasik berisi 2 port - satu terlihat, dengan buffer untuk pesan masuk, biner tersembunyi lainnya, yang memblokir saat aktor dikirim untuk dieksekusi dan, dengan demikian, mencegah aktor memulai ulang hingga akhir peluncuran awal. Semaphore asynchronous yang diinginkan adalah persilangan antara dua port ini. Seperti buffer pesan, ia dapat menyimpan banyak token, dan seperti port tersembunyi, token ini berwarna hitam, yang tidak dapat dibedakan, seperti di jaring Petri, dan penghitung token sudah cukup untuk menyimpannya.
Pada level pertama hierarki, kami memiliki kelas
AbstractActordengan tiga kelas bersarang - basis Portdan turunannya AsyncSemaPortdan InPort, serta dengan mekanisme untuk meluncurkan aktor untuk dieksekusi tanpa adanya port yang diblokir. Singkatnya, ini terlihat seperti ini:
public abstract class AbstractActor {
/** */
private int blocked = 0;
protected synchronized void restart() {
controlPort.unBlock();
}
private synchronized void incBlockCount() {
blocked++;
}
private synchronized void decBlockCount() {
blocked--;
if (blocked == 0) {
controlPort.block();
excecutor.execute(this::run);
}
}
protected abstract void turn() throws Throwable;
/** */
private void run() {
try {
turn();
restart();
} catch (Throwable throwable) {
whenError(throwable);
}
}
}
Ini berisi sekumpulan minimal kelas port:
Port- kelas dasar dari semua port
protected class Port {
private boolean isBlocked = true;
public Port() {
incBlockCount();
}
protected synchronized void block() {
if (isBlocked) {
return;
}
isBlocked = true;
incBlockCount();
}
protected synchronized void unBlock() {
if (!isBlocked) {
return;
}
isBlocked = false;
decBlockCount();
}
}
Semaphore asinkron:
public class AsyncSemaPort extends Port {
private long permissions = 0;
public synchronized void release(long n) {
permissions += n;
if (permissions > 0) {
unBlock();
}
}
public synchronized void aquire(long delta) {
permissions -= delta;
if (permissions <= 0) {
//
// ,
//
block();
}
}
}
InPort - buffer minimum untuk satu pesan masuk:
public class InPort<T> extends Port implements OutMessagePort<T> {
private T item;
@Override
public void onNext(T item) {
this.item = item;
unBlock();
}
public synchronized T poll() {
T res = item;
item = null;
return res;
}
}
Versi lengkap kelas
AbstractActordapat dilihat di sini.
Pada level hierarki berikutnya, kami memiliki tiga aktor abstrak dengan port tertentu, tetapi dengan rutinitas pemrosesan yang tidak ditentukan:
- kelas
AbstractProduceradalah aktor dengan satu port dari jenis semaphore asynchronous (dan port kontrol internal, hadir di semua aktor secara default). - kelasnya
AbstractTransformeradalah aktor Hewitt biasa, dengan referensi ke port input aktor berikutnya dalam rantai, di mana ia mengirimkan token yang dikonversi. - kelas
AbstractConsumerjuga merupakan aktor biasa, tetapi tidak mengirim token yang dikonversi ke mana pun, sementara memiliki tautan ke semaphore produser, dan membuka semafor ini setelah menyerap token input. Dengan cara ini, jumlah token dalam proses dipertahankan konstan dan tidak ada buffer overflow yang terjadi.
Di tingkat terakhir, sudah di direktori pengujian, aktor tertentu yang digunakan dalam pengujian didefinisikan :
- kelas
ProducerActormenghasilkan aliran integer yang terbatas. - kelas
TransformerActormengambil nomor berikutnya dari aliran dan mengirimkannya ke rantai. - kelas
ConsumerActor- menerima dan mencetak nomor yang dihasilkan
Sekarang kita dapat membangun rantai penangan pemrosesan paralel dan asinkron sebagai berikut: produsen - sejumlah transformator - konsumen
Jadi, kita telah menerapkan tekanan balik, dan bahkan dalam bentuk yang lebih umum daripada dalam spesifikasi aliran reaktif - umpan balik dapat menjangkau sejumlah acak pemrosesan kaskade, dan tidak hanya yang berdekatan, seperti dalam spesifikasi.
Untuk mengimplementasikan spesifikasi, Anda perlu menentukan port keluaran yang sensitif terhadap jumlah izin yang diteruskan ke sana menggunakan metode request () - ini akan menjadi
Publisher, dan melengkapi yang sudah ada dengan InPortpanggilan ke metode ini - ini akan menjadi Subscriber. Artinya, kami menganggap bahwa antarmuka PublisherdanSubscribermenggambarkan perilaku pelabuhan, bukan aktornya. Tetapi dilihat dari fakta bahwa dalam daftar antarmuka juga ada Processor, yang sama sekali tidak bisa menjadi antarmuka port, penulis spesifikasi menganggap antarmuka mereka sebagai antarmuka aktor. Nah, kita bisa membuat aktor yang mengimplementasikan semua antarmuka ini dengan mendelegasikan eksekusi fungsi antarmuka ke port yang sesuai.
Untuk kesederhanaan, biarkan milik kita
Publishertidak memiliki buffer sendiri dan akan menulis langsung ke buffer Subscriber. Untuk melakukan ini, Anda memerlukan seseorang untuk Subscriberberlangganan dan memenuhi request(), yaitu, kami memiliki 2 ketentuan dan, karenanya, kami memerlukan 2 port - InPort<Subscriber>dan AsyncSemaPort. Tidak ada satupun yang cocok sebagai dasar implementasiPublisher'a, karena ini berisi metode yang tidak perlu, jadi kita akan membuat variabel internal port ini:
public class ReactiveOutPort<T> implements Publisher<T>, Subscription, OutMessagePort<T> {
protected AbstractActor.InPort<Subscriber<? super T>> subscriber;
protected AbstractActor.AsyncSemaPort sema;
public ReactiveOutPort(AbstractActor actor) {
subscriber = actor.new InPort<>();
sema = actor.new AsyncSemaPort();
}
}
Kali ini, kami
ReactiveOutPorttidak mendefinisikan kelas sebagai kelas bersarang, sehingga diperlukan parameter konstruktor, referensi ke aktor yang melampirkan, untuk membuat instance port yang didefinisikan sebagai kelas bersarang.
Metode
subscribe(Subscriber subscriber)intinya adalah menyimpan pelanggan dan panggilan subscriber.onSubscribe():
public synchronized void subscribe(Subscriber<? super T> subscriber) {
if (subscriber == null) {
throw new NullPointerException();
}
if (this.subscriber.isFull()) {
subscriber.onError(new IllegalStateException());
return;
}
this.subscriber.onNext(subscriber);
subscriber.onSubscribe(this);
}
yang biasanya menghasilkan panggilan
Publisher.request()yang bermuara pada menaikkan semaphore dengan panggilan AsyncSemaPort.release():
public synchronized void request(long n) {
if (subscriber.isEmpty()) {
return; // this spec requirement
}
if (n <= 0) {
subscriber.current().onError(new IllegalArgumentException());
return;
}
sema.release(n);
}
Dan sekarang tinggal bagi kita untuk tidak lupa menurunkan semaphore menggunakan panggilan
AsyncSemaPort.aquire()pada saat penggunaan resource:
public synchronized void onNext(T item) {
Subscriber<? super T> subscriber = this.subscriber.current();
if (subscriber == null) {
throw new IllegalStateException();
}
sema.aquire();
subscriber.onNext(item);
}
Proyek AsyncSemaphore dirancang khusus untuk artikel ini. Ini sengaja dibuat sekompak mungkin agar tidak melelahkan pembaca. Akibatnya, ini mengandung batasan yang signifikan:
-
Publisher'Subscriber' -
Subscriber' 1
Selain itu,
AsyncSemaPortini bukan analog lengkap dari semaphore sinkron - hanya satu klien yang dapat melakukan operasi aquire()y AsyncSemaPort(artinya aktor yang melampirkan). Tapi ini bukan kerugian - AsyncSemaPortitu memenuhi perannya dengan baik. Pada prinsipnya, Anda dapat melakukannya secara berbeda - ambil java.util.concurrent.Semaphoredan lengkapi dengan antarmuka langganan asinkron (lihat AsyncSemaphore.java dari proyek DF4J ). Semaphore semacam itu dapat mengikat aktor dan rangkaian eksekusi dalam urutan apa pun.
Secara umum, setiap jenis interaksi sinkron (pemblokiran) memiliki pasangan asinkron (non-pemblokiran) sendiri. Jadi, dalam proyek DF4J yang sama ada implementasi
BlockingQueue, dilengkapi dengan antarmuka asinkron. Ini membuka kemungkinan transformasi langkah-demi-langkah dari program multithread menjadi program yang tidak sinkron, sebagian menggantikan utas dengan aktor.