Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(user/workflows): profile set signal #1902

Open
wants to merge 1 commit into
base: 01-14-chore_users_workflow_setup_basic_e2e_user_workflow
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions packages/api/identity/src/route/identities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,17 +191,25 @@ pub async fn update_profile(

ensure!(
body.account_number.unwrap_or_default() >= 0,
"invalid parameter account_number`"
"invalid parameter account_number"
);

msg!([ctx] user::msg::profile_set(user_ent.user_id) -> user::msg::update {
user_id: Some(user_ent.user_id.into()),
let mut sub = ctx
.subscribe::<::user::workflows::user::ProfileSetStatus>(("user_id", user_ent.user_id))
.await?;

ctx.signal(::user::workflows::user::ProfileSet {
display_name: body.display_name.clone(),
account_number: body.account_number.map(|n| n.api_try_into()).transpose()?,
bio: body.bio.clone(),
})
.tag("user_id", user_ent.user_id)
.send()
.await?;


let status_msg = sub.next().await?;
ensure!(status_msg.res.is_ok(), "bad profile set request");
Comment on lines +197 to +211
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: No timeout specified for subscription - could potentially hang indefinitely waiting for status message

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Generic error message 'bad profile set request' doesn't provide useful information about what failed


Ok(serde_json::json!({}))
}

Expand All @@ -214,7 +222,7 @@ pub async fn validate_profile(

ensure!(
body.account_number.unwrap_or_default() >= 0,
"invalid parameter account_number`"
"invalid parameter account_number"
);

let res = ctx.op(::user::ops::profile_validate::Input {
Expand Down
1 change: 1 addition & 0 deletions packages/services/pegboard/src/workflows/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -648,3 +648,4 @@ join_signal!(Main {
Undrain,
Destroy,
});

126 changes: 126 additions & 0 deletions packages/services/user/src/workflows/user/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,30 @@ pub async fn user(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> {
.send()
.await?;
},
Main::ProfileSet(sig) => {
let res = ctx.activity(ProfileSetInput {
user_id,
display_name: sig.display_name,
account_number: sig.account_number,
bio: sig.bio
}).await?;

ctx.msg(ProfileSetStatus { res: res.clone() })
.tag("user_id", user_id)
.send()
.await?;

if res.is_ok() {
ctx.activity(PublishProfileSetAnalyticsInput {
user_id
}).await?;

ctx.msg(Update {})
.tag("user_id", user_id)
.send()
.await?;
}
Comment on lines +71 to +80
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: consider moving Update message before analytics to ensure data consistency

},
Main::Delete(_) => {
return Ok(Loop::Break(()));
},
Expand Down Expand Up @@ -121,6 +145,92 @@ async fn admin_set(ctx: &ActivityCtx, input: &AdminSetInput) -> GlobalResult<()>
Ok(())
}

// ProfileSet
#[derive(Debug, Clone, Serialize, Deserialize, Hash)]
struct ProfileSetInput {
user_id: Uuid,
display_name: Option<String>,
account_number: Option<u32>,
bio: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize, Hash)]
pub enum ProfileSetError {
ValidationFailure,
MissingParameters,
}

#[activity(ProfileSetActivity)]
async fn profile_set(ctx: &ActivityCtx, input: &ProfileSetInput) -> GlobalResult<Result<(), ProfileSetError>> {
// Check if each component exists
if input.display_name.is_none() && input.account_number.is_none() && input.bio.is_none() {
return Ok(Err(ProfileSetError::MissingParameters));
}

let validation_res = ctx.op(crate::ops::profile_validate::Input {
user_id: input.user_id,
display_name: input.display_name.clone(),
account_number: input.account_number,
bio: input.bio.clone()
})
.await?;

if !validation_res.errors.is_empty() {
tracing::warn!(errors = ?validation_res.errors, "validation errors");

return Ok(Err(ProfileSetError::ValidationFailure));
}

ctx.cache().purge("user", [input.user_id]).await?;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: cache purge should happen after successful SQL update to prevent race conditions


sql_execute!(
[ctx]
"
UPDATE db_user.users
SET
display_name = COALESCE($2, display_name),
account_number = COALESCE($3, account_number),
bio = COALESCE($4, bio)
WHERE user_id = $1
",
input.user_id,
&input.display_name,
input.account_number.map(|x| x as i64),
input.bio.as_ref().map(|x| util::format::biography(x))
)
.await?;

Ok(Ok(()))
}

#[derive(Debug, Clone, Serialize, Deserialize, Hash)]
struct PublishProfileSetAnalyticsInput {
user_id: Uuid
}

#[activity(PublishProfileSetAnalytics)]
async fn publish_profile_set_analytics(
ctx: &ActivityCtx,
input: &PublishProfileSetAnalyticsInput
) -> GlobalResult<()> {
msg!([ctx] analytics::msg::event_create() {
events: vec![
analytics::msg::event_create::Event {
event_id: Some(Uuid::new_v4().into()),
name: "user.profile_set".into(),
properties_json: Some(serde_json::to_string(&json!({
"user_id": input.user_id.to_string()
}))?),
..Default::default()
},
],
})
.await?;

Ok(())
}


// Creation
#[derive(Debug, Clone, Serialize, Deserialize, Hash)]
struct InsertDbInput {
Expand Down Expand Up @@ -366,14 +476,30 @@ pub struct Update {}
#[message("user_delete_complete")]
pub struct DeleteComplete {}

#[message("user_event")]
pub struct Event {}

#[message("user_profile_set_status")]
pub struct ProfileSetStatus {
pub res: Result<(), ProfileSetError>,
}

#[signal("user_admin_set")]
pub struct AdminSet {}

#[signal("user_profile_set")]
pub struct ProfileSet {
pub display_name: Option<String>,
pub account_number: Option<u32>,
pub bio: Option<String>,
}

#[signal("user_delete")]
pub struct Delete {}

join_signal!(Main {
AdminSet,
ProfileSet,
Delete,
});

Expand Down
Loading