我有一个关于RxJS的问题. 我正在创建一个Web应用程序来管理协会的成员. 我想创建一个按钮来"重置"一个网站的数据库. 具体步骤如下:

  • 向所有会员发送邮箱以重新注册
  • 删除日期
  • 刷新页面

以下是我编写的代码,它可以工作,但我知道有一些地方是错误的.我是RxJS的新手,所以我不太了解所有的原则……

newYear(){
    this.amicalisteService.getAmicalistesValides("").subscribe({
      // Envoyer un mail à l'ensemble des amicalistes valides
      next: amicalistes => {
        for(const amicaliste of amicalistes) {
          const to = amicaliste.email;
          const cc = null;
          const subject = "Adhère à l'AEIR";
          const body = ""
          this.amicalisteService.sendMail(null, to, cc, subject, body).subscribe({
            next: response => {
              console.log('E-mail envoyé avec succès !', response);
            },
            error: error => {
              console.error('Erreur lors de l\'envoi de l\'e-mail :', error);
            }
          });
        }
      },
      complete: () => {
        // Supprimer amicalistes, photos et cartes amicalistes
        this.amicalisteService.getAmicalistes().subscribe(amicalistes  => {
          for(const amicaliste of amicalistes){
            this.amicalisteService.deleteAmicalisteById(amicaliste.id).subscribe();
          }
        });
        this.imageService.getImages("amicaliste").subscribe(images => {
          for(const image of images){
            this.imageService.deleteImageByName("amicaliste", this.imageService.getImageName(image.toString())).subscribe();
          }
        });
        this.imageService.getImages("pdf").subscribe(pdfs => {
          for(const pdf of pdfs){
            this.imageService.deleteImageByName("pdf", this.imageService.getImageName(pdf.toString())).subscribe();
          }
        })
      }
      //Refresh...
    })
  }

我听说在SUBSCRIBE()中使用SUBSCRIBE()不是一种好的做法,但我想不出如何以不同的方式实现它. 不过,我希望在此代码中保留几项内容.如果我没有记错的话,实际上,3个Subscribe()是并行运行的.我想要留着它. 否则,我明白使用SwitchMap可以帮助我,但我似乎无法实现它.有人能给我一些建议吗?

非常感谢你!

推荐答案

可观测者是事件流.

远程调用(例如,调用服务器以发送邮件或调用数据库以清理某些数据)被实现为事件流,该事件流仅通知一个事件(即,远程调用的响应),然后通知complete或仅通知错误错误(○)S.

使用RxJs操作符,您可以组合这样的流.例如,您可以执行以下操作:

  • 您从一个流Stream_A开始,该流发出event_A,然后结束
  • 您可以有第二个流Stream_B,它发出event_B,然后完成
  • 然后组合Stream_AStream_B以创建第三个流Stream_A_B,第一个流triggers表示Stream_A的执行并发出event_A,一旦event_A被通知Stream_B的执行并发出Stream_B通知的所有事件,则Stream_B发出所有事件,在本例中为event_B
  • 为了在RxJ中创建这个组合流,我们使用运算器concatMap(注意:人们通常使用switchMap来串联流--通常结果是相同的,但含义和潜在行为略有不同--调用远程服务的序列必须一个接一个地发生,concatMap通常是首选方法)

合并更多流以获得新流的另一个示例如下:

  • 有两个流,Stream_1Stream_2Stream_3.这些流中的每个流发出一个值,然后完成.
  • 我们可以将等待所有3个流发出和完成的这3个流组合在一起,然后只发出一个值,即流发出的所有值的数组,然后完成.
  • 对于RxJ,利用函数forkJoin获得这种新的组合流

哈文说,希望对RxJ和可观测到的情况有所澄清,以下是我在您的情况下会做的事情

newYear(){
    // assume getAmicalistesValides returns an Observable that emits the result
    // of a remote call
    this.amicalisteService.getAmicalistesValides("")
    // to combine Observables we need to "pipe" operators, i.e. to execute
    // operators one after the other
    .pipe(
      // first thing to do here seems to send an email for each amicaliste
      // assuming we want to send the emails all in parallel, we can first
      // create one Observable for each mail to be sent and then use forkJoin
      // to execute them all in parallel
      // But all this has to happen after the Observable returned by getAmicalistesValides
      // has emitted its value, hence we use concatMap
      concatMap(amicalistes => {
        for(const amicaliste of amicalistes) {
          const to = amicaliste.email;
          const cc = null;
          const subject = "Adhère à l'AEIR";
          const body = ""
          // here we create the array of Observables
          const sendMailObs = this.amicalisteService.sendMail(null, to, cc, subject, body)
          // each of these Observables can print something or react to errors
          .pipe(tap({
            next: response => {
              console.log('E-mail envoyé avec succès !', response);
            },
            error: error => {
              console.error('Erreur lors de l\'envoi de l\'e-mail :', error);
            }))
          });
          // now we trigger the concurrent execution of all sendMail observables
          return forkJoin(sendMailObs)
      }),
      // after having sent the mails you want to do more stuff: delete data, images
      // and so on - assume each of these operations is an Observable
      // you will have to use concatMap and within it create the new Observables
      // and trigger them in parallel using forkJoin, as above
      concatMap(mailSentResults => {
         const deleteDataObs = ....
         const deleteImagesObs = ...
         ...
         return forkJoin([deleteDataObs, deleteImagesObs, // maybe other Obsevables])
      })
    )
    // up to here you have created a new stream, composing various other streams
    // and now is the time to subscribe to this new stream, which is the only stream 
    // you want to explicitely subscribe
    .subscribe({
      next: res => ... // manage the value notified by upstream
      error: err => ... // manage error
      complete: () => ... // do something when all is completed, if required
    })
  }

我希望我已经理解了你的情况,这一切都有一定的道理

Angular相关问答推荐

如何在Angular应用程序部署中安全地管理环境变量

HTTP Get请求未发送到提供的URL

Angular 信号不更新afterNextender()

try 测试具有Angular material 下拉列表的组件时,在Angular 17中发现&q;错误的意外合成属性@TransitionMessages

我们应该在Angular 从14升级到15的过程中也升级茉莉花和业力相关的依赖吗?

如何使用ANGLING HTTP.POST并将类型更改为键入的回复?

独立组件;依赖项注入

Angular 17:延迟加载带参数的独立组件?

ANGLE v16独立组件不能与拦截器一起工作?

如何以Angular 解析表单控件名称的无值访问器?

一次重定向后,所有子路由都处于活动状态

合并Cypress和Karma的代码覆盖率报告JSON文件不会产生正确的结果

在Angular中扩展多个类

Cypress 12.8.1 无法使用条纹元素 iframe

Angular 2 二传手与 ngOnChanges

Angular2 ngFor跳过第一个索引

Angular 2 - 事件列表

如何处理解析器中的错误

没有 ChildrenOutletContexts 的提供者 (injectionError)

如何升级 Angular CLI 项目?