线上出现了kafka消息堆积和重平衡

 @KafkaListener(topics = KafkaConstant.SUMMARY_TOPIC, containerFactory = "batchKafkaListenerContainerFactory")
    public void process(List<ConsumerRecord<String, Record>> consumerRecordList, Acknowledgment acknowledgment) {
        log.info("summary consuming message, size:{}", consumerRecordList.size());
        try {
           
           //do something

        } catch (Exception e) {
            log.error("error consuming message", e);
        } finally {
            acknowledgment.acknowledge();
        }
    }

发现的问题

1. 缺乏错误重试机制

  • 大部分 listener 在 catch 块中只记录日志,没有重试逻辑
  • KafkaServiceImpl.executeTask() 中,如果单个消息处理失败,只记录错误,但整个批次仍会被 ack,导致消息丢失
  • 没有配置 Kafka 的重试机制(如 @Retryable 或 Dead Letter Topic)

2. 批处理中的消息丢失风险

  • 对于高/中优先级的批处理 listener(Email, SMS, Push),使用异步 executor 处理
  • 如果某个消息处理失败,其他消息仍继续处理,但失败消息不会被重试
  • executeTask 方法在所有异步任务提交后立即 ack,导致处理失败的消息无法恢复

3. 超时配置问题

  • WebhookListener 有 180 秒超时,但其他 listener 没有类似的超时保护
  • 批处理可能因个别慢消息阻塞整个批次

4. 缺乏死信队列 (DLQ)

  • 没有配置 Dead Letter Topic 来处理无法处理的消息
  • 失败消息直接丢失,没有机会进行后续处理或分析

5. 异常处理不一致

  • 低优先级 listener 使用 try-catch-finally 确保 ack
  • 高/中优先级使用 KafkaService 异步处理,错误处理较弱

6. 资源管理

  • 异步处理没有限制并发数量,可能导致资源耗尽
  • CompletableFuture 没有显式的超时或取消机制(除了 Webhook)

建议改进

  1. 添加重试机制:使用 Spring Retry 或 Kafka 重试配置
  2. 实现死信队列:配置 DLQ topic 处理失败消息
  3. 改进批处理逻辑:在 executeTask 中收集失败消息,ack 前检查处理结果
  4. 添加监控:记录消息处理成功/失败统计
  5. 统一错误处理:建立标准化的错误处理策略
  6. 添加熔断器:防止下游服务故障影响消费

Back to the top!