我正在尝试使用以下方法将一些 TPL 异步集成到更大的 Rx 链中Observable.FromAsync
,就像这个小例子一样:
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;
namespace rxtest
{
class Program
{
static void Main(string[] args)
{
MainAsync().Wait();
}
static async Task MainAsync()
{
await Observable.Generate(new Random(), x => true,
x => x, x => x.Next(250, 500))
.SelectMany((x, idx) => Observable.FromAsync(async ct =>
{
Console.WriteLine("start: " + idx.ToString());
await Task.Delay(x, ct);
Console.WriteLine("finish: " + idx.ToString());
return idx;
}))
.Take(10)
.LastOrDefaultAsync();
}
}
}
但是,我注意到这似乎会同时启动所有异步任务,而不是一次执行一个任务,这会导致应用程序的内存使用量激增。这SelectMany
似乎表现得与Merge
.
在这里,我看到这样的输出:
start: 0
start: 1
start: 2
...
我倒要看看:
start: 0
finish: 0
start: 1
finish: 1
start: 2
finish: 2
...
我怎样才能实现这个目标?
改变SelectMany
to a Select
with a Concat
:
static async Task MainAsync()
{
await Observable.Generate(new Random(), x => true,
x => x, x => x.Next(250, 500))
.Take(10)
.Select((x, idx) => Observable.FromAsync(async ct =>
{
Console.WriteLine("start: " + idx.ToString());
await Task.Delay(x, ct);
Console.WriteLine("finish: " + idx.ToString());
return idx;
}))
.Concat()
.LastOrDefaultAsync();
}
编辑-我将 Take(10) 移到了链上,因为生成不会阻塞-所以它阻止了这种逃跑。
The Select
将每个事件投影到表示将在订阅上启动的异步任务的流中。Concat
接受一个流流,并在前一个流完成时订阅每个连续的子流,将所有流连接成一个平面流。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)