Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rate-limit example #642

Merged
merged 4 commits into from
Oct 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ members = [
"middleware/middleware-ext-mut",
"middleware/middleware-http-to-https",
"middleware/middleware",
"middleware/middleware-rate-limit",
"protobuf",
"run-in-thread",
"server-sent-events",
Expand Down
13 changes: 13 additions & 0 deletions middleware/middleware-rate-limit/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "middleware-rate-limit"
version = "1.0.0"
edition = "2021"

[dependencies]
actix-web.workspace = true
log.workspace = true
env_logger.workspace = true
futures-util.workspace = true
chrono.workspace = true

actix-governor = "0.4"
Empty file.
45 changes: 45 additions & 0 deletions middleware/middleware-rate-limit/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use std::io;

use actix_governor::{Governor, GovernorConfigBuilder};
use actix_web::{
middleware,
web::{self},
App, HttpResponse, HttpServer,
};

mod rate_limit;

async fn index() -> HttpResponse {
HttpResponse::Ok().body("succeed")
}

#[actix_web::main]
async fn main() -> io::Result<()> {
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));

let limit_cfg = GovernorConfigBuilder::default()
.per_second(10)
.burst_size(2)
.finish()
.unwrap();

log::info!("starting HTTP server at http://localhost:8080");

HttpServer::new(move || {
App::new()
.wrap(middleware::Logger::default())
.service(
web::resource("/test/governor")
.wrap(Governor::new(&limit_cfg))
.route(web::get().to(index)),
)
.service(
web::resource("/test/simple")
.wrap(rate_limit::RateLimit::new(2))
.route(web::get().to(index)),
)
})
.bind(("127.0.0.1", 8080))?
.run()
.await
}
121 changes: 121 additions & 0 deletions middleware/middleware-rate-limit/src/rate_limit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
use std::cell::RefCell;
use std::cmp::min;
use std::future::{ready, Ready};

use actix_web::body::EitherBody;
use actix_web::{
dev,
dev::{Service, ServiceRequest, ServiceResponse, Transform},
Error, HttpResponse,
};
use chrono::{Local, NaiveDateTime};
use futures_util::future::LocalBoxFuture;

#[doc(hidden)]
pub struct RateLimitService<S> {
service: S,
token_bucket: RefCell<TokenBucket>,
}

impl<S, B> Service<ServiceRequest> for RateLimitService<S>
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
S::Future: 'static,
B: 'static,
{
type Response = ServiceResponse<EitherBody<B>>;
type Error = Error;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;

dev::forward_ready!(service);

fn call(&self, req: ServiceRequest) -> Self::Future {
log::info!("request is passing through the AddMsg middleware");

req.uri().path();
// if be limited
if !self.token_bucket.borrow_mut().allow_query() {
return Box::pin(async {
Ok(req.into_response(
HttpResponse::TooManyRequests()
.body("")
.map_into_right_body(),
))
});
}

let fut = self.service.call(req);
Box::pin(async move { fut.await.map(ServiceResponse::map_into_left_body) })
}
}

#[derive(Clone, Debug)]
pub struct RateLimit {
// limit in 10s
limit: u64,
}

impl RateLimit {
pub fn new(limit: u64) -> Self {
Self { limit }
}
}

impl<S, B> Transform<S, ServiceRequest> for RateLimit
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = actix_web::Error>,
S::Future: 'static,
B: 'static,
{
type Response = ServiceResponse<EitherBody<B>>;
type Error = Error;
type Transform = RateLimitService<S>;
type InitError = ();
type Future = Ready<Result<Self::Transform, Self::InitError>>;

fn new_transform(&self, service: S) -> Self::Future {
ready(Ok(RateLimitService {
service,
token_bucket: RefCell::new(TokenBucket::new(self.limit)),
}))
}
}

struct TokenBucket {
// limit in ten sec
limit: u64,
last_query_time: NaiveDateTime,
// max query number in ten sec,in this case equal limit
capacity: u64,
// numbers of token,default equal capacity
tokens: u64,
}

impl TokenBucket {
fn new(limit: u64) -> Self {
TokenBucket {
limit,
last_query_time: Default::default(),
capacity: limit,
tokens: 0,
}
}

fn allow_query(&mut self) -> bool {
let current_time = Local::now().naive_local();

let time_elapsed = (current_time.timestamp() - self.last_query_time.timestamp()) as u64;

let tokens_to_add = time_elapsed * self.limit / 10;

self.tokens = min(self.tokens + tokens_to_add, self.capacity);

if self.tokens > 0 {
self.last_query_time = current_time;
self.tokens -= 1;
true
} else {
false
}
}
}