Pada pekerjaan saat ini, kami menulis di Reaktor. Teknologi ini keren, tapi seperti biasa ada banyak TAPI. Beberapa hal mengganggu, kodenya lebih sulit untuk ditulis dan dibaca, dan ThreadLocal adalah bencana yang nyata. Saya memutuskan untuk melihat masalah apa yang akan hilang jika Anda beralih ke Kotlin Coroutines, dan masalah apa, sebaliknya, yang akan ditambahkan.
Kartu pasien
Saya menulis proyek kecil untuk artikel tersebut , mereproduksi masalah yang saya temui di tempat kerja. Kode utama ada di sini . Algoritme dengan sengaja tidak memecahnya menjadi metode terpisah, sehingga masalahnya dapat dilihat dengan lebih baik.
Singkatnya tentang algoritme:
Kami mentransfer uang dari satu akun ke akun lain, mencatat transaksi tentang fakta transfer.
Terjemahan itu idempoten, jadi jika transaksi sudah ada di database, maka kami membalas ke klien bahwa semuanya baik-baik saja. Saat memasukkan transaksi, DataIntegrityViolationException dapat dilempar, ini juga berarti bahwa transaksi tersebut sudah ada.
Agar tidak berubah menjadi negatif, centang + Kode kunci optimis, yang tidak memungkinkan pembaruan kompetitif akun. Untuk membuatnya bekerja, Anda perlu mencoba lagi dan penanganan kesalahan tambahan.
Bagi mereka yang tidak menyukai algoritme itu sendiri
Algoritme untuk proyek dipilih untuk mereproduksi masalah, bukan agar efisien dan benar secara arsitektural. Alih-alih satu transaksi, Anda perlu memasukkan semikonduktor, kunci optimis tidak diperlukan sama sekali (alih-alih memeriksa kepositifan akun di sql), pilih + sisipkan harus diganti dengan upsert.
Keluhan pasien
Stacktrace tidak menunjukkan bagaimana kami sampai ke area masalah.
Kode tersebut jelas lebih kompleks daripada pada pemblokiran teknologi.
Kode multistage bersarang karena flatMap.
Penanganan dan pelemparan kesalahan yang tidak nyaman.
Penanganan perilaku yang kompleks untuk Mono.empty ().
Kesulitan dengan logging, jika Anda perlu menambahkan sesuatu yang global ke log, misalnya traceId. (Saya tidak menjelaskan di artikel, tetapi masalah yang sama dengan variabel ThreadLocal lainnya, misalnya SpringSecurity)
Tidak nyaman untuk melakukan debug.
Api implisit untuk paralelisasi.
Kemajuan pengobatan
PR Java Kotlin.
.
com.fasterxml.jackson.module:jackson-module-kotlin data org.jetbrains.kotlin.plugin.spring open .
suspend fun transfer(@RequestBody request: TransferRequest)
public Mono<Void> transfer(@RequestBody TransferRequest request)
suspend fun save(account: Account): Account
Mono<Account> save(Account account);
, , suspend , , Reactor .
runBlocking { β¦ }
, suspend .
Retry kotlin-retry. , , ( PR).
, , . -.
:
public Mono<Void> transfer(String transactionKey, long fromAccountId,
long toAccountId, BigDecimal amount) {
return transactionRepository.findByUniqueKey(transactionKey)
.map(Optional::of)
.defaultIfEmpty(Optional.empty())
.flatMap(withMDC(foundTransaction -> {
if (foundTransaction.isPresent()) {
log.warn("retry of transaction " + transactionKey);
return Mono.empty();
}
return accountRepository.findById(fromAccountId)
.switchIfEmpty(Mono.error(new AccountNotFound()))
.flatMap(fromAccount -> accountRepository.findById(toAccountId)
.switchIfEmpty(Mono.error(new AccountNotFound()))
.flatMap(toAccount -> {
var transactionToInsert = Transaction.builder()
.amount(amount)
.fromAccountId(fromAccountId)
.toAccountId(toAccountId)
.uniqueKey(transactionKey)
.build();
var amountAfter = fromAccount.getAmount().subtract(amount);
if (amountAfter.compareTo(BigDecimal.ZERO) < 0) {
return Mono.error(new NotEnoghtMoney());
}
return transactionalOperator.transactional(
transactionRepository.save(transactionToInsert)
.onErrorResume(error -> {
//transaction was inserted on parallel transaction,
//we may return success response
if (error instanceof DataIntegrityViolationException
&& error.getMessage().contains("TRANSACTION_UNIQUE_KEY")) {
return Mono.empty();
} else {
return Mono.error(error);
}
})
.then(accountRepository.transferAmount(
fromAccount.getId(), fromAccount.getVersion(),
amount.negate()
))
.then(accountRepository.transferAmount(
toAccount.getId(), toAccount.getVersion(), amount
))
);
}));
}))
.retryWhen(Retry.backoff(3, Duration.ofMillis(1))
.filter(OptimisticLockException.class::isInstance)
.onRetryExhaustedThrow((__, retrySignal) -> retrySignal.failure())
)
.onErrorMap(
OptimisticLockException.class,
e -> new ResponseStatusException(
BANDWIDTH_LIMIT_EXCEEDED,
"limit of OptimisticLockException exceeded", e
)
)
.onErrorResume(withMDC(e -> {
log.error("error on transfer", e);
return Mono.error(e);
}));
}
:
suspend fun transfer(transactionKey: String, fromAccountId: Long,
toAccountId: Long, amount: BigDecimal) {
try {
try {
retry(limitAttempts(3) + filter { it is OptimisticLockException }) {
val foundTransaction = transactionRepository
.findByUniqueKey(transactionKey)
if (foundTransaction != null) {
logger.warn("retry of transaction $transactionKey")
return@retry
}
val fromAccount = accountRepository.findById(fromAccountId)
?: throw AccountNotFound()
val toAccount = accountRepository.findById(toAccountId)
?: throw AccountNotFound()
if (fromAccount.amount - amount < BigDecimal.ZERO) {
throw NotEnoghtMoney()
}
val transactionToInsert = Transaction(
amount = amount,
fromAccountId = fromAccountId,
toAccountId = toAccountId,
uniqueKey = transactionKey
)
transactionalOperator.executeAndAwait {
try {
transactionRepository.save(transactionToInsert)
} catch (e: DataIntegrityViolationException) {
if (e.message?.contains("TRANSACTION_UNIQUE_KEY") != true) {
throw e;
}
}
accountRepository.transferAmount(
fromAccount.id!!, fromAccount.version, amount.negate()
)
accountRepository.transferAmount(
toAccount.id!!, toAccount.version, amount
)
}
}
} catch (e: OptimisticLockException) {
throw ResponseStatusException(
BANDWIDTH_LIMIT_EXCEEDED,
"limit of OptimisticLockException exceeded", e
)
}
} catch (e: Exception) {
logger.error(e) { "error on transfer" }
throw e;
}
}
Stacktraces
, .
:
o.s.w.s.ResponseStatusException: 509 BANDWIDTH_LIMIT_EXCEEDED "limit of OptimisticLockException exceeded"; nested exception is c.g.c.v.r.OptimisticLockException
at c.g.c.v.r.services.Ledger.lambda$transfer$5(Ledger.java:75)
...
Caused by: c.g.c.v.r.OptimisticLockException: null
at c.g.c.v.r.repos.AccountRepositoryImpl.lambda$transferAmount$0(AccountRepositoryImpl.java:27)
at r.c.p.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125)
...
:
error on transfer o.s.w.s.ResponseStatusException: 509 BANDWIDTH_LIMIT_EXCEEDED "limit of OptimisticLockException exceeded"; nested exception is c.g.c.v.r.OptimisticLockException
at c.g.c.v.r.services.Ledger.transfer$suspendImpl(Ledger.kt:70)
at c.g.c.v.r.services.Ledger$transfer$1.invokeSuspend(Ledger.kt)
...
Caused by: c.g.c.v.r.OptimisticLockException: null
at c.g.c.v.r.repos.AccountRepositoryImpl.transferAmount(AccountRepositoryImpl.kt:24)
...
at c.g.c.v.r.services.Ledger$transfer$3$1.invokeSuspend(Ledger.kt:65)
at c.g.c.v.r.services.Ledger$transfer$3$1.invoke(Ledger.kt)
at o.s.t.r.TransactionalOperatorExtensionsKt$executeAndAwait$2$1.invokeSuspend(TransactionalOperatorExtensions.kt:30)
(Coroutine boundary)
at o.s.t.r.TransactionalOperatorExtensionsKt.executeAndAwait(TransactionalOperatorExtensions.kt:31)
at c.g.c.v.r.services.Ledger$transfer$3.invokeSuspend(Ledger.kt:56)
at com.github.michaelbull.retry.RetryKt$retry$3.invokeSuspend(Retry.kt:38)
at c.g.c.v.r.services.Ledger.transfer$suspendImpl(Ledger.kt:35)
at c.g.c.v.r.controllers.LedgerController$transfer$2$1.invokeSuspend(LedgerController.kt:20)
at c.g.c.v.r.controllers.LedgerController$transfer$2.invokeSuspend(LedgerController.kt:19)
at kotlin.reflect.full.KCallables.callSuspend(KCallables.kt:55)
at o.s.c.CoroutinesUtils$invokeSuspendingFunction$mono$1.invokeSuspend(CoroutinesUtils.kt:64)
(Coroutine creation stacktrace)
at k.c.i.IntrinsicsKt__IntrinsicsJvmKt.createCoroutineUnintercepted(IntrinsicsJvm.kt:122)
at k.c.i.CancellableKt.startCoroutineCancellable(Cancellable.kt:30)
...
Caused by: c.g.c.v.r.OptimisticLockException: null
at c.g.c.v.r.repos.AccountRepositoryImpl.transferAmount(AccountRepositoryImpl.kt:24)
...
at c.g.c.v.r.services.Ledger$transfer$3$1.invokeSuspend(Ledger.kt:65)
at c.g.c.v.r.services.Ledger$transfer$3$1.invoke(Ledger.kt)
at o.s.t.r.TransactionalOperatorExtensionsKt$executeAndAwait$2$1.invokeSuspend(TransactionalOperatorExtensions.kt:30)
...
, ( , ).
Java . , . . . Kotlin .
, - . ? . , - traceId (thread name ) .
Kotlin , , . ( : ).
flatMap. - try catch, .
:
return accountRepository.findById(fromAccountId)
.switchIfEmpty(Mono.error(new AccountNotFound()))
.flatMap(fromAccount -> accountRepository.findById(toAccountId)
.switchIfEmpty(Mono.error(new AccountNotFound()))
.flatMap(toAccount -> {
...
})
:
val fromAccount = accountRepository.findById(fromAccountId)
?: throw AccountNotFound()
val toAccount = accountRepository.findById(toAccountId)
?: throw AccountNotFound()
...
try catch, .
:
return transactionRepository.findByUniqueKey(transactionKey)
...
.onErrorMap(
OptimisticLockException.class,
e -> new ResponseStatusException(
BANDWIDTH_LIMIT_EXCEEDED,
"limit of OptimisticLockException exceeded", e
)
)
:
try {
val foundTransaction = transactionRepository
.findByUniqueKey(transactionKey)
...
} catch (e: OptimisticLockException) {
throw ResponseStatusException(
BANDWIDTH_LIMIT_EXCEEDED,
"limit of OptimisticLockException exceeded", e
)
}
throw, . Reactor :
.flatMap(foo -> {
if (foo.isEmpty()) {
return Mono.error(new IllegalStateException());
} else {
return Mono.just(foo);
}
})
, , . - .
Mono.empty()
. null . Β¨C5C.
Ide , mono . . , - .
Kotlin not null , , . nullable - .
:
:
return transactionRepository.findByUniqueKey(transactionKey)
.map(Optional::of)
.defaultIfEmpty(Optional.empty())
.flatMap(foundTransaction -> {
if (foundTransaction.isPresent()) {
log.warn("retry of transaction " + transactionKey);
return Mono.empty();
}
...
:
val foundTransaction = transactionRepository
.findByUniqueKey(transactionKey)
if (foundTransaction != null) {
logger.warn("retry of transaction $transactionKey")
return@retry
}
...
, - Reactor, .
, traceId . ThreadLocal , MDC ( ). ?
. Reactor Coroutines immutable, MDC ( ).
Java , traceId :
@Component
public class TraceIdFilter implements WebFilter {
@Override
public Mono<Void> filter(
ServerWebExchange exchange, WebFilterChain chain
) {
var traceId = Optional.ofNullable(
exchange.getRequest().getHeaders().get("X-B3-TRACEID")
)
.orElse(Collections.emptyList())
.stream().findAny().orElse(UUID.randomUUID().toString());
return chain.filter(exchange)
.contextWrite(context ->
LoggerHelper.addEntryToMDCContext(context, "traceId", traceId)
);
}
}
, - , traceId MDC:
public static <T, R> Function<T, Mono<R>> withMDC(
Function<T, Mono<R>> block
) {
return value -> Mono.deferContextual(context -> {
Optional<Map<String, String>> mdcContext = context
.getOrEmpty(MDC_ID_KEY);
if (mdcContext.isPresent()) {
try {
MDC.setContextMap(mdcContext.get());
return block.apply(value);
} finally {
MDC.clear();
}
} else {
return block.apply(value);
}
});
}
, Mono. .. , Mono. :
.onErrorResume(withMDC(e -> {
log.error("error on transfer", e);
return Mono.error(e);
}))
Kotlin . , traceId MDC:
@Component
class TraceIdFilter : WebFilter {
override fun filter(
exchange: ServerWebExchange, chain: WebFilterChain
): Mono<Void> {
val traceId = exchange.request.headers["X-B3-TRACEID"]?.first()
MDC.put("traceId", traceId ?: UUID.randomUUID().toString())
return chain.filter(exchange)
}
}
withContext(MDCContext()) { β¦ }
, MDC traceId. .
Java Reactor , : , , breakpoints ...
: stepOver, , ( ).
, suspend . issue. , , Java Reactor evaluate , .
, , .
:
return Mono.zip(
transactionRepository.findByUniqueKey(transactionKey)
.map(Optional::of)
.defaultIfEmpty(Optional.empty()),
accountRepository.findById(fromAccountId)
.switchIfEmpty(Mono.error(new AccountNotFound())),
accountRepository.findById(toAccountId)
.switchIfEmpty(Mono.error(new AccountNotFound())),
).flatMap(withMDC(fetched -> {
var foundTransaction = fetched.getT1();
var fromAccount = fetched.getT2();
var toAccount = fetched.getT3();
if (foundTransaction.isPresent()) {
log.warn("retry of transaction " + transactionKey);
return Mono.empty();
}
...
}
:
coroutineScope {
val foundTransactionAsync = async {
logger.info("async fetch of transaction $transactionKey")
transactionRepository.findByUniqueKey(transactionKey)
}
val fromAccountAsync = async {
accountRepository.findById(fromAccountId)
}
val toAccountAsync = async {
accountRepository.findById(toAccountId)
}
if (foundTransactionAsync.await() != null) {
logger.warn("retry of transaction $transactionKey")
return@retry
}
val fromAccount = fromAccountAsync.await() ?: throw AccountNotFound()
val toAccount = toAccountAsync.await() ?: throw AccountNotFound()
...
}
Kotlin β β, β β Reactor.
, -. Reactor , . - foundTransactionAsync.await(). , transactionRepository.findByUniqueKey() , , accountRepository.findById() ( ).
. , Reactor :
coroutineScope {
val foundTransactionAsync = async {
logger.info("async fetch of transaction $transactionKey")
transactionRepository.findByUniqueKey(transactionKey)
}
val fromAccountAsync = async {
accountRepository.findById(fromAccountId)
}
val toAccountAsync = async {
accountRepository.findById(toAccountId)
}
if (foundTransactionAsync.await() != null) {
logger.warn("retry of transaction $transactionKey")
return@retry
}
val transactionToInsert = Transaction(
amount = amount,
fromAccountId = fromAccountId,
toAccountId = toAccountId,
uniqueKey = transactionKey
)
transactionalOperator.executeAndAwait {
try {
transactionRepository.save(transactionToInsert)
} catch (e: DataIntegrityViolationException) {
if (e.message?.contains("TRANSACTION_UNIQUE_KEY") != true) {
throw e;
}
}
val fromAccount = fromAccountAsync.await() ?: throw AccountNotFound()
val toAccount = toAccountAsync.await() ?: throw AccountNotFound()
if (fromAccount.amount - amount < BigDecimal.ZERO) {
throw NotEnoghtMoney()
}
accountRepository.transferAmount(
fromAccount.id!!, fromAccount.version, amount.negate()
)
accountRepository.transferAmount(
toAccount.id!!, toAccount.version, amount
)
}
}
. .. , . , , ( ).
, , .
context scope
, :
scope. , , .
context. .
Spring , :
@PutMapping("/transfer")
suspend fun transfer(@RequestBody request: TransferRequest) {
coroutineScope {
withContext(MDCContext()) {
ledger.transfer(request.transactionKey, request.fromAccountId,
request.toAccountId, request.amount)
}
}
}
, regexp , . - .
AOP suspend
, , . aspect suspend .
. , .
, ( ).
. , .
, .
, JetBrains . , - , .
Yang terpenting, dengan coroutine, Anda tidak perlu mengingat semua fitur Reaktor dan API-nya yang hebat. Anda tinggal menulis kodenya.