我正在尝试提出一个 Rx Builder,以在 F# 计算表达式语法中使用反应式扩展。我该如何修复它,以免堆栈崩溃?就像下面的 Seq 例子一样。
是否有计划提供 RxBuilder 的实现作为响应式扩展的一部分或作为 .NET Framework 未来版本的一部分?
open System
open System.Linq
open System.Reactive.Linq
type rxBuilder() =
member this.Delay f = Observable.Defer f
member this.Combine (xs: IObservable<_>, ys : IObservable<_>) =
Observable.merge xs ys
member this.Yield x = Observable.Return x
member this.YieldFrom (xs:IObservable<_>) = xs
let rx = rxBuilder()
let rec f x = seq { yield x
yield! f (x + 1) }
let rec g x = rx { yield x
yield! g (x + 1) }
//do f 5 |> Seq.iter (printfn "%A")
do g 5 |> Observable.subscribe (printfn "%A") |> ignore
do System.Console.ReadLine() |> ignore
一个简短的答案是,Rx Framework 不支持使用这样的递归模式生成可观察量,因此它不容易完成。这Combine
用于 F# 序列的操作需要一些可观察量不提供的特殊处理。 Rx 框架可能期望您使用以下方式生成可观察值Observable.Generate
然后使用 LINQ 查询/F# 计算生成器来处理它们。
不管怎样,这里有一些想法——
首先,你需要更换Observable.merge
with Observable.Concat
。第一个并行运行两个可观察值,而第二个首先从第一个可观察值生成所有值,然后从第二个可观察值生成值。进行此更改后,该代码片段在堆栈溢出之前至少会打印约 800 个数字。
堆栈溢出的原因是Concat
创建一个可观察的对象,调用Concat
创建另一个调用的可观察对象Concat
解决这个问题的一种方法是添加一些同步。如果您使用的是 Windows 窗体,那么您可以修改Delay
这样它就可以在 GUI 线程上调度可观察对象(这会丢弃当前堆栈)。这是一个草图:
type RxBuilder() =
member this.Delay f =
let sync = System.Threading.SynchronizationContext.Current
let res = Observable.Defer f
{ new IObservable<_> with
member x.Subscribe(a) =
sync.Post( (fun _ -> res.Subscribe(a) |> ignore), null)
// Note: This is wrong, but we cannot easily get the IDisposable here
null }
member this.Combine (xs, ys) = Observable.Concat(xs, ys)
member this.Yield x = Observable.Return x
member this.YieldFrom (xs:IObservable<_>) = xs
要正确实现这一点,您必须编写自己的Concat
方法,相当复杂。这个想法是:
- Concat 返回一些特殊类型,例如
IConcatenatedObservable
- 当递归调用该方法时,您将创建一个链
IConcatenatedObservable
互相引用
- The
Concat
方法将寻找这个链,当有例如三个对象,它会丢弃中间的一个(始终保持链的长度最多为 2)。
对于 StackOverflow 的答案来说,这有点太复杂了,但对于 Rx 团队来说,这可能是一个有用的反馈。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)