Skip to content

Commit

Permalink
chore: removing actix web server in favour of a plain loop (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
sagojez authored Sep 17, 2024
1 parent f81c245 commit 99ea9f6
Show file tree
Hide file tree
Showing 22 changed files with 441 additions and 1,794 deletions.
845 changes: 181 additions & 664 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 3 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,16 @@ edition = "2021"
lto = "thin"

[dependencies]
actix-cors = "0.7.0"
actix-governor = "0.5.0"
actix-web = "4.5.1"
actix-web-lab = "0.20.2"
anyhow = "1.0.79"
async-trait = "0.1.80"
chrono = { version = "0.4.33", features = ["serde"] }
dotenvy = "0.15.7"
envconfig = "0.10.0"
futures = "0.3.30"
handlebars = "5.1.1"
integrationos-domain = { version = "4.1.6", features = [
"dummy",
"actix-error",
] }
jsonwebtoken = "9.2.0"
moka = { version = "0.12.5", features = ["future"] }
integrationos-domain = { version = "4.1.6", features = ["dummy"] }
metrics = "0.21.1"
metrics-exporter-prometheus = "0.12.1"
mongodb = "2.8.0"
reqwest = { version = "0.12.3", features = [
"json",
Expand All @@ -33,7 +26,6 @@ serde = { version = "1.0.196", features = ["derive"] }
serde_json = "1.0.113"
tokio = { version = "1.35.1", features = ["macros", "rt-multi-thread"] }
tracing = { version = "0.1.40", features = ["log"] }
tracing-actix-web = "0.7.9"
tracing-bunyan-formatter = "0.3.9"
tracing-log = "0.2.0"
tracing-subscriber = { version = "0.3.18", features = [
Expand Down
55 changes: 55 additions & 0 deletions src/algebra/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use metrics_exporter_prometheus::PrometheusBuilder;

pub const SUCCESSFULLY_REFRESHED_GAUGE: &str = "successfully_refreshed";
pub const FAILED_TO_REFRESH_GAUGE: &str = "failed_to_refresh";
pub const REFRESH_TOTAL: &str = "refresh_total";

#[derive(Clone, Debug)]
pub struct Metrics {
is_installed: bool,
}

impl Metrics {
pub fn new() -> anyhow::Result<Self> {
let metric = PrometheusBuilder::new()
.install()
.map_err(|e| {
tracing::error!("Failed to install prometheus exporter: {}", e);
})
.ok();

if metric.is_some() {
metrics::describe_gauge!(
SUCCESSFULLY_REFRESHED_GAUGE,
"The number of successfully refreshed connections"
);

metrics::describe_gauge!(
FAILED_TO_REFRESH_GAUGE,
"The number of failed to refresh connections"
);

metrics::describe_gauge!(REFRESH_TOTAL, "The total number of refreshes");

Ok(Self { is_installed: true })
} else {
Ok(Self {
is_installed: false,
})
}
}

pub fn add_refreshed(&self, value: u64) {
if self.is_installed {
metrics::increment_gauge!(SUCCESSFULLY_REFRESHED_GAUGE, value as f64);
metrics::increment_gauge!(REFRESH_TOTAL, value as f64);
}
}

pub fn add_failed_to_refresh(&self, value: u64) {
if self.is_installed {
metrics::increment_gauge!(FAILED_TO_REFRESH_GAUGE, value as f64);
metrics::increment_gauge!(REFRESH_TOTAL, value as f64);
}
}
}
4 changes: 2 additions & 2 deletions src/algebra/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
mod metrics;
mod parameter;
mod refresh;
mod storage;
mod token;

pub use metrics::*;
pub use parameter::*;
pub use refresh::*;
pub use storage::*;
pub use token::*;
32 changes: 21 additions & 11 deletions src/algebra/refresh.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
algebra::StorageExt,
domain::{Refresh, Trigger, Unit},
ParameterExt, Refreshed,
Metrics, ParameterExt, Refreshed,
};
use chrono::{Duration, Utc};
use integrationos_domain::{
Expand Down Expand Up @@ -39,6 +39,7 @@ pub async fn refresh(
secrets: Arc<SecretsClient>,
oauths: Arc<MongoStore<ConnectionOAuthDefinition>>,
client: Client,
metrics: Arc<Metrics>,
) -> Result<Unit, Error> {
let refresh_before = Utc::now();
let refresh_after = refresh_before + Duration::minutes(msg.refresh_before_in_minutes());
Expand Down Expand Up @@ -73,18 +74,27 @@ pub async fn refresh(
requests.push(result);
}

match futures::future::join_all(requests)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()
{
Ok(vec) => {
tracing::info!("Refreshed {} connections: {:?}", vec.len(), vec);
let results = futures::future::join_all(requests).await;

Ok(())
}
Err(err) => Err(InternalError::io_err(err.to_string().as_str(), None)),
let (successes, failures): (Vec<_>, Vec<_>) =
results.into_iter().partition(|result| result.is_ok());

if !successes.is_empty() {
tracing::info!("Refreshed {} connections: {:?}", successes.len(), successes);
}

if !failures.is_empty() {
tracing::info!(
"Failed to refresh {} connections: {:?}",
failures.len(),
failures
);
}

metrics.add_refreshed(successes.len() as u64);
metrics.add_failed_to_refresh(failures.len() as u64);

Ok(())
}

pub async fn trigger(
Expand Down
45 changes: 0 additions & 45 deletions src/algebra/token.rs

This file was deleted.

127 changes: 0 additions & 127 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,130 +5,3 @@ mod service;
pub use algebra::*;
pub use domain::*;
pub use service::*;

use actix_cors::Cors;
use actix_governor::{Governor, GovernorConfigBuilder};
use actix_web::{
dev::Server,
web::{scope, Data},
App, HttpServer,
};
use actix_web_lab::middleware::from_fn;
use anyhow::Context;
use std::{net::TcpListener, time::Duration};

pub const PREFIX: &str = "/v1";
pub const INTEGRATION_PREFIX: &str = "/integration";

pub struct Application {
port: u16,
server: Server,
}

impl Application {
pub async fn start(configuration: &Configuration) -> Result<Self, anyhow::Error> {
tracing::info!(
"Starting application with configuration: {}{:#?}{}",
"\n",
&configuration,
"\n"
);
let address = format!(
"{}:{}",
configuration.server().host(),
configuration.server().port()
);
let listener = TcpListener::bind(&address)?;
let port = listener.local_addr()?.port();
let state = AppState::try_from(configuration.clone()).await?;

let sleep_timer = Duration::from_secs(configuration.oauth().sleep_timer());
let refresh_before = configuration.oauth().refresh_before();

let connections = state.connections().clone();
let oauths = state.oauths().clone();
let secrets = state.secrets().clone();
let client = state.client().clone();

tokio::spawn(async move {
loop {
let res = refresh(
Refresh::new(refresh_before),
connections.clone(),
secrets.clone(),
oauths.clone(),
client.clone(),
)
.await;
if let Err(e) = res {
tracing::warn!("Failed to send refresh message: {:?}", e);
}

tracing::info!("Sleeping for {} seconds", sleep_timer.as_secs());
tokio::time::sleep(sleep_timer).await;
}
});

let server = run(listener, configuration.clone(), state).await?;

Ok(Self { port, server })
}

pub fn port(&self) -> u16 {
self.port
}

pub fn handler(self) -> Server {
self.server
}

pub async fn spawn(self) -> Result<(), anyhow::Error> {
let server = self.handler();
let http = tokio::spawn(server);

tokio::select! {
res = http => {
res.context("Failed to spawn http application.")?.context("Failed to spawn http application.")
},
}
}
}

async fn run(
listener: TcpListener,
configuration: Configuration,
state: AppState,
) -> Result<Server, anyhow::Error> {
let governor = GovernorConfigBuilder::default()
.per_second(configuration.server().burst_rate_limit())
.permissive(configuration.server().is_development())
.burst_size(configuration.server().burst_size_limit())
.finish()
.context("Failed to create governor.")?;

let server = HttpServer::new(move || {
let trace: Tracer = Tracer::default();
App::new()
.wrap(trace.tracer())
.wrap(
Cors::default()
.allowed_methods(vec!["GET", "POST"])
.allow_any_origin()
.allow_any_header()
.supports_credentials()
.max_age(3600),
)
.wrap(Governor::new(&governor))
.service(
scope(&(PREFIX.to_owned() + INTEGRATION_PREFIX)) // /v1/integration
.wrap(from_fn(auth_middleware))
.service(trigger_refresh),
)
.service(scope(PREFIX).service(health_check)) // /v1
.app_data(Data::new(state.clone()))
})
.listen(listener)?
.run();

Ok(server)
}
38 changes: 30 additions & 8 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,44 @@
use dotenvy::dotenv;
use envconfig::Envconfig;
use integrationos_domain::telemetry::{get_subscriber, init_subscriber};
use oauth_api::{Application, Configuration};
use oauth_api::{refresh, AppState, Refresh, RefreshConfig};
use std::time::Duration;

#[actix_web::main]
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenv().ok();

let suscriber = get_subscriber("oauth-api".into(), "info".into(), std::io::stdout);
init_subscriber(suscriber);

let configuration = Configuration::init_from_env()?;
let configuration = RefreshConfig::init_from_env()?;

let address = configuration.server().app_url().to_string();
let application = Application::start(&configuration).await?;
tracing::info!(
"Starting application with configuration: {}{:#?}{}",
"\n",
&configuration,
"\n"
);
let state = AppState::try_from(configuration.clone()).await?;

tracing::info!("Starting server at {}", &address);
application.spawn().await?;
let sleep_timer = Duration::from_secs(configuration.sleep_timer());
let refresh_before = configuration.refresh_before();

Ok(())
loop {
let res = refresh(
Refresh::new(refresh_before),
state.connections().clone(),
state.secrets().clone(),
state.oauths().clone(),
state.client().clone(),
state.metrics().clone(),
)
.await;
if let Err(e) = res {
tracing::warn!("Failed to send refresh message: {:?}", e);
}

tracing::info!("Sleeping for {} seconds", sleep_timer.as_secs());
tokio::time::sleep(sleep_timer).await;
}
}
Loading

0 comments on commit 99ea9f6

Please sign in to comment.