Kafka 问题实录 + 解决方案

Kafka 问题实录 + 解决方案(附详细代码)


一:消息消费失败但 offset 被提交,导致消息丢失

问题描述

默认 @KafkaListener 自动提交 offset,如果业务代码抛异常后 offset 被提交,Kafka 会认为此条消息已成功消费,不再重试,导致数据丢失。

解决方案

设置为“手动提交 offset + 异常不提交”

详细代码

@KafkaListener(topics = "article-topic", containerFactory = "kafkaListenerContainerFactory")
public void listen(List<String> messages, Acknowledgment ack) {
    try {
        for (String msg : messages) {
            // 假设这里可能抛出异常
            process(msg);
        }
        ack.acknowledge(); // 成功后手动提交 offset
    } catch (Exception e) {
        log.error("处理失败,不提交 offset,Kafka 将重试: {}", e.getMessage());
    }
}

public void process(String msg) {
    if (msg.contains("error")) {
        throw new RuntimeException("模拟业务异常");
    }
    // 正常业务逻辑
    log.info("成功处理: {}", msg);
}

二:消息重复处理,导致数据重复写入数据库

问题描述

Kafka 保证“至少一次”语义,消费失败重试可能导致相同消息被多次处理。

解决方案

使用 Redis 实现幂等性判断(setIfAbsent)

详细代码

@Autowired
private StringRedisTemplate redisTemplate;

public void process(String json) {
    Article article = JSON.parseObject(json, Article.class);
    String redisKey = "dedup:article:" + article.getId();

    // 如果该 key 不存在,则设置并处理
    Boolean isNew = redisTemplate.opsForValue()
            .setIfAbsent(redisKey, "1", Duration.ofHours(24));

    if (Boolean.FALSE.equals(isNew)) {
        log.warn("重复消息,跳过: {}", article.getId());
        return;
    }

    // 后续处理逻辑
    saveToDB(article);
}

三:Kafka backlog 积压,导致消费延迟

问题描述

爬虫发送数据频率高,消费端处理慢,Kafka 积压严重,消费延迟。

解决方案

  • 批量消费 + 多线程并行处理
  • 设置 concurrency 提升并发数

详细代码

@Configuration
public class KafkaConfig {
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> factory) {
        ConcurrentKafkaListenerContainerFactory<String, String> container = 
            new ConcurrentKafkaListenerContainerFactory<>();
        container.setConsumerFactory(factory);
        container.setConcurrency(5); // 开启多线程消费
        container.setBatchListener(true); // 批量拉取
        container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return container;
    }
}

@KafkaListener(topics = "article-topic", containerFactory = "kafkaListenerContainerFactory")
public void listen(List<String> messages, Acknowledgment ack) {
    messages.parallelStream().forEach(this::process);
    ack.acknowledge();
}

四:消息过大导致发送失败(RecordTooLargeException)

问题描述

Kafka 默认最大消息大小为 1MB,超过会报错。

解决方案

  • Kafka 配置参数调高(如 10MB)
    # server.properties
    message.max.bytes=10485760
    # producer config
    max.request.size=10485760
  • 大文件走 OSS、MinIO 等存储,Kafka 仅传 URL

Kafka 发送 URL 示例代码

String url = ossService.upload(article.getContent());
kafkaTemplate.send("article-topic", article.getId(), url);

五:服务重启未提交 offset 导致重复消费

问题描述

服务宕机或重启前未提交 offset,Kafka 重启后重新投递旧消息,导致重复处理。

解决方案

  • 使用 Redis 实现幂等控制(见上)
  • 设置 AckModeMANUAL_IMMEDIATE 保证业务完成后再提交 offset


总结

Kafka 强大但有坑,关键问题集中在:

  • Offset 管理
  • 幂等性
  • 性能与延迟
  • 消息大小与分区