How to organize a web project's directory layout: the structure of an actix-web project

2023-05-16

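Today I'll walk through the actix-web project structure as I understand it.

The project is divided into several parts:

  • db
    Connects to the database
  • app data
    Application-wide global state that handlers can use, such as the db connection pool
  • model
    Rust structs that map to database tables
  • http handler
    Functions that handle HTTP requests
  • actix config
    Configures the routing rules
  • log
    Configures logging
  • sentry (optional)
  • prometheus (optional)

First, a look at the layout.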

$ tree src
src
├── dao
│   ├── mod.rs
│   └── user.rs
├── db.rs
├── main.rs
├── models
│   ├── mod.rs
│   └── user.rs
├── schema.rs
└── services
    ├── config.rs
    ├── handler
    │   ├── mod.rs
    │   └── user.rs
    └── mod.rs

1. db connection pool

db.rs uses r2d2 to initialize the db connection pool and defines the pool type: type Pool = r2d2::Pool<ConnectionManager<PgConnection>>

use diesel::prelude::*;
use diesel::r2d2::{self, ConnectionManager};

pub type Pool = r2d2::Pool<ConnectionManager<PgConnection>>;

pub fn create_db_pool(db_url: &str) -> Pool {
    // set up database connection pool
    let manager = ConnectionManager::<PgConnection>::new(db_url);
    let pool = r2d2::Pool::builder()
        .build(manager)
        .expect("Failed to create pool.");
    pool
}
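
To make the idea of a connection pool concrete, here is a minimal std-only sketch. It is not how r2d2 is implemented: ToyConn, ToyPool and Pooled are hypothetical names, and a real pool waits with a timeout and returns a Result where this toy returns None. It only shows the two properties the rest of the article relies on: the pool handle is cheap to clone, and a checked-out connection goes back to the pool when it is dropped.

```rust
use std::ops::Deref;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a real database connection.
#[derive(Debug)]
pub struct ToyConn {
    pub id: usize,
}

/// A cloneable handle to a shared set of idle connections.
#[derive(Clone)]
pub struct ToyPool {
    idle: Arc<Mutex<Vec<ToyConn>>>,
}

/// Guard that hands the connection back to the pool when dropped.
pub struct Pooled {
    conn: Option<ToyConn>,
    idle: Arc<Mutex<Vec<ToyConn>>>,
}

impl ToyPool {
    /// Eagerly creates `size` connections.
    pub fn new(size: usize) -> Self {
        let conns = (0..size).map(|id| ToyConn { id }).collect();
        ToyPool { idle: Arc::new(Mutex::new(conns)) }
    }

    /// Takes an idle connection, or returns `None` if all are checked out.
    pub fn get(&self) -> Option<Pooled> {
        let conn = self.idle.lock().unwrap().pop()?;
        Some(Pooled { conn: Some(conn), idle: Arc::clone(&self.idle) })
    }

    /// Number of connections currently available.
    pub fn idle_count(&self) -> usize {
        self.idle.lock().unwrap().len()
    }
}

impl Deref for Pooled {
    type Target = ToyConn;

    fn deref(&self) -> &ToyConn {
        self.conn.as_ref().expect("connection is present until drop")
    }
}

impl Drop for Pooled {
    fn drop(&mut self) {
        // Return the connection so that another caller can reuse it.
        if let Some(conn) = self.conn.take() {
            self.idle.lock().unwrap().push(conn);
        }
    }
}
```

Calling ToyPool::new(2) and then get() twice leaves idle_count() at 0; once both guards go out of scope it is back at 2.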

2. model

This module defines the Rust structs. Only the User model is defined here.

User is used when querying (Queryable), and NewUser is used when inserting (Insertable).

use chrono;
use serde_derive::{Deserialize, Serialize};

use crate::schema::users;

#[derive(Queryable, PartialEq, Serialize, Deserialize, Debug)]
pub struct User {
    pub id: i64,
    pub first_name: String,
    pub last_name: String,
    pub gender: String,
    pub birthday: Option<chrono::NaiveDate>,
}

#[derive(Clone, Insertable, Serialize, Deserialize, Debug)]
#[table_name = "users"]
pub struct NewUser {
    pub first_name: String,
    pub last_name: String,
    pub gender: String,
    pub birthday: Option<chrono::NaiveDate>,
}

Other business types can be added under the models directory, for example an order.rs for orders.
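
The dao directory in the tree above is not shown in this article. As a rough, std-only sketch of the shape such a layer might take, the following uses an in-memory map in place of the users table. MemStore, DaoError and insert_new_user are hypothetical names, and the structs omit birthday for brevity; a Diesel-backed dao would take a connection and return Diesel's error type instead. The point is the split between the two structs (the store assigns the id, so NewUser has none) and the Result<Option<User>, _> return shape that the handler later matches with if let Some(user).

```rust
use std::collections::HashMap;

/// Row as read back from storage.
#[derive(Debug, Clone, PartialEq)]
pub struct User {
    pub id: i64,
    pub first_name: String,
    pub last_name: String,
    pub gender: String,
}

/// Data supplied by the caller on insert; the store assigns the id.
#[derive(Debug, Clone)]
pub struct NewUser {
    pub first_name: String,
    pub last_name: String,
    pub gender: String,
}

/// Hypothetical in-memory store standing in for the `users` table.
#[derive(Default)]
pub struct MemStore {
    next_id: i64,
    rows: HashMap<i64, User>,
}

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
pub enum DaoError {
    EmptyName,
}

/// Inserts a user and returns the stored row, including its new id.
pub fn insert_new_user(new_user: NewUser, store: &mut MemStore) -> Result<User, DaoError> {
    if new_user.first_name.is_empty() {
        return Err(DaoError::EmptyName);
    }
    store.next_id += 1;
    let user = User {
        id: store.next_id,
        first_name: new_user.first_name,
        last_name: new_user.last_name,
        gender: new_user.gender,
    };
    store.rows.insert(user.id, user.clone());
    Ok(user)
}

/// Looks a user up by id; a missing row is `Ok(None)`, not an error.
pub fn find_user_by_user_id(user_id: i64, store: &MemStore) -> Result<Option<User>, DaoError> {
    Ok(store.rows.get(&user_id).cloned())
}
```

Returning Ok(None) for a missing row keeps "not found" separate from real failures, which lets the handler answer 404 in one case and 500 in the other.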

3. services handler

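The inner handler module holds the HTTP request handlers. Each actix-web worker runs its async tasks on a single thread, and Diesel does not support asynchronous operation. To avoid blocking a worker thread, the database work is wrapped in web::block, which runs it on a separate thread pool. When the operation finishes, the result is obtained by polling the returned Future.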

get_user

use actix_web::{web, HttpResponse, Error};
use crate::db::Pool;
use crate::models::user::NewUser;
use crate::dao::user as dao;

/// Finds user by id.
pub async fn get_user(
    pool: web::Data<Pool>,
    user_id: web::Path<i64>,
) -> Result<HttpResponse, Error> {
    let conn = pool.get().expect("couldn't get db connection from pool");
    let user_id = user_id.into_inner();
    // use web::block to offload blocking Diesel code without blocking server thread
    let user = web::block(move || dao::find_user_by_user_id(user_id, &conn))
        .await
        .map_err(|e| {
            eprintln!("{}", e);
            HttpResponse::InternalServerError().finish()
        })?;

    if let Some(user) = user {
        Ok(HttpResponse::Ok().json(user))
    } else {
        let res = HttpResponse::NotFound()
            .body(format!("No user found with id: {}", user_id));
        Ok(res)
    }
}

web::block runs the synchronous operation on a thread pool: actix_threadpool::run(f)

/// Execute blocking function on a thread pool, returns future that resolves
/// to result of the function execution.
pub async fn block<F, I, E>(f: F) -> Result<I, BlockingError<E>>
where
    F: FnOnce() -> Result<I, E> + Send + 'static,
    I: Send + 'static,
    E: Send + std::fmt::Debug + 'static,
{
    actix_threadpool::run(f).await
}

run is a generic function whose parameter has type F: FnOnce() -> Result<I, E> + Send + 'static. Once that function finishes, its result is sent through the sender returned by oneshot::channel.

/// Execute blocking function on a thread pool, returns future that resolves
/// to result of the function execution.
pub fn run<F, I, E>(f: F) -> CpuFuture<I, E>
where
    F: FnOnce() -> Result<I, E> + Send + 'static,
    I: Send + 'static,
    E: Send + fmt::Debug + 'static,
{
    let (tx, rx) = oneshot::channel();
    POOL.with(|pool| {
        pool.execute(move || {
            if !tx.is_canceled() {
                let _ = tx.send(f());
            }
        })
    });

    CpuFuture { rx }
}
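
The same offloading idea can be sketched with the standard library alone. This is an analogy under stated assumptions, not actix_threadpool's code: run_blocking and slow_lookup are hypothetical names, a fresh thread is spawned per call instead of reusing a pool, and the caller waits with a blocking recv() rather than awaiting a future.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;
use std::time::Duration;

/// Runs `f` on a separate thread and returns the receiving end of a channel
/// that will carry its result. One thread per call; a real pool reuses threads.
pub fn run_blocking<F, I, E>(f: F) -> Receiver<Result<I, E>>
where
    F: FnOnce() -> Result<I, E> + Send + 'static,
    I: Send + 'static,
    E: Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // The receiver may already be gone; ignore the send error in that case.
        let _ = tx.send(f());
    });
    rx
}

/// Hypothetical blocking job standing in for a Diesel query.
pub fn slow_lookup(user_id: i64) -> Result<String, String> {
    thread::sleep(Duration::from_millis(10));
    if user_id > 0 {
        Ok(format!("user-{}", user_id))
    } else {
        Err("invalid id".to_string())
    }
}
```

The bounds on F, I and E mirror the ones on run: the closure and its result cross a thread boundary, so they have to be Send and 'static.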

Meanwhile, run returns a CpuFuture. CpuFuture implements the Future trait (impl<I, E: fmt::Debug> Future for CpuFuture<I, E>), and its poll method returns the data received from the channel.

/// Blocking operation completion future. It resolves with results
/// of blocking function execution.
pub struct CpuFuture<I, E> {
    rx: oneshot::Receiver<Result<I, E>>,
}

impl<I, E: fmt::Debug> Future for CpuFuture<I, E> {
    type Output = Result<I, BlockingError<E>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let rx = Pin::new(&mut self.rx);
        let res = match rx.poll(cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(res) => res
                .map_err(|_| BlockingError::Canceled)
                .and_then(|res| res.map_err(BlockingError::Error)),
        };
        Poll::Ready(res)
    }
}

The data itself is obtained through the Receiver. Receiver also implements the Future trait (impl<T> Future for Receiver<T>); its poll checks whether the channel holds any data, delegating that work to the Inner inside the Receiver.

impl<T> Future for Receiver<T> {
    type Output = Result<T, Canceled>;

    fn poll(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<T, Canceled>> {
        self.inner.recv(cx)
    }
}

Receiver

/// A future for a value that will be provided by another asynchronous task.
///
/// This is created by the [`channel`] function.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct Receiver<T> {
    inner: Arc<Inner<T>>,
}

In the end, Inner::recv does the real work.

fn recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> {
    // Check to see if some data has arrived. If it hasn't then we need to
    // block our task.
    //
    // Note that the acquisition of the `rx_task` lock might fail below, but
    // the only situation where this can happen is during `Sender::drop`
    // when we are indeed completed already. If that's happening then we
    // know we're completed so keep going.
    let done = if self.complete.load(SeqCst) {
        true
    } else {
        let task = cx.waker().clone();
        match self.rx_task.try_lock() {
            Some(mut slot) => { *slot = Some(task); false },
            None => true,
        }
    };

    // If we're `done` via one of the paths above, then look at the data and
    // figure out what the answer is. If, however, we stored `rx_task`
    // successfully above we need to check again if we're completed in case
    // a message was sent while `rx_task` was locked and couldn't notify us
    // otherwise.
    //
    // If we're not done, and we're not complete, though, then we've
    // successfully blocked our task and we return `Pending`.
    if done || self.complete.load(SeqCst) {
        // If taking the lock fails, the sender will realise that the we're
        // `done` when it checks the `complete` flag on the way out, and
        // will treat the send as a failure.
        if let Some(mut slot) = self.data.try_lock() {
            if let Some(data) = slot.take() {
                return Poll::Ready(Ok(data));
            }
        }
        Poll::Ready(Err(Canceled))
    } else {
        Poll::Pending
    }
}
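
The core of recv is: if no data has arrived, store the task's waker and return Pending; the sender later stores the value and wakes the task so it is polled again. Below is a simplified std-only sketch of that handshake. It is an assumption-laden illustration, not the futures crate's code: it uses one Mutex where the real Inner uses atomics and try_lock, it has no Canceled case (a dropped sender leaves the receiver pending forever), and block_on is a hypothetical minimal executor included only so the example can run.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// State shared by both halves: the value (once sent) and the receiver's waker.
struct Shared<T> {
    value: Option<T>,
    waker: Option<Waker>,
}

pub struct Sender<T>(Arc<Mutex<Shared<T>>>);
pub struct Receiver<T>(Arc<Mutex<Shared<T>>>);

pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let shared = Arc::new(Mutex::new(Shared { value: None, waker: None }));
    (Sender(Arc::clone(&shared)), Receiver(shared))
}

impl<T> Sender<T> {
    /// Stores the value, then wakes the task waiting on the receiver, if any.
    pub fn send(self, value: T) {
        let waker = {
            let mut shared = self.0.lock().unwrap();
            shared.value = Some(value);
            shared.waker.take()
        };
        if let Some(waker) = waker {
            waker.wake();
        }
    }
}

impl<T> Future for Receiver<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut shared = self.0.lock().unwrap();
        match shared.value.take() {
            Some(value) => Poll::Ready(value),
            None => {
                // No data yet: remember whom to wake, then report Pending.
                shared.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor: polls the future and parks the thread while it is pending.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}
```

Because the value check and the waker store happen under one lock, a send cannot slip in between them. The real recv avoids that lock, which is why it has to re-check the complete flag after storing the waker.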

4. service config

config holds the routing rules.

We can define the following routes:

  • GET /api/v1/user/{user_id} is handled by get_user
  • GET /api/v1/users is handled by get_users
  • POST /api/v1/user is handled by add_user

config.rs

use actix_web::web::{ServiceConfig, resource as r, get, scope, post};
use crate::services::handler::user::{add_user, get_user, get_users};

pub fn config(cfg: &mut ServiceConfig) {
    cfg
        .service(
            scope("/api/v1")
                .service(r("/user/{user_id}").route(get().to(get_user)))
                .service(r("/user").route(post().to(add_user)))
                .service(r("/users").route(get().to(get_users)))
        )
        .service(scope("/api/v2")
            .service(r("/user/{user_id}").route(get().to(get_user)))
        );
}
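
To spell out what the /api/v1 rules mean for an incoming request, here is a small std-only sketch that resolves a method and path to one of the three handlers. It is not how actix-web matches routes: Route and resolve are hypothetical names, the /api/v2 scope is left out, and trimming surrounding slashes is a simplification made here.

```rust
/// One variant per handler registered under `/api/v1` in `config`.
#[derive(Debug, PartialEq)]
pub enum Route {
    GetUser { user_id: i64 },
    GetUsers,
    AddUser,
}

/// Resolves a method and path to a route, or `None` when nothing matches.
pub fn resolve(method: &str, path: &str) -> Option<Route> {
    // Split "/api/v1/user/7" into ["api", "v1", "user", "7"].
    let segments: Vec<&str> = path.trim_matches('/').split('/').collect();
    match (method, segments.as_slice()) {
        // `{user_id}` must parse as i64, like the `web::Path<i64>` extractor expects.
        ("GET", ["api", "v1", "user", id]) => id
            .parse::<i64>()
            .ok()
            .map(|user_id| Route::GetUser { user_id }),
        ("GET", ["api", "v1", "users"]) => Some(Route::GetUsers),
        ("POST", ["api", "v1", "user"]) => Some(Route::AddUser),
        _ => None,
    }
}
```

In this sketch a request for GET /api/v1/user/abc resolves to None because abc is not a valid i64, and an unregistered method such as DELETE also resolves to None.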

5. app data

Global resources such as the database connection pool can be passed down with the data method so that the http handlers can use them.

let pool = db::create_db_pool(&db_url);

let bind = "127.0.0.1:8080";

println!("Starting server at: {}", &bind);

// Start HTTP server
HttpServer::new(move || {
    App::new()
        // set up DB pool to be used with web::Data<Pool> extractor
        .data(pool.clone())
        .wrap(middleware::Logger::default())
        .configure(config)
})
    .bind(&bind)?
    .run()
    .await

To use it, taking the database connection pool Pool as an example, add pool: web::Data<Pool> to the handler's parameters.

/// Finds user by id.
pub async fn get_user(
    pool: web::Data<Pool>,
    user_id: web::Path<i64>,
) -> Result<HttpResponse, Error> {
    let conn = pool.get().expect("couldn't get db connection from pool");
    // use conn
}
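
The underlying idea, several workers all reaching the same state through a cheaply cloneable handle, can be sketched with the standard library. This is only an analogy: AppState, handle_request and serve are hypothetical names, a counter stands in for the pool, and plain threads stand in for the server's workers.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical application state; in the article this role is played by `Pool`.
pub struct AppState {
    pub hits: AtomicU64,
}

/// Stand-in for a handler: it only sees a shared handle to the state.
pub fn handle_request(state: &Arc<AppState>) {
    state.hits.fetch_add(1, Ordering::SeqCst);
}

/// Starts `workers` threads, hands each its own clone of the handle, and
/// returns the total number of requests seen once all workers have finished.
pub fn serve(workers: usize, requests_per_worker: usize) -> u64 {
    let state = Arc::new(AppState { hits: AtomicU64::new(0) });
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            // Cloning the Arc copies a pointer, not the state itself.
            let state = Arc::clone(&state);
            thread::spawn(move || {
                for _ in 0..requests_per_worker {
                    handle_request(&state);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().expect("worker panicked");
    }
    state.hits.load(Ordering::SeqCst)
}
```

With 4 workers handling 10 requests each, serve returns 40, since every worker updated the same counter.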

6. log

actix provides a logging middleware. Configure it with:

std::env::set_var("RUST_LOG", "actix_web=info,diesel=debug,actix=info");
env_logger::init();

Then add the middleware to the App to enable it: middleware::Logger::default()

// Start HTTP server
HttpServer::new(move || {
    App::new()
        // set up DB pool to be used with web::Data<Pool> extractor
        .data(pool.clone())
        .wrap(middleware::Logger::default())
        .configure(config)
})
    .bind(&bind)?
    .run()
    .await
