我使用RxJS为数组的每个元素调用一个异步函数. 我只想在元素满足特定条件时调用异步函数,并且我还想限制并发执行的数量.

所以我使用了带有concurrent参数的mergeMap运算符,但我遇到了奇怪的行为.

可重现的最小示例为:

import { from, timer, mergeMap, EMPTY } from "rxjs";

from(Array(1000).keys()).pipe(
  mergeMap(n => n < 100 ? timer(10) : EMPTY, 10)
).subscribe({ complete: () => console.log('Complete') });

假设Array(1000).keys()是数组,n < 100是条件,timer(10)是异步函数.

我希望观察者能够完成,但它没有完成.

奇怪的是,如果没有指定concurrent参数,或者在没有条件的情况下为所有元素调用了异步函数,则可观测对象就会完成.

// without concurrent parameter
mergeMap(n => n < 100 ? timer(10) : EMPTY)
// or without condition
mergeMap(n => timer(10), 10)
// or
mergeMap(n => EMPTY, 10)

我还通过下面的代码观察了这next个值.(请注意,我使用timer(10).pipe(map(() => n))来获取输入值)

import { from, timer, mergeMap, EMPTY, map } from "rxjs";

from(Array(1000).keys()).pipe(
  mergeMap(n => n < 100 ? timer(10).pipe(map(() => n)) : EMPTY, 10),
).subscribe({ next: console.log, complete: () => console.log('Complete') });

输出值在90停止.

...
86
87
88
89
90

我想知道为什么会发生这种行为.

推荐答案

How mergeMap works when concurrent is not specified

一百零二

mergeMap接收到新值时,它将基于project function(即,提供给mergeMap的功能)创建新的可观测值(也称为inner可观测值),并且它将订阅它.

由于concurrent选项的值为Infinity,因此可以有尽可能多的传入值,mergeMap将 for each 值创建一个inner可观察值,而不会出现任何问题.

How mergeMap works when concurrent is specified

当指定此选项时,mergeMap将使用buffer来保存超过的值:

const outerNext = (value: T) => (active < concurrent ? doInnerSub(value) : buffer.push(value));

code above中,我们看到每当number of active个内部可观测值exceedsconcurrent allowed的数目时,缓冲器就被填充新值.当mergeMapArray(1000)接收到值(如[0,999])时,调用outerNext函数.

为什么观察者没有完成

此代码

import { from, timer, mergeMap, EMPTY } from "rxjs";

from(Array(1000).keys()).pipe(
  mergeMap(n => n < 100 ? timer(10) : EMPTY, 10)
).subscribe({ complete: () => console.log('Complete') });

将不会完成,因为在doInnerSub()函数(从outerNext()调用的函数)中使用了implementation detail.

大意是,它是一架recursive function.这是其功能的伪代码实现(您可以找到真正的实现here):

const doInnerSub = value => {
  // Increase number of active connections.
  active++;

  // Creating from the projection function provided to `mergeMap`.
  createInnerObservable({
    // During the teardown phase(also when `finalize` is called).
    afterComplete: () => {
      if (hasInnerObservableCompleted) {
        // Making space for a new connection.
        active--;

        while (buffer.length && active < concurrent) {
          bufferedValue = buffer.shift()!;
          doInnerSub(bufferedValue);
        }
      }
    }
  });
}

在创建了timer(10)个完整的观测值之后,将只创建EMPTY个观测值.这样的观测值completed immediately as they're subscribed.换句话说,它们是同步完成的.这样做的结果是,在第90个可观察到的第timer(10)条(90=timer(10)-10)之后,这些线:

while (buffer.length && active < concurrent) {
  bufferedValue = buffer.shift()!;
  doInnerSub(bufferedValue);
}

将导致引发此错误:

enter image description here

这里还有一个带有调用堆栈的屏幕截图,它似乎证明同步(和递归)调用doInnerSub()的次数太多了.

enter image description here

Why the Observable completes when concurrent is omitted

答案是因为doInnerSub()不再需要调用自己(因为缓冲区不再被使用).基本上,将会有doInnerSub()0个内在可见被创造出来.

为什么在不使用条件的情况下,可观察对象完成

在这种情况下:

mergeMap(n => timer(10), 10)

调用堆栈不会继续增加,因为timer(10)涉及asynchronous action(即setInterval()).

在这种情况下:

mergeMap(n => EMPTY, 10)

由于EMPTY在订阅时立即完成,因此buffer won't be populated at all.因此,callstack exceeded错误不会发生.

enter image description here

在上图中,该断点根本不会被击中.

为什么这些值停留在90

至于为什么它停止为90,原因是concurrent = 10,所以在{concurrent = 10(From MergeMap Condition)-concurentValue(10)}timer(10)可观察到之后,调用堆栈将继续增加,因为第一个EMPTY可观察到的是从缓冲区中取出的,然后从那里开始无限递归.

例如,如果使用11,则打印的最后一个值将是89:

mergeMap((n) => (n < 100 ? timer(10) : EMPTY), 11),

enter image description here

Javascript相关问答推荐

如何在react + react路由域名中使用DeliverBrowserRouter?

添加/删除时React图像上传重新渲染上传的图像

如何使用侧边滚动按钮具体滚动每4个格?

zoom svg以适应圆

如何用拉威尔惯性Vue依赖下拉?

阿波罗返回的数据错误,但在网络判断器中是正确的

在286之后恢复轮询

如何在 cypress 中使用静态嵌套循环

如何在JavaScript文件中使用Json文件

在数组中查找重叠对象并仅返回那些重叠对象

创建以键值对为有效负载的Redux Reducer时,基于键的类型检测

Vaadin定制组件-保持对javascrip变量的访问

无法重定向到Next.js中的动态URL

为列表中的项目设置动画

P5play SecurityError:无法从';窗口';读取命名属性';Add';:阻止具有源的帧访问跨源帧

有没有办法在R中创建一张具有多个色标的热图?

如何将对象推送到firestore数组?

扩散运算符未按预期工作,引发语法错误

如何处理不带参数的redux thunk payloadCreator回调函数?

在点击链接后重定向至url之前暂停