用这个扯掉我的头发...是否有人能够将 node 生成的Socket.IO个"工作"进程扩展到多个.js的cluster模块?

假设我有以下four个工作进程(伪):

// on the server
var express = require('express');
var server = express();
var socket = require('socket.io');
var io = socket.listen(server);

// socket.io
io.set('store', new socket.RedisStore);

// set-up connections...
io.sockets.on('connection', function(socket) {

  socket.on('join', function(rooms) {
    rooms.forEach(function(room) {
      socket.join(room);
    });
  });

  socket.on('leave', function(rooms) {
    rooms.forEach(function(room) {
      socket.leave(room);
    });
  });

});

// Emit a message every second
function send() {
  io.sockets.in('room').emit('data', 'howdy');
}

setInterval(send, 1000);

在浏览器上...

// on the client
socket = io.connect();
socket.emit('join', ['room']);

socket.on('data', function(data){
  console.log(data);
});

每秒The problem:条,我收到four条消息,因为有四个单独的工作进程发送消息.

如何确保邮件只发送一次?

推荐答案

Edit:英寸插座.IO 1.0+,而不是使用多个Redis客户端设置存储,现在可以使用更简单的Redis适配器模块.

var io = require('socket.io')(3000);
var redis = require('socket.io-redis');
io.adapter(redis({ host: 'localhost', port: 6379 }));

下面显示的示例更像这样:

var cluster = require('cluster');
var os = require('os');

if (cluster.isMaster) {
  // we create a HTTP server, but we do not use listen
  // that way, we have a socket.io server that doesn't accept connections
  var server = require('http').createServer();
  var io = require('socket.io').listen(server);
  var redis = require('socket.io-redis');

  io.adapter(redis({ host: 'localhost', port: 6379 }));

  setInterval(function() {
    // all workers will receive this in Redis, and emit
    io.emit('data', 'payload');
  }, 1000);

  for (var i = 0; i < os.cpus().length; i++) {
    cluster.fork();
  }

  cluster.on('exit', function(worker, code, signal) {
    console.log('worker ' + worker.process.pid + ' died');
  }); 
}

if (cluster.isWorker) {
  var express = require('express');
  var app = express();

  var http = require('http');
  var server = http.createServer(app);
  var io = require('socket.io').listen(server);
  var redis = require('socket.io-redis');

  io.adapter(redis({ host: 'localhost', port: 6379 }));
  io.on('connection', function(socket) {
    socket.emit('data', 'connected to worker: ' + cluster.worker.id);
  });

  app.listen(80);
}

如果主 node 需要发布到其他套接字.IO进程,但本身不接受套接字连接,使用socket.io-emitter而不是socket.io-redis.

如果在扩展方面遇到困难,请使用DEBUG=*运行 node 应用程序.插座IO现在实现了debug,它还将打印出Redis适配器调试消息.输出示例:

socket.io:server initializing namespace / +0ms
socket.io:server creating engine.io instance with opts {"path":"/socket.io"} +2ms
socket.io:server attaching client serving req handler +2ms
socket.io-parser encoding packet {"type":2,"data":["event","payload"],"nsp":"/"} +0ms
socket.io-parser encoded {"type":2,"data":["event","payload"],"nsp":"/"} as 2["event","payload"] +1ms
socket.io-redis ignore same uid +0ms

如果您的主进程和子进程都显示相同的解析器消息,那么您的应用程序正在正确地扩展.


如果你是从一个工人身上emits ,你的设置应该不会有问题.您所做的是从所有四个Worker发出消息,由于Redis publish/subscribe,消息不会重复,而是像您要求应用程序那样写入四次.以下是Redis的简单功能图:

Client  <--  Worker 1 emit -->  Redis
Client  <--  Worker 2  <----------|
Client  <--  Worker 3  <----------|
Client  <--  Worker 4  <----------|

正如您所看到的,当您从一个worker发出时,它会将emit发布到Redis,并从其他已订阅Redis数据库的worker镜像.这也意味着您可以使用连接到同一实例的多个套接字服务器,并且一个服务器上的emit将在所有连接的服务器上触发.

使用集群,当客户端连接时,它将连接到四个工作进程中的一个,而不是全部四个.这也意味着,您从该worker发出的任何内容只会向客户端显示一次.是的,应用程序正在扩展,但你这样做的方式是,你从所有四个工人身上发出,Redis数据库使它看起来就像你在一个工人身上四次调用它一样.如果一个客户端实际连接到您的所有四个套接字实例,那么它们每秒将接收十六条消息,而不是四条.

套接字处理的类型取决于将要使用的应用程序的类型.如果要单独处理客户机,那么应该没有问题,因 for each 客户机只会触发一个worker的连接事件.如果需要全局"心跳",那么可以在主进程中使用套接字处理程序.由于工作进程在主进程死亡时死亡,因此应该抵消主进程的连接负载,并让子进程处理连接.下面是一个例子:

var cluster = require('cluster');
var os = require('os');

if (cluster.isMaster) {
  // we create a HTTP server, but we do not use listen
  // that way, we have a socket.io server that doesn't accept connections
  var server = require('http').createServer();
  var io = require('socket.io').listen(server);

  var RedisStore = require('socket.io/lib/stores/redis');
  var redis = require('socket.io/node_modules/redis');

  io.set('store', new RedisStore({
    redisPub: redis.createClient(),
    redisSub: redis.createClient(),
    redisClient: redis.createClient()
  }));

  setInterval(function() {
    // all workers will receive this in Redis, and emit
    io.sockets.emit('data', 'payload');
  }, 1000);

  for (var i = 0; i < os.cpus().length; i++) {
    cluster.fork();
  }

  cluster.on('exit', function(worker, code, signal) {
    console.log('worker ' + worker.process.pid + ' died');
  }); 
}

if (cluster.isWorker) {
  var express = require('express');
  var app = express();

  var http = require('http');
  var server = http.createServer(app);
  var io = require('socket.io').listen(server);

  var RedisStore = require('socket.io/lib/stores/redis');
  var redis = require('socket.io/node_modules/redis');

  io.set('store', new RedisStore({
    redisPub: redis.createClient(),
    redisSub: redis.createClient(),
    redisClient: redis.createClient()
  }));

  io.sockets.on('connection', function(socket) {
    socket.emit('data', 'connected to worker: ' + cluster.worker.id);
  });

  app.listen(80);
}

在这个例子中,有五个插座.IO实例,一个是主,四个是子.主服务器从不调用listen(),因此该进程没有连接开销.但是,如果在主进程上调用emit,它将被发布到Redis,四个工作进程将在其客户机上执行emit.这会将连接负载补偿给工作程序,如果工作程序死亡,主应用程序中的主应用程序逻辑将不会被触动.

请注意,使用Redis时,所有emits ,即使是在命名空间或房间中,也将由其他工作进程处理,就像您从该进程触发emits 一样.换句话说,如果你有两个插座.具有一个Redis实例的IO实例,在第一个worker中的套接字上调用emit()将向其客户端发送数据,而worker 2将执行与从该worker调用emit相同的操作.

Node.js相关问答推荐

try 使用Puppeteer抓取Twitter时数组空

使用prisma迁移到SQL失败

如何在node.js中以随机顺序调用函数并按顺序操作

NX无法使用缓存运行根级脚本

如何将信号从终端窗口发送到运行在Raspberry Pi上的Puppeteer/Node.js上的webscraper

MongoDB上的updateOne和findOneAndUpdate在Node.js中无法正常工作

条件内的表达式

在对象数组中的数组中嵌套 $lookup - Mongodb

只要我在后端正确验证所有内容,就可以将这些数据存储在本地存储中吗?

firebase/messaging 不提供名为 getToken 的导出

如何从哪个应用程序用户中找到在 Firebase 身份验证中进行身份验证的用户

如何在 NestJS 中使用外部生成的 swagger.json?

nuxt:在 docker 镜像中找不到

将已保存的卡片从条带显示到前端

Ansible 将以什么用户身份运行我的命令?

如何从 Redis 保存和检索会话

如何在 Joi 字符串验证中使用枚举值

为什么 Node 控制台不显示功能代码?

遍历 NodeJS 中的一系列日期

Socket.IO 连接用户数