使用异步 Socket.BeginReceive 时如何检测超时?

2024-01-13

在 F# 中使用原始套接字编写异步 Ping,以使用尽可能少的线程启用并行请求。不使用“System.Net.NetworkInformation.Ping”,因为它似乎为每个请求分配一个线程。我也对使用 F# 异步工作流程感兴趣。

当目标主机不存在/响应时,下面的同步版本正确地超时,但异步版本挂起。当主机响应时,两者都会起作用。不确定这是 .NET 问题还是 F# 问题...

有任何想法吗?

(注意:该进程必须以管理员身份运行才能允许原始套接字访问)

这会引发超时:

let result = Ping.Ping ( IPAddress.Parse( "192.168.33.22" ), 1000 )

但是,这挂起:

let result = Ping.AsyncPing ( IPAddress.Parse( "192.168.33.22" ), 1000 )
             |> Async.RunSynchronously

这是代码...

module Ping

open System
open System.Net
open System.Net.Sockets
open System.Threading

//---- ICMP Packet Classes

type IcmpMessage (t : byte) =
    let mutable m_type = t
    let mutable m_code = 0uy
    let mutable m_checksum = 0us

    member this.Type
        with get() = m_type

    member this.Code
        with get() = m_code

    member this.Checksum = m_checksum

    abstract Bytes : byte array

    default this.Bytes
        with get() =
            [|
                m_type
                m_code
                byte(m_checksum)
                byte(m_checksum >>> 8)
            |]

    member this.GetChecksum() =
        let mutable sum = 0ul
        let bytes = this.Bytes
        let mutable i = 0

        // Sum up uint16s
        while i < bytes.Length - 1 do
            sum <- sum + uint32(BitConverter.ToUInt16( bytes, i ))
            i <- i + 2

        // Add in last byte, if an odd size buffer
        if i <> bytes.Length then
            sum <- sum + uint32(bytes.[i])

        // Shuffle the bits
        sum <- (sum >>> 16) + (sum &&& 0xFFFFul)
        sum <- sum + (sum >>> 16)
        sum <- ~~~sum
        uint16(sum)

    member this.UpdateChecksum() =
        m_checksum <- this.GetChecksum()


type InformationMessage (t : byte) =
    inherit IcmpMessage(t)

    let mutable m_identifier = 0us
    let mutable m_sequenceNumber = 0us

    member this.Identifier = m_identifier
    member this.SequenceNumber = m_sequenceNumber

    override this.Bytes
        with get() =
            Array.append (base.Bytes)
                         [|
                            byte(m_identifier)
                            byte(m_identifier >>> 8)
                            byte(m_sequenceNumber)
                            byte(m_sequenceNumber >>> 8)
                         |]

type EchoMessage() =
    inherit InformationMessage( 8uy )
    let mutable m_data = Array.create 32 32uy
    do base.UpdateChecksum()

    member this.Data
        with get()  = m_data
        and  set(d) = m_data <- d
                      this.UpdateChecksum()

    override this.Bytes
        with get() =
            Array.append (base.Bytes)
                         (this.Data)

//---- Synchronous Ping

let Ping (host : IPAddress, timeout : int ) =
    let mutable ep = new IPEndPoint( host, 0 )
    let socket = new Socket( AddressFamily.InterNetwork, SocketType.Raw, ProtocolType.Icmp )
    socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.SendTimeout, timeout )
    socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, timeout )
    let packet = EchoMessage()
    let mutable buffer = packet.Bytes

    try
        if socket.SendTo( buffer, ep ) <= 0 then
            raise (SocketException())
        buffer <- Array.create (buffer.Length + 20) 0uy

        let mutable epr = ep :> EndPoint
        if socket.ReceiveFrom( buffer, &epr ) <= 0 then
            raise (SocketException())
    finally
        socket.Close()

    buffer

//---- Entensions to the F# Async class to allow up to 5 paramters (not just 3)

type Async with
    static member FromBeginEnd(arg1,arg2,arg3,arg4,beginAction,endAction,?cancelAction): Async<'T> =
        Async.FromBeginEnd((fun (iar,state) -> beginAction(arg1,arg2,arg3,arg4,iar,state)), endAction, ?cancelAction=cancelAction)
    static member FromBeginEnd(arg1,arg2,arg3,arg4,arg5,beginAction,endAction,?cancelAction): Async<'T> =
        Async.FromBeginEnd((fun (iar,state) -> beginAction(arg1,arg2,arg3,arg4,arg5,iar,state)), endAction, ?cancelAction=cancelAction)

//---- Extensions to the Socket class to provide async SendTo and ReceiveFrom

type System.Net.Sockets.Socket with

    member this.AsyncSendTo( buffer, offset, size, socketFlags, remoteEP ) =
        Async.FromBeginEnd( buffer, offset, size, socketFlags, remoteEP,
                            this.BeginSendTo,
                            this.EndSendTo )
    member this.AsyncReceiveFrom( buffer, offset, size, socketFlags, remoteEP ) =
        Async.FromBeginEnd( buffer, offset, size, socketFlags, remoteEP,
                            this.BeginReceiveFrom,
                            (fun asyncResult -> this.EndReceiveFrom(asyncResult, remoteEP) ) )

//---- Asynchronous Ping

let AsyncPing (host : IPAddress, timeout : int ) =  
    async {
        let ep = IPEndPoint( host, 0 )
        use socket = new Socket( AddressFamily.InterNetwork, SocketType.Raw, ProtocolType.Icmp )
        socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.SendTimeout, timeout )
        socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, timeout )

        let packet = EchoMessage()
        let outbuffer = packet.Bytes

        try
            let! result = socket.AsyncSendTo( outbuffer, 0, outbuffer.Length, SocketFlags.None, ep )
            if result <= 0 then
                raise (SocketException())

            let epr = ref (ep :> EndPoint)
            let inbuffer = Array.create (outbuffer.Length + 256) 0uy 
            let! result = socket.AsyncReceiveFrom( inbuffer, 0, inbuffer.Length, SocketFlags.None, epr )
            if result <= 0 then
                raise (SocketException())
            return inbuffer
        finally
            socket.Close()
    }

詹姆斯,你自己接受的答案有一个我想指出的问题。您只分配一个计时器,这使得 AsyncReceiveEx 返回的异步对象成为有状态的一次性对象。这是我删减的一个类似的例子:

let b,e,c = Async.AsBeginEnd(Async.Sleep)

type Example() =
    member this.Close() = ()
    member this.AsyncReceiveEx( sleepTime, (timeoutMS:int) ) = 
        let timedOut = ref false 
        let completed = ref false 
        let timer = new System.Timers.Timer(double(timeoutMS), AutoReset=false)
        timer.Elapsed.Add( fun _ -> 
            lock timedOut (fun () -> 
                timedOut := true 
                if not !completed 
                then this.Close() 
                ) 
            ) 
        let complete() = 
            lock timedOut (fun () -> 
                timer.Stop() 
                timer.Dispose() 
                completed := true 
                ) 
        Async.FromBeginEnd( sleepTime,
                            (fun st -> 
                                let result = b(st)
                                timer.Start() 
                                result 
                            ), 
                            (fun result -> 
                                complete() 
                                if !timedOut 
                                then printfn "err"; () 
                                else e(result)
                            ), 
                            (fun () -> 
                                complete() 
                                this.Close() 
                                ) 
                            ) 

let ex = new Example()
let a = ex.AsyncReceiveEx(3000, 1000)
Async.RunSynchronously a
printfn "ok..."
// below throws ODE, because only allocated one Timer
Async.RunSynchronously a

理想情况下,您希望 AsyncReceiveEx 返回的异步的每次“运行”都表现相同,这意味着每次运行都需要自己的计时器和一组引用标志。这很容易解决,如下所示:

let b,e,c = Async.AsBeginEnd(Async.Sleep)

type Example() =
    member this.Close() = ()
    member this.AsyncReceiveEx( sleepTime, (timeoutMS:int) ) = 
        async {
        let timedOut = ref false 
        let completed = ref false 
        let timer = new System.Timers.Timer(double(timeoutMS), AutoReset=false)
        timer.Elapsed.Add( fun _ -> 
            lock timedOut (fun () -> 
                timedOut := true 
                if not !completed 
                then this.Close() 
                ) 
            ) 
        let complete() = 
            lock timedOut (fun () -> 
                timer.Stop() 
                timer.Dispose() 
                completed := true 
                ) 
        return! Async.FromBeginEnd( sleepTime,
                            (fun st -> 
                                let result = b(st)
                                timer.Start() 
                                result 
                            ), 
                            (fun result -> 
                                complete() 
                                if !timedOut 
                                then printfn "err"; () 
                                else e(result)
                            ), 
                            (fun () -> 
                                complete() 
                                this.Close() 
                                ) 
                            ) 
        }
let ex = new Example()
let a = ex.AsyncReceiveEx(3000, 1000)
Async.RunSynchronously a
printfn "ok..."
Async.RunSynchronously a

唯一的变化是将 AsyncReceiveEx 的主体放在里面async{...}并有最后一行return!.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用异步 Socket.BeginReceive 时如何检测超时? 的相关文章

随机推荐