Skip to content

Commit

Permalink
ENG-3200: Centralize OAuth logic to a single repository (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
sagojez authored Apr 26, 2024
1 parent 2c492d2 commit 6e9dab2
Show file tree
Hide file tree
Showing 24 changed files with 163 additions and 167 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ actix-governor = "0.5.0"
actix-web = "4.5.1"
actix-web-lab = "0.20.2"
anyhow = "1.0.79"
async-trait = "0.1.80"
chrono = { version = "0.4.33", features = ["serde"] }
dotenvy = "0.15.7"
envconfig = "0.10.0"
Expand Down
41 changes: 2 additions & 39 deletions src/algebra/mod.rs
Original file line number Diff line number Diff line change
@@ -1,48 +1,11 @@
mod parameter;
mod refresh;
mod storage;
mod token;
mod trigger;

pub use parameter::*;
pub use refresh::*;
pub use storage::*;
pub use token::*;
pub use trigger::*;

use chrono::{DateTime, Utc};
use integrationos_domain::{
algebra::{MongoStore, StoreExt},
Connection, Id, IntegrationOSError,
};
use mongodb::bson::doc;

pub async fn get_connections_to_refresh(
collection: &MongoStore<Connection>,
refresh_before: &DateTime<Utc>,
refresh_after: &DateTime<Utc>,
) -> Result<Vec<Connection>, IntegrationOSError> {
collection
.get_many(
Some(doc! {
"oauth.enabled.expires_at": doc! {
"$gt": refresh_before.timestamp(),
"$lte": refresh_after.timestamp(),
},
}),
None,
None,
None,
None,
)
.await
}

pub async fn get_connection_to_trigger(
collection: &MongoStore<Connection>,
id: Id,
) -> Result<Option<Connection>, IntegrationOSError> {
collection
.get_one(doc! {
"_id": id.to_string(),
})
.await
}
4 changes: 2 additions & 2 deletions src/algebra/parameter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ use std::{
};
use tracing::warn;

pub trait Parameter {
pub trait ParameterExt {
fn headers(&self, computation: Option<&Computation>) -> Result<Option<HeaderMap>, Error>;
fn body(&self, secret: &OAuthSecret) -> Result<Option<Value>, Error>;
fn query(&self, computation: Option<&Computation>) -> Result<Option<Value>, Error>;
}

impl Parameter for ConnectionOAuthDefinition {
impl ParameterExt for ConnectionOAuthDefinition {
fn headers(&self, computation: Option<&Computation>) -> Result<Option<HeaderMap>, Error> {
headers(self, computation)
}
Expand Down
11 changes: 6 additions & 5 deletions src/algebra/refresh.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::prelude::{
get_connections_to_refresh, Query, Refresh, StatefulActor, Trigger, TriggerActor, Unit,
use crate::{
algebra::{StorageExt, TriggerActor},
domain::{Query, Refresh, StatefulActor, Trigger, Unit},
};
use actix::prelude::*;
use chrono::{Duration, Utc};
Expand Down Expand Up @@ -66,9 +67,9 @@ impl Handler<Refresh> for RefreshActor {
let state = self.state.clone();

Box::pin(async move {
let connections =
get_connections_to_refresh(&connections_store, &refresh_before, &refresh_after)
.await?;
let connections = connections_store
.get_by(&refresh_before, &refresh_after)
.await?;

tracing::info!("Found {} connections to refresh", connections.len());

Expand Down
45 changes: 45 additions & 0 deletions src/algebra/storage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use integrationos_domain::{Connection, Id, IntegrationOSError, MongoStore, StoreExt};
use mongodb::bson::doc;

#[async_trait]
pub trait StorageExt {
async fn get_by(
&self,
refresh_before: &DateTime<Utc>,
refresh_after: &DateTime<Utc>,
) -> Result<Vec<Connection>, IntegrationOSError>;

async fn get(&self, id: Id) -> Result<Option<Connection>, IntegrationOSError>;
}

#[async_trait]
impl StorageExt for MongoStore<Connection> {
async fn get_by(
&self,
refresh_before: &DateTime<Utc>,
refresh_after: &DateTime<Utc>,
) -> Result<Vec<Connection>, IntegrationOSError> {
self.get_many(
Some(doc! {
"oauth.enabled.expires_at": doc! {
"$gt": refresh_before.timestamp(),
"$lte": refresh_after.timestamp(),
},
}),
None,
None,
None,
None,
)
.await
}

async fn get(&self, id: Id) -> Result<Option<Connection>, IntegrationOSError> {
self.get_one(doc! {
"_id": id.to_string(),
})
.await
}
}
12 changes: 6 additions & 6 deletions src/algebra/token.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
use crate::prelude::Config;
use crate::service::Configuration;
use chrono::{Duration, Utc};
use integrationos_domain::{Claims, IntegrationOSError as Error, InternalError};
use jsonwebtoken::{encode, EncodingKey, Header};

pub trait TokenGenerator {
fn generate(&self, configuration: Config, expiration: i64) -> Result<String, Error>;
pub trait TokenExt {
fn generate(&self, configuration: Configuration, expiration: i64) -> Result<String, Error>;
}

#[derive(Debug, Default)]
pub struct JwtTokenGenerator;
pub struct Token;

impl TokenGenerator for JwtTokenGenerator {
fn generate(&self, configuration: Config, expiration: i64) -> Result<String, Error> {
impl TokenExt for Token {
fn generate(&self, configuration: Configuration, expiration: i64) -> Result<String, Error> {
let key = configuration.server().admin_secret();
let key = key.as_bytes();
let key = EncodingKey::from_secret(key);
Expand Down
9 changes: 5 additions & 4 deletions src/algebra/trigger.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::Parameter;
use crate::prelude::{Outcome, Trigger};
use crate::domain::{Outcome, Trigger};

use super::ParameterExt;
use actix::prelude::*;
use chrono::Duration;
use integrationos_domain::{
Expand Down Expand Up @@ -256,8 +257,8 @@ impl Handler<Trigger> for TriggerActor {
Outcome::success(id.to_string().as_str(), json!({ "id": id.to_string() }))
}
Err(e) => Outcome::failure(
msg.connection().id.to_string().as_str(),
json!({ "error": e.to_string() }),
e,
json!({ "connectionId": msg.connection().id.to_string() }),
),
}
};
Expand Down
6 changes: 6 additions & 0 deletions src/domain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,9 @@ pub use query::*;
pub use refresh::*;
pub use state::*;
pub use trigger::*;

use futures::Future;
use std::pin::Pin;

pub type Unit = ();
pub type Task = Pin<Box<dyn Future<Output = Unit> + Send + Sync>>;
23 changes: 14 additions & 9 deletions src/domain/outcome.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
use serde::{Deserialize, Serialize};
use integrationos_domain::IntegrationOSError;
use serde::Serialize;
use serde_json::Value;

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
#[serde(tag = "type")]
pub enum Outcome {
Success { message: String, metadata: Value },
Failure { message: String, metadata: Value },
Success {
message: String,
metadata: Value,
},
Failure {
error: IntegrationOSError,
metadata: Value,
},
}

impl Outcome {
Expand All @@ -16,10 +24,7 @@ impl Outcome {
}
}

pub fn failure(message: &str, metadata: Value) -> Self {
Self::Failure {
message: message.to_string(),
metadata,
}
pub fn failure(error: IntegrationOSError, metadata: Value) -> Self {
Self::Failure { error, metadata }
}
}
2 changes: 1 addition & 1 deletion src/domain/refresh.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::prelude::Unit;
use crate::domain::Unit;
use actix::prelude::*;
use integrationos_domain::error::IntegrationOSError as Error;

Expand Down
2 changes: 1 addition & 1 deletion src/domain/trigger.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::prelude::Outcome;
use crate::domain::Outcome;
use actix::prelude::*;
use integrationos_domain::Connection;

Expand Down
31 changes: 9 additions & 22 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,10 @@ mod algebra;
mod domain;
mod service;

pub mod prelude {
pub use super::algebra::*;
pub use super::domain::*;
pub use super::service::*;
pub use algebra::*;
pub use domain::*;
pub use service::*;

pub type Unit = ();
}

use crate::prelude::{AppState, Config, Refresh, Tracer, Unit};
use actix_cors::Cors;
use actix_governor::{Governor, GovernorConfigBuilder};
use actix_web::{
Expand All @@ -20,14 +15,10 @@ use actix_web::{
};
use actix_web_lab::middleware::from_fn;
use anyhow::Context;
use futures::Future;
use prelude::{admin_middleware, get_state, health_check, sensitive_middleware, trigger_refresh};
use std::{net::TcpListener, pin::Pin, time::Duration};
use std::{net::TcpListener, time::Duration};

pub const PREFIX: &str = "/v1";
pub const ADMIN_PREFIX: &str = "/admin";
pub const INTEGRATION_PREFIX: &str = "/integration";
type Task = Pin<Box<dyn Future<Output = Unit> + Send + Sync>>;

pub struct Application {
port: u16,
Expand All @@ -36,7 +27,7 @@ pub struct Application {
}

impl Application {
pub async fn start(configuration: &Config) -> Result<Self, anyhow::Error> {
pub async fn start(configuration: &Configuration) -> Result<Self, anyhow::Error> {
tracing::info!(
"Starting application with configuration: {}{:#?}{}",
"\n",
Expand Down Expand Up @@ -100,7 +91,7 @@ impl Application {

async fn run(
listener: TcpListener,
configuration: Config,
configuration: Configuration,
state: AppState,
) -> Result<Server, anyhow::Error> {
let governor = GovernorConfigBuilder::default()
Expand All @@ -123,15 +114,11 @@ async fn run(
.max_age(3600),
)
.wrap(Governor::new(&governor))
.service(
scope(&(PREFIX.to_owned() + ADMIN_PREFIX)) // /v1/admin
.wrap(from_fn(sensitive_middleware))
.service(get_state),
)
.service(
scope(&(PREFIX.to_owned() + INTEGRATION_PREFIX)) // /v1/integration
.wrap(from_fn(admin_middleware))
.service(trigger_refresh),
.wrap(from_fn(auth_middleware))
.service(trigger_refresh)
.service(get_state),
)
.service(scope(PREFIX).service(health_check)) // /v1
.app_data(Data::new(state.clone()))
Expand Down
8 changes: 3 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use dotenvy::dotenv;
use oauth_api::{
prelude::{get_subscriber, init_subscriber, Config, OAuthConfig, ServerConfig},
Application,
};
use integrationos_domain::telemetry::{get_subscriber, init_subscriber};
use oauth_api::{Application, Configuration, OAuthConfig, ServerConfig};

#[actix_web::main]
async fn main() -> anyhow::Result<()> {
Expand All @@ -13,7 +11,7 @@ async fn main() -> anyhow::Result<()> {

let oauth = OAuthConfig::load().expect("Failed to read configuration.");
let server = ServerConfig::load().expect("Failed to read configuration.");
let configuration = Config::new(oauth, server);
let configuration = Configuration::new(oauth, server);

let address = configuration.server().app_url().to_string();
let application = Application::start(&configuration).await?;
Expand Down
8 changes: 4 additions & 4 deletions src/service/configuration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,12 @@ impl ServerConfig {
}

#[derive(Clone)]
pub struct Config {
pub struct Configuration {
oauth: OAuthConfig,
server: ServerConfig,
}

impl Debug for Config {
impl Debug for Configuration {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let _ = f
.debug_struct("OAuthConfig")
Expand All @@ -185,7 +185,7 @@ impl Debug for Config {
}
}

impl Config {
impl Configuration {
pub fn new(oauth: OAuthConfig, server: ServerConfig) -> Self {
Self { oauth, server }
}
Expand Down Expand Up @@ -296,7 +296,7 @@ impl From<HashMap<&str, &str>> for ServerConfig {
}
}

impl From<HashMap<&str, &str>> for Config {
impl From<HashMap<&str, &str>> for Configuration {
fn from(value: HashMap<&str, &str>) -> Self {
let oauth = OAuthConfig::from(value.clone());
let server = ServerConfig::from(value);
Expand Down
Loading

0 comments on commit 6e9dab2

Please sign in to comment.