-
Notifications
You must be signed in to change notification settings - Fork 82
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(user): user one-off for users w/o wf #2036
base: 02-14-chore_move_loops_to_oss
Are you sure you want to change the base?
feat(user): user one-off for users w/o wf #2036
Conversation
Warning This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
How to use the Graphite Merge QueueAdd the label merge-queue to this PR to add it to the merge queue. You must have a Graphite account in order to use the merge queue. Sign up using this link. An organization admin has enabled the Graphite Merge Queue in this repository. Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue. This stack of pull requests is managed by Graphite. Learn more about stacking. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PR Summary
This PR adds a one-off service to create workflows for existing users that lack them, with several key changes:
- Added new
user-workflow-create
package with batch processing to create missing user workflows - Introduced
is_already_in_db
flag to user workflow input to handle existing users differently - Fixed table name inconsistency in SQL queries (
db_users.users
vsdb_user.users
) - Improved workflow dispatch error handling and race condition prevention
- Added proper batch size configuration and offset handling for user processing
Here are the key issues that need attention:
- SQL query table name mismatch between COUNT and SELECT statements needs to be fixed
- Incorrect offset parameter usage in LIMIT/OFFSET query
- Potential race condition between workflow dispatch and subscription
- Batch processing optimization opportunities
- Error handling improvements needed for workflow dispatch
9 file(s) reviewed, 7 comment(s)
Edit PR Review Bot Settings | Greptile
Service::new( | ||
"user_workflow_create", | ||
ServiceKind::Oneshot, | ||
|config, pools| Box::pin(user_workflow_create::start(config, pools)) | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
style: Missing trailing comma after the service definition. While this works, it's inconsistent with the style used for other services in this file.
display_name: None, | ||
is_already_in_db: false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
style: Consider adding a comment explaining why display_name is set to None here, since it may not be immediately obvious why we don't want to set a display name for faker-generated users
rivet-connection.workspace = true | ||
rivet-config.workspace = true | ||
rivet-pools.workspace = true | ||
tokio = { version = "1.40", features = ["full"] } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
style: tokio version should use workspace dependency since it's already defined in workspace.dependencies
rivet-config.workspace = true | ||
rivet-pools.workspace = true | ||
tokio = { version = "1.40", features = ["full"] } | ||
uuid = { version = "1", features = ["v4"] } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
style: uuid version should use workspace dependency for consistency with other packages
ctx.msg(CreateComplete {}) | ||
.tag("user_id", input.user_id) | ||
.send() | ||
.await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logic: CreateComplete message is now sent before any potential errors in the workflow could occur. Consider moving this after successful workflow initialization.
[ctx, (i64,)] | ||
" | ||
SELECT COUNT(*) | ||
FROM db_users.users |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logic: Table name 'db_users.users' here differs from 'db_user.users' in the second query. This needs to be consistent.
FROM db_users.users | |
FROM db_user.users |
let _ = ctx.workflow(user::workflows::user::Input { | ||
user_id, | ||
display_name: None, | ||
is_already_in_db: true | ||
}) | ||
.tag("user_id", user_id) | ||
.dispatch(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logic: The workflow dispatch result is discarded with let _. This could silently ignore dispatch failures. Should await and handle the Result.
let _ = ctx.workflow(user::workflows::user::Input { | |
user_id, | |
display_name: None, | |
is_already_in_db: true | |
}) | |
.tag("user_id", user_id) | |
.dispatch(); | |
ctx.workflow(user::workflows::user::Input { | |
user_id, | |
display_name: None, | |
is_already_in_db: true | |
}) | |
.tag("user_id", user_id) | |
.dispatch() | |
.await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
needs test:
- insert user a manually in to db
- make user b as a workflow
- run oneoff, test that the workflow was created & works for user a
- somehow validate that user b workflow was not created
Deploying rivet with
|
Latest commit: |
1f42248
|
Status: | ✅ Deploy successful! |
Preview URL: | https://c4d5e3a1.rivet.pages.dev |
Branch Preview URL: | https://02-14-feat-user-user-one-off.rivet.pages.dev |
) | ||
.await?; | ||
|
||
let (user_count,) = sql_fetch_one!( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not needed
return Ok(()) | ||
} | ||
|
||
for offset in (0..user_count).step_by(USER_BATCH_SIZE) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
loop until user ids is empty. keep the limit on the query, but remove the offset.
Changes