NOTE
此功能的进一步更新将发布到GitHub 仓库 https://github.com/santisq/PSParallelPipeline以及PowerShell 画廊 https://www.powershellgallery.com/packages/PSParallelPipeline/。这个答案中的代码将不再维护.
可以找到该功能的文档以及使用示例here https://github.com/santisq/PSParallelPipeline/blob/main/docs/en-US/Invoke-Parallel.md。请注意,模块版本不再有-ThreadOptions
参数及实现-UseNewRunspace
and -TimeoutSeconds
参数,但其用法应该相同。
我们非常欢迎贡献,如果您想贡献,请分叉存储库并提交包含更改的拉取请求。
由于这是一个可能令人困惑并且经常给网站带来问题的主题,因此我决定创建这个函数来简化这项繁琐的任务并帮助那些陷入 Windows PowerShell 困境的人。目的是让它尽可能简单和友好,它也应该是一个可以复制粘贴到我们的程序中的功能$PROFILE https://learn.microsoft.com/en-us/powershell/module/microsoft.powershell.core/about/about_automatic_variables?view=powershell-7.2#profile可以在需要时重复使用,并且不需要安装模块(如问题中所述)。
这个功能受到了 RamblingCookieMonster 的很大启发Invoke-Parallel https://github.com/RamblingCookieMonster/Invoke-Parallel和 Boe Prox 的PoshRSJob https://github.com/proxb/PoshRSJob并且只是对那些进行了一些改进的简化版本。
定义
using namespace System.Collections
using namespace System.Collections.Generic
using namespace System.Management.Automation
using namespace System.Management.Automation.Language
using namespace System.Management.Automation.Runspaces
using namespace System.Threading
using namespace System.Text
# The function must run in the scope of a Module.
# `New-Module` must be used for portability. Otherwise store the
# function in a `.psm1` and import it via `Import-Module`.
New-Module PSParallelPipeline -ScriptBlock {
class CommandCompleter : IArgumentCompleter {
[IEnumerable[CompletionResult]] CompleteArgument(
[string] $commandName,
[string] $parameterName,
[string] $wordToComplete,
[CommandAst] $commandAst,
[IDictionary] $fakeBoundParameters) {
return [CompletionCompleters]::CompleteCommand(
$wordToComplete,
[NullString]::Value,
[CommandTypes]::Function)
}
}
function Invoke-Parallel {
[CmdletBinding(PositionalBinding = $false)]
[Alias('parallel')]
param(
[Parameter(Mandatory, ValueFromPipeline)]
[object] $InputObject,
[Parameter(Mandatory, Position = 0)]
[scriptblock] $ScriptBlock,
[Parameter()]
[ValidateRange(1, 63)]
[int] $ThrottleLimit = 5,
[Parameter()]
[hashtable] $Variables,
[Parameter()]
[ValidateNotNullOrEmpty()]
[ArgumentCompleter([CommandCompleter])]
[string[]] $Functions,
[Parameter()]
[ValidateSet('ReuseThread', 'UseNewThread')]
[PSThreadOptions] $ThreadOptions = [PSThreadOptions]::ReuseThread
)
begin {
try {
$iss = [initialsessionstate]::CreateDefault2()
foreach ($key in $Variables.PSBase.Keys) {
if ($Variables[$key] -is [scriptblock]) {
$PSCmdlet.ThrowTerminatingError([ErrorRecord]::new(
[PSArgumentException]::new('Passed-in script block variables are not supported.'),
'VariableCannotBeScriptBlock',
[ErrorCategory]::InvalidType,
$Variables[$key]))
}
$iss.Variables.Add([SessionStateVariableEntry]::new($key, $Variables[$key], ''))
}
foreach ($function in $Functions) {
$def = (Get-Command $function).Definition
$iss.Commands.Add([SessionStateFunctionEntry]::new($function, $def))
}
$usingParams = @{}
foreach ($usingstatement in $ScriptBlock.Ast.FindAll({ $args[0] -is [UsingExpressionAst] }, $true)) {
$varText = $usingstatement.Extent.Text
$varPath = $usingstatement.SubExpression.VariablePath.UserPath
# Thanks to mklement0 for catching up a bug here.
# https://github.com/mklement0
$key = [Convert]::ToBase64String([Encoding]::Unicode.GetBytes($varText.ToLowerInvariant()))
if (-not $usingParams.ContainsKey($key)) {
$value = $PSCmdlet.SessionState.PSVariable.GetValue($varPath)
if ($value -is [scriptblock]) {
$PSCmdlet.ThrowTerminatingError([ErrorRecord]::new(
[PSArgumentException]::new('Passed-in script block variables are not supported.'),
'VariableCannotBeScriptBlock',
[ErrorCategory]::InvalidType,
$value))
}
$usingParams.Add($key, $value)
}
}
$pool = [runspacefactory]::CreateRunspacePool(1, $ThrottleLimit, $iss, $Host)
$tasks = [List[hashtable]]::new()
$pool.ThreadOptions = $ThreadOptions
$pool.Open()
}
catch {
$PSCmdlet.ThrowTerminatingError($_)
}
}
process {
try {
# Thanks to Patrick Meinecke for his help here.
# https://github.com/SeeminglyScience/
$ps = [powershell]::Create().
AddScript({ $args[0].InvokeWithContext($null, [psvariable]::new('_', $args[1])) }).
AddArgument($ScriptBlock.Ast.GetScriptBlock()).
AddArgument($InputObject)
# This is how `Start-Job` does it's magic.
# Thanks to Jordan Borean for his help here.
# https://github.com/jborean93
if ($usingParams.Count) {
$null = $ps.AddParameters(@{ '--%' = $usingParams })
}
$ps.RunspacePool = $pool
$tasks.Add(@{
Instance = $ps
AsyncResult = $ps.BeginInvoke()
})
}
catch {
$PSCmdlet.WriteError($_)
}
}
end {
try {
while ($tasks.Count) {
$id = [WaitHandle]::WaitAny($tasks.AsyncResult.AsyncWaitHandle, 200)
if ($id -eq [WaitHandle]::WaitTimeout) {
continue
}
$task = $tasks[$id]
$task.Instance.EndInvoke($task.AsyncResult)
foreach ($err in $task.Instance.Streams.Error) {
$PSCmdlet.WriteError($err)
}
$tasks.RemoveAt($id)
}
}
catch {
$PSCmdlet.WriteError($_)
}
finally {
foreach ($task in $tasks.Instance) {
if ($task -is [IDisposable]) {
$task.Dispose()
}
}
if ($pool -is [IDisposable]) {
$pool.Dispose()
}
}
}
}
} -Function Invoke-Parallel | Import-Module -Force