stream是一个抽象接口,用于在Node.js中处理流数据.Node:Stream模块提供了实现流接口的API.

当我学习NodeJS streams时,我遇到了一些问题,我不确定流是否是消耗进程

例如,我有一个公共代码:

import { Readable } from 'stream'

class MyCustomReadableStream extends Readable {
  constructor() {
    super();
    this.data = ['data1', 'data2', 'data3'];
  }

  _read() {
    const chunk = this.data.shift();

    if (chunk) {
      this.push(chunk);
    } else {
      this.push(null);
    }
  }
}

不同代码处理stream有一些不同的情况


情景1:

import { createServer } from "http";

const customStream = new MyCustomReadableStream();

customStream.on('data', (chunk) => {
  console.log('Received chunk of data:', chunk.toString());
});

customStream.on('end', () => {
  console.log('Custom stream reading is complete.');
});


createServer().listen(0)

/*
// Output: 
Received chunk of data: data1
Received chunk of data: data2
Received chunk of data: data3
Custom stream reading is complete.
*/

情景2:

import { createServer } from "http";

const customStream = new MyCustomReadableStream();

setTimeout(() => {
    customStream.on('data', (chunk) => {
        console.log('1Received chunk of data:', chunk.toString());
    });
}, 1000)

customStream.on('end', () => {
  console.log('Custom stream reading is complete.');
});


createServer().listen(0)

/*
// output:
1Received chunk of data: data1
1Received chunk of data: data2
1Received chunk of data: data3
Custom stream reading is complete.
*/

情况3:

import { createServer } from "http";

const customStream = new MyCustomReadableStream();

customStream.on('data', (chunk) => {
  console.log('Received chunk of data:', chunk.toString());
});

setTimeout(() => {
    customStream.on('data', (chunk) => {
        console.log('1Received chunk of data:', chunk.toString());
    });
}, 1000)

customStream.on('end', () => {
  console.log('Custom stream reading is complete.');
});


createServer().listen(0)

/*
// output:
Received chunk of data: data1
Received chunk of data: data2
Received chunk of data: data3
Custom stream reading is complete.
*/

情景4:

import { createServer } from "http";

const customStream = new MyCustomReadableStream();

customStream.on('data', (chunk) => {
  console.log('Received chunk of data:', chunk.toString());
});

customStream.on('data', (chunk) => {
    console.log('1Received chunk of data:', chunk.toString());
});

customStream.on('end', () => {
  console.log('Custom stream reading is complete.');
});


createServer().listen(0)

/*
// output:
Received chunk of data: data1
1Received chunk of data: data1
Received chunk of data: data2
1Received chunk of data: data2
Received chunk of data: data3
1Received chunk of data: data3
Custom stream reading is complete.
Custom stream reading is complete.
*/

我不明白这四种形式的区别,流阅读是一个消费过程吗?

推荐答案

stream.Readable扩展了events.EventEmitter类.当为this.emit('data', chunk) is called时,事件emits 器将查找订阅的事件监听器(source code link).然后,它将使用给定的参数执行这些回调.查找回调和调用回调同步发生.

这意味着Readable不会等待其他data个活动订阅者.只要监听者订阅了data事件,它就会立即call its resume method(更多信息请参见:Readable streams -> Two reading modes).较晚的订阅不会从头开始触发读取;它们将收到较晚的数据块,但不会收到因订阅较晚而"错过"的数据块.

Javascript相关问答推荐

有没有方法在Angular中的bowser选项卡之间移动HTML?

设置默认值后动态更新japbox或调色板

如何使用Paged.js仅渲染特定页面

在具有焦点和上下文的d3多线图表中,如何将上下文的刷新限制在特定日期?

我的glb文件没有加载到我的three.js文件中

Redux工具包查询(RTKQ)端点无效并重新验证多次触发

过滤对象数组并动态将属性放入新数组

if/else JavaScript中的条件行为

如何在Angular中插入动态组件

Chrome是否忽略WebAuthentication userVerification设置?""

Mongoose post hook在使用await保存时不返回Postman响应

我怎么在JS里连续加2个骰子的和呢?

对网格项目进行垂直排序不起作用

面对代码中的错误作为前端与后端的集成

未捕获的运行时错误:调度程序为空

每次重新呈现时调用useState initialValue函数

使用RxJS from Event和@ViewChild vs KeyUp事件和RxJS主题更改输入字段值

P5.js中的分形树

如何更改Html元素S的 colored颜色 ,然后将其褪色为原始 colored颜色

如何通过Axios在GraphQL查询中发送数组