以下代码按预期工作(请参见输出):

import { Observable, Subject } from "rxjs";
import { scan } from "rxjs/operators";
export type ColumnFilter = { field: string, value: string };

let sub$: Subject<ColumnFilter> = new Subject<ColumnFilter>();
let obs$: Observable<ColumnFilter[]> = new Observable<ColumnFilter[]>();

obs$ = sub$.pipe(
    scan((acc: ColumnFilter[], curr) => {
        const index = acc.findIndex(f => f.field === curr.field);
        if (index === -1) {
            acc.push({ field: curr.field, value: curr.value });
            return acc;
        }

        curr.value === '' ?
            acc.splice(index, 1) :
            acc[index] = { field: curr.field, value: curr.value };
        return acc;
    }, [])
);


obs$.subscribe(
    value => console.log(value)
);

sub$.next({ field: 'size', value: 'xl' })
sub$.next({ field: 'color', value: 'black' })
sub$.next({ field: 'color', value: '' })


// output:
//    [ { field: 'size', value: 'xl' } ]
//    [ { field: 'size', value: 'xl' }, { field: 'color', value: 'black' } ]
//    [ { field: 'size', value: 'xl' } ]

现在,如果我向obs$个可观察对象添加第二个订阅,输出将更改:(判断流的最后一个值.它现在包含{ field: 'color', value: '' }个)

import { Observable, Subject } from "rxjs";
import { scan } from "rxjs/operators";
export type ColumnFilter = { field: string, value: string };


let sub$: Subject<ColumnFilter> = new Subject<ColumnFilter>();
let obs$: Observable<ColumnFilter[]> = new Observable<ColumnFilter[]>();

obs$ = sub$.pipe(
    scan((acc: ColumnFilter[], curr) => {
        const index = acc.findIndex(f => f.field === curr.field);
        if (index === -1) {
            acc.push({ field: curr.field, value: curr.value });
            return acc;
        }

        curr.value === '' ?
            acc.splice(index, 1) :
            acc[index] = { field: curr.field, value: curr.value };
        return acc;
    }, [])
);

// vvv this was added
obs$.subscribe()
// ^^^ this was added

obs$.subscribe(
    value => console.log(value)
);

sub$.next({ field: 'size', value: 'xl' })
sub$.next({ field: 'color', value: 'black' })
sub$.next({ field: 'color', value: '' })

// output:
//    [ { field: 'size', value: 'xl' } ]
//    [ { field: 'size', value: 'xl' }, { field: 'color', value: 'black' } ]
//    [ { field: 'size', value: 'xl' }, { field: 'color', value: '' } ]

我不明白为什么第二次订阅会改变流的结果.我只能猜测这与变异累加器有关?因为如果我使用acc.filter而不是acc.splice,它就会像预期的那样工作.

Edit

增加了一个堆叠闪电战:https://stackblitz.com/edit/rxjs-6-opeartors-uzojgw?file=index.ts

推荐答案

下面是您的第二个示例代码中发生的事情:当一个事件由sub$.next()触发时,管道 for each 订阅的可观察对象执行一次.由于scan运算符的acc: ColumnFilter[]是在这些执行之间共享的,因此会出现上述副作用.

我的建议是使用shareshareReplay操作符,这样管道在每个事件中只执行一次,并为订阅者多播管道发出的值(=所有订阅者都获得相同的对象引用).

obs$ = sub$.pipe(
  scan((acc: ColumnFilter[], curr) => {
    const index = acc.findIndex((f) => f.field === curr.field);
    if (index === -1) {
      acc.push({ field: curr.field, value: curr.value });
      return acc;
    }

    curr.value === ''
      ? acc.splice(index, 1)
      : (acc[index] = { field: curr.field, value: curr.value });
    return acc;
  }, []),
  share() // you might use shareReplay(1) instead, e.g. if you have late subscribers
);

Typescript相关问答推荐

TypeScript Page Routing在单击时不呈现子页面(React Router v6)

如何将泛型对象作为返回类型添加到类型脚本中

从子类构造函数推断父类泛型类型

ANGLE独立组件布线错误:没有ActivatedRouting提供程序

如何将定义模型(在Vue 3.4中)用于输入以外的其他用途

垫表页脚角v15

如何在Vue中使用Enum作为传递属性的键类型?

TypeScrip:强制使用动态密钥

转换器不需要的类型交集

我可以使用typeof来强制执行某些字符串吗

是否可以通过映射类型将函数参数约束为预定义类型?

如何使用TypeScrip在EXPO路由链路组件中使用动态路由?

任何导航器都未处理有效负载为";params";:{";roomId";:";...";}}的导航操作

如何从抽象类的静态方法创建子类的实例?

我可以使用JsDocs来澄清Typescript实用程序类型吗?

为什么我会得到一个;并不是所有的代码路径都返回一个值0;switch 中的所有情况何时返回或抛出?(来自vsCode/TypeScript)

传入类型参数<;T>;变容函数

断言同级为true或抛出错误的Typescript函数

如何确定剧作家中给定定位器的值?

从平面配置推断嵌套递归类型