Skip to content

Commit

Permalink
chore: migrate to new secret service impl
Browse files Browse the repository at this point in the history
  • Loading branch information
sagojez committed Sep 17, 2024
1 parent 99ea9f6 commit 8e9d13d
Show file tree
Hide file tree
Showing 8 changed files with 1,441 additions and 401 deletions.
1,493 changes: 1,254 additions & 239 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 8 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,20 @@ dotenvy = "0.15.7"
envconfig = "0.10.0"
futures = "0.3.30"
handlebars = "5.1.1"
integrationos-domain = { version = "4.1.6", features = ["dummy"] }
integrationos-domain = { version = "7.0.0", features = ["dummy"] }
metrics = "0.21.1"
metrics-exporter-prometheus = "0.12.1"
mongodb = "2.8.0"
reqwest = { version = "0.12.3", features = [
"json",
"rustls-tls",
], default-features = false }
] }
reqwest-middleware = { version = "0.3.3", features = [
"json",
"rustls-tls",
] }
reqwest-retry = "0.6.1"
reqwest-tracing = "0.5.3"
serde = { version = "1.0.196", features = ["derive"] }
serde_json = "1.0.113"
tokio = { version = "1.35.1", features = ["macros", "rt-multi-thread"] }
Expand Down
2 changes: 2 additions & 0 deletions src/algebra/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
mod metrics;
mod parameter;
mod refresh;
mod secrets;
mod storage;

pub use metrics::*;
pub use parameter::*;
pub use refresh::*;
pub use secrets::*;
pub use storage::*;
31 changes: 14 additions & 17 deletions src/algebra/refresh.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
use crate::{
algebra::StorageExt,
domain::{Refresh, Trigger, Unit},
Metrics, ParameterExt, Refreshed,
Metrics, ParameterExt, Refreshed, SecretsClient,
};
use chrono::{Duration, Utc};
use integrationos_domain::{
algebra::MongoStore,
api_model_config::ContentType,
client::secrets_client::SecretsClient,
connection_oauth_definition::{Computation, ConnectionOAuthDefinition, OAuthResponse},
error::IntegrationOSError as Error,
get_secret_request::GetSecretRequest,
oauth_secret::OAuthSecret,
ApplicationError, Connection, DefaultTemplate, InternalError, OAuth, TemplateExt,
};
use mongodb::bson::{self, doc};
use reqwest::Client;
use reqwest_middleware::ClientWithMiddleware;
use serde_json::json;
use std::sync::Arc;
use tracing::warn;
Expand All @@ -38,7 +36,7 @@ pub async fn refresh(
connections_store: Arc<MongoStore<Connection>>,
secrets: Arc<SecretsClient>,
oauths: Arc<MongoStore<ConnectionOAuthDefinition>>,
client: Client,
client: ClientWithMiddleware,
metrics: Arc<Metrics>,
) -> Result<Unit, Error> {
let refresh_before = Utc::now();
Expand Down Expand Up @@ -102,7 +100,7 @@ pub async fn trigger(
secrets: Arc<SecretsClient>,
connections: Arc<MongoStore<Connection>>,
oauths: Arc<MongoStore<ConnectionOAuthDefinition>>,
client: Client,
client: ClientWithMiddleware,
) -> Result<Refreshed, Error> {
let template = DefaultTemplate::default();

Expand Down Expand Up @@ -135,15 +133,12 @@ pub async fn trigger(
))?;

let secret: OAuthSecret = secrets
.get_secret::<OAuthSecret>(&GetSecretRequest {
id: msg.connection().secrets_service_id.clone(),
buildable_id: msg.connection().ownership.client_id.clone(),
})
.await
.map_err(|e| {
warn!("Failed to get secret: {}", e);
ApplicationError::not_found(format!("Failed to get secret: {}", e).as_str(), None)
})?;
.get_secret::<OAuthSecret>(
&msg.connection().secrets_service_id,
&msg.connection().ownership.client_id,
&msg.connection().environment,
)
.await?;

let compute_payload = serde_json::to_value(&secret).map_err(|e| {
warn!("Failed to serialize secret: {}", e);
Expand Down Expand Up @@ -175,6 +170,7 @@ pub async fn trigger(
let request = client
.post(conn_oauth_definition.configuration.refresh.uri())
.headers(headers.unwrap_or_default());

let request = match conn_oauth_definition.configuration.refresh.content {
Some(ContentType::Json) => request.json(&body).query(&query),
Some(ContentType::Form) => request.form(&body).query(&query),
Expand Down Expand Up @@ -218,7 +214,8 @@ pub async fn trigger(
let secret = secrets
.create_secret(
msg.connection().clone().ownership.client_id,
&oauth_secret.as_json(),
oauth_secret.as_json(),
msg.connection().environment,
)
.await
.map_err(|e| {
Expand All @@ -240,7 +237,7 @@ pub async fn trigger(
warn!("Failed to serialize oauth: {}", e);
InternalError::serialize_error("Failed to serialize oauth", None)
})?,
"secretsServiceId": secret.id,
"secretsServiceId": secret.id(),
}
};

Expand Down
129 changes: 129 additions & 0 deletions src/algebra/secrets.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
use crate::RefreshConfig;
use integrationos_domain::{
environment::Environment, event_access::EventAccess, IntegrationOSError, InternalError,
MongoStore, Secret,
};
use mongodb::bson::doc;
use reqwest_middleware::ClientWithMiddleware;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::sync::Arc;
use tracing::warn;

const PRODUCTION_KEY: &str = "event_access::custom::live::default::event-inc::internal-ui";
const TEST_KEY: &str = "event_access::custom::test::default::event-inc::internal-ui";
const INTEGRATIONOS_SECRET_HEADER: &str = "X-INTEGRATIONOS-SECRET";

#[derive(Debug, Clone)]
pub struct SecretsClient {
get: String,
create: String,
client: ClientWithMiddleware,
event: Arc<MongoStore<EventAccess>>,
}

#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct CreateSecretRequest {
secret: Value,
}

impl SecretsClient {
pub fn new(
config: &RefreshConfig,
event: &Arc<MongoStore<EventAccess>>,
client: ClientWithMiddleware,
) -> Self {
Self {
get: config.get_secret().to_string(),
create: config.create_secret().to_string(),
client,
event: Arc::clone(event),
}
}

pub async fn get_secret<T: for<'a> Deserialize<'a>>(
&self,
id: &str,
buildable_id: &str,
environment: &Environment,
) -> Result<T, IntegrationOSError> {
let key = match environment {
Environment::Test | Environment::Development => TEST_KEY,
Environment::Live | Environment::Production => PRODUCTION_KEY,
};

let event = self
.event
.get_one(doc! {
"ownership.buildableId": buildable_id,
"key": key
})
.await?
.ok_or(InternalError::key_not_found("Event access not found", None))?;

let access_key = event.access_key.clone();

let uri = format!("{}/{}", self.get, id);
let response = self
.client
.get(&uri)
.header(INTEGRATIONOS_SECRET_HEADER, access_key)
.send()
.await
.map_err(|err| {
InternalError::io_err(&format!("Failed to send request: {err}"), None)
})?;

let secret: Secret = response.json().await.map_err(|err| {
InternalError::serialize_error(&format!("Failed to deserialize response: {err}"), None)
})?;

secret.decode()
}

pub async fn create_secret<T: Serialize + for<'a> Deserialize<'a>>(
&self,
buildable_id: String,
secret: T,
environment: Environment,
) -> Result<Secret, IntegrationOSError> {
let payload = CreateSecretRequest {
secret: serde_json::to_value(&secret).map_err(|e| {
warn!("Failed to serialize secret: {}", e);
InternalError::serialize_error("Failed to serialize secret", None)
})?,
};

let key = match environment {
Environment::Test | Environment::Development => TEST_KEY,
Environment::Live | Environment::Production => PRODUCTION_KEY,
};

let event = self
.event
.get_one(doc! {
"ownership.buildableId": buildable_id,
"key": key
})
.await?
.ok_or(InternalError::key_not_found("Event access not found", None))?;

let access_key = event.access_key.clone();

let response = self
.client
.post(&self.create)
.json(&payload)
.header(INTEGRATIONOS_SECRET_HEADER, access_key)
.send()
.await
.map_err(|err| {
InternalError::io_err(&format!("Failed to send request: {err}"), None)
})?;

response.json().await.map_err(|err| {
InternalError::serialize_error(&format!("Failed to deserialize response: {err}"), None)
})
}
}
53 changes: 17 additions & 36 deletions src/service/configuration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use envconfig::Envconfig;
use integrationos_domain::{
database::DatabaseConfig, environment::Environment, secrets::SecretsConfig,
};
use std::collections::HashMap;
use std::fmt::Debug;

#[derive(Clone, Envconfig)]
Expand All @@ -19,6 +18,12 @@ pub struct RefreshConfig {
timeout: u64,
#[envconfig(from = "ENVIRONMENT", default = "test")]
environment: Environment,
#[envconfig(from = "GET_SECRET_PATH", default = "http://localhost:3005/v1/secrets")]
get_secret: String,
#[envconfig(from = "CREATE_SECRET_PATH", default = "http://localhost:3005/v1/secrets")]
create_secret: String,
#[envconfig(from = "MAX_RETRIES", default = "3")]
max_retries: u32,
}

impl Debug for RefreshConfig {
Expand All @@ -27,6 +32,9 @@ impl Debug for RefreshConfig {
writeln!(f, "SLEEP_TIMER_IN_SECONDS: {}", self.sleep_timer)?;
writeln!(f, "TIMEOUT: {}", self.timeout)?;
writeln!(f, "ENVIRONMENT: {}", self.environment)?;
writeln!(f, "GET_SECRET_PATH: {}", self.get_secret)?;
writeln!(f, "CREATE_SECRET_PATH: {}", self.create_secret)?;
writeln!(f, "MAX_RETRIES: {}", self.max_retries)?;
write!(f, "{}", self.database)?;
write!(f, "{}", self.secrets_config)
}
Expand Down Expand Up @@ -56,43 +64,16 @@ impl RefreshConfig {
pub fn environment(&self) -> Environment {
self.environment
}
}

impl From<HashMap<&str, &str>> for RefreshConfig {
fn from(value: HashMap<&str, &str>) -> Self {
let refresh_before = value
.get("REFRESH_BEFORE_IN_MINUTES")
.and_then(|value| value.parse().ok())
.unwrap_or(10);

let sleep_timer = value
.get("SLEEP_TIMER_IN_SECONDS")
.and_then(|value| value.parse().ok())
.unwrap_or(20);
pub fn get_secret(&self) -> &str {
&self.get_secret
}

let owned = value
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect();
let database = DatabaseConfig::init_from_hashmap(&owned).unwrap_or_default();
let secrets_config = SecretsConfig::init_from_hashmap(&owned).unwrap_or_default();
let timeout = value
.get("TIMEOUT")
.and_then(|value| value.parse().ok())
.unwrap_or(30);
let environment = value
.get("ENVIRONMENT")
.unwrap_or(&"test")
.parse()
.expect("Failed to parse environment");
pub fn create_secret(&self) -> &str {
&self.create_secret
}

Self {
refresh_before,
environment,
sleep_timer,
timeout,
database,
secrets_config,
}
pub fn max_retries(&self) -> u32 {
self.max_retries
}
}
Loading

0 comments on commit 8e9d13d

Please sign in to comment.