diff --git a/src/lib.rs b/src/lib.rs index e3cb5cf..3284a4b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,3 +9,135 @@ pub mod prelude { pub type Unit = (); } + +use crate::prelude::{AppState, Config, Refresh, Tracer, Unit}; +use actix_cors::Cors; +use actix_governor::{Governor, GovernorConfigBuilder}; +use actix_web::{ + dev::Server, + web::{scope, Data}, + App, HttpServer, +}; +use actix_web_lab::middleware::from_fn; +use anyhow::Context; +use futures::Future; +use prelude::{admin_middleware, get_state, health_check, sensitive_middleware, trigger_refresh}; +use std::{net::TcpListener, pin::Pin, time::Duration}; + +pub const PREFIX: &str = "/v1"; +pub const ADMIN_PREFIX: &str = "/admin"; +pub const INTEGRATION_PREFIX: &str = "/integration"; +type Task = Pin + Send + Sync>>; + +pub struct Application { + port: u16, + server: Server, + task: Task, +} + +impl Application { + pub async fn start(configuration: &Config) -> Result { + tracing::info!( + "Starting application with configuration: {}{:#?}{}", + "\n", + &configuration, + "\n" + ); + let address = format!( + "{}:{}", + configuration.server().host(), + configuration.server().port() + ); + let listener = TcpListener::bind(&address)?; + let port = listener.local_addr()?.port(); + let state = AppState::try_from(configuration.clone()).await?; + + let sleep_timer = Duration::from_secs(configuration.oauth().sleep_timer()); + let refresh_before = configuration.oauth().refresh_before(); + let refresh_actor = state.refresh_actor().clone(); + let task = Box::pin(async move { + loop { + let message = Refresh::new(refresh_before); + let res = refresh_actor.send(message).await; + + if let Err(e) = res { + tracing::warn!("Failed to send refresh message: {:?}", e); + } + + tracing::info!("Sleeping for {} seconds", sleep_timer.as_secs()); + tokio::time::sleep(sleep_timer).await; + } + }); + + let server = run(listener, configuration.clone(), state).await?; + + Ok(Self { port, server, task }) + } + + pub fn port(&self) -> u16 { + self.port + } + + pub fn handler(self) -> (Server, Task) { + (self.server, self.task) + } + + pub async fn spawn(self) -> Result<(), anyhow::Error> { + let (server, task) = self.handler(); + let task = tokio::spawn(task); + let http = tokio::spawn(server); + + tokio::select! { + res = http => { + res.context("Failed to spawn http application.")?.context("Failed to spawn http application.") + }, + res = task => { + res.context("Failed to spawn background task.") + } + } + } +} + +async fn run( + listener: TcpListener, + configuration: Config, + state: AppState, +) -> Result { + let governor = GovernorConfigBuilder::default() + .per_second(configuration.server().burst_rate_limit()) + .permissive(configuration.server().is_development()) + .burst_size(configuration.server().burst_size_limit()) + .finish() + .context("Failed to create governor.")?; + + let server = HttpServer::new(move || { + let trace: Tracer = Tracer::default(); + App::new() + .wrap(trace.tracer()) + .wrap( + Cors::default() + .allowed_methods(vec!["GET", "POST"]) + .allow_any_origin() + .allow_any_header() + .supports_credentials() + .max_age(3600), + ) + .wrap(Governor::new(&governor)) + .service( + scope(&(PREFIX.to_owned() + ADMIN_PREFIX)) // /v1/admin + .wrap(from_fn(sensitive_middleware)) + .service(get_state), + ) + .service( + scope(&(PREFIX.to_owned() + INTEGRATION_PREFIX)) // /v1/integration + .wrap(from_fn(admin_middleware)) + .service(trigger_refresh), + ) + .service(scope(PREFIX).service(health_check)) // /v1 + .app_data(Data::new(state.clone())) + }) + .listen(listener)? + .run(); + + Ok(server) +} diff --git a/src/service/configuration/mod.rs b/src/service/configuration/mod.rs index 9a777aa..e2b6a2e 100644 --- a/src/service/configuration/mod.rs +++ b/src/service/configuration/mod.rs @@ -1,7 +1,5 @@ mod telemetry; -use actix_governor::{KeyExtractor, PeerIpKeyExtractor, SimpleKeyExtractionError}; -use actix_web::dev::ServiceRequest; pub use telemetry::*; use envconfig::Envconfig; @@ -10,7 +8,6 @@ use integrationos_domain::{ }; use std::collections::HashMap; use std::fmt::Debug; -use std::net::IpAddr; #[derive(Clone, Envconfig)] pub struct OAuthConfig { @@ -306,25 +303,3 @@ impl From> for Config { Self { oauth, server } } } - -#[derive(Clone)] -pub struct WhiteListKeyExtractor; - -impl KeyExtractor for WhiteListKeyExtractor { - type Key = IpAddr; - type KeyExtractionError = SimpleKeyExtractionError<&'static str>; - - fn extract(&self, req: &ServiceRequest) -> Result { - PeerIpKeyExtractor.extract(req) - } - - fn whitelisted_keys(&self) -> Vec { - // In case we want to add more private networks remember that the CIDR notation for - // 172s is 172.16.0.0/12 and for 192s is 192.168.0.0/16 - - "10.0.0.0/8" - .parse() - .map(|ip| vec![ip]) - .unwrap_or_else(|_| vec![]) - } -} diff --git a/src/service/http/application.rs b/src/service/http/application.rs deleted file mode 100644 index e01e825..0000000 --- a/src/service/http/application.rs +++ /dev/null @@ -1,132 +0,0 @@ -use super::{admin_event_middleware, admin_middleware, get_state, health_check, trigger_refresh}; -use crate::prelude::{AppState, Config, Refresh, Tracer, Unit, WhiteListKeyExtractor}; -use actix_cors::Cors; -use actix_governor::{Governor, GovernorConfigBuilder}; -use actix_web::{ - dev::Server, - web::{scope, Data}, - App, HttpServer, -}; -use actix_web_lab::middleware::from_fn; -use anyhow::Context; -use futures::Future; -use std::{net::TcpListener, pin::Pin, time::Duration}; - -pub const PREFIX: &str = "/v1"; -pub const ADMIN_PREFIX: &str = "/admin"; -pub const INTEGRATION_PREFIX: &str = "/integration"; -type Task = Pin + Send + Sync>>; - -pub struct Application { - port: u16, - server: Server, - task: Task, -} - -impl Application { - pub async fn start(configuration: &Config) -> Result { - tracing::info!( - "Starting application with configuration: {}{:#?}{}", - "\n", - &configuration, - "\n" - ); - let address = format!( - "{}:{}", - configuration.server().host(), - configuration.server().port() - ); - let listener = TcpListener::bind(&address)?; - let port = listener.local_addr()?.port(); - let state = AppState::try_from(configuration.clone()).await?; - - let sleep_timer = Duration::from_secs(configuration.oauth().sleep_timer()); - let refresh_before = configuration.oauth().refresh_before(); - let refresh_actor = state.refresh_actor().clone(); - let task = Box::pin(async move { - loop { - let message = Refresh::new(refresh_before); - let res = refresh_actor.send(message).await; - - if let Err(e) = res { - tracing::warn!("Failed to send refresh message: {:?}", e); - } - - tracing::info!("Sleeping for {} seconds", sleep_timer.as_secs()); - tokio::time::sleep(sleep_timer).await; - } - }); - - let server = run(listener, configuration.clone(), state).await?; - - Ok(Self { port, server, task }) - } - - pub fn port(&self) -> u16 { - self.port - } - - pub fn handler(self) -> (Server, Task) { - (self.server, self.task) - } - - pub async fn spawn(self) -> Result<(), anyhow::Error> { - let (server, task) = self.handler(); - let task = tokio::spawn(task); - let http = tokio::spawn(server); - - tokio::select! { - res = http => { - res.context("Failed to spawn application.")?.context("Failed to spawn application.") - }, - res = task => { - res.context("Failed to spawn application.") - } - } - } -} - -async fn run( - listener: TcpListener, - configuration: Config, - state: AppState, -) -> Result { - let governor = GovernorConfigBuilder::default() - .key_extractor(WhiteListKeyExtractor) - .per_second(configuration.server().burst_rate_limit()) - .permissive(configuration.server().is_development()) - .burst_size(configuration.server().burst_size_limit()) - .finish() - .context("Failed to create governor.")?; - - let server = HttpServer::new(move || { - let trace: Tracer = Tracer::default(); - App::new() - .wrap(trace.tracer()) - .wrap( - Cors::default() - .allowed_methods(vec!["GET", "POST"]) - .allow_any_origin() - .allow_any_header() - .supports_credentials() - .max_age(3600), - ) - .wrap(Governor::new(&governor)) - .service( - scope(&(PREFIX.to_owned() + ADMIN_PREFIX)) // /v1/admin - .wrap(from_fn(admin_middleware)) - .service(get_state), - ) - .service( - scope(&(PREFIX.to_owned() + INTEGRATION_PREFIX)) // /v1/integration - .wrap(from_fn(admin_event_middleware)) - .service(trigger_refresh), - ) - .service(scope(PREFIX).service(health_check)) // /v1 - .app_data(Data::new(state.clone())) - }) - .listen(listener)? - .run(); - - Ok(server) -} diff --git a/src/service/http/middleware.rs b/src/service/http/middleware.rs deleted file mode 100644 index 3ba1465..0000000 --- a/src/service/http/middleware.rs +++ /dev/null @@ -1,130 +0,0 @@ -use crate::prelude::AppState; -use actix_web::{ - body::MessageBody, - dev::{ServiceRequest, ServiceResponse}, - error::ErrorUnauthorized, - web::Data, - Error as ActixWebError, HttpMessage, -}; -use actix_web_lab::middleware::Next; -use integrationos_domain::{algebra::StoreExt, event_access::EventAccess, Claims}; -use jsonwebtoken::{decode, DecodingKey, Validation}; -use mongodb::bson::doc; -use std::sync::Arc; - -pub async fn admin_middleware( - req: ServiceRequest, - next: Next, -) -> Result, ActixWebError> { - let state = req.app_data::>(); - let state = match state { - None => return Err(ErrorUnauthorized("No state found")), - Some(state) => state, - }; - - let extracted_info = extract_admin_info(&req, state); - - match extracted_info { - Ok(claims) => { - req.extensions_mut().insert(claims.to_owned()); - next.call(req).await - } - Err(err) => Err(ErrorUnauthorized(err)), - } -} - -pub async fn admin_event_middleware( - req: ServiceRequest, - next: Next, -) -> Result, ActixWebError> { - let state = req.app_data::>(); - let state = match state { - None => return Err(ErrorUnauthorized("No state found")), - Some(state) => state, - }; - - let event_access = extract_event_info(&req, state).await; - let claims = extract_admin_info(&req, state); - - match (event_access, claims) { - (Ok(event_access), Ok(claims)) => { - req.extensions_mut().insert(claims.to_owned()); - req.extensions_mut().insert(event_access.to_owned()); - next.call(req).await - } - (Err(err), _) | (_, Err(err)) => Err(ErrorUnauthorized(err)), - } -} - -fn extract_admin_info( - req: &ServiceRequest, - state: &Data, -) -> Result { - let token = req - .headers() - .get(state.configuration().server().admin_header()) - .and_then(|header| header.to_str().ok()) - .map(|h| h.to_string().split_at(7).1.to_string()); - - let token = match token { - Some(token) => token, - None => return Err(ErrorUnauthorized("No token found")), - }; - - let mut validator = Validation::default(); - validator.set_audience(&["integration-team", "oauth-integrationos"]); - - let claims = decode::( - &token, - &DecodingKey::from_secret(state.configuration().server().admin_secret().as_ref()), - &validator, - ) - .map_err(|_| ErrorUnauthorized("Invalid token"))?; - - Ok(claims.claims) -} - -async fn extract_event_info( - req: &ServiceRequest, - state: &Data, -) -> Result, ActixWebError> { - let Some(auth_header) = req - .headers() - .get(state.configuration().server().auth_header()) - else { - Err(ErrorUnauthorized("No auth header found"))? - }; - - let event_access = state - .cache() - .try_get_with_by_ref(auth_header, async { - let key = auth_header - .to_str() - .map_err(|e| format!("Invalid auth header: {}", e))?; - - if let Some(event_access) = state - .event_access() - .get_one(doc! { - "accessKey": key, - "deleted": false - }) - .await - .map_err(|e| { - tracing::warn!("{}", e); - format!("{}", e) - })? - { - Ok(Arc::new(event_access)) - } else { - Err(format!("No event access found for key: {}", key)) - } - }) - .await; - - let event_access: Arc = match event_access { - Ok(event_access) => event_access, - Err(err) => Err(ErrorUnauthorized(err))?, - }; - - Ok(event_access) -} diff --git a/src/service/http/mod.rs b/src/service/http/mod.rs index b691fed..891ba61 100644 --- a/src/service/http/mod.rs +++ b/src/service/http/mod.rs @@ -1,14 +1,24 @@ mod admin; -mod application; -mod middleware; mod public; -use actix_web::{HttpResponse, HttpResponseBuilder}; pub use admin::*; -pub use application::*; -pub use middleware::*; pub use public::*; + +use crate::prelude::AppState; +use actix_web::{ + body::MessageBody, + dev::{ServiceRequest, ServiceResponse}, + error::ErrorUnauthorized, + web::Data, + Error as ActixWebError, HttpMessage, +}; +use actix_web::{HttpResponse, HttpResponseBuilder}; +use actix_web_lab::middleware::Next; +use integrationos_domain::{algebra::StoreExt, event_access::EventAccess, Claims}; +use jsonwebtoken::{decode, DecodingKey, Validation}; +use mongodb::bson::doc; use reqwest::StatusCode; +use std::sync::Arc; #[derive(serde::Serialize, serde::Deserialize, Debug, Eq, PartialEq)] #[serde(rename_all = "camelCase")] @@ -40,3 +50,120 @@ where .json(ServerResponse { r#type, args, code }) } } + +pub async fn sensitive_middleware( + req: ServiceRequest, + next: Next, +) -> Result, ActixWebError> { + let state = req.app_data::>(); + let state = match state { + None => return Err(ErrorUnauthorized("No state found")), + Some(state) => state, + }; + + let extracted_info = extract_admin_info(&req, state); + + match extracted_info { + Ok(claims) => { + req.extensions_mut().insert(claims.to_owned()); + next.call(req).await + } + Err(err) => Err(ErrorUnauthorized(err)), + } +} + +pub async fn admin_middleware( + req: ServiceRequest, + next: Next, +) -> Result, ActixWebError> { + let state = req.app_data::>(); + let state = match state { + None => return Err(ErrorUnauthorized("No state found")), + Some(state) => state, + }; + + let event_access = extract_event_info(&req, state).await; + let claims = extract_admin_info(&req, state); + + match (event_access, claims) { + (Ok(event_access), Ok(claims)) => { + req.extensions_mut().insert(claims.to_owned()); + req.extensions_mut().insert(event_access.to_owned()); + next.call(req).await + } + (Err(err), _) | (_, Err(err)) => Err(ErrorUnauthorized(err)), + } +} + +fn extract_admin_info( + req: &ServiceRequest, + state: &Data, +) -> Result { + let token = req + .headers() + .get(state.configuration().server().admin_header()) + .and_then(|header| header.to_str().ok()) + .map(|h| h.to_string().split_at(7).1.to_string()); + + let token = match token { + Some(token) => token, + None => return Err(ErrorUnauthorized("No token found")), + }; + + let mut validator = Validation::default(); + validator.set_audience(&["integration-team", "oauth-integrationos"]); + + let claims = decode::( + &token, + &DecodingKey::from_secret(state.configuration().server().admin_secret().as_ref()), + &validator, + ) + .map_err(|_| ErrorUnauthorized("Invalid token"))?; + + Ok(claims.claims) +} + +async fn extract_event_info( + req: &ServiceRequest, + state: &Data, +) -> Result, ActixWebError> { + let Some(auth_header) = req + .headers() + .get(state.configuration().server().auth_header()) + else { + Err(ErrorUnauthorized("No auth header found"))? + }; + + let event_access = state + .cache() + .try_get_with_by_ref(auth_header, async { + let key = auth_header + .to_str() + .map_err(|e| format!("Invalid auth header: {}", e))?; + + if let Some(event_access) = state + .event_access() + .get_one(doc! { + "accessKey": key, + "deleted": false + }) + .await + .map_err(|e| { + tracing::warn!("{}", e); + format!("{}", e) + })? + { + Ok(Arc::new(event_access)) + } else { + Err(format!("No event access found for key: {}", key)) + } + }) + .await; + + let event_access: Arc = match event_access { + Ok(event_access) => event_access, + Err(err) => Err(ErrorUnauthorized(err))?, + }; + + Ok(event_access) +} diff --git a/src/service/http/public/trigger.rs b/src/service/http/public/trigger.rs index 732b734..cfd551d 100644 --- a/src/service/http/public/trigger.rs +++ b/src/service/http/public/trigger.rs @@ -37,6 +37,7 @@ pub async fn trigger_refresh( Some(request_id), ) .start(); + let id = connection.id; let trigger = Trigger::new(connection); @@ -50,6 +51,5 @@ pub async fn trigger_refresh( "outcome": outcome, }); - // Ok(HttpResponse::Ok().json(ServerResponse::new(ResponseType::Trigger, json))) Ok(ServerResponse::from(ResponseType::Trigger, json, 200)) }