Skip to content

Commit

Permalink
fix: data export refactors (#144)
Browse files Browse the repository at this point in the history
* chore: remove account data

* chore: snake_case

* chore: more changes

* chore: export names

* chore: remove send_id

* chore: tweaks

* chore: message_id

* chore: subscriber_pk

* chore: comment

* chore: reorder and comments

* chore: add project_pk

* chore: notify_topic -> notification_topic

* chore: keep as Uuid

* chore: export JWT ss & domain
  • Loading branch information
chris13524 authored Oct 26, 2023
1 parent cc5c00d commit 296d39e
Show file tree
Hide file tree
Showing 12 changed files with 260 additions and 175 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ rmp-serde = "1.1.1"
deadpool-redis = "0.12.0"
rand_chacha = "0.3.1"
sqlx = { version = "0.7.1", features = ["runtime-tokio-native-tls", "postgres", "chrono", "uuid"] }
itertools = "0.11.0"
sha3 = "0.10.8"

[dev-dependencies]
Expand Down
36 changes: 21 additions & 15 deletions src/analytics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
use {
self::{
notify_client::NotifyClientParams,
notify_message::{NotifyMessage, NotifyMessageParams},
subscriber_notification::SubscriberNotificationParams,
subscriber_update::SubscriberUpdateParams,
},
crate::{
analytics::{
subscriber_notification::SubscriberNotification, subscriber_update::SubscriberUpdate,
},
config::Configuration,
error::Result,
},
crate::{analytics::notify_client::NotifyClient, config::Configuration, error::Result},
aws_sdk_s3::Client as S3Client,
std::{net::IpAddr, sync::Arc},
tracing::{error, info},
Expand All @@ -18,13 +24,13 @@ use {
},
};

pub mod notify_client;
pub mod notify_message;
pub mod subscriber_notification;
pub mod subscriber_update;

#[derive(Clone)]
pub struct NotifyAnalytics {
pub messages: Analytics<NotifyMessage>,
pub clients: Analytics<NotifyClient>,
pub messages: Analytics<SubscriberNotification>,
pub clients: Analytics<SubscriberUpdate>,
pub geoip_resolver: Option<Arc<MaxMindResolver>>,
}

Expand Down Expand Up @@ -53,29 +59,29 @@ impl NotifyAnalytics {

let messages = {
let exporter = AwsExporter::new(AwsOpts {
export_prefix: "notify/messages",
export_name: "messages",
export_prefix: "notify/subscriber_notifications",
export_name: "subscriber_notifications",
file_extension: "parquet",
bucket_name: bucket_name.clone(),
s3_client: s3_client.clone(),
node_ip: node_ip.clone(),
});

let collector = ParquetWriter::<NotifyMessage>::new(opts.clone(), exporter)?;
let collector = ParquetWriter::<SubscriberNotification>::new(opts.clone(), exporter)?;
Analytics::new(collector)
};

let clients = {
let exporter = AwsExporter::new(AwsOpts {
export_prefix: "notify/clients",
export_name: "clients",
export_prefix: "notify/subscriber_updates",
export_name: "subscriber_updates",
file_extension: "parquet",
bucket_name,
s3_client,
node_ip,
});

Analytics::new(ParquetWriter::<NotifyClient>::new(opts, exporter)?)
Analytics::new(ParquetWriter::<SubscriberUpdate>::new(opts, exporter)?)
};

Ok(Self {
Expand All @@ -85,11 +91,11 @@ impl NotifyAnalytics {
})
}

pub fn message(&self, message: NotifyMessageParams) {
pub fn message(&self, message: SubscriberNotificationParams) {
self.messages.collect(message.into());
}

pub fn client(&self, client: NotifyClientParams) {
pub fn client(&self, client: SubscriberUpdateParams) {
self.clients.collect(client.into());
}

Expand Down
64 changes: 0 additions & 64 deletions src/analytics/notify_client.rs

This file was deleted.

38 changes: 0 additions & 38 deletions src/analytics/notify_message.rs

This file was deleted.

53 changes: 53 additions & 0 deletions src/analytics/subscriber_notification.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use {
crate::model::types::AccountId,
parquet_derive::ParquetRecordWriter,
relay_rpc::domain::{ProjectId, Topic},
serde::Serialize,
std::sync::Arc,
uuid::Uuid,
};

pub struct SubscriberNotificationParams {
pub project_pk: Uuid,
pub project_id: ProjectId,
pub subscriber_pk: Uuid,
pub account: AccountId,
pub notification_type: Arc<str>,
pub notify_topic: Topic,
pub message_id: Arc<str>,
}

#[derive(Debug, Serialize, ParquetRecordWriter)]
pub struct SubscriberNotification {
/// Time at which the event was generated
pub event_at: chrono::NaiveDateTime,
/// Primary key of the project in the Notify Server database that the notification was sent from and the subscriber is subscribed to
pub project_pk: Uuid,
/// Project ID of the project that the notification was sent from and the subscriber is subscribed to
pub project_id: Arc<str>,
/// Primary key of the subscriber in the Notify Server database that the notificaiton is being sent to
pub subscriber_pk: Uuid,
/// Hash of the CAIP-10 account of the subscriber
pub account_hash: String,
/// The notification type ID
pub notification_type: Arc<str>,
/// The topic that the notification was sent on
pub notification_topic: Arc<str>,
/// Relay message ID of the notification
pub message_id: Arc<str>,
}

impl From<SubscriberNotificationParams> for SubscriberNotification {
fn from(params: SubscriberNotificationParams) -> Self {
Self {
event_at: wc::analytics::time::now(),
project_pk: params.project_pk,
project_id: params.project_id.into_value(),
subscriber_pk: params.subscriber_pk,
account_hash: sha256::digest(params.account.as_ref()),
notification_type: params.notification_type,
notification_topic: params.notify_topic.into_value(),
message_id: params.message_id,
}
}
}
90 changes: 90 additions & 0 deletions src/analytics/subscriber_update.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use {
crate::model::types::AccountId,
itertools::Itertools,
parquet_derive::ParquetRecordWriter,
relay_rpc::domain::{ProjectId, Topic},
serde::Serialize,
std::{
collections::HashSet,
fmt::{self, Display, Formatter},
sync::Arc,
},
uuid::Uuid,
};

pub enum NotifyClientMethod {
Subscribe,
Update,
Unsubscribe,
}

impl Display for NotifyClientMethod {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
Self::Subscribe => write!(f, "subscribe"),
Self::Update => write!(f, "update"),
Self::Unsubscribe => write!(f, "unsubscribe"),
}
}
}

pub struct SubscriberUpdateParams {
pub project_pk: Uuid,
pub project_id: ProjectId,
pub pk: Uuid,
pub account: AccountId,
pub updated_by_iss: Arc<str>,
pub updated_by_domain: String,
pub method: NotifyClientMethod,
pub old_scope: HashSet<Arc<str>>,
pub new_scope: HashSet<Arc<str>>,
pub notification_topic: Topic,
pub topic: Topic,
}

#[derive(Debug, Serialize, ParquetRecordWriter)]
pub struct SubscriberUpdate {
/// Time at which the event was generated
pub event_at: chrono::NaiveDateTime,
/// Primary key of the project in the Notify Server database that the subscriber is subscribed to
pub project_pk: Uuid,
/// Project ID of the project that the subscriber is subscribed to
pub project_id: Arc<str>,
/// Primary Key of the subscriber in the Notify Server database
pub pk: Uuid,
/// Hash of the CAIP-10 account of the subscriber
pub account_hash: String,
/// JWT iss that made the update
pub updated_by_iss: Arc<str>,
/// CACAO domain that made the update
pub updated_by_domain: String,
/// The change that happend to the subscriber, can be subscribe, update, or unsubscribe
pub method: String,
/// Notification types that the subscriber was subscribed to before the update, separated by commas
pub old_scope: String,
/// Notification types that the subscriber is subscribed to after the update, separated by commas
pub new_scope: String,
/// The topic that notifications are sent on
pub notification_topic: Arc<str>,
/// The topic used to create or manage the subscription that the update message was published to
pub topic: Arc<str>,
}

impl From<SubscriberUpdateParams> for SubscriberUpdate {
fn from(params: SubscriberUpdateParams) -> Self {
Self {
event_at: wc::analytics::time::now(),
project_pk: params.project_pk,
project_id: params.project_id.into_value(),
pk: params.pk,
account_hash: sha256::digest(params.account.as_ref()),
updated_by_iss: params.updated_by_iss,
updated_by_domain: params.updated_by_domain,
method: params.method.to_string(),
old_scope: params.old_scope.iter().join(","),
new_scope: params.new_scope.iter().join(","),
notification_topic: params.notification_topic.into_value(),
topic: params.topic.into_value(),
}
}
}
9 changes: 7 additions & 2 deletions src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ pub enum AuthError {
pub struct Authorization {
pub account: AccountId,
pub app: AuthorizedApp,
pub domain: String,
}

#[derive(Serialize, Deserialize, Debug)]
Expand Down Expand Up @@ -462,7 +463,7 @@ pub async fn verify_identity(iss: &str, ksu: &str, sub: &str) -> Result<Authoriz
let app = {
let statement = cacao.p.statement.ok_or(AuthError::CacaoStatementMissing)?;
if statement.contains("DAPP") || statement == STATEMENT_THIS_DOMAIN {
AuthorizedApp::Limited(cacao.p.domain)
AuthorizedApp::Limited(cacao.p.domain.clone())
} else if statement.contains("WALLET")
|| statement == STATEMENT
|| statement == STATEMENT_ALL_DOMAINS
Expand Down Expand Up @@ -500,7 +501,11 @@ pub async fn verify_identity(iss: &str, ksu: &str, sub: &str) -> Result<Authoriz
}
}

Ok(Authorization { account, app })
Ok(Authorization {
account,
app,
domain: cacao.p.domain,
})
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
Loading

0 comments on commit 296d39e

Please sign in to comment.