我的代码处理与远程主机的 TCP 连接,ConcurrentQueue
存储传出消息。它旨在在单线程中运行。连接的生命周期包含在RunAsync
而一个单独的对象包含连接的“公共状态”:
class PublicState
{
internal readonly ConcurrentQueue<Message> OutgoingMessageQueue = new ConcurrentQueue<Message>();
internal TaskCompletionSource<Object> OutgoingMessageTcs = null;
internal readonly TaskCompletionSource<Object> ConnectedTcs = new TaskCompletionSource<Object>();
public void EnqueueMessages(IEnumerable<Message> messages)
{
foreach( Message m in messages ) this.OutgoingMessageQueue.Enqueue( m);
if( this.OutgoingMessageTcs == null ) this.OutgoingMessageTcs = new TaskCompletionSource<Object>();
this.OutgoingMessageTcs.SetResult( null );
}
}
static async Task RunAsync(IPEndPoint endPoint, PublicState state)
{
using( TcpClient tcp = new TcpClient() )
{
await tcp.ConnectAsync( endPoint.Address, endPoint.Port ).ConfigureAwait(false);
Byte[] reusableBuffer = new Byte[ 4096 ];
using( NetworkStream ns = tcp.GetStream() )
{
state.ConnectedTcs.SetResult( null );
Task<Int32> nsReadTask = null;
while( tcp.Connected )
{
if( !state.writeQueue.IsEmpty )
{
await WriteMessagesAsync( ... ).ConfigureAwait( false );
}
if( ns.DataAvailable )
{
await ReadMessagesAsync( ... ).ConfigureAwait( false );
}
// Wait for new data to arrive from remote host or for new messages to send:
if( state.OutgoingMessageTcs == null ) state.OutgoingMessageTcs = new TaskCompletionSource<Object>();
if( nsReadTask == null ) nsReadTask = ns.ReadAsync( reusableBuffer, 0, 0 ).ConfigureAwait( false );
Task c = await Task.WhenAny( state.OutgoingMessageTcs, nsReadTask ).ConfigureAwait( false );
if( c == state.OutgoingMessageTcs.Task ) state.OutgoingMessageTcs = null;
else if( c == nsReadTask ) nsReadTask = null;
}
}
}
}
像这样使用:
public async Task Main(String[] args)
{
PublicState state = new PublicState();
Task clientTask = Client.RunAsync( new IPEndPoint(args[0]), state );
await state.ConnectedTcs.Task; // awaits until TCP connection is established
state.EnqueueMessage( new Message("foo") );
state.EnqueueMessage( new Message("bar") );
state.EnqueueMessage( new Message("baz") );
await clientTask; // awaits until the TCP connection is closed
}
这段代码可以工作,但我不喜欢它:感觉就像我正在使用TaskCompletionSource
这意味着代表一个实际的任务或某种后台操作,而我实际上正在使用TaskCompletionSource
作为一种廉价的EventWaitHandle
。我没有使用EventWaitHandle
因为它是IDisposable
(我不想冒泄漏本地资源的风险)并且它缺乏WaitAsync
or WaitOneAsync
方法。我可以用SemaphoreSlim
(这是可等待的,但包含一个EventWaitHandle
)但我的代码并不能真正代表信号量的良好使用。
我使用的是TaskCompletionSource<T>
可以接受,或者是否有更好的方法来“取消等待”执行RunAsync
当一个项目被添加到OutgoingMessageQueue
?
我觉得这是“错误”的另一个原因是TaskCompletionSource<T>
只能使用一次,之后需要更换。我热衷于避免无关的分配。