Menggunakan Spring Cloud Stream Binding dengan Kafka Message Broker

Halo semuanya! Nama saya Vitaly, saya seorang pengembang di Web3Tech. Dalam posting ini, saya akan memperkenalkan konsep dan konstruksi dasar dari framework Spring Cloud Stream untuk mendukung dan bekerja dengan makelar pesan Kafka, dengan putaran penuh pengujian unit kontekstual mereka. Kami menggunakan skema seperti itu dalam proyek pemungutan suara elektronik semua-Rusia kami di platform blockchain Waves Enterprise .





Sebagai bagian dari tim proyek Spring Cloud, Spring Cloud Stream didasarkan pada Spring Boot dan menggunakan Integrasi Spring untuk menyediakan komunikasi dengan pialang pesan. Namun, ini mudah diintegrasikan dengan berbagai broker pesan dan membutuhkan konfigurasi minimal untuk membuat layanan mikro berbasis peristiwa atau pesan.





Konfigurasi dan ketergantungan

Pertama, kita perlu menambahkan dependensi spring-cloud-starter-stream-kafka ke build.gradle :





dependencies {
   implementation(kotlin("stdlib"))
   implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion")
   implementation("com.fasterxml.jackson.module:jackson-module-kotlin")

   implementation("org.springframework.boot:spring-boot-starter-web")
   implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")

   testImplementation("org.springframework.boot:spring-boot-starter-test")
   testImplementation("org.springframework.cloud:spring-cloud-stream-test-support")
   testImplementation("org.springframework.kafka:spring-kafka-test:springKafkaTestVersion")
}
      
      



Dalam konfigurasi project Spring Cloud Stream, Anda perlu menyertakan URL broker Kafka, nama antrean (topik), dan parameter binding lainnya. Berikut adalah contoh konfigurasi YAML untuk layanan application.yaml :





spring:
 application:
   name: cloud-stream-binding-kafka-app
 cloud:
   stream:
     kafka:
       binder:
         brokers: 0.0.0.0:8080
         configuration:
           auto-offset-reset: latest
     bindings:
       customChannel:                   #Channel name
         destination: 0.0.0.0:8080      #Destination to which the message is sent (topic)
         group: input-group-N
         contentType: application/json
         consumer:
           max-attempts: 1
           autoCommitOffset: true
           autoCommitOnError: false
      
      



Konsep dan kelas

, , Spring Cloud Stream, , (SpringCloudStreamBindingKafkaApp.kt):





@EnableBinding(ProducerBinding::class)

@SpringBootApplication
 
 class SpringCloudStreamBindingKafkaApp

 fun main(args: Array<String>) {

    SpringApplication.run(SpringCloudStreamBindingKafkaApp::class.java, *args)

 }
      
      



@EnableBinding , .





.





Binding β€” , .

Binder β€” middleware .

Channel β€” middleware .

StreamListeners β€” (beans), , MessageConverter middleware β€œDTO”.

Message Schema β€” , . .





send/receive, producer consumer. , Spring Cloud Stream.





Producer Kafka, (ProducerBinding.kt):





interface ProducerBinding {

   @Output(BINDING_TARGET_NAME)
   fun messageChannel(): MessageChannel
}
      
      



onsumer Kafka .





ConsumerBinding.kt:





interface ConsumerBinding {

   companion object {
       const val BINDING_TARGET_NAME = "customChannel"
   }

   @Input(BINDING_TARGET_NAME)
   fun messageChannel(): MessageChannel
}
      
      



Consumer.kt:





@EnableBinding(ConsumerBinding::class)
class Consumer(val messageService: MessageService) {

   @StreamListener(target = ConsumerBinding.BINDING_TARGET_NAME)
   fun process(
       @Payload message: Map<String, Any?>,
       @Header(value = KafkaHeaders.OFFSET, required = false) offset: Int?
   ) {
       messageService.consume(message)
   }
}
      
      



Kafka . Kafka, spring-kafka-test.





MessageCollector

, . ProducerBinding payload ProducerTest.kt:





@SpringBootTest
class ProducerTest {

   @Autowired
   lateinit var producerBinding: ProducerBinding

   @Autowired
   lateinit var messageCollector: MessageCollector

   @Test
   fun `should produce somePayload to channel`() {
       // ARRANGE
       val request = mapOf(1 to "foo", 2 to "bar", "three" to 10101)

       // ACT
producerBinding.messageChannel().send(MessageBuilder.withPayload(request).build())
       val payload = messageCollector.forChannel(producerBinding.messageChannel())
           .poll()
           .payload

       // ASSERT
       val payloadAsMap = jacksonObjectMapper().readValue(payload.toString(), Map::class.java)
       assertTrue(request.entries.stream().allMatch { re ->
           re.value == payloadAsMap[re.key.toString()]
       })

       messageCollector.forChannel(producerBinding.messageChannel()).clear()
   }
}
      
      



Embedded Kafka

@ClassRule . Kafka Zookeeper , . Kafka Zookeper (ConsumerTest.kt):





@SpringBootTest
@ActiveProfiles("test")
@EnableAutoConfiguration(exclude = [TestSupportBinderAutoConfiguration::class])
@EnableBinding(ProducerBinding::class)
class ConsumerTest {

   @Autowired
   lateinit var producerBinding: ProducerBinding

   @Autowired
   lateinit var objectMapper: ObjectMapper

   @MockBean
   lateinit var messageService: MessageService

   companion object {
       @ClassRule @JvmField
       var embeddedKafka = EmbeddedKafkaRule(1, true, "any-name-of-topic")
   }

   @Test
   fun `should consume via txConsumer process`() {
       // ACT
       val request = mapOf(1 to "foo", 2 to "bar")
       producerBinding.messageChannel().send(MessageBuilder.withPayload(request)
           .setHeader("someHeaderName", "someHeaderValue")
           .build())

       // ASSERT
       val requestAsMap = objectMapper.readValue<Map<String, Any?>>(objectMapper.writeValueAsString(request))
       runBlocking {
           delay(20)
           verify(messageService).consume(requestAsMap)
       }
   }
}
      
      



Dalam posting ini, saya mendemonstrasikan kemampuan Spring Cloud Stream dan cara menggunakannya dengan Kafka. Spring Cloud Stream menawarkan antarmuka yang ramah pengguna dengan nuansa konfigurasi broker yang disederhanakan, diimplementasikan dengan cepat, bekerja dengan stabil, dan mendukung broker populer modern seperti Kafka. Hasilnya, saya memberikan sejumlah contoh dengan pengujian unit berdasarkan EmbeddedKafkaRule menggunakan MessageCollector.





Semua sumber dapat ditemukan di Github . Terima kasih sudah membaca!








All Articles