最终,我想出了四种不同的方法来解决这个问题。我生成了 500 个随机值插入 MyTable,并对四种方法中的每一种进行了计时(包括启动和回滚运行该事务的事务)。在我的测试中,数据库位于本地主机上。然而,具有最佳性能的解决方案也只需要与数据库服务器进行一次往返,因此当部署到与数据库不同的服务器时,我发现的最佳解决方案仍然应该击败替代方案。
请注意变量connection
and transaction
在以下代码中使用,并假定为有效的 Npgsql 数据对象。另请注意,符号Nx 较慢表示操作花费的时间等于最优解乘以N.
方法 #1(1,494ms = 慢 18.7 倍):将数组展开为单独的参数
public List<MyTable> InsertEntries(double[] entries)
{
// Create a variable used to dynamically build the query
var query = new StringBuilder(
"INSERT INTO \"MyTable\" (\"Value\") VALUES ");
// Create the dictionary used to store the query parameters
var queryParams = new DynamicParameters();
// Get the result set without auto-assigned ids
var result = entries.Select(e => new MyTable { Value = e }).ToList();
// Add a unique parameter for each id
var paramIdx = 0;
foreach (var entry in result)
{
var paramName = string.Format("value{1:D6}", paramIdx);
if (0 < paramIdx++) query.Append(',');
query.AppendFormat("(:{0})", paramName);
queryParams.Add(paramName, entry.Value);
}
query.Append(" RETURNING \"ID\"");
// Execute the query, and store the ids
var ids = connection.Query<int>(query, queryParams, transaction);
ids.ForEach((id, i) => result[i].ID = id);
// Return the result
return result;
}
我真的不确定为什么这会是最慢的,因为它只需要到数据库的一次往返,但事实确实如此。
方法 #2(267ms = 慢 3.3 倍):标准循环迭代
public List<MyTable> InsertEntries(double[] entries)
{
const string query =
"INSERT INTO \"MyTable\" (\"Value\") VALUES (:val) RETURNING \"ID\"";
// Get the result set without auto-assigned ids
var result = entries.Select(e => new MyTable { Value = e }).ToList();
// Add each entry to the database
foreach (var entry in result)
{
var queryParams = new DynamicParameters();
queryParams.Add("val", entry.Value);
entry.ID = connection.Query<int>(
query, queryParams, transaction);
}
// Return the result
return result;
}
令我震惊的是,这仅比最佳解决方案慢 3.3 倍,但我预计在实际环境中情况会变得更糟,因为该解决方案需要串行发送 500 条消息到服务器。然而,这也是最简单的解决方案。
方法 #3(223ms = 慢 2.8 倍):异步循环迭代
public List<MyTable> InsertEntries(double[] entries)
{
const string query =
"INSERT INTO \"MyTable\" (\"Value\") VALUES (:val) RETURNING \"ID\"";
// Get the result set without auto-assigned ids
var result = entries.Select(e => new MyTable { Value = e }).ToList();
// Add each entry to the database asynchronously
var taskList = new List<Task<IEnumerable<int>>>();
foreach (var entry in result)
{
var queryParams = new DynamicParameters();
queryParams.Add("val", entry.Value);
taskList.Add(connection.QueryAsync<int>(
query, queryParams, transaction));
}
// Now that all queries have been sent, start reading the results
for (var i = 0; i < result.Count; ++i)
{
result[i].ID = taskList[i].Result.First();
}
// Return the result
return result;
}
这正在变得更好,但仍然不是最佳的,因为我们只能将与线程池中可用线程一样多的插入排队。然而,这几乎与非线程方法一样简单,因此它是速度和可读性之间的良好折衷。
方法 #4(134ms = 慢 1.7 倍):批量插入
这种方法需要在运行下面的代码段之前定义以下 Postgres SQL:
CREATE TYPE "MyTableType" AS (
"Value" DOUBLE PRECISION
);
CREATE FUNCTION "InsertIntoMyTable"(entries "MyTableType"[])
RETURNS SETOF INT AS $$
DECLARE
insertCmd TEXT := 'INSERT INTO "MyTable" ("Value") '
'VALUES ($1) RETURNING "ID"';
entry "MyTableType";
BEGIN
FOREACH entry IN ARRAY entries LOOP
RETURN QUERY EXECUTE insertCmd USING entry."Value";
END LOOP;
END;
$$ LANGUAGE PLPGSQL;
以及相关代码:
public List<MyTable> InsertEntries(double[] entries)
{
const string query =
"SELECT * FROM \"InsertIntoMyTable\"(:entries::\"MyTableType\")";
// Get the result set without auto-assigned ids
var result = entries.Select(e => new MyTable { Value = e }).ToList();
// Convert each entry into a Postgres string
var entryStrings = result.Select(
e => string.Format("({0:E16})", e.Value).ToArray();
// Create a parameter for the array of MyTable entries
var queryParam = new {entries = entryStrings};
// Perform the insert
var ids = connection.Query<int>(query, queryParam, transaction);
// Assign each id to the result
ids.ForEach((id, i) => result[i].ID = id);
// Return the result
return result;
}
我对这种方法有两个问题。首先,我必须对 MyTableType 成员的排序进行硬编码。如果该顺序发生变化,我必须修改此代码以匹配。第二个是我必须在将所有输入值发送到 postgres 之前将它们转换为字符串(在实际代码中,我有多个列,所以我不能只更改数据库函数的签名以采用双精度) precision[],除非我传入 N 个数组,其中 N 是 MyTableType 上的字段数)。
尽管存在这些缺陷,但这已经越来越接近理想状态,并且只需要与数据库进行一次往返。
-- 开始编辑 --
自最初的帖子以来,我提出了四种其他方法,它们都比上面列出的方法更快。我已经修改了Nx 较慢下面的数字反映了新的最快方法。
方法 #5(105ms = 慢 1.3 倍):与#4相同,没有动态查询
这种方法与方法#4对“InsertIntoMyTable”函数进行以下更改:
CREATE FUNCTION "InsertIntoMyTable"(entries "MyTableType"[])
RETURNS SETOF INT AS $$
DECLARE
entry "MyTableType";
BEGIN
FOREACH entry IN ARRAY entries LOOP
RETURN QUERY INSERT INTO "MyTable" ("Value")
VALUES (entry."Value") RETURNING "ID";
END LOOP;
END;
$$ LANGUAGE PLPGSQL;
除了以下问题之外方法#4,这样做的缺点是,在生产环境中,“MyTable”是分区的。使用这种方法,我需要每个目标分区一种方法。
方法 #6(89ms = 慢 1.1 倍):带数组参数的插入语句
public List<MyTable> InsertEntries(double[] entries)
{
const string query =
"INSERT INTO \"MyTable\" (\"Value\") SELECT a.* FROM " +
"UNNEST(:entries::\"MyTableType\") a RETURNING \"ID\"";
// Get the result set without auto-assigned ids
var result = entries.Select(e => new MyTable { Value = e }).ToList();
// Convert each entry into a Postgres string
var entryStrings = result.Select(
e => string.Format("({0:E16})", e.Value).ToArray();
// Create a parameter for the array of MyTable entries
var queryParam = new {entries = entryStrings};
// Perform the insert
var ids = connection.Query<int>(query, queryParam, transaction);
// Assign each id to the result
ids.ForEach((id, i) => result[i].ID = id);
// Return the result
return result;
}
唯一的缺点与第一个问题相同方法#4。也就是说,它将实现与排序结合起来"MyTableType"
。尽管如此,我发现这是我第二喜欢的方法,因为它非常快,并且不需要任何数据库函数即可正常工作。
方法#7(80ms = 非常慢):与 #1 相同,但不带参数
public List<MyTable> InsertEntries(double[] entries)
{
// Create a variable used to dynamically build the query
var query = new StringBuilder(
"INSERT INTO \"MyTable\" (\"Value\") VALUES");
// Get the result set without auto-assigned ids
var result = entries.Select(e => new MyTable { Value = e }).ToList();
// Add each row directly into the insert statement
for (var i = 0; i < result.Count; ++i)
{
entry = result[i];
query.Append(i == 0 ? ' ' : ',');
query.AppendFormat("({0:E16})", entry.Value);
}
query.Append(" RETURNING \"ID\"");
// Execute the query, and store the ids
var ids = connection.Query<int>(query, null, transaction);
ids.ForEach((id, i) => result[i].ID = id);
// Return the result
return result;
}
这是我最喜欢的方法。它仅比最快的慢一点(即使有 4000 条记录,它的运行时间仍然低于 1 秒),但不需要特殊的数据库函数或类型。我唯一不喜欢的是,我必须对双精度值进行字符串化,然后才能由 Postgres 再次解析。最好以二进制形式发送这些值,这样它们会占用 8 个字节,而不是我为它们分配的 20 个左右字节。
方法 #8(80 毫秒):与 #5 相同,但采用纯 sql
这种方法与方法#5对“InsertIntoMyTable”函数进行以下更改:
CREATE FUNCTION "InsertIntoMyTable"(
entries "MyTableType"[]) RETURNS SETOF INT AS $$
INSERT INTO "MyTable" ("Value")
SELECT a.* FROM UNNEST(entries) a RETURNING "ID";
$$ LANGUAGE SQL;
这种方法与#5 一样,需要每个函数一个函数“我的桌子”分割。这是最快的,因为可以为每个函数生成一次查询计划,然后重复使用。在其他方法中,必须解析查询,然后计划,然后执行。尽管这是最快的,但由于数据库方面的额外要求,我没有选择它方法#7,速度优势非常小。