我有一个项目,有一个 Sql-Server 数据库后端和 Dapper 作为 ORM。我正在尝试使用 Dapper 的QueryAsync()
方法来获取一些数据。不仅如此,对我的存储库的调用来自几个使用 a 调用的任务内部Task.WhenAll
(也就是说,每个任务都涉及从该存储库获取数据,因此每个任务都等待我的存储库的方法来包装QueryAsync()
称呼)。
问题是我的 SqlConnections 永远不会关闭,即使我使用的是using
堵塞。结果,我的数据库有 100 多个打开的连接,并最终开始出现“达到最大池大小”异常。问题是,当我切换到Query()
代替QueryAsync()
,它工作正常,但我希望能够异步执行此操作。
这是一个代码示例。我试图尽可能地模仿实际应用程序的结构,这就是为什么它看起来比实际应用程序更复杂的原因。
界面:
public interface IFooRepository<T> where T: FooBase
{
Task<IEnumerable<T>> Select(string account, DateTime? effectiveDate = null);
}
执行:
public class FooRepository : RepositoryBase, IFooRepository<SpecialFoo>
{
private readonly IWebApiClientRepository _accountRepository;
public FooRepository(IWebApiClientRepository repo)
{
_accountRepository = repo;
}
public async Task<IEnumerable<FuturePosition>> Select(string code, DateTime? effectiveDate = null)
{
effectiveDate = effectiveDate ?? DateTime.Today.Date;
var referenceData = await _accountRepository.GetCrossRefferenceData(code, effectiveDate.Value);
using (var connection = new SqlConnection("iamaconnectionstring")
{
connection.Open();
try
{
var res = await connection.QueryAsync<FuturePosition>(SqlQueryVariable + "AND t.code = @code;",
new
{
effectiveDate = effectiveDate.Value,
code = referenceData.Code
});
foreach (var item in res)
{
item.PropFromReference = referenceData.PropFromReference;
}
return res;
}
catch (Exception e)
{
//log
throw;
}
finally
{
connection.Close();
}
}
}
}
所以现在调用代码有两层。我将从外部开始。我想这就是问题所在。下面有评论。
人口:
public class Populator : PopulatorBase
{
private IAccountRepository _acctRepository;
public override async Task<IEnumerable<PopulationResult>> ProcessAccount(DateTime? popDate = null)
{
//My attempt at throttling the async calls
//I was hoping this would force a max of 10 simultaneous connections.
//It did not work.
SemaphoreSlim ss = new SemaphoreSlim(10,10);
var accountsToProcess = _acctRepository.GetAllAccountsToProcess();
var accountNumbers = accountsToProcess.SelectMany(a => a.accountNumbers).ToList();
List<Task<ProcessResult>> trackedTasks = new List<Task<ProcessResult>>();
foreach (var item in accountNumbers)
{
await ss.WaitAsync();
trackedTasks.Add(ProcessAccount(item.AccountCode, popDate ?? DateTime.Today));
ss.Release();
}
//my gut tells me the issue is because of these tasks
var results = await Task.WhenAll(trackedTasks);
return results;
}
private async Task<ProcessResult>ProcessAccount(string accountCode, DateTime? popDate)
{
var createdItems = await _itemCreator.MakeExceptions(popDate, accountCode);
return Populate(accountCode, createdItems);
}
}
物品创建者:
public class ItemCreator : ItemCreatorBase
{
private readonly IFooRepository<FuturePosition> _fooRepository;
private readonly IBarRepository<FuturePosition> _barRepository;
public RussellGlobeOpFutureExceptionCreator() )
{
//standard constructor stuff
}
public async Task<ItemCreationResult> MakeItems(DateTime? effectiveDate, string account)
{
DateTime reconDate = effectiveDate ?? DateTime.Today.Date;
//this uses the repository I outlined above
var foos = await _fooRepository.Select(account, effectiveDate);
//this repository uses a rest client, I doubt it's the problem
var bars = await _barRepository.Select(account, effectiveDate);
//just trying to make this example less lengthy
var foobars = MakeFoobars(foos, bars);
var result = new ItemCreationResult { EffectiveDate = effectiveDate, Items = foobars };
return result;
}
}
据我尝试过:
- 使用 SemaphoreSlim 进行节流
- 无节流
- Using
connection.OpenAnync()
在回购协议中
- 包括/排除finally块(应该与
using
)
值得知道的是foreach
填充器中的循环运行大约 500 次。本质上,有一个包含 500 个帐户的列表。对于每一个,都需要进行长时间的运行populate
任务涉及从我的 Foo 存储库中提取数据。
老实说我不知道。我认为这可能与等待填充器中任务列表中每个任务的异步数据库调用有关。对这个问题的任何见解都会非常有帮助。