在了解了大约Observables之后,我发现它们与Node.js streams非常相似.两者都有一种机制,在新数据到达、发生错误或没有更多数据(EOF)时通知消费者.

我很想了解两者在概念/功能上的差异.谢谢

推荐答案

Observables和 node .js的Streams允许您解决相同的基本问题:异步处理一系列值.我认为,两者之间的主要区别与促使其出现的背景有关.这种情况反映在术语和API中.

Observables端,您有一个EcmaScript的扩展,它引入了react 式编程模型.它试图用ObserverObservable这两个极简主义和可组合的概念来填补价值生成和异步性之间的鸿沟.

在 node 上.js和Streams端您想要创建一个接口,用于对网络流和本地文件进行异步和性能处理.术语来源于最初的上下文,你会得到pipechunkencodingflushDuplexBuffer等等.通过使用一种实用的方法,为特定用例提供明确的支持,你会失go 一些编写东西的能力,因为它没有那么统一.例如,您在Readable流上使用push,在Writable流上使用write,尽管从概念上讲,您正在做相同的事情:发布一个值.

因此,在实践中,如果您查看这些概念,并且使用选项{ objectMode: true },您可以将ObservableReadable流匹配,将ObserverWritable流匹配.您甚至可以在这两个模型之间创建一些简单的适配器.

var Readable = require('stream').Readable;
var Writable = require('stream').Writable;
var util = require('util');

var Observable = function(subscriber) {
    this.subscribe = subscriber;
}

var Subscription = function(unsubscribe) {
    this.unsubscribe = unsubscribe;
}

Observable.fromReadable = function(readable) {
    return new Observable(function(observer) {
        function nop() {};

        var nextFn = observer.next ? observer.next.bind(observer) : nop;
        var returnFn = observer.return ? observer.return.bind(observer) : nop;
        var throwFn = observer.throw ? observer.throw.bind(observer) : nop;

        readable.on('data', nextFn);
        readable.on('end', returnFn);
        readable.on('error', throwFn);

        return new Subscription(function() {
            readable.removeListener('data', nextFn);
            readable.removeListener('end', returnFn);
            readable.removeListener('error', throwFn);
        });
    });
}

var Observer = function(handlers) {
    function nop() {};

    this.next = handlers.next || nop;
    this.return = handlers.return || nop;
    this.throw = handlers.throw || nop;
}

Observer.fromWritable = function(writable, shouldEnd, throwFn) {
    return new Observer({
        next: writable.write.bind(writable), 
        return: shouldEnd ? writable.end.bind(writable) : function() {}, 
        throw: throwFn
    });
}

您可能已经注意到,我更改了一些名称,并使用了这里介绍的更简单的ObserverSubscription概念,以避免ObservablesGenerator中所做的过载.基本上,Subscription可以让你从Observable退订.总之,有了上面的代码,你就可以有一个pipe.

Observable.fromReadable(process.stdin).subscribe(Observer.fromWritable(process.stdout));

process.stdin.pipe(process.stdout)相比,您拥有的是一种组合、过滤和转换流的方法,这种方法也适用于任何其他数据序列.您可以使用ReadableTransformWritable流实现它,但API支持子类化,而不是链接Readable和应用函数.例如,在Observable模型上,转换值对应于对流应用transformer函数.它不需要新的Transform亚型.

Observable.just = function(/*... arguments*/) {
    var values = arguments;
    return new Observable(function(observer) {
        [].forEach.call(values, function(value) {
            observer.next(value);
        });
        observer.return();
        return new Subscription(function() {});
    });
};

Observable.prototype.transform = function(transformer) {
    var source = this;
    return new Observable(function(observer) {
        return source.subscribe({
            next: function(v) {
                observer.next(transformer(v));
            },
            return: observer.return.bind(observer),
            throw: observer.throw.bind(observer)
        });
    });
};

Observable.just(1, 2, 3, 4, 5).transform(JSON.stringify)
  .subscribe(Observer.fromWritable(process.stdout))

结论是什么?在任何地方都很容易引入react 模型和Observable概念.围绕这个概念实现一个完整的库是比较困难的.所有这些小功能都需要始终如一地协同工作.毕竟,ReactiveX项目仍在进行中.但是,如果你真的需要将文件内容发送到客户端,处理编码,然后压缩它,那么它的支持就在那里,在NodeJS中,它工作得非常好.

Node.js相关问答推荐

如何使用jq将依赖项添加到package.json中

NodeJS缓冲区大小逻辑:为什么默认是8KB,而不仅仅是数据大小?

在Android Studio中react 本机构建失败:未正确检测到Node.js版本

Mongoose-如何从父文档填充到子文档

如何在Sequelize with Postgres中将m:n关联表ID从整数迁移到UUID?

Next.js 路由不起作用 - 页面未正确加载

在 Docker 容器内创建一个 cron 作业(job)来执行 run.js 文件中的函数

$not 的聚合版本是什么?

在 getServerSideProps 中使用 EmailProvider 获取 NextAuth 会话会抛出 fs找不到模块

我应该转译我的 TypeScript 应用程序吗?

多字段传递获取查询失败

Express.js cookie setter 的 Domain 属性:如何与 *多个 * 域共享 cookie?

Web3.js 脚本在监听 CreatedPairs 时退出

Ansible 将以什么用户身份运行我的命令?

如果我使用像 express 这样的 node 服务器,是否需要 webpack-dev-server

从 React(同构应用程序)进行 API 调用时出现Access-Control-Allow-Origin问题

Express.js中的bodyParser.urlencoded({extended: true }))和bodyParser.json()是什么意思?

要求('babel/register')不起作用

Express js 阻止 GET /favicon.ico

大型项目的 NodeJS vs Play 框架