Skip to content

Commit

Permalink
fix: relay websocket connection state implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
geekbrother committed Oct 17, 2023
1 parent 6bc68c5 commit a0bf2f2
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 95 deletions.
29 changes: 17 additions & 12 deletions src/websocket_service/handlers/notify_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use {
websocket_service::{
decode_key,
handlers::{decrypt_message, notify_watch_subscriptions::update_subscription_watchers},
NotifyDelete, NotifyRequest, NotifyResponse,
publish_message, NotifyDelete, NotifyRequest, NotifyResponse, WebSocketClientState,
},
Result,
},
Expand All @@ -22,7 +22,7 @@ use {
mongodb::bson::doc,
relay_rpc::domain::DecodedClientId,
serde_json::{json, Value},
std::sync::Arc,
std::sync::{Arc, Mutex},
tracing::{info, warn},
};

Expand All @@ -31,6 +31,7 @@ pub async fn handle(
msg: relay_client::websocket::PublishedMessage,
state: &Arc<AppState>,
client: &Arc<relay_client::websocket::Client>,
client_state: &Arc<Mutex<WebSocketClientState>>,
) -> Result<()> {
let request_id = uuid::Uuid::new_v4();
let topic = msg.topic;
Expand Down Expand Up @@ -160,21 +161,25 @@ pub async fn handle(

let response_topic = sha256::digest(&sym_key);

client
.publish(
response_topic.into(),
base64_notification,
NOTIFY_DELETE_RESPONSE_TAG,
NOTIFY_DELETE_RESPONSE_TTL,
false,
)
.await?;
publish_message(
client.clone(),
client_state.clone(),
state.clone(),
response_topic.into(),
&base64_notification,
NOTIFY_DELETE_RESPONSE_TAG,
NOTIFY_DELETE_RESPONSE_TTL,
false,
)
.await?;

update_subscription_watchers(
&account,
&project_data.app_domain,
&state.database,
client.as_ref(),
client,
client_state,
state,
&state.notify_keys.authentication_secret,
&state.notify_keys.authentication_public,
)
Expand Down
52 changes: 31 additions & 21 deletions src/websocket_service/handlers/notify_subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use {
websocket_service::{
decode_key, derive_key,
handlers::{decrypt_message, notify_watch_subscriptions::update_subscription_watchers},
NotifyRequest, NotifyResponse, NotifySubscribe,
publish_message, NotifyRequest, NotifyResponse, NotifySubscribe, WebSocketClientState,
},
Result,
},
Expand All @@ -21,7 +21,10 @@ use {
mongodb::bson::doc,
relay_rpc::domain::DecodedClientId,
serde_json::{json, Value},
std::{sync::Arc, time::Duration},
std::{
sync::{Arc, Mutex},
time::Duration,
},
tracing::{info, instrument},
x25519_dalek::StaticSecret,
};
Expand All @@ -32,6 +35,7 @@ pub async fn handle(
msg: relay_client::websocket::PublishedMessage,
state: &Arc<AppState>,
client: &Arc<relay_client::websocket::Client>,
client_state: &Arc<Mutex<WebSocketClientState>>,
) -> Result<()> {
let topic = msg.topic.to_string();

Expand Down Expand Up @@ -152,32 +156,38 @@ pub async fn handle(

// Send noop to extend ttl of relay's mapping
info!("publishing noop to notify_topic: {notify_topic}");
client
.publish(
notify_topic.clone().into(),
"",
4050,
Duration::from_secs(300),
false,
)
.await?;
publish_message(
client.clone(),
client_state.clone(),
state.clone(),
notify_topic.clone().into(),
"",
4050,
Duration::from_secs(300),
false,
)
.await?;

info!("publishing subscribe response to topic: {response_topic}");
client
.publish(
response_topic.into(),
base64_notification,
NOTIFY_SUBSCRIBE_RESPONSE_TAG,
NOTIFY_SUBSCRIBE_RESPONSE_TTL,
false,
)
.await?;
publish_message(
client.clone(),
client_state.clone(),
state.clone(),
response_topic.into(),
&base64_notification,
NOTIFY_SUBSCRIBE_RESPONSE_TAG,
NOTIFY_SUBSCRIBE_RESPONSE_TTL,
false,
)
.await?;

update_subscription_watchers(
&account,
&project_data.app_domain,
&state.database,
client.as_ref(),
client,
client_state,
state,
&state.notify_keys.authentication_secret,
&state.notify_keys.authentication_public,
)
Expand Down
30 changes: 18 additions & 12 deletions src/websocket_service/handlers/notify_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use {
state::AppState,
types::{ClientData, Envelope, EnvelopeType0, LookupEntry},
websocket_service::{
decode_key, handlers::decrypt_message, NotifyRequest, NotifyResponse, NotifyUpdate,
decode_key, handlers::decrypt_message, publish_message, NotifyRequest, NotifyResponse,
NotifyUpdate, WebSocketClientState,
},
Result,
},
Expand All @@ -20,14 +21,15 @@ use {
mongodb::bson::doc,
relay_rpc::domain::DecodedClientId,
serde_json::{json, Value},
std::sync::Arc,
std::sync::{Arc, Mutex},
};

// TODO test idempotency
pub async fn handle(
msg: relay_client::websocket::PublishedMessage,
state: &Arc<AppState>,
client: &Arc<relay_client::websocket::Client>,
client_state: &Arc<Mutex<WebSocketClientState>>,
) -> Result<()> {
let _request_id = uuid::Uuid::new_v4();
let topic = msg.topic.to_string();
Expand Down Expand Up @@ -142,21 +144,25 @@ pub async fn handle(

let response_topic = sha256::digest(&sym_key);

client
.publish(
response_topic.into(),
base64_notification,
NOTIFY_UPDATE_RESPONSE_TAG,
NOTIFY_UPDATE_RESPONSE_TTL,
false,
)
.await?;
publish_message(
client.clone(),
client_state.clone(),
state.clone(),
response_topic.into(),
&base64_notification,
NOTIFY_UPDATE_RESPONSE_TAG,
NOTIFY_UPDATE_RESPONSE_TTL,
false,
)
.await?;

update_subscription_watchers(
&account,
&project_data.app_domain,
&state.database,
client.as_ref(),
client,
client_state,
state,
&state.notify_keys.authentication_secret,
&state.notify_keys.authentication_public,
)
Expand Down
59 changes: 36 additions & 23 deletions src/websocket_service/handlers/notify_watch_subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use {
WatchSubscriptionsEntry,
},
websocket_service::{
decode_key, derive_key, handlers::decrypt_message, NotifyRequest, NotifyResponse,
NotifyWatchSubscriptions,
decode_key, derive_key, handlers::decrypt_message, publish_message, NotifyRequest,
NotifyResponse, NotifyWatchSubscriptions, WebSocketClientState,
},
Result,
},
Expand All @@ -29,7 +29,7 @@ use {
mongodb::{bson::doc, Database},
relay_rpc::domain::{DecodedClientId, Topic},
serde_json::{json, Value},
std::sync::Arc,
std::sync::{Arc, Mutex},
tracing::{info, instrument},
};

Expand All @@ -38,6 +38,7 @@ pub async fn handle(
msg: relay_client::websocket::PublishedMessage,
state: &Arc<AppState>,
client: &Arc<relay_client::websocket::Client>,
client_state: &Arc<Mutex<WebSocketClientState>>,
) -> Result<()> {
if msg.topic != state.notify_keys.key_agreement_topic {
return Err(Error::WrongNotifyWatchSubscriptionsTopic(msg.topic));
Expand Down Expand Up @@ -153,15 +154,17 @@ pub async fn handle(
base64::engine::general_purpose::STANDARD.encode(envelope.to_bytes());

info!("Publishing response on topic {response_topic}");
client
.publish(
response_topic.into(),
base64_notification,
NOTIFY_WATCH_SUBSCRIPTIONS_RESPONSE_TAG,
NOTIFY_WATCH_SUBSCRIPTIONS_RESPONSE_TTL,
false,
)
.await?;
publish_message(
client.clone(),
client_state.clone(),
state.clone(),
response_topic.into(),
&base64_notification,
NOTIFY_WATCH_SUBSCRIPTIONS_RESPONSE_TAG,
NOTIFY_WATCH_SUBSCRIPTIONS_RESPONSE_TTL,
false,
)
.await?;
}

Ok(())
Expand Down Expand Up @@ -231,7 +234,9 @@ pub async fn update_subscription_watchers(
account: &str,
app_domain: &str,
database: &Database,
client: &relay_client::websocket::Client,
client: &Arc<relay_client::websocket::Client>,
client_state: &Arc<Mutex<WebSocketClientState>>,
app_state: &Arc<AppState>,
authentication_secret: &ed25519_dalek::SigningKey,
authentication_public: &ed25519_dalek::VerifyingKey,
) -> Result<()> {
Expand All @@ -249,7 +254,9 @@ pub async fn update_subscription_watchers(
sym_key: &str,
notify_did_key: String,
did_pkh: String,
client: &relay_client::websocket::Client,
client: &Arc<relay_client::websocket::Client>,
client_state: &Arc<Mutex<WebSocketClientState>>,
app_state: &Arc<AppState>,
authentication_secret: &ed25519_dalek::SigningKey,
) -> Result<()> {
let now = Utc::now();
Expand Down Expand Up @@ -277,15 +284,17 @@ pub async fn update_subscription_watchers(

let topic = Topic::from(sha256::digest(&sym_key));
info!("topic: {topic}");
client
.publish(
topic,
base64_notification,
NOTIFY_SUBSCRIPTIONS_CHANGED_TAG,
NOTIFY_SUBSCRIPTIONS_CHANGED_TTL,
false,
)
.await?;
publish_message(
client.clone(),
client_state.clone(),
app_state.clone(),
topic,
&base64_notification,
NOTIFY_SUBSCRIPTIONS_CHANGED_TAG,
NOTIFY_SUBSCRIPTIONS_CHANGED_TTL,
false,
)
.await?;

Ok(())
}
Expand All @@ -308,6 +317,8 @@ pub async fn update_subscription_watchers(
notify_did_key.clone(),
did_pkh.clone(),
client,
client_state,
app_state,
authentication_secret,
)
.await?;
Expand All @@ -331,6 +342,8 @@ pub async fn update_subscription_watchers(
notify_did_key.clone(),
did_pkh.clone(),
client,
client_state,
app_state,
authentication_secret,
)
.await?;
Expand Down
Loading

0 comments on commit a0bf2f2

Please sign in to comment.