Pada artikel ini, saya ingin menjelaskan sedikit lebih detail bagaimana mekanisme komit otomatis bekerja untuk pendengar di pustaka kafka-klien (pertimbangkan versi 2.6.0)
Dalam dokumentasi, kita dapat menemukan formulasi berikut yang menjelaskan cara kerja komit otomatis:
Komit otomatis pada dasarnya berfungsi sebagai cron dengan periode yang ditetapkan melalui properti konfigurasi auto.commit.interval.ms. Jika konsumen mengalami crash, maka setelah restart atau rebalance, posisi semua partisi yang dimiliki oleh konsumen yang mengalami crash akan diatur ulang ke offset terakhir yang dilakukan.
Dokumen java untuk KafkaConsumer, pada gilirannya, berisi deskripsi berikut:
Konsumen dapat secara otomatis melakukan offset secara berkala; atau dapat memilih untuk mengontrol posisi yang dikomit ini secara manual dengan memanggil salah satu API komit (mis. commitSync dan commitAsync).
Dari formulasi ini, kesalahpahaman mungkin muncul bahwa komit offset otomatis non-pemblokiran terjadi di latar belakang dan tidak sepenuhnya jelas bagaimana kaitannya dengan proses penerimaan pesan oleh konsumen tertentu dan, yang terpenting, jaminan pengiriman apa yang kami miliki ?
Mari kita lihat lebih dekat mekanisme untuk menerima pesan oleh pendengar dengan pengaturan enable.auto.commit = true menggunakan contoh implementasi kelas KafkaConsumer dari pustaka org.apache.kafka: kafka-clients: 2.6.0
Untuk melakukan ini, pertimbangkan contoh yang diberikan di dokumen java KafkaConsumer :
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
Bagaimana komit otomatis terjadi dalam kasus ini? Jawabannya harus ditemukan dalam metode itu sendiri untuk menerima pesan baru.
consumer.poll(Duration.ofMillis(100));
. KafkaConsumer auto-commit enable.auto.commit auto.commit.interval.ms ConsumerCoordinator , auto-commit.
public void maybeAutoCommitOffsetsAsync(long now) {
if (autoCommitEnabled) {
nextAutoCommitTimer.update(now);
if (nextAutoCommitTimer.isExpired()) {
nextAutoCommitTimer.reset(autoCommitIntervalMs);
doAutoCommitOffsetsAsync();
}
}
}
enable.auto.commit = true auto.commit.interval.ms , , ( doAutoCommitOffsetsAsync)
private void doAutoCommitOffsetsAsync() {
Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
log.debug("Sending asynchronous auto-commit of offsets {}", allConsumedOffsets);
commitOffsetsAsync(allConsumedOffsets, (offsets, exception) -> {
if (exception != null) {
if (exception instanceof RetriableCommitFailedException) {
log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", offsets,
exception);
nextAutoCommitTimer.updateAndReset(rebalanceConfig.retryBackoffMs);
} else {
log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage());
}
} else {
log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
}
});
}
poll KafkaConsumer. updateAssignmentMetadataIfNeeded, poll ConsumerCoordinator, , maybeAutoCommitOffsetsAsync
poll KafkaConsumer:
offset
.
KafkaConsumer , .
.1 enable.auto.commit = true auto.commit.interval.ms. .. poll() 3 , auto.commit.interval.ms=6000, .
? โat least once deliveryโ, .