Skip to content

Commit

Permalink
feat: returning connection id as actor outcome (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
sagojez authored Jun 6, 2024
1 parent 893c082 commit 6c264e6
Show file tree
Hide file tree
Showing 8 changed files with 8 additions and 113 deletions.
24 changes: 1 addition & 23 deletions src/algebra/refresh.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use crate::{
algebra::{StorageExt, TriggerActor},
domain::{Query, Refresh, StatefulActor, Trigger, Unit},
domain::{Refresh, Trigger, Unit},
};
use actix::prelude::*;
use chrono::{Duration, Utc};
use futures::lock::Mutex;
use integrationos_domain::{
algebra::MongoStore, client::secrets_client::SecretsClient,
connection_oauth_definition::ConnectionOAuthDefinition, error::IntegrationOSError as Error,
Expand All @@ -18,7 +17,6 @@ pub struct RefreshActor {
oauths: Arc<MongoStore<ConnectionOAuthDefinition>>,
secrets: Arc<SecretsClient>,
client: Client,
state: Arc<Mutex<StatefulActor>>,
}

impl RefreshActor {
Expand All @@ -33,7 +31,6 @@ impl RefreshActor {
oauths,
secrets,
client,
state: StatefulActor::empty(),
}
}
}
Expand Down Expand Up @@ -64,7 +61,6 @@ impl Handler<Refresh> for RefreshActor {
let client = self.client.clone();
let connections_store = self.connections.clone();
let oauths_store = self.oauths.clone();
let state = self.state.clone();

Box::pin(async move {
tracing::info!("Searching for connections to refresh");
Expand Down Expand Up @@ -99,14 +95,6 @@ impl Handler<Refresh> for RefreshActor {
.collect::<Result<Vec<_>, _>>()
{
Ok(vec) => {
let vec_as_json = serde_json::to_value(&vec).map_err(|e| {
InternalError::encryption_error(
"Failed to serialize outcome",
Some(e.to_string().as_str()),
)
})?;
StatefulActor::update(vec_as_json, state).await;

tracing::info!(
"Refreshed {} connections with outcome: {:?}",
vec.len(),
Expand All @@ -120,13 +108,3 @@ impl Handler<Refresh> for RefreshActor {
})
}
}

impl Handler<Query> for RefreshActor {
type Result = ResponseFuture<StatefulActor>;

fn handle(&mut self, _: Query, _: &mut Self::Context) -> Self::Result {
let state = self.state.clone();

Box::pin(async move { state.lock().await.clone() })
}
}
12 changes: 6 additions & 6 deletions src/algebra/trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl Handler<Trigger> for TriggerActor {
let template = DefaultTemplate::default();

let ask = || async {
let conn_id = match &msg.connection().oauth {
let conn_oauth_id = match &msg.connection().oauth {
Some(OAuth::Enabled {
connection_oauth_definition_id: conn_oauth_definition_id,
..
Expand All @@ -104,7 +104,7 @@ impl Handler<Trigger> for TriggerActor {

let conn_oauth_definition = oauths
.get_one(doc! {
"_id": conn_id.to_string(),
"_id": conn_oauth_id.to_string(),
})
.await
.map_err(|e| {
Expand All @@ -115,7 +115,8 @@ impl Handler<Trigger> for TriggerActor {
)
})?
.ok_or(ApplicationError::not_found(
format!("Connection oauth definition not found: {}", conn_id).as_str(),
format!("Connection oauth definition not found: {}", conn_oauth_id)
.as_str(),
None,
))?;

Expand Down Expand Up @@ -203,7 +204,6 @@ impl Handler<Trigger> for TriggerActor {
})?;

let oauth_secret = secret.from_refresh(decoded, None, None, json);

let secret = secrets_client
.create_secret(
msg.connection().clone().ownership.client_id,
Expand All @@ -216,7 +216,7 @@ impl Handler<Trigger> for TriggerActor {
})?;

let set = OAuth::Enabled {
connection_oauth_definition_id: *conn_id,
connection_oauth_definition_id: *conn_oauth_id,
expires_at: Some(
(chrono::Utc::now() + Duration::seconds(oauth_secret.expires_in as i64))
.timestamp(),
Expand Down Expand Up @@ -248,7 +248,7 @@ impl Handler<Trigger> for TriggerActor {
msg.connection().id
);

Ok::<Id, Error>(*conn_id)
Ok::<Id, Error>(msg.connection().id)
};

match ask().await {
Expand Down
4 changes: 0 additions & 4 deletions src/domain/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
mod outcome;
mod query;
mod refresh;
mod state;
mod trigger;

pub use outcome::*;
pub use query::*;
pub use refresh::*;
pub use state::*;
pub use trigger::*;

use futures::Future;
Expand Down
6 changes: 0 additions & 6 deletions src/domain/query.rs

This file was deleted.

53 changes: 0 additions & 53 deletions src/domain/state.rs

This file was deleted.

3 changes: 1 addition & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,7 @@ async fn run(
.service(
scope(&(PREFIX.to_owned() + INTEGRATION_PREFIX)) // /v1/integration
.wrap(from_fn(auth_middleware))
.service(trigger_refresh)
.service(get_state),
.service(trigger_refresh),
)
.service(scope(PREFIX).service(health_check)) // /v1
.app_data(Data::new(state.clone()))
Expand Down
2 changes: 0 additions & 2 deletions src/service/http/private/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
mod refresh;
mod trigger;

pub use refresh::*;
pub use trigger::*;
17 changes: 0 additions & 17 deletions src/service/http/private/refresh.rs

This file was deleted.

0 comments on commit 6c264e6

Please sign in to comment.