diff --git a/Cargo.lock b/Cargo.lock index 56dc24d0..0ed325fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2668,6 +2668,7 @@ dependencies = [ "hkdf", "hyper", "ipnet", + "itertools", "jsonwebtoken", "k256", "lazy_static", diff --git a/Cargo.toml b/Cargo.toml index 787469c3..2f195408 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,6 +98,7 @@ rmp-serde = "1.1.1" deadpool-redis = "0.12.0" rand_chacha = "0.3.1" sqlx = { version = "0.7.1", features = ["runtime-tokio-native-tls", "postgres", "chrono", "uuid"] } +itertools = "0.11.0" sha3 = "0.10.8" [dev-dependencies] diff --git a/src/analytics/mod.rs b/src/analytics/mod.rs index a52cc8a6..45e677d7 100644 --- a/src/analytics/mod.rs +++ b/src/analytics/mod.rs @@ -1,9 +1,15 @@ use { self::{ - notify_client::NotifyClientParams, - notify_message::{NotifyMessage, NotifyMessageParams}, + subscriber_notification::SubscriberNotificationParams, + subscriber_update::SubscriberUpdateParams, + }, + crate::{ + analytics::{ + subscriber_notification::SubscriberNotification, subscriber_update::SubscriberUpdate, + }, + config::Configuration, + error::Result, }, - crate::{analytics::notify_client::NotifyClient, config::Configuration, error::Result}, aws_sdk_s3::Client as S3Client, std::{net::IpAddr, sync::Arc}, tracing::{error, info}, @@ -18,13 +24,13 @@ use { }, }; -pub mod notify_client; -pub mod notify_message; +pub mod subscriber_notification; +pub mod subscriber_update; #[derive(Clone)] pub struct NotifyAnalytics { - pub messages: Analytics, - pub clients: Analytics, + pub messages: Analytics, + pub clients: Analytics, pub geoip_resolver: Option>, } @@ -53,29 +59,29 @@ impl NotifyAnalytics { let messages = { let exporter = AwsExporter::new(AwsOpts { - export_prefix: "notify/messages", - export_name: "messages", + export_prefix: "notify/subscriber_notifications", + export_name: "subscriber_notifications", file_extension: "parquet", bucket_name: bucket_name.clone(), s3_client: s3_client.clone(), node_ip: node_ip.clone(), }); - let collector = ParquetWriter::::new(opts.clone(), exporter)?; + let collector = ParquetWriter::::new(opts.clone(), exporter)?; Analytics::new(collector) }; let clients = { let exporter = AwsExporter::new(AwsOpts { - export_prefix: "notify/clients", - export_name: "clients", + export_prefix: "notify/subscriber_updates", + export_name: "subscriber_updates", file_extension: "parquet", bucket_name, s3_client, node_ip, }); - Analytics::new(ParquetWriter::::new(opts, exporter)?) + Analytics::new(ParquetWriter::::new(opts, exporter)?) }; Ok(Self { @@ -85,11 +91,11 @@ impl NotifyAnalytics { }) } - pub fn message(&self, message: NotifyMessageParams) { + pub fn message(&self, message: SubscriberNotificationParams) { self.messages.collect(message.into()); } - pub fn client(&self, client: NotifyClientParams) { + pub fn client(&self, client: SubscriberUpdateParams) { self.clients.collect(client.into()); } diff --git a/src/analytics/notify_client.rs b/src/analytics/notify_client.rs deleted file mode 100644 index 4decc1f9..00000000 --- a/src/analytics/notify_client.rs +++ /dev/null @@ -1,64 +0,0 @@ -use { - parquet_derive::ParquetRecordWriter, - serde::Serialize, - std::fmt::{self, Display, Formatter}, -}; - -pub enum NotifyClientMethod { - Subscribe, - Update, - Unsubscribe, -} - -impl Display for NotifyClientMethod { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - match self { - Self::Subscribe => write!(f, "subscribe"), - Self::Update => write!(f, "update"), - Self::Unsubscribe => write!(f, "unsubscribe"), - } - } -} - -pub struct NotifyClientParams { - pub pk: String, - pub method: NotifyClientMethod, - pub project_id: String, - pub account: String, - pub topic: String, - pub notify_topic: String, - pub old_scope: String, - pub new_scope: String, -} - -#[derive(Debug, Serialize, ParquetRecordWriter)] -#[serde(rename_all = "camelCase")] -pub struct NotifyClient { - pub pk: String, - pub method: String, // subscribe, update, unsubscribe - pub project_id: String, - pub account: String, - pub account_hash: String, - pub topic: String, - pub notify_topic: String, - pub old_scope: String, - pub new_scope: String, - pub event_at: chrono::NaiveDateTime, -} - -impl From for NotifyClient { - fn from(client: NotifyClientParams) -> Self { - Self { - pk: client.pk, - method: client.method.to_string(), - project_id: client.project_id, - account_hash: sha256::digest(client.account.as_bytes()), - account: client.account, - topic: client.topic, - notify_topic: client.notify_topic, - old_scope: client.old_scope, - new_scope: client.new_scope, - event_at: wc::analytics::time::now(), - } - } -} diff --git a/src/analytics/notify_message.rs b/src/analytics/notify_message.rs deleted file mode 100644 index 2566d2a6..00000000 --- a/src/analytics/notify_message.rs +++ /dev/null @@ -1,38 +0,0 @@ -use {parquet_derive::ParquetRecordWriter, serde::Serialize, std::sync::Arc}; - -pub struct NotifyMessageParams { - pub project_id: Arc, - pub msg_id: Arc, - pub topic: Arc, - pub account: Arc, - pub notification_type: Arc, - pub send_id: String, -} - -#[derive(Debug, Serialize, ParquetRecordWriter)] -#[serde(rename_all = "camelCase")] -pub struct NotifyMessage { - pub project_id: Arc, - pub msg_id: Arc, - pub topic: Arc, - pub account: Arc, - pub account_hash: String, - pub notification_type: Arc, - pub send_id: String, - pub event_at: chrono::NaiveDateTime, -} - -impl From for NotifyMessage { - fn from(message: NotifyMessageParams) -> Self { - Self { - project_id: message.project_id, - msg_id: message.msg_id, - topic: message.topic, - account_hash: sha256::digest(message.account.as_ref()), - account: message.account, - notification_type: message.notification_type, - send_id: message.send_id, - event_at: wc::analytics::time::now(), - } - } -} diff --git a/src/analytics/subscriber_notification.rs b/src/analytics/subscriber_notification.rs new file mode 100644 index 00000000..240ea3cb --- /dev/null +++ b/src/analytics/subscriber_notification.rs @@ -0,0 +1,53 @@ +use { + crate::model::types::AccountId, + parquet_derive::ParquetRecordWriter, + relay_rpc::domain::{ProjectId, Topic}, + serde::Serialize, + std::sync::Arc, + uuid::Uuid, +}; + +pub struct SubscriberNotificationParams { + pub project_pk: Uuid, + pub project_id: ProjectId, + pub subscriber_pk: Uuid, + pub account: AccountId, + pub notification_type: Arc, + pub notify_topic: Topic, + pub message_id: Arc, +} + +#[derive(Debug, Serialize, ParquetRecordWriter)] +pub struct SubscriberNotification { + /// Time at which the event was generated + pub event_at: chrono::NaiveDateTime, + /// Primary key of the project in the Notify Server database that the notification was sent from and the subscriber is subscribed to + pub project_pk: Uuid, + /// Project ID of the project that the notification was sent from and the subscriber is subscribed to + pub project_id: Arc, + /// Primary key of the subscriber in the Notify Server database that the notificaiton is being sent to + pub subscriber_pk: Uuid, + /// Hash of the CAIP-10 account of the subscriber + pub account_hash: String, + /// The notification type ID + pub notification_type: Arc, + /// The topic that the notification was sent on + pub notification_topic: Arc, + /// Relay message ID of the notification + pub message_id: Arc, +} + +impl From for SubscriberNotification { + fn from(params: SubscriberNotificationParams) -> Self { + Self { + event_at: wc::analytics::time::now(), + project_pk: params.project_pk, + project_id: params.project_id.into_value(), + subscriber_pk: params.subscriber_pk, + account_hash: sha256::digest(params.account.as_ref()), + notification_type: params.notification_type, + notification_topic: params.notify_topic.into_value(), + message_id: params.message_id, + } + } +} diff --git a/src/analytics/subscriber_update.rs b/src/analytics/subscriber_update.rs new file mode 100644 index 00000000..2c1289f6 --- /dev/null +++ b/src/analytics/subscriber_update.rs @@ -0,0 +1,90 @@ +use { + crate::model::types::AccountId, + itertools::Itertools, + parquet_derive::ParquetRecordWriter, + relay_rpc::domain::{ProjectId, Topic}, + serde::Serialize, + std::{ + collections::HashSet, + fmt::{self, Display, Formatter}, + sync::Arc, + }, + uuid::Uuid, +}; + +pub enum NotifyClientMethod { + Subscribe, + Update, + Unsubscribe, +} + +impl Display for NotifyClientMethod { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + Self::Subscribe => write!(f, "subscribe"), + Self::Update => write!(f, "update"), + Self::Unsubscribe => write!(f, "unsubscribe"), + } + } +} + +pub struct SubscriberUpdateParams { + pub project_pk: Uuid, + pub project_id: ProjectId, + pub pk: Uuid, + pub account: AccountId, + pub updated_by_iss: Arc, + pub updated_by_domain: String, + pub method: NotifyClientMethod, + pub old_scope: HashSet>, + pub new_scope: HashSet>, + pub notification_topic: Topic, + pub topic: Topic, +} + +#[derive(Debug, Serialize, ParquetRecordWriter)] +pub struct SubscriberUpdate { + /// Time at which the event was generated + pub event_at: chrono::NaiveDateTime, + /// Primary key of the project in the Notify Server database that the subscriber is subscribed to + pub project_pk: Uuid, + /// Project ID of the project that the subscriber is subscribed to + pub project_id: Arc, + /// Primary Key of the subscriber in the Notify Server database + pub pk: Uuid, + /// Hash of the CAIP-10 account of the subscriber + pub account_hash: String, + /// JWT iss that made the update + pub updated_by_iss: Arc, + /// CACAO domain that made the update + pub updated_by_domain: String, + /// The change that happend to the subscriber, can be subscribe, update, or unsubscribe + pub method: String, + /// Notification types that the subscriber was subscribed to before the update, separated by commas + pub old_scope: String, + /// Notification types that the subscriber is subscribed to after the update, separated by commas + pub new_scope: String, + /// The topic that notifications are sent on + pub notification_topic: Arc, + /// The topic used to create or manage the subscription that the update message was published to + pub topic: Arc, +} + +impl From for SubscriberUpdate { + fn from(params: SubscriberUpdateParams) -> Self { + Self { + event_at: wc::analytics::time::now(), + project_pk: params.project_pk, + project_id: params.project_id.into_value(), + pk: params.pk, + account_hash: sha256::digest(params.account.as_ref()), + updated_by_iss: params.updated_by_iss, + updated_by_domain: params.updated_by_domain, + method: params.method.to_string(), + old_scope: params.old_scope.iter().join(","), + new_scope: params.new_scope.iter().join(","), + notification_topic: params.notification_topic.into_value(), + topic: params.topic.into_value(), + } + } +} diff --git a/src/auth.rs b/src/auth.rs index 742c145d..b2d83ff5 100644 --- a/src/auth.rs +++ b/src/auth.rs @@ -407,6 +407,7 @@ pub enum AuthError { pub struct Authorization { pub account: AccountId, pub app: AuthorizedApp, + pub domain: String, } #[derive(Serialize, Deserialize, Debug)] @@ -462,7 +463,7 @@ pub async fn verify_identity(iss: &str, ksu: &str, sub: &str) -> Result Result, notification_type: Arc, client: Arc, response: &mut Response, - request_id: uuid::Uuid, + request_id: Uuid, state: &Arc, - project_id: &str, + project_pk: Uuid, + project_id: ProjectId, ) -> Result<()> { let timer = std::time::Instant::now(); let futures = jobs.into_iter().map(|job| { @@ -226,16 +231,18 @@ async fn process_publish_jobs( } }) .map({ + let project_id = project_id.clone(); let notification_type = notification_type.clone(); move |result| { if result.is_ok() { - state.analytics.message(NotifyMessageParams { - project_id: project_id.into(), - msg_id: msg_id.into(), - topic: job.topic.into_value(), - account: job.account.into_value(), + state.analytics.message(SubscriberNotificationParams { + project_pk, + project_id, + subscriber_pk: job.client_pk, + account: job.account, notification_type, - send_id: "".to_string(), // TODO for when queueing is added + notify_topic: job.topic, + message_id: msg_id.into(), }); } result @@ -314,9 +321,10 @@ async fn generate_publish_jobs( let topic = Topic::new(sha256::digest(&sym_key).into()); jobs.push(PublishJob { + client_pk: subscriber.id, + account: subscriber.account, topic, message: base64_notification, - account: subscriber.account, }) } Ok(jobs) diff --git a/src/websocket_service/handlers/notify_delete.rs b/src/websocket_service/handlers/notify_delete.rs index 35ae2335..6be42976 100644 --- a/src/websocket_service/handlers/notify_delete.rs +++ b/src/websocket_service/handlers/notify_delete.rs @@ -1,6 +1,6 @@ use { crate::{ - analytics::notify_client::{NotifyClientMethod, NotifyClientParams}, + analytics::subscriber_update::{NotifyClientMethod, SubscriberUpdateParams}, auth::{ add_ttl, from_jwt, sign_jwt, verify_identity, AuthError, Authorization, AuthorizedApp, SharedClaims, SubscriptionDeleteRequestAuth, SubscriptionDeleteResponseAuth, @@ -22,7 +22,7 @@ use { chrono::Utc, relay_rpc::domain::DecodedClientId, serde_json::{json, Value}, - std::sync::Arc, + std::{collections::HashSet, sync::Arc}, tracing::warn, }; @@ -66,13 +66,16 @@ pub async fn handle( Err(Error::AppDoesNotMatch)?; } - let account = { + let (account, siwe_domain) = { if sub_auth.shared_claims.act != "notify_delete" { return Err(AuthError::InvalidAct)?; } - let Authorization { account, app } = - verify_identity(&sub_auth.shared_claims.iss, &sub_auth.ksu, &sub_auth.sub).await?; + let Authorization { + account, + app, + domain, + } = verify_identity(&sub_auth.shared_claims.iss, &sub_auth.ksu, &sub_auth.sub).await?; // TODO verify `sub_auth.aud` matches `project_data.identity_keypair` @@ -82,7 +85,7 @@ pub async fn handle( } } - account + (account, domain) }; delete_subscriber(subscriber.id, &state.postgres).await?; @@ -100,15 +103,18 @@ pub async fn handle( warn!("Error unsubscribing Notify from topic: {}", e); }; - state.analytics.client(NotifyClientParams { - pk: subscriber.id.to_string(), + state.analytics.client(SubscriberUpdateParams { + project_pk: project.id, + project_id: project.project_id, + pk: subscriber.id, + account: account.clone(), + updated_by_iss: sub_auth.shared_claims.iss.clone().into(), + updated_by_domain: siwe_domain, method: NotifyClientMethod::Unsubscribe, - project_id: project.id.to_string(), - account: account.to_string(), - topic: topic.to_string(), - notify_topic: subscriber.topic.to_string(), - old_scope: subscriber.scope.join(","), - new_scope: "".to_owned(), + old_scope: subscriber.scope.into_iter().map(Into::into).collect(), + new_scope: HashSet::new(), + notification_topic: subscriber.topic, + topic, }); let identity = DecodedClientId(decode_key(&project.authentication_public_key)?); diff --git a/src/websocket_service/handlers/notify_subscribe.rs b/src/websocket_service/handlers/notify_subscribe.rs index 51866f67..2b3bec8e 100644 --- a/src/websocket_service/handlers/notify_subscribe.rs +++ b/src/websocket_service/handlers/notify_subscribe.rs @@ -1,6 +1,6 @@ use { crate::{ - analytics::notify_client::{NotifyClientMethod, NotifyClientParams}, + analytics::subscriber_update::{NotifyClientMethod, SubscriberUpdateParams}, auth::{ add_ttl, from_jwt, sign_jwt, verify_identity, AuthError, Authorization, AuthorizedApp, SharedClaims, SubscriptionRequestAuth, SubscriptionResponseAuth, @@ -68,13 +68,16 @@ pub async fn handle( Err(Error::AppDoesNotMatch)?; } - let account = { + let (account, siwe_domain) = { if sub_auth.shared_claims.act != "notify_subscription" { return Err(AuthError::InvalidAct)?; } - let Authorization { account, app } = - verify_identity(&sub_auth.shared_claims.iss, &sub_auth.ksu, &sub_auth.sub).await?; + let Authorization { + account, + app, + domain, + } = verify_identity(&sub_auth.shared_claims.iss, &sub_auth.ksu, &sub_auth.sub).await?; // TODO verify `sub_auth.aud` matches `project_data.identity_keypair` @@ -87,7 +90,7 @@ pub async fn handle( // TODO merge code with integration.rs#verify_jwt() // - put desired `iss` value as an argument to make sure we verify it - account + (account, domain) }; let secret = StaticSecret::random_from_rng(chacha20poly1305::aead::OsRng); @@ -100,7 +103,7 @@ pub async fn handle( iat: now.timestamp() as u64, exp: add_ttl(now, NOTIFY_SUBSCRIBE_RESPONSE_TTL).timestamp() as u64, iss: format!("did:key:{identity}"), - aud: sub_auth.shared_claims.iss, + aud: sub_auth.shared_claims.iss.clone(), act: "notify_subscription_response".to_string(), }, sub: format!("did:pkh:{account}"), @@ -158,15 +161,18 @@ pub async fn handle( state.wsclient.subscribe(notify_topic.clone()).await?; - state.analytics.client(NotifyClientParams { - pk: subscriber_id.to_string(), + state.analytics.client(SubscriberUpdateParams { + project_pk: project.id, + project_id, + pk: subscriber_id, + account: account.clone(), + updated_by_iss: sub_auth.shared_claims.iss.into(), + updated_by_domain: siwe_domain, method: NotifyClientMethod::Subscribe, - project_id: project_id.to_string(), - account: account.to_string(), - topic: topic.to_string(), - notify_topic: notify_topic.to_string(), - old_scope: "".to_owned(), - new_scope: scope.into_iter().collect::>().join(","), + old_scope: HashSet::new(), + new_scope: scope.into_iter().map(Into::into).collect(), + notification_topic: notify_topic.clone(), + topic, }); // Send noop to extend ttl of relay's mapping diff --git a/src/websocket_service/handlers/notify_update.rs b/src/websocket_service/handlers/notify_update.rs index 31f7d3df..8c93dd4b 100644 --- a/src/websocket_service/handlers/notify_update.rs +++ b/src/websocket_service/handlers/notify_update.rs @@ -1,7 +1,7 @@ use { super::notify_watch_subscriptions::update_subscription_watchers, crate::{ - analytics::notify_client::{NotifyClientMethod, NotifyClientParams}, + analytics::subscriber_update::{NotifyClientMethod, SubscriberUpdateParams}, auth::{ add_ttl, from_jwt, sign_jwt, verify_identity, AuthError, Authorization, AuthorizedApp, SharedClaims, SubscriptionUpdateRequestAuth, SubscriptionUpdateResponseAuth, @@ -57,13 +57,16 @@ pub async fn handle( Err(Error::AppDoesNotMatch)?; } - let account = { + let (account, siwe_domain) = { if sub_auth.shared_claims.act != "notify_update" { return Err(AuthError::InvalidAct)?; } - let Authorization { account, app } = - verify_identity(&sub_auth.shared_claims.iss, &sub_auth.ksu, &sub_auth.sub).await?; + let Authorization { + account, + app, + domain, + } = verify_identity(&sub_auth.shared_claims.iss, &sub_auth.ksu, &sub_auth.sub).await?; // TODO verify `sub_auth.aud` matches `project_data.identity_keypair` @@ -73,18 +76,23 @@ pub async fn handle( } } - account + (account, domain) }; let old_scope = subscriber.scope; - let scope = sub_auth + let new_scope = sub_auth .scp .split(' ') .map(|s| s.to_owned()) .collect::>(); - let subscriber = - update_subscriber(project.id, account.clone(), scope.clone(), &state.postgres).await?; + let subscriber = update_subscriber( + project.id, + account.clone(), + new_scope.clone(), + &state.postgres, + ) + .await?; // TODO do in same transaction as update_subscriber() // state @@ -95,15 +103,18 @@ pub async fn handle( // ) // .await?; - state.analytics.client(NotifyClientParams { - pk: subscriber.id.to_string(), + state.analytics.client(SubscriberUpdateParams { + project_pk: project.id, + project_id: project.project_id, + pk: subscriber.id, + account: account.clone(), + updated_by_iss: sub_auth.shared_claims.iss.clone().into(), + updated_by_domain: siwe_domain, method: NotifyClientMethod::Update, - project_id: project.id.to_string(), - account: account.to_string(), - topic: topic.to_string(), - notify_topic: subscriber.topic.to_string(), - old_scope: old_scope.join(","), - new_scope: scope.into_iter().collect::>().join(","), + old_scope: old_scope.into_iter().map(Into::into).collect(), + new_scope: new_scope.into_iter().map(Into::into).collect(), + notification_topic: subscriber.topic.clone(), + topic, }); let identity = DecodedClientId(decode_key(&project.authentication_public_key)?);