Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first
其中有一句话说明: Note that this retry is no different than if the client resent the record upon receiving the error, 这句话含义为自动重试和手动重试没什么区别。
2021-11-09 23:22:45.470 ERROR 790 --- [d | producer-34] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='e6eedca3-1a49-406b-9090-09781ba08402' and payload='{"app_name":"inke","service_name":"gtd_chl_data","topic":"cap_click_attributes_info","time":"2021-11...' to topic app_inke_binlog:
org.apache.kafka.common.errors.TimeoutException: Expiring 11 record(s) for app_inke_binlog-5: 30013 ms has passed since batch creation plus linger time
是有的,其实这次数据丢失问题,根本原因并不是消息发送失败,而是消息消费和使用并没有匹配起来。目前的过程是:kafka consumer消费消息–>kafka producer 发送消息(async)–>kafka consumer确认偏移量。问题点就出在了kafka生产消息是异步的(ps:同步发送的性能非常差,是不可能采用的),这就导致了确认偏移量这个操作失去了我们想要的效果了。
那么,能不能异步确认偏移量呢?让kafka producer成功发送消息以后再执行偏移量确认操作?答案是可行的,这也是本文介绍的方案:
private void send(KafkaTemplate<Bytes, Bytes> kafkaTemplate, String topic, Bytes key, Bytes value, int count, Map<Bytes, Bytes> dataMap, Acknowledgment ack) {
ListenableFuture<SendResult<Bytes, Bytes>> future = kafkaTemplate.send(topic, null, value);
future.addCallback(new ListenableFutureCallback<SendResult<Bytes, Bytes>>() {
public void onFailure(Throwable e) {
if (count > 500) {
alertSupporter.sendMsg("重试用尽,topic:{}发送消息失败,key:{},value:{}", topic, key, value);
if (isRunning) {
executorService.submit(() -> send(kafkaTemplate, topic, null, value, count + 1, dataMap, ack));
} else {
alertSupporter.sendMsg("服务停止,topic:{}发送消息失败,key:{},value:{}", topic, key, value);
public void onSuccess(SendResult<Bytes, Bytes> result) {
if (dataMap.isEmpty()) {
try {
} catch (Exception e) {
alertSupporter.sendMsg("kafka确认偏移量失败!", e);