我正在try 读取来自PubSub的消息,然后写入数据流中的BigQuery表.然而,通过使用直接流道,我遇到了"无根单位"的错误.

以下是我的代码;

package main

import (
    "context"
    "encoding/json"
    "flag"
    "fmt"

    "github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/io/pubsubio"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)


type DummyBody struct {
        TaskId string `json:"id" bigquery:"id"`
    }


func buildPipeline(s beam.Scope) {
    rawDummyBodies := pubsubio.Read(s, "project", "topic", &pubsubio.ReadOptions{Subscription: "sub.ID"})

    dummyBodies := beam.ParDo(s, func(ctx context.Context, data []byte) (DummyBody, error) {
        var body DummyBody
        if err := json.Unmarshal(data, &body); err != nil {
            log.Error(ctx, err)
            fmt.Println("Error")
            return body, err
        }
        fmt.Println("No Error")
        return body, nil
    }, rawDummyBodies)

    debug.Printf(s, "Task : %#v", dummyBodies)

    bigqueryio.Write(s, "project", "table", dummyBodies)
}

func main() {
    flag.Parse()
    beam.Init()

    p, s := beam.NewPipelineWithRoot()
    buildPipeline(s)

    ctx := context.Background()
    if err := beamx.Run(ctx, p); err != nil {
        log.Exitf(ctx, "Failed to execute pipeline: %v", err)
    }
}

管道开始使用直接运行器执行,但由于没有根单元而失败.

2022/11/01 14:29:55无法执行管道:转换失败 由以下原因引起: 无根单位 退出状态%1

推荐答案

目前实施的pubsubio only works on Dataflow Runner.

Go相关问答推荐

调用API时使用nginx作为反向代理时从nginx获取502坏网关

如何在AWS SDK Go v2 STS上正确使用重试

如何预编译Golang标准库?

Docker Compose Health Check未退出,错误为无法启动

如何使用中间件更改http请求的响应代码?

如何在v2 Go SDK中使用KeyConditionExpression查询AWS DynamoDb?

JetBrains Goland,禁用突出显示测试文件

如何配置vscode以在Go中显示不必要的(过度指定的)泛型?

该文件位于模块.内,该模块不包含在您的工作区中

如何使用泛型将接口转换为指定类型

GRPC 元数据未在 Go 中更新

同时调用应该只获取一次数据

Golang prometheus 显示自定义指标

将接口方法的参数限制为几个允许的 struct ?

函数调用中的类型参数panic

如何使用golang操作很长的字符串以避免内存不足

在 Golang 中获取谷歌云服务帐户的访问令牌?

为什么 go.mod 中的所有依赖都是间接的?

如何获得Ent中数字列的总和

函数参数的判断顺序是什么?