对于问题的另一半有什么想法吗:在处理文件后将文件从源 blob 移动到存档 blob?
据我所知,没有任何内置任务可以帮助您实现此目的。根据我的测试,我认为您可以利用脚本任务 https://learn.microsoft.com/en-us/sql/integration-services/extending-packages-scripting/task/configuring-the-script-task-in-the-script-task-editor并编写代码(VB 或 C#)来直接处理 blob。下面是我的详细步骤,大家可以参考一下:
1) Use Azure Blob 源 https://learn.microsoft.com/en-us/sql/integration-services/data-flow/azure-blob-source and OLE DB 目标 https://learn.microsoft.com/en-us/sql/integration-services/lesson-1-7-adding-and-configuring-the-ole-db-destination under 数据流用于将 CSV 文件从 Azure Blob 加载到 Azure SQL 数据库中。
2) 成功将 CSV 数据加载到 SQL 表中后,使用脚本任务将源 blob 移动到存档 blob。
我将调用 Blob 服务 REST API复制斑点 https://learn.microsoft.com/en-us/rest/api/storageservices/fileservices/copy-blob and 删除斑点 https://learn.microsoft.com/en-us/rest/api/storageservices/fileservices/delete-blob with 容器 SAS 令牌 https://learn.microsoft.com/en-us/azure/storage/storage-dotnet-shared-access-signature-part-2,你可以利用微软Azure存储资源管理器 http://storageexplorer.com/并关注这位官方tutorial https://learn.microsoft.com/en-us/azure/vs-azure-tools-storage-explorer-blobs#get-the-sas-for-a-blob-container为 Blob 容器生成 SAS 令牌。
假设源 blob 和目标 blob 位于同一容器下,那么我添加三个变量(SourceBlobUrl
,ContainerSasToken
,ArchiveBlobUrl
) 如下并将它们添加为 ReadOnlyVariables脚本任务编辑器,你可以参考这个tutorial https://learn.microsoft.com/en-us/sql/integration-services/extending-packages-scripting/task/using-variables-in-the-script-task用于在脚本任务中使用变量。
Click 编辑脚本脚本任务编辑器下的按钮启动 VSTA 开发环境,您可以在其中编写自定义脚本。这是下面的Main方法ScriptMain.cs
如下:
public async void Main()
{
// TODO: Add your code here
string sasToken = Dts.Variables["ContainerSasToken"].Value.ToString();
string sourceBlobUrl = Dts.Variables["SourceBlobUrl"].Value.ToString();
string archiveBlobUrl = Dts.Variables["ArchiveBlobUrl"].Value.ToString();
try
{
HttpClient client = new HttpClient();
client.DefaultRequestHeaders.Add("x-ms-copy-source", sourceBlobUrl + sasToken);
//copy source blob to archive blob
Dts.Log($"start copying blob from [{sourceBlobUrl}] to [{archiveBlobUrl}]...", 0, new byte[0]);
HttpResponseMessage response = await client.PutAsync(archiveBlobUrl + sasToken, null);
if (response.StatusCode == HttpStatusCode.Accepted || response.StatusCode == HttpStatusCode.Created)
{
client.DefaultRequestHeaders.Clear();
Dts.Log($"start deleting blob [{sourceBlobUrl}]...", 0, new byte[0]);
//delete source blob
HttpResponseMessage result = await client.DeleteAsync(sourceBlobUrl + sasToken);
if (result.StatusCode == HttpStatusCode.Accepted || result.StatusCode == HttpStatusCode.Created)
{
Dts.TaskResult = (int)ScriptResults.Success;
return;
}
}
Dts.TaskResult = (int)ScriptResults.Failure;
}
catch (Exception ex)
{
Dts.Events.FireError(-1, "Script Task - Move source blob to an archive blob", ex.Message + "\r" + ex.StackTrace, String.Empty, 0);
Dts.TaskResult = (int)ScriptResults.Failure;
}
}
Result
此外,您还可以利用适用于 .NET 的 Microsoft Azure 存储客户端库 https://learn.microsoft.com/en-us/azure/storage/storage-dotnet-how-to-use-blobs要访问存储blob,此时需要在GAC之外的SSIS脚本任务中加载程序集,更多详细信息可以参考官方的这篇文章blog https://blogs.msdn.microsoft.com/dbrowne/2014/06/25/how-to-load-an-assembly-in-a-ssis-script-task-that-isnt-in-the-gac/.