Menggunakan Azure Service Bus dari Java

Halo kolega! Kebetulan aplikasi kami ditulis dalam java stack, tetapi dihosting di Azure. Dan kami mencoba memanfaatkan layanan manajemen penyedia cloud.



Salah satunya adalah Azure Service Bus, dan hari ini saya ingin berbicara tentang fitur penggunaannya dalam aplikasi Spring Boot biasa.



Jika Anda ingin membaca tentang fitur rake - gulir ke bagian akhir artikel



Apa itu Bus Layanan Azure



Beberapa kata tentang Azure Service Bus adalah broker pesan cloud (pengganti cloud untuk RabbitMQ, ActiveMQ). Mendukung antrian (pesan dikirim ke satu penerima) dan topik (mekanisme terbitkan / berlangganan) - secara lebih rinci di sini



Dukungan dinyatakan:



  1. Pesan yang dipesan - dokumentasinya mengatakan bahwa ini adalah FIFO, TETAPI diimplementasikan menggunakan konsep sesi pesan - sekelompok pesan, bukan seluruh antrian. Jika Anda perlu menjamin urutan pesan, maka Anda menggabungkan pesan ke dalam grup, dan sekarang pesan dalam grup akan dikirim sebagai FIFO. Jadi, Antrean Bus Layanan Azure bukanlah FIFO - ini mengirimkan pesan Anda secara acak sesuai keinginan
  2. Antrian dead-letter - semuanya sederhana di sini, mereka tidak berhasil mengirimkan pesan setelah N percobaan atau periode waktu - dipindahkan ke DLQ
  3. Pengiriman terjadwal - Anda dapat mengatur penundaan sebelum pengiriman
  4. Penangguhan pesan - menyembunyikan pesan dalam antrian, pesan tidak akan dikirim secara otomatis, tetapi dapat diambil dengan ID. Kita perlu menyimpan ID ini di suatu tempat


Cara mengintegrasikan dengan Azure Service Bus



Azure Service Bus mendukung AMQP 1.0, yang berarti tidak kompatibel dengan klien RabbitMQ. bunny menggunakan AMQP 0.9.1



Satu-satunya klien "standar" yang dapat bekerja dengan Bus Layanan adalah Apache Qpid .



Ada 3 cara untuk memasangkan aplikasi Spring Boot Anda dengan Service Bus:



  1. JMS + QPID — , — QPID — .

    timeout producer — — factory.setCacheProducers(false);
  2. Spring Cloud — Azure Service Bus — , . Service Bus

    ( 1.2.6) — , azure service bus java sdk.



    Spring Integration — , «Scheduled delivery» «Message deferral» .



    sdk, MessageAndSessionPump

  3. azure service bus java sdk — ,


Spring Cloud — Azure Service Bus



Saya akan membahas metode ini secara lebih rinci dan memberi tahu Anda tentang fitur-fitur penggunaan

aplikasi contoh di repositori resmi, jadi tidak ada gunanya menduplikasi kode - repositori dengan contoh ada di sini .



Karena Ini adalah Spring Integration Messaging, semuanya bermuara pada Channel, MessageHandler, MessagingGateway, ServiceActivator.



Dan kemudian ada ServiceBusQueueTemplate .



Mengirim pesan



Kita harus punya Channel di mana kita menulis pesan yang ingin kita kirim, di sisi lain ada MessageHandler yang mengirimkannya ke Service Bus.



The MessagHandler adalah com.microsoft.azure.spring.integration.core.DefaultMessageHandler - ini adalah konektor ke layanan eksternal.



Bagaimana cara mengikatnya ke saluran? - tambahkan anotasi - @ServiceActivator (inputChannel = OUTPUT_CHANNEL) dan sekarang MessagHandler kami mendengarkan saluran OUTPUT_CHANNEL .



Selanjutnya, entah bagaimana kami perlu menulis pesan kami ke saluran - di sini lagi keajaiban musim semi - kami mengumumkan MessagingGateway dan mengikatnya ke saluran berdasarkan namanya.



Cuplikan dari contoh :



@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
public interface QueueOutboundGateway {
    void send(String text);
}


Itu saja: Gateway -> Channel -> MessagHandler -> ServiceBusQueueTemplate -> ServiceBusMessageConverter .



Dalam kode, tetap menginjeksi gateway kami dan memanggil metode kirim .



Saya menyebutkan ServiceBusMessageConverter di rantai panggilan karena suatu alasan - jika Anda ingin menambahkan header kustom (misalnya CORRELATION_ID) ke pesan, ini adalah tempat di mana mereka perlu dipindahkan dari org.springframework.messaging.MessageHeaders ke pesan biru.

Metode khusus setCustomHeaders .



Dalam kasus ini, gateway Anda akan terlihat seperti ini:



@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
public interface QueueOutboundGateway {
    void send(@Payload String text, @Header("CORRELATION_ID") String correlationId);
}


Menerima pesan



Oke, kami tahu cara mengirim pesan, bagaimana cara mendapatkannya sekarang?



Berikut semuanya adalah sama - MessageProducer -> Saluran -> Handler



The MessageProducer adalah com.microsoft.azure.spring.integration.servicebus.inbound.ServiceBusQueueInboundChannelAdapter - ini adalah konektor untuk layanan eksternal. Di dalam, ServiceBusQueueTemplate yang sama dengan ServiceBusMessageConverter tempat Anda dapat membaca header kustom dan meletakkannya di pesan integrasi pegas.



Saluran sudah dipasang di dalamnya dengan tangan:



@Bean
public ServiceBusQueueInboundChannelAdapter queueMessageChannelAdapter(
        @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, 
        ServiceBusQueueOperation queueOperation) {
    queueOperation.setCheckpointConfig(CheckpointConfig.builder().checkpointMode(CheckpointMode.MANUAL).build());
    ServiceBusQueueInboundChannelAdapter adapter = new ServiceBusQueueInboundChannelAdapter(QUEUE_NAME,
            queueOperation);
    adapter.setOutputChannel(inputChannel);
    return adapter;
}


Tetapi Handler itu sendiri terhubung ke saluran melalui @ServiceActivator .



@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
    String message = new String(payload);
.......


Anda bisa langsung mendapatkan baris:



@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(String payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
.......


Anda mungkin telah memperhatikan parameter checkpointer checkpointer yang aneh, yang digunakan untuk secara manual mengakui pemrosesan pesan

Jika Anda menyetel CheckpointMode.MANUAL saat membuat ServiceBusQueueInboundChannelAdapter , Anda harus mengirim sendiri konfirmasi untuk pesan tersebut. Jika Anda menggunakan CheckpointMode.RECORD maka konfirmasi akan dikirim secara otomatis - detail dalam kode ServiceBusQueueTemplate .







Fitur penggunaan



Jadi, daftar "garu" dan "keripik" yang telah kami tuju.



ReceiveMode.PEEKLOCK



Azure Service Bus mendukung mode PEEKLOCK - konsumen mengambil pesan, terkunci ke dalam bus layanan, tidak dapat diakses oleh siapa pun selama waktu tertentu (durasi kunci), tetapi tidak dihapus darinya. Jika dalam waktu yang ditentukan konsumen belum mengirim konfirmasi pemrosesan - berhasil / ditinggalkan atau belum memperpanjang kunci - pesan dianggap tersedia lagi dan upaya pengiriman baru akan dilakukan.



Menariknya, abaikan hanya menyetel ulang kunci dan pesan menjadi tersedia secara instan untuk pengiriman ulang.



ServiceBusQueueTemplate default menciptakan QueueClient modus ReceiveMode.PEEKLOCK .



Jika pengecualian yang tidak tertangani terbang di handler kami- tidak ada konfirmasi yang akan dikirim ke server dan pesan akan tetap terkunci dan akan dikirim ulang saat batas waktu.

Dalam hal ini, penghitung pengiriman akan meningkat, yang logis.



Saya tidak tahu apakah ini bug atau fitur - tetapi akan sangat memudahkan untuk membuat jeda antara percobaan ulang untuk situasi saat diperlukan.



Jika pesan tidak dapat diproses bahkan dengan percobaan ulang, perlu untuk menangkap pengecualian dan menandai pesan sebagai diproses dan menambahkan logika tambahan ke aplikasi, jika tidak maka akan dikirim berulang kali hingga mencapai batas nomor pengiriman ulang (dikonfigurasi saat membuat antrian di bus layanan )



Jumlah pesan serentak dan prefetch



Seperti yang Anda duga, setelan konkurensi bertanggung jawab atas jumlah penangan pesan paralel, dan jumlah pesan prefetch adalah berapa banyak pesan yang akan kita masuk ke buffer dari server.



Secara default, ServiceBusQueueTemplate dikonfigurasi secara otomatis (AzureServiceBusQueueAutoConfiguration) dengan nilai 1 untuk kedua parameter, yaitu. secara default setiap antrian akan memiliki satu thread pemrosesan, meskipun konsep bus layanan dengan pengakuan untuk setiap pesan individu menyiratkan banyak penangan bersamaan. Ini semua lebih penting jika Anda memiliki pemrosesan permintaan yang lama.



Sayangnya, pengaturan ini tidak dapat diatur melalui konfigurasi aplikasi (application.yml / application.properties) dan hanya dapat diatur dalam kode. Tetapi bahkan melalui kode, Anda tidak akan dapat mengatur pengaturan yang berbeda untuk antrian yang berbeda.



Oleh karena itu, jika Anda perlu membuat pengaturan yang berbeda, Anda harus membuat beberapa kacang ServiceBusQueueTemplate untuk setiap ServiceBusQueueInboundChannelAdapter



CompletableFuture di dalam azure service bus java sdk



Sdk java bus layanan azure sendiri diimplementasikan di sekitar pelaksana CompletableFuture dan CachedThreadPool - MessagingFactory.INTERNAL_THREAD_POOL jadi berhati-hatilah dengan semua jenis kacang lokal utas



Pesan yang dipesan



Kami menggunakan bus layanan sebagai antrian pekerjaan - beberapa pekerjaan bergantung satu sama lain dan oleh karena itu harus dijalankan sesuai urutan pembuatannya.



Seperti yang saya sebutkan di atas, T-shirt menggunakan konsep sesi pesan - ketika pesan dikelompokkan ke dalam sesi berdasarkan kunci (ditransmisikan di header), sesi tersebut ada selama setidaknya ada satu pesan dengan kunci sesi - secara rinci dalam dokumentasi.

Bus layanan menjamin pengiriman pesan dalam grup seperti itu dalam urutan penambahan ke server (yaitu dalam urutan di mana server bus layanan menulisnya ke repositori).



Perlu juga disebutkan jika Anda telah membuat antrian yang mengaktifkan sesi - ini berarti bahwa semua pesan harus memiliki header dengan kunci sesi.



Segera kami sangat senang dengan kemungkinan bus layanan untuk menyusun pesan dalam antrian FIFO - meskipun untuk sekelompok pesan.



Tetapi setelah beberapa saat, kami mulai memperhatikan masalah:



  • beberapa pesan mulai berdatangan berkali-kali
  • pemrosesan antrian melambat
  • dalam statistik bus layanan, setengah dari permintaan ditandai sebagai gagal, dan permintaan yang gagal muncul bahkan pada antrian kosong saat idle


Melihat kode sdk, kami menemukan kekhasan bekerja dengan sesi:



  1. konsumen menangkap sesi tersebut dan mulai membaca semua pesan yang tersedia di dalamnya
  2. secara bersamaan, jumlah sesi diproses sama dengan parameter konkurensi
  3. unhandled exception — 1 ( ) — re-delivery ? 0 exception — ttl .
  4. — success abandon. — delay re-delivery

    .. abandon — , delivery counter .

    delivery count


Akibatnya, mereka meninggalkan fitur bus layanan ini dan menulis sepeda, dan bus layanan bertindak sebagai pemicu.



Segera setelah antrian yang diaktifkan sesi dibatalkan, kesalahan dalam statistik menghilang; permintaan ke bus layanan.



Dalam bundel JMS + Qpid - fungsi ini tidak tersedia.



Potensi masalah dengan ukuran antrian lebih dari 1G



Belum bertemu, tetapi mendengar bahwa itu mulai berfungsi tidak stabil jika ukuran antrian lebih dari 1G.



Jika Anda menemukan ini atau sebaliknya semuanya berfungsi - tulis di komentar.



Masalah dengan permintaan penelusuran



Agen wawasan aplikasi biru standar tidak dapat melacak pengiriman pesan sebagai ketergantungan dan pesan masuk sebagai permintaan.



Saya harus menambahkan beberapa kode.



Hasil



Jika Anda membutuhkan antrian pekerjaan dengan pemrosesan pesan yang panjang dan Anda tidak membutuhkan antrian, Anda dapat menggunakan.



Jika pemrosesan pesan cepat - gunakan Azure Event Hub - Kafka biasa, klien standar berfungsi dengan baik.



All Articles