线上出现了kafka消息堆积和重平衡
@KafkaListener(topics = KafkaConstant.SUMMARY_TOPIC, containerFactory = "batchKafkaListenerContainerFactory")
public void process(List<ConsumerRecord<String, Record>> consumerRecordList, Acknowledgment acknowledgment) {
log.info("summary consuming message, size:{}", consumerRecordList.size());
try {
//do something
} catch (Exception e) {
log.error("error consuming message", e);
} finally {
acknowledgment.acknowledge();
}
}
发现的问题
1. 缺乏错误重试机制
- 大部分 listener 在 catch 块中只记录日志,没有重试逻辑
KafkaServiceImpl.executeTask()中,如果单个消息处理失败,只记录错误,但整个批次仍会被 ack,导致消息丢失- 没有配置 Kafka 的重试机制(如
@Retryable或 Dead Letter Topic)
2. 批处理中的消息丢失风险
- 对于高/中优先级的批处理 listener(Email, SMS, Push),使用异步 executor 处理
- 如果某个消息处理失败,其他消息仍继续处理,但失败消息不会被重试
executeTask方法在所有异步任务提交后立即 ack,导致处理失败的消息无法恢复
3. 超时配置问题
WebhookListener有 180 秒超时,但其他 listener 没有类似的超时保护- 批处理可能因个别慢消息阻塞整个批次
4. 缺乏死信队列 (DLQ)
- 没有配置 Dead Letter Topic 来处理无法处理的消息
- 失败消息直接丢失,没有机会进行后续处理或分析
5. 异常处理不一致
- 低优先级 listener 使用 try-catch-finally 确保 ack
- 高/中优先级使用
KafkaService异步处理,错误处理较弱
6. 资源管理
- 异步处理没有限制并发数量,可能导致资源耗尽
CompletableFuture没有显式的超时或取消机制(除了 Webhook)
建议改进
- 添加重试机制:使用 Spring Retry 或 Kafka 重试配置
- 实现死信队列:配置 DLQ topic 处理失败消息
- 改进批处理逻辑:在
executeTask中收集失败消息,ack 前检查处理结果 - 添加监控:记录消息处理成功/失败统计
- 统一错误处理:建立标准化的错误处理策略
- 添加熔断器:防止下游服务故障影响消费
Back to the top!