在了解了大约Observables之后,我发现它们与Node.js streams非常相似.两者都有一种机制,在新数据到达、发生错误或没有更多数据(EOF)时通知消费者.
我很想了解两者在概念/功能上的差异.谢谢
在了解了大约Observables之后,我发现它们与Node.js streams非常相似.两者都有一种机制,在新数据到达、发生错误或没有更多数据(EOF)时通知消费者.
我很想了解两者在概念/功能上的差异.谢谢
Observables和 node .js的Streams允许您解决相同的基本问题:异步处理一系列值.我认为,两者之间的主要区别与促使其出现的背景有关.这种情况反映在术语和API中.
在Observables端,您有一个EcmaScript的扩展,它引入了react 式编程模型.它试图用Observer
和Observable
这两个极简主义和可组合的概念来填补价值生成和异步性之间的鸿沟.
在 node 上.js和Streams端您想要创建一个接口,用于对网络流和本地文件进行异步和性能处理.术语来源于最初的上下文,你会得到pipe
、chunk
、encoding
、flush
、Duplex
、Buffer
等等.通过使用一种实用的方法,为特定用例提供明确的支持,你会失go 一些编写东西的能力,因为它没有那么统一.例如,您在Readable
流上使用push
,在Writable
流上使用write
,尽管从概念上讲,您正在做相同的事情:发布一个值.
因此,在实践中,如果您查看这些概念,并且使用选项{ objectMode: true }
,您可以将Observable
与Readable
流匹配,将Observer
与Writable
流匹配.您甚至可以在这两个模型之间创建一些简单的适配器.
var Readable = require('stream').Readable;
var Writable = require('stream').Writable;
var util = require('util');
var Observable = function(subscriber) {
this.subscribe = subscriber;
}
var Subscription = function(unsubscribe) {
this.unsubscribe = unsubscribe;
}
Observable.fromReadable = function(readable) {
return new Observable(function(observer) {
function nop() {};
var nextFn = observer.next ? observer.next.bind(observer) : nop;
var returnFn = observer.return ? observer.return.bind(observer) : nop;
var throwFn = observer.throw ? observer.throw.bind(observer) : nop;
readable.on('data', nextFn);
readable.on('end', returnFn);
readable.on('error', throwFn);
return new Subscription(function() {
readable.removeListener('data', nextFn);
readable.removeListener('end', returnFn);
readable.removeListener('error', throwFn);
});
});
}
var Observer = function(handlers) {
function nop() {};
this.next = handlers.next || nop;
this.return = handlers.return || nop;
this.throw = handlers.throw || nop;
}
Observer.fromWritable = function(writable, shouldEnd, throwFn) {
return new Observer({
next: writable.write.bind(writable),
return: shouldEnd ? writable.end.bind(writable) : function() {},
throw: throwFn
});
}
您可能已经注意到,我更改了一些名称,并使用了这里介绍的更简单的Observer
和Subscription
概念,以避免Observables在Generator
中所做的过载.基本上,Subscription
可以让你从Observable
退订.总之,有了上面的代码,你就可以有一个pipe
.
Observable.fromReadable(process.stdin).subscribe(Observer.fromWritable(process.stdout));
与process.stdin.pipe(process.stdout)
相比,您拥有的是一种组合、过滤和转换流的方法,这种方法也适用于任何其他数据序列.您可以使用Readable
、Transform
和Writable
流实现它,但API支持子类化,而不是链接Readable
和应用函数.例如,在Observable
模型上,转换值对应于对流应用transformer函数.它不需要新的Transform
亚型.
Observable.just = function(/*... arguments*/) {
var values = arguments;
return new Observable(function(observer) {
[].forEach.call(values, function(value) {
observer.next(value);
});
observer.return();
return new Subscription(function() {});
});
};
Observable.prototype.transform = function(transformer) {
var source = this;
return new Observable(function(observer) {
return source.subscribe({
next: function(v) {
observer.next(transformer(v));
},
return: observer.return.bind(observer),
throw: observer.throw.bind(observer)
});
});
};
Observable.just(1, 2, 3, 4, 5).transform(JSON.stringify)
.subscribe(Observer.fromWritable(process.stdout))
结论是什么?在任何地方都很容易引入react 模型和Observable
概念.围绕这个概念实现一个完整的库是比较困难的.所有这些小功能都需要始终如一地协同工作.毕竟,ReactiveX项目仍在进行中.但是,如果你真的需要将文件内容发送到客户端,处理编码,然后压缩它,那么它的支持就在那里,在NodeJS中,它工作得非常好.