Siapa yang akan tertarik?
Reaktor saat ini bergaya, modis, awet muda. Mengapa begitu banyak dari kita yang mempraktikkan pemrograman reaktif? Hanya sedikit yang bisa menjawab pertanyaan ini dengan tegas. Baik - jika Anda memahami keuntungan Anda, buruk - jika reaktor dipaksakan oleh organisasi sebagai pemberian. Sebagian besar argumen "UNTUK" adalah penggunaan arsitektur layanan mikro, yang pada gilirannya mengharuskan layanan mikro untuk sering dan sering berkomunikasi satu sama lain. Untuk komunikasi, dalam banyak kasus, interaksi HTTP dipilih. HTTP membutuhkan server web yang ringan, apa yang pertama kali terlintas dalam pikiran? Kucing jantan. Di sini ada masalah dengan batas jumlah sesi maksimum, setelah melebihi itu server web mulai menolak permintaan (walaupun batas ini tidak mudah dicapai). Di sini reaktor datang untuk menyelamatkan, yang tidak dibatasi oleh batasan seperti itu, dan, misalnya,Netty sebagai server web yang bekerja dengan reaktivitas di luar kotak. Karena ada web server reaktif, Anda memerlukan klien web reaktif (Spring WebClient atau Reactive Feign), dan karena klien reaktif, maka semua horor ini meresap ke dalam logika bisnis, Mono dan Flux menjadi teman terbaik Anda (walaupun pada awalnya ada hanya kebencian :))
Di antara tugas-tugas bisnis, sangat sering ada prosedur serius yang memproses data dalam jumlah besar, dan kita juga harus menggunakan reaktor untuk itu. Di sini kejutan dimulai, jika Anda tidak tahu cara memasak reaktor, Anda bisa mendapatkan banyak masalah. Melebihi batas deskriptor file di server, OutOfMemory karena kecepatan kode non-blocking yang tidak terkendali, dan banyak lagi, yang akan kita bicarakan hari ini. Rekan-rekan saya dan saya telah mengalami banyak kesulitan karena masalah dalam memahami bagaimana menjaga reaktor tetap terkendali, tetapi segala sesuatu yang tidak membunuh kita membuat kita lebih pintar!
Kode pemblokiran dan non-pemblokiran
Anda tidak akan mengerti apa-apa lagi jika Anda tidak memahami perbedaan antara kode pemblokiran dan non-pemblokiran . Karena itu, mari kita berhenti dan memahami dengan cermat apa perbedaannya. Udah tau kan, kode blokir reaktor itu musuh, kode non blokir itu gan. Satu-satunya masalah adalah bahwa saat ini, tidak semua interaksi memiliki rekan non-pemblokiran.
Pemimpin di sini adalah interaksi HTTP, ada banyak opsi, pilih apa saja. Saya lebih suka Reactive Feign dari Playtika, dalam kombinasi dengan Spring Boot + WebFlux + Eureka kami mendapatkan build yang sangat bagus untuk arsitektur layanan mikro.
-: , , reactive, - :) Hibernate + PostgreSQL - , JavaMail - , IBMMQ - . , , MongoDB - . , , , (Thread.sleep() / Socket.read() ), - . ? , . 2 :
. BlockHound ( )
, , :
Schedulers.boundedElastic()
.publishOn
&subscribeOn
, , !
1
@Test
fun testLevel1() {
val result = Mono.just("")
.map { "123" }
.block()
assertEquals("123", result)
}
, reactor . ? Mono.just
:) map
"123" block
subscribe
.
block
, , , .block
RestController
, .
2
fun nonBlockingMethod1sec(data: String)
= data.toMono().delayElement(Duration.ofMillis(1000))
@Test
fun testLevel2() {
val result = nonBlockingMethod1sec("Hello world")
.flatMap { nonBlockingMethod1sec(it) }
.block()
assertEquals("Hello world", result)
}
, nonBlockingMethod1sec
, - . - , , .
3
fun collectTasks() = (0..99)
@Test
fun testLevel3() {
val result = nonBlockingMethod1sec("Hello world")
.flatMap { businessContext ->
collectTasks()
.toFlux()
.map {
businessContext + it
}
.collectList()
}
.block()!!
assertEquals(collectTasks().toList().size, result.size)
}
- Flux
! collectTasks
, , Flux
- . map. collectList
.
, . " ", .
4
fun collectTasks() = (0..100)
@Test
fun testLevel4() {
val result = nonBlockingMethod1sec("Hello world")
.flatMap { businessContext ->
collectTasks().toFlux()
.flatMap {
Mono.deferContextual { reactiveContext ->
val hash = businessContext + it + reactiveContext["requestId"]
hash.toMono()
}
}.collectList()
}
.contextWrite { it.put("requestId", UUID.randomUUID().toString()) }
.block()!!
assertEquals(collectTasks().toList().size, result.size)
}
. (15)
, (10)
. .
5
fun collectTasks() = (0..1000)
fun doSomethingNonBlocking(data: String)
= data.toMono().delayElement(Duration.ofMillis(1000))
fun doSomethingBlocking(data: String): String {
Thread.sleep(1000); return data
}
val pool = Schedulers.newBoundedElastic(10, Int.MAX_VALUE, "test-pool")
private val logger = getLogger()
@Test
fun testLevel5() {
val counter = AtomicInteger(0)
val result = nonBlockingMethod1sec("Hello world")
.flatMap { _ ->
collectTasks().toFlux()
.parallel()
.runOn(pool)
.flatMap {
Mono.deferContextual { _ ->
doSomethingNonBlocking(it.toString())
.doOnRequest { logger.info("Added task in pool ${counter.incrementAndGet()}") }
.doOnNext { logger.info("Non blocking code finished ${counter.get()}") }
.map { doSomethingBlocking(it) }
.doOnNext { logger.info("Removed task from pool ${counter.decrementAndGet()}") }
}
}.sequential()
.collectList()
}
.block()!!
assertEquals(collectTasks().toList().size, result.size)
}
! , . : doSomethingNonBlocking
(3)
& doSomethingBlocking
(6)
- , . (10)
, (15)
. parallel
(19)
sequential
(29)
. (20)
. , , doOnRequest
( ), doOnNext
( ). - , .
"", , . - , , . , , . , Flux .
. . , ? 100 , 1 , 1 , 10 ? ( senior reactor developer :))
12 . :) , 100 10 , 10 . , . " " .
(26) .map { doSomethingBlocking(it) }
. , , ?
2 ! 1 " " 1 . 100 . 10 ? ? .
collectTasks()
... 1000? 15000? ?
2 ! 1 " " 1 . . . ?
?
? ? ? 30000 , , , , ( web-client feign, ?) , , SSH . , , " ".
. Thread Pool & Reactor
- , - X , X , - . ? :) .
thread pool - . - , .
reactor! ?
, , . ? epoll , . . , , . , " ?", , . . , - , 500 -, . ! , , Schedulers.boundedElastic()
.
"", ?
!
, , , , , , 4-8 production 32 .
parallel
parallelism
parallelism
, rails ( , , ). Prefetch .
parallelism , .
flatMap
( Flux) , maxConcurrency
maxConcurrency
, Integer.MAX_VALUE
( . ?
, , ( http ), ! .
.
:
parallel (parallelism)
flatMap (maxConcurrency)
, .
- * Integer.MAX_VALUE *
, 5 5 . !
val result = nonBlockingMethod1sec("Hello world")
.flatMap { _ ->
collectTasks().toFlux()
.parallel(1)
.runOn(pool, 1)
.flatMap({
Mono.deferContextual { _ ->
doSomethingNonBlocking(it.toString())
}
}, false, 1, 1)
.sequential()
.collectList()
}
.block()!!
, ?
Thread Pool
? . - , , ! ? , :)
, Schedulers.parallel() ? =)
( parallel, ), , , .
. , , , . , , production . .
, round-robin, .
production , , , .
collectList()
, , 1 . , , .
concatMap
flatMap
( , )
, ( )
, ( )
prefetch
( !)
prefetch
flatMap
& runOn
, , , . - 256. 1, "work stealing", , , , .
Itu semua untukku. Akan menarik untuk membaca komentar dan komentar Anda, saya tidak berpura-pura 100% benar, tetapi semua hasil didukung oleh contoh-contoh praktis, pada Spring Boot + Project Reactor 3.4. Terimakasih untuk semua!