Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: data export refactors #144

Merged
merged 15 commits into from
Oct 26, 2023
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ rmp-serde = "1.1.1"
deadpool-redis = "0.12.0"
rand_chacha = "0.3.1"
sqlx = { version = "0.7.1", features = ["runtime-tokio-native-tls", "postgres", "chrono", "uuid"] }
itertools = "0.11.0"

[dev-dependencies]
sha3 = "0.10.8"
Expand Down
36 changes: 21 additions & 15 deletions src/analytics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
use {
self::{
notify_client::NotifyClientParams,
notify_message::{NotifyMessage, NotifyMessageParams},
subscriber_notification::SubscriberNotificationParams,
subscriber_update::SubscriberUpdateParams,
},
crate::{
analytics::{
subscriber_notification::SubscriberNotification, subscriber_update::SubscriberUpdate,
},
config::Configuration,
error::Result,
},
crate::{analytics::notify_client::NotifyClient, config::Configuration, error::Result},
aws_sdk_s3::Client as S3Client,
std::{net::IpAddr, sync::Arc},
tracing::{error, info},
Expand All @@ -18,13 +24,13 @@ use {
},
};

pub mod notify_client;
pub mod notify_message;
pub mod subscriber_notification;
pub mod subscriber_update;

#[derive(Clone)]
pub struct NotifyAnalytics {
pub messages: Analytics<NotifyMessage>,
pub clients: Analytics<NotifyClient>,
pub messages: Analytics<SubscriberNotification>,
pub clients: Analytics<SubscriberUpdate>,
pub geoip_resolver: Option<Arc<MaxMindResolver>>,
}

Expand Down Expand Up @@ -53,29 +59,29 @@ impl NotifyAnalytics {

let messages = {
let exporter = AwsExporter::new(AwsOpts {
export_prefix: "notify/messages",
export_name: "messages",
export_prefix: "notify/subscriber_notifications",
export_name: "subscriber_notifications",
file_extension: "parquet",
bucket_name: bucket_name.clone(),
s3_client: s3_client.clone(),
node_ip: node_ip.clone(),
});

let collector = ParquetWriter::<NotifyMessage>::new(opts.clone(), exporter)?;
let collector = ParquetWriter::<SubscriberNotification>::new(opts.clone(), exporter)?;
Analytics::new(collector)
};

let clients = {
let exporter = AwsExporter::new(AwsOpts {
export_prefix: "notify/clients",
export_name: "clients",
export_prefix: "notify/subscriber_updates",
export_name: "subscriber_updates",
file_extension: "parquet",
bucket_name,
s3_client,
node_ip,
});

Analytics::new(ParquetWriter::<NotifyClient>::new(opts, exporter)?)
Analytics::new(ParquetWriter::<SubscriberUpdate>::new(opts, exporter)?)
};

Ok(Self {
Expand All @@ -85,11 +91,11 @@ impl NotifyAnalytics {
})
}

pub fn message(&self, message: NotifyMessageParams) {
pub fn message(&self, message: SubscriberNotificationParams) {
self.messages.collect(message.into());
}

pub fn client(&self, client: NotifyClientParams) {
pub fn client(&self, client: SubscriberUpdateParams) {
self.clients.collect(client.into());
}

Expand Down
64 changes: 0 additions & 64 deletions src/analytics/notify_client.rs

This file was deleted.

38 changes: 0 additions & 38 deletions src/analytics/notify_message.rs

This file was deleted.

53 changes: 53 additions & 0 deletions src/analytics/subscriber_notification.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use {
crate::model::types::AccountId,
parquet_derive::ParquetRecordWriter,
relay_rpc::domain::{ProjectId, Topic},
serde::Serialize,
std::sync::Arc,
uuid::Uuid,
};

pub struct SubscriberNotificationParams {
pub project_pk: Uuid,
pub project_id: ProjectId,
pub subscriber_pk: Uuid,
pub account: AccountId,
pub notification_type: Arc<str>,
pub notify_topic: Topic,
pub message_id: Arc<str>,
}

#[derive(Debug, Serialize, ParquetRecordWriter)]
pub struct SubscriberNotification {
/// Time at which the event was generated
pub event_at: chrono::NaiveDateTime,
/// Primary key of the project in the Notify Server database that the notification was sent from and the subscriber is subscribed to
pub project_pk: Uuid,
/// Project ID of the project that the notification was sent from and the subscriber is subscribed to
pub project_id: Arc<str>,
/// Primary key of the subscriber in the Notify Server database that the notificaiton is being sent to
pub subscriber_pk: Uuid,
/// Hash of the CAIP-10 account of the subscriber
pub account_hash: String,
/// The notification type ID
pub notification_type: Arc<str>,
/// The topic that the notification was sent on
pub notification_topic: Arc<str>,
/// Relay message ID of the notification
pub message_id: Arc<str>,
}

impl From<SubscriberNotificationParams> for SubscriberNotification {
fn from(params: SubscriberNotificationParams) -> Self {
Self {
event_at: wc::analytics::time::now(),
project_pk: params.project_pk,
project_id: params.project_id.into_value(),
subscriber_pk: params.subscriber_pk,
account_hash: sha256::digest(params.account.as_ref()),
notification_type: params.notification_type,
notification_topic: params.notify_topic.into_value(),
message_id: params.message_id,
}
}
}
90 changes: 90 additions & 0 deletions src/analytics/subscriber_update.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use {
crate::model::types::AccountId,
itertools::Itertools,
parquet_derive::ParquetRecordWriter,
relay_rpc::domain::{ProjectId, Topic},
serde::Serialize,
std::{
collections::HashSet,
fmt::{self, Display, Formatter},
sync::Arc,
},
uuid::Uuid,
};

pub enum NotifyClientMethod {
Subscribe,
Update,
Unsubscribe,
}

impl Display for NotifyClientMethod {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
Self::Subscribe => write!(f, "subscribe"),
Self::Update => write!(f, "update"),
Self::Unsubscribe => write!(f, "unsubscribe"),
}
}
}

pub struct SubscriberUpdateParams {
pub project_pk: Uuid,
pub project_id: ProjectId,
pub pk: Uuid,
pub account: AccountId,
pub updated_by_iss: Arc<str>,
pub updated_by_domain: String,
pub method: NotifyClientMethod,
pub old_scope: HashSet<Arc<str>>,
pub new_scope: HashSet<Arc<str>>,
pub notification_topic: Topic,
pub topic: Topic,
}

#[derive(Debug, Serialize, ParquetRecordWriter)]
pub struct SubscriberUpdate {
/// Time at which the event was generated
pub event_at: chrono::NaiveDateTime,
/// Primary key of the project in the Notify Server database that the subscriber is subscribed to
pub project_pk: Uuid,
/// Project ID of the project that the subscriber is subscribed to
pub project_id: Arc<str>,
/// Primary Key of the subscriber in the Notify Server database
pub pk: Uuid,
/// Hash of the CAIP-10 account of the subscriber
pub account_hash: String,
/// JWT iss that made the update
pub updated_by_iss: Arc<str>,
/// CACAO domain that made the update
pub updated_by_domain: String,
/// The change that happend to the subscriber, can be subscribe, update, or unsubscribe
pub method: String,
/// Notification types that the subscriber was subscribed to before the update, separated by commas
pub old_scope: String,
/// Notification types that the subscriber is subscribed to after the update, separated by commas
pub new_scope: String,
/// The topic that notifications are sent on
pub notification_topic: Arc<str>,
/// The topic used to create or manage the subscription that the update message was published to
pub topic: Arc<str>,
}

impl From<SubscriberUpdateParams> for SubscriberUpdate {
fn from(params: SubscriberUpdateParams) -> Self {
Self {
event_at: wc::analytics::time::now(),
project_pk: params.project_pk,
project_id: params.project_id.into_value(),
pk: params.pk,
account_hash: sha256::digest(params.account.as_ref()),
updated_by_iss: params.updated_by_iss,
updated_by_domain: params.updated_by_domain,
method: params.method.to_string(),
old_scope: params.old_scope.iter().join(","),
new_scope: params.new_scope.iter().join(","),
notification_topic: params.notification_topic.into_value(),
topic: params.topic.into_value(),
}
}
}
9 changes: 7 additions & 2 deletions src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ pub enum AuthError {
pub struct Authorization {
pub account: AccountId,
pub app: AuthorizedApp,
pub domain: String,
}

#[derive(Serialize, Deserialize, Debug)]
Expand Down Expand Up @@ -462,7 +463,7 @@ pub async fn verify_identity(iss: &str, ksu: &str, sub: &str) -> Result<Authoriz
let app = {
let statement = cacao.p.statement.ok_or(AuthError::CacaoStatementMissing)?;
if statement.contains("DAPP") || statement == STATEMENT_THIS_DOMAIN {
AuthorizedApp::Limited(cacao.p.domain)
AuthorizedApp::Limited(cacao.p.domain.clone())
} else if statement.contains("WALLET")
|| statement == STATEMENT
|| statement == STATEMENT_ALL_DOMAINS
Expand Down Expand Up @@ -500,7 +501,11 @@ pub async fn verify_identity(iss: &str, ksu: &str, sub: &str) -> Result<Authoriz
}
}

Ok(Authorization { account, app })
Ok(Authorization {
account,
app,
domain: cacao.p.domain,
})
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
Loading