我需要创建 Observable,它将从不同的来源(其他 Observables)收集信息,每个来源都会对事件值产生影响,但该值仍然是基于先前的值(一种状态机)构建的。
我们有带有 int 值和操作代码的消息:
class Message{
Integer value;
String operation;
public Message(Integer value, String operation) {
this.value = value;
this.operation = operation;
}
}
以及此类值的一些来源,具有初始值:
Observable<Message> dynamicMessage = Observable.just(new Message(1, "+"));
现在有事件源了。这些源将发出值,新的dynamicMessage 值将基于该值以及dynamicMessage 的先前值而出现。实际上它的事件类型是:
class Changer1 {
String operation;
public Changer1(String operation) {
this.operation = operation;
}
}
class Changer2 {
Integer value;
public Changer2(Integer value) {
this.value = value;
}
}
变更者1负责变更操作。变更者对变更的价值负责。
以及这些值的来源:
static Observable<Changer1> changers1 = Observable.just(new Changer1("-"))
.delay(1, TimeUnit.SECONDS).concatWith(Observable.just(new Changer1("+"))
.delay(2, TimeUnit.SECONDS));
static Observable<Changer2> changers2 = Observable.just(new Changer2(2))
.delay(2, TimeUnit.SECONDS).concatWith(Observable.just(new Changer2(2))
.delay(2, TimeUnit.SECONDS));
Now I need change dynamicMessage Observable and take in respect messages from changers. here is a diagram:
我还尝试编写一个程序,我如何看到解决方案,但它因 OutOfMemoryException 挂起,就在第一个值之后:
消息{值=1,操作='+'}
清单:
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
public class RxDilemma {
static class Message{
Integer value;
String operation;
public Message(Integer value, String operation) {
this.value = value;
this.operation = operation;
}
@Override
public String toString() {
return "Message{" +
"value=" + value +
", operation='" + operation + '\'' +
'}';
}
}
static class Changer1 {
String operation;
public Changer1(String operation) {
this.operation = operation;
}
@Override
public String toString() {
return "Changer1{" +
"operation='" + operation + '\'' +
'}';
}
}
static class Changer2 {
Integer value;
public Changer2(Integer value) {
this.value = value;
}
@Override
public String toString() {
return "Changer2{" +
"value=" + value +
'}';
}
}
static Observable<Changer1> changers1 = Observable.just(new Changer1("-"))
.delay(1, TimeUnit.SECONDS).concatWith(Observable.just(new Changer1("+"))
.delay(2, TimeUnit.SECONDS));
static Observable<Changer2> changers2 = Observable.just(new Changer2(2))
.delay(2, TimeUnit.SECONDS).concatWith(Observable.just(new Changer2(2))
.delay(2, TimeUnit.SECONDS));
static Observable<Message> dynamicMessage = Observable.just(new Message(1, "+")).mergeWith(changers1.flatMap(new Func1<Changer1, Observable<Message>>() {
@Override
public Observable<Message> call(final Changer1 changer) {
return dynamicMessage.last().map(new Func1<Message, Message>() {
@Override
public Message call(Message message) {
message.operation = changer.operation;
return message;
}
});
}
})).mergeWith(changers2.flatMap(new Func1<Changer2, Observable<Message>>() {
@Override
public Observable<Message> call(final Changer2 changer2) {
return dynamicMessage.last().map(new Func1<Message, Message>() {
@Override
public Message call(Message message) {
if("+".equalsIgnoreCase(message.operation)){
message.value = message.value+changer2.value;
} else if("-".equalsIgnoreCase(message.operation)){
message.value = message.value-changer2.value;
}
return message;
}
});
}
}));
public static void main(String[] args) throws InterruptedException {
dynamicMessage.subscribe(new Action1<Message>() {
@Override
public void call(Message message) {
System.out.println(message);
}
});
Thread.sleep(5000000);
}
}
所以我的预期输出是:
消息{值=1,操作='+'}
消息{值=1,操作='-'}
消息{值=-1,操作='-'}
消息{值=1,操作='+'}
消息{值=2,操作='+'}
我还考虑过用CombineLatest合并所有内容,但后来我发现CombineLatest没有提供更改了哪一个元素,如果没有这些信息,我将不知道需要对消息进行哪些更改。
有什么帮助吗?