From 14f3eae87f909a780087bbbbe85bf41b60df0699 Mon Sep 17 00:00:00 2001 From: Chris Smith Date: Tue, 24 Oct 2023 13:44:58 -0400 Subject: [PATCH 01/14] chore: remove account data --- src/analytics/notify_client.rs | 2 -- src/analytics/notify_message.rs | 2 -- 2 files changed, 4 deletions(-) diff --git a/src/analytics/notify_client.rs b/src/analytics/notify_client.rs index 4decc1f9..25bafd43 100644 --- a/src/analytics/notify_client.rs +++ b/src/analytics/notify_client.rs @@ -37,7 +37,6 @@ pub struct NotifyClient { pub pk: String, pub method: String, // subscribe, update, unsubscribe pub project_id: String, - pub account: String, pub account_hash: String, pub topic: String, pub notify_topic: String, @@ -53,7 +52,6 @@ impl From for NotifyClient { method: client.method.to_string(), project_id: client.project_id, account_hash: sha256::digest(client.account.as_bytes()), - account: client.account, topic: client.topic, notify_topic: client.notify_topic, old_scope: client.old_scope, diff --git a/src/analytics/notify_message.rs b/src/analytics/notify_message.rs index 2566d2a6..725c440d 100644 --- a/src/analytics/notify_message.rs +++ b/src/analytics/notify_message.rs @@ -15,7 +15,6 @@ pub struct NotifyMessage { pub project_id: Arc, pub msg_id: Arc, pub topic: Arc, - pub account: Arc, pub account_hash: String, pub notification_type: Arc, pub send_id: String, @@ -29,7 +28,6 @@ impl From for NotifyMessage { msg_id: message.msg_id, topic: message.topic, account_hash: sha256::digest(message.account.as_ref()), - account: message.account, notification_type: message.notification_type, send_id: message.send_id, event_at: wc::analytics::time::now(), From 5c252bc47afdfe32432f866510af0d2c5acdfd57 Mon Sep 17 00:00:00 2001 From: Chris Smith Date: Tue, 24 Oct 2023 13:55:44 -0400 Subject: [PATCH 02/14] chore: snake_case --- src/analytics/notify_client.rs | 1 - src/analytics/notify_message.rs | 1 - 2 files changed, 2 deletions(-) diff --git a/src/analytics/notify_client.rs b/src/analytics/notify_client.rs index 25bafd43..2cc4c006 100644 --- a/src/analytics/notify_client.rs +++ b/src/analytics/notify_client.rs @@ -32,7 +32,6 @@ pub struct NotifyClientParams { } #[derive(Debug, Serialize, ParquetRecordWriter)] -#[serde(rename_all = "camelCase")] pub struct NotifyClient { pub pk: String, pub method: String, // subscribe, update, unsubscribe diff --git a/src/analytics/notify_message.rs b/src/analytics/notify_message.rs index 725c440d..fa34a4d5 100644 --- a/src/analytics/notify_message.rs +++ b/src/analytics/notify_message.rs @@ -10,7 +10,6 @@ pub struct NotifyMessageParams { } #[derive(Debug, Serialize, ParquetRecordWriter)] -#[serde(rename_all = "camelCase")] pub struct NotifyMessage { pub project_id: Arc, pub msg_id: Arc, From 3d2aa83d8b23f120105b139bc1a2be2de8ce97d3 Mon Sep 17 00:00:00 2001 From: Chris Smith Date: Tue, 24 Oct 2023 14:28:36 -0400 Subject: [PATCH 03/14] chore: more changes --- src/analytics/mod.rs | 32 +++++++------ src/analytics/notify_message.rs | 35 -------------- src/analytics/subscriber_notification.rs | 45 ++++++++++++++++++ ...{notify_client.rs => subscriber_update.rs} | 46 +++++++++++-------- src/handlers/notify.rs | 25 ++++++---- .../handlers/notify_delete.rs | 14 +++--- .../handlers/notify_subscribe.rs | 14 +++--- .../handlers/notify_update.rs | 14 +++--- 8 files changed, 126 insertions(+), 99 deletions(-) delete mode 100644 src/analytics/notify_message.rs create mode 100644 src/analytics/subscriber_notification.rs rename src/analytics/{notify_client.rs => subscriber_update.rs} (55%) diff --git a/src/analytics/mod.rs b/src/analytics/mod.rs index a52cc8a6..3bfebde8 100644 --- a/src/analytics/mod.rs +++ b/src/analytics/mod.rs @@ -1,9 +1,15 @@ use { self::{ - notify_client::NotifyClientParams, - notify_message::{NotifyMessage, NotifyMessageParams}, + subscriber_notification::SubscriberNotificationParams, + subscriber_update::SubscriberUpdateParams, + }, + crate::{ + analytics::{ + subscriber_notification::SubscriberNotification, subscriber_update::SubscriberUpdate, + }, + config::Configuration, + error::Result, }, - crate::{analytics::notify_client::NotifyClient, config::Configuration, error::Result}, aws_sdk_s3::Client as S3Client, std::{net::IpAddr, sync::Arc}, tracing::{error, info}, @@ -18,13 +24,13 @@ use { }, }; -pub mod notify_client; -pub mod notify_message; +pub mod subscriber_notification; +pub mod subscriber_update; #[derive(Clone)] pub struct NotifyAnalytics { - pub messages: Analytics, - pub clients: Analytics, + pub messages: Analytics, + pub clients: Analytics, pub geoip_resolver: Option>, } @@ -61,21 +67,21 @@ impl NotifyAnalytics { node_ip: node_ip.clone(), }); - let collector = ParquetWriter::::new(opts.clone(), exporter)?; + let collector = ParquetWriter::::new(opts.clone(), exporter)?; Analytics::new(collector) }; let clients = { let exporter = AwsExporter::new(AwsOpts { - export_prefix: "notify/clients", - export_name: "clients", + export_prefix: "notify/client_updates", + export_name: "client_updates", file_extension: "parquet", bucket_name, s3_client, node_ip, }); - Analytics::new(ParquetWriter::::new(opts, exporter)?) + Analytics::new(ParquetWriter::::new(opts, exporter)?) }; Ok(Self { @@ -85,11 +91,11 @@ impl NotifyAnalytics { }) } - pub fn message(&self, message: NotifyMessageParams) { + pub fn message(&self, message: SubscriberNotificationParams) { self.messages.collect(message.into()); } - pub fn client(&self, client: NotifyClientParams) { + pub fn client(&self, client: SubscriberUpdateParams) { self.clients.collect(client.into()); } diff --git a/src/analytics/notify_message.rs b/src/analytics/notify_message.rs deleted file mode 100644 index fa34a4d5..00000000 --- a/src/analytics/notify_message.rs +++ /dev/null @@ -1,35 +0,0 @@ -use {parquet_derive::ParquetRecordWriter, serde::Serialize, std::sync::Arc}; - -pub struct NotifyMessageParams { - pub project_id: Arc, - pub msg_id: Arc, - pub topic: Arc, - pub account: Arc, - pub notification_type: Arc, - pub send_id: String, -} - -#[derive(Debug, Serialize, ParquetRecordWriter)] -pub struct NotifyMessage { - pub project_id: Arc, - pub msg_id: Arc, - pub topic: Arc, - pub account_hash: String, - pub notification_type: Arc, - pub send_id: String, - pub event_at: chrono::NaiveDateTime, -} - -impl From for NotifyMessage { - fn from(message: NotifyMessageParams) -> Self { - Self { - project_id: message.project_id, - msg_id: message.msg_id, - topic: message.topic, - account_hash: sha256::digest(message.account.as_ref()), - notification_type: message.notification_type, - send_id: message.send_id, - event_at: wc::analytics::time::now(), - } - } -} diff --git a/src/analytics/subscriber_notification.rs b/src/analytics/subscriber_notification.rs new file mode 100644 index 00000000..b9ce4109 --- /dev/null +++ b/src/analytics/subscriber_notification.rs @@ -0,0 +1,45 @@ +use { + crate::model::types::AccountId, + parquet_derive::ParquetRecordWriter, + relay_rpc::domain::{ProjectId, Topic}, + serde::Serialize, + std::sync::Arc, + uuid::Uuid, +}; + +pub struct SubscriberNotificationParams { + pub project_id: ProjectId, + pub client_pk: Uuid, + pub account: AccountId, + pub msg_id: Arc, + pub topic: Topic, + pub notification_type: Arc, + pub send_id: String, +} + +#[derive(Debug, Serialize, ParquetRecordWriter)] +pub struct SubscriberNotification { + pub event_at: chrono::NaiveDateTime, + pub project_id: Arc, + pub client_pk: String, + pub account_hash: String, + pub msg_id: Arc, + pub topic: Arc, + pub notification_type: Arc, + pub send_id: String, +} + +impl From for SubscriberNotification { + fn from(message: SubscriberNotificationParams) -> Self { + Self { + project_id: message.project_id.into_value(), + msg_id: message.msg_id, + topic: message.topic.into_value(), + client_pk: message.client_pk.to_string(), + account_hash: sha256::digest(message.account.as_ref()), + notification_type: message.notification_type, + send_id: message.send_id, + event_at: wc::analytics::time::now(), + } + } +} diff --git a/src/analytics/notify_client.rs b/src/analytics/subscriber_update.rs similarity index 55% rename from src/analytics/notify_client.rs rename to src/analytics/subscriber_update.rs index 2cc4c006..a9d121dc 100644 --- a/src/analytics/notify_client.rs +++ b/src/analytics/subscriber_update.rs @@ -1,7 +1,13 @@ use { + crate::model::types::AccountId, parquet_derive::ParquetRecordWriter, + relay_rpc::domain::{ProjectId, Topic}, serde::Serialize, - std::fmt::{self, Display, Formatter}, + std::{ + fmt::{self, Display, Formatter}, + sync::Arc, + }, + uuid::Uuid, }; pub enum NotifyClientMethod { @@ -20,39 +26,39 @@ impl Display for NotifyClientMethod { } } -pub struct NotifyClientParams { - pub pk: String, +pub struct SubscriberUpdateParams { + pub project_id: ProjectId, + pub pk: Uuid, + pub account: AccountId, pub method: NotifyClientMethod, - pub project_id: String, - pub account: String, - pub topic: String, - pub notify_topic: String, + pub topic: Topic, + pub notify_topic: Topic, pub old_scope: String, pub new_scope: String, } #[derive(Debug, Serialize, ParquetRecordWriter)] -pub struct NotifyClient { - pub pk: String, - pub method: String, // subscribe, update, unsubscribe - pub project_id: String, +pub struct SubscriberUpdate { + pub event_at: chrono::NaiveDateTime, + pub project_id: Arc, + pub pk: Uuid, pub account_hash: String, - pub topic: String, - pub notify_topic: String, + pub method: String, // subscribe, update, unsubscribe + pub topic: Arc, + pub notify_topic: Arc, pub old_scope: String, pub new_scope: String, - pub event_at: chrono::NaiveDateTime, } -impl From for NotifyClient { - fn from(client: NotifyClientParams) -> Self { +impl From for SubscriberUpdate { + fn from(client: SubscriberUpdateParams) -> Self { Self { pk: client.pk, method: client.method.to_string(), - project_id: client.project_id, - account_hash: sha256::digest(client.account.as_bytes()), - topic: client.topic, - notify_topic: client.notify_topic, + project_id: client.project_id.into_value(), + account_hash: sha256::digest(client.account.as_ref()), + topic: client.topic.into_value(), + notify_topic: client.notify_topic.into_value(), old_scope: client.old_scope, new_scope: client.new_scope, event_at: wc::analytics::time::now(), diff --git a/src/handlers/notify.rs b/src/handlers/notify.rs index 061680bd..cb46c4e1 100644 --- a/src/handlers/notify.rs +++ b/src/handlers/notify.rs @@ -1,6 +1,6 @@ use { crate::{ - analytics::notify_message::NotifyMessageParams, + analytics::subscriber_notification::SubscriberNotificationParams, auth::add_ttl, error, extractors::AuthedProjectId, @@ -23,7 +23,7 @@ use { error::Result, futures::FutureExt, relay_rpc::{ - domain::{ClientId, DecodedClientId, Topic}, + domain::{ClientId, DecodedClientId, ProjectId, Topic}, jwt::{JwtHeader, JWT_HEADER_ALG, JWT_HEADER_TYP}, rpc::{msg_id::MsgId, Publish}, }, @@ -31,6 +31,7 @@ use { std::{collections::HashSet, sync::Arc, time::Duration}, tokio::time::error::Elapsed, tracing::{info, warn}, + uuid::Uuid, wc::metrics::otel::{Context, KeyValue}, }; @@ -48,6 +49,7 @@ pub struct SendFailure { #[derive(Clone)] struct PublishJob { + client_pk: Uuid, account: AccountId, topic: Topic, message: String, @@ -125,7 +127,7 @@ pub async fn handler( &mut response, request_id, &state, - project_id.as_ref(), + project_id.clone(), ) .await?; @@ -151,9 +153,9 @@ async fn process_publish_jobs( notification_type: Arc, client: Arc, response: &mut Response, - request_id: uuid::Uuid, + request_id: Uuid, state: &Arc, - project_id: &str, + project_id: ProjectId, ) -> Result<()> { let timer = std::time::Instant::now(); let futures = jobs.into_iter().map(|job| { @@ -226,14 +228,16 @@ async fn process_publish_jobs( } }) .map({ + let project_id = project_id.clone(); let notification_type = notification_type.clone(); move |result| { if result.is_ok() { - state.analytics.message(NotifyMessageParams { - project_id: project_id.into(), + state.analytics.message(SubscriberNotificationParams { + project_id, msg_id: msg_id.into(), - topic: job.topic.into_value(), - account: job.account.into_value(), + topic: job.topic, + client_pk: job.client_pk, + account: job.account, notification_type, send_id: "".to_string(), // TODO for when queueing is added }); @@ -314,9 +318,10 @@ async fn generate_publish_jobs( let topic = Topic::new(sha256::digest(&sym_key).into()); jobs.push(PublishJob { + client_pk: subscriber.id, + account: subscriber.account, topic, message: base64_notification, - account: subscriber.account, }) } Ok(jobs) diff --git a/src/websocket_service/handlers/notify_delete.rs b/src/websocket_service/handlers/notify_delete.rs index cc4945c9..d5702223 100644 --- a/src/websocket_service/handlers/notify_delete.rs +++ b/src/websocket_service/handlers/notify_delete.rs @@ -1,6 +1,6 @@ use { crate::{ - analytics::notify_client::{NotifyClientMethod, NotifyClientParams}, + analytics::subscriber_update::{NotifyClientMethod, SubscriberUpdateParams}, auth::{ add_ttl, from_jwt, sign_jwt, verify_identity, AuthError, Authorization, AuthorizedApp, SharedClaims, SubscriptionDeleteRequestAuth, SubscriptionDeleteResponseAuth, @@ -100,13 +100,13 @@ pub async fn handle( warn!("Error unsubscribing Notify from topic: {}", e); }; - state.analytics.client(NotifyClientParams { - pk: subscriber.id.to_string(), + state.analytics.client(SubscriberUpdateParams { + pk: subscriber.id, method: NotifyClientMethod::Unsubscribe, - project_id: project.id.to_string(), - account: account.to_string(), - topic: topic.to_string(), - notify_topic: subscriber.topic.to_string(), + project_id: project.project_id, + account: account.clone(), + topic, + notify_topic: subscriber.topic, old_scope: subscriber.scope.join(","), new_scope: "".to_owned(), }); diff --git a/src/websocket_service/handlers/notify_subscribe.rs b/src/websocket_service/handlers/notify_subscribe.rs index 43275070..f8e795f4 100644 --- a/src/websocket_service/handlers/notify_subscribe.rs +++ b/src/websocket_service/handlers/notify_subscribe.rs @@ -1,6 +1,6 @@ use { crate::{ - analytics::notify_client::{NotifyClientMethod, NotifyClientParams}, + analytics::subscriber_update::{NotifyClientMethod, SubscriberUpdateParams}, auth::{ add_ttl, from_jwt, sign_jwt, verify_identity, AuthError, Authorization, AuthorizedApp, SharedClaims, SubscriptionRequestAuth, SubscriptionResponseAuth, @@ -159,13 +159,13 @@ pub async fn handle( state.wsclient.subscribe(notify_topic.clone()).await?; - state.analytics.client(NotifyClientParams { - pk: subscriber_id.to_string(), + state.analytics.client(SubscriberUpdateParams { + pk: subscriber_id, method: NotifyClientMethod::Subscribe, - project_id: project_id.to_string(), - account: account.to_string(), - topic: topic.to_string(), - notify_topic: notify_topic.to_string(), + project_id, + account: account.clone(), + topic, + notify_topic: notify_topic.clone(), old_scope: "".to_owned(), new_scope: scope.into_iter().collect::>().join(","), }); diff --git a/src/websocket_service/handlers/notify_update.rs b/src/websocket_service/handlers/notify_update.rs index 9c85b664..b5c6eae9 100644 --- a/src/websocket_service/handlers/notify_update.rs +++ b/src/websocket_service/handlers/notify_update.rs @@ -1,7 +1,7 @@ use { super::notify_watch_subscriptions::update_subscription_watchers, crate::{ - analytics::notify_client::{NotifyClientMethod, NotifyClientParams}, + analytics::subscriber_update::{NotifyClientMethod, SubscriberUpdateParams}, auth::{ add_ttl, from_jwt, sign_jwt, verify_identity, AuthError, Authorization, AuthorizedApp, SharedClaims, SubscriptionUpdateRequestAuth, SubscriptionUpdateResponseAuth, @@ -96,13 +96,13 @@ pub async fn handle( // ) // .await?; - state.analytics.client(NotifyClientParams { - pk: subscriber.id.to_string(), + state.analytics.client(SubscriberUpdateParams { + pk: subscriber.id, method: NotifyClientMethod::Update, - project_id: project.id.to_string(), - account: account.to_string(), - topic: topic.to_string(), - notify_topic: subscriber.topic.to_string(), + project_id: project.project_id, + account: account.clone(), + topic, + notify_topic: subscriber.topic.clone(), old_scope: old_scope.join(","), new_scope: scope.into_iter().collect::>().join(","), }); From 9364103257f480703a479ce667cee01ed12f7d23 Mon Sep 17 00:00:00 2001 From: Chris Smith Date: Tue, 24 Oct 2023 14:31:32 -0400 Subject: [PATCH 04/14] chore: export names --- src/analytics/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/analytics/mod.rs b/src/analytics/mod.rs index 3bfebde8..45e677d7 100644 --- a/src/analytics/mod.rs +++ b/src/analytics/mod.rs @@ -59,8 +59,8 @@ impl NotifyAnalytics { let messages = { let exporter = AwsExporter::new(AwsOpts { - export_prefix: "notify/messages", - export_name: "messages", + export_prefix: "notify/subscriber_notifications", + export_name: "subscriber_notifications", file_extension: "parquet", bucket_name: bucket_name.clone(), s3_client: s3_client.clone(), @@ -73,8 +73,8 @@ impl NotifyAnalytics { let clients = { let exporter = AwsExporter::new(AwsOpts { - export_prefix: "notify/client_updates", - export_name: "client_updates", + export_prefix: "notify/subscriber_updates", + export_name: "subscriber_updates", file_extension: "parquet", bucket_name, s3_client, From 2b02c4655658f9c4039b68277ed7be3a7b3ab64d Mon Sep 17 00:00:00 2001 From: Chris Smith Date: Tue, 24 Oct 2023 14:37:05 -0400 Subject: [PATCH 05/14] chore: remove send_id --- src/analytics/subscriber_notification.rs | 3 --- src/handlers/notify.rs | 1 - 2 files changed, 4 deletions(-) diff --git a/src/analytics/subscriber_notification.rs b/src/analytics/subscriber_notification.rs index b9ce4109..150f522c 100644 --- a/src/analytics/subscriber_notification.rs +++ b/src/analytics/subscriber_notification.rs @@ -14,7 +14,6 @@ pub struct SubscriberNotificationParams { pub msg_id: Arc, pub topic: Topic, pub notification_type: Arc, - pub send_id: String, } #[derive(Debug, Serialize, ParquetRecordWriter)] @@ -26,7 +25,6 @@ pub struct SubscriberNotification { pub msg_id: Arc, pub topic: Arc, pub notification_type: Arc, - pub send_id: String, } impl From for SubscriberNotification { @@ -38,7 +36,6 @@ impl From for SubscriberNotification { client_pk: message.client_pk.to_string(), account_hash: sha256::digest(message.account.as_ref()), notification_type: message.notification_type, - send_id: message.send_id, event_at: wc::analytics::time::now(), } } diff --git a/src/handlers/notify.rs b/src/handlers/notify.rs index cb46c4e1..bd59e67d 100644 --- a/src/handlers/notify.rs +++ b/src/handlers/notify.rs @@ -239,7 +239,6 @@ async fn process_publish_jobs( client_pk: job.client_pk, account: job.account, notification_type, - send_id: "".to_string(), // TODO for when queueing is added }); } result From 17222a7f54f2ad82bf5004a190d75a4198f9d8b0 Mon Sep 17 00:00:00 2001 From: Chris Smith Date: Tue, 24 Oct 2023 14:46:00 -0400 Subject: [PATCH 06/14] chore: tweaks --- Cargo.lock | 1 + Cargo.toml | 1 + src/analytics/subscriber_notification.rs | 23 +++++++++++-------- src/analytics/subscriber_update.rs | 14 +++++++---- src/handlers/notify.rs | 2 +- .../handlers/notify_delete.rs | 6 ++--- .../handlers/notify_subscribe.rs | 4 ++-- .../handlers/notify_update.rs | 15 ++++++++---- 8 files changed, 42 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5d4b0c15..5e454d46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2668,6 +2668,7 @@ dependencies = [ "hkdf", "hyper", "ipnet", + "itertools", "jsonwebtoken", "k256", "lazy_static", diff --git a/Cargo.toml b/Cargo.toml index 62eced31..1edb3edf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,6 +98,7 @@ rmp-serde = "1.1.1" deadpool-redis = "0.12.0" rand_chacha = "0.3.1" sqlx = { version = "0.7.1", features = ["runtime-tokio-native-tls", "postgres", "chrono", "uuid"] } +itertools = "0.11.0" [dev-dependencies] sha3 = "0.10.8" diff --git a/src/analytics/subscriber_notification.rs b/src/analytics/subscriber_notification.rs index 150f522c..d47dc7d6 100644 --- a/src/analytics/subscriber_notification.rs +++ b/src/analytics/subscriber_notification.rs @@ -12,30 +12,35 @@ pub struct SubscriberNotificationParams { pub client_pk: Uuid, pub account: AccountId, pub msg_id: Arc, - pub topic: Topic, + pub notify_topic: Topic, pub notification_type: Arc, } #[derive(Debug, Serialize, ParquetRecordWriter)] pub struct SubscriberNotification { + /// Time at which the event was generated pub event_at: chrono::NaiveDateTime, pub project_id: Arc, + /// Primary Key of the subscriber in the Notify Server database that the notificaiton is being sent to pub client_pk: String, pub account_hash: String, + /// Relay message ID pub msg_id: Arc, - pub topic: Arc, + /// The topic that notifications are sent on + pub notify_topic: Arc, + /// The notification type ID pub notification_type: Arc, } impl From for SubscriberNotification { - fn from(message: SubscriberNotificationParams) -> Self { + fn from(params: SubscriberNotificationParams) -> Self { Self { - project_id: message.project_id.into_value(), - msg_id: message.msg_id, - topic: message.topic.into_value(), - client_pk: message.client_pk.to_string(), - account_hash: sha256::digest(message.account.as_ref()), - notification_type: message.notification_type, + project_id: params.project_id.into_value(), + msg_id: params.msg_id, + notify_topic: params.notify_topic.into_value(), + client_pk: params.client_pk.to_string(), + account_hash: sha256::digest(params.account.as_ref()), + notification_type: params.notification_type, event_at: wc::analytics::time::now(), } } diff --git a/src/analytics/subscriber_update.rs b/src/analytics/subscriber_update.rs index a9d121dc..4fc91797 100644 --- a/src/analytics/subscriber_update.rs +++ b/src/analytics/subscriber_update.rs @@ -1,9 +1,11 @@ use { crate::model::types::AccountId, + itertools::Itertools, parquet_derive::ParquetRecordWriter, relay_rpc::domain::{ProjectId, Topic}, serde::Serialize, std::{ + collections::HashSet, fmt::{self, Display, Formatter}, sync::Arc, }, @@ -33,18 +35,22 @@ pub struct SubscriberUpdateParams { pub method: NotifyClientMethod, pub topic: Topic, pub notify_topic: Topic, - pub old_scope: String, - pub new_scope: String, + pub old_scope: HashSet>, + pub new_scope: HashSet>, } #[derive(Debug, Serialize, ParquetRecordWriter)] pub struct SubscriberUpdate { + /// Time at which the event was generated pub event_at: chrono::NaiveDateTime, pub project_id: Arc, + /// Primary Key of the subscriber in the Notify Server database pub pk: Uuid, pub account_hash: String, pub method: String, // subscribe, update, unsubscribe + /// The topic used to manage the subscription pub topic: Arc, + /// The topic that notifications are sent on pub notify_topic: Arc, pub old_scope: String, pub new_scope: String, @@ -59,8 +65,8 @@ impl From for SubscriberUpdate { account_hash: sha256::digest(client.account.as_ref()), topic: client.topic.into_value(), notify_topic: client.notify_topic.into_value(), - old_scope: client.old_scope, - new_scope: client.new_scope, + old_scope: client.old_scope.iter().join(","), + new_scope: client.new_scope.iter().join(","), event_at: wc::analytics::time::now(), } } diff --git a/src/handlers/notify.rs b/src/handlers/notify.rs index bd59e67d..0aff6978 100644 --- a/src/handlers/notify.rs +++ b/src/handlers/notify.rs @@ -235,7 +235,7 @@ async fn process_publish_jobs( state.analytics.message(SubscriberNotificationParams { project_id, msg_id: msg_id.into(), - topic: job.topic, + notify_topic: job.topic, client_pk: job.client_pk, account: job.account, notification_type, diff --git a/src/websocket_service/handlers/notify_delete.rs b/src/websocket_service/handlers/notify_delete.rs index d5702223..43e935a8 100644 --- a/src/websocket_service/handlers/notify_delete.rs +++ b/src/websocket_service/handlers/notify_delete.rs @@ -22,7 +22,7 @@ use { chrono::Utc, relay_rpc::domain::DecodedClientId, serde_json::{json, Value}, - std::sync::Arc, + std::{collections::HashSet, sync::Arc}, tracing::warn, }; @@ -107,8 +107,8 @@ pub async fn handle( account: account.clone(), topic, notify_topic: subscriber.topic, - old_scope: subscriber.scope.join(","), - new_scope: "".to_owned(), + old_scope: subscriber.scope.into_iter().map(Into::into).collect(), + new_scope: HashSet::new(), }); let identity = DecodedClientId(decode_key(&project.authentication_public_key)?); diff --git a/src/websocket_service/handlers/notify_subscribe.rs b/src/websocket_service/handlers/notify_subscribe.rs index f8e795f4..e6d19748 100644 --- a/src/websocket_service/handlers/notify_subscribe.rs +++ b/src/websocket_service/handlers/notify_subscribe.rs @@ -166,8 +166,8 @@ pub async fn handle( account: account.clone(), topic, notify_topic: notify_topic.clone(), - old_scope: "".to_owned(), - new_scope: scope.into_iter().collect::>().join(","), + old_scope: HashSet::new(), + new_scope: scope.into_iter().map(Into::into).collect(), }); // Send noop to extend ttl of relay's mapping diff --git a/src/websocket_service/handlers/notify_update.rs b/src/websocket_service/handlers/notify_update.rs index b5c6eae9..e35bb6b4 100644 --- a/src/websocket_service/handlers/notify_update.rs +++ b/src/websocket_service/handlers/notify_update.rs @@ -78,14 +78,19 @@ pub async fn handle( }; let old_scope = subscriber.scope; - let scope = sub_auth + let new_scope = sub_auth .scp .split(' ') .map(|s| s.to_owned()) .collect::>(); - let subscriber = - update_subscriber(project.id, account.clone(), scope.clone(), &state.postgres).await?; + let subscriber = update_subscriber( + project.id, + account.clone(), + new_scope.clone(), + &state.postgres, + ) + .await?; // TODO do in same transaction as update_subscriber() // state @@ -103,8 +108,8 @@ pub async fn handle( account: account.clone(), topic, notify_topic: subscriber.topic.clone(), - old_scope: old_scope.join(","), - new_scope: scope.into_iter().collect::>().join(","), + old_scope: old_scope.into_iter().map(Into::into).collect(), + new_scope: new_scope.into_iter().map(Into::into).collect(), }); let identity = DecodedClientId(decode_key(&project.authentication_public_key)?); From 9a784d81b78e93d5c551749dc2469576100a1595 Mon Sep 17 00:00:00 2001 From: Chris Smith Date: Tue, 24 Oct 2023 14:47:09 -0400 Subject: [PATCH 07/14] chore: message_id --- src/analytics/subscriber_notification.rs | 6 +++--- src/handlers/notify.rs | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/analytics/subscriber_notification.rs b/src/analytics/subscriber_notification.rs index d47dc7d6..457471e5 100644 --- a/src/analytics/subscriber_notification.rs +++ b/src/analytics/subscriber_notification.rs @@ -11,7 +11,7 @@ pub struct SubscriberNotificationParams { pub project_id: ProjectId, pub client_pk: Uuid, pub account: AccountId, - pub msg_id: Arc, + pub message_id: Arc, pub notify_topic: Topic, pub notification_type: Arc, } @@ -25,7 +25,7 @@ pub struct SubscriberNotification { pub client_pk: String, pub account_hash: String, /// Relay message ID - pub msg_id: Arc, + pub message_id: Arc, /// The topic that notifications are sent on pub notify_topic: Arc, /// The notification type ID @@ -36,7 +36,7 @@ impl From for SubscriberNotification { fn from(params: SubscriberNotificationParams) -> Self { Self { project_id: params.project_id.into_value(), - msg_id: params.msg_id, + message_id: params.message_id, notify_topic: params.notify_topic.into_value(), client_pk: params.client_pk.to_string(), account_hash: sha256::digest(params.account.as_ref()), diff --git a/src/handlers/notify.rs b/src/handlers/notify.rs index 0aff6978..2025a008 100644 --- a/src/handlers/notify.rs +++ b/src/handlers/notify.rs @@ -234,7 +234,7 @@ async fn process_publish_jobs( if result.is_ok() { state.analytics.message(SubscriberNotificationParams { project_id, - msg_id: msg_id.into(), + message_id: msg_id.into(), notify_topic: job.topic, client_pk: job.client_pk, account: job.account, From 36b599f3ddef9f22a5b427486b40226458c7b8dd Mon Sep 17 00:00:00 2001 From: Chris Smith Date: Tue, 24 Oct 2023 14:47:58 -0400 Subject: [PATCH 08/14] chore: subscriber_pk --- src/analytics/subscriber_notification.rs | 6 +++--- src/handlers/notify.rs | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/analytics/subscriber_notification.rs b/src/analytics/subscriber_notification.rs index 457471e5..aed3c0a7 100644 --- a/src/analytics/subscriber_notification.rs +++ b/src/analytics/subscriber_notification.rs @@ -9,7 +9,7 @@ use { pub struct SubscriberNotificationParams { pub project_id: ProjectId, - pub client_pk: Uuid, + pub subscriber_pk: Uuid, pub account: AccountId, pub message_id: Arc, pub notify_topic: Topic, @@ -22,7 +22,7 @@ pub struct SubscriberNotification { pub event_at: chrono::NaiveDateTime, pub project_id: Arc, /// Primary Key of the subscriber in the Notify Server database that the notificaiton is being sent to - pub client_pk: String, + pub subscriber_pk: String, pub account_hash: String, /// Relay message ID pub message_id: Arc, @@ -38,7 +38,7 @@ impl From for SubscriberNotification { project_id: params.project_id.into_value(), message_id: params.message_id, notify_topic: params.notify_topic.into_value(), - client_pk: params.client_pk.to_string(), + subscriber_pk: params.subscriber_pk.to_string(), account_hash: sha256::digest(params.account.as_ref()), notification_type: params.notification_type, event_at: wc::analytics::time::now(), diff --git a/src/handlers/notify.rs b/src/handlers/notify.rs index 2025a008..ffc006af 100644 --- a/src/handlers/notify.rs +++ b/src/handlers/notify.rs @@ -236,7 +236,7 @@ async fn process_publish_jobs( project_id, message_id: msg_id.into(), notify_topic: job.topic, - client_pk: job.client_pk, + subscriber_pk: job.client_pk, account: job.account, notification_type, }); From 92ea0b3fdc64b620d07d23af1e54ea057436e40d Mon Sep 17 00:00:00 2001 From: Chris Smith Date: Tue, 24 Oct 2023 14:48:58 -0400 Subject: [PATCH 09/14] chore: comment --- src/analytics/subscriber_update.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/analytics/subscriber_update.rs b/src/analytics/subscriber_update.rs index 4fc91797..0ef97063 100644 --- a/src/analytics/subscriber_update.rs +++ b/src/analytics/subscriber_update.rs @@ -52,7 +52,9 @@ pub struct SubscriberUpdate { pub topic: Arc, /// The topic that notifications are sent on pub notify_topic: Arc, + /// Notification types that the subscriber was subscribed to before the update, separated by commas pub old_scope: String, + /// Notification types that the subscriber is subscribed to after the update, separated by commas pub new_scope: String, } From 2313f42fc7c982f856253474d576f16ba0de70f9 Mon Sep 17 00:00:00 2001 From: Chris Smith Date: Tue, 24 Oct 2023 14:56:55 -0400 Subject: [PATCH 10/14] chore: reorder and comments --- src/analytics/subscriber_notification.rs | 20 +++++++------- src/analytics/subscriber_update.rs | 27 ++++++++++--------- src/handlers/notify.rs | 4 +-- .../handlers/notify_delete.rs | 8 +++--- .../handlers/notify_subscribe.rs | 8 +++--- .../handlers/notify_update.rs | 8 +++--- 6 files changed, 40 insertions(+), 35 deletions(-) diff --git a/src/analytics/subscriber_notification.rs b/src/analytics/subscriber_notification.rs index aed3c0a7..1dad1997 100644 --- a/src/analytics/subscriber_notification.rs +++ b/src/analytics/subscriber_notification.rs @@ -11,37 +11,39 @@ pub struct SubscriberNotificationParams { pub project_id: ProjectId, pub subscriber_pk: Uuid, pub account: AccountId, - pub message_id: Arc, - pub notify_topic: Topic, pub notification_type: Arc, + pub notify_topic: Topic, + pub message_id: Arc, } #[derive(Debug, Serialize, ParquetRecordWriter)] pub struct SubscriberNotification { /// Time at which the event was generated pub event_at: chrono::NaiveDateTime, + /// Project ID of the project that the notification was sent from and the subscriber is subscribed to pub project_id: Arc, /// Primary Key of the subscriber in the Notify Server database that the notificaiton is being sent to pub subscriber_pk: String, + /// Hash of the CAIP-10 account of the subscriber pub account_hash: String, - /// Relay message ID - pub message_id: Arc, - /// The topic that notifications are sent on - pub notify_topic: Arc, /// The notification type ID pub notification_type: Arc, + /// The topic that the notification was sent on + pub notify_topic: Arc, + /// Relay message ID of the notification + pub message_id: Arc, } impl From for SubscriberNotification { fn from(params: SubscriberNotificationParams) -> Self { Self { + event_at: wc::analytics::time::now(), project_id: params.project_id.into_value(), - message_id: params.message_id, - notify_topic: params.notify_topic.into_value(), subscriber_pk: params.subscriber_pk.to_string(), account_hash: sha256::digest(params.account.as_ref()), notification_type: params.notification_type, - event_at: wc::analytics::time::now(), + notify_topic: params.notify_topic.into_value(), + message_id: params.message_id, } } } diff --git a/src/analytics/subscriber_update.rs b/src/analytics/subscriber_update.rs index 0ef97063..ff5406a4 100644 --- a/src/analytics/subscriber_update.rs +++ b/src/analytics/subscriber_update.rs @@ -33,43 +33,46 @@ pub struct SubscriberUpdateParams { pub pk: Uuid, pub account: AccountId, pub method: NotifyClientMethod, - pub topic: Topic, - pub notify_topic: Topic, pub old_scope: HashSet>, pub new_scope: HashSet>, + pub notify_topic: Topic, + pub topic: Topic, } #[derive(Debug, Serialize, ParquetRecordWriter)] pub struct SubscriberUpdate { /// Time at which the event was generated pub event_at: chrono::NaiveDateTime, + /// Project ID of the project that the subscriber is subscribed to pub project_id: Arc, /// Primary Key of the subscriber in the Notify Server database pub pk: Uuid, + /// Hash of the CAIP-10 account of the subscriber pub account_hash: String, - pub method: String, // subscribe, update, unsubscribe - /// The topic used to manage the subscription - pub topic: Arc, - /// The topic that notifications are sent on - pub notify_topic: Arc, + /// The change that happend to the subscriber, can be subscribe, update, or unsubscribe + pub method: String, /// Notification types that the subscriber was subscribed to before the update, separated by commas pub old_scope: String, /// Notification types that the subscriber is subscribed to after the update, separated by commas pub new_scope: String, + /// The topic that notifications are sent on + pub notify_topic: Arc, + /// The topic used to create or manage the subscription that the update message was published to + pub topic: Arc, } impl From for SubscriberUpdate { fn from(client: SubscriberUpdateParams) -> Self { Self { - pk: client.pk, - method: client.method.to_string(), + event_at: wc::analytics::time::now(), project_id: client.project_id.into_value(), + pk: client.pk, account_hash: sha256::digest(client.account.as_ref()), - topic: client.topic.into_value(), - notify_topic: client.notify_topic.into_value(), + method: client.method.to_string(), old_scope: client.old_scope.iter().join(","), new_scope: client.new_scope.iter().join(","), - event_at: wc::analytics::time::now(), + notify_topic: client.notify_topic.into_value(), + topic: client.topic.into_value(), } } } diff --git a/src/handlers/notify.rs b/src/handlers/notify.rs index ffc006af..00cbe049 100644 --- a/src/handlers/notify.rs +++ b/src/handlers/notify.rs @@ -234,11 +234,11 @@ async fn process_publish_jobs( if result.is_ok() { state.analytics.message(SubscriberNotificationParams { project_id, - message_id: msg_id.into(), - notify_topic: job.topic, subscriber_pk: job.client_pk, account: job.account, notification_type, + notify_topic: job.topic, + message_id: msg_id.into(), }); } result diff --git a/src/websocket_service/handlers/notify_delete.rs b/src/websocket_service/handlers/notify_delete.rs index 43e935a8..db277df3 100644 --- a/src/websocket_service/handlers/notify_delete.rs +++ b/src/websocket_service/handlers/notify_delete.rs @@ -101,14 +101,14 @@ pub async fn handle( }; state.analytics.client(SubscriberUpdateParams { - pk: subscriber.id, - method: NotifyClientMethod::Unsubscribe, project_id: project.project_id, + pk: subscriber.id, account: account.clone(), - topic, - notify_topic: subscriber.topic, + method: NotifyClientMethod::Unsubscribe, old_scope: subscriber.scope.into_iter().map(Into::into).collect(), new_scope: HashSet::new(), + notify_topic: subscriber.topic, + topic, }); let identity = DecodedClientId(decode_key(&project.authentication_public_key)?); diff --git a/src/websocket_service/handlers/notify_subscribe.rs b/src/websocket_service/handlers/notify_subscribe.rs index e6d19748..d7ac985c 100644 --- a/src/websocket_service/handlers/notify_subscribe.rs +++ b/src/websocket_service/handlers/notify_subscribe.rs @@ -160,14 +160,14 @@ pub async fn handle( state.wsclient.subscribe(notify_topic.clone()).await?; state.analytics.client(SubscriberUpdateParams { - pk: subscriber_id, - method: NotifyClientMethod::Subscribe, project_id, + pk: subscriber_id, account: account.clone(), - topic, - notify_topic: notify_topic.clone(), + method: NotifyClientMethod::Subscribe, old_scope: HashSet::new(), new_scope: scope.into_iter().map(Into::into).collect(), + notify_topic: notify_topic.clone(), + topic, }); // Send noop to extend ttl of relay's mapping diff --git a/src/websocket_service/handlers/notify_update.rs b/src/websocket_service/handlers/notify_update.rs index e35bb6b4..de964e74 100644 --- a/src/websocket_service/handlers/notify_update.rs +++ b/src/websocket_service/handlers/notify_update.rs @@ -102,14 +102,14 @@ pub async fn handle( // .await?; state.analytics.client(SubscriberUpdateParams { - pk: subscriber.id, - method: NotifyClientMethod::Update, project_id: project.project_id, + pk: subscriber.id, account: account.clone(), - topic, - notify_topic: subscriber.topic.clone(), + method: NotifyClientMethod::Update, old_scope: old_scope.into_iter().map(Into::into).collect(), new_scope: new_scope.into_iter().map(Into::into).collect(), + notify_topic: subscriber.topic.clone(), + topic, }); let identity = DecodedClientId(decode_key(&project.authentication_public_key)?); From aaacf55db9acc673bca4c14a8b655eaa0035f542 Mon Sep 17 00:00:00 2001 From: Chris Smith Date: Tue, 24 Oct 2023 15:01:58 -0400 Subject: [PATCH 11/14] chore: add project_pk --- src/analytics/subscriber_notification.rs | 10 +++++++--- src/analytics/subscriber_update.rs | 4 ++++ src/handlers/notify.rs | 4 ++++ src/websocket_service/handlers/notify_delete.rs | 1 + src/websocket_service/handlers/notify_subscribe.rs | 1 + src/websocket_service/handlers/notify_update.rs | 1 + 6 files changed, 18 insertions(+), 3 deletions(-) diff --git a/src/analytics/subscriber_notification.rs b/src/analytics/subscriber_notification.rs index 1dad1997..c26980eb 100644 --- a/src/analytics/subscriber_notification.rs +++ b/src/analytics/subscriber_notification.rs @@ -8,6 +8,7 @@ use { }; pub struct SubscriberNotificationParams { + pub project_pk: Uuid, pub project_id: ProjectId, pub subscriber_pk: Uuid, pub account: AccountId, @@ -20,10 +21,12 @@ pub struct SubscriberNotificationParams { pub struct SubscriberNotification { /// Time at which the event was generated pub event_at: chrono::NaiveDateTime, + /// Primary key of the project in the Notify Server database that the notification was sent from and the subscriber is subscribed to + pub project_pk: Uuid, /// Project ID of the project that the notification was sent from and the subscriber is subscribed to pub project_id: Arc, - /// Primary Key of the subscriber in the Notify Server database that the notificaiton is being sent to - pub subscriber_pk: String, + /// Primary key of the subscriber in the Notify Server database that the notificaiton is being sent to + pub subscriber_pk: Uuid, /// Hash of the CAIP-10 account of the subscriber pub account_hash: String, /// The notification type ID @@ -38,8 +41,9 @@ impl From for SubscriberNotification { fn from(params: SubscriberNotificationParams) -> Self { Self { event_at: wc::analytics::time::now(), + project_pk: params.project_pk, project_id: params.project_id.into_value(), - subscriber_pk: params.subscriber_pk.to_string(), + subscriber_pk: params.subscriber_pk, account_hash: sha256::digest(params.account.as_ref()), notification_type: params.notification_type, notify_topic: params.notify_topic.into_value(), diff --git a/src/analytics/subscriber_update.rs b/src/analytics/subscriber_update.rs index ff5406a4..aec68c8a 100644 --- a/src/analytics/subscriber_update.rs +++ b/src/analytics/subscriber_update.rs @@ -29,6 +29,7 @@ impl Display for NotifyClientMethod { } pub struct SubscriberUpdateParams { + pub project_pk: Uuid, pub project_id: ProjectId, pub pk: Uuid, pub account: AccountId, @@ -43,6 +44,8 @@ pub struct SubscriberUpdateParams { pub struct SubscriberUpdate { /// Time at which the event was generated pub event_at: chrono::NaiveDateTime, + /// Primary key of the project in the Notify Server database that the subscriber is subscribed to + pub project_pk: String, /// Project ID of the project that the subscriber is subscribed to pub project_id: Arc, /// Primary Key of the subscriber in the Notify Server database @@ -65,6 +68,7 @@ impl From for SubscriberUpdate { fn from(client: SubscriberUpdateParams) -> Self { Self { event_at: wc::analytics::time::now(), + project_pk: client.project_pk.to_string(), project_id: client.project_id.into_value(), pk: client.pk, account_hash: sha256::digest(client.account.as_ref()), diff --git a/src/handlers/notify.rs b/src/handlers/notify.rs index 00cbe049..432ed08a 100644 --- a/src/handlers/notify.rs +++ b/src/handlers/notify.rs @@ -127,6 +127,7 @@ pub async fn handler( &mut response, request_id, &state, + project.id, project_id.clone(), ) .await?; @@ -148,6 +149,7 @@ enum JobError { Elapsed(Elapsed), } +#[allow(clippy::too_many_arguments)] async fn process_publish_jobs( jobs: Vec, notification_type: Arc, @@ -155,6 +157,7 @@ async fn process_publish_jobs( response: &mut Response, request_id: Uuid, state: &Arc, + project_pk: Uuid, project_id: ProjectId, ) -> Result<()> { let timer = std::time::Instant::now(); @@ -233,6 +236,7 @@ async fn process_publish_jobs( move |result| { if result.is_ok() { state.analytics.message(SubscriberNotificationParams { + project_pk, project_id, subscriber_pk: job.client_pk, account: job.account, diff --git a/src/websocket_service/handlers/notify_delete.rs b/src/websocket_service/handlers/notify_delete.rs index db277df3..c68c09c1 100644 --- a/src/websocket_service/handlers/notify_delete.rs +++ b/src/websocket_service/handlers/notify_delete.rs @@ -101,6 +101,7 @@ pub async fn handle( }; state.analytics.client(SubscriberUpdateParams { + project_pk: project.id, project_id: project.project_id, pk: subscriber.id, account: account.clone(), diff --git a/src/websocket_service/handlers/notify_subscribe.rs b/src/websocket_service/handlers/notify_subscribe.rs index d7ac985c..630726b6 100644 --- a/src/websocket_service/handlers/notify_subscribe.rs +++ b/src/websocket_service/handlers/notify_subscribe.rs @@ -160,6 +160,7 @@ pub async fn handle( state.wsclient.subscribe(notify_topic.clone()).await?; state.analytics.client(SubscriberUpdateParams { + project_pk: project.id, project_id, pk: subscriber_id, account: account.clone(), diff --git a/src/websocket_service/handlers/notify_update.rs b/src/websocket_service/handlers/notify_update.rs index de964e74..1fb6d269 100644 --- a/src/websocket_service/handlers/notify_update.rs +++ b/src/websocket_service/handlers/notify_update.rs @@ -102,6 +102,7 @@ pub async fn handle( // .await?; state.analytics.client(SubscriberUpdateParams { + project_pk: project.id, project_id: project.project_id, pk: subscriber.id, account: account.clone(), From de78d3b3f7380a04658309ac45f386e52665efaa Mon Sep 17 00:00:00 2001 From: Chris Smith Date: Tue, 24 Oct 2023 15:02:49 -0400 Subject: [PATCH 12/14] chore: notify_topic -> notification_topic --- src/analytics/subscriber_notification.rs | 4 ++-- src/analytics/subscriber_update.rs | 6 +++--- src/websocket_service/handlers/notify_delete.rs | 2 +- src/websocket_service/handlers/notify_subscribe.rs | 2 +- src/websocket_service/handlers/notify_update.rs | 2 +- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/analytics/subscriber_notification.rs b/src/analytics/subscriber_notification.rs index c26980eb..240ea3cb 100644 --- a/src/analytics/subscriber_notification.rs +++ b/src/analytics/subscriber_notification.rs @@ -32,7 +32,7 @@ pub struct SubscriberNotification { /// The notification type ID pub notification_type: Arc, /// The topic that the notification was sent on - pub notify_topic: Arc, + pub notification_topic: Arc, /// Relay message ID of the notification pub message_id: Arc, } @@ -46,7 +46,7 @@ impl From for SubscriberNotification { subscriber_pk: params.subscriber_pk, account_hash: sha256::digest(params.account.as_ref()), notification_type: params.notification_type, - notify_topic: params.notify_topic.into_value(), + notification_topic: params.notify_topic.into_value(), message_id: params.message_id, } } diff --git a/src/analytics/subscriber_update.rs b/src/analytics/subscriber_update.rs index aec68c8a..00e4a85f 100644 --- a/src/analytics/subscriber_update.rs +++ b/src/analytics/subscriber_update.rs @@ -36,7 +36,7 @@ pub struct SubscriberUpdateParams { pub method: NotifyClientMethod, pub old_scope: HashSet>, pub new_scope: HashSet>, - pub notify_topic: Topic, + pub notification_topic: Topic, pub topic: Topic, } @@ -59,7 +59,7 @@ pub struct SubscriberUpdate { /// Notification types that the subscriber is subscribed to after the update, separated by commas pub new_scope: String, /// The topic that notifications are sent on - pub notify_topic: Arc, + pub notification_topic: Arc, /// The topic used to create or manage the subscription that the update message was published to pub topic: Arc, } @@ -75,7 +75,7 @@ impl From for SubscriberUpdate { method: client.method.to_string(), old_scope: client.old_scope.iter().join(","), new_scope: client.new_scope.iter().join(","), - notify_topic: client.notify_topic.into_value(), + notification_topic: client.notification_topic.into_value(), topic: client.topic.into_value(), } } diff --git a/src/websocket_service/handlers/notify_delete.rs b/src/websocket_service/handlers/notify_delete.rs index c68c09c1..30b5a84c 100644 --- a/src/websocket_service/handlers/notify_delete.rs +++ b/src/websocket_service/handlers/notify_delete.rs @@ -108,7 +108,7 @@ pub async fn handle( method: NotifyClientMethod::Unsubscribe, old_scope: subscriber.scope.into_iter().map(Into::into).collect(), new_scope: HashSet::new(), - notify_topic: subscriber.topic, + notification_topic: subscriber.topic, topic, }); diff --git a/src/websocket_service/handlers/notify_subscribe.rs b/src/websocket_service/handlers/notify_subscribe.rs index 630726b6..aa0781e3 100644 --- a/src/websocket_service/handlers/notify_subscribe.rs +++ b/src/websocket_service/handlers/notify_subscribe.rs @@ -167,7 +167,7 @@ pub async fn handle( method: NotifyClientMethod::Subscribe, old_scope: HashSet::new(), new_scope: scope.into_iter().map(Into::into).collect(), - notify_topic: notify_topic.clone(), + notification_topic: notify_topic.clone(), topic, }); diff --git a/src/websocket_service/handlers/notify_update.rs b/src/websocket_service/handlers/notify_update.rs index 1fb6d269..6d7045f8 100644 --- a/src/websocket_service/handlers/notify_update.rs +++ b/src/websocket_service/handlers/notify_update.rs @@ -109,7 +109,7 @@ pub async fn handle( method: NotifyClientMethod::Update, old_scope: old_scope.into_iter().map(Into::into).collect(), new_scope: new_scope.into_iter().map(Into::into).collect(), - notify_topic: subscriber.topic.clone(), + notification_topic: subscriber.topic.clone(), topic, }); From 278f54fc99f783a707ddb7c3deddbd1238bb2f82 Mon Sep 17 00:00:00 2001 From: Chris Smith Date: Tue, 24 Oct 2023 15:04:40 -0400 Subject: [PATCH 13/14] chore: keep as Uuid --- src/analytics/subscriber_update.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/analytics/subscriber_update.rs b/src/analytics/subscriber_update.rs index 00e4a85f..f8ce29b4 100644 --- a/src/analytics/subscriber_update.rs +++ b/src/analytics/subscriber_update.rs @@ -45,7 +45,7 @@ pub struct SubscriberUpdate { /// Time at which the event was generated pub event_at: chrono::NaiveDateTime, /// Primary key of the project in the Notify Server database that the subscriber is subscribed to - pub project_pk: String, + pub project_pk: Uuid, /// Project ID of the project that the subscriber is subscribed to pub project_id: Arc, /// Primary Key of the subscriber in the Notify Server database @@ -68,7 +68,7 @@ impl From for SubscriberUpdate { fn from(client: SubscriberUpdateParams) -> Self { Self { event_at: wc::analytics::time::now(), - project_pk: client.project_pk.to_string(), + project_pk: client.project_pk, project_id: client.project_id.into_value(), pk: client.pk, account_hash: sha256::digest(client.account.as_ref()), From 9778a7a24565eb2a6dc62521aef924af56c8ecf0 Mon Sep 17 00:00:00 2001 From: Chris Smith Date: Tue, 24 Oct 2023 16:58:55 -0400 Subject: [PATCH 14/14] chore: export JWT ss & domain --- src/analytics/subscriber_update.rs | 28 ++++++++++++------- src/auth.rs | 9 ++++-- .../handlers/notify_delete.rs | 13 ++++++--- .../handlers/notify_subscribe.rs | 15 ++++++---- .../handlers/notify_update.rs | 13 ++++++--- 5 files changed, 53 insertions(+), 25 deletions(-) diff --git a/src/analytics/subscriber_update.rs b/src/analytics/subscriber_update.rs index f8ce29b4..2c1289f6 100644 --- a/src/analytics/subscriber_update.rs +++ b/src/analytics/subscriber_update.rs @@ -33,6 +33,8 @@ pub struct SubscriberUpdateParams { pub project_id: ProjectId, pub pk: Uuid, pub account: AccountId, + pub updated_by_iss: Arc, + pub updated_by_domain: String, pub method: NotifyClientMethod, pub old_scope: HashSet>, pub new_scope: HashSet>, @@ -52,6 +54,10 @@ pub struct SubscriberUpdate { pub pk: Uuid, /// Hash of the CAIP-10 account of the subscriber pub account_hash: String, + /// JWT iss that made the update + pub updated_by_iss: Arc, + /// CACAO domain that made the update + pub updated_by_domain: String, /// The change that happend to the subscriber, can be subscribe, update, or unsubscribe pub method: String, /// Notification types that the subscriber was subscribed to before the update, separated by commas @@ -65,18 +71,20 @@ pub struct SubscriberUpdate { } impl From for SubscriberUpdate { - fn from(client: SubscriberUpdateParams) -> Self { + fn from(params: SubscriberUpdateParams) -> Self { Self { event_at: wc::analytics::time::now(), - project_pk: client.project_pk, - project_id: client.project_id.into_value(), - pk: client.pk, - account_hash: sha256::digest(client.account.as_ref()), - method: client.method.to_string(), - old_scope: client.old_scope.iter().join(","), - new_scope: client.new_scope.iter().join(","), - notification_topic: client.notification_topic.into_value(), - topic: client.topic.into_value(), + project_pk: params.project_pk, + project_id: params.project_id.into_value(), + pk: params.pk, + account_hash: sha256::digest(params.account.as_ref()), + updated_by_iss: params.updated_by_iss, + updated_by_domain: params.updated_by_domain, + method: params.method.to_string(), + old_scope: params.old_scope.iter().join(","), + new_scope: params.new_scope.iter().join(","), + notification_topic: params.notification_topic.into_value(), + topic: params.topic.into_value(), } } } diff --git a/src/auth.rs b/src/auth.rs index 742c145d..b2d83ff5 100644 --- a/src/auth.rs +++ b/src/auth.rs @@ -407,6 +407,7 @@ pub enum AuthError { pub struct Authorization { pub account: AccountId, pub app: AuthorizedApp, + pub domain: String, } #[derive(Serialize, Deserialize, Debug)] @@ -462,7 +463,7 @@ pub async fn verify_identity(iss: &str, ksu: &str, sub: &str) -> Result Result