Bagaimana cara kami menggunakan Kafka Streams di tim gudang data Vivid Money?

Hei! Nama saya Andrey Serebryansky, saya seorang insinyur data di tim Operasi Data. Tim kami bertanggung jawab untuk mengisi repositori Snowflake kami, serta memastikan bahwa tim lainnya memiliki data waktu nyata. Misalnya feed transaksi (ini adalah pembelian pelanggan, transfer mereka, cashback yang diterima oleh mereka) diisi berdasarkan data kami.





Untuk semua tugas ini, kami menggunakan Kafka, dan yang paling penting Kafka Streams. Hari ini saya akan berbicara tentang tugas apa Kafka Streams dapat digunakan dan menunjukkan kode untuk contoh sederhana kami. Ini akan berguna bagi mereka yang menggunakan Kafka tetapi belum mencoba Kafka Streams. Jika Anda ingin mempertahankan status saat memproses topik Kafka, atau sedang mencari sintaks sederhana untuk memperkaya beberapa topik dengan informasi dari orang lain, hari ini saya akan menunjukkan bagaimana Anda dapat melakukannya dengan mudah dan praktis di luar kebiasaan.





Garis besar artikel

  1. Sedikit tentang Kafka Streams





  2. Mengapa kita membutuhkan Kafka Streams sama sekali





  3. Kasus No. 1. Memperkaya pembelian pelanggan kami dengan informasi merek





  4. Kasus nomor 2. Kami mengambil data pelanggan dari tim Origination ke penyimpanan kami





  5. Bagaimana memulai semua ini?





  6. Sedikit tentang skalabilitas Kafka Streams





  7. kesimpulan





Sedikit tentang Kafka Streams

Kafka Streams - Java. Kafka Java/Scala.





exactly once processing kafka transactions.





Kafka Streams , stateful (, ).





Kafka Streams?

: , - , , , , .





, - . , , . , , , , , .





: , . , , , .





Kami mengambil data secara berurutan dari sumber yang berbeda, menunggu jika terjadi kesalahan pada sumbernya.
, , -

, .





Terlalu banyak teman

, , , . . Kafka Streams. , ,





Kafka Streams menarik data yang diperlukan terlebih dahulu
Kafka Streams

, .





β„–1.

, . (brand_id) ( ).





Merek teratas

.





Topik otorisasi

.









builder.streams("authorization-events")
    .join(
        builder.globalTable("brands"), 
        auth -> auth.get("brand_id"), // ,      
        (brand, auth) -> auth.set("brandName", brand.get("name")) //  
    );

      
      



builder? . :





import org.apache.kafka.streams.StreamsBuilder;
...

StreamsBuilder builder = new StreamsBuilder();

      
      



, Kafka Streams id ( , ).





id ?

Kafka Streams , , - . builder.globalTable(topicName)



.





. , , . , . , , .





https://kafka.Apache.org/0110/documentation/streams/developer-guide#streams_duality
https://kafka.apache.org/0110/documentation/streams/developer-guide#streams_duality

, Kafka Streams .





β„–2. Origination

Vivid Money, , . Origination - Vivid.





Informasi tentang nama depan dan belakang masuk ke database tim Origination
Origination

Kafka Connect open-source dynamodb JSON.





Kami mengambil data dari dynamodb ke kafka kami
dynamodb

, . , , . Apache AVRO. .





Avro
{
  "type": "record",
  "name": "OriginationClient",
  "namespace": "datahub",
  "fields": [
    {
      "name": "firstName",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "lastName",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    ...
  ]
}

      
      



, :





Schema schema = new Schema.Parser().parse(new File("path/to/schema.avsc"));
AvroConverter avroConverter = new AvroConverter(schema);

builder.stream("origination-json-topic")
    .mapValues(val -> avroConverter.convert(val))
    .to("origination-avro-topic");

      
      



AvroConverter - , . open source https://github.com/allegro/json-avro-converter . .





, . , , , . (diff) . , .





, . . . , Kafka Streams . , , .





:





import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
...

var changes = builder.stream(sourceTopic);
var stateStoreSupplier = Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore**("state-store"**), //                
  Serdes.Bytes(), //       
    new GenericAvroSerde() //       
);
builder.addStateStore(stateStoreSupplier);
changes.transform(() -> new ChangeTransformer(), "state-store") //  ,  
    .to(outputTopic);

      
      



ChangeTransformer :





public class ChangeTransformer implements Transformer {
  private KeyValueStore<Bytes, GenericRecord> stateStore;

  @Override
  public void init(ProcessorContext processorContext) {
     this.stateStore = processorContext.getStateStore("state-store");
  }
  @Override
  public KeyValue<String, GenericRecord> transform(String recordKey, GenericRecord record) {
    GenericRecord prevState = stateStore.get(recordKey);
    return extractDiff(prevState, record);
  }
  ...
}

      
      



?

StreamsBuilder builder = new StreamsBuilder();builder.stream("my-input-topic")
        .filter(...)
        .map(...)
        .to("my-output-topic");
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
kafkaStreams.start(); // 
...
kafkaStreams.stop();

      
      



Kafka Streams

Kafka Streams . . 16 , 16 . , .





, state-store ( ChangeTransformer-), , ! .





: https://docs.confluent.io/platform/current/streams/architecture.html#parallelism-model





Kafka Streams :





  • stateful (join, get previous state). , .





  • . map, filter, join DSL. , transform()



    . ChangeTransformer-, .





  • . . .





P.S. ) , !








All Articles