Neste artigo, quero explicar com um pouco mais de detalhes como o mecanismo de auto-commit funciona para ouvintes na biblioteca kafka-clients (considere a versão 2.6.0)
Na documentação, podemos encontrar a seguinte formulação que descreve como funciona a confirmação automática:
A confirmação automática funciona basicamente como um cron com um período definido por meio da propriedade de configuração auto.commit.interval.ms. Se o consumidor travar, após uma reinicialização ou rebalanceamento, a posição de todas as partições pertencentes ao consumidor travado será redefinida para o último deslocamento confirmado.
Os documentos java para KafkaConsumer, por sua vez, contêm a seguinte descrição:
O consumidor pode comprometer compensações automaticamente e periodicamente; ou pode escolher controlar esta posição confirmada manualmente chamando uma das APIs de confirmação (por exemplo, commitSync e commitAsync).
A partir dessas formulações, pode surgir um equívoco de que uma confirmação de compensação automática sem bloqueio ocorre em segundo plano e não está totalmente claro como ela se relaciona com o processo de recebimento de mensagens por um consumidor específico e, o mais importante, quais garantias de entrega temos ?
Vamos dar uma olhada no mecanismo de recebimento de mensagens pelo ouvinte com a configuração enable.auto.commit = true usando o exemplo da implementação da classe KafkaConsumer da biblioteca org.apache.kafka: kafka-clients: 2.6.0
Para fazer isso, considere o exemplo fornecido no documento java KafkaConsumer :
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
Como a confirmação automática acontece neste caso? A resposta deve ser encontrada no próprio método de recebimento de novas mensagens.
consumer.poll(Duration.ofMillis(100));
. KafkaConsumer auto-commit enable.auto.commit auto.commit.interval.ms ConsumerCoordinator , auto-commit.
public void maybeAutoCommitOffsetsAsync(long now) {
if (autoCommitEnabled) {
nextAutoCommitTimer.update(now);
if (nextAutoCommitTimer.isExpired()) {
nextAutoCommitTimer.reset(autoCommitIntervalMs);
doAutoCommitOffsetsAsync();
}
}
}
enable.auto.commit = true auto.commit.interval.ms , , ( doAutoCommitOffsetsAsync)
private void doAutoCommitOffsetsAsync() {
Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
log.debug("Sending asynchronous auto-commit of offsets {}", allConsumedOffsets);
commitOffsetsAsync(allConsumedOffsets, (offsets, exception) -> {
if (exception != null) {
if (exception instanceof RetriableCommitFailedException) {
log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", offsets,
exception);
nextAutoCommitTimer.updateAndReset(rebalanceConfig.retryBackoffMs);
} else {
log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage());
}
} else {
log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
}
});
}
poll KafkaConsumer. updateAssignmentMetadataIfNeeded, poll ConsumerCoordinator, , maybeAutoCommitOffsetsAsync
poll KafkaConsumer:
offset
.
KafkaConsumer , .
.1 enable.auto.commit = true auto.commit.interval.ms. .. poll() 3 , auto.commit.interval.ms=6000, .
? “at least once delivery”, .