我正在使用confluent-kafka-go库(https://github.com/confluentinc/confluent-kafka-go)的v2版本,从多个分区消费我的主题.

当我启动消费者时,一切看起来都正常;但出于业务原因,我需要在特定时间段内暂停消费,而我的暂停方法似乎不起作用.

我怀疑这是因为分区,但不确定.

这是我的适配器:

// Shared settings used when building the Kafka consumer configuration.
const (
    // saslMechanismSha512 is the SASL mechanism name written into the
    // consumer configuration when credentials are supplied.
    saslMechanismSha512 = "SCRAM-SHA-512"

    // sessionTimeoutInMs is the value used for "session.timeout.ms".
    sessionTimeoutInMs = 180000 // => 3 minutes
)

// KafkaAdapter wraps a *kafka.Consumer and exposes consume, commit and
// unsubscribe operations on top of it.
type KafkaAdapter struct {
    // Consumer is the underlying client that every adapter method delegates to.
    Consumer *kafka.Consumer
}

// NewKafkaAdapter subscribes consumer to topic and returns a KafkaAdapter
// that wraps it.
//
// ctx is currently unused. If the subscription fails, the returned adapter is
// nil and the error wraps the underlying failure, so callers can inspect it
// with errors.Is / errors.As.
func NewKafkaAdapter(ctx context.Context, consumer *kafka.Consumer, topic string) (*KafkaAdapter, error) {
    // A nil rebalance callback is passed, as in the original call.
    if err := consumer.Subscribe(topic, nil); err != nil {
        // %w (instead of %v) keeps the original error in the chain; the
        // rendered message is unchanged.
        return nil, fmt.Errorf("error subscribing to topic %s: %w", topic, err)
    }

    return &KafkaAdapter{Consumer: consumer}, nil
}

// Consume reads the next message from the underlying consumer and converts it
// into a *port.Message.
//
// If ctx is already done when Consume is called, it returns ctx.Err() without
// touching the consumer. Otherwise it calls ReadMessage with a timeout of -1
// (no timeout), so a context that ends while the read is in progress is only
// noticed on the next call. Any error from ReadMessage is returned as is.
func (k *KafkaAdapter) Consume(ctx context.Context) (*port.Message, error) {
    // Report the context's own error so that a deadline expiry is not
    // misreported as a cancellation.
    if err := ctx.Err(); err != nil {
        return nil, err
    }

    message, err := k.Consumer.ReadMessage(-1) // -1 keeps undefined timeout while seeking for new messages
    if err != nil {
        return nil, err
    }

    headers := getMessageHeaders(message.Headers)

    return &port.Message{
        Value:   message.Value,
        Key:     message.Key,
        Headers: headers,
        // The stream name is taken from the message headers.
        Stream:    getStreamName(headers),
        Timestamp: message.Timestamp,
        Offset:    int64(message.TopicPartition.Offset),
    }, nil
}

// CommitMessage calls Commit on the underlying consumer and returns only its
// error; the partitions returned by Commit are discarded. ctx is not used.
func (k *KafkaAdapter) CommitMessage(ctx context.Context) error {
    if _, commitErr := k.Consumer.Commit(); commitErr != nil {
        return commitErr
    }

    return nil
}

// Unsubscribe removes the current subscription of the underlying consumer.
//
// The method has no return value, so a failure reported by the consumer is
// logged instead of being silently dropped. ctx is not used.
func (k *KafkaAdapter) Unsubscribe(ctx context.Context) {
    if err := k.Consumer.Unsubscribe(); err != nil {
        log.Printf("error unsubscribing Kafka consumer: %v", err)
    }
}

// SetupKafkaConsumer builds a consumer configuration from topic and creates a
// *kafka.Consumer with it.
//
// The configuration uses topic.Endpoints (comma-joined) as bootstrap servers,
// topic.Name as the group id, sessionTimeoutInMs as the session timeout, and
// disables auto commit. When both topic.User and topic.Password are non-empty,
// SASL_SSL with SCRAM-SHA-512 credentials is configured as well.
//
// Errors are returned to the caller rather than terminating the process, so
// the caller decides how to react. ctx is not used.
func SetupKafkaConsumer(ctx context.Context, topic item.Topic) (*kafka.Consumer, error) {
    consumerConfig := &kafka.ConfigMap{
        "bootstrap.servers":  strings.Join(topic.Endpoints, ","),
        "group.id":           topic.Name,
        "session.timeout.ms": sessionTimeoutInMs,
        "enable.auto.commit": false,
    }

    if topic.User != "" && topic.Password != "" {
        // A slice (not a map) keeps the keys applied in a fixed order.
        saslSettings := []struct {
            key   string
            value kafka.ConfigValue
        }{
            {"sasl.username", topic.User},
            {"sasl.password", topic.Password},
            {"security.protocol", "SASL_SSL"},
            {"sasl.mechanism", saslMechanismSha512},
            {"sasl.mechanisms", saslMechanismSha512},
        }

        for _, setting := range saslSettings {
            if err := consumerConfig.SetKey(setting.key, setting.value); err != nil {
                return nil, fmt.Errorf("error setting Kafka config key %s: %w", setting.key, err)
            }
        }
    }

    consumer, err := kafka.NewConsumer(consumerConfig)
    if err != nil {
        // Returned unwrapped: the caller already adds its own context.
        return nil, err
    }

    return consumer, nil
}

// getMessageHeaders converts Kafka headers into port.MessageHeader values,
// preserving their order. Each key is converted to a string and each value is
// copied over as is.
//
// It returns nil when messageHeaders is empty.
func getMessageHeaders(messageHeaders []kafka.Header) []port.MessageHeader {
    if len(messageHeaders) == 0 {
        return nil
    }

    // The final length is known, so allocate the backing array once.
    headers := make([]port.MessageHeader, 0, len(messageHeaders))
    for _, kafkaHeader := range messageHeaders {
        headers = append(headers, port.MessageHeader{
            Key:   string(kafkaHeader.Key),
            Value: kafkaHeader.Value,
        })
    }

    return headers
}

// getStreamName returns the value of the first header whose key is "sn",
// converted to a string. It returns the empty string when no such header
// is present.
func getStreamName(headers []port.MessageHeader) string {
    for i := range headers {
        if headers[i].Key != "sn" {
            continue
        }

        return string(headers[i].Value)
    }

    return ""
}

这是我的main.go文件:


const (
    // saslMechanismSha512 is the SASL mechanism name.
    // NOTE(review): not referenced anywhere in the main function shown below;
    // the adapter declares its own constant with the same name — confirm
    // whether this copy is needed.
    saslMechanismSha512 = "SCRAM-SHA-512"
)

var (
    // topicExample is the topic to consume. It is a variable rather than a
    // constant because main takes its address for kafka.TopicPartition.Topic.
    topicExample = "topic-example"
)

// main wires up the consumer, the adapter, the repository and the message
// service, schedules a daily pause/resume of the consumer via cron, and then
// starts consuming until the context is cancelled by SIGINT/SIGTERM.
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    appConfig, err := mdmcore.GetConfigurations("./config/config.toml")
    if err != nil {
        log.Fatalf("Error getting configurations: %v", err)
    }

    topicConfiguration := item.ReadTopic(appConfig, topicExample, false) // false = will get reader configuration

    consumer, err := adapter.SetupKafkaConsumer(ctx, topicConfiguration)
    if err != nil {
        log.Fatalf("error creating Kafka consumer: %v", err)
    }
    defer consumer.Close()

    // The same TopicPartition value is passed to Assign, Pause and Resume
    // below. It names the topic but uses kafka.PartitionAny instead of a
    // concrete partition number.
    topicPartition := kafka.TopicPartition{
        Topic:     &topicExample,
        Offset:    kafka.OffsetStored,
        Partition: kafka.PartitionAny,
    }

    // NOTE(review): Assign is called here, and adapter.NewKafkaAdapter below
    // also calls Subscribe on the same consumer. Presumably only one of the
    // two assignment styles is intended — confirm.
    err = consumer.Assign([]kafka.TopicPartition{topicPartition})
    if err != nil {
        panic(fmt.Sprintf("error assigning topic/partitions: %v", err))
    }

    kafkaReader, err := adapter.NewKafkaAdapter(ctx, consumer, topicExample)
    if err != nil {
        log.Fatalf("error creating Kafka adapter: %v", err)
    }

    repo, err := bootstrap.NewRepositories(appConfig)
    if err != nil {
        log.Fatalf("error creating a repository: %v", err)


    }

    dataManager := models.NewGormDataManager(repo.Db)

    messageService := service.NewMessageService(kafkaReader, dataManager)

    // Cancel the context on Ctrl+C or SIGTERM so the program can shut down.
    signalChannel := make(chan os.Signal, 1)
    signal.Notify(signalChannel, os.Interrupt, syscall.SIGTERM)

    go func() {
        <-signalChannel
        cancel()
    }()

    // Presumably 18:22 and 18:28 every day if the cron library uses the
    // standard five-field layout — TODO confirm against the library version.
    c := cron.New()
    c.AddFunc("22 18 * * *", func() {
        // I need to pause during a specific time
        // NOTE(review): the result of Pause is discarded, and topicPartition
        // uses kafka.PartitionAny rather than a concrete assigned partition —
        // presumably why pausing has no visible effect. Passing the partitions
        // from consumer.Assignment() may be required; TODO confirm.
        consumer.Pause([]kafka.TopicPartition{topicPartition})
    })

    c.AddFunc("28 18 * * *", func() {
        // And then, resume it when needed
        // NOTE(review): same concerns as for Pause above.
        consumer.Resume([]kafka.TopicPartition{topicPartition})
    })
    c.Start()

    // StartConsuming is defined elsewhere; whether it blocks is not visible here.
    messageService.StartConsuming(ctx, topicExample)

    // Wait for the shutdown signal before running the deferred cleanups.
    <-ctx.Done()
}

我用了consumer.Pause([]kafka.TopicPartition{topicPartition}),但没有效果.

而且我也不确定我的消费者是连接到了所有分区,还是只连接到了其中一个分区.

推荐答案

我想出了一个办法让它发挥作用.

我把Assign方法改成了Subscribe方法,这样我就不用担心Kafka分区均衡了.

而且,我没有使用.Pause和.Resume方法,而是使用了Subscribe和Unsubscribe方法.

大概是这样的:

// main wires up the consumer, the adapter, the repository and the message
// service, schedules a daily subscribe/unsubscribe via cron instead of
// pausing the consumer, and then starts consuming until the context is
// cancelled by SIGINT/SIGTERM.
//
// NOTE(review): someTopic is not declared in this snippet; presumably a
// package-level string holding the topic name — confirm.
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    appConfig, err := mdmcore.GetConfigurations("./config/config.toml")
    if err != nil {
        log.Fatalf("Error getting configurations: %v", err)
    }

    topicConfiguration := item.ReadTopic(appConfig, someTopic, false) // false = will get reader configuration

    consumer, err := adapter.SetupKafkaConsumer(ctx, topicConfiguration)
    if err != nil {
        log.Fatalf("error creating Kafka consumer: %v", err)
    }
    defer consumer.Close()

    // No explicit Assign call is made in this version; the adapter
    // constructor subscribes the consumer to the topic.
    kafkaReader, err := adapter.NewKafkaAdapter(ctx, consumer, someTopic)
    if err != nil {
        log.Fatalf("error creating Kafka adapter: %v", err)
    }

    repo, err := bootstrap.NewRepositories(appConfig)
    if err != nil {
        log.Fatalf("error creating a repository for my-topic flow: %v", err)
    }

    dataManager := models.NewGormDataManager(repo.Db)

    messageService := service.NewMessageService(kafkaReader, dataManager)

    // Cancel the context on Ctrl+C or SIGTERM so the program can shut down.
    signalChannel := make(chan os.Signal, 1)
    signal.Notify(signalChannel, os.Interrupt, syscall.SIGTERM)

    go func() {
        <-signalChannel
        cancel()
    }()

    // Presumably 23:59 (subscribe) and 02:30 (unsubscribe) every day if the
    // cron library uses the standard five-field layout — TODO confirm.
    // NOTE(review): the values returned by AddFunc and by the service calls
    // inside the callbacks are discarded.
    c := cron.New()
    c.AddFunc("59 23 * * *", func() {
        messageService.Subscribe(ctx, someTopic)
    })

    c.AddFunc("30 2 * * *", func() {
        messageService.Unsubscribe(ctx)
    })
    c.Start()

    // StartConsuming is defined elsewhere; whether it blocks is not visible here.
    messageService.StartConsuming(ctx, someTopic)

    // Wait for the shutdown signal before running the deferred cleanups.
    <-ctx.Done()
}

Go相关问答推荐

golang 的条件储存库

GetSecretValue,get identity:get credentials:无法刷新缓存的凭据

有没有办法通过Go cmdline或IDE(IntelliJ)找出我的 struct 实现了什么接口?

创建使用逗号而不是加号分隔OU的CSR

CURL和Postman HTTP POST工作,但Golang请求失败,状态为400

不接受来自 stdin 的重复输入

如何将Golang测试用例的测试覆盖率值与特定阈值进行比较

Secrets Manager Update Secret - Secret String 额外的 JSON 编码

Go Programming Language书上的例子server2错了吗?

加载 docker 镜像失败

缺少签名帮助文档

如何从 Asterisk Manager Interface Event 获取活动呼叫数

如何使用 Docker 引擎 SDK 和 Golang 运行 docker 挂载卷

如何将 npm 安装进度条通过管道传输到终端?

在 Go 中发送 ack 和 term 后消息仍在 nats 限制队列中

如何在没有内存分配的情况下压缩和发布文件

如何获取多个 url 参数值

Golang SSH客户端错误:无法验证,已尝试的方法[none publickey],没有剩余的受支持方法

如何使用golang操作很长的字符串以避免内存不足

如何在 Windows 中使用 github.com/AllenDang/giu 和 github.com/gordonklaus/portaudio 构建 GO 程序