使用 RxJS 进行批处理?

2023-12-23

我猜想这应该很容易实现,但我在解决它时遇到了困难(我猜是概念上)。

我拥有的是一个返回 JSON 对象数组的 API。我需要单步执行这些对象,并对每个对象进行另一个 AJAX 调用。问题是处理每个 AJAX 调用的系统一次只能处理两个活动调用(因为这是一个挂接到桌面应用程序中的 CPU 密集型任务)。

我想知道如何使用 RxJS(使用版本 5 或版本 4)来实现这一点?

编辑:此外,是否可以同时运行一系列步骤。 IE。

Downloading File: 1 Processing File: 1 Converting File: 1 Uploading File: 1 Downloading File: 2 Processing File: 2 Converting File: 2 Uploading File: 2 Downloading File: 3 Processing File: 3 Converting File: 3 Uploading File: 3

我尝试过做类似的事情:

Rx.Observable.fromPromise(start())
    .concatMap(arr => Rx.Observable.from(arr))
    .concatMap(x => downloadFile(x))
    .concatMap((entry) => processFile(entry))
    .concatMap((entry) => convertFile(entry))
    .concatMap((entry) => UploadFile(entry))
    .subscribe(
        data => console.log('data', new Date().getTime(), data),
        error => logger.warn('err', error),
        complete => logger.info('complete')
    );

但这似乎不起作用。例如,downloadFile 不会等待 processFile、convertFile 和 uploadFile 全部完成,而是在前一个完成后立即再次运行下一个。


如果您希望请求的顺序完全像这样,这里有两种方法

Downloading File: 1
Processing File: 1
Converting File: 1
Uploading File: 1
Downloading File: 2
Processing File: 2
...

您需要解决单个 concatMap 方法中的所有承诺,如下所示

Rx.Observable.fromPromise(getJSONOfAjaxRequests())
  .flatMap(function(x) { return x;})
  .concatMap(function(item) {
    return downloadFile(item)
      .then(processFile)
      .then(convertFile);
  })
  .subscribe(function(data) {
    console.log(data);
  });

请参阅此处的工作 plunkr:https://plnkr.co/edit/iugdlC2PpW3NeNF2yLzS?p=preview https://plnkr.co/edit/iugdlC2PpW3NeNF2yLzS?p=preview这样,只有前一个 ajax 调用完成后,才会发送新的 ajax 调用。

另一种方法是允许文件并行发送请求,但“下载、处理、转换、上传”操作将按顺序进行。为此,你可以通过以下方式让它工作

Rx.Observable.fromPromise(getJSONOfAjaxRequests())
  .flatMap(function(x) { return x;})
  .merge(2)  // in case maximum concurrency required is 2
  .concatMap(function(item) {
    return downloadFile(item);
  })
  .concatMap(function(item) {
    return processFile(item);
  })
  .concatMap(function(item) {
    return convertFile(item)
  })
  .subscribe(function(data) {
    //console.log(data);
  });

请参阅此处的 plunkr:https://plnkr.co/edit/mkDj6Q7lt72jZKQk8r0p?p=preview https://plnkr.co/edit/mkDj6Q7lt72jZKQk8r0p?p=preview

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 RxJS 进行批处理? 的相关文章

随机推荐