我一直在try 在服务器流拦截器上设置元数据,以便它们可以被实际的RPC函数向下读取:

func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    ss.SendHeader(metadata.New(map[string]string{"X-User-Id": "real_user_id"}))
    return handler(srv, ss)
}

func (server *Server) GetObjects(req *iam.GetObjectsRequest, client iam.Service_GetObjectsServer) error {
    ctx := client.Context()
    userID, ok := HeaderFromMetadata(ctx, "X-User-Id")

    log.Printf("User ID: %s, Ok: %t\n", userID, ok)
    return nil
}

func HeaderFromMetadata(ctx context.Context, headers ...string) (string, bool) {
    meta, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        return "", false
    }

    for _, header := range headers {
        if value := meta.Get(header); len(value) > 0 {
            return value[0], true
        }
    }

    return "", false
}

我的服务器是这样注册的:

server := grpc.NewServer(
    grpc.StreamInterceptor(UserIDInterceptor))
RegisterIAMServer(server, NewServer())

我遇到的问题是找不到用户ID头.我可以看到,当客户端发送请求时,拦截器被调用,我可以看到元数据包含头,但实际的RPC似乎无法提取它.我到底做错了什么?

推荐答案

Update个个

更简单解决方案是只覆盖ServerStreamContext()方法

type serverStream struct {
    grpc.ServerStream
    ctx context.Context
}

func (s *serverStream) Context() context.Context {
    return s.ctx
}

func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    md, ok := metadata.FromIncomingContext(ss.Context())
    if ok {
        md.Append("X-User-Id", "real_user_id")
    }
    newCtx := metadata.NewIncomingContext(ss.Context(), md)

    return handler(srv, &serverStream{ss, newCtx})
}

Update个个

另一个简单的解决方案是将一个包装器定义为grpc.ServerStream,如下所示

type serverStreamWrapper struct {
    ss  grpc.ServerStream
    ctx context.Context
}

func (w serverStreamWrapper) Context() context.Context        { return w.ctx }
func (w serverStreamWrapper) RecvMsg(msg interface{}) error   { return w.ss.RecvMsg(msg) }
func (w serverStreamWrapper) SendMsg(msg interface{}) error   { return w.ss.SendMsg(msg) }
func (w serverStreamWrapper) SendHeader(md metadata.MD) error { return w.ss.SendHeader(md) }
func (w serverStreamWrapper) SetHeader(md metadata.MD) error  { return w.ss.SetHeader(md) }
func (w serverStreamWrapper) SetTrailer(md metadata.MD)       { w.ss.SetTrailer(md) }

func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    md, ok := metadata.FromIncomingContext(ss.Context())
    if ok {
        md.Append("X-User-Id", "real_user_id")
    }
    newCtx := metadata.NewIncomingContext(ss.Context(), md)

    return handler(srv, serverStreamWrapper{ss, newCtx})
}

您可以使用NewIncomingContext来创建流中当前上下文的副本.

由于没有方法来设置grpc.ServerStreamcontext,为了将上下文设置回ServerStream,wrappedStream被定义为context.Context,并且SetContext方法被设置为context.Context

type wrappedStream struct {
    grpc.ServerStream
    ctx context.Context
}

func (w *wrappedStream) SetContext(ctx context.Context) {
    w.ctx = ctx
}

完整的示例代码

type wrappedStream struct {
    grpc.ServerStream
    ctx context.Context
}

func (w *wrappedStream) Context() context.Context {
    return w.ctx
}

func (w *wrappedStream) SetContext(ctx context.Context) {
    w.ctx = ctx
}

func (w *wrappedStream) RecvMsg(m interface{}) error {
    return w.ServerStream.RecvMsg(m)
}

func (w *wrappedStream) SendMsg(m interface{}) error {
    return w.ServerStream.SendMsg(m)
}

type StreamContextWrapper interface {
    grpc.ServerStream
    SetContext(context.Context)
}

func newStreamContextWrapper(ss grpc.ServerStream) StreamContextWrapper {
    ctx := ss.Context()
    return &wrappedStream{
        ss,
        ctx,
    }
}

func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    md, ok := metadata.FromIncomingContext(ss.Context())
    if ok {
        md.Append("X-User-Id", "real_user_id")
    }
    newCtx := metadata.NewIncomingContext(ss.Context(), md)

    sw := newStreamContextWrapper(ss)
    sw.SetContext(newCtx)

    return handler(srv, sw)
}

Go相关问答推荐

Golang测试容器,无法使网络正常工作

使用Digitorus/pdfsign在GO(Golang)中签署pdf文件

Go Gin:验证 base64

正则表达式模式,确保至少一个字符与其他条件一起存在

Go 中的 protobuf FieldMask 解组

按位移计算结果中的差异

AWS Lambda 中的 Websocket URL 超时达到错误

如何以干净的方式在中间件中注入 repo 或服务?

缺少签名帮助文档

自定义 Fyne 自适应网格布局

Golang 中的泛型类型转换

为什么互斥量比 golang 中的通道慢?

SSH 代理,数据包长度错误

如何处理 Go 的 firebase admin sdk 错误?

在恒等函数中将类型 T 转换为类型 U

仅在工作日运行 cron

Go 泛型:自引用接口约束

如何从 Go 1.18 中的单个方法返回两种不同的具体类型?

为什么 go-cmp Equal() 说 struct 不是完全相等的,即使所有字段都非常相等?

在 Go 中,为什么 exec.Command() 失败但 os.StartProcess() 成功启动winget.exe?