diff --git a/src/websocket_service/handlers/notify_delete.rs b/src/websocket_service/handlers/notify_delete.rs index cc4945c9..35ae2335 100644 --- a/src/websocket_service/handlers/notify_delete.rs +++ b/src/websocket_service/handlers/notify_delete.rs @@ -142,7 +142,8 @@ pub async fn handle( let response_topic = sha256::digest(&sym_key); - client + state + .http_relay_client .publish( response_topic.into(), base64_notification, @@ -156,7 +157,7 @@ pub async fn handle( account.clone(), &project.app_domain, &state.postgres, - client.as_ref(), + &state.http_relay_client.clone(), &state.notify_keys.authentication_secret, &state.notify_keys.authentication_public, ) diff --git a/src/websocket_service/handlers/notify_subscribe.rs b/src/websocket_service/handlers/notify_subscribe.rs index 43275070..51866f67 100644 --- a/src/websocket_service/handlers/notify_subscribe.rs +++ b/src/websocket_service/handlers/notify_subscribe.rs @@ -31,7 +31,6 @@ use { pub async fn handle( msg: relay_client::websocket::PublishedMessage, state: &Arc, - client: &Arc, ) -> Result<()> { let topic = msg.topic; @@ -172,12 +171,14 @@ pub async fn handle( // Send noop to extend ttl of relay's mapping info!("publishing noop to notify_topic: {notify_topic}"); - client + state + .http_relay_client .publish(notify_topic, "", 4050, Duration::from_secs(300), false) .await?; info!("publishing subscribe response to topic: {response_topic}"); - client + state + .http_relay_client .publish( response_topic.into(), base64_notification, @@ -191,7 +192,7 @@ pub async fn handle( account, &project.app_domain, &state.postgres, - client.as_ref(), + &state.http_relay_client.clone(), &state.notify_keys.authentication_secret, &state.notify_keys.authentication_public, ) diff --git a/src/websocket_service/handlers/notify_update.rs b/src/websocket_service/handlers/notify_update.rs index 9c85b664..31f7d3df 100644 --- a/src/websocket_service/handlers/notify_update.rs +++ b/src/websocket_service/handlers/notify_update.rs @@ -27,7 +27,6 @@ use { pub async fn handle( msg: relay_client::websocket::PublishedMessage, state: &Arc, - client: &Arc, ) -> Result<()> { let topic = msg.topic; @@ -138,7 +137,8 @@ pub async fn handle( let response_topic = sha256::digest(&sym_key); - client + state + .http_relay_client .publish( response_topic.into(), base64_notification, @@ -152,7 +152,7 @@ pub async fn handle( account, &project.app_domain, &state.postgres, - client.as_ref(), + &state.http_relay_client.clone(), &state.notify_keys.authentication_secret, &state.notify_keys.authentication_public, ) diff --git a/src/websocket_service/handlers/notify_watch_subscriptions.rs b/src/websocket_service/handlers/notify_watch_subscriptions.rs index ef406ac9..5f8b8aed 100644 --- a/src/websocket_service/handlers/notify_watch_subscriptions.rs +++ b/src/websocket_service/handlers/notify_watch_subscriptions.rs @@ -40,7 +40,6 @@ use { pub async fn handle( msg: relay_client::websocket::PublishedMessage, state: &Arc, - client: &Arc, ) -> Result<()> { if msg.topic != state.notify_keys.key_agreement_topic { return Err(Error::WrongNotifyWatchSubscriptionsTopic(msg.topic)); @@ -152,7 +151,8 @@ pub async fn handle( base64::engine::general_purpose::STANDARD.encode(envelope.to_bytes()); info!("Publishing response on topic {response_topic}"); - client + state + .http_relay_client .publish( response_topic.into(), base64_notification, @@ -200,7 +200,7 @@ pub async fn update_subscription_watchers( account: AccountId, app_domain: &str, postgres: &PgPool, - client: &relay_client::websocket::Client, + http_client: &relay_client::http::Client, authentication_secret: &ed25519_dalek::SigningKey, authentication_public: &ed25519_dalek::VerifyingKey, ) -> Result<()> { @@ -218,7 +218,7 @@ pub async fn update_subscription_watchers( sym_key: &str, notify_did_key: String, did_pkh: String, - client: &relay_client::websocket::Client, + http_client: &relay_client::http::Client, authentication_secret: &ed25519_dalek::SigningKey, ) -> Result<()> { let now = Utc::now(); @@ -246,7 +246,7 @@ pub async fn update_subscription_watchers( let topic = Topic::from(sha256::digest(&sym_key)); info!("topic: {topic}"); - client + http_client .publish( topic, base64_notification, @@ -283,7 +283,7 @@ pub async fn update_subscription_watchers( &watcher.sym_key, notify_did_key.clone(), did_pkh.clone(), - client, + http_client, authentication_secret, ) .await? diff --git a/src/websocket_service/mod.rs b/src/websocket_service/mod.rs index 08a78fa3..b14a289f 100644 --- a/src/websocket_service/mod.rs +++ b/src/websocket_service/mod.rs @@ -131,21 +131,21 @@ async fn handle_msg( } NOTIFY_SUBSCRIBE_TAG => { info!("Received notify subscribe on topic {topic}"); - if let Err(e) = notify_subscribe::handle(msg, state, client).await { + if let Err(e) = notify_subscribe::handle(msg, state).await { warn!("Error handling notify subscribe: {e}"); } info!("Finished processing notify subscribe on topic {topic}"); } NOTIFY_UPDATE_TAG => { info!("Received notify update on topic {topic}"); - if let Err(e) = notify_update::handle(msg, state, client).await { + if let Err(e) = notify_update::handle(msg, state).await { warn!("Error handling notify update: {e}"); } info!("Finished processing notify update on topic {topic}"); } NOTIFY_WATCH_SUBSCRIPTIONS_TAG => { info!("Received notify watch subscriptions on topic {topic}"); - if let Err(e) = notify_watch_subscriptions::handle(msg, state, client).await { + if let Err(e) = notify_watch_subscriptions::handle(msg, state).await { warn!("Error handling notify watch subscriptions: {e}"); } info!("Finished processing notify watch subscriptions on topic {topic}");