如果与 async/await 一起使用(使用 Dapper 从 SQL Server 流式传输数据),返回 IEnumerable 会发生什么情况?

2024-01-12

我正在使用 Dapper 从 SQL Server 中的一个非常大的集合中传输数据。返回效果很好IEnumerable并打电话Query(),但是当我切换到QueryAsync(),该程序似乎尝试从 SQL Server 读取所有数据而不是流式传输。

根据这个question https://stackoverflow.com/questions/13026558/explanation-of-dapper-buffer-cache,它应该可以很好地工作buffered: false,我正在做,但这个问题没有提到async/await.

现在根据这个question https://stackoverflow.com/questions/24966019/async-with-huge-data-streams,做我想做的事并不简单QueryAsync().

我是否正确理解当上下文切换时可枚举数会被迭代async/await?

另一个问题是,当新的 C#8 异步流可用时,这是否可以做到?


2020 年 3 月更新

.NET Core 3.0(和3.1)现已发布,完全支持异步流。这Microsoft.Bcl.AsyncInterfaces https://www.nuget.org/packages/Microsoft.Bcl.AsyncInterfaces/向 .NET Standard 2.0 和 .NET Framework 4.6.1+ 添加了对它们的支持,尽管出于理智原因应使用 4.7.2。正如文档上.NET 标准实现支持说明 https://learn.microsoft.com/en-us/dotnet/standard/net-standard#net-implementation-support

虽然 NuGet 认为 .NET Framework 4.6.1 支持 .NET Standard 1.5 到 2.0,但使用为 .NET Framework 4.6.1 项目中的这些版本构建的 .NET Standard 库存在一些问题。

对于需要使用此类库的 .NET Framework 项目,我们建议您将项目升级到目标 .NET Framework 4.7.2 或更高版本。

原答案

If you 检查源代码 https://github.com/StackExchange/Dapper/blob/master/Dapper/SqlMapper.Async.cs#L433,你会发现你的怀疑几乎是正确的。什么时候buffered是假的,QueryAsync将流式传输同步地.

if (command.Buffered)
{
    var buffer = new List<T>();
    var convertToType = Nullable.GetUnderlyingType(effectiveType) ?? effectiveType;
    while (await reader.ReadAsync(cancel).ConfigureAwait(false))
    {
        object val = func(reader);
        if (val == null || val is T)
        {
            buffer.Add((T)val);
        }
        else
        {
            buffer.Add((T)Convert.ChangeType(val, convertToType, CultureInfo.InvariantCulture));
        }
    }
    while (await reader.NextResultAsync(cancel).ConfigureAwait(false)) { /* ignore subsequent result sets */ }
    command.OnCompleted();
    return buffer;
}
else
{
    // can't use ReadAsync / cancellation; but this will have to do
    wasClosed = false; // don't close if handing back an open reader; rely on the command-behavior
    var deferred = ExecuteReaderSync<T>(reader, func, command.Parameters);
    reader = null; // to prevent it being disposed before the caller gets to see it
    return deferred;
}

正如评论所解释的,不可能使用ReadAsync当返回类型预计为 IEnumerable 时。这就是为什么必须引入 C# 8 的异步枚举的原因。

ExecuteReaderSync 的代码是:

private static IEnumerable<T> ExecuteReaderSync<T>(IDataReader reader, Func<IDataReader, object> func, object parameters)
{
    using (reader)
    {
        while (reader.Read())
        {
            yield return (T)func(reader);
        }
        while (reader.NextResult()) { /* ignore subsequent result sets */ }
        (parameters as IParameterCallbacks)?.OnCompleted();
    }
}

It uses Read代替ReadAsync.

C#8 异步流将允许重写它以返回IAsyncEnumerable。仅仅更改语言版本并不能解决问题。

鉴于异步流的当前文档,这可能如下所示:

private static async IAsyncEnumerable<T> ExecuteReaderASync<T>(IDataReader reader, Func<IDataReader, object> func, object parameters)
{
    using (reader)
    {
        while (await reader.ReadAsync())
        {
            yield return (T)func(reader);
        }

        while (await reader.NextResultAsync(cancel).ConfigureAwait(false)) { /* ignore subsequent result sets */ }
         command.OnCompleted();
        (parameters as IParameterCallbacks)?.OnCompleted();
    }
}

Buuuuuut异步流是只能在 .NET Core 上运行的功能之一,并且可能尚未实现。当我尝试在 Sharplab.io 上写一个时,Kaboom。[connection lost, reconnecting…]

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如果与 async/await 一起使用(使用 Dapper 从 SQL Server 流式传输数据),返回 IEnumerable 会发生什么情况? 的相关文章

随机推荐