Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(user): user one-off for users w/o wf #2036

Open
wants to merge 1 commit into
base: 02-14-chore_move_loops_to_oss
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions packages/api/auth/src/route/tokens.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ async fn fallback_user(
::user::workflows::user::Input {
user_id,
display_name: None,
is_already_in_db: false
}
)
.await?
Expand Down
1 change: 1 addition & 0 deletions packages/infra/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pegboard-dc-init.workspace = true
rivet-cache.workspace = true
rivet-config.workspace = true
rivet-connection.workspace = true
user-workflow-create.workspace = true

[dependencies.sqlx]
workspace = true
Expand Down
5 changes: 5 additions & 0 deletions packages/infra/server/src/run_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ pub fn config(rivet_config: rivet_config::Config) -> Result<RunConfigData> {
ServiceKind::Singleton,
|config, pools| Box::pin(pegboard_metrics_publish::start(config, pools)),
),
Service::new(
"user_workflow_create",
ServiceKind::Oneshot,
|config, pools| Box::pin(user_workflow_create::start(config, pools))
)
Comment on lines +107 to +111
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Missing trailing comma after the service definition. While this works, it's inconsistent with the style used for other services in this file.

];

if server_config.is_tls_enabled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> G
::user::workflows::user::Input {
user_id,
display_name: Some(dev_defaults::USER_NAME.into()),
is_already_in_db: false
}
)
.tag("user_id", user_id)
Expand Down
1 change: 1 addition & 0 deletions packages/services/faker/src/ops/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub async fn user(ctx: &OperationCtx, input: &Input) -> GlobalResult<Output> {
user::workflows::user::Input {
user_id,
display_name: None,
is_already_in_db: false
Comment on lines 24 to +25
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Consider adding a comment explaining why display_name is set to None here, since it may not be immediately obvious why we don't want to set a display name for faker-generated users

}
)
.await?
Expand Down
21 changes: 12 additions & 9 deletions packages/services/user/src/workflows/user/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,28 @@ const UPLOAD_BATCH_SIZE: usize = 256;
pub struct Input {
pub user_id: Uuid,
pub display_name: Option<String>,
pub is_already_in_db: bool
}

#[workflow]
pub async fn user(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> {
let (display_name, _account_number) = ctx.activity(InsertDbInput {
user_id: input.user_id,
display_name: input.display_name.clone(),
}).await?;
if !input.is_already_in_db {
let (display_name, _account_number) = ctx.activity(InsertDbInput {
user_id: input.user_id,
display_name: input.display_name.clone(),
}).await?;

ctx.activity(PublishCreationAnalyticsInput {
user_id: input.user_id,
display_name,
}).await?;
}

ctx.msg(CreateComplete {})
.tag("user_id", input.user_id)
.send()
.await?;
Comment on lines 41 to 44
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: CreateComplete message is now sent before any potential errors in the workflow could occur. Consider moving this after successful workflow initialization.


ctx.activity(PublishCreationAnalyticsInput {
user_id: input.user_id,
display_name,
}).await?;

ctx.repeat(|ctx| {
let user_id = input.user_id;

Expand Down
17 changes: 17 additions & 0 deletions packages/services/user/standalone/workflow-create/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "user-workflow-create"
version.workspace = true
authors.workspace = true
license.workspace = true
edition.workspace = true

[dependencies]
chirp-client.workspace = true
chirp-workflow.workspace = true
futures-util = "0.3"
rivet-connection.workspace = true
rivet-config.workspace = true
rivet-pools.workspace = true
tokio = { version = "1.40", features = ["full"] }
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: tokio version should use workspace dependency since it's already defined in workspace.dependencies

uuid = { version = "1", features = ["v4"] }
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: uuid version should use workspace dependency for consistency with other packages

user.workspace = true
96 changes: 96 additions & 0 deletions packages/services/user/standalone/workflow-create/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use chirp_workflow::prelude::*;

const USER_BATCH_SIZE: usize = 128;

#[tracing::instrument(skip_all)]
pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> GlobalResult<()> {
let client =
chirp_client::SharedClient::from_env(pools.clone())?.wrap_new("user-workflow-create");
let cache = rivet_cache::CacheInner::from_env(pools.clone())?;
let ctx = StandaloneCtx::new(
chirp_workflow::compat::db_from_pools(&pools).await?,
config,
rivet_connection::Connection::new(client, pools, cache),
"user-workflow-create",
)
.await?;

let (user_count,) = sql_fetch_one!(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not needed

[ctx, (i64,)]
"
SELECT COUNT(*)
FROM db_users.users
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Table name 'db_users.users' here differs from 'db_user.users' in the second query. This needs to be consistent.

Suggested change
FROM db_users.users
FROM db_user.users

WHERE EXISTS(
SELECT 1
FROM db_user_identity.emails as e
WHERE e.user_id = users.user_id
)
AND NOT EXISTS(
SELECT 1
FROM db_workflow.workflows
WHERE
workflow_name = 'user' AND
(tags->>'user_id')::UUID = users.user_id
)
",
)
.await?;

if user_count == 0 {
return Ok(())
}

for offset in (0..user_count).step_by(USER_BATCH_SIZE) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

loop until user ids is empty. keep the limit on the query, but remove the offset.

tracing::debug!(?offset, "creating users");

let user_ids = sql_fetch_all!(
[ctx, (Uuid,)]
"
SELECT user_id
FROM db_user.users
WHERE EXISTS(
SELECT 1
FROM db_user_identity.emails as e
WHERE e.user_id = users.user_id
)
AND NOT EXISTS(
SELECT 1
FROM db_workflow.workflows
WHERE
workflow_name = 'user' AND
(tags->>'user_id')::UUID = users.user_id
)
LIMIT $1 OFFSET $2
",
offset,
USER_BATCH_SIZE as i64
)
.await?
.into_iter()
.map(|(user_id,)| user_id)
.collect::<Vec<_>>();

if user_ids.len() == 0 {
continue;
}

for user_id in user_ids {
let mut sub = ctx.subscribe::<
user::workflows::user::CreateComplete
>(("user_id", user_id)).await?;

let _ = ctx.workflow(user::workflows::user::Input {
user_id,
display_name: None,
is_already_in_db: true
})
.tag("user_id", user_id)
.dispatch();

Comment on lines +82 to +89
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: The workflow dispatch result is discarded with let _. This could silently ignore dispatch failures. Should await and handle the Result.

Suggested change
let _ = ctx.workflow(user::workflows::user::Input {
user_id,
display_name: None,
is_already_in_db: true
})
.tag("user_id", user_id)
.dispatch();
ctx.workflow(user::workflows::user::Input {
user_id,
display_name: None,
is_already_in_db: true
})
.tag("user_id", user_id)
.dispatch()
.await?;

// Await creation completion
sub.next().await?;
}
}

Ok(())
}
Loading