Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 1d034d3

Browse files
committedJan 14, 2025·
chore(users/workflow): setup basic e2e user workflow
1 parent 73890c0 commit 1d034d3

File tree

6 files changed

+1262
-272
lines changed

6 files changed

+1262
-272
lines changed
 

‎Cargo.lock

+274-270
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎packages/services/user/Cargo.toml

+4-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ edition.workspace = true
88
[dependencies]
99
serde = { version = "1.0.198", features = ["derive"] }
1010
chirp-workflow.workspace = true
11+
rand = "0.8"
12+
lazy_static = "1.4"
1113

1214
cluster.workspace = true
1315
email-address-parser = "1.0.1"
@@ -19,14 +21,15 @@ token-create.workspace = true
1921
upload-file-list.workspace = true
2022
upload-get.workspace = true
2123
upload-complete.workspace = true
24+
upload-list-for-user.workspace = true
25+
team-get.workspace = true
2226

2327
[dependencies.sqlx]
2428
workspace = true
2529
default-features = false
2630

2731
[dev-dependencies]
2832
faker-user.workspace = true
29-
rand = "0.8"
3033
reqwest = "0.11"
3134
upload-get.workspace = true
3235
upload-prepare.workspace = true

‎packages/services/user/adjectives.txt

+559
Large diffs are not rendered by default.

‎packages/services/user/src/lib.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
pub mod ops;
2-
pub mod types;
2+
pub mod types;
3+
pub mod workflows;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod user;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,422 @@
1+
use chirp_workflow::prelude::*;
2+
use rivet_operation::prelude::proto::backend::pkg::*;
3+
4+
use lazy_static::lazy_static;
5+
use futures_util::{FutureExt, StreamExt, TryStreamExt};
6+
use rand::{seq::IteratorRandom, Rng};
7+
use serde_json::json;
8+
9+
lazy_static! {
10+
// Load adjectives from file
11+
static ref ADJECTIVES: Vec<&'static str> = include_str!("../../../adjectives.txt")
12+
.split('\n')
13+
.filter(|l| !l.is_empty())
14+
.map(|l| l.trim())
15+
.collect();
16+
}
17+
18+
const UPLOAD_BATCH_SIZE: usize = 256;
19+
20+
#[derive(Debug, Clone, Serialize, Deserialize)]
21+
pub struct Input {
22+
pub user_id: Uuid,
23+
pub display_name: Option<String>,
24+
}
25+
26+
#[workflow]
27+
pub async fn user(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> {
28+
let (display_name, _account_number) = ctx.activity(InsertDbInput {
29+
user_id: input.user_id,
30+
display_name: input.display_name.clone(),
31+
}).await?;
32+
33+
ctx.msg(CreateComplete {})
34+
.tag("user_id", input.user_id)
35+
.send()
36+
.await?;
37+
38+
ctx.activity(PublishUserCreateAnalyticsInput {
39+
user_id: input.user_id,
40+
display_name,
41+
}).await?;
42+
43+
ctx.repeat(|ctx| {
44+
let user_id = input.user_id;
45+
46+
async move {
47+
match ctx.listen::<Main>().await? {
48+
Main::AdminSet(_) => {
49+
ctx.activity(InsertDbInput {
50+
user_id,
51+
display_name: None,
52+
}).await?;
53+
54+
ctx.msg(Update {})
55+
.tag("user_id", user_id)
56+
.send()
57+
.await?;
58+
},
59+
Main::Delete(_) => {
60+
return Ok(Loop::Break(()));
61+
},
62+
}
63+
64+
Ok(Loop::Continue)
65+
}
66+
.boxed()
67+
}).await?;
68+
69+
ctx.activity(DeleteIdentitiesInput {
70+
user_id: input.user_id,
71+
}).await?;
72+
73+
ctx.activity(DeleteUploadsInput {
74+
user_id: input.user_id,
75+
}).await?;
76+
77+
ctx.activity(RemoveFromTeamsInput {
78+
user_id: input.user_id,
79+
}).await?;
80+
81+
ctx.activity(PublishUserDeletionInput {
82+
user_id: input.user_id,
83+
}).await?;
84+
85+
Ok(())
86+
}
87+
88+
#[derive(Debug, Clone, Serialize, Deserialize, Hash)]
89+
struct AdminSetInput {
90+
user_id: Uuid,
91+
}
92+
93+
#[activity(AdminSetActivity)]
94+
async fn admin_set(ctx: &ActivityCtx, input: &AdminSetInput) -> GlobalResult<()> {
95+
sql_execute!(
96+
[ctx]
97+
"
98+
UPDATE db_user.users
99+
SET
100+
is_admin = true
101+
WHERE user_id = $1
102+
",
103+
input.user_id,
104+
)
105+
.await?;
106+
107+
Ok(())
108+
}
109+
110+
#[derive(Debug, Clone, Serialize, Deserialize, Hash)]
111+
struct InsertDbInput {
112+
user_id: Uuid,
113+
display_name: Option<String>,
114+
}
115+
116+
#[activity(InsertDb)]
117+
#[max_retries = 5]
118+
async fn insert_db(ctx: &ActivityCtx, input: &InsertDbInput) -> GlobalResult<(String, i64)> {
119+
let display_name = if let Some(display_name) = input.display_name.clone() {
120+
display_name
121+
} else {
122+
gen_display_name("Guest")
123+
};
124+
125+
let account_number = gen_account_number();
126+
tracing::debug!(%display_name, %account_number, "insert user attempt");
127+
128+
sql_execute!(
129+
[ctx]
130+
"
131+
INSERT INTO db_user.users (
132+
user_id,
133+
display_name,
134+
account_number,
135+
avatar_id,
136+
join_ts
137+
)
138+
VALUES ($1, $2, $3, $4, $5)
139+
",
140+
input.user_id,
141+
&display_name,
142+
gen_account_number(),
143+
gen_avatar_id(),
144+
ctx.ts(),
145+
)
146+
.await?;
147+
148+
Ok((display_name, account_number))
149+
}
150+
151+
#[derive(Debug, Clone, Serialize, Deserialize, Hash)]
152+
struct PublishUserCreateAnalyticsInput {
153+
user_id: Uuid,
154+
display_name: String,
155+
}
156+
157+
#[activity(PublishUserCreateAnalytics)]
158+
async fn publish_user_create_analytics(ctx: &ActivityCtx, input: &PublishUserCreateAnalyticsInput) -> GlobalResult<()> {
159+
let properties_json = Some(serde_json::to_string(&json!({
160+
"user_id": input.user_id,
161+
"display_name": input.display_name,
162+
}))?);
163+
164+
msg!([ctx] analytics::msg::event_create() {
165+
events: vec![
166+
analytics::msg::event_create::Event {
167+
event_id: Some(Uuid::new_v4().into()),
168+
name: "user.create".into(),
169+
properties_json: properties_json.clone(),
170+
..Default::default()
171+
},
172+
analytics::msg::event_create::Event {
173+
event_id: Some(Uuid::new_v4().into()),
174+
name: "user.profile_set".into(),
175+
properties_json,
176+
..Default::default()
177+
},
178+
],
179+
})
180+
.await?;
181+
182+
Ok(())
183+
}
184+
185+
#[derive(Debug, Clone, Serialize, Deserialize, Hash)]
186+
struct DeleteIdentitiesInput {
187+
user_id: Uuid
188+
}
189+
190+
#[activity(DeleteIdentities)]
191+
async fn delete_identities(ctx: &ActivityCtx, input: &DeleteIdentitiesInput) -> GlobalResult<()> {
192+
ctx.op(crate::ops::identity::delete::Input {
193+
user_ids: vec![input.user_id]
194+
}).await?;
195+
196+
Ok(())
197+
}
198+
199+
#[derive(Debug, Clone, Serialize, Deserialize, Hash)]
200+
struct DeleteUploadsInput {
201+
user_id: Uuid
202+
}
203+
204+
#[activity(DeleteUploads)]
205+
async fn delete_uploads(ctx: &ActivityCtx, input: &DeleteUploadsInput) -> GlobalResult<()> {
206+
tracing::info!(user_id = %input.user_id, "removing uploads");
207+
let mut last_create_ts = 0;
208+
loop {
209+
let uploads_res = op!([ctx] upload_list_for_user {
210+
user_ids: vec![input.user_id.into()],
211+
anchor: Some(last_create_ts),
212+
limit: UPLOAD_BATCH_SIZE as u32,
213+
})
214+
.await?;
215+
let user = unwrap!(uploads_res.users.first());
216+
217+
let request_id = Uuid::new_v4();
218+
msg!([ctx] upload::msg::delete(request_id) -> upload::msg::delete_complete {
219+
request_id: Some(request_id.into()),
220+
upload_ids: user.upload_ids.clone(),
221+
})
222+
.await?;
223+
224+
// Update last timestamp
225+
if let Some(anchor) = user.anchor {
226+
last_create_ts = anchor;
227+
}
228+
229+
if user.upload_ids.len() < UPLOAD_BATCH_SIZE {
230+
break;
231+
}
232+
}
233+
234+
Ok(())
235+
}
236+
237+
#[derive(Debug, Clone, Serialize, Deserialize, Hash)]
238+
struct RemoveFromTeamsInput {
239+
user_id: Uuid
240+
}
241+
242+
#[activity(RemoveFromTeams)]
243+
async fn remove_from_teams(ctx: &ActivityCtx, input: &RemoveFromTeamsInput) -> GlobalResult<()> {
244+
tracing::info!(user_id = %input.user_id, "removing teams");
245+
246+
let user_teams_res = ctx.op(crate::ops::team_list::Input {
247+
user_ids: vec![input.user_id],
248+
})
249+
.await?;
250+
let user_teams = unwrap!(user_teams_res.users.first());
251+
252+
let teams_res = op!([ctx] team_get {
253+
team_ids: user_teams.teams
254+
.iter()
255+
.map(|member| Ok(member.team_id.into()))
256+
.collect::<GlobalResult<Vec<_>>>()?
257+
})
258+
.await?;
259+
260+
// Filter out teams where the user is the owner
261+
let non_owner_teams = teams_res
262+
.teams
263+
.clone()
264+
.into_iter()
265+
.filter(|team| team.owner_user_id != Some(input.user_id.into()));
266+
futures_util::stream::iter(non_owner_teams)
267+
.map(|team| {
268+
let team_id_proto = team.team_id;
269+
270+
async move {
271+
let team_id = unwrap!(team_id_proto).as_uuid();
272+
273+
msg!([ctx] team::msg::member_remove(team_id, input.user_id) -> team::msg::member_remove_complete {
274+
user_id: Some(input.user_id.into()),
275+
team_id: team_id_proto,
276+
silent: false,
277+
})
278+
.await
279+
.map_err(Into::<GlobalError>::into)
280+
}
281+
})
282+
.buffer_unordered(32)
283+
.try_collect::<Vec<_>>()
284+
.await?;
285+
286+
Ok(())
287+
}
288+
289+
#[derive(Debug, Clone, Serialize, Deserialize, Hash)]
290+
struct RedactUserRecordInput {
291+
user_id: Uuid
292+
}
293+
294+
#[activity(RedactUserRecord)]
295+
async fn redact_user_record(ctx: &ActivityCtx, input: &RedactUserRecordInput) -> GlobalResult<()> {
296+
tracing::info!(user_id = %input.user_id, "removing user record");
297+
298+
sql_execute!(
299+
[ctx]
300+
"
301+
UPDATE db_user.users
302+
SET
303+
display_name = $2,
304+
profile_id = NULL,
305+
bio = '',
306+
delete_complete_ts = $3
307+
WHERE user_id = $1
308+
",
309+
input.user_id,
310+
gen_deleted_user_display_name(),
311+
util::timestamp::now(),
312+
)
313+
.await?;
314+
315+
ctx.cache().purge("user", [input.user_id]).await?;
316+
317+
Ok(())
318+
}
319+
320+
#[derive(Debug, Clone, Serialize, Deserialize, Hash)]
321+
struct PublishUserDeletionInput {
322+
user_id: Uuid
323+
}
324+
325+
#[activity(PublishUserDeletion)]
326+
async fn publish_user_deletion(ctx: &ActivityCtx, input: &PublishUserDeletionInput) -> GlobalResult<()> {
327+
msg!([ctx] user::msg::delete_complete(input.user_id) {
328+
user_id: Some(input.user_id.into()),
329+
})
330+
.await?;
331+
332+
msg!([ctx] user::msg::update(input.user_id) {
333+
user_id: Some(input.user_id.into()),
334+
})
335+
.await?;
336+
337+
msg!([ctx] analytics::msg::event_create() {
338+
events: vec![
339+
analytics::msg::event_create::Event {
340+
event_id: Some(Uuid::new_v4().into()),
341+
name: "user.delete".into(),
342+
properties_json: Some(serde_json::to_string(&json!({
343+
"deleted_user_id": input.user_id
344+
}))?),
345+
..Default::default()
346+
}
347+
],
348+
})
349+
.await?;
350+
351+
tracing::info!(user_id = %input.user_id, "complete");
352+
353+
Ok(())
354+
}
355+
356+
357+
358+
359+
// #[derive(Debug, Clone, Serialize, Deserialize, Hash)]
360+
// struct TEMP {
361+
// user_id: Uuid
362+
// }
363+
//
364+
// #[activity(TEMP)]
365+
// async fn TEMP(ctx: &ActivityCtx, input: &TEMP) -> GlobalResult<()> {}
366+
367+
368+
#[message("user_create_complete")]
369+
pub struct CreateComplete {}
370+
371+
#[message("user_update")]
372+
pub struct Update {}
373+
374+
#[signal("user_admin_set")]
375+
pub struct AdminSet {}
376+
377+
#[signal("user_delete")]
378+
pub struct Delete {}
379+
380+
join_signal!(Main {
381+
AdminSet,
382+
Delete,
383+
});
384+
385+
// Generates a display name with the format `{adjective:7}{space:1}{base:11}{space:1}{number:4}`
386+
fn gen_display_name(base: impl std::fmt::Display) -> String {
387+
let base_str = format!("{}", base);
388+
389+
let mut rand = rand::thread_rng();
390+
let adj = ADJECTIVES.iter().choose(&mut rand).unwrap_or(&"Unknown");
391+
392+
format!(
393+
"{} {} {}",
394+
adj,
395+
base_str,
396+
std::iter::repeat_with(|| rand.gen_range(0..10))
397+
.map(|d| d.to_string())
398+
.take(4)
399+
.collect::<String>()
400+
)
401+
}
402+
403+
// Generates a display name (for deleted users) with the format `Deleted User {alphanum:10}`
404+
fn gen_deleted_user_display_name() -> String {
405+
format!(
406+
"Deleted User {}",
407+
rand::thread_rng()
408+
.sample_iter(rand::distributions::Alphanumeric)
409+
.map(char::from)
410+
.take(10)
411+
.collect::<String>()
412+
)
413+
}
414+
415+
416+
fn gen_account_number() -> i64 {
417+
rand::thread_rng().gen_range(1..10000)
418+
}
419+
420+
fn gen_avatar_id() -> String {
421+
format!("avatar-{}", rand::thread_rng().gen_range(0..7))
422+
}

0 commit comments

Comments
 (0)
Please sign in to comment.