Angular NgRx - 继续轮询仅第一次调用的服务的效果

2024-02-27

我有一个应用程序,我刚刚添加了 NgRX,我希望使用效果来打开和关闭轮询。

示例大纲

我跟着这个帖子 https://bbonczek.github.io/jekyll/update/2018/03/01/polling-with-ngrx.html这似乎是一个好方法。我有一个简化的例子here https://stackblitz.com/edit/angular-ngrx-polling2,大部分代码位于app.effects.ts.

和例子类似,我的效果是这样的startPolling$, stopPolling$ and continuePolling$,除非我使用的是较新的createEffect工厂方法。

另外,我已经移动了delay(2000)上面的takeWhile(),正如我发现的,如果服务调用抛出错误,catchError(err => of(appActions.getDataFail(err)))会导致效果进入连续的非常快的循环而没有延迟。

开始和停止按钮调度轮询开始和停止...

public start() {
    console.log('dispatching start');
    this.store.dispatch(appActions.startPolling());
  }

  public stop() {
    console.log('dispatching stop');
    this.store.dispatch(appActions.stopPolling());
  }

我的问题

我有一些控制台日志,这样我们就可以看到发生了什么。

当我们点击开始按钮(只是first时间),我可以看到轮询开始,并按预期继续。例如,我可以一遍又一遍地看到以下内容......

dispatching start
app effect started polling
app.service.getData
app effect continue polling
app.service.getData
app effect continue polling
app.service.getData
app effect continue polling

Perfect.

当我停下来时我看到

dispatching stop
app effect stop polling

也正确。

现在,问题来了,当我尝试重新启动时。如果我现在再次单击开始按钮,我看到的只是最初的开始轮询效果......

dispatching start
app effect started polling
app.service.getData

和代码continuePolling$不再被称为,所以我没有民意调查。

为什么这个效果第二次没有触发?我就是不明白这是为什么。

Update 1

我想也许我的问题是一旦isPollingActive设置为 false,并且takeWhile(() => this.isPollingActive),“停止”,可观察的不再活跃,即continuePolling$完成,所以永远不会重新启动?

假设这一点,我尝试了以下方法,其中有两个不同的变量,一个用于“暂停”轮询(例如,如果我在离线模式下检测到应用程序),另一个用于取消(即当用户导航出组件时) 。

所以,我的整体效果现在变成了......

    @Injectable()
    export class AppEffects {
      private isPollingCancelled: boolean;
      private isPollingPaused: boolean;

      constructor(
        private actions$: Actions,
        private store: Store<AppState>,
        private appDataService: AppDataService
      ) { }

      public startPolling$ = createEffect(() => this.actions$.pipe(
        ofType(appActions.startPolling),
        tap(_ => console.log('app effect started polling')),
        tap(() => {
          this.isPollingCancelled = false;
          this.isPollingPaused = false;
        }),        
          mergeMap(() =>
            this.appDataService.getData()
              .pipe(                        
                switchMap(data => {              
                  return [appActions.getDataSuccess(data)
                  ];
                  }),
                catchError(err => of(appActions.getDataFail(err)))
              ))
        ));

         public pausePolling$ = createEffect(() => this.actions$.pipe(
            ofType(appActions.pausePolling),
            tap(_ => this.isPollingPaused = true),
            tap(_ => console.log('app effect pause polling')),       
         ));
      
      public cancelPolling$ = createEffect(() => this.actions$.pipe(
        ofType(appActions.cancelPolling),
        tap(_ => this.isPollingCancelled = true),
        tap(_ => console.log('app effect cancel polling')),
      ));

        public continuePolling$ = createEffect(() => this.actions$.pipe(
          ofType(appActions.getDataSuccess, appActions.getDataFail),    
          tap(data => console.log('app effect continue polling')),  
          takeWhile(() => !this.isPollingCancelled),    
          delay(3000),  
     
          mergeMap(() =>
            this.appDataService.getData()
              .pipe(   
                delay(3000),  
                tap(data => console.log('app effect continue polling - inner loop')),  
                takeWhile(() => !this.isPollingPaused), // check again incase this has been unset since delay 
                switchMap(data => {              
                  return [appActions.getDataSuccess(data)
                  ];
                  }),
                catchError(err => of(appActions.getDataFail(err)))
              ))
        ));    
    } 

我不建议运行上面的命令,因为当我发送一个pause polling action,效果似乎进入了无限循环,我必须通过任务管理器杀死浏览器。

我不知道为什么会发生这种情况,但我似乎比以前更远离解决方案。

Update 2

我注意到我没有从暂停和取消效果中返回任何操作。

所以我已经更新了它们,我们遵循......

 public pausePolling$ = createEffect(() => this.actions$.pipe(
    ofType(appActions.pausePolling),
    tap(_ => this.isPollingPaused = true),
    tap(_ => console.log('app effect pause polling')),
    map(_ => appActions.pausePollingSuccess())
  ));
  
  public cancelPolling$ = createEffect(() => this.actions$.pipe(
    ofType(appActions.cancelPolling),
    tap(_ => {
      this.isPollingCancelled = true;
      this.isPollingPaused = true;
    }),
    tap(_ => console.log('app effect cancel polling')),
    map(_ => appActions.cancelPollingSuccess())
  ));

现在暂停似乎工作正常,但是当我派遣appActions.cancelPolling,我再次看到像一个无限循环app effect cancel polling正在记录到控制台。

Update 3

我已经找到了为什么会出现无限循环以及如何停止它。根据 docohere https://ngrx.io/guide/effects/lifecycle#non-dispatching-effects,我可以添加dispatch:false...

    public cancelPolling$ = createEffect(() => this.actions$.pipe(
        ofType(appActions.cancelPolling),
        tap(_ => {
          this.isPollingCancelled = true;
          this.isPollingPaused = true;
        }),
        tap(_ => console.log('app effect cancel polling')),
      ), { dispatch: false }); // <------ add this

这似乎解决了我的无限循环。

我现在唯一的任务是能够弄清楚如何启动、停止和重新启动轮询,处理两个成功调用appDataService.getData()以及例外情况。

我可以让它为其中之一工作(取决于我将延迟和需要时间放在哪里),但不能同时为两者工作

Update 4

我有最新的代码here https://stackblitz.com/edit/angular-ngrx-polling3.

按原样运行它,我的 getData 成功了,令人惊讶的是,暂停或停止操作都会停止它并允许它重新启动。我很惊讶stop操作允许它重新启动,因为我假设takeWhile(() => !this.isPollingCancelled),会取消效果。

另外,如果true被传递给getData这将导致它出现可观察到的错误。轮询继续(如所希望的,即即使出现错误仍然重试),但是一旦我们现在分派暂停操作,它不会停止轮询,并且如果我们分派停止,它确实会停止,但随后它不会重新启动。我赢不了。

Update 5

我想也许由于连续轮询效果被取消,我可以每次重新创建它,如下所示。

    import { Injectable, OnInit, OnDestroy } from '@angular/core';
    import { createEffect, Actions, ofType } from '@ngrx/effects';
    import { select, Store } from '@ngrx/store';
    import { mergeMap, map, catchError, takeWhile, delay, tap, switchMap } from 'rxjs/operators';
    import { AppState } from './app.state';
    import { Observable, of } from 'rxjs';
    import { AppDataService } from '../app-data.service';
    import * as appActions from './app.actions';

    @Injectable()
    export class AppEffects {
      private isPollingCancelled: boolean;
      private isPollingPaused: boolean;

      constructor(
        private actions$: Actions,
        private store: Store<AppState>,
        private appDataService: AppDataService
      ) { }

      public startPolling$ = createEffect(() => this.actions$.pipe(
        ofType(appActions.startPolling),
        tap(_ => console.log('app effect started polling')),
        tap(() => {
          this.isPollingCancelled = false;
          this.isPollingPaused = false;
          this.createPollingEffect(); // <--- recreate the effect every time
        }),        
          mergeMap(() =>
            this.appDataService.getData()
              .pipe(                        
                switchMap(data => {              
                  return [appActions.getDataSuccess(data)
                  ];
                  }),
                catchError(err => of(appActions.getDataFail(err)))
              ))
        ));

      public pausePolling$ = createEffect(() => this.actions$.pipe(
        ofType(appActions.pausePolling),
        tap(_ => this.isPollingPaused = true),
        tap(_ => console.log('app effect pause polling')),
      ), { dispatch: false });
      
      public cancelPolling$ = createEffect(() => this.actions$.pipe(
        ofType(appActions.cancelPolling),
        tap(_ => {
          this.isPollingCancelled = true;
          this.isPollingPaused = true;
        }),
        tap(_ => console.log('app effect cancel polling')),
      ), { dispatch: false });

      public continuePolling$: any;

      private createPollingEffect(): void {
        console.log('creating continuePolling$');
        this.continuePolling$ = createEffect(() => this.actions$.pipe(
          ofType(appActions.getDataSuccess, appActions.getDataFail),
          tap(data => console.log('app effect continue polling')),
          delay(3000),
          takeWhile(() => !this.isPollingCancelled),
          mergeMap(() =>
            this.appDataService.getData(false)
              .pipe(
                tap(data => console.log('app effect continue polling - inner loop')),

                switchMap(data => {
                  return [appActions.getDataSuccess(data)
                  ];
                }),
                catchError(err => of(appActions.getDataFail(err)))
              ))
        ), { resubscribeOnError: true });
      } 
    }

所以,在startPolling I call this.createPollingEffect()创建持续轮询效果。

但是,当我尝试这样做时,轮询从未开始。

Update 6

我想出了一个似乎对我有用的解决方案。

我有以下内容

public startPolling$ = createEffect(() => this.actions$.pipe(
        ofType(dataActions.startPollingGetData),
        tap(_ => this.logger.info('effect start polling')),
        tap(() => this.isPollingActive = true),
        switchMap(_ => this.syncData())
      ), { dispatch: false });
      
    public continuePolling$ = createEffect(() => this.actions$.pipe(
        ofType(dataPlannerActions.DataSuccess,
          dataActions.DataFail),
        tap(_ => this.logger.debug('data effect continue polling')),
        tap(_ => this.isInDelay = true),
        delay(8000),
        tap(_ => this.isInDelay = false),
        switchMap(_ => this.syncData())
      ), { dispatch: false });


    public stopPolling$ = createEffect(() => this.actions$.pipe(
        ofType(dataActions.stopPollingData),
        tap(_ => this.isPollingActive = false),
        tap(_ => this.logger.info('data effect stop polling')),
        map(_ => dataActions.stopPollingDataSuccess())
      ), { dispatch: false });


    private syncData(): Observable<Action> {
        const result$: Observable<Action> = Observable.create(async subscriber => {
          try {
            // If polling "switched off", we just need to return anything (not actually used)
            // Id isInDelay, we may be restating while we still have a pending delay.
            // In this case we will exit, and just wait for the delay to restart
            // (otherwise we can end up with more than one call to this)
            if (this.isInDelay || !this.isPollingActive) {
              subscriber.next("");
              return;
            }

我在这里使用了几个“标志”,我相信这将是一种更“rxy”的方式。

实际上,看到这个帖子 https://stackoverflow.com/questions/60220897/rxjs-ngrx-is-there-a-way-to-cancel-a-stream-after-the-delay-operator关于如何可能摆脱isInDelay(我只需要设法将其放入上面的生产代码中)。


使用它来代替:

public startPolling$ = createEffect(() => this.actions$.pipe(
  ofType(appActions.startPolling),    
  tap(_ => console.log('app effect started polling')),  
  tap(() => this.isPollingActive = true),        
  switchMap(() =>
    this.appDataSurvice.getData()
      .pipe(                        
        exhaustMap(data => {              
          return [appActions.getDataSuccess(data)];
        }),
        catchError(err => of(appActions.getDataFail(err)))
      ))
));
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Angular NgRx - 继续轮询仅第一次调用的服务的效果 的相关文章

随机推荐