我正在构建一个Go应用程序,它从Kafka主题读取消息,并将它们写入PostgreSQL数据库.
我已经建立了一个循环,使用kafka.Reader从Kafka读取消息,并使用sql.DB将它们插入数据库.如果读取消息或将其插入数据库时出错,我会记录错误并继续下一条消息.
然而,我不确定如何处理在将数据插入到PostgreSQL数据库后提交Kafka消息时发生的错误.具体地说,如果手动提交导致错误,我应该怎么办?我是否应该重试提交操作?我应该记录错误并继续下一条消息吗?处理这些类型的错误的最佳实践是什么?
for {
kafkaMessage, err := kafkaReader.ReadMessage(context.Background())
if err != nil {
fmt.Printf("Failed to read message from Kafka: %s\n", err)
continue
}
_, err = db.Exec("INSERT INTO mytable (payload) VALUES ($1)", kafkaMessage.Value)
if err != nil {
fmt.Printf("Failed to insert payload into database: %s\n", err)
continue
}
// What should I do if the commit operation fails?
err = kafkaReader.CommitMessages(context.Background(), kafkaMessage)
if err != nil {
// What's the best practice for handling this error?
}
}