我有一个PCollection,我需要从其中 Select n个最大的行.我正在try 创建一个数据流管道使用GO和卡住这个.

package main

import (
    "context"
    "flag"
    "fmt"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

type User struct {
    Name string
    Age  int
}

func printRow(ctx context.Context, list User) {
    fmt.Println(list)
}

func main() {

    flag.Parse()
    beam.Init()

    ctx := context.Background()

    p := beam.NewPipeline()
    s := p.Root()

    var userList = []User{
        {"Bob", 5},
        {"Adam", 8},
        {"John", 3},
        {"Ben", 1},
        {"Jose", 1},
        {"Bryan", 1},
        {"Kim", 1},
        {"Tim", 1},
    }
    initial := beam.CreateList(s, userList)

    pc2 := beam.ParDo(s, func(row User, emit func(User)) {
        emit(row)
    }, initial)

    beam.ParDo0(s, printRow, pc2)

    if err := beamx.Run(ctx, p); err != nil {
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }

}

从上面的代码中,我需要根据User.Age Select 前5行 我发现有一个函数的链接top package做了同样的事情,但它说它返回单个元素PCollection.这有什么不同?

package main

import (
    "context"
    "flag"
    "fmt"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func init() {
    beam.RegisterFunction(less)
}

type User struct {
    Name string
    Age  int
}

func printRow(ctx context.Context, list User) {
    fmt.Println(list)
}

func less(a, b User) bool {
    return a.Age < b.Age
}

func main() {

    flag.Parse()
    beam.Init()

    ctx := context.Background()

    p := beam.NewPipeline()
    s := p.Root()

    var userList = []User{
        {"Bob", 5},
        {"Adam", 8},
        {"John", 3},
        {"Ben", 1},
        {"Jose", 1},
        {"Bryan", 1},
        {"Kim", 1},
        {"Tim", 1},
    }
    initial := beam.CreateList(s, userList)

    best := top.Largest(s, initial, 5, less)

    pc2 := beam.ParDo(s, func(row User, emit func(User)) {
        emit(row)
    }, best)

    beam.ParDo0(s, printRow, pc2)

    if err := beamx.Run(ctx, p); err != nil {
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }

}

我像上面一样添加了 Select 前5行的函数,但得到错误[]main.User is not assignable to main.User

我需要与以前相同格式的PCollection,因为我有进一步的处理要做.我怀疑这是因为top.Large函数返回的是一个单元素PCollection.对如何转换格式有什么建议吗?

推荐答案

最佳PCollection是[]用户

所以试着...

pc2 := beam.ParDo(s, func(rows []User, emit func(User)) {
    for _, row := range rows {
        emit(row)
    }
}, best)

Go相关问答推荐

Golang应用程序:所请求的资源上不存在HTTP-Control-Allow-Origin标头

如何在gofr发起的服务间调用请求中添加Authorization Header?

如何解析Go-Gin多部分请求中的 struct 切片

go aws-lambda 与 terraform 中的 exec 格式错误

如果第一次匹配条件,如何跳过切片中的值

如何从 nil 指针创建值

nixOS 上的 Nginx 反向代理在try 加载 css/js 时返回 404

用于提取 <*n 的正则表达式(其中 n 是一个数字)

当填充通道的函数调用未嵌入 goroutine 时,为什么我会遇到死锁?

在本地 go 应用程序上获取秘密的正确策略

MQTT 客户端没有收到另一个客户端发送的消息

Gremlin-Go:树步骤不可序列化

将值发送到 Channel 并在就绪时读取输出

函数实现接口时的模式名称是什么?

emersion/go-imap - imap.FetchRFC822:无效内存地址或零指针取消引用

致命错误:找不到由 zergon321/reisen 引起的libavcodec/avcodec.h文件

将 []float64 像素切片转换为图像

出于某种原因,Golang (Go) AES CBC 密文被填充了 16 个 0x00 字节

实现接口的指针的泛型类型是什么?

如何断言类型是指向golang中接口的指针