Produsen / Konsumen di Kafka dan Kotlin

Terjemahan artikel ini disiapkan menjelang dimulainya kursus "Pengembangan backend di Kotlin".








Pada artikel ini, kita akan membahas cara membuat aplikasi Spring Boot sederhana dengan Kafka dan Kotlin.



Pengantar



Mulailah dengan mengunjungi https://start.spring.io dan tambahkan dependensi berikut:



Groovy



implementation("org.springframework.boot:spring-boot-starter-data-rest")
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.apache.kafka:kafka-streams")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("org.springframework.kafka:spring-kafka")


Dalam contoh ini, kami menggunakan Gradle sebagai alat build. Anda juga dapat memilih Maven.



Buat (generate) proyeknya, lalu impor ke IntelliJ IDEA.



Apache Kafka



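Unduh versi terbaru Apache Kafka dari situs resminya dan ekstrak ke sebuah folder. Saya menggunakan Windows 10. Saat mencoba menjalankan Kafka, Anda mungkin menemui galat «too many lines encountered». Hal ini terjadi karena jalur ke folder Kafka terlalu panjang. Jika itu terjadi, pindahkan Kafka ke folder dengan jalur yang lebih pendek, lalu jalankan dari Power Shell.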



Untuk menjalankan Kafka, gunakan perintah berikut:



Shell



# 1) Launch ZooKeeper using the configuration file bundled with the Kafka distribution.
.\zookeeper-server-start.bat ..\..\config\zookeeper.properties
# 2) Launch the Kafka broker using its bundled server configuration.
.\kafka-server-start.bat ..\..\config\server.properties


Kedua skrip ini berada di folder /bin/windows.



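Untuk menjalankan Kafka, Anda memerlukan Zookeeper. Zookeeper juga merupakan proyek Apache, yang menyediakan layanan konfigurasi terdistribusi.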



Spring Boot



Kembali ke IDE, lalu buka berkas KafkaDemoApplication.kt. Berkas ini dibuat oleh Spring saat proyek digenerasi.



Isinya adalah sebagai berikut:



Kotlin



import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication

/**
 * Spring Boot application class for the Kafka demo.
 *
 * It has no members of its own; it only carries the [SpringBootApplication] annotation.
 */
@SpringBootApplication
class KafkaDemoApplication

/**
 * Program entry point: starts [KafkaDemoApplication] via `runApplication`.
 *
 * @param args command-line arguments, passed through unchanged.
 */
fun main(args: Array<String>) {
    runApplication<KafkaDemoApplication>(*args)
}




. .



Pertama-tama, kita memerlukan sebuah controller. Buat berkas KafkaController.kt, lalu tambahkan kode berikut:



Kotlin



// Template the /send endpoint uses to publish String-keyed, String-valued records.
// NOTE(review): starts as null and nothing in this snippet assigns it — presumably it is
// injected by Spring in the full controller class; confirm before relying on it.
var kafkaTemplate: KafkaTemplate<String, String>? = null

// Name of the Kafka topic the endpoints in this snippet publish to.
val topic = "test_topic"

/**
 * Publishes [message] to [topic] through [kafkaTemplate] and waits for the send to finish.
 *
 * @param message text taken from the `message` request parameter and sent as the record value.
 * @return a 200 OK response whose body is the sent value followed by `" sent to topic"`.
 * @throws IllegalStateException if [kafkaTemplate] is still `null` when the endpoint is called.
 */
@GetMapping("/send")
fun sendMessage(@RequestParam("message") message: String): ResponseEntity<String> {
    // Fail with an explicit message instead of the bare NullPointerException produced by `!!`.
    val template = checkNotNull(kafkaTemplate) { "kafkaTemplate has not been initialised" }
    // get() blocks the calling thread until the asynchronous send has completed.
    val sendResult: SendResult<String, String> = template.send(topic, message).get()
    return ResponseEntity.ok("${sendResult.producerRecord.value()} sent to topic")
}


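Di sini kita mengirim pesan ke topik test_topic menggunakan KafkaTemplate. Metode send mengembalikan ListenableFuture, dan darinya kita memperoleh hasil pengiriman. Nilai pesan yang terkirim kemudian dikembalikan dalam respons.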





Cara kedua untuk mengirim pesan ke Kafka adalah dengan membuat objek KafkaProducer. Kodenya sebagai berikut:



Kotlin



/**
 * Publishes [message] to [topic] using a hand-configured [KafkaProducer] and waits for the result.
 *
 * The producer is created for this single call and closed before the method returns.
 * NOTE(review): building a producer per request is costly; a shared instance would be
 * cheaper if this endpoint is called often.
 *
 * @param message text taken from the `message` request parameter and sent as the record value.
 * @return a 200 OK response whose body is `" message sent to "` followed by the topic name
 *   reported in the returned record metadata.
 */
@GetMapping("/produce")
fun produceMessage(@RequestParam("message") message: String): ResponseEntity<String> {
    val producerRecord: ProducerRecord<String, String> = ProducerRecord(topic, message)

    // Declared as <String, Any> up front so it can be handed to KafkaProducer without an unchecked cast.
    val map = mutableMapOf<String, Any>()
    map["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
    map["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
    map["bootstrap.servers"] = "localhost:9092"

    // use {} closes the producer on both success and failure; the Kafka documentation states
    // that a producer which is never closed leaks its resources.
    return KafkaProducer<String, String>(map).use { producer ->
        val future: Future<RecordMetadata> = producer.send(producerRecord)
        // get() blocks until the send has completed and yields the record metadata.
        ResponseEntity.ok(" message sent to ${future.get().topic()}")
    }
}


.



Konstruktor KafkaProducer menerima sebuah Map yang berisi konfigurasinya. Dalam contoh ini, kita menggunakan StringSerializer.



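Pada dasarnya, Serializer adalah antarmuka Kafka yang mengubah objek menjadi byte. Apache Kafka menyediakan beberapa implementasi bawaan, misalnya ByteArraySerializer, ByteSerializer, FloatSerializer, dan lain-lain.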



Untuk key dan value, kita mengisi map dengan StringSerializer.



Kotlin



// Excerpt: register StringSerializer (by class name) for both record keys and record values.
map["key.serializer"]   = "org.apache.kafka.common.serialization.StringSerializer"
map["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"


Berikutnya adalah alamat bootstrap server, yang diperlukan untuk terhubung ke Kafka.



Kotlin



// Excerpt: point the producer at the broker address used in this example (localhost, port 9092).
map["bootstrap.servers"] = "localhost:9092"


Setelah itu, map konfigurasi ini diteruskan ke konstruktor KafkaProducer.



Selanjutnya, kita membuat ProducerRecord yang berisi nama topik dan pesan yang akan dikirim:



Kotlin



// Excerpt: build the record to send, pairing the target topic with the message as its value.
var producerRecord :ProducerRecord<String, String> = ProducerRecord(topic, message)


Sekarang kita dapat mengirim pesannya:



Kotlin



// Excerpt: hand the record to the producer; send() returns a Future for the record metadata.
// NOTE(review): in the full listing `producer` comes straight from the KafkaProducer
// constructor and is non-null, so the `?.` and `!!` here are redundant.
var future:Future<RecordMetadata> = producer?.send(producerRecord)!!


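Objek future yang dikembalikan memungkinkan kita menunggu hasil pengiriman dan membaca metadatanya, misalnya nama topik.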





, . . , , .

Buat berkas MessageConsumer.kt dan tandai kelasnya dengan anotasi Service.



Kotlin



/**
 * Listener bound to `test_topic` with group id `test_id`; prints every message it is given.
 *
 * @param message the message text passed in by the listener infrastructure.
 */
@KafkaListener(groupId = "test_id", topics = ["test_topic"])
fun consume(message: String) = println(" message received from topic : $message")


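Anotasi @KafkaListener menandai metode ini sebagai penerima pesan dari topik yang disebutkan. Setiap pesan yang masuk ke test_topic akan dicetak ke konsol.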

Kode sumber lengkapnya tersedia di GitHub.






Pelajari lebih lanjut tentang kursus «Pengembangan backend di Kotlin»







All Articles