Antrian DelayedQueue

Beberapa tahun yang lalu, di salah satu proyek, kami dihadapkan pada kebutuhan untuk menunda pelaksanaan suatu tindakan untuk jangka waktu tertentu. Misalnya, cari tahu status pembayaran dalam tiga jam atau kirim ulang pemberitahuan setelah 45 menit Namun, pada saat itu kami tidak menemukan pustaka yang sesuai yang dapat "ditunda" dan tidak memerlukan waktu tambahan untuk konfigurasi dan pengoperasian. Kami menganalisis opsi yang memungkinkan dan menulis pustaka antrian tertunda kecil kami sendiri di Java menggunakan Redis sebagai repositori. Pada artikel ini saya akan berbicara tentang kapabilitas perpustakaan, alternatifnya dan "garu" yang kami temui dalam prosesnya.



Kegunaan



Jadi apa fungsi antrian tertunda? Acara yang ditambahkan ke antrian tertunda dikirim ke penangan setelah jangka waktu yang ditentukan. Jika pemrosesan gagal, acara akan dikirimkan lagi nanti. Selain itu, jumlah upaya maksimum dibatasi. Redis tidak menjamin keamanan, dan Anda harus bersiap untuk kehilangan acara . Namun, dalam versi cluster, Redis menunjukkan keandalan yang cukup tinggi, dan kami tidak pernah menemui hal ini selama satu setengah tahun pengoperasian.



API



Tambahkan acara ke antrian



eventService.enqueueWithDelayNonBlocking(new DummyEvent("1"), Duration.ofHours(1)).subscribe();


Perhatikan bahwa metode ini kembali Mono, jadi untuk menjalankan Anda perlu melakukan salah satu hal berikut:



  • subscribe(...)
  • block()


Penjelasan lebih rinci disediakan dalam dokumentasi untuk Project Reactor. Konteksnya ditambahkan ke acara seperti ini:



eventService.enqueueWithDelayNonBlocking(new DummyEvent("1"), Duration.ofHours(1), Map.of("key", "value")).subscribe();


Daftarkan penangan acara



eventService.addHandler(DummyEvent.class, e -> Mono.just(true), 1);


, :



eventService.addHandler(
        DummyEvent.class,
        e -> Mono
            .subscriberContext()
            .doOnNext(ctx -> {
                Map<String, String> eventContext = ctx.get("eventContext");
                log.info("context key {}", eventContext.get("key"));
            })
            .thenReturn(true),
        1
);




eventService.removeHandler(DummyEvent.class);




"-":



import static com.github.fred84.queue.DelayedEventService.delayedEventService;

var eventService = delayedEventService().client(redisClient).build();


:



import static com.github.fred84.queue.DelayedEventService.delayedEventService;

var eventService = delayedEventService()
        .client(redisClient)
        .mapper(objectMapper)
        .handlerScheduler(Schedulers.fromExecutorService(executor))
        .schedulingInterval(Duration.ofSeconds(1))
        .schedulingBatchSize(SCHEDULING_BATCH_SIZE)
        .enableScheduling(false)
        .pollingTimeout(POLLING_TIMEOUT)
        .eventContextHandler(new DefaultEventContextHandler())
        .dataSetPrefix("")
        .retryAttempts(10)
        .metrics(new NoopMetrics())
        .refreshSubscriptionsInterval(Duration.ofMinutes(5))
        .build();


( Redis) eventService.close() , @javax.annotation.PreDestroy.





- , . :



  • , Redis;
  • , ( "delayed.queue.ready.for.handling.count" )




, delayed queue. 2018

Amazon Web Services.

, . : " , Amazon-, ".





:





- , JMS . SQS , 15 .





" " . , Redis :





, Netflix dyno-queues

. , , .



, " " sorted set list, ( ):



var events = redis.zrangebyscore("delayed_events", Range.create(-1, System.currentTimeMillis()), 100);
events.forEach(key -> {
  var payload = extractPayload(key);
  var listName = extractType(key);
  redis.lpush(listName, payload);
  redis.zrem("delayed_events", key);
});


Spring Integration, :



redis.brpop(listName)


.





"list" (, ), list . Redis , 2 .



events.forEach(key -> {
  ...
  redis.multi();
  redis.zrem("delayed_events", key);
  redis.lpush(listName, payload);
  redis.exec();
});




list-a . , . "sorted_set" .



events.forEach(key -> {
  ...
  redis.multi();
  redis.zadd("delayed_events", nextAttempt(key))
  redis.zrem("delayed_events", key);
  redis.lpush(listName, payload);
  redis.exec();
});




, , " " "delayed queue" . "sorted set"

metadata;payload, payload , metadata - . . , metadata payload Redis hset "sorted set" .



var envelope = metadata + SEPARATOR + payload;
redis.zadd(envelope, scheduledAt);




var envelope = metadata + SEPARATOR + payload;
var key = eventType + SEPARATOR + eventId;

redis.multi();
redis.zadd(key, scheduledAt);
redis.hset("metadata", key, envelope)
redis.exec();




, . , list . TTL :



redis.set(lockKey, "value", ex(lockTimeout.toMillis() * 1000).nx());




Spring, . " " :





Lettuce , . Project Reactor , " ".

, Subscriber



redis
  .reactive()
  .brpop(timeout, queue)
  .map(e -> deserialize(e))
  .subscribe(new InnerSubscriber<>(handler, ... params ..))




class InnerSubscriber<T extends Event> extends BaseSubscriber<EventEnvelope<T>> {

    @Override
    protected void hookOnNext(@NotNull EventEnvelope<T> envelope) {
        Mono<Boolean> promise = handler.apply(envelope.getPayload());
        promise.subscribe(r -> request(1));
    }
}


, ( Netflix dyno queue, poll- ).



?



  • Kotlin DSL. Kotlin suspend fun API Project Reactor


Tautan






All Articles