How mergeMap
works when concurrent
is not specified
一百零二
当mergeMap
接收到新值时,它将基于project function(即,提供给mergeMap
的功能)创建新的可观测值(也称为inner可观测值),并且它将订阅它.
由于concurrent
选项的值为Infinity
,因此可以有尽可能多的传入值,mergeMap
将 for each 值创建一个inner可观察值,而不会出现任何问题.
How mergeMap
works when concurrent
is specified
当指定此选项时,mergeMap
将使用buffer
来保存超过的值:
const outerNext = (value: T) => (active < concurrent ? doInnerSub(value) : buffer.push(value));
在code above中,我们看到每当number of active个内部可观测值exceeds是concurrent allowed的数目时,缓冲器就被填充新值.当mergeMap
从Array(1000)
接收到值(如[0,999])时,调用outerNext
函数.
为什么观察者没有完成
此代码
import { from, timer, mergeMap, EMPTY } from "rxjs";
from(Array(1000).keys()).pipe(
mergeMap(n => n < 100 ? timer(10) : EMPTY, 10)
).subscribe({ complete: () => console.log('Complete') });
将不会完成,因为在doInnerSub()
函数(从outerNext()
调用的函数)中使用了implementation detail.
大意是,它是一架recursive function.这是其功能的伪代码实现(您可以找到真正的实现here):
const doInnerSub = value => {
// Increase number of active connections.
active++;
// Creating from the projection function provided to `mergeMap`.
createInnerObservable({
// During the teardown phase(also when `finalize` is called).
afterComplete: () => {
if (hasInnerObservableCompleted) {
// Making space for a new connection.
active--;
while (buffer.length && active < concurrent) {
bufferedValue = buffer.shift()!;
doInnerSub(bufferedValue);
}
}
}
});
}
在创建了timer(10)
个完整的观测值之后,将只创建EMPTY
个观测值.这样的观测值completed immediately as they're subscribed.换句话说,它们是同步完成的.这样做的结果是,在第90个可观察到的第timer(10)
条(90=timer(10)
-10)之后,这些线:
while (buffer.length && active < concurrent) {
bufferedValue = buffer.shift()!;
doInnerSub(bufferedValue);
}
将导致引发此错误:
这里还有一个带有调用堆栈的屏幕截图,它似乎证明同步(和递归)调用doInnerSub()
的次数太多了.
Why the Observable completes when concurrent
is omitted
答案是因为doInnerSub()
不再需要调用自己(因为缓冲区不再被使用).基本上,将会有doInnerSub()
0个内在可见被创造出来.
为什么在不使用条件的情况下,可观察对象完成
在这种情况下:
mergeMap(n => timer(10), 10)
调用堆栈不会继续增加,因为timer(10)
涉及asynchronous action(即setInterval()
).
在这种情况下:
mergeMap(n => EMPTY, 10)
由于EMPTY
在订阅时立即完成,因此buffer won't be populated at all.因此,callstack exceeded错误不会发生.
在上图中,该断点根本不会被击中.
为什么这些值停留在90
至于为什么它停止为90,原因是concurrent = 10
,所以在{concurrent = 10
(From MergeMap Condition)-concurentValue(10)}timer(10)
可观察到之后,调用堆栈将继续增加,因为第一个EMPTY
可观察到的是从缓冲区中取出的,然后从那里开始无限递归.
例如,如果使用11,则打印的最后一个值将是89:
mergeMap((n) => (n < 100 ? timer(10) : EMPTY), 11),