Skip to content

Commit

Permalink
fix: subscribe twice sends 2 welcome notifications (#280)
Browse files Browse the repository at this point in the history
* fix: subscribe not idempotent

* chore: add comment
  • Loading branch information
chris13524 authored Jan 15, 2024
1 parent 16b80d7 commit 9322d8d
Show file tree
Hide file tree
Showing 3 changed files with 275 additions and 84 deletions.
20 changes: 12 additions & 8 deletions src/model/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,10 +314,13 @@ pub async fn get_project_topics(
}

#[derive(Debug, FromRow)]
pub struct SubscriberWithId {
pub struct SubscribeResponse {
pub id: Uuid,
#[sqlx(try_from = "String")]
pub account: AccountId,
#[sqlx(try_from = "String")]
pub topic: Topic,
pub inserted: bool,
}

// TODO test idempotency
Expand All @@ -330,11 +333,10 @@ pub async fn upsert_subscriber(
notify_topic: Topic,
postgres: &PgPool,
metrics: Option<&Metrics>,
) -> Result<SubscriberWithId, sqlx::error::Error> {
) -> Result<SubscribeResponse, sqlx::error::Error> {
let mut txn = postgres.begin().await?;

// Note that sym_key and topic are updated on conflict. This could be implemented return the existing value like subscribe-topic does,
// but no reason to currently: https://walletconnect.slack.com/archives/C044SKFKELR/p1701994415291179?thread_ts=1701960403.729959&cid=C044SKFKELR
// `xmax = 0`: https://stackoverflow.com/a/39204667

let query = "
INSERT INTO subscriber (
Expand All @@ -347,13 +349,15 @@ pub async fn upsert_subscriber(
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (project, get_address_lower(account)) DO UPDATE SET
updated_at=now(),
sym_key=$3,
topic=$4,
expiry=$5
RETURNING id, account
RETURNING
id,
account,
topic,
(xmax = 0) AS inserted
";
let start = Instant::now();
let subscriber = sqlx::query_as::<Postgres, SubscriberWithId>(query)
let subscriber = sqlx::query_as::<Postgres, SubscribeResponse>(query)
.bind(project)
.bind(account.as_ref())
.bind(hex::encode(notify_key))
Expand Down
123 changes: 64 additions & 59 deletions src/services/websocket_server/handlers/notify_subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<()> {
info!("response_topic: {response_topic}");

let msg: NotifyRequest<NotifySubscribe> = decrypt_message(envelope, &sym_key)?;
let id = msg.id;

let request_auth = from_jwt::<SubscriptionRequestAuth>(&msg.params.subscription_auth)?;
info!(
Expand Down Expand Up @@ -133,41 +132,42 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<()> {
(account, domain)
};

let secret = StaticSecret::random_from_rng(chacha20poly1305::aead::OsRng);

// Technically we don't need to derive based on client_public_key anymore; we just need a symkey. But this is technical
// debt from when clients derived the same symkey on their end via Diffie-Hellman. But now they use the value from
// watch subscriptions.
let notify_key = derive_key(&client_public_key, &secret)?;

let scope = parse_scope(&request_auth.scp)?;

let notify_topic = topic_from_key(&notify_key);

let project_id = project.project_id;
info!(
"Registering account: {account} with topic: {notify_topic} at project: {project_id}. \
Scope: {scope:?}. RPC ID: {id:?}",
);

info!("Timing: Upserting subscriber");
let subscriber = upsert_subscriber(
project.id,
account.clone(),
scope.clone(),
&notify_key,
notify_topic.clone(),
&state.postgres,
state.metrics.as_ref(),
)
.await?;
let subscriber = {
// Technically we don't need to derive based on client_public_key anymore; we just need a symkey. But this is technical
// debt from when clients derived the same symkey on their end via Diffie-Hellman. But now they use the value from
// watch subscriptions.
let secret = StaticSecret::random_from_rng(chacha20poly1305::aead::OsRng);
let notify_key = derive_key(&client_public_key, &secret)?;
let notify_topic = topic_from_key(&notify_key);

info!("Timing: Upserting subscriber");
upsert_subscriber(
project.id,
account.clone(),
scope.clone(),
&notify_key,
notify_topic,
&state.postgres,
state.metrics.as_ref(),
)
.await?
};
info!("Timing: Finished upserting subscriber");

let notify_topic = subscriber.topic;

// TODO do in same transaction as upsert_subscriber()
state
.notify_webhook(
project_id.as_ref(),
project.project_id.as_ref(),
// TODO uncomment when `WebhookNotificationEvent::Updated` exists
// if subscriber.inserted {
WebhookNotificationEvent::Subscribed,
// } else {
// WebhookNotificationEvent::Updated
// },
account.as_ref(),
)
.await?;
Expand All @@ -182,7 +182,7 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<()> {
info!("Timing: Recording SubscriberUpdateParams");
state.analytics.client(SubscriberUpdateParams {
project_pk: project.id,
project_id,
project_id: project.project_id,
pk: subscriber.id,
account: subscriber.account, // Use a consistent account for analytics rather than the per-request one
updated_by_iss: request_iss_client_id.to_did_key().into(),
Expand Down Expand Up @@ -267,39 +267,44 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<()> {
.await?;
info!("Timing: Finished publishing noop to notify_topic");

let welcome_notification =
get_welcome_notification(project.id, &state.postgres, state.metrics.as_ref()).await?;
if let Some(welcome_notification) = welcome_notification {
info!("Welcome notification enabled");
if welcome_notification.enabled && scope.contains(&welcome_notification.r#type) {
info!("Scope contains welcome notification type, sending welcome notification");
let notification = upsert_notification(
Uuid::new_v4().to_string(),
project.id,
Notification {
r#type: welcome_notification.r#type,
title: welcome_notification.title,
body: welcome_notification.body,
url: welcome_notification.url,
icon: None,
},
&state.postgres,
state.metrics.as_ref(),
)
.await?;

upsert_subscriber_notifications(
notification.id,
&[subscriber.id],
&state.postgres,
state.metrics.as_ref(),
)
.await?;
// TODO do in same txn as upsert_subscriber()
if subscriber.inserted {
let welcome_notification =
get_welcome_notification(project.id, &state.postgres, state.metrics.as_ref()).await?;
if let Some(welcome_notification) = welcome_notification {
info!("Welcome notification enabled");
if welcome_notification.enabled && scope.contains(&welcome_notification.r#type) {
info!("Scope contains welcome notification type, sending welcome notification");
let notification = upsert_notification(
Uuid::new_v4().to_string(),
project.id,
Notification {
r#type: welcome_notification.r#type,
title: welcome_notification.title,
body: welcome_notification.body,
url: welcome_notification.url,
icon: None,
},
&state.postgres,
state.metrics.as_ref(),
)
.await?;

upsert_subscriber_notifications(
notification.id,
&[subscriber.id],
&state.postgres,
state.metrics.as_ref(),
)
.await?;
} else {
info!("Scope does not contain welcome notification type, not sending welcome notification");
}
} else {
info!("Scope does not contain welcome notification type, not sending welcome notification");
info!("Welcome notification not enabled");
}
} else {
info!("Welcome notification not enabled");
info!("Subscriber already existed, not sending welcome notification");
}

send_to_subscription_watchers(
Expand Down
Loading

0 comments on commit 9322d8d

Please sign in to comment.