我试图了解 Projectreactor 为应用程序代码提供的数据可见性方面的保证。例如我预计下面的代码会失败,但经过一百万次迭代后它不会失败。我正在更改线程 A 上典型 POJO 的状态,并从线程 B 读回它。Reactor 是否保证 POJO 更改在线程中可见?
public class Main {
public static void main(String[] args) {
Integer result = Flux.range(1, 1_000_000)
.map(i -> {
Data data = new Data();
data.setValue(i);
data.setValueThreeTimes(i);
data.setValueObj(i + i);
return data;
})
.parallel(250)
.runOn(Schedulers.newParallel("par", 500))
.map(d -> {
d.setValueThreeTimes(d.getValueThreeTimes() + d.getValue());
return d;
})
.sequential()
.parallel(250)
.runOn(Schedulers.newParallel("par", 500))
.map(d -> {
d.setValueThreeTimes(d.getValueThreeTimes() + d.getValue());
return d;
})
// .sequential()
.map(d -> {
if (d.getValue() * 3 != d.getValueThreeTimes()) throw new RuntimeException("data corrupt error");
return d;
})
.reduce(() -> 0, (Integer sum, Data d) -> sum + d.getValueObj() + d.getValue())
.sequential()
.blockLast();
}
static class Data {
private int value;
private int valueThreeTimes;
private Integer valueObj;
public int getValueThreeTimes() {
return valueThreeTimes;
}
public void setValueThreeTimes(int valueThreeTimes) {
this.valueThreeTimes = valueThreeTimes;
}
public int getValue() {
return value;
}
@Override
public String toString() {
return "Data{" +
"value=" + value +
", valueObj=" + valueObj +
'}';
}
public void setValue(int value) {
this.value = value;
}
public Integer getValueObj() {
return valueObj;
}
public void setValueObj(Integer valueObj) {
this.valueObj = valueObj;
}
}
private static <T> T identityWithThreadLogging(T el, String operation) {
System.out.println(operation + " -- " + el + " -- " +
Thread.currentThread().getName());
return el;
}
}
反应式流规范强制要求Flux
or Mono
(a Publisher
), onNext
事件必须是连续的。
parallel()
is a ParallelFlux
,它通过分治来放松一点:你会得到多个“轨道”,每个轨道都单独遵守规范,但总体而言不遵守规范(轨道之间的并行化)。
反过来,sequential()
返回到Flux
world 并引入内存屏障来保证结果序列符合 RS 规范。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)