Skip to content

Commit

Permalink
fix: change publish to use http client in handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
geekbrother committed Oct 23, 2023
1 parent 8e0bd48 commit 64620e5
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 18 deletions.
5 changes: 3 additions & 2 deletions src/websocket_service/handlers/notify_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ pub async fn handle(

let response_topic = sha256::digest(&sym_key);

client
state
.http_relay_client
.publish(
response_topic.into(),
base64_notification,
Expand All @@ -156,7 +157,7 @@ pub async fn handle(
account.clone(),
&project.app_domain,
&state.postgres,
client.as_ref(),
&state.http_relay_client.clone(),
&state.notify_keys.authentication_secret,
&state.notify_keys.authentication_public,
)
Expand Down
9 changes: 5 additions & 4 deletions src/websocket_service/handlers/notify_subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ use {
pub async fn handle(
msg: relay_client::websocket::PublishedMessage,
state: &Arc<AppState>,
client: &Arc<relay_client::websocket::Client>,
) -> Result<()> {
let topic = msg.topic;

Expand Down Expand Up @@ -172,12 +171,14 @@ pub async fn handle(

// Send noop to extend ttl of relay's mapping
info!("publishing noop to notify_topic: {notify_topic}");
client
state
.http_relay_client
.publish(notify_topic, "", 4050, Duration::from_secs(300), false)
.await?;

info!("publishing subscribe response to topic: {response_topic}");
client
state
.http_relay_client
.publish(
response_topic.into(),
base64_notification,
Expand All @@ -191,7 +192,7 @@ pub async fn handle(
account,
&project.app_domain,
&state.postgres,
client.as_ref(),
&state.http_relay_client.clone(),
&state.notify_keys.authentication_secret,
&state.notify_keys.authentication_public,
)
Expand Down
6 changes: 3 additions & 3 deletions src/websocket_service/handlers/notify_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use {
pub async fn handle(
msg: relay_client::websocket::PublishedMessage,
state: &Arc<AppState>,
client: &Arc<relay_client::websocket::Client>,
) -> Result<()> {
let topic = msg.topic;

Expand Down Expand Up @@ -138,7 +137,8 @@ pub async fn handle(

let response_topic = sha256::digest(&sym_key);

client
state
.http_relay_client
.publish(
response_topic.into(),
base64_notification,
Expand All @@ -152,7 +152,7 @@ pub async fn handle(
account,
&project.app_domain,
&state.postgres,
client.as_ref(),
&state.http_relay_client.clone(),
&state.notify_keys.authentication_secret,
&state.notify_keys.authentication_public,
)
Expand Down
12 changes: 6 additions & 6 deletions src/websocket_service/handlers/notify_watch_subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ use {
pub async fn handle(
msg: relay_client::websocket::PublishedMessage,
state: &Arc<AppState>,
client: &Arc<relay_client::websocket::Client>,
) -> Result<()> {
if msg.topic != state.notify_keys.key_agreement_topic {
return Err(Error::WrongNotifyWatchSubscriptionsTopic(msg.topic));
Expand Down Expand Up @@ -152,7 +151,8 @@ pub async fn handle(
base64::engine::general_purpose::STANDARD.encode(envelope.to_bytes());

info!("Publishing response on topic {response_topic}");
client
state
.http_relay_client
.publish(
response_topic.into(),
base64_notification,
Expand Down Expand Up @@ -200,7 +200,7 @@ pub async fn update_subscription_watchers(
account: AccountId,
app_domain: &str,
postgres: &PgPool,
client: &relay_client::websocket::Client,
http_client: &relay_client::http::Client,
authentication_secret: &ed25519_dalek::SigningKey,
authentication_public: &ed25519_dalek::VerifyingKey,
) -> Result<()> {
Expand All @@ -218,7 +218,7 @@ pub async fn update_subscription_watchers(
sym_key: &str,
notify_did_key: String,
did_pkh: String,
client: &relay_client::websocket::Client,
http_client: &relay_client::http::Client,
authentication_secret: &ed25519_dalek::SigningKey,
) -> Result<()> {
let now = Utc::now();
Expand Down Expand Up @@ -246,7 +246,7 @@ pub async fn update_subscription_watchers(

let topic = Topic::from(sha256::digest(&sym_key));
info!("topic: {topic}");
client
http_client
.publish(
topic,
base64_notification,
Expand Down Expand Up @@ -283,7 +283,7 @@ pub async fn update_subscription_watchers(
&watcher.sym_key,
notify_did_key.clone(),
did_pkh.clone(),
client,
http_client,
authentication_secret,
)
.await?
Expand Down
6 changes: 3 additions & 3 deletions src/websocket_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,21 +131,21 @@ async fn handle_msg(
}
NOTIFY_SUBSCRIBE_TAG => {
info!("Received notify subscribe on topic {topic}");
if let Err(e) = notify_subscribe::handle(msg, state, client).await {
if let Err(e) = notify_subscribe::handle(msg, state).await {
warn!("Error handling notify subscribe: {e}");
}
info!("Finished processing notify subscribe on topic {topic}");
}
NOTIFY_UPDATE_TAG => {
info!("Received notify update on topic {topic}");
if let Err(e) = notify_update::handle(msg, state, client).await {
if let Err(e) = notify_update::handle(msg, state).await {
warn!("Error handling notify update: {e}");
}
info!("Finished processing notify update on topic {topic}");
}
NOTIFY_WATCH_SUBSCRIPTIONS_TAG => {
info!("Received notify watch subscriptions on topic {topic}");
if let Err(e) = notify_watch_subscriptions::handle(msg, state, client).await {
if let Err(e) = notify_watch_subscriptions::handle(msg, state).await {
warn!("Error handling notify watch subscriptions: {e}");
}
info!("Finished processing notify watch subscriptions on topic {topic}");
Expand Down

0 comments on commit 64620e5

Please sign in to comment.