Note:请参阅本文末尾的编辑,它修复了我原始答案中的错误。无论如何,我都会留下原来的答案,因为它对于很多情况仍然有用,而且我认为它有助于解决OP的问题,至少有一些限制。
你的方法与Iterator
朝着正确的方向前进。该解决方案可能起草如下:将流转换为迭代器,像您已经完成的那样包装迭代器,然后从包装迭代器创建一个流,但您应该使用Spliterator反而。这是代码:
private static <T> Stream<T> asNonThrowingStream(
Stream<T> stream,
Supplier<? extends T> valueOnException) {
// Get spliterator from original stream
Spliterator<T> spliterator = stream.spliterator();
// Return new stream from wrapper spliterator
return StreamSupport.stream(
// Extending AbstractSpliterator is enough for our purpose
new Spliterators.AbstractSpliterator<T>(
spliterator.estimateSize(),
spliterator.characteristics()) {
// We only need to implement tryAdvance
@Override
public boolean tryAdvance(Consumer<? super T> action) {
try {
return spliterator.tryAdvance(action);
} catch (RuntimeException e) {
action.accept(valueOnException.get());
return true;
}
}
}, stream.isParallel());
}
我们正在延长AbstractSpliterator包装原始流返回的 spliterator。我们只需要实现tryAdvance方法,该方法要么委托给原始 spliterator 的tryAdvance
方法或捕获RuntimeException
并使用提供的调用该操作valueOnException
value.
Spliterator
的合约指定了返回值tryAdvance
必须是true
如果该动作被消耗,那么如果RuntimeException
被捕获,这意味着原始分裂者已将其从自己的内部抛出tryAdvance
方法。因此,我们返回true
在这种情况下,意味着该元素无论如何都会被消耗。
通过将这些值作为参数传递给构造函数来保留原始 spliterator 的估计大小和特征AbstractSpliterator
.
最后,我们通过新的 spliterator 创建一个新流StreamSupport.stream
方法。如果原始流也是并行的,则新流也是并行的。
下面介绍一下上述方法的使用方法:
public Stream<String> convertToString(Stream<Integer> input) {
return asNonThrowingStream(input.map(String::valueOf), () -> "NaN");
}
Edit
As per 霍尔格的评论 below, 用户 holi-java善意地提供了一个解决方案,避免了 Holger 指出的陷阱。
这是代码:
<T> Stream<T> exceptionally(Stream<T> source, BiConsumer<Exception, Consumer<? super T>> handler) {
class ExceptionallySpliterator extends AbstractSpliterator<T>
implements Consumer<T> {
private Spliterator<T> source;
private T value;
private long fence;
ExceptionallySpliterator(Spliterator<T> source) {
super(source.estimateSize(), source.characteristics());
this.fence = source.getExactSizeIfKnown();
this.source = source;
}
@Override
public Spliterator<T> trySplit() {
Spliterator<T> it = source.trySplit();
return it == null ? null : new ExceptionallySpliterator(it);
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
return fence != 0 && consuming(action);
}
private boolean consuming(Consumer<? super T> action) {
Boolean state = tryConsuming(action);
if (state == null) {
return true;
}
if (state) {
action.accept(value);
value = null;
return true;
}
return false;
}
private Boolean tryConsuming(Consumer<? super T> action) {
fence--;
try {
return source.tryAdvance(this);
} catch (Exception ex) {
handler.accept(ex, action);
return null;
}
}
@Override
public void accept(T value) {
this.value = value;
}
}
return stream(new ExceptionallySpliterator(source.spliterator()), source.isParallel()).onClose(source::close);
}
请参考进行测试如果您想进一步了解此解决方案。