From 9c3d98e3d531f49f06fef57e489ae939bfc73284 Mon Sep 17 00:00:00 2001 From: Samuel Gomez Date: Thu, 6 Jun 2024 19:26:08 +0100 Subject: [PATCH 1/2] Returning connection id as actor outcome --- src/algebra/refresh.rs | 24 +----------------------- src/algebra/trigger.rs | 12 ++++++------ src/domain/mod.rs | 2 -- src/domain/query.rs | 6 ------ src/lib.rs | 1 - src/service/http/private/mod.rs | 2 -- src/service/http/private/refresh.rs | 17 ----------------- 7 files changed, 7 insertions(+), 57 deletions(-) delete mode 100644 src/domain/query.rs delete mode 100644 src/service/http/private/refresh.rs diff --git a/src/algebra/refresh.rs b/src/algebra/refresh.rs index 7989f92..168aebe 100644 --- a/src/algebra/refresh.rs +++ b/src/algebra/refresh.rs @@ -1,10 +1,9 @@ use crate::{ algebra::{StorageExt, TriggerActor}, - domain::{Query, Refresh, StatefulActor, Trigger, Unit}, + domain::{Refresh, Trigger, Unit}, }; use actix::prelude::*; use chrono::{Duration, Utc}; -use futures::lock::Mutex; use integrationos_domain::{ algebra::MongoStore, client::secrets_client::SecretsClient, connection_oauth_definition::ConnectionOAuthDefinition, error::IntegrationOSError as Error, @@ -18,7 +17,6 @@ pub struct RefreshActor { oauths: Arc>, secrets: Arc, client: Client, - state: Arc>, } impl RefreshActor { @@ -33,7 +31,6 @@ impl RefreshActor { oauths, secrets, client, - state: StatefulActor::empty(), } } } @@ -64,7 +61,6 @@ impl Handler for RefreshActor { let client = self.client.clone(); let connections_store = self.connections.clone(); let oauths_store = self.oauths.clone(); - let state = self.state.clone(); Box::pin(async move { tracing::info!("Searching for connections to refresh"); @@ -99,14 +95,6 @@ impl Handler for RefreshActor { .collect::, _>>() { Ok(vec) => { - let vec_as_json = serde_json::to_value(&vec).map_err(|e| { - InternalError::encryption_error( - "Failed to serialize outcome", - Some(e.to_string().as_str()), - ) - })?; - StatefulActor::update(vec_as_json, state).await; - tracing::info!( "Refreshed {} connections with outcome: {:?}", vec.len(), @@ -120,13 +108,3 @@ impl Handler for RefreshActor { }) } } - -impl Handler for RefreshActor { - type Result = ResponseFuture; - - fn handle(&mut self, _: Query, _: &mut Self::Context) -> Self::Result { - let state = self.state.clone(); - - Box::pin(async move { state.lock().await.clone() }) - } -} diff --git a/src/algebra/trigger.rs b/src/algebra/trigger.rs index 1a696b7..82cd9f9 100644 --- a/src/algebra/trigger.rs +++ b/src/algebra/trigger.rs @@ -91,7 +91,7 @@ impl Handler for TriggerActor { let template = DefaultTemplate::default(); let ask = || async { - let conn_id = match &msg.connection().oauth { + let conn_oauth_id = match &msg.connection().oauth { Some(OAuth::Enabled { connection_oauth_definition_id: conn_oauth_definition_id, .. @@ -104,7 +104,7 @@ impl Handler for TriggerActor { let conn_oauth_definition = oauths .get_one(doc! { - "_id": conn_id.to_string(), + "_id": conn_oauth_id.to_string(), }) .await .map_err(|e| { @@ -115,7 +115,8 @@ impl Handler for TriggerActor { ) })? .ok_or(ApplicationError::not_found( - format!("Connection oauth definition not found: {}", conn_id).as_str(), + format!("Connection oauth definition not found: {}", conn_oauth_id) + .as_str(), None, ))?; @@ -203,7 +204,6 @@ impl Handler for TriggerActor { })?; let oauth_secret = secret.from_refresh(decoded, None, None, json); - let secret = secrets_client .create_secret( msg.connection().clone().ownership.client_id, @@ -216,7 +216,7 @@ impl Handler for TriggerActor { })?; let set = OAuth::Enabled { - connection_oauth_definition_id: *conn_id, + connection_oauth_definition_id: *conn_oauth_id, expires_at: Some( (chrono::Utc::now() + Duration::seconds(oauth_secret.expires_in as i64)) .timestamp(), @@ -248,7 +248,7 @@ impl Handler for TriggerActor { msg.connection().id ); - Ok::(*conn_id) + Ok::(msg.connection().id) }; match ask().await { diff --git a/src/domain/mod.rs b/src/domain/mod.rs index 3bb1e90..94c9631 100644 --- a/src/domain/mod.rs +++ b/src/domain/mod.rs @@ -1,11 +1,9 @@ mod outcome; -mod query; mod refresh; mod state; mod trigger; pub use outcome::*; -pub use query::*; pub use refresh::*; pub use state::*; pub use trigger::*; diff --git a/src/domain/query.rs b/src/domain/query.rs deleted file mode 100644 index bea6e81..0000000 --- a/src/domain/query.rs +++ /dev/null @@ -1,6 +0,0 @@ -use super::state::StatefulActor; -use actix::prelude::*; - -#[derive(Message, Debug, Clone)] -#[rtype(result = "StatefulActor")] -pub struct Query; diff --git a/src/lib.rs b/src/lib.rs index 1cd821d..23fc46e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -118,7 +118,6 @@ async fn run( scope(&(PREFIX.to_owned() + INTEGRATION_PREFIX)) // /v1/integration .wrap(from_fn(auth_middleware)) .service(trigger_refresh) - .service(get_state), ) .service(scope(PREFIX).service(health_check)) // /v1 .app_data(Data::new(state.clone())) diff --git a/src/service/http/private/mod.rs b/src/service/http/private/mod.rs index f6e3670..e863fd1 100644 --- a/src/service/http/private/mod.rs +++ b/src/service/http/private/mod.rs @@ -1,5 +1,3 @@ -mod refresh; mod trigger; -pub use refresh::*; pub use trigger::*; diff --git a/src/service/http/private/refresh.rs b/src/service/http/private/refresh.rs deleted file mode 100644 index 89e57ac..0000000 --- a/src/service/http/private/refresh.rs +++ /dev/null @@ -1,17 +0,0 @@ -use crate::domain::Query; -use crate::service::{AppState, ResponseType, ServerResponse}; -use actix_web::{get, web::Data, HttpResponse}; -use integrationos_domain::error::IntegrationOSError as Error; -use integrationos_domain::InternalError; - -#[tracing::instrument(skip(state))] -#[get("/get_state")] -pub async fn get_state(state: Data) -> Result { - let response = state - .refresh_actor - .send(Query) - .await - .map_err(|e| InternalError::io_err(e.to_string().as_str(), None))?; - - Ok(ServerResponse::from(ResponseType::Query, response, 200)) -} From 5fe996433b20ad3616ba345eaff809f760b703f6 Mon Sep 17 00:00:00 2001 From: Samuel Gomez Date: Thu, 6 Jun 2024 19:27:13 +0100 Subject: [PATCH 2/2] formatting --- src/domain/mod.rs | 2 -- src/domain/state.rs | 53 --------------------------------------------- src/lib.rs | 2 +- 3 files changed, 1 insertion(+), 56 deletions(-) delete mode 100644 src/domain/state.rs diff --git a/src/domain/mod.rs b/src/domain/mod.rs index 94c9631..d0d0b51 100644 --- a/src/domain/mod.rs +++ b/src/domain/mod.rs @@ -1,11 +1,9 @@ mod outcome; mod refresh; -mod state; mod trigger; pub use outcome::*; pub use refresh::*; -pub use state::*; pub use trigger::*; use futures::Future; diff --git a/src/domain/state.rs b/src/domain/state.rs deleted file mode 100644 index bd43aab..0000000 --- a/src/domain/state.rs +++ /dev/null @@ -1,53 +0,0 @@ -use chrono::{DateTime, Utc}; -use futures::lock::Mutex; -use serde::{Deserialize, Serialize}; -use serde_json::Value; -use std::sync::Arc; - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -#[serde(rename_all = "camelCase")] -pub struct StatefulActor { - /// The state of the actor - state: Value, - /// The last time the actor was updated - last_updated: DateTime, -} - -impl StatefulActor { - /// Create a new empty stateful actor - /// - /// An `Arc` is used to share the state between actors, - /// when `clone` is called on the `Arc` the reference count - /// is increased by one rather than creating a new copy of - /// the state. - /// - /// Whilst a `Mutex` is used to ensure that only one actor - /// can access the state for mutation at a time. - /// - /// Hence, the combination of `Arc` and `Mutex` allows - /// multiple actors to share the same state and mutate it - /// in a thread-safe manner. - pub fn empty() -> Arc> { - Arc::new(Mutex::new(StatefulActor { - state: Value::Null, - last_updated: Utc::now(), - })) - } - - /// Returns the state of the actor - pub fn state(&self) -> &Value { - &self.state - } - - /// Returns the last time the actor was updated - pub fn last_updated(&self) -> &DateTime { - &self.last_updated - } - - /// Updates the state of the actor - pub async fn update(value: Value, state: Arc>) { - let mut actor = state.lock().await; - actor.state = value; - actor.last_updated = Utc::now(); - } -} diff --git a/src/lib.rs b/src/lib.rs index 23fc46e..8c4a86c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -117,7 +117,7 @@ async fn run( .service( scope(&(PREFIX.to_owned() + INTEGRATION_PREFIX)) // /v1/integration .wrap(from_fn(auth_middleware)) - .service(trigger_refresh) + .service(trigger_refresh), ) .service(scope(PREFIX).service(health_check)) // /v1 .app_data(Data::new(state.clone()))