Skip to content

Commit 277d41a

Browse files
committed
feat(user/workflows): profile set signal
1 parent ca89652 commit 277d41a

File tree

3 files changed

+140
-5
lines changed

3 files changed

+140
-5
lines changed

packages/api/identity/src/route/identities.rs

+13-5
Original file line numberDiff line numberDiff line change
@@ -191,17 +191,25 @@ pub async fn update_profile(
191191

192192
ensure!(
193193
body.account_number.unwrap_or_default() >= 0,
194-
"invalid parameter account_number`"
194+
"invalid parameter account_number"
195195
);
196196

197-
msg!([ctx] user::msg::profile_set(user_ent.user_id) -> user::msg::update {
198-
user_id: Some(user_ent.user_id.into()),
197+
let mut sub = ctx
198+
.subscribe::<::user::workflows::user::ProfileSetStatus>(("user_id", user_ent.user_id))
199+
.await?;
200+
201+
ctx.signal(::user::workflows::user::ProfileSet {
199202
display_name: body.display_name.clone(),
200203
account_number: body.account_number.map(|n| n.api_try_into()).transpose()?,
201204
bio: body.bio.clone(),
202205
})
206+
.tag("user_id", user_ent.user_id)
207+
.send()
203208
.await?;
204-
209+
210+
let status_msg = sub.next().await?;
211+
ensure!(status_msg.res.is_ok(), "bad profile set request");
212+
205213
Ok(serde_json::json!({}))
206214
}
207215

@@ -214,7 +222,7 @@ pub async fn validate_profile(
214222

215223
ensure!(
216224
body.account_number.unwrap_or_default() >= 0,
217-
"invalid parameter account_number`"
225+
"invalid parameter account_number"
218226
);
219227

220228
let res = ctx.op(::user::ops::profile_validate::Input {

packages/services/pegboard/src/workflows/client.rs

+1
Original file line numberDiff line numberDiff line change
@@ -648,3 +648,4 @@ join_signal!(Main {
648648
Undrain,
649649
Destroy,
650650
});
651+

packages/services/user/src/workflows/user/mod.rs

+126
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,30 @@ pub async fn user(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> {
5555
.send()
5656
.await?;
5757
},
58+
Main::ProfileSet(sig) => {
59+
let res = ctx.activity(ProfileSetInput {
60+
user_id,
61+
display_name: sig.display_name,
62+
account_number: sig.account_number,
63+
bio: sig.bio
64+
}).await?;
65+
66+
ctx.msg(ProfileSetStatus { res: res.clone() })
67+
.tag("user_id", user_id)
68+
.send()
69+
.await?;
70+
71+
if res.is_ok() {
72+
ctx.activity(PublishProfileSetAnalyticsInput {
73+
user_id
74+
}).await?;
75+
76+
ctx.msg(Update {})
77+
.tag("user_id", user_id)
78+
.send()
79+
.await?;
80+
}
81+
},
5882
Main::Delete(_) => {
5983
return Ok(Loop::Break(()));
6084
},
@@ -121,6 +145,92 @@ async fn admin_set(ctx: &ActivityCtx, input: &AdminSetInput) -> GlobalResult<()>
121145
Ok(())
122146
}
123147

148+
// ProfileSet
149+
#[derive(Debug, Clone, Serialize, Deserialize, Hash)]
150+
struct ProfileSetInput {
151+
user_id: Uuid,
152+
display_name: Option<String>,
153+
account_number: Option<u32>,
154+
bio: Option<String>,
155+
}
156+
157+
#[derive(Debug, Clone, Serialize, Deserialize, Hash)]
158+
pub enum ProfileSetError {
159+
ValidationFailure,
160+
MissingParameters,
161+
}
162+
163+
#[activity(ProfileSetActivity)]
164+
async fn profile_set(ctx: &ActivityCtx, input: &ProfileSetInput) -> GlobalResult<Result<(), ProfileSetError>> {
165+
// Check if each component exists
166+
if input.display_name.is_none() && input.account_number.is_none() && input.bio.is_none() {
167+
return Ok(Err(ProfileSetError::MissingParameters));
168+
}
169+
170+
let validation_res = ctx.op(crate::ops::profile_validate::Input {
171+
user_id: input.user_id,
172+
display_name: input.display_name.clone(),
173+
account_number: input.account_number,
174+
bio: input.bio.clone()
175+
})
176+
.await?;
177+
178+
if !validation_res.errors.is_empty() {
179+
tracing::warn!(errors = ?validation_res.errors, "validation errors");
180+
181+
return Ok(Err(ProfileSetError::ValidationFailure));
182+
}
183+
184+
ctx.cache().purge("user", [input.user_id]).await?;
185+
186+
sql_execute!(
187+
[ctx]
188+
"
189+
UPDATE db_user.users
190+
SET
191+
display_name = COALESCE($2, display_name),
192+
account_number = COALESCE($3, account_number),
193+
bio = COALESCE($4, bio)
194+
WHERE user_id = $1
195+
",
196+
input.user_id,
197+
&input.display_name,
198+
input.account_number.map(|x| x as i64),
199+
input.bio.as_ref().map(|x| util::format::biography(x))
200+
)
201+
.await?;
202+
203+
Ok(Ok(()))
204+
}
205+
206+
#[derive(Debug, Clone, Serialize, Deserialize, Hash)]
207+
struct PublishProfileSetAnalyticsInput {
208+
user_id: Uuid
209+
}
210+
211+
#[activity(PublishProfileSetAnalytics)]
212+
async fn publish_profile_set_analytics(
213+
ctx: &ActivityCtx,
214+
input: &PublishProfileSetAnalyticsInput
215+
) -> GlobalResult<()> {
216+
msg!([ctx] analytics::msg::event_create() {
217+
events: vec![
218+
analytics::msg::event_create::Event {
219+
event_id: Some(Uuid::new_v4().into()),
220+
name: "user.profile_set".into(),
221+
properties_json: Some(serde_json::to_string(&json!({
222+
"user_id": input.user_id.to_string()
223+
}))?),
224+
..Default::default()
225+
},
226+
],
227+
})
228+
.await?;
229+
230+
Ok(())
231+
}
232+
233+
124234
// Creation
125235
#[derive(Debug, Clone, Serialize, Deserialize, Hash)]
126236
struct InsertDbInput {
@@ -366,14 +476,30 @@ pub struct Update {}
366476
#[message("user_delete_complete")]
367477
pub struct DeleteComplete {}
368478

479+
#[message("user_event")]
480+
pub struct Event {}
481+
482+
#[message("user_profile_set_status")]
483+
pub struct ProfileSetStatus {
484+
pub res: Result<(), ProfileSetError>,
485+
}
486+
369487
#[signal("user_admin_set")]
370488
pub struct AdminSet {}
371489

490+
#[signal("user_profile_set")]
491+
pub struct ProfileSet {
492+
pub display_name: Option<String>,
493+
pub account_number: Option<u32>,
494+
pub bio: Option<String>,
495+
}
496+
372497
#[signal("user_delete")]
373498
pub struct Delete {}
374499

375500
join_signal!(Main {
376501
AdminSet,
502+
ProfileSet,
377503
Delete,
378504
});
379505

0 commit comments

Comments
 (0)