Kafka 问题实录 + 解决方案(附详细代码)
一:消息消费失败但 offset 被提交,导致消息丢失
问题描述
默认 @KafkaListener 自动提交 offset,如果业务代码抛异常后 offset 被提交,Kafka 会认为此条消息已成功消费,不再重试,导致数据丢失。
解决方案
设置为“手动提交 offset + 异常不提交”
详细代码
@KafkaListener(topics = "article-topic", containerFactory = "kafkaListenerContainerFactory")
public void listen(List<String> messages, Acknowledgment ack) {
try {
for (String msg : messages) {
// 假设这里可能抛出异常
process(msg);
}
ack.acknowledge(); // 成功后手动提交 offset
} catch (Exception e) {
log.error("处理失败,不提交 offset,Kafka 将重试: {}", e.getMessage());
}
}
public void process(String msg) {
if (msg.contains("error")) {
throw new RuntimeException("模拟业务异常");
}
// 正常业务逻辑
log.info("成功处理: {}", msg);
}
二:消息重复处理,导致数据重复写入数据库
问题描述
Kafka 保证“至少一次”语义,消费失败重试可能导致相同消息被多次处理。
解决方案
使用 Redis 实现幂等性判断(setIfAbsent)
详细代码
@Autowired
private StringRedisTemplate redisTemplate;
public void process(String json) {
Article article = JSON.parseObject(json, Article.class);
String redisKey = "dedup:article:" + article.getId();
// 如果该 key 不存在,则设置并处理
Boolean isNew = redisTemplate.opsForValue()
.setIfAbsent(redisKey, "1", Duration.ofHours(24));
if (Boolean.FALSE.equals(isNew)) {
log.warn("重复消息,跳过: {}", article.getId());
return;
}
// 后续处理逻辑
saveToDB(article);
}
三:Kafka backlog 积压,导致消费延迟
问题描述
爬虫发送数据频率高,消费端处理慢,Kafka 积压严重,消费延迟。
解决方案
- 批量消费 + 多线程并行处理
- 设置
concurrency
提升并发数
详细代码
@Configuration
public class KafkaConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory<String, String> factory) {
ConcurrentKafkaListenerContainerFactory<String, String> container =
new ConcurrentKafkaListenerContainerFactory<>();
container.setConsumerFactory(factory);
container.setConcurrency(5); // 开启多线程消费
container.setBatchListener(true); // 批量拉取
container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return container;
}
}
@KafkaListener(topics = "article-topic", containerFactory = "kafkaListenerContainerFactory")
public void listen(List<String> messages, Acknowledgment ack) {
messages.parallelStream().forEach(this::process);
ack.acknowledge();
}
四:消息过大导致发送失败(RecordTooLargeException)
问题描述
Kafka 默认最大消息大小为 1MB,超过会报错。
解决方案
- Kafka 配置参数调高(如 10MB)
# server.properties message.max.bytes=10485760 # producer config max.request.size=10485760
- 大文件走 OSS、MinIO 等存储,Kafka 仅传 URL
Kafka 发送 URL 示例代码
String url = ossService.upload(article.getContent());
kafkaTemplate.send("article-topic", article.getId(), url);
五:服务重启未提交 offset 导致重复消费
问题描述
服务宕机或重启前未提交 offset,Kafka 重启后重新投递旧消息,导致重复处理。
解决方案
- 使用 Redis 实现幂等控制(见上)
- 设置
AckMode
为MANUAL_IMMEDIATE
保证业务完成后再提交 offset
总结
Kafka 强大但有坑,关键问题集中在:
- Offset 管理
- 幂等性
- 性能与延迟
- 消息大小与分区