RxJava。顺序执行

2024-01-03

在我的 Android 应用程序中,我有一个演示者,它处理用户交互,包含某种请求管理器,如果需要,可以通过请求管理器将用户输入发送到请求管理器。

请求管理器本身包含服务器 API 并使用此 RxJava 处理服务器请求。

我有一个代码,每次用户输入消息时都会向服务器发送请求并显示服务器的响应:

private Observable<List<Answer>> sendRequest(String request) {
    MyRequest request = new MyRequest();
    request.setInput(request);
    return Observable.fromCallable(() -> serverApi.process(request))
            .doOnNext(myResponse -> {
                // store some data
            })
            .map(MyResponse::getAnswers)
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread());
}

但现在我需要排队。用户可以在服务器响应之前发送新消息。队列中的每条消息都应按顺序处理。 IE。第二条消息将在我们收到第一条消息的回复后发送,依此类推。

如果发生错误,则不应处理进一步的请求。

我还需要在 RecyclerView 中显示答案。

我不知道如何更改上面的代码来实现上述处理

我看到了一些问题。一方面,该队列可以由用户随时更新,另一方面,只要服务器发送响应,消息就应该从队列中删除。

也许有一个 rxjava 运算符或我刚刚错过的特殊方法。

我在这里看到了类似的答案,但是,那里的“队列”是恒定的。使用 RxJava 和 Retrofit 进行 N 个顺序 api 调用 https://stackoverflow.com/questions/32317052/making-n-sequential-api-calls-using-rxjava-and-retrofit

我将非常感谢任何解决方案或链接


我没有找到任何优雅的本机 RxJava 解决方案。所以我会定制一个Subscriber做你的工作。

对于你的3点:

  1. 对于顺序执行,我们创建一个单线程调度程序

    Scheduler sequential = Schedulers.from(Executors.newFixedThreadPool(1));

  2. 为了在发生错误时停止所有请求,我们应该一起订阅所有请求,而不是创建一个Flowable每次。所以我们定义以下函数(这里我要求的是Integer和回应String):

    void sendRequest(Integer request)

    Flowable<String> reciveResponse()

    并定义一个字段来关联请求和响应流:

    FlowableProcessor<Integer> requestQueue = UnicastProcessor.create();

  3. 为了重新运行未发送的请求,我们定义了重新运行函数:

    void rerun()

然后我们就可以使用它:

reciveResponse().subscribe(/**your subscriber**/)

现在让我们来实现它们。

当发送请求时,我们只需将其推送到requestQueue

public void sendRequest(Integer request) {
  requestQueue.onNext(request);
}

首先,要按顺序执行请求,我们应该安排工作sequential:

requestQueue
  .observeOn(sequential)
  .map(i -> mockLongTimeRequest(i)) // mock for your serverApi.process
  .observeOn(AndroidSchedulers.mainThread());

其次,发生错误时停止请求。这是默认行为。如果我们什么都不做,错误就会破坏订阅,并且不会发出任何其他项目。

第三,重新运行未发送的请求。首先是因为本机运算符将取消流,例如MapSubscriber做(RxJava-2.1.0-FlowableMap#63):

try {
    v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
    fail(ex);// fail will call cancel
    return;
}

我们应该包装错误。这里我用我的Try https://github.com/XDean/Java-EX/blob/master/src/main/java/xdean/jex/util/task/tryto/Try.java类来包装可能的异常,您可以使用任何其他可以包装异常而不是抛出异常的实现:

    .map(i -> Try.to(() -> mockLongTimeRequest(i)))

然后就是习俗OnErrorStopSubscriber implements Subscriber<Try<T>>, Subscription.

它正常请求和发出项目。当错误发生时(实际上是失败的Try发出)它停在那里,不会请求或发出甚至下游请求它。通话后rerun方法后,会回到运行状态并正常发射。课程大约有80行。你可以看到代码我的github https://github.com/XDean/StackOverflow/blob/master/src/main/java/xdean/stackoverflow/rx/Q47264933.java.

现在我们可以测试我们的代码:

public static void main(String[] args) throws InterruptedException {
  Q47264933 q = new Q47264933();
  IntStream.range(1, 10).forEach(i -> q.sendRequest(i));// emit 1 to 10
  q.reciveResponse().subscribe(e -> System.out.println("\tdo for: " + e));
  Thread.sleep(10000);
  q.rerun(); // re-run after 10s
  Thread.sleep(10000);// wait for it complete because the worker thread is deamon
}

private String mockLongTimeRequest(int i) {
  Thread.sleep((long) (1000 * Math.random()));
  if (i == 5) {
    throw new RuntimeException(); // error occur when request 5
  }
  return Integer.toString(i);
}

和输出:

1 start at:129
1 done  at:948
2 start at:950
    do for: 1
2 done  at:1383
3 start at:1383
    do for: 2
3 done  at:1778
4 start at:1778
    do for: 3
4 done  at:2397
5 start at:2397
    do for: 4
error happen: java.lang.RuntimeException
6 start at:10129
6 done  at:10253
7 start at:10253
    do for: 6
7 done  at:10415
8 start at:10415
    do for: 7
8 done  at:10874
9 start at:10874
    do for: 8
9 done  at:11544
    do for: 9

可以看到它是按顺序运行的。并在发生错误时停止。通话后rerun方法,它继续处理剩下的未发送的请求。

完整代码请参见我的github。 https://github.com/XDean/StackOverflow/blob/master/src/main/java/xdean/stackoverflow/rx/Q47264933.java

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RxJava。顺序执行 的相关文章

随机推荐