事件循环(core
)不再被转动(例如通过run()
)或被遗忘(drop()ed)。没有同步退出。core.run()
返回并停止转动循环时Future
传递给它就完成了。
A Stream
通过屈服完成None
(下面代码中用(3)标记)。
当例如TCP 连接被关闭Stream
代表它完成,反之亦然。
extern crate tokio_core;
extern crate futures;
use tokio_core::reactor::Core;
use futures::sync::mpsc::unbounded;
use tokio_core::net::TcpListener;
use std::net::SocketAddr;
use std::str::FromStr;
use futures::{Async, Stream, Future, Poll};
use std::thread;
use std::time::Duration;
struct CompletionPact<S, C>
where S: Stream,
C: Stream,
{
stream: S,
completer: C,
}
fn stream_completion_pact<S, C>(s: S, c: C) -> CompletionPact<S, C>
where S: Stream,
C: Stream,
{
CompletionPact {
stream: s,
completer: c,
}
}
impl<S, C> Stream for CompletionPact<S, C>
where S: Stream,
C: Stream,
{
type Item = S::Item;
type Error = S::Error;
fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
match self.completer.poll() {
Ok(Async::Ready(None)) |
Err(_) |
Ok(Async::Ready(Some(_))) => {
// We are done, forget us
Ok(Async::Ready(None)) // <<<<<< (3)
},
Ok(Async::NotReady) => {
self.stream.poll()
},
}
}
}
fn main() {
// unbounded() is the equivalent of a Stream made from a channel()
// directly create it in this thread instead of receiving a Sender
let (tx, rx) = unbounded::<()>();
// A second one to cause forgetting the listener
let (l0tx, l0rx) = unbounded::<()>();
let j = thread::spawn(move || {
let mut core = Core::new().unwrap();
// Listener-0
{
let l = TcpListener::bind(
&SocketAddr::from_str("127.0.0.1:44444").unwrap(),
&core.handle())
.unwrap();
// wrap the Stream of incoming connections (which usually doesn't
// complete) into a Stream that completes when the
// other side is drop()ed or sent on
let fe = stream_completion_pact(l.incoming(), l0rx)
.for_each(|(_sock, peer)| {
println!("Accepted from {}", peer);
Ok(())
})
.map_err(|e| println!("----- {:?}", e));
core.handle().spawn(fe);
}
// Listener1
{
let l = TcpListener::bind(
&SocketAddr::from_str("127.0.0.1:55555").unwrap(),
&core.handle())
.unwrap();
let fe = l.incoming()
.for_each(|(_sock, peer)| {
println!("Accepted from {}", peer);
Ok(())
})
.map_err(|e| println!("----- {:?}", e));
core.handle().spawn(fe);
}
let _ = core.run(rx.into_future());
println!("Exiting event loop thread");
});
thread::sleep(Duration::from_secs(2));
println!("Want to terminate listener-0");
// A drop() will result in the rx side Stream being completed,
// which is indicated by Ok(Async::Ready(None)).
// Our wrapper behaves the same when something is received.
// When the event loop encounters a
// Stream that is complete it forgets about it. Which propagates to a
// drop() that close()es the file descriptor, which closes the port if
// nothing else uses it.
l0tx.send(()).unwrap(); // alternatively: drop(l0tx);
// Note that this is async and is only the signal
// that starts the forgetting.
thread::sleep(Duration::from_secs(2));
println!("Want to exit event loop");
// Same concept. The reception or drop() will cause Stream completion.
// A completed Future will cause run() to return.
tx.send(()).unwrap();
j.join().unwrap();
}