我正在构建一个Go应用程序,它从Kafka主题读取消息,并将它们写入PostgreSQL数据库.

我已经建立了一个循环,使用kafka.Reader从Kafka读取消息,并使用sql.DB将它们插入数据库.如果读取消息或将其插入数据库时出错,我会记录错误并继续下一条消息.

然而,我不确定如何处理在将数据插入到PostgreSQL数据库后提交Kafka消息时发生的错误.具体地说,如果手动提交导致错误,我应该怎么办?我是否应该重试提交操作?我应该记录错误并继续下一条消息吗?处理这些类型的错误的最佳实践是什么?

for {
        kafkaMessage, err := kafkaReader.ReadMessage(context.Background())
        if err != nil {
            fmt.Printf("Failed to read message from Kafka: %s\n", err)
            continue
        }

        _, err = db.Exec("INSERT INTO mytable (payload) VALUES ($1)", kafkaMessage.Value)
        if err != nil {
            fmt.Printf("Failed to insert payload into database: %s\n", err)
            continue
        }

        // What should I do if the commit operation fails?
        err = kafkaReader.CommitMessages(context.Background(), kafkaMessage)
        if err != nil {
            // What's the best practice for handling this error?
        }
    }

推荐答案

当面临错误时,只需continuefor循环的下一次迭代.

如果由于任何原因未能提交Kafka消息,Kafka将在接下来的reader.ReadMessage(ctx)个月内再次返回相同的消息.

但是,为了确保您的代码does not继续多次重复相同的失败工作,耗尽您的资源,用相同的错误消息淹没日志(log)等,请在每次错误后使用简单的sleep,或者如果确实需要的话,可以使用断路器逻辑来实现您的功能.

if err != nil {
   log.Errorf("...", ...)
   time.Sleep(5 * time.Second)
   continue
}

Postgresql相关问答推荐

Org.postgresql.util.PSQLException:错误:函数LOWER(BYTEA)不存在

多克.波斯格雷斯.PgAdmin4

如何在postgres中测试锁定

错误:用户需要系统密码:postgres

EF Core和npgsql连接池已被耗尽

至少 pgRouting 的某些部分可以与并行查询并行运行吗?

PostgreSql maintenance_work_mem 在索引创建期间增加

PL/pgSQL 中 PL/SQL %ISOPEN 的类似功能是什么?

我应该 Select 哪种数据类型?

将一列与另一列的每个元素进行比较,该列是一个数组

在 jOOQ 中 Select 相同的命名列

无法从在 wsl 2 上运行的服务之一连接到在 wsl 2 上的容器中运行的 postgres 数据库

Postgres 根据自己的估计 Select 一个更费时的查询计划

PostgreSQL unnest 与空数组

Django 1.8 数组字段追加和扩展

在执行 postgresql 函数时提交事务

Postgres 中的相似函数与 pg_trgm

错误:CASE types character varying and numeric cannot be matched

在同一台机器上创建多个 Postgres 实例

PostgreSQL 9.1:如何连接数组中的行而不重复,加入另一个表