便宜extremely.基本上,到NATS的连接是一个多路复用器:
- 它连接到nats服务器
- 它通过一个或多个订阅来表达其对主题的兴趣.
- 然后,该连接接收新消息,并将它们发送给您设置的适当使用者.
我创建了一个测试设置,其中我能够在一台机器(我的笔记本电脑)上拥有1500个消费者,包括一个NATS集群和一个生产者,整个设置仍然能够发送和接收数百万个"Hello, World!"
.实际上,我需要大幅扩大生产商的规模,甚至设法获得任何有意义的数据.
环境
文档文件
我们从下面的二进制文件中创建一个Docker镜像.
这是相当标准的.
FROM golang:1.22-alpine3.19 as builder
ARG BINARY_NAME="producer"
WORKDIR /tmp/cmds
COPY . .
RUN go build -o ${BINARY_NAME} ./cmds/${BINARY_NAME}
FROM alpine:3.19
ARG BINARY_NAME="producer"
ENV BINARY_NAME=${BINARY_NAME}
COPY --from=builder /tmp/cmds/${BINARY_NAME} /usr/local/bin/${BINARY_NAME}
Docker-compose.yaml
version: "3.8"
services:
# A nats cluster with 3 nodes
nats:
image: nats:2.1.9
command:
- "--debug"
- "--cluster"
- "nats://0.0.0.0:6222"
- "--routes"
# Note that this needs to be prefixed with the
# name of the directory that the docker-compose file is in.
# In my case it's "nats-consumers-78214263" (a mnemonic and question ID)
- "nats://nats-consumers-78214263-nats-1:6222"
hostname: nats
deploy:
replicas: 3
# The producer
# You can scale this up via `docker compose scale producer=n`
# to see how the consumers handle the load
producer:
deploy:
replicas: 1
build:
context: .
args:
- BINARY_NAME=producer
command: ["/usr/local/bin/producer"]
environment:
- PRODUCER_NATS_URL=nats://nats:4222
- PRODUCER_PRODUCERS=1
# The consumer
# You can scale this up via `docker compose scale consumer=n`
# to see how the consumers handle the load
consumer:
deploy:
replicas: 1
build:
context: .
args:
- BINARY_NAME=consumer
command: ["/usr/local/bin/consumer"]
environment:
- CONSUMER_NATS_URL=nats://nats:4222
- CONSUMER_TOPIC=test.>
- CONSUMER_CONSUMERS=15000
的服务
制片人
package main
import (
"context"
"fmt"
"net/url"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/alecthomas/kong"
"github.com/nats-io/nats.go"
)
var producercfg struct {
NatsURL url.URL `kong:"name='nats-url',help='NATS server URL',default='nats://nats:4222'"`
制片人s int `kong:"name='producers',help='Number of producers to start',default='1'"`
}
func main() {
ctx := kong.Parse(&producercfg, kong.DefaultEnvars("PRODUCER"))
// Run the configured number of producers in goroutines
// Note that all producers share the same NATS connection
// Each producer sends a messsage every 100ms
nc, err := nats.Connect(producercfg.NatsURL.String())
ctx.FatalIfErrorf(err, "Could not connect to NATS server: %s", producercfg.NatsURL.String())
defer nc.Close()
// Handle SIGINT and SIGTERM to shut down gracefully
// We use a context here because that makes it easy for us to shut down
// all goroutines in one fell swoop, but gracefully so.
sigs, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()
var wg sync.WaitGroup
var sent atomic.Int64
for i := 0; i < producercfg.制片人s; i++ {
wg.Add(1)
go func(producerContext context.Context, conn *nats.Conn, id int) {
ctx.Printf("Starting publisher to %s", fmt.Sprintf("test.%d", id))
defer wg.Done()
for {
// We have...
select {
// either received a signal to shut down...
case <-producerContext.Done():
ctx.Printf("制片人 %d shutting down", id)
// ... so we return from the goroutine.
return
default:
// or we send a message.
sent.Add(1)
err := conn.Publish(fmt.Sprintf("test.%d", id), []byte("Hello, World!"))
ctx.FatalIfErrorf(err, "Could not publish message: %s", err)
}
}
}(sigs, nc, i)
}
tick := time.NewTicker(time.Second)
evt:
for {
// Either we receive a signal to shut down...
select {
case <-sigs.Done():
cancel()
break evt
// ... or we print out the number of messages sent so far.
case <-tick.C:
ctx.Printf("Sent %d messages", sent.Load())
}
}
ctx.Printf("Received signal, shutting down producers...")
wg.Wait()
ctx.Printf("All producers shut down. Exiting.")
}
请注意,您可能需要扩展生产者以获得有意义的数据.
消费者
而且,相当标准:
package main
import (
"context"
"net/url"
"os/signal"
"sync/atomic"
"syscall"
"time"
"github.com/alecthomas/kong"
"github.com/nats-io/nats.go"
)
var consumercfg struct {
NatsURL url.URL `kong:"name='nats-url',help='NATS server URL',default='nats://nats:4222'"`
// Note that with this topic, ALL consumers we create here
// will receive ALL messages by ALL producers.
Topic string `kong:"name='topic',help='NATS topic to subscribe to',default='test.>'"`
消费者s int `kong:"name='consumers',help='Number of consumers to start',default='1'"`
}
func main() {
ctx := kong.Parse(&consumercfg, kong.DefaultEnvars("CONSUMER"))
ctx.Printf("Starting consumer on %s, subscribing to %s", consumercfg.NatsURL.String(), consumercfg.Topic)
nc, err := nats.Connect(consumercfg.NatsURL.String())
ctx.FatalIfErrorf(err, "Could not connect to NATS server: %s", consumercfg.NatsURL.String())
// Run the configured number of consumers in goroutines
// Note that all consumers share the same NATS connection
// Each consumer subscribes to the configured topic
// and counts the number of messages received, printing them out every second.
// The consumers will stop when SIGINT or SIGTERM are received.
sigs, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()
for i := 0; i < consumercfg.消费者s; i++ {
go func(sigs context.Context, conn *nats.Conn, topic string, id int) {
count := atomic.Int64{}
// We use the same connection!
sub, err := conn.Subscribe(topic, func(msg *nats.Msg) {
// Callback for processing a new message.
count.Add(1)
})
ctx.FatalIfErrorf(err, "Could not subscribe to topic %s: %s", topic, err)
defer sub.Unsubscribe()
tick := time.NewTicker(time.Second)
for {
select {
case <-sigs.Done():
ctx.Printf("Received shutdown signal.")
ctx.Printf("Final result: received %d messages", count.Load())
return
case <-tick.C:
ctx.Printf("%6d Received %d messages", id, count.Load())
}
}
}(sigs, nc, consumercfg.Topic, i)
}
<-sigs.Done()
}
如果感兴趣,我可以在GitHub上发布代码和内容.