有没有使用Go执行2个PCollection的左联接的简单方法? 我看到SQL联接只在Java中可用.

package main

import (
    "context"
    "flag"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

type customer struct {
    CustID int
    FName  string
}

type order struct {
    OrderID int
    Amount  int
    Cust_ID int
}

func main() {

    flag.Parse()
    beam.Init()

    ctx := context.Background()

    p := beam.NewPipeline()
    s := p.Root()

    var custList = []customer{
        {1, "Bob"},
        {2, "Adam"},
        {3, "John"},
        {4, "Ben"},
        {5, "Jose"},
        {6, "Bryan"},
        {7, "Kim"},
        {8, "Tim"},
    }

    var orderList = []order{
        {123, 100, 1},
        {125, 30, 3},
        {128, 50, 7},
    }

    custPCol := beam.CreateList(s, custList)

    orderPCol := beam.CreateList(s, orderList)

    // Left Join custPcol with orderPCol
    // Expected Result
    // CustID | FName   |OrderID| Amount
    //     1  | Bob     |   123 | 100
    //     2  | Adam    |       |
    //     3  | John    |   125 | 100
    //     4  | Ben     |       |
    //     5  | Jose    |       |
    //     6  | Bryan   |       |
    //     7  | Kim     |   125 | 100
    //     8  | Tim     |       |

    if err := beamx.Run(ctx, p); err != nil {
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }

}

我想加入这2个PCollect并执行进一步的操作.我看到了关于CoGroupByKey的文档,但无法将其转换为普通SQL连接所能做的格式.

对此有何建议?

推荐答案

试着这样做

type resultType struct {
    CustID  int
    FName   string
    OrderID int
    Amount  int
}

result := beam.ParDo(s, func(c customer, iterOrder func(*order) bool) resultType {
    var o order

    for iterOrder(&o) {
        if c.CustID == o.Cust_ID {
            return resultType{
                CustID:  c.CustID,
                FName:   c.FName,
                OrderID: o.OrderID,
                Amount:  o.Amount,
            }
        }
    }

    return resultType{
        CustID: c.CustID,
        FName:  c.FName,
    }
}, custPCol, beam.SideInput{Input: orderPCol})

或者如果您想使用CoGroupByKey...

custWithKeyPCol := beam.ParDo(s, func(c customer) (int, customer) {
    return c.CustID, c
}, custPCol)

orderWithKeyPCol := beam.ParDo(s, func(o order) (int, order) {
    return o.Cust_ID, o
}, orderPCol)

resultPCol := beam.CoGroupByKey(s, custWithKeyPCol, orderWithKeyPCol)

beam.ParDo0(s, func(CustID int, custIter func(*customer) bool, orderIter func(*order) bool) {
    c, o := customer{}, order{}
    for custIter(&c) {
        if ok := orderIter(&o); ok {
            fmt.Println(CustID, c.FName, o.OrderID, o.Amount)
        }
        fmt.Println(CustID, c.FName)
    }
}, resultPCol)

Go相关问答推荐

使用ciph.AEAD.Seal()查看内存使用情况

如何将文件从AWS S3存储桶复制到Azure BLOB存储

无法在32位计算机上运行Golang应用程序

JWT 如何解析声明有效性和错误?

Redis:尽管数据存在,但 rdb.Pipelined 中出现redis:nil错误

有没有办法让sqlc生成可以使用pgxpool的代码

优化方式中所有可能组合的字符串相似度

生成一个 CSV/Excel,在 Golang 中该列的下拉选项中指定值

Go test "-run -" 标志执行测试更快

将 firestoreinteger_value转换为整数

Golang 创建一个带有处理程序的模拟数据库并使用接口调用数据库

errors.Wrap 和 errors.WithMessage 有什么区别

我在 go 中制作的递归函数有什么问题?

具有两个或多个模型的 GORM 查询

Dockerfile 问题 - 为什么找不到二进制 dlv - 没有这样的文件或目录

Golang grpc go.mod 问题

函数调用中的类型参数panic

如何从 Go 1.18 中的单个方法返回两种不同的具体类型?

实现接口的指针的泛型类型是什么?

Golang 泛型