我还是个初学者,正在构建我第一个用来查询Azure DevOps API以获取测试结果的Angular 仪表板应用程序.

  1. 查询接口,得到版本定义的过滤列表(大约100个)
  2. 对于每个版本定义,获取最新版本
  3. 对于每个版本,获取来自该版本的所有测试运行的集合.
  4. 收到第一组测试结果后,立即显示所有结果的表(每个发布定义都是一行,测试结果为可展开表).

我设法在可观察对象的嵌套订阅中实现了这一点(见下文),但我知道应该避免这种情况,最好是使用类似mergeMap/switchMap和/或forkJoin这样的内容.几天来一直在和它作斗争,但还没有运气.

然后是挑战#2:第二个数据流应该被添加到其中:管道.遵循相同的方法: for each 最新的管道运行获取一个管道列表,为所有测试运行的每个管道获取一个管道列表.这两个数据流可以/应该分别和异步地获取,一旦其中一个获取了第一组测试运行,它就可以显示在仪表板上.

如何做到这一点??

我的仅使用嵌套订阅的发布定义的有效解决方案:

ngOnInit(): void {   
    this.router.paramMap.pipe(takeUntil(this.ngUnsubscribe))
    .subscribe(params => {
      this.teamToFilterOn = params.get('team');
      this.apiService.getReleaseDefinitions(this.teamToFilterOn as string)
      .pipe(takeUntil(this.ngUnsubscribe))
      .subscribe((releaseDefinitions: any)=> {
        if (releaseDefinitions.length === 0) {
          this.isLoading = false
        }
        releaseDefinitions.forEach((releaseDefinition: any) => {
          if (releaseDefinition.lastRelease) {
            this.apiService.getRelease(releaseDefinition.lastRelease.id)
            .pipe(takeUntil(this.ngUnsubscribe))
            .subscribe((info: PipelineOrReleaseInfo) => {
              if (info) {
                this.apiService.getTestRunsByRelease(info.releaseId)
                .pipe(takeUntil(this.ngUnsubscribe))
                .subscribe((testruns: any) => {    
                  this.isLoading = false;              
                  this.results = [...this.results, { info: info, testruns: testruns, totals: this.calculateEnvironmentTotals(testruns.testRunResults)}];             
                  this.dataSource.data = this.results;
                });
              }              
            });
          }
        });            
      });
    });
  }   

首先try 使用forkJoin,但不知道如何继续.我也不确定这是否正确,因为forkJoin似乎要等到两个可观测对象都完成了,但只要其中一个有结果,它就应该继续循环结果并进行剩余的调用.

ngOnInit(): void {   
    this.router.paramMap.pipe(takeUntil(this.ngUnsubscribe))
    .subscribe(params => {
      this.teamToFilterOn = params.get('team');      
      
      let releaseDefQuery =  this.apiService.getReleaseDefinitions(this.teamToFilterOn as string)
      let pipelineDefQuery = this.apiService.getPipelineDefinitions(this.teamToFilterOn as string)

      forkJoin([releaseDefQuery, pipelineDefQuery]).subscribe(definitions => {
        let releaseDefinitions = definitions[0];
        let pipelineDefinitions = definitions[1];

        releaseDefinitions.forEach((releaseDefinition: any) => {
          if (releaseDefinition.lastRelease) {
            this.apiService.getRelease(releaseDefinition.lastRelease.id)
            .pipe(takeUntil(this.ngUnsubscribe))
            .subscribe((info: PipelineOrReleaseInfo) => {
...

EDIT:为清楚起见,还添加了流水线,产生了相同类型PipelineOrReleaseInfotestruns的对象info.一旦其中一个流(发布或管道)完成了这两个对象,就可以显示它了.那么,这两个流可以/应该在某个时候合并吗?

pipelineDefinitions.forEach((pipelineDefinition: any) => {
    this.apiService.getLatestPipelineRun(pipelineDefinition.id)
    .pipe(takeUntil(this.ngUnsubscribe))
    .subscribe((info: PipelineOrReleaseInfo) => {
      if (info) {
        this.apiService.getTestRunsByPipeline(info.pipelineRunId)
        .pipe(takeUntil(this.ngUnsubscribe))
        .subscribe((testruns: any) => {    
          this.isLoading = false;              
          this.results = [...this.results, { info: info, testruns: testruns, totals: this.calculateEnvironmentTotals(testruns.testRunResults)}];             
          this.dataSource.data = this.results;
        });
      }              
    });
}

EDIT:完全正常工作的代码来自接受的答案:

ngOnInit(): void {
    this.teamToFilterOn = this.router.snapshot.paramMap.get('team');    

    const releaseResults$: Observable<any> = this.apiService.getReleaseDefinitions(this.teamToFilterOn as string).pipe(
      mergeMap(releaseDefs => releaseDefs), // Turns Observable<[]> into Observable<>
      filter((releaseDef: any) => releaseDef.lastRelease), // Take only releaseDefs with a lastRelease
      mergeMap((releaseDef: any) => this.apiService.getRelease(releaseDef.lastRelease.id)),
      filter(releaseInfo => !!releaseInfo), // Continue only when release info is returned
      mergeMap((releaseInfo: PipelineOrReleaseInfo) => this.apiService.getTestRunsByRelease(releaseInfo.releaseId)
      .pipe(map(testruns => ({ testruns, info: releaseInfo }))))
    );

    const pipelineResults$: Observable<any> = this.apiService.getPipelineDefinitions(this.teamToFilterOn as string).pipe(
      mergeMap(pipelineDefs => pipelineDefs), // Turns Observable<[]> into Observable<def>
      mergeMap((pipelineDef: any) => this.apiService.getLastPipelineRun(pipelineDef.id)),
      filter(pipelineInfo => !!pipelineInfo), // Continue only when pipeline info is returned
      mergeMap((pipelineInfo: PipelineOrReleaseInfo) => this.apiService.getTestRunsByPipeline(pipelineInfo.pipelineRunId)
      .pipe(map(testruns => ({ testruns, info: pipelineInfo }))))
    );
   
    merge(releaseResults$, pipelineResults$)
    .pipe(takeUntil(this.ngUnsubscribe))
    .subscribe(({ testruns, info }) => {
      this.isLoading = false;              
      this.results = [...this.results, { info: info, testruns: testruns, totals: this.calculateEnvironmentTotals(testruns.testRunResults)}];             
      this.dataSource.data = this.results;
    });
  }

推荐答案

像任何类型的Stream(例如Promise)一样,当你看到Observable的嵌套时,你可能想后退一步,看看它是否真的有根据.

让我们一点一点地研究一下您的解决方案.

我们的出发点是:

this.router.paramMap.pipe(takeUntil(this.ngUnsubscribe))

然后订阅,但在订阅中,您对给定数据执行可观察的操作,这强烈建议您应该改为pipe操作,然后订阅最终结果.

在这种情况下,您想要map您的params到大约Observable.你还可以从switchMap提供的"提前中断"行为中受益.否则,如果您不想"提前中断",也可以 Select mergeMap(以前更恰当的名称是flatMap).

我们将添加filtermap,以确保我们有team参数,并将其拔出(因为我们不需要其余的参数).

this.router.paramMap.pipe(
    takeUntil(this.ngUnsubscribe),
    filter(params => params.has("team"))
    map(params => params.get("team"))
    switchMap(team => {
      this.teamToFilterOn = team as string;
      // We'll dissect the rest
    })
) // [...]

然后就是你想要和那支球队做什么的部分.

你有多个"任务"依赖于相同的输入,而且你想同时完成这两个任务,所以达到forkJoin是个不错的 Select .但也有combineLatest个人做了类似的事情,但将结果"一步一步"地结合起来.

你用"最新"这个词来形容你的两项任务,所以我们确实会用combineLatest来代替:

const releaseDef$ = // [...]
const pipelineDef$ = // [...]
return combineLatest([releaseDef$, pipelineDef$]);

现在让我们来剖析这两个操作.

据我所知,你只对lastRelease分的版本感兴趣.你也不想在新加入的时候"切换",你想要所有的,让我们来编码一下:

const releaseDef$ = this.apiService.getReleaseDefinitions(this.teamToFilterOn as string).pipe(
    mergeMap(releaseDefs => releaseDefs), // Turns Observable<def[]> into Observable<def>
    filter(releaseDef => releaseDef.lastRelease),
    mergeMap(lastReleaseDef => this.apiService.getRelease(releaseDefinition.lastRelease.id)),
    filter(info => !!info)
    mergeMap(info => this.apiService.getTestRunsByRelease(info.releaseId).pipe(map(testruns => ({ testruns, info }))),
)

你会注意到我也输入了getTestRunsByRelease的结果.这是因为与Promise不同,我们没有像async/await这样的替代语法来帮助以一种简单的方式保持以前的状态.取而代之的是,我们必须依赖来self 们的单子操作flatMap内的单子操作map,并且拖拽先前的结果.对于Promise,mapflatMap都是.then.而Observable则分别为mapmergeMap.

我们对您的管道应用了非常类似的转换:

const pipelineDef$ = this.apiService.getPipelineDefinitions(this.teamToFilterOn as string)
  .pipe(
    mergeMap(pipelineDefs => pipelineDefs), // Turns Observable<def[]> into Observable<def>
    mergeMap(pipelineDef => this.apiService.getLatestPipelineRun(pipelineDef.id)),
    filter(info => !!info),
    mergeMap(info => this.apiService.getTestRunsByPipeline(info.pipelineRunId).pipe(map(testruns => ({ testruns, info }))),
  );

在这里,如果您需要对releaseDef$pipelineDef$的结果进行独立运算,则可以使用tap.

请注意,可以很容易地将它们提取到两种方法中.

由于两者的结束操作相同,并且它们的结果具有相同的形状,因此可以使用merge而不是combineLatest将两个可观测对象合并为一个,以便在传入时发出两者的所有值(而不是组合和发出数组中每个的最新值):

return merge(releaseDef$, pipelineDef$);

为了总结这一点,让我们把它们放在一起:

ngOnInit(): void {
  this.router.paramMap.pipe(
    takeUntil(this.ngUnsubscribe),
    filter(params => params.has("team"))
    map(params => params.get("team"))
    switchMap(team => {
      this.teamToFilterOn = team as string;
      
      const releaseDef$ = this.apiService.getReleaseDefinitions(this.teamToFilterOn as string).pipe(
        mergeMap(releaseDefs => releaseDefs), // Turns Observable<def[]> into Observable<def>
        filter(releaseDef => releaseDef.lastRelease),
        mergeMap(lastReleaseDef => this.apiService.getRelease(releaseDefinition.lastRelease.id)),
        filter(info => !!info)
        mergeMap(info => this.apiService.getTestRunsByRelease(info.releaseId).pipe(map(testruns => ({ testruns, infos })))),
      );

      const pipelineDef$ = this.apiService.getPipelineDefinitions(this.teamToFilterOn as string)
        .pipe(
          mergeMap(pipelineDefs => pipelineDefs), // Turns Observable<def[]> into Observable<def>
          mergeMap(pipelineDef => this.apiService.getLatestPipelineRun(pipelineDef.id)),
          filter(info => !!info),
          mergeMap(info => this.apiService.getTestRunsByPipeline(info.pipelineRunId).pipe(map(testruns => ({ testruns, info }))),
        )

      return merge(releaseDef$, pipelineDef$);
    })
  ).subscribe(({ testruns, infos }) => {
    this.isLoading = false;              
    this.results = [...this.results, { info: info, testruns: testruns, totals: this.calculateEnvironmentTotals(testruns.testRunResults)}];             
    this.dataSource.data = this.results;
  });
}

您会注意到,我只在takeUntil(this.ngUnsubscribe)上使用,因为"主"可观测链将随之停止,这意味着操作也将停止.

如果您不确定或遇到问题,您仍然可以将它们作为每.pipe个参数的第一个参数.

Javascript相关问答推荐

禁用从vue.js 2中的循环创建的表的最后td的按钮

是否有方法在OpenWeatherMap API中获取过go 的降水数据?

Vue:ref不会创建react 性属性

如何解决chrome—extension代码中的错误,它会实时覆盖google—meet的面部图像?'

判断表格单元格中是否存在文本框

在网页上添加谷歌亵渎词

为什么当我解析一个promise时,输出处于挂起状态?

Google maps API通过API返回ZERO_RESULTS,以获得方向请求,但适用于Google maps

数字时钟在JavaScript中不动态更新

为什么我的导航条打开状态没有在窗口addeventlistener(Reaction Js)中更新?

将Node.js包发布到GitHub包-错误ENEEDAUTH

让chart.js饼图中的一个切片变厚?

如何找到带有特定文本和测试ID的div?

输入的值的类型脚本array.排序()

Socket.IO在刷新页面时执行函数两次

不协调嵌入图片

REACT-本机错误:错误类型错误:无法读取未定义的容器的属性

使用python,我如何判断一个html复选框是否被隐藏,以及它是否被S选中?

如何从Reaction-Redux中来自API调用的数据中筛选值

在ReactJS上挂载组件时获得多个身份验证请求