我在.NET core 3.1下创建了GRPC服务主机(使用Grpc.AspNetCore v2.30)https://github.com/grpc/grpc-dotnet https://github.com/grpc/grpc-dotnet)和 .NET Framework 4.6.2 下的 GRPC 客户端(使用来自https://github.com/grpc/grpc https://github.com/grpc/grpc)。这些框架对于主机来说是一个限制。
我正在从一个客户端运行多次调用来对服务进行压力测试 - 一次调用 Update,另一次调用 UpdateStream。对于两者,我在客户端都面临着一个奇怪的问题。它有时会生成一个错误 - 当我开始执行时或在执行过程中可能会立即发生错误,并且它永远不会恢复 - 我必须停止客户端主机并再次重新启动它才能使其工作。仅当使用不同的机器时才会发生这种情况 - 在本地主机调用上没有问题。
关于这个问题有什么想法/想法吗?
这是我在 Update/UpdateStream 客户端调用中遇到的异常:
Status(StatusCode="未知",Detail="处理程序引发异常。", DebugException="Grpc.Core.Internal.CoreErrorDetailException: {"created":"@1595
930477.263000000","描述":"从对等方 ipv4:[ip]:23456 收到错误","文件":"T:\src\github\grpc\workspace_csharp_ext_windows_x86\src\core
\lib\surface\call.cc","file_line":1055,"grpc_message":"处理程序抛出异常。","grpc_status":2}")
这是客户端/服务器代码:
Server:
class Program
{
const int _port = 23456;
static void Main(string[] args)
{
CreateHostBuilder(args).Build().Run();
Console.WriteLine("started - press any key to quit...");
Console.ReadKey();
}
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureWebHostDefaults(webBuilder =>
{
webBuilder.ConfigureKestrel(options =>
{
options.ConfigureEndpointDefaults(o =>
{
o.Protocols = HttpProtocols.Http2;
});
options.ListenAnyIP(_port);
});
webBuilder.UseStartup<Startup>();
});
}
public class ProxyService : StreamingApi.Protos.StreamingApi.StreamingApiBase
{
private long _handledRequests = 0;
private long _timeDiff = 0;
public override Task<UpdateResponse> Update(UpdateRequest request, ServerCallContext context)
{
Interlocked.Add(ref _timeDiff, (DateTime.Now - TimeSpan.FromTicks(Convert.ToInt64(request.Items["time"]))).Millisecond);
Interlocked.Increment(ref _handledRequests);
return Task.FromResult(new UpdateResponse());
}
public override async Task<UpdateResponse> UpdateStream(IAsyncStreamReader<UpdateRequest> requestStream, ServerCallContext serverCallContext)
{
try
{
while (await requestStream.MoveNext(serverCallContext.CancellationToken))
{
var updateReq = requestStream.Current;
Interlocked.Add(ref _timeDiff, (DateTime.Now - TimeSpan.FromTicks(Convert.ToInt64(updateReq.Items["time"]))).Millisecond);
Interlocked.Increment(ref _handledRequests);
}
}
catch(OperationCanceledException ex)
{
// log
}
return new UpdateResponse();
}
}
class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddGrpc();
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapGrpcService<ProxyService>();
});
}
}
Client:
partial class Program
{
static void Main(string[] args)
{
Console.WriteLine();
Arguments arguments = new Arguments(args);
if (arguments.Initialized)
{
IProxyClient grpcClient = GetGrpcClient(arguments.Host, arguments.Port, ChannelCredentials.Insecure);
string limitationMsg = arguments.UseLimiter ?
$"limitation of max {arguments.MaxRequestsPerTimeUnit} requests per {arguments.TimeUnitSecs} seconds":
"no limitation";
Console.WriteLine($"\nExecuting {arguments.TotalRequests} requests with StreamMode={arguments.IsStreamMode} using {arguments.Threads} threads with {limitationMsg} ...");
var result = Run(grpcClient, arguments.Threads, arguments.TotalRequests, arguments.MaxRequestsPerTimeUnit, TimeSpan.FromSeconds(arguments.TimeUnitSecs), arguments.UseLimiter, arguments.IsStreamMode).Result;
Console.WriteLine($"Time Taken = {result.Item1}, Total Request Calls = {result.Item2}, Total Errors: {result.Item3}\n");
grpcClient.Disconnect().Wait();
Thread.Sleep(1000);
}
}
private static IProxyClient GetGrpcClient(string host, int port, ChannelCredentials channelCredentials)
{
var channel = new Channel(host, port, channelCredentials);
StreamingApi.Protos.StreamingApi.StreamingApiClient channelClient = new StreamingApi.Protos.StreamingApi.StreamingApiClient(channel);
return new ProxyClient(channel, channelClient);
}
private static async Task<(TimeSpan, int, int)> Run(IProxyClient grpcClient,
int threads,
int requests,
int maxRequestsPerTimeUnit,
TimeSpan timeUnit,
bool useLimiter,
bool isStreamMode)
{
int totalRequestCalls = 0;
int totalErrors = 0;
List<Task> tasks = new List<Task>();
int requestsPerThread = requests / threads;
TimeLimiter timeLimiter = useLimiter ? TimeLimiter.GetFromMaxCountByInterval(maxRequestsPerTimeUnit, timeUnit) : null;
UpdateRequest request = GetMeasuredRequest();
Stopwatch sw = new Stopwatch();
sw.Start();
for (int i = 0; i < threads; i++)
{
tasks.Add(Task.Run(async () =>
{
for (int requestIndex = 0; requestIndex < requestsPerThread; requestIndex++)
{
request.Items["time"] = DateTime.Now.Ticks.ToString();
if (useLimiter)
{
await timeLimiter;
}
try
{
if (isStreamMode)
{
await grpcClient.SendUpdateStream(request);
}
else
{
_ = await grpcClient.SendUpdate(request);
}
Interlocked.Increment(ref totalRequestCalls);
}
catch (Exception ex)
{
Interlocked.Increment(ref totalErrors);
Console.WriteLine(ex.Message);
Thread.Sleep(500);
}
}
}));
}
await Task.WhenAll(tasks);
sw.Stop();
return (sw.Elapsed, totalRequestCalls, totalErrors);
}
private static UpdateRequest GetMeasuredRequest()
{
UpdateRequest request = new UpdateRequest { ItemName = "pattern", SubcriptionId = "subscriptionId", IsSnapshot = false};
request.Items["key1"] = "value1";
request.Items["key2"] = "value2";
request.Items["key3"] = "value3";
request.Items["key4"] = "value4";
request.Items["key5"] = "value5";
return request;
}
}
public class ProxyClient : IProxyClient
{
private Channel _channel;
private StreamingApi.Protos.StreamingApi.StreamingApiClient _client;
private AsyncClientStreamingCall<UpdateRequest, UpdateResponse> _updateRequestStreamWriter;
public ProxyClient(Channel channel, StreamingApi.Protos.StreamingApi.StreamingApiClient client)
{
_client = client;
_updateRequestStreamWriter = client.UpdateStream();
}
public async Task Disconnect()
{
await _channel.ShutdownAsync();
}
public async Task<UpdateResponse> SendUpdate(UpdateRequest request)
{
return await _client.UpdateAsync(request);
}
public async Task SendUpdateStream(UpdateRequest request)
{
await _updateRequestStreamWriter.RequestStream.WriteAsync(request);
}
}