diff --git a/packages/api/identity/src/route/identities.rs b/packages/api/identity/src/route/identities.rs index e3bdda166d..faf52f565e 100644 --- a/packages/api/identity/src/route/identities.rs +++ b/packages/api/identity/src/route/identities.rs @@ -191,17 +191,25 @@ pub async fn update_profile( ensure!( body.account_number.unwrap_or_default() >= 0, - "invalid parameter account_number`" + "invalid parameter account_number" ); - msg!([ctx] user::msg::profile_set(user_ent.user_id) -> user::msg::update { - user_id: Some(user_ent.user_id.into()), + let mut sub = ctx + .subscribe::<::user::workflows::user::ProfileSetStatus>(("user_id", user_ent.user_id)) + .await?; + + ctx.signal(::user::workflows::user::ProfileSet { display_name: body.display_name.clone(), account_number: body.account_number.map(|n| n.api_try_into()).transpose()?, bio: body.bio.clone(), }) + .tag("user_id", user_ent.user_id) + .send() .await?; - + + let status_msg = sub.next().await?; + ensure!(status_msg.res.is_ok(), "bad profile set request"); + Ok(serde_json::json!({})) } @@ -214,7 +222,7 @@ pub async fn validate_profile( ensure!( body.account_number.unwrap_or_default() >= 0, - "invalid parameter account_number`" + "invalid parameter account_number" ); let res = ctx.op(::user::ops::profile_validate::Input { diff --git a/packages/services/pegboard/src/workflows/client.rs b/packages/services/pegboard/src/workflows/client.rs index 5f6eaa8a25..a5af19cbac 100644 --- a/packages/services/pegboard/src/workflows/client.rs +++ b/packages/services/pegboard/src/workflows/client.rs @@ -648,3 +648,4 @@ join_signal!(Main { Undrain, Destroy, }); + diff --git a/packages/services/user/src/workflows/user/mod.rs b/packages/services/user/src/workflows/user/mod.rs index d11200cce6..9911d545f5 100644 --- a/packages/services/user/src/workflows/user/mod.rs +++ b/packages/services/user/src/workflows/user/mod.rs @@ -55,6 +55,30 @@ pub async fn user(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> { .send() .await?; }, + Main::ProfileSet(sig) => { + let res = ctx.activity(ProfileSetInput { + user_id, + display_name: sig.display_name, + account_number: sig.account_number, + bio: sig.bio + }).await?; + + ctx.msg(ProfileSetStatus { res: res.clone() }) + .tag("user_id", user_id) + .send() + .await?; + + if res.is_ok() { + ctx.activity(PublishProfileSetAnalyticsInput { + user_id + }).await?; + + ctx.msg(Update {}) + .tag("user_id", user_id) + .send() + .await?; + } + }, Main::Delete(_) => { return Ok(Loop::Break(())); }, @@ -121,6 +145,92 @@ async fn admin_set(ctx: &ActivityCtx, input: &AdminSetInput) -> GlobalResult<()> Ok(()) } +// ProfileSet +#[derive(Debug, Clone, Serialize, Deserialize, Hash)] +struct ProfileSetInput { + user_id: Uuid, + display_name: Option, + account_number: Option, + bio: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Hash)] +pub enum ProfileSetError { + ValidationFailure, + MissingParameters, +} + +#[activity(ProfileSetActivity)] +async fn profile_set(ctx: &ActivityCtx, input: &ProfileSetInput) -> GlobalResult> { + // Check if each component exists + if input.display_name.is_none() && input.account_number.is_none() && input.bio.is_none() { + return Ok(Err(ProfileSetError::MissingParameters)); + } + + let validation_res = ctx.op(crate::ops::profile_validate::Input { + user_id: input.user_id, + display_name: input.display_name.clone(), + account_number: input.account_number, + bio: input.bio.clone() + }) + .await?; + + if !validation_res.errors.is_empty() { + tracing::warn!(errors = ?validation_res.errors, "validation errors"); + + return Ok(Err(ProfileSetError::ValidationFailure)); + } + + ctx.cache().purge("user", [input.user_id]).await?; + + sql_execute!( + [ctx] + " + UPDATE db_user.users + SET + display_name = COALESCE($2, display_name), + account_number = COALESCE($3, account_number), + bio = COALESCE($4, bio) + WHERE user_id = $1 + ", + input.user_id, + &input.display_name, + input.account_number.map(|x| x as i64), + input.bio.as_ref().map(|x| util::format::biography(x)) + ) + .await?; + + Ok(Ok(())) +} + +#[derive(Debug, Clone, Serialize, Deserialize, Hash)] +struct PublishProfileSetAnalyticsInput { + user_id: Uuid +} + +#[activity(PublishProfileSetAnalytics)] +async fn publish_profile_set_analytics( + ctx: &ActivityCtx, + input: &PublishProfileSetAnalyticsInput +) -> GlobalResult<()> { + msg!([ctx] analytics::msg::event_create() { + events: vec![ + analytics::msg::event_create::Event { + event_id: Some(Uuid::new_v4().into()), + name: "user.profile_set".into(), + properties_json: Some(serde_json::to_string(&json!({ + "user_id": input.user_id.to_string() + }))?), + ..Default::default() + }, + ], + }) + .await?; + + Ok(()) +} + + // Creation #[derive(Debug, Clone, Serialize, Deserialize, Hash)] struct InsertDbInput { @@ -366,14 +476,30 @@ pub struct Update {} #[message("user_delete_complete")] pub struct DeleteComplete {} +#[message("user_event")] +pub struct Event {} + +#[message("user_profile_set_status")] +pub struct ProfileSetStatus { + pub res: Result<(), ProfileSetError>, +} + #[signal("user_admin_set")] pub struct AdminSet {} +#[signal("user_profile_set")] +pub struct ProfileSet { + pub display_name: Option, + pub account_number: Option, + pub bio: Option, +} + #[signal("user_delete")] pub struct Delete {} join_signal!(Main { AdminSet, + ProfileSet, Delete, });