将 Observables 与反馈合并,将 Observable 与其自身合并

2024-01-07

我需要创建 Observable,它将从不同的来源(其他 Observables)收集信息,每个来源都会对事件值产生影响,但该值仍然是基于先前的值(一种状态机)构建的。

我们有带有 int 值和操作代码的消息:

class Message{
    Integer value;
    String operation;

    public Message(Integer value, String operation) {
        this.value = value;
        this.operation = operation;
    }
}

以及此类值的一些来源,具有初始值:

Observable<Message> dynamicMessage = Observable.just(new Message(1, "+"));

现在有事件源了。这些源将发出值,新的dynamicMessage 值将基于该值以及dynamicMessage 的先前值而出现。实际上它的事件类型是:

class Changer1 {
    String operation;

    public Changer1(String operation) {
        this.operation = operation;
    }
}


class Changer2 {
    Integer value;

    public Changer2(Integer value) {
        this.value = value;
    }
}

变更者1负责变更操作。变更者对变更的价值负责。

以及这些值的来源:

static Observable<Changer1> changers1 = Observable.just(new Changer1("-"))
        .delay(1, TimeUnit.SECONDS).concatWith(Observable.just(new Changer1("+"))
                .delay(2, TimeUnit.SECONDS));


static Observable<Changer2> changers2 = Observable.just(new Changer2(2))
        .delay(2, TimeUnit.SECONDS).concatWith(Observable.just(new Changer2(2))
                .delay(2, TimeUnit.SECONDS));

Now I need change dynamicMessage Observable and take in respect messages from changers. here is a diagram: state machine

我还尝试编写一个程序,我如何看到解决方案,但它因 OutOfMemoryException 挂起,就在第一个值之后:
消息{值=1,操作='+'}
清单:

import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;

public class RxDilemma {


static class Message{
    Integer value;
    String operation;

    public Message(Integer value, String operation) {
        this.value = value;
        this.operation = operation;
    }

    @Override
    public String toString() {
        return "Message{" +
                "value=" + value +
                ", operation='" + operation + '\'' +
                '}';
    }
}

static class Changer1 {
    String operation;

    public Changer1(String operation) {
        this.operation = operation;
    }

    @Override
    public String toString() {
        return "Changer1{" +
                "operation='" + operation + '\'' +
                '}';
    }
}


static class Changer2 {
    Integer value;

    public Changer2(Integer value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "Changer2{" +
                "value=" + value +
                '}';
    }
}





static Observable<Changer1> changers1 = Observable.just(new Changer1("-"))
        .delay(1, TimeUnit.SECONDS).concatWith(Observable.just(new Changer1("+"))
                .delay(2, TimeUnit.SECONDS));


static Observable<Changer2> changers2 = Observable.just(new Changer2(2))
        .delay(2, TimeUnit.SECONDS).concatWith(Observable.just(new Changer2(2))
                .delay(2, TimeUnit.SECONDS));


static Observable<Message> dynamicMessage = Observable.just(new Message(1, "+")).mergeWith(changers1.flatMap(new Func1<Changer1, Observable<Message>>() {
    @Override
    public Observable<Message> call(final Changer1 changer) {
        return dynamicMessage.last().map(new Func1<Message, Message>() {
            @Override
            public Message call(Message message) {
                message.operation = changer.operation;
                return message;
            }
        });
    }
})).mergeWith(changers2.flatMap(new Func1<Changer2, Observable<Message>>() {
    @Override
    public Observable<Message> call(final Changer2 changer2) {
        return dynamicMessage.last().map(new Func1<Message, Message>() {
            @Override
            public Message call(Message message) {
                if("+".equalsIgnoreCase(message.operation)){
                    message.value = message.value+changer2.value;
                } else if("-".equalsIgnoreCase(message.operation)){
                    message.value = message.value-changer2.value;
                }
                return message;
            }
        });
    }
}));

public static void main(String[] args) throws InterruptedException {

    dynamicMessage.subscribe(new Action1<Message>() {
        @Override
        public void call(Message message) {
            System.out.println(message);
        }
    });


    Thread.sleep(5000000);
}
}

所以我的预期输出是:

消息{值=1,操作='+'}
消息{值=1,操作='-'}
消息{值=-1,操作='-'}
消息{值=1,操作='+'}
消息{值=2,操作='+'}

我还考虑过用CombineLatest合并所有内容,但后来我发现CombineLatest没有提供更改了哪一个元素,如果没有这些信息,我将不知道需要对消息进行哪些更改。 有什么帮助吗?


In other words, you want to reduce scan your actions to reflect changes in a state. There is an operator, rightly named reduce scan, that does exactly what you are asking for. It takes every event that comes and lets you transform it with the addition of previous result of that transformation, which will be your state.

interface Changer {
    State apply(State state);
}

class ValueChanger implements Changer {
    ...
    State apply(State state){
        // implement your logic of changing state by this action and return a new one
    }
    ...
}


class OperationChanger implements Changer {
    ...
    State apply(State state){
        // implement your logic of changing state by this action and return a new one
    }
    ...
}

Observable<ValueChanger> valueChangers
Observable<OperationChanger> operationChangers

Observable.merge(valueChangers, operationChangers)
    .scan(initialState, (state, changer) -> changer.apply(state))
    .subscribe(state -> {
        // called every time a change happens
    });

请注意,我稍微更改了您的命名以使其更有意义。另外,我在这里使用 Java8 lambda 表达式。如果您无法在项目中使用它们,只需将它们替换为匿名类即可。

编辑:使用scan运算符而不是reduce因为它会传递整个过程中的每个结果,而不是仅发出最终结果

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

将 Observables 与反馈合并,将 Observable 与其自身合并 的相关文章

  • RxJs 将流拆分为多个流

    如何根据分组方法将永无止境的流拆分为多个结束的流 a a a a a b b b b c c c c d d d e gt 到这些可观察到的 a a a a a b b b b c c c c d d d e gt 如您所见 a是在开始的时
  • 反应式编程和函数式反应式编程之间的区别

    我最近一直在对反应式编程进行一些研究 我发现很难找到反应式编程和反应式编程之间差异的良好描述 功能性 反应性的 反应式编程是否只是使用函数式方法 范式而不是使用声明式或面向对象范式来实现 函数式响应式编程 FRP 是一种具有特定语义的特定编
  • 我的“zipLatest”运算符是否已经存在?

    关于我自己写的一个运算符的快速问题 请原谅我可怜的大理石图表 zip aa bb cc dd ee ff gg 11 22 33 44 55 a1 b2 c3 d4 e5 combineLatest aa bb cc dd ee ff gg
  • 可观察,出错时重试并仅在完成时缓存

    我们可以使用cache 运算符来避免多次执行长任务 http请求 并重用其结果 Observable apiCall createApiCallObservable cache notice the cache the first time
  • RX 自动完成框

    我正在尝试使用 RX 和 WPF 构建过滤器控件 所以我有一个文本框和一个列表框 启动时 列表框有 100 个联系人姓名 用户可以输入姓名来过滤列表 问题是我如何构建文本流 关键输入 然后发布 这应该是时间敏感的 所以我想只有在 750 毫
  • 创建一个包含比原始元素更多元素的 ReactiveUI 派生集合

    是否可以创建一个 ReactiveUI 派生集合 其中包含比原始集合更多的元素 我已经看到有一种方法可以过滤集合并选择单个属性 但我正在寻找相当于可枚举的 SelectMany 操作 为了说明这一点 想象一下尝试获取代表每个陷入交通拥堵的乘
  • ReactiveMongo 是如何实现的,使其被认为是非阻塞的?

    阅读有关 Play Framework 和 ReactiveMongo 的文档让我相信 ReactiveMongo 的工作方式是使用很少的线程并且从不阻塞 然而 从 Play 应用程序到 Mongo 服务器的通信似乎必须发生在某处的一些线程
  • 如何在wpf中延迟调用文本框的textchanged事件

    我有从 Textbox 继承的自定义控件 我想延迟调用 textchanged 事件 Observable FromEventPattern
  • MeteorJS - 监视服务器变量更改并更新模板值

    我有个疑问 不确定是否可能 也没有找到明确的答案 是否可以向服务器变量添加 观察者 以便当值发生变化时 我可以更新视图 客户端 假设我有一个var counter 0超时函数每分钟更新一次计数器变量 我想更新一个 span counter
  • 如何在不引入竞争条件的情况下等待 RX 主体的响应?

    我有一项服务允许调用者异步发送命令和接收响应 在真实的应用程序中 这些操作是相当断开的 某些操作将发送命令 并且响应将被独立处理 但是 在我的测试中 我需要能够发送命令 然后等待 第一个 响应 然后再继续测试 响应是使用 RX 发布的 我对
  • Angular 模板中可观察对象上的 ObjectUnsubscribedErrorImpl

    我正在使用 Angular 11 并且正在使用以下命令访问组件模板中的可观察对象async pipe 路线的第一次加载 一切都工作得很好 没有错误 当我离开该页面并返回时 出现以下错误 组件模板 成分 import Component On
  • Spring 5 的反应式 WebSockets - 如何获取初始消息

    我遵循了该教程 特别是浏览器 WebSocket 客户端的部分 http www baeldung com spring 5 reactive websockets http www baeldung com spring 5 reacti
  • 为方法创建 IObservable 的好方法是什么?

    比方说 我们有一堂课 public class Foo public string Do int param 我想创建一个可观察的值 这些值是由Do方法 一种方法是创建一个正在调用的事件Do并使用Observable FromEvent创建
  • rxjs:定期执行一些操作,中间有特定的延迟

    客户端应用程序向服务器发送请求 这可能需要很长时间才能完成 一旦请求完成或失败 客户端应该等待一段时间 即10秒 然后再次发送请求 目前的工作解决方案是这样的 appRequest new Subject ngOnInit void thi
  • Rxjava 中“背压”一词是什么意思?

    我是 RxJava 的初学者 我很好奇 背压 这是否意味着生产者在背后给消费者施压 或者这是否意味着消费者正在向生产者施加压力 反方向施压 RxJava 背压 当你有一个 observable 发射物品的速度太快 以至于消费者无法跟上流量
  • 如何创建无限间隔的 Observable 并在每个时间间隔发出新的对象?

    我正在尝试创建一个每秒都会发出新对象的 Observable 所以现在我只是压缩一个 Observable 它以 Observable 间隔从列表中发出有限数量的对象 val list1 mutableListOf
  • 如何在 RxJS 中通过 ID 去抖

    我的问题是下一个 我想取消我的应用程序的点赞功能 我使用操作在我的应用程序中进行更改 例如 dispatch likePost 1 gt dispatch type LIKE POST id 1 给出下一个例子 我在时间 0 发送一个操作
  • 为什么 Task.WhenAny 没有抛出预期的 TimeoutException?

    请注意以下简单代码 class Program static void Main var sw new Stopwatch sw Start try Task WhenAny RunAsync GetAwaiter GetResult ca
  • 如何使用 RxJava 处理分页?

    我正在考虑将我的 Android 应用程序转换为使用 Rxjava 进行网络请求 我目前访问的网络服务类似于 getUsersByKeyword String query int limit int offset 据我了解 Observab
  • Rx 中的热连接

    Observable Concat是一个连接可观察量的实现 但第二个IObservable

随机推荐