我无法理解如何编写封装在一个结构中的并发异步代码。
我不确定如何准确解释这个问题,所以我会尝试用一个例子来解释。
假设我有一个UdpServer
结构。该结构有多个与其行为相关的方法(例如,handle_datagram
, deserialize_datagram
, etc)
如果我想让代码并发,我将生成 tokio 任务,这需要提供给它的闭包是静态的,这意味着我无法调用&self
从这个任务中只要&self
不是静态的,这意味着我无法调用self.serialize_datagram()
.
我理解这个问题(不能保证结构会比线程寿命更长),但看不到解决它的正确方法。我知道可以将函数移出 impl,但这对我来说看起来不是一个好的解决方案。
另外,即使我们暂时假设我could take &self
作为静态的,由于某种原因,这段代码在我看来仍然不正确(我猜还不够生锈)。
另一个“解决方案”是采取self: Arc<Self>
代替&self
,但这感觉更糟。
所以我假设有一些我不知道的模式。
有人可以向我解释一下我应该如何重构整个事情吗?
示例代码:
struct UdpServer {}
impl UdpServer {
pub async fn run(&self) {
let socket = UdpSocket::bind(self.addr).await.unwrap();
loop {
let mut buf: &mut [u8] = &mut [];
let (_, _) = socket.recv_from(&mut buf).await.unwrap();
// I spawn tokio task to enable concurrency
tokio::spawn(async move {
// But i can't use &self in here because it's not static.
let datagram = self.deserialize_datagram(buf).await;
self.handle_datagram(()).await;
});
}
}
pub async fn deserialize_datagram(&self, buf: &mut [u8]) -> Datagram {
unimplemented!()
}
pub async fn handle_datagram(&self, datagram: Datagram) {
unimplemented!()
}
}
目前唯一的方法是使self
通过使用持续任意长的时间Arc
. Since run()
是一个方法UdpServer
,它需要更改为Arc<Self>
,你考虑过但拒绝了,因为感觉更糟。尽管如此,这就是做到这一点的方法:
pub async fn run(self: Arc<Self>) {
let socket = UdpSocket::bind(&self.addr).await.unwrap();
loop {
let mut buf: &mut [u8] = &mut [];
let (_, _) = socket.recv_from(&mut buf).await.unwrap();
tokio::spawn({
let me = Arc::clone(&self);
async move {
let datagram = me.deserialize_datagram(buf).await;
me.handle_datagram(datagram).await;
}
});
}
}
操场
有趣的是,smol 异步运行时实际上可能会提供您正在寻找的东西,因为它的执行者具有一生。该生命周期与调用者环境中的值相关联,并且执行器上生成的 future 可能会引用它。例如,这样编译:
use futures_lite::future;
use smol::{Executor, net::UdpSocket};
struct Datagram;
struct UdpServer {
addr: String,
}
impl UdpServer {
pub async fn run<'a>(&'a self, ex: &Executor<'a>) {
let socket = UdpSocket::bind(&self.addr).await.unwrap();
loop {
let mut buf: &mut [u8] = &mut [];
let (_, _) = socket.recv_from(&mut buf).await.unwrap();
ex.spawn({
async move {
let datagram = self.deserialize_datagram(buf).await;
self.handle_datagram(datagram).await;
}
}).detach();
}
}
pub async fn deserialize_datagram(&self, _buf: &mut [u8]) -> Datagram {
unimplemented!()
}
pub async fn handle_datagram(&self, _datagram: Datagram) {
unimplemented!()
}
}
fn main() {
let server = UdpServer { addr: "127.0.0.1:8080".to_string() };
let ex = Executor::new();
future::block_on(server.run(&ex));
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)