Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENG-3200: Centralize OAuth logic to a single repository #7

Merged
merged 1 commit into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ actix-governor = "0.5.0"
actix-web = "4.5.1"
actix-web-lab = "0.20.2"
anyhow = "1.0.79"
async-trait = "0.1.80"
chrono = { version = "0.4.33", features = ["serde"] }
dotenvy = "0.15.7"
envconfig = "0.10.0"
Expand Down
41 changes: 2 additions & 39 deletions src/algebra/mod.rs
Original file line number Diff line number Diff line change
@@ -1,48 +1,11 @@
mod parameter;
mod refresh;
mod storage;
mod token;
mod trigger;

pub use parameter::*;
pub use refresh::*;
pub use storage::*;
pub use token::*;
pub use trigger::*;

use chrono::{DateTime, Utc};
use integrationos_domain::{
algebra::{MongoStore, StoreExt},
Connection, Id, IntegrationOSError,
};
use mongodb::bson::doc;

pub async fn get_connections_to_refresh(
collection: &MongoStore<Connection>,
refresh_before: &DateTime<Utc>,
refresh_after: &DateTime<Utc>,
) -> Result<Vec<Connection>, IntegrationOSError> {
collection
.get_many(
Some(doc! {
"oauth.enabled.expires_at": doc! {
"$gt": refresh_before.timestamp(),
"$lte": refresh_after.timestamp(),
},
}),
None,
None,
None,
None,
)
.await
}

pub async fn get_connection_to_trigger(
collection: &MongoStore<Connection>,
id: Id,
) -> Result<Option<Connection>, IntegrationOSError> {
collection
.get_one(doc! {
"_id": id.to_string(),
})
.await
}
4 changes: 2 additions & 2 deletions src/algebra/parameter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ use std::{
};
use tracing::warn;

pub trait Parameter {
pub trait ParameterExt {
fn headers(&self, computation: Option<&Computation>) -> Result<Option<HeaderMap>, Error>;
fn body(&self, secret: &OAuthSecret) -> Result<Option<Value>, Error>;
fn query(&self, computation: Option<&Computation>) -> Result<Option<Value>, Error>;
}

impl Parameter for ConnectionOAuthDefinition {
impl ParameterExt for ConnectionOAuthDefinition {
fn headers(&self, computation: Option<&Computation>) -> Result<Option<HeaderMap>, Error> {
headers(self, computation)
}
Expand Down
11 changes: 6 additions & 5 deletions src/algebra/refresh.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::prelude::{
get_connections_to_refresh, Query, Refresh, StatefulActor, Trigger, TriggerActor, Unit,
use crate::{
algebra::{StorageExt, TriggerActor},
domain::{Query, Refresh, StatefulActor, Trigger, Unit},
};
use actix::prelude::*;
use chrono::{Duration, Utc};
Expand Down Expand Up @@ -66,9 +67,9 @@ impl Handler<Refresh> for RefreshActor {
let state = self.state.clone();

Box::pin(async move {
let connections =
get_connections_to_refresh(&connections_store, &refresh_before, &refresh_after)
.await?;
let connections = connections_store
.get_by(&refresh_before, &refresh_after)
.await?;

tracing::info!("Found {} connections to refresh", connections.len());

Expand Down
45 changes: 45 additions & 0 deletions src/algebra/storage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use integrationos_domain::{Connection, Id, IntegrationOSError, MongoStore, StoreExt};
use mongodb::bson::doc;

#[async_trait]
pub trait StorageExt {
async fn get_by(
&self,
refresh_before: &DateTime<Utc>,
refresh_after: &DateTime<Utc>,
) -> Result<Vec<Connection>, IntegrationOSError>;

async fn get(&self, id: Id) -> Result<Option<Connection>, IntegrationOSError>;
}

#[async_trait]
impl StorageExt for MongoStore<Connection> {
async fn get_by(
&self,
refresh_before: &DateTime<Utc>,
refresh_after: &DateTime<Utc>,
) -> Result<Vec<Connection>, IntegrationOSError> {
self.get_many(
Some(doc! {
"oauth.enabled.expires_at": doc! {
"$gt": refresh_before.timestamp(),
"$lte": refresh_after.timestamp(),
},
}),
None,
None,
None,
None,
)
.await
}

async fn get(&self, id: Id) -> Result<Option<Connection>, IntegrationOSError> {
self.get_one(doc! {
"_id": id.to_string(),
})
.await
}
}
12 changes: 6 additions & 6 deletions src/algebra/token.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
use crate::prelude::Config;
use crate::service::Configuration;
use chrono::{Duration, Utc};
use integrationos_domain::{Claims, IntegrationOSError as Error, InternalError};
use jsonwebtoken::{encode, EncodingKey, Header};

pub trait TokenGenerator {
fn generate(&self, configuration: Config, expiration: i64) -> Result<String, Error>;
pub trait TokenExt {
fn generate(&self, configuration: Configuration, expiration: i64) -> Result<String, Error>;
}

#[derive(Debug, Default)]
pub struct JwtTokenGenerator;
pub struct Token;

impl TokenGenerator for JwtTokenGenerator {
fn generate(&self, configuration: Config, expiration: i64) -> Result<String, Error> {
impl TokenExt for Token {
fn generate(&self, configuration: Configuration, expiration: i64) -> Result<String, Error> {
let key = configuration.server().admin_secret();
let key = key.as_bytes();
let key = EncodingKey::from_secret(key);
Expand Down
9 changes: 5 additions & 4 deletions src/algebra/trigger.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::Parameter;
use crate::prelude::{Outcome, Trigger};
use crate::domain::{Outcome, Trigger};

use super::ParameterExt;
use actix::prelude::*;
use chrono::Duration;
use integrationos_domain::{
Expand Down Expand Up @@ -256,8 +257,8 @@ impl Handler<Trigger> for TriggerActor {
Outcome::success(id.to_string().as_str(), json!({ "id": id.to_string() }))
}
Err(e) => Outcome::failure(
msg.connection().id.to_string().as_str(),
json!({ "error": e.to_string() }),
e,
json!({ "connectionId": msg.connection().id.to_string() }),
),
}
};
Expand Down
6 changes: 6 additions & 0 deletions src/domain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,9 @@ pub use query::*;
pub use refresh::*;
pub use state::*;
pub use trigger::*;

use futures::Future;
use std::pin::Pin;

pub type Unit = ();
pub type Task = Pin<Box<dyn Future<Output = Unit> + Send + Sync>>;
23 changes: 14 additions & 9 deletions src/domain/outcome.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
use serde::{Deserialize, Serialize};
use integrationos_domain::IntegrationOSError;
use serde::Serialize;
use serde_json::Value;

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
#[serde(tag = "type")]
pub enum Outcome {
Success { message: String, metadata: Value },
Failure { message: String, metadata: Value },
Success {
message: String,
metadata: Value,
},
Failure {
error: IntegrationOSError,
metadata: Value,
},
}

impl Outcome {
Expand All @@ -16,10 +24,7 @@ impl Outcome {
}
}

pub fn failure(message: &str, metadata: Value) -> Self {
Self::Failure {
message: message.to_string(),
metadata,
}
pub fn failure(error: IntegrationOSError, metadata: Value) -> Self {
Self::Failure { error, metadata }
}
}
2 changes: 1 addition & 1 deletion src/domain/refresh.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::prelude::Unit;
use crate::domain::Unit;
use actix::prelude::*;
use integrationos_domain::error::IntegrationOSError as Error;

Expand Down
2 changes: 1 addition & 1 deletion src/domain/trigger.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::prelude::Outcome;
use crate::domain::Outcome;
use actix::prelude::*;
use integrationos_domain::Connection;

Expand Down
31 changes: 9 additions & 22 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,10 @@ mod algebra;
mod domain;
mod service;

pub mod prelude {
pub use super::algebra::*;
pub use super::domain::*;
pub use super::service::*;
pub use algebra::*;
pub use domain::*;
pub use service::*;

pub type Unit = ();
}

use crate::prelude::{AppState, Config, Refresh, Tracer, Unit};
use actix_cors::Cors;
use actix_governor::{Governor, GovernorConfigBuilder};
use actix_web::{
Expand All @@ -20,14 +15,10 @@ use actix_web::{
};
use actix_web_lab::middleware::from_fn;
use anyhow::Context;
use futures::Future;
use prelude::{admin_middleware, get_state, health_check, sensitive_middleware, trigger_refresh};
use std::{net::TcpListener, pin::Pin, time::Duration};
use std::{net::TcpListener, time::Duration};

pub const PREFIX: &str = "/v1";
pub const ADMIN_PREFIX: &str = "/admin";
pub const INTEGRATION_PREFIX: &str = "/integration";
type Task = Pin<Box<dyn Future<Output = Unit> + Send + Sync>>;

pub struct Application {
port: u16,
Expand All @@ -36,7 +27,7 @@ pub struct Application {
}

impl Application {
pub async fn start(configuration: &Config) -> Result<Self, anyhow::Error> {
pub async fn start(configuration: &Configuration) -> Result<Self, anyhow::Error> {
tracing::info!(
"Starting application with configuration: {}{:#?}{}",
"\n",
Expand Down Expand Up @@ -100,7 +91,7 @@ impl Application {

async fn run(
listener: TcpListener,
configuration: Config,
configuration: Configuration,
state: AppState,
) -> Result<Server, anyhow::Error> {
let governor = GovernorConfigBuilder::default()
Expand All @@ -123,15 +114,11 @@ async fn run(
.max_age(3600),
)
.wrap(Governor::new(&governor))
.service(
scope(&(PREFIX.to_owned() + ADMIN_PREFIX)) // /v1/admin
.wrap(from_fn(sensitive_middleware))
.service(get_state),
)
.service(
scope(&(PREFIX.to_owned() + INTEGRATION_PREFIX)) // /v1/integration
.wrap(from_fn(admin_middleware))
.service(trigger_refresh),
.wrap(from_fn(auth_middleware))
.service(trigger_refresh)
.service(get_state),
)
.service(scope(PREFIX).service(health_check)) // /v1
.app_data(Data::new(state.clone()))
Expand Down
8 changes: 3 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use dotenvy::dotenv;
use oauth_api::{
prelude::{get_subscriber, init_subscriber, Config, OAuthConfig, ServerConfig},
Application,
};
use integrationos_domain::telemetry::{get_subscriber, init_subscriber};
use oauth_api::{Application, Configuration, OAuthConfig, ServerConfig};

#[actix_web::main]
async fn main() -> anyhow::Result<()> {
Expand All @@ -13,7 +11,7 @@ async fn main() -> anyhow::Result<()> {

let oauth = OAuthConfig::load().expect("Failed to read configuration.");
let server = ServerConfig::load().expect("Failed to read configuration.");
let configuration = Config::new(oauth, server);
let configuration = Configuration::new(oauth, server);

let address = configuration.server().app_url().to_string();
let application = Application::start(&configuration).await?;
Expand Down
8 changes: 4 additions & 4 deletions src/service/configuration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,12 @@ impl ServerConfig {
}

#[derive(Clone)]
pub struct Config {
pub struct Configuration {
oauth: OAuthConfig,
server: ServerConfig,
}

impl Debug for Config {
impl Debug for Configuration {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let _ = f
.debug_struct("OAuthConfig")
Expand All @@ -185,7 +185,7 @@ impl Debug for Config {
}
}

impl Config {
impl Configuration {
pub fn new(oauth: OAuthConfig, server: ServerConfig) -> Self {
Self { oauth, server }
}
Expand Down Expand Up @@ -296,7 +296,7 @@ impl From<HashMap<&str, &str>> for ServerConfig {
}
}

impl From<HashMap<&str, &str>> for Config {
impl From<HashMap<&str, &str>> for Configuration {
fn from(value: HashMap<&str, &str>) -> Self {
let oauth = OAuthConfig::from(value.clone());
let server = ServerConfig::from(value);
Expand Down
Loading
Loading