如何在 Rust 中彻底打破 tokio-core 事件循环和 futures::Stream

2024-02-24

我正在涉足 tokio-core,并且可以弄清楚如何生成事件循环。然而,有两件事我不确定 - 如何优雅地退出事件循环以及如何退出事件循环内运行的流。例如,考虑这段简单的代码,它在事件循环中生成两个侦听器,并等待另一个线程指示退出条件:

extern crate tokio_core;
extern crate futures;

use tokio_core::reactor::Core;
use futures::sync::mpsc::unbounded;
use tokio_core::net::TcpListener;
use std::net::SocketAddr;
use std::str::FromStr;
use futures::{Stream, Future};
use std::thread;
use std::time::Duration;
use std::sync::mpsc::channel;

fn main() {
    let (get_tx, get_rx) = channel();

    let j = thread::spawn(move || {
        let mut core = Core::new().unwrap();
        let (tx, rx) = unbounded();
        get_tx.send(tx).unwrap(); // <<<<<<<<<<<<<<< (1)

        // Listener-0
        {
            let l = TcpListener::bind(&SocketAddr::from_str("127.0.0.1:44444").unwrap(),
                                      &core.handle())
                .unwrap();

            let fe = l.incoming()
                .for_each(|(_sock, peer)| {
                    println!("Accepted from {}", peer);
                    Ok(())
                })
                .map_err(|e| println!("----- {:?}", e));

            core.handle().spawn(fe);
        }

        // Listener1
        {
            let l = TcpListener::bind(&SocketAddr::from_str("127.0.0.1:55555").unwrap(),
                                      &core.handle())
                .unwrap();

            let fe = l.incoming()
                .for_each(|(_sock, peer)| {
                    println!("Accepted from {}", peer);
                    Ok(())
                })
                .map_err(|e| println!("----- {:?}", e));

            core.handle().spawn(fe);
        }

        let work = rx.for_each(|v| {
            if v {
                // (3) I want to shut down listener-0 above the release the resources
                Ok(())
            } else {
                Err(()) // <<<<<<<<<<<<<<< (2)

            }
        });

        let _ = core.run(work);
        println!("Exiting event loop thread");
    });

    let tx = get_rx.recv().unwrap();

    thread::sleep(Duration::from_secs(2));
    println!("Want to terminate listener-0"); // <<<<<< (3)
    tx.send(true).unwrap();

    thread::sleep(Duration::from_secs(2));
    println!("Want to exit event loop");
    tx.send(false).unwrap();

    j.join().unwrap();
}

所以说在主线程睡眠之后我想要事件循环线程的干净退出。目前,我向事件循环发送一些内容以使其退出,从而释放线程。

然而两者,(1) and (2)感觉很hacky - 我强制一个错误作为退出条件。我的问题是:

1)我做得对吗?如果不是,那么优雅退出事件循环线程的正确方法是什么。

2)我什至不知道该怎么做(3)- 即在外部指示关闭侦听器 0 并释放其所有资源的条件。我该如何实现这一目标?


事件循环(core)不再被转动(例如通过run())或被遗忘(drop()ed)。没有同步退出。core.run()返回并停止转动循环时Future传递给它就完成了。

A Stream通过屈服完成None(下面代码中用(3)标记)。 当例如TCP 连接被关闭Stream代表它完成,反之亦然。

extern crate tokio_core;
extern crate futures;

use tokio_core::reactor::Core;
use futures::sync::mpsc::unbounded;
use tokio_core::net::TcpListener;
use std::net::SocketAddr;
use std::str::FromStr;
use futures::{Async, Stream, Future, Poll};
use std::thread;
use std::time::Duration;

struct CompletionPact<S, C>
    where S: Stream,
          C: Stream, 
{
    stream: S,
    completer: C,
}

fn stream_completion_pact<S, C>(s: S, c: C) -> CompletionPact<S, C>
    where S: Stream,
          C: Stream,
{
    CompletionPact {
        stream: s,
        completer: c,
    }
}

impl<S, C> Stream for CompletionPact<S, C>
    where S: Stream,
          C: Stream,
{
    type Item = S::Item;
    type Error = S::Error;

    fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
        match self.completer.poll() {
            Ok(Async::Ready(None)) |
            Err(_) |
            Ok(Async::Ready(Some(_))) => {
                // We are done, forget us
                Ok(Async::Ready(None)) // <<<<<< (3)
            },
            Ok(Async::NotReady) => {
                self.stream.poll()
            },
        }
    }
}

fn main() {
    // unbounded() is the equivalent of a Stream made from a channel()
    // directly create it in this thread instead of receiving a Sender
    let (tx, rx) = unbounded::<()>();
    // A second one to cause forgetting the listener
    let (l0tx, l0rx) = unbounded::<()>();

    let j = thread::spawn(move || {
        let mut core = Core::new().unwrap();

        // Listener-0
        {
            let l = TcpListener::bind(
                    &SocketAddr::from_str("127.0.0.1:44444").unwrap(),
                    &core.handle())
                .unwrap();

            // wrap the Stream of incoming connections (which usually doesn't
            // complete) into a Stream that completes when the
            // other side is drop()ed or sent on
            let fe = stream_completion_pact(l.incoming(), l0rx)
                .for_each(|(_sock, peer)| {
                    println!("Accepted from {}", peer);
                    Ok(())
                })
                .map_err(|e| println!("----- {:?}", e));

            core.handle().spawn(fe);
        }

        // Listener1
        {
            let l = TcpListener::bind(
                    &SocketAddr::from_str("127.0.0.1:55555").unwrap(),
                    &core.handle())
                .unwrap();

            let fe = l.incoming()
                .for_each(|(_sock, peer)| {
                    println!("Accepted from {}", peer);
                    Ok(())
                })
                .map_err(|e| println!("----- {:?}", e));

            core.handle().spawn(fe);
        }

        let _ = core.run(rx.into_future());
        println!("Exiting event loop thread");
    });

    thread::sleep(Duration::from_secs(2));
    println!("Want to terminate listener-0");
    // A drop() will result in the rx side Stream being completed,
    // which is indicated by Ok(Async::Ready(None)).
    // Our wrapper behaves the same when something is received.
    // When the event loop encounters a
    // Stream that is complete it forgets about it. Which propagates to a
    // drop() that close()es the file descriptor, which closes the port if
    // nothing else uses it.
    l0tx.send(()).unwrap(); // alternatively: drop(l0tx);
    // Note that this is async and is only the signal
    // that starts the forgetting.

    thread::sleep(Duration::from_secs(2));
    println!("Want to exit event loop");
    // Same concept. The reception or drop() will cause Stream completion.
    // A completed Future will cause run() to return.
    tx.send(()).unwrap();

    j.join().unwrap();
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 Rust 中彻底打破 tokio-core 事件循环和 futures::Stream 的相关文章

随机推荐