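This post walks through how I structure an actix-web project.
The project is split into the following parts:
- db: connects to the database
- app data: application-wide state that handlers can access, such as the db connection pool
- model: Rust structs that map to database tables
- http handler: functions that handle HTTP requests
- actix config: route configuration
- log: logging configuration
- sentry (optional)
- prometheus (optional)
First, the directory layout.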
$ tree src
src
├── dao
│   ├── mod.rs
│   └── user.rs
├── db.rs
├── main.rs
├── models
│   ├── mod.rs
│   └── user.rs
├── schema.rs
└── services
    ├── config.rs
    ├── handler
    │   ├── mod.rs
    │   └── user.rs
    └── mod.rs
1. db connection pool
db.rs
This file uses r2d2 to initialize the database connection pool and defines the pool type: type Pool = r2d2::Pool<ConnectionManager<PgConnection>>.
use diesel::prelude::*;
use diesel::r2d2::{self, ConnectionManager};

pub type Pool = r2d2::Pool<ConnectionManager<PgConnection>>;

pub fn create_db_pool(db_url: &str) -> Pool {
    // set up database connection pool
    let manager = ConnectionManager::<PgConnection>::new(db_url);
    r2d2::Pool::builder()
        .build(manager)
        .expect("Failed to create pool.")
}
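To make it clearer what the pool gives a handler, here is a minimal sketch of the idea using only the standard library. It is not how r2d2 is implemented; ToyPool, FakeConn and Pooled are hypothetical names made up for illustration. The point it shows is that get() checks a connection out and that dropping the guard returns it to the pool.

```rust
use std::ops::Deref;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a real database connection.
#[derive(Debug)]
pub struct FakeConn {
    pub id: usize,
}

/// A toy pool: a shared list of idle connections.
#[derive(Clone)]
pub struct ToyPool {
    idle: Arc<Mutex<Vec<FakeConn>>>,
}

/// Guard that hands the connection back to the pool when dropped.
pub struct Pooled {
    conn: Option<FakeConn>,
    idle: Arc<Mutex<Vec<FakeConn>>>,
}

impl ToyPool {
    pub fn new(size: usize) -> Self {
        let conns: Vec<FakeConn> = (0..size).map(|id| FakeConn { id }).collect();
        ToyPool { idle: Arc::new(Mutex::new(conns)) }
    }

    /// Returns None when every connection is checked out.
    /// (Assumption for the sketch: a real pool would wait for a free one instead.)
    pub fn get(&self) -> Option<Pooled> {
        let conn = self.idle.lock().unwrap().pop()?;
        Some(Pooled { conn: Some(conn), idle: Arc::clone(&self.idle) })
    }

    pub fn idle_count(&self) -> usize {
        self.idle.lock().unwrap().len()
    }
}

impl Deref for Pooled {
    type Target = FakeConn;

    fn deref(&self) -> &FakeConn {
        self.conn.as_ref().expect("connection is present until drop")
    }
}

impl Drop for Pooled {
    fn drop(&mut self) {
        // Return the connection so another caller can reuse it.
        if let Some(conn) = self.conn.take() {
            self.idle.lock().unwrap().push(conn);
        }
    }
}
```

Cloning ToyPool only clones the Arc, so every clone sees the same set of connections, which is the property the handlers rely on when each one receives its own copy of the pool.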
2. model
This module defines the Rust structs; here only User is defined.
User is used when querying (Queryable), and NewUser is used when inserting (Insertable).
use serde_derive::{Deserialize, Serialize};

use crate::schema::users;

#[derive(Queryable, PartialEq, Serialize, Deserialize, Debug)]
pub struct User {
    pub id: i64,
    pub first_name: String,
    pub last_name: String,
    pub gender: String,
    pub birthday: Option<chrono::NaiveDate>,
}

#[derive(Clone, Insertable, Serialize, Deserialize, Debug)]
#[table_name = "users"]
pub struct NewUser {
    pub first_name: String,
    pub last_name: String,
    pub gender: String,
    pub birthday: Option<chrono::NaiveDate>,
}
Other business types can be added under the models directory, for example order.rs for orders.
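The reason for having two structs is that the row read back from the table carries an id generated by the database, while the insert payload does not. A small sketch of that split, with diesel replaced by a hypothetical in-memory UsersTable (an assumption for illustration only) and the fields trimmed to two:

```rust
/// Row as read back from the table: includes the generated id.
#[derive(Debug, Clone, PartialEq)]
pub struct User {
    pub id: i64,
    pub first_name: String,
    pub last_name: String,
}

/// Payload for an insert: no id, because the table assigns it.
#[derive(Debug, Clone)]
pub struct NewUser {
    pub first_name: String,
    pub last_name: String,
}

/// Hypothetical in-memory stand-in for the users table.
#[derive(Default)]
pub struct UsersTable {
    rows: Vec<User>,
    next_id: i64,
}

impl UsersTable {
    /// Assigns the next id and stores the row, returning what a query would see.
    pub fn insert(&mut self, new_user: NewUser) -> User {
        self.next_id += 1;
        let user = User {
            id: self.next_id,
            first_name: new_user.first_name,
            last_name: new_user.last_name,
        };
        self.rows.push(user.clone());
        user
    }

    pub fn find(&self, id: i64) -> Option<&User> {
        self.rows.iter().find(|u| u.id == id)
    }
}
```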
3. services handler
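The handler module inside services holds the HTTP request handlers. Each actix-web worker thread runs a single-threaded async executor, and diesel does not support asynchronous operations. To avoid blocking a worker thread, database calls are wrapped in web::block, which runs them on a separate thread pool. The Future it returns resolves to the result of the database operation once that operation has finished and the Future is polled.
For example, get_user: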
use actix_web::{web, HttpResponse, Error};

use crate::db::Pool;
use crate::models::user::NewUser;
use crate::dao::user as dao;

/// Finds user by id.
pub async fn get_user(
    pool: web::Data<Pool>,
    user_id: web::Path<i64>,
) -> Result<HttpResponse, Error> {
    let conn = pool.get().expect("couldn't get db connection from pool");
    let user_id = user_id.into_inner();

    // use web::block to offload blocking Diesel code without blocking server thread
    let user = web::block(move || dao::find_user_by_user_id(user_id, &conn))
        .await
        .map_err(|e| {
            eprintln!("{}", e);
            HttpResponse::InternalServerError().finish()
        })?;

    if let Some(user) = user {
        Ok(HttpResponse::Ok().json(user))
    } else {
        let res = HttpResponse::NotFound()
            .body(format!("No user found with id: {}", user_id));
        Ok(res)
    }
}
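The handler above has three outcomes: the user is found (200), the query succeeds but finds nothing (404), or the query fails (500). The sketch below isolates just that branching so it can be read and tested without a server. Reply and reply_for_lookup are hypothetical names, and a plain String stands in for the User value.

```rust
/// Simplified response: just a status code and a body.
#[derive(Debug, PartialEq)]
pub struct Reply {
    pub status: u16,
    pub body: String,
}

/// Maps the result of a lookup to a reply, mirroring the three branches of get_user.
pub fn reply_for_lookup<E: std::fmt::Display>(
    user_id: i64,
    lookup: Result<Option<String>, E>,
) -> Reply {
    match lookup {
        // The query ran and returned a row.
        Ok(Some(name)) => Reply { status: 200, body: name },
        // The query ran but no row matched.
        Ok(None) => Reply {
            status: 404,
            body: format!("No user found with id: {}", user_id),
        },
        // The query itself failed: log it and hide the details from the client.
        Err(e) => {
            eprintln!("{}", e);
            Reply { status: 500, body: String::new() }
        }
    }
}
```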
web::block runs the synchronous operation on a thread pool by calling actix_threadpool::run(f).
/// Execute blocking function on a thread pool, returns future that resolves
/// to result of the function execution.
pub async fn block<F, I, E>(f: F) -> Result<I, BlockingError<E>>
where
    F: FnOnce() -> Result<I, E> + Send + 'static,
    I: Send + 'static,
    E: Send + std::fmt::Debug + 'static,
{
    actix_threadpool::run(f).await
}
run is a generic function whose parameter has type F: FnOnce() -> Result<I, E> + Send + 'static. When the closure finishes, its result is sent through the sender returned by oneshot::channel.
/// Execute blocking function on a thread pool, returns future that resolves
/// to result of the function execution.
pub fn run<F, I, E>(f: F) -> CpuFuture<I, E>
where
    F: FnOnce() -> Result<I, E> + Send + 'static,
    I: Send + 'static,
    E: Send + fmt::Debug + 'static,
{
    let (tx, rx) = oneshot::channel();
    POOL.with(|pool| {
        pool.execute(move || {
            if !tx.is_canceled() {
                let _ = tx.send(f());
            }
        })
    });

    CpuFuture { rx }
}
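The shape of run (hand the closure to another thread, send its result back over a channel) can be reproduced with the standard library alone. The following is a rough sketch under two simplifying assumptions: it spawns a fresh thread per call instead of using a pool, and it returns a blocking std::sync::mpsc receiver instead of a Future. run_blocking is a hypothetical name.

```rust
use std::sync::mpsc;
use std::thread;

/// Runs `f` on a newly spawned thread and returns the receiving end of a channel
/// that will carry its result.
pub fn run_blocking<F, I, E>(f: F) -> mpsc::Receiver<Result<I, E>>
where
    F: FnOnce() -> Result<I, E> + Send + 'static,
    I: Send + 'static,
    E: Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // A send error only means the caller dropped `rx`, so it is ignored,
        // much like the `let _ = tx.send(f())` in the code above.
        let _ = tx.send(f());
    });
    rx
}
```

Calling rx.recv() on the returned receiver blocks the calling thread until the closure has run, which is exactly what the Future-based version avoids on an actix worker.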
Meanwhile, run returns a CpuFuture. CpuFuture implements the Future trait (impl<I, E: fmt::Debug> Future for CpuFuture<I, E>), and its poll method returns the data received from the channel.
/// Blocking operation completion future. It resolves with results
/// of blocking function execution.
pub struct CpuFuture<I, E> {
    rx: oneshot::Receiver<Result<I, E>>,
}

impl<I, E: fmt::Debug> Future for CpuFuture<I, E> {
    type Output = Result<I, BlockingError<E>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let rx = Pin::new(&mut self.rx);
        let res = match rx.poll(cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(res) => res
                .map_err(|_| BlockingError::Canceled)
                .and_then(|res| res.map_err(BlockingError::Error)),
        };
        Poll::Ready(res)
    }
}
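The map_err / and_then chain in poll flattens two layers of Result: the outer one says whether the channel delivered anything, the inner one is what the closure returned. A self-contained sketch of only that step follows; Canceled and BlockingError are redefined locally here so the snippet compiles on its own, and flatten is a hypothetical helper name.

```rust
/// Local stand-in for the channel's "sender went away" error.
#[derive(Debug, PartialEq)]
pub struct Canceled;

/// Local stand-in with the same two variants used in the code above.
#[derive(Debug, PartialEq)]
pub enum BlockingError<E> {
    Error(E),
    Canceled,
}

/// Collapses "did the channel deliver?" and "did the closure succeed?" into one Result.
pub fn flatten<I, E>(received: Result<Result<I, E>, Canceled>) -> Result<I, BlockingError<E>> {
    received
        // Outer failure: nothing was ever sent.
        .map_err(|_| BlockingError::Canceled)
        // Inner failure: the closure ran and returned its own error.
        .and_then(|res| res.map_err(BlockingError::Error))
}
```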
The data itself is obtained through the Receiver. Receiver also implements the Future trait (impl<T> Future for Receiver<T>); its poll checks whether the channel holds data, delegating that work to the Inner inside Receiver.
impl<T> Future for Receiver<T> {
    type Output = Result<T, Canceled>;

    fn poll(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<T, Canceled>> {
        self.inner.recv(cx)
    }
}
The Receiver struct:
/// A future for a value that will be provided by another asynchronous task.
///
/// This is created by the [`channel`] function.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct Receiver<T> {
    inner: Arc<Inner<T>>,
}
Finally, Inner::recv does the real work.
fn recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> {
    // Check to see if some data has arrived. If it hasn't then we need to
    // block our task.
    //
    // Note that the acquisition of the `rx_task` lock might fail below, but
    // the only situation where this can happen is during `Sender::drop`
    // when we are indeed completed already. If that's happening then we
    // know we're completed so keep going.
    let done = if self.complete.load(SeqCst) {
        true
    } else {
        let task = cx.waker().clone();
        match self.rx_task.try_lock() {
            Some(mut slot) => { *slot = Some(task); false },
            None => true,
        }
    };

    // If we're `done` via one of the paths above, then look at the data and
    // figure out what the answer is. If, however, we stored `rx_task`
    // successfully above we need to check again if we're completed in case
    // a message was sent while `rx_task` was locked and couldn't notify us
    // otherwise.
    //
    // If we're not done, and we're not complete, though, then we've
    // successfully blocked our task and we return `Pending`.
    if done || self.complete.load(SeqCst) {
        // If taking the lock fails, the sender will realise that the we're
        // `done` when it checks the `complete` flag on the way out, and
        // will treat the send as a failure.
        if let Some(mut slot) = self.data.try_lock() {
            if let Some(data) = slot.take() {
                return Poll::Ready(Ok(data));
            }
        }
        Poll::Ready(Err(Canceled))
    } else {
        Poll::Pending
    }
}
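To see the whole poll-and-wake cycle in one place, here is a much simplified one-shot channel written against the standard library only. It is a sketch, not the futures implementation: it guards the state with a single Mutex instead of atomics and try_lock, and it has no cancellation. Slot, ThreadWaker and block_on are hypothetical names for this example.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// State shared by the sending and receiving halves.
struct Slot<T> {
    value: Option<T>,
    waker: Option<Waker>,
}

pub struct Sender<T>(Arc<Mutex<Slot<T>>>);
pub struct Receiver<T>(Arc<Mutex<Slot<T>>>);

pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let slot = Arc::new(Mutex::new(Slot { value: None, waker: None }));
    (Sender(Arc::clone(&slot)), Receiver(slot))
}

impl<T> Sender<T> {
    /// Stores the value, then wakes the task that polled the receiver, if any.
    pub fn send(self, value: T) {
        let waker = {
            let mut slot = self.0.lock().unwrap();
            slot.value = Some(value);
            slot.waker.take()
        };
        if let Some(waker) = waker {
            waker.wake();
        }
    }
}

impl<T> Future for Receiver<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut slot = self.0.lock().unwrap();
        match slot.value.take() {
            Some(value) => Poll::Ready(value),
            None => {
                // Nothing yet: remember whom to wake once the sender delivers.
                slot.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Waker that unparks the thread driving the future.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Tiny executor: polls the future, parking the thread while it is pending.
pub fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}
```

Because the value and the waker sit behind the same lock here, the second "check again" that Inner::recv performs is not needed; the real implementation needs it because it stores the waker and the data under separate locks.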
4. service config
config sets up the routing rules.
We can define the following routes:
- GET /api/v1/user/{user_id} is handled by get_user
- GET /api/v1/users is handled by get_users
- POST /api/v1/user is handled by add_user
config.rs
use actix_web::web::{ServiceConfig, resource as r, get, scope, post};

use crate::services::handler::user::{add_user, get_user, get_users};

pub fn config(cfg: &mut ServiceConfig) {
    cfg
        .service(
            scope("/api/v1")
                .service(r("/user/{user_id}").route(get().to(get_user)))
                .service(r("/user").route(post().to(add_user)))
                .service(r("/users").route(get().to(get_users)))
        )
        .service(
            scope("/api/v2")
                .service(r("/user/{user_id}").route(get().to(get_user)))
        );
}
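As a way to double-check which request ends up in which handler, the table registered in config can be written as a plain function. This is only a sketch of the mapping, not of how actix-web matches routes internally; route is a hypothetical name and handlers are represented by their names as strings.

```rust
/// Returns the handler name and the captured `user_id`, if any.
pub fn route(method: &str, path: &str) -> Option<(&'static str, Option<i64>)> {
    // Both scopes registered in config() share the "/api/" prefix.
    let rest = path.strip_prefix("/api/")?;
    let (version, rest) = rest.split_once('/')?;
    let segments: Vec<&str> = rest.split('/').collect();

    match (version, method, segments.as_slice()) {
        ("v1", "GET", ["users"]) => Some(("get_users", None)),
        ("v1", "POST", ["user"]) => Some(("add_user", None)),
        ("v1", "GET", ["user", id]) | ("v2", "GET", ["user", id]) => {
            // A non-numeric id cannot become an i64; the sketch treats that as no match.
            id.parse::<i64>().ok().map(|id| ("get_user", Some(id)))
        }
        _ => None,
    }
}
```

Note that the v2 scope only exposes the single-user lookup, so GET /api/v2/users has no handler in this configuration.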
5. app data
Global resources such as the database connection pool can be passed down with the data method so that HTTP handlers can use them.
let pool = db::create_db_pool(&db_url);
let bind = "127.0.0.1:8080";
println!("Starting server at: {}", &bind);

// Start HTTP server
HttpServer::new(move || {
    App::new()
        // set up DB pool to be used with web::Data<Pool> extractor
        .data(pool.clone())
        .wrap(middleware::Logger::default())
        .configure(config)
})
.bind(&bind)?
.run()
.await
To use it, taking the connection pool Pool as an example, add pool: web::Data<Pool> to the handler's parameters.
/// Finds user by id.
pub async fn get_user(
    pool: web::Data<Pool>,
    user_id: web::Path<i64>,
) -> Result<HttpResponse, Error> {
    let conn = pool.get().expect("couldn't get db connection from pool");
    // use conn
}
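The closure passed to HttpServer::new is a factory that runs once per worker thread, which is why the pool is cloned inside it rather than moved. A standard-library sketch of that pattern, assuming an Arc-backed handle in place of the real pool (SharedPool and start_workers are hypothetical names):

```rust
use std::sync::Arc;

/// Hypothetical stand-in for the connection pool.
pub struct SharedPool {
    pub db_url: String,
}

/// Calls the factory once per worker and collects the state each worker received.
pub fn start_workers<F>(workers: usize, factory: F) -> Vec<Arc<SharedPool>>
where
    F: Fn() -> Arc<SharedPool>,
{
    (0..workers).map(|_| factory()).collect()
}
```

A caller would pass something like move || Arc::clone(&pool) as the factory. Every worker then holds its own handle, yet all handles point at the same underlying pool, so connections are shared across the whole server.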
6. log
actix provides a logging middleware. Configure the logger with:
std::env::set_var("RUST_LOG", "actix_web=info,diesel=debug,actix=info");
env_logger::init();
Then add the middleware to the App with middleware::Logger::default():
// Start HTTP server
HttpServer::new(move || {
    App::new()
        // set up DB pool to be used with web::Data<Pool> extractor
        .data(pool.clone())
        .wrap(middleware::Logger::default())
        .configure(config)
})
.bind(&bind)?
.run()
.await
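The RUST_LOG value used above is a comma-separated list of target=level pairs. The sketch below splits such a string so the structure is easy to see. It is a simplification and not env_logger's parser, which accepts more forms than this; parse_filter is a hypothetical name.

```rust
/// Splits a filter such as "actix_web=info,diesel=debug" into (target, level) pairs.
/// Entries without '=' are skipped in this simplified version.
pub fn parse_filter(spec: &str) -> Vec<(String, String)> {
    spec.split(',')
        .filter_map(|directive| {
            let (target, level) = directive.trim().split_once('=')?;
            Some((target.to_string(), level.to_string()))
        })
        .collect()
}
```

Read this way, the configuration in this post asks for info-level logs from actix_web and actix, and debug-level logs from diesel.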