NATS中消费者的定义是

消费者是流的有状态视图.它作为接口, 客户端使用存储在流中的消息子集, 跟踪哪些邮件已发送和确认, 客户.

I'm more interested in understanding what exactly is a consumer under the hood ? Is it a process, a thread, some kind of socket i.e. a network entity etc ?

我正在设计一个使用NATS2.9的系统,所以一个消费者不能有多个主题过滤器.我想 for each 主题过滤器创建一个消费者,我担心在最坏的情况下创建大约10,000个消费者对性能资源的影响.&

Is a consumer an expensive resource to create in NATS ?

创建一个具有非常通用的主题过滤器的单一消费者是我正在考虑的一个替代方案,但它并不适合我的设计,因为几个go routine 将共享这一个全局消费者,而不是创建一个消费者每个go routine ,每个go routine 将消费和处理自己的主题过滤器的消息,从而确保了一种逻辑隔离,就我的设计而言.

推荐答案

便宜extremely.基本上,到NATS的连接是一个多路复用器:

  • 它连接到nats服务器
  • 它通过一个或多个订阅来表达其对主题的兴趣.
  • 然后,该连接接收新消息,并将它们发送给您设置的适当使用者.

我创建了一个测试设置,其中我能够在一台机器(我的笔记本电脑)上拥有1500个消费者,包括一个NATS集群和一个生产者,整个设置仍然能够发送和接收数百万个"Hello, World!".实际上,我需要大幅扩大生产商的规模,甚至设法获得任何有意义的数据.

环境

文档文件

我们从下面的二进制文件中创建一个Docker镜像. 这是相当标准的.

FROM golang:1.22-alpine3.19 as builder
ARG BINARY_NAME="producer"
WORKDIR /tmp/cmds
COPY . .
RUN go build -o ${BINARY_NAME} ./cmds/${BINARY_NAME}

FROM alpine:3.19
ARG BINARY_NAME="producer"
ENV BINARY_NAME=${BINARY_NAME}
COPY --from=builder /tmp/cmds/${BINARY_NAME} /usr/local/bin/${BINARY_NAME}

Docker-compose.yaml

version: "3.8"

services:
  # A nats cluster with 3 nodes
  nats:
    image: nats:2.1.9
    command:
      - "--debug"
      - "--cluster"
      - "nats://0.0.0.0:6222"
      - "--routes"
      # Note that this needs to be prefixed with the
      # name of the directory that the docker-compose file is in.
      # In my case it's "nats-consumers-78214263" (a mnemonic and question ID)
      - "nats://nats-consumers-78214263-nats-1:6222"
    hostname: nats
    deploy:
      replicas: 3

  # The producer
  # You can scale this up via `docker compose scale producer=n`
  # to see how the consumers handle the load
  producer:
    deploy:
      replicas: 1
    build:
      context: .
      args:
        - BINARY_NAME=producer
    command: ["/usr/local/bin/producer"]
    environment:
      - PRODUCER_NATS_URL=nats://nats:4222
      - PRODUCER_PRODUCERS=1
  
  # The consumer
  # You can scale this up via `docker compose scale consumer=n`
  # to see how the consumers handle the load
  consumer:
    deploy:
      replicas: 1
    build:
      context: .
      args:
        - BINARY_NAME=consumer
    command: ["/usr/local/bin/consumer"]
    environment:
      - CONSUMER_NATS_URL=nats://nats:4222
      - CONSUMER_TOPIC=test.>
      - CONSUMER_CONSUMERS=15000

的服务

制片人

package main

import (
    "context"
    "fmt"
    "net/url"
    "os/signal"
    "sync"
    "sync/atomic"
    "syscall"
    "time"

    "github.com/alecthomas/kong"
    "github.com/nats-io/nats.go"
)

var producercfg struct {
    NatsURL   url.URL `kong:"name='nats-url',help='NATS server URL',default='nats://nats:4222'"`
    制片人s int     `kong:"name='producers',help='Number of producers to start',default='1'"`
}

func main() {
    ctx := kong.Parse(&producercfg, kong.DefaultEnvars("PRODUCER"))

    // Run the configured number of producers in goroutines
    // Note that all producers share the same NATS connection
    // Each producer sends a messsage every 100ms

    nc, err := nats.Connect(producercfg.NatsURL.String())
    ctx.FatalIfErrorf(err, "Could not connect to NATS server: %s", producercfg.NatsURL.String())
    defer nc.Close()

    // Handle SIGINT and SIGTERM to shut down gracefully
    // We use a context here because that makes it easy for us to shut down
    // all goroutines in one fell swoop, but gracefully so.
    sigs, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer cancel()

    var wg sync.WaitGroup
    var sent atomic.Int64

    for i := 0; i < producercfg.制片人s; i++ {
        wg.Add(1)
        go func(producerContext context.Context, conn *nats.Conn, id int) {
            ctx.Printf("Starting publisher to %s", fmt.Sprintf("test.%d", id))
            defer wg.Done()

            for {
                // We have...
                select {
                // either received a signal to shut down...
                case <-producerContext.Done():
                    ctx.Printf("制片人 %d shutting down", id)
                    // ... so we return from the goroutine.
                    return
                default:
                    // or we send a message.
                    sent.Add(1)
                    err := conn.Publish(fmt.Sprintf("test.%d", id), []byte("Hello, World!"))
                    ctx.FatalIfErrorf(err, "Could not publish message: %s", err)
                }
            }
        }(sigs, nc, i)
    }

    tick := time.NewTicker(time.Second)

evt:
    for {
        // Either we receive a signal to shut down...
        select {
        case <-sigs.Done():
            cancel()
            break evt
        // ... or we print out the number of messages sent so far.
        case <-tick.C:
            ctx.Printf("Sent %d messages", sent.Load())
        }
    }
    ctx.Printf("Received signal, shutting down producers...")
    wg.Wait()
    ctx.Printf("All producers shut down. Exiting.")
}

请注意,您可能需要扩展生产者以获得有意义的数据.

消费者

而且,相当标准:

package main

import (
    "context"
    "net/url"
    "os/signal"
    "sync/atomic"
    "syscall"
    "time"

    "github.com/alecthomas/kong"
    "github.com/nats-io/nats.go"
)

var consumercfg struct {
    NatsURL   url.URL `kong:"name='nats-url',help='NATS server URL',default='nats://nats:4222'"`

    // Note that with this topic, ALL consumers we create here
    // will receive ALL messages by ALL producers. 
    Topic     string  `kong:"name='topic',help='NATS topic to subscribe to',default='test.>'"`
    消费者s int     `kong:"name='consumers',help='Number of consumers to start',default='1'"`
}

func main() {
    ctx := kong.Parse(&consumercfg, kong.DefaultEnvars("CONSUMER"))
    ctx.Printf("Starting consumer on %s, subscribing to %s", consumercfg.NatsURL.String(), consumercfg.Topic)

    nc, err := nats.Connect(consumercfg.NatsURL.String())
    ctx.FatalIfErrorf(err, "Could not connect to NATS server: %s", consumercfg.NatsURL.String())
    // Run the configured number of consumers in goroutines
    // Note that all consumers share the same NATS connection
    // Each consumer subscribes to the configured topic
    // and counts the number of messages received, printing them out every second.
    // The consumers will stop when SIGINT or SIGTERM are received.

    sigs, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer cancel()

    for i := 0; i < consumercfg.消费者s; i++ {
        go func(sigs context.Context, conn *nats.Conn, topic string, id int) {

            count := atomic.Int64{}

            // We use the same connection!
            sub, err := conn.Subscribe(topic, func(msg *nats.Msg) {
                // Callback for processing a new message.
                count.Add(1)
            })
            ctx.FatalIfErrorf(err, "Could not subscribe to topic %s: %s", topic, err)
            defer sub.Unsubscribe()

            tick := time.NewTicker(time.Second)
            for {
                select {
                case <-sigs.Done():
                    ctx.Printf("Received shutdown signal.")
                    ctx.Printf("Final result: received %d messages", count.Load())
                    return
                case <-tick.C:
                    ctx.Printf("%6d Received %d messages", id, count.Load())
                }
            }
        }(sigs, nc, consumercfg.Topic, i)
    }

    <-sigs.Done()
}

如果感兴趣,我可以在GitHub上发布代码和内容.

Go相关问答推荐

golang父进程的副本无法进行https/tls调用并获得tls:未能验证证书""

CGO Linux到Windows交叉编译中的未知类型名称

由于索引器压缩比限制,扫描包含golang包的图像时,伪影XRAY失败

ChromeDriver不存在(高朗selenium)

为什么Slices包中的函数定义Slice参数的类型参数?

在 go 中,将接收器 struct 从值更改为指针是否向后兼容?

Github Actions Go lambda 项目不同的 sha256sums

在 Go sync.Map 中,为什么这部分实现不一致或者我误解了什么?

在两个单独的速率受限端点之间同步请求

MQTT 客户端没有收到另一个客户端发送的消息

当函数返回一个函数时,为什么 Go 泛型会失败?

在 Golang 模板中计算时间/持续时间

go:embed 文件扩展名模式

Go 泛型:自引用接口约束

无法建立连接,因为目标机器主动拒绝它 Golang

Golang ACMEv2 HTTP-01 挑战不挑战服务器

如何在golang中使用ozzo验证进行时间最大验证

从 Go struct 中提取标签作为 reflect.Value

手动下载并放置一个 golang mod 文件

Go 泛型是否与 LINQ to Objects 等效?