之前我这需要做两个kafka之间同步数据,之前用的kafka自带的kafka-mirror-maker,但是这个工具不方便维护,因此自己根据spring-kafka写了个同步工具。
问题但是最近发现数据同步过程中有数据丢失的情况,按道理讲这种情况不应该发生,因为我发送kafka消息时,重试次数设置的是retries=Integer.MAX_VALUE,也就是说会一直重试才对;根据kafka官方提供的关于retries的描述:
Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first
其中有一句话说明: Note that this retry is no different than if the client resent the record upon receiving the error, 这句话含义为自动重试和手动重试没什么区别。
那就很奇怪了,按道理讲不应该出现发送失败才对,于是开始排查日志,发现如下:
2021-11-09 23:22:45.470 ERROR 790 --- [d | producer-34] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='e6eedca3-1a49-406b-9090-09781ba08402' and payload='{"app_name":"inke","service_name":"gtd_chl_data","topic":"cap_click_attributes_info","time":"2021-11...' to topic app_inke_binlog:
org.apache.kafka.common.errors.TimeoutException: Expiring 11 record(s) for app_inke_binlog-5: 30013 ms has passed since batch creation plus linger time
于是开始翻看代码相应位置:
代码中会进行是否是首次发送的判断,如果是首次发送并且超时,那么会直接返回错误,所以官方说明是不可靠的;
解决方案既然让它自动重试不靠谱,那么咱们就只能手动重试了。那直接设置一个无限重试,是不是就OK了呢?这样做肯定可以达到和之前想要的效果,那就是出现失败就一直重试,反正正常情况下肯定能成功;但是这样做有没有问题呢?
是有的,其实这次数据丢失问题,根本原因并不是消息发送失败,而是消息消费和使用并没有匹配起来。目前的过程是:kafka consumer消费消息–>kafka producer 发送消息(async)–>kafka consumer确认偏移量。问题点就出在了kafka生产消息是异步的(ps:同步发送的性能非常差,是不可能采用的),这就导致了确认偏移量这个操作失去了我们想要的效果了。
那么,能不能异步确认偏移量呢?让kafka producer成功发送消息以后再执行偏移量确认操作?答案是可行的,这也是本文介绍的方案:
private void send(KafkaTemplate<Bytes, Bytes> kafkaTemplate, String topic, Bytes key, Bytes value, int count, Map<Bytes, Bytes> dataMap, Acknowledgment ack) {
ListenableFuture<SendResult<Bytes, Bytes>> future = kafkaTemplate.send(topic, null, value);
future.addCallback(new ListenableFutureCallback<SendResult<Bytes, Bytes>>() {
@Override
public void onFailure(Throwable e) {
if (count > 500) {
alertSupporter.sendMsg("重试用尽,topic:{}发送消息失败,key:{},value:{}", topic, key, value);
return;
}
if (isRunning) {
executorService.submit(() -> send(kafkaTemplate, topic, null, value, count + 1, dataMap, ack));
} else {
alertSupporter.sendMsg("服务停止,topic:{}发送消息失败,key:{},value:{}", topic, key, value);
}
}
@Override
public void onSuccess(SendResult<Bytes, Bytes> result) {
dataMap.remove(result.getProducerRecord().value());
if (dataMap.isEmpty()) {
try {
ack.acknowledge();
} catch (Exception e) {
alertSupporter.sendMsg("kafka确认偏移量失败!", e);
}
}
}
});
}
思路是这样:kafka消费到消息后,将所有的消息先存放到到一个Map中,每次成功发送就将这个Map中的数据remove掉,直到Map为空,那么发送就完成了,直接确认偏移量即可;如果发送失败,那么就走手动重试策略即可。目前这套方案已经发到了线上,目前性能稳定,数据也不再出现丢失情况了,问题解决;
总结其实不单单是发送数据到kafka,下游接收端也可能是hbase/es等数据源,这些数据源写入数据时,都是批量写入性能最优,因为写入这些数据源时,也可以根据异步确认的思路进行处理,保证数据不丢失的情况下,数据处理效率还能保持,这篇问题就这些了,再见。