Kafka Consumer Loop 异常处理
25 Apr 2021不论用什么语言的 Kafka 客户端,consumer worker 的逻辑通常是一个 fetch loop,每次获取一条或者一批消息,然后在循环里处理信息,成功以后自动或者手动 commit offset。
比如使用 segmentio/kafka-go1 Go 客户端的 consumer worker 逻辑如下:
func (s *Service) Worker(ctx context.Context) {
for {
select {
case <-ctx.Done():
s.reader.Close()
return
default:
m, err := s.reader.FetchMessage(ctx)
if err != nil {
fmt.Printf("fetch messages error: %v\n", err)
// 消息 fetch 失败,重新 fetch
break
}
// 消息处理逻辑
// 消息反序列化并构建实体 o
...
if err := s.Save(o) {
// 服务异常处理,重新 fetch 消息
// 注意这里 break 的是 select 结构,不是外层 for 循环
break
}
s.reader.CommitMessages(ctx, m)
}
}
}
即使在设置了 offset 手动 commit 的前提下,上面的逻辑仍有一个问题,在服务异常的时候会丢失信息。当在 consumer loop 的逻辑里调用服务(s.Save(o)
)出现异常的时候(比如网络抖动或者数据库超时),下一次 fetch 消息不会重新获取到上一次处理失败的消息,不论这个消息有没有 committed。因此假如下一个 fetch 的新消息处理成功并 committed,那么之前处理失败的消息就相当于丢失2了。
当然,如果把异常处理的 break
换成 return
,出现异常时这个 consumer worker 就退出了,那么重启 consumer worker 以后 fetch 到的消息还是未经 commit 的失败消息,这种情况下不会丢失消息,但每次出现异常都需要重启进程显然也不可取。
if err := s.Save(o) {
// 服务异常处理,退出 consumer loop
// 这样不会丢失消息,但需要重启或重新创建 consumer worker
return
}
为了修复这个这个问题,我们可以加上重试逻辑。在某个消息处理出现异常的时候,要想既不阻塞后续消息处理,又不丢失这条消息,可以将处理异常的消息发送到一个 retry topic,在 retry topic 里重新消费这个消息。比如 Uber Engineering 的实践3里采用了多级 retry topic 的方式,每级 retry 消息的消费有不同的延时以避免消息风暴。这种方式稍显复杂。
通常,每个消息的处理逻辑都应该保证是幂等的,因此可以多次消费同一条消息,这也是实现 Kafka “at least once” guarantee 的要求。在这种保证的前提下,重复消费一条消息不应产生异常,如果有异常,应该是服务或者网络故障,可能会自动恢复或者需要人工干预修复,这时即使消费下一条消息也同样会出错。因此我们可以一直重试消费失败的消息,直到故障得到修复消息处理成功,然后自动消费下一个消息。
下面的重试逻辑借助一个 buffered channel 循环消费失败消息直到成功:
func (s *Service) Worker(ctx context.Context) {
// 重试 channel
ch := make(chan kafka.Message, 1)
// 消息处理函数
do := func(m kafka.Message) error {
// 由消息反序列化得到实体
err, o := unmarshal(m)
if err != nil {
// 如果出现反序列化错误,不重试该消息
// 可以在这里添加日志记录逻辑或持久化到存储
return nil
}
err = s.Save(o)
return err
}
for {
select {
case <-ctx.Done():
s.reader.Close()
return
case m := <-ch:
// retry back-off
time.Sleep(3 * time.Second)
fmt.Printf("*** RETRY message at topic/partition/offset %v/%v/%v\n", m.Topic, m.Partition, m.Offset)
err := do(m)
if err != nil {
ch <- m
break
}
s.reader.CommitMessages(ctx, m)
default:
m, err := s.reader.FetchMessage(ctx)
if err != nil {
fmt.Printf("fetch messages error: %v\n", err)
break
}
err = do(m)
if err != nil {
ch <- m
break
}
s.reader.CommitMessages(ctx, m)
}
}
}
在上面的逻辑中,当一个消息处理出现异常,会向 buffered channel(len=1) 发送处理失败的消息,并 break select 结构。因为 buffered channel 此时有了数据,因此下一次 select 将执行重试逻辑。如果重试失败,那么继续向 buffered channel 写入该消息并开始下一次重试。如果重试成功,那么 buffered channel 就是空的(已被 select/case 读取),下一次 select 将执行 default:
case 里的代码,也就是正常消费逻辑。
可以在重试逻辑里加上合适的 back-off 策略,比如例子中是 sleep 3 秒。在消息处理函数 do()
中,可以控制哪些异常需要重试,哪些可以忽略,比如数据库连接错误肯定需要重试,而消息反序列化错误就应该忽略该消息,因为下一次重试该消息仍然不会成功。通过 do()
函数的返回值 error 是否为 nil 来控制是否重试消息。
更进一步,如果有多个类似消息消费逻辑,可以将循环逻辑抽出来成为一个通用 helper function,接收消息处理函数和对应的 kafka reader 作为参数:
const RETRY_BACKOFF = 3 * time.Second
// 通用 helper loop function
func WithWorker(ctx context.Context, reader *kafka.Reader, do func(context.Context, kafka.Message) error) {
// 重试 channel
ch := make(chan kafka.Message, 1)
for {
select {
case <-ctx.Done():
reader.Close()
return
case m := <-ch:
// retry back-off
time.Sleep(RETRY_BACKOFF)
fmt.Printf("*** RETRY message at topic/partition/offset %v/%v/%v\n", m.Topic, m.Partition, m.Offset)
err := do(m)
if err != nil {
ch <- m
break
}
reader.CommitMessages(ctx, m)
default:
m, err := reader.FetchMessage(ctx)
if err != nil {
fmt.Printf("fetch messages error: %v\n", err)
break
}
err = do(m)
if err != nil {
ch <- m
break
}
reader.CommitMessages(ctx, m)
}
}
}
// 消息处理函数
func (s *Service) worker(ctx context.Context, m kafka.Message) error {
// 由消息反序列化得到实体
err, o := unmarshal(m)
if err != nil {
// 如果出现反序列化错误,不重试该消息
// 可以在这里添加日志记录逻辑或持久化到存储
return nil
}
err = s.Save(ctx, o)
return err
}
然后这样调用:
func (s *Service) Run(ctx context.Context) {
go utils.WithWorker(ctx, s.cacheReader, s.cacheWorker)
go utils.WithWorker(ctx, s.indexReader, s.indexWorker)
}