前言

之前我这需要做两个kafka之间同步数据,之前用的kafka自带的kafka-mirror-maker,但是这个工具不方便维护,因此自己根据spring-kafka写了个同步工具。

问题

但是最近发现数据同步过程中有数据丢失的情况,按道理讲这种情况不应该发生,因为我发送kafka消息时,重试次数设置的是retries=Integer.MAX_VALUE,也就是说会一直重试才对;根据kafka官方提供的关于retries的描述:

 Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first  

其中有一句话说明: Note that this retry is no different than if the client resent the record upon receiving the error, 这句话含义为自动重试和手动重试没什么区别。

那就很奇怪了,按道理讲不应该出现发送失败才对,于是开始排查日志,发现如下:

 2021-11-09 23:22:45.470 ERROR 790 --- [d | producer-34] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='e6eedca3-1a49-406b-9090-09781ba08402' and payload='{"app_name":"inke","service_name":"gtd_chl_data","topic":"cap_click_attributes_info","time":"2021-11...' to topic app_inke_binlog:
org.apache.kafka.common.errors.TimeoutException: Expiring 11 record(s) for app_inke_binlog-5: 30013 ms has passed since batch creation plus linger time  

于是开始翻看代码相应位置:

代码中会进行是否是首次发送的判断,如果是首次发送并且超时,那么会直接返回错误,所以官方说明是不可靠的;

解决方案

既然让它自动重试不靠谱,那么咱们就只能手动重试了。那直接设置一个无限重试,是不是就OK了呢?这样做肯定可以达到和之前想要的效果,那就是出现失败就一直重试,反正正常情况下肯定能成功;但是这样做有没有问题呢?

是有的,其实这次数据丢失问题,根本原因并不是消息发送失败,而是消息消费和使用并没有匹配起来。目前的过程是:kafka consumer消费消息–>kafka producer 发送消息(async)–>kafka consumer确认偏移量。问题点就出在了kafka生产消息是异步的(ps:同步发送的性能非常差,是不可能采用的),这就导致了确认偏移量这个操作失去了我们想要的效果了。

那么,能不能异步确认偏移量呢?让kafka producer成功发送消息以后再执行偏移量确认操作?答案是可行的,这也是本文介绍的方案:

 private void send(KafkaTemplate<Bytes, Bytes> kafkaTemplate, String topic, Bytes key, Bytes value, int count, Map<Bytes, Bytes> dataMap, Acknowledgment ack) {
        ListenableFuture<SendResult<Bytes, Bytes>> future = kafkaTemplate.send(topic, null, value);
        future.addCallback(new ListenableFutureCallback<SendResult<Bytes, Bytes>>() {
            @Override
            public void onFailure(Throwable e) {
                if (count > 500) {
                    alertSupporter.sendMsg("重试用尽,topic:{}发送消息失败,key:{},value:{}", topic, key, value);
                    return;
                }
                if (isRunning) {
                    executorService.submit(() -> send(kafkaTemplate, topic, null, value, count + 1, dataMap, ack));
                } else {
                    alertSupporter.sendMsg("服务停止,topic:{}发送消息失败,key:{},value:{}", topic, key, value);
                }

            }

            @Override
            public void onSuccess(SendResult<Bytes, Bytes> result) {
                dataMap.remove(result.getProducerRecord().value());
                if (dataMap.isEmpty()) {
                    try {
                        ack.acknowledge();
                    } catch (Exception e) {
                        alertSupporter.sendMsg("kafka确认偏移量失败!", e);
                    }
                }
            }
        });
    }  

思路是这样:kafka消费到消息后,将所有的消息先存放到到一个Map中,每次成功发送就将这个Map中的数据remove掉,直到Map为空,那么发送就完成了,直接确认偏移量即可;如果发送失败,那么就走手动重试策略即可。目前这套方案已经发到了线上,目前性能稳定,数据也不再出现丢失情况了,问题解决;

总结

其实不单单是发送数据到kafka,下游接收端也可能是hbase/es等数据源,这些数据源写入数据时,都是批量写入性能最优,因为写入这些数据源时,也可以根据异步确认的思路进行处理,保证数据不丢失的情况下,数据处理效率还能保持,这篇问题就这些了,再见。