Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: data export refactors #144

Merged
merged 15 commits into from
Oct 26, 2023
32 changes: 19 additions & 13 deletions src/analytics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
use {
self::{
notify_client::NotifyClientParams,
notify_message::{NotifyMessage, NotifyMessageParams},
subscriber_notification::SubscriberNotificationParams,
subscriber_update::SubscriberUpdateParams,
},
crate::{
analytics::{
subscriber_notification::SubscriberNotification, subscriber_update::SubscriberUpdate,
},
config::Configuration,
error::Result,
},
crate::{analytics::notify_client::NotifyClient, config::Configuration, error::Result},
aws_sdk_s3::Client as S3Client,
std::{net::IpAddr, sync::Arc},
tracing::{error, info},
Expand All @@ -18,13 +24,13 @@ use {
},
};

pub mod notify_client;
pub mod notify_message;
pub mod subscriber_notification;
pub mod subscriber_update;

#[derive(Clone)]
pub struct NotifyAnalytics {
pub messages: Analytics<NotifyMessage>,
pub clients: Analytics<NotifyClient>,
pub messages: Analytics<SubscriberNotification>,
pub clients: Analytics<SubscriberUpdate>,
pub geoip_resolver: Option<Arc<MaxMindResolver>>,
}

Expand Down Expand Up @@ -61,21 +67,21 @@ impl NotifyAnalytics {
node_ip: node_ip.clone(),
});

let collector = ParquetWriter::<NotifyMessage>::new(opts.clone(), exporter)?;
let collector = ParquetWriter::<SubscriberNotification>::new(opts.clone(), exporter)?;
Analytics::new(collector)
};

let clients = {
let exporter = AwsExporter::new(AwsOpts {
export_prefix: "notify/clients",
export_name: "clients",
export_prefix: "notify/client_updates",
chris13524 marked this conversation as resolved.
Show resolved Hide resolved
export_name: "client_updates",
file_extension: "parquet",
bucket_name,
s3_client,
node_ip,
});

Analytics::new(ParquetWriter::<NotifyClient>::new(opts, exporter)?)
Analytics::new(ParquetWriter::<SubscriberUpdate>::new(opts, exporter)?)
};

Ok(Self {
Expand All @@ -85,11 +91,11 @@ impl NotifyAnalytics {
})
}

pub fn message(&self, message: NotifyMessageParams) {
pub fn message(&self, message: SubscriberNotificationParams) {
self.messages.collect(message.into());
}

pub fn client(&self, client: NotifyClientParams) {
pub fn client(&self, client: SubscriberUpdateParams) {
self.clients.collect(client.into());
}

Expand Down
38 changes: 0 additions & 38 deletions src/analytics/notify_message.rs

This file was deleted.

45 changes: 45 additions & 0 deletions src/analytics/subscriber_notification.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use {
crate::model::types::AccountId,
parquet_derive::ParquetRecordWriter,
relay_rpc::domain::{ProjectId, Topic},
serde::Serialize,
std::sync::Arc,
uuid::Uuid,
};

pub struct SubscriberNotificationParams {
pub project_id: ProjectId,
pub client_pk: Uuid,
pub account: AccountId,
pub msg_id: Arc<str>,
pub topic: Topic,
pub notification_type: Arc<str>,
pub send_id: String,
}

#[derive(Debug, Serialize, ParquetRecordWriter)]
pub struct SubscriberNotification {
pub event_at: chrono::NaiveDateTime,
pub project_id: Arc<str>,
pub client_pk: String,
pub account_hash: String,
pub msg_id: Arc<str>,
pub topic: Arc<str>,
pub notification_type: Arc<str>,
pub send_id: String,
}

impl From<SubscriberNotificationParams> for SubscriberNotification {
fn from(message: SubscriberNotificationParams) -> Self {
Self {
project_id: message.project_id.into_value(),
msg_id: message.msg_id,
topic: message.topic.into_value(),
client_pk: message.client_pk.to_string(),
account_hash: sha256::digest(message.account.as_ref()),
notification_type: message.notification_type,
send_id: message.send_id,
event_at: wc::analytics::time::now(),
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
use {
crate::model::types::AccountId,
parquet_derive::ParquetRecordWriter,
relay_rpc::domain::{ProjectId, Topic},
serde::Serialize,
std::fmt::{self, Display, Formatter},
std::{
fmt::{self, Display, Formatter},
sync::Arc,
},
uuid::Uuid,
};

pub enum NotifyClientMethod {
Expand All @@ -20,42 +26,39 @@ impl Display for NotifyClientMethod {
}
}

pub struct NotifyClientParams {
pub pk: String,
pub struct SubscriberUpdateParams {
pub project_id: ProjectId,
pub pk: Uuid,
pub account: AccountId,
pub method: NotifyClientMethod,
pub project_id: String,
pub account: String,
pub topic: String,
pub notify_topic: String,
pub topic: Topic,
pub notify_topic: Topic,
pub old_scope: String,
pub new_scope: String,
}

#[derive(Debug, Serialize, ParquetRecordWriter)]
#[serde(rename_all = "camelCase")]
pub struct NotifyClient {
pub pk: String,
pub method: String, // subscribe, update, unsubscribe
pub project_id: String,
pub account: String,
pub struct SubscriberUpdate {
pub event_at: chrono::NaiveDateTime,
pub project_id: Arc<str>,
pub pk: Uuid,
pub account_hash: String,
pub topic: String,
pub notify_topic: String,
pub method: String, // subscribe, update, unsubscribe
pub topic: Arc<str>,
pub notify_topic: Arc<str>,
pub old_scope: String,
pub new_scope: String,
pub event_at: chrono::NaiveDateTime,
}

impl From<NotifyClientParams> for NotifyClient {
fn from(client: NotifyClientParams) -> Self {
impl From<SubscriberUpdateParams> for SubscriberUpdate {
fn from(client: SubscriberUpdateParams) -> Self {
Self {
pk: client.pk,
method: client.method.to_string(),
project_id: client.project_id,
account_hash: sha256::digest(client.account.as_bytes()),
account: client.account,
topic: client.topic,
notify_topic: client.notify_topic,
project_id: client.project_id.into_value(),
account_hash: sha256::digest(client.account.as_ref()),
topic: client.topic.into_value(),
notify_topic: client.notify_topic.into_value(),
old_scope: client.old_scope,
new_scope: client.new_scope,
event_at: wc::analytics::time::now(),
Expand Down
25 changes: 15 additions & 10 deletions src/handlers/notify.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {
crate::{
analytics::notify_message::NotifyMessageParams,
analytics::subscriber_notification::SubscriberNotificationParams,
auth::add_ttl,
error,
extractors::AuthedProjectId,
Expand All @@ -23,14 +23,15 @@ use {
error::Result,
futures::FutureExt,
relay_rpc::{
domain::{ClientId, DecodedClientId, Topic},
domain::{ClientId, DecodedClientId, ProjectId, Topic},
jwt::{JwtHeader, JWT_HEADER_ALG, JWT_HEADER_TYP},
rpc::{msg_id::MsgId, Publish},
},
serde::{Deserialize, Serialize},
std::{collections::HashSet, sync::Arc, time::Duration},
tokio::time::error::Elapsed,
tracing::{info, warn},
uuid::Uuid,
wc::metrics::otel::{Context, KeyValue},
};

Expand All @@ -48,6 +49,7 @@ pub struct SendFailure {

#[derive(Clone)]
struct PublishJob {
client_pk: Uuid,
account: AccountId,
topic: Topic,
message: String,
Expand Down Expand Up @@ -125,7 +127,7 @@ pub async fn handler(
&mut response,
request_id,
&state,
project_id.as_ref(),
project_id.clone(),
)
.await?;

Expand All @@ -151,9 +153,9 @@ async fn process_publish_jobs(
notification_type: Arc<str>,
client: Arc<relay_client::http::Client>,
response: &mut Response,
request_id: uuid::Uuid,
request_id: Uuid,
state: &Arc<AppState>,
project_id: &str,
project_id: ProjectId,
) -> Result<()> {
let timer = std::time::Instant::now();
let futures = jobs.into_iter().map(|job| {
Expand Down Expand Up @@ -226,14 +228,16 @@ async fn process_publish_jobs(
}
})
.map({
let project_id = project_id.clone();
let notification_type = notification_type.clone();
move |result| {
if result.is_ok() {
state.analytics.message(NotifyMessageParams {
project_id: project_id.into(),
state.analytics.message(SubscriberNotificationParams {
project_id,
msg_id: msg_id.into(),
topic: job.topic.into_value(),
account: job.account.into_value(),
topic: job.topic,
client_pk: job.client_pk,
account: job.account,
notification_type,
send_id: "".to_string(), // TODO for when queueing is added
});
Expand Down Expand Up @@ -314,9 +318,10 @@ async fn generate_publish_jobs(
let topic = Topic::new(sha256::digest(&sym_key).into());

jobs.push(PublishJob {
client_pk: subscriber.id,
account: subscriber.account,
topic,
message: base64_notification,
account: subscriber.account,
})
}
Ok(jobs)
Expand Down
14 changes: 7 additions & 7 deletions src/websocket_service/handlers/notify_delete.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {
crate::{
analytics::notify_client::{NotifyClientMethod, NotifyClientParams},
analytics::subscriber_update::{NotifyClientMethod, SubscriberUpdateParams},
auth::{
add_ttl, from_jwt, sign_jwt, verify_identity, AuthError, Authorization, AuthorizedApp,
SharedClaims, SubscriptionDeleteRequestAuth, SubscriptionDeleteResponseAuth,
Expand Down Expand Up @@ -100,13 +100,13 @@ pub async fn handle(
warn!("Error unsubscribing Notify from topic: {}", e);
};

state.analytics.client(NotifyClientParams {
pk: subscriber.id.to_string(),
state.analytics.client(SubscriberUpdateParams {
pk: subscriber.id,
method: NotifyClientMethod::Unsubscribe,
project_id: project.id.to_string(),
account: account.to_string(),
topic: topic.to_string(),
notify_topic: subscriber.topic.to_string(),
project_id: project.project_id,
account: account.clone(),
topic,
notify_topic: subscriber.topic,
old_scope: subscriber.scope.join(","),
new_scope: "".to_owned(),
});
Expand Down
14 changes: 7 additions & 7 deletions src/websocket_service/handlers/notify_subscribe.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {
crate::{
analytics::notify_client::{NotifyClientMethod, NotifyClientParams},
analytics::subscriber_update::{NotifyClientMethod, SubscriberUpdateParams},
auth::{
add_ttl, from_jwt, sign_jwt, verify_identity, AuthError, Authorization, AuthorizedApp,
SharedClaims, SubscriptionRequestAuth, SubscriptionResponseAuth,
Expand Down Expand Up @@ -159,13 +159,13 @@ pub async fn handle(

state.wsclient.subscribe(notify_topic.clone()).await?;

state.analytics.client(NotifyClientParams {
pk: subscriber_id.to_string(),
state.analytics.client(SubscriberUpdateParams {
pk: subscriber_id,
method: NotifyClientMethod::Subscribe,
project_id: project_id.to_string(),
account: account.to_string(),
topic: topic.to_string(),
notify_topic: notify_topic.to_string(),
project_id,
account: account.clone(),
topic,
notify_topic: notify_topic.clone(),
old_scope: "".to_owned(),
new_scope: scope.into_iter().collect::<Vec<_>>().join(","),
});
Expand Down
Loading