diff --git a/Cargo.lock b/Cargo.lock index f1696bfe5d..349b986359 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -876,7 +876,6 @@ dependencies = [ "api-matchmaker", "api-portal", "api-status", - "api-ui", "async-trait", "chirp-client", "http 0.2.12", @@ -12141,6 +12140,7 @@ dependencies = [ "tracing", "url", "user-delete-pending", + "user-workflow-create", "uuid", "workflow-gc", "workflow-metrics-publish", @@ -15929,6 +15929,21 @@ dependencies = [ "user", ] +[[package]] +name = "user-workflow-create" +version = "25.1.1" +dependencies = [ + "chirp-client", + "chirp-workflow", + "futures-util", + "rivet-config", + "rivet-connection", + "rivet-pools", + "tokio", + "user", + "uuid", +] + [[package]] name = "utf-8" version = "0.7.6" diff --git a/Cargo.toml b/Cargo.toml index 2919e3d5d8..db4301075e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [workspace] resolver = "2" -members = ["packages/api/actor","packages/api/auth","packages/api/cf-verification","packages/api/cloud","packages/api/games","packages/api/group","packages/api/identity","packages/api/job","packages/api/matchmaker","packages/api/monolith-edge","packages/api/monolith-public","packages/api/portal","packages/api/provision","packages/api/status","packages/api/traefik-provider","packages/api/ui","packages/common/api-helper/build","packages/common/api-helper/macros","packages/common/cache/build","packages/common/cache/result","packages/common/chirp-workflow/core","packages/common/chirp-workflow/macros","packages/common/chirp/client","packages/common/chirp/metrics","packages/common/chirp/perf","packages/common/chirp/types","packages/common/chirp/worker","packages/common/chirp/worker-attributes","packages/common/claims","packages/common/config","packages/common/connection","packages/common/convert","packages/common/deno-embed","packages/common/env","packages/common/formatted-error","packages/common/global-error","packages/common/health-checks","packages/common/hub-embed","packages/common/kv-str","packages/common/metrics","packages/common/migrate","packages/common/nomad-util","packages/common/operation/core","packages/common/operation/macros","packages/common/pools","packages/common/redis-util","packages/common/runtime","packages/common/s3-util","packages/common/schemac","packages/common/service-manager","packages/common/smithy-output/api-auth/rust","packages/common/smithy-output/api-auth/rust-server","packages/common/smithy-output/api-cf-verification/rust","packages/common/smithy-output/api-cf-verification/rust-server","packages/common/smithy-output/api-cloud/rust","packages/common/smithy-output/api-cloud/rust-server","packages/common/smithy-output/api-group/rust","packages/common/smithy-output/api-group/rust-server","packages/common/smithy-output/api-identity/rust","packages/common/smithy-output/api-identity/rust-server","packages/common/smithy-output/api-job/rust","packages/common/smithy-output/api-job/rust-server","packages/common/smithy-output/api-kv/rust","packages/common/smithy-output/api-kv/rust-server","packages/common/smithy-output/api-matchmaker/rust","packages/common/smithy-output/api-matchmaker/rust-server","packages/common/smithy-output/api-party/rust","packages/common/smithy-output/api-party/rust-server","packages/common/smithy-output/api-portal/rust","packages/common/smithy-output/api-portal/rust-server","packages/common/smithy-output/api-status/rust","packages/common/smithy-output/api-status/rust-server","packages/common/smithy-output/api-traefik-provider/rust","packages/common/smithy-output/api-traefik-provider/rust-server","packages/common/test","packages/common/test-images","packages/common/types-proto/build","packages/common/types-proto/core","packages/common/util/core","packages/common/util/macros","packages/common/util/search","packages/infra/client/actor-kv","packages/infra/client/config","packages/infra/client/container-runner","packages/infra/client/echo","packages/infra/client/isolate-v8-runner","packages/infra/client/logs","packages/infra/client/manager","packages/infra/legacy/job-runner","packages/infra/schema-generator","packages/infra/server","packages/services/build","packages/services/build/ops/create","packages/services/build/ops/get","packages/services/build/ops/list-for-env","packages/services/build/ops/list-for-game","packages/services/build/standalone/default-create","packages/services/build/util","packages/services/captcha/ops/hcaptcha-config-get","packages/services/captcha/ops/hcaptcha-verify","packages/services/captcha/ops/request","packages/services/captcha/ops/turnstile-config-get","packages/services/captcha/ops/turnstile-verify","packages/services/captcha/ops/verify","packages/services/captcha/util","packages/services/cdn/ops/namespace-auth-user-remove","packages/services/cdn/ops/namespace-auth-user-update","packages/services/cdn/ops/namespace-create","packages/services/cdn/ops/namespace-domain-create","packages/services/cdn/ops/namespace-domain-remove","packages/services/cdn/ops/namespace-get","packages/services/cdn/ops/namespace-resolve-domain","packages/services/cdn/ops/ns-auth-type-set","packages/services/cdn/ops/ns-enable-domain-public-auth-set","packages/services/cdn/ops/site-create","packages/services/cdn/ops/site-get","packages/services/cdn/ops/site-list-for-game","packages/services/cdn/ops/version-get","packages/services/cdn/ops/version-prepare","packages/services/cdn/ops/version-publish","packages/services/cdn/util","packages/services/cdn/worker","packages/services/cf-custom-hostname/ops/get","packages/services/cf-custom-hostname/ops/list-for-namespace-id","packages/services/cf-custom-hostname/ops/resolve-hostname","packages/services/cf-custom-hostname/worker","packages/services/cloud/ops/device-link-create","packages/services/cloud/ops/game-config-create","packages/services/cloud/ops/game-config-get","packages/services/cloud/ops/game-token-create","packages/services/cloud/ops/namespace-create","packages/services/cloud/ops/namespace-get","packages/services/cloud/ops/namespace-token-development-create","packages/services/cloud/ops/namespace-token-public-create","packages/services/cloud/ops/version-get","packages/services/cloud/ops/version-publish","packages/services/cloud/standalone/default-create","packages/services/cloud/worker","packages/services/cluster","packages/services/cluster/standalone/datacenter-tls-renew","packages/services/cluster/standalone/default-update","packages/services/cluster/standalone/gc","packages/services/cluster/standalone/metrics-publish","packages/services/custom-user-avatar/ops/list-for-game","packages/services/custom-user-avatar/ops/upload-complete","packages/services/debug/ops/email-res","packages/services/ds","packages/services/ds-log/ops/export","packages/services/ds-log/ops/read","packages/services/dynamic-config","packages/services/email-verification/ops/complete","packages/services/email-verification/ops/create","packages/services/email/ops/send","packages/services/external/ops/request-validate","packages/services/external/worker","packages/services/faker","packages/services/faker/ops/build","packages/services/faker/ops/cdn-site","packages/services/faker/ops/game","packages/services/faker/ops/game-namespace","packages/services/faker/ops/game-version","packages/services/faker/ops/job-run","packages/services/faker/ops/job-template","packages/services/faker/ops/mm-lobby","packages/services/faker/ops/mm-lobby-row","packages/services/faker/ops/mm-player","packages/services/faker/ops/region","packages/services/faker/ops/team","packages/services/game/ops/banner-upload-complete","packages/services/game/ops/create","packages/services/game/ops/get","packages/services/game/ops/list-all","packages/services/game/ops/list-for-team","packages/services/game/ops/logo-upload-complete","packages/services/game/ops/namespace-create","packages/services/game/ops/namespace-get","packages/services/game/ops/namespace-list","packages/services/game/ops/namespace-resolve-name-id","packages/services/game/ops/namespace-resolve-url","packages/services/game/ops/namespace-validate","packages/services/game/ops/namespace-version-history-list","packages/services/game/ops/namespace-version-set","packages/services/game/ops/recommend","packages/services/game/ops/resolve-name-id","packages/services/game/ops/resolve-namespace-id","packages/services/game/ops/token-development-validate","packages/services/game/ops/validate","packages/services/game/ops/version-create","packages/services/game/ops/version-get","packages/services/game/ops/version-list","packages/services/game/ops/version-validate","packages/services/ip/ops/info","packages/services/job-log/ops/read","packages/services/job-log/worker","packages/services/job-run","packages/services/job/standalone/gc","packages/services/job/util","packages/services/linode","packages/services/linode/standalone/gc","packages/services/load-test/standalone/api-cloud","packages/services/load-test/standalone/mm","packages/services/load-test/standalone/mm-sustain","packages/services/load-test/standalone/sqlx","packages/services/load-test/standalone/watch-requests","packages/services/loops","packages/services/mm-config/ops/game-get","packages/services/mm-config/ops/game-upsert","packages/services/mm-config/ops/lobby-group-get","packages/services/mm-config/ops/lobby-group-resolve-name-id","packages/services/mm-config/ops/lobby-group-resolve-version","packages/services/mm-config/ops/namespace-config-set","packages/services/mm-config/ops/namespace-config-validate","packages/services/mm-config/ops/namespace-create","packages/services/mm-config/ops/namespace-get","packages/services/mm-config/ops/version-get","packages/services/mm-config/ops/version-prepare","packages/services/mm-config/ops/version-publish","packages/services/mm/ops/dev-player-token-create","packages/services/mm/ops/lobby-find-fail","packages/services/mm/ops/lobby-find-lobby-query-list","packages/services/mm/ops/lobby-find-try-complete","packages/services/mm/ops/lobby-for-run-id","packages/services/mm/ops/lobby-get","packages/services/mm/ops/lobby-history","packages/services/mm/ops/lobby-idle-update","packages/services/mm/ops/lobby-list-for-namespace","packages/services/mm/ops/lobby-list-for-user-id","packages/services/mm/ops/lobby-player-count","packages/services/mm/ops/lobby-runtime-aggregate","packages/services/mm/ops/lobby-state-get","packages/services/mm/ops/player-count-for-namespace","packages/services/mm/ops/player-get","packages/services/mm/standalone/gc","packages/services/mm/util","packages/services/mm/worker","packages/services/monolith/standalone/worker","packages/services/monolith/standalone/workflow-worker","packages/services/nomad/standalone/monitor","packages/services/pegboard","packages/services/pegboard/standalone/dc-init","packages/services/pegboard/standalone/gc","packages/services/pegboard/standalone/metrics-publish","packages/services/pegboard/standalone/ws","packages/services/region/ops/get","packages/services/region/ops/list","packages/services/region/ops/list-for-game","packages/services/region/ops/recommend","packages/services/region/ops/resolve","packages/services/region/ops/resolve-for-game","packages/services/server-spec","packages/services/team-invite/ops/get","packages/services/team-invite/worker","packages/services/team/ops/avatar-upload-complete","packages/services/team/ops/get","packages/services/team/ops/join-request-list","packages/services/team/ops/member-count","packages/services/team/ops/member-get","packages/services/team/ops/member-list","packages/services/team/ops/member-relationship-get","packages/services/team/ops/profile-validate","packages/services/team/ops/recommend","packages/services/team/ops/resolve-display-name","packages/services/team/ops/user-ban-get","packages/services/team/ops/user-ban-list","packages/services/team/ops/validate","packages/services/team/util","packages/services/team/worker","packages/services/telemetry/standalone/beacon","packages/services/tier","packages/services/token/ops/create","packages/services/token/ops/exchange","packages/services/token/ops/get","packages/services/token/ops/revoke","packages/services/upload/ops/complete","packages/services/upload/ops/file-list","packages/services/upload/ops/get","packages/services/upload/ops/list-for-user","packages/services/upload/ops/prepare","packages/services/upload/worker","packages/services/user","packages/services/user/standalone/delete-pending","packages/services/workflow/standalone/gc","packages/services/workflow/standalone/metrics-publish","packages/toolchain/actors-sdk-embed","packages/toolchain/cli","packages/toolchain/js-utils-embed","packages/toolchain/toolchain","sdks/api/full/rust"] +members = ["packages/api/actor","packages/api/auth","packages/api/cf-verification","packages/api/cloud","packages/api/games","packages/api/group","packages/api/identity","packages/api/job","packages/api/matchmaker","packages/api/monolith-edge","packages/api/monolith-public","packages/api/portal","packages/api/provision","packages/api/status","packages/api/traefik-provider","packages/api/ui","packages/common/api-helper/build","packages/common/api-helper/macros","packages/common/cache/build","packages/common/cache/result","packages/common/chirp-workflow/core","packages/common/chirp-workflow/macros","packages/common/chirp/client","packages/common/chirp/metrics","packages/common/chirp/perf","packages/common/chirp/types","packages/common/chirp/worker","packages/common/chirp/worker-attributes","packages/common/claims","packages/common/config","packages/common/connection","packages/common/convert","packages/common/deno-embed","packages/common/env","packages/common/formatted-error","packages/common/global-error","packages/common/health-checks","packages/common/hub-embed","packages/common/kv-str","packages/common/metrics","packages/common/migrate","packages/common/nomad-util","packages/common/operation/core","packages/common/operation/macros","packages/common/pools","packages/common/redis-util","packages/common/runtime","packages/common/s3-util","packages/common/schemac","packages/common/service-manager","packages/common/smithy-output/api-auth/rust","packages/common/smithy-output/api-auth/rust-server","packages/common/smithy-output/api-cf-verification/rust","packages/common/smithy-output/api-cf-verification/rust-server","packages/common/smithy-output/api-cloud/rust","packages/common/smithy-output/api-cloud/rust-server","packages/common/smithy-output/api-group/rust","packages/common/smithy-output/api-group/rust-server","packages/common/smithy-output/api-identity/rust","packages/common/smithy-output/api-identity/rust-server","packages/common/smithy-output/api-job/rust","packages/common/smithy-output/api-job/rust-server","packages/common/smithy-output/api-kv/rust","packages/common/smithy-output/api-kv/rust-server","packages/common/smithy-output/api-matchmaker/rust","packages/common/smithy-output/api-matchmaker/rust-server","packages/common/smithy-output/api-party/rust","packages/common/smithy-output/api-party/rust-server","packages/common/smithy-output/api-portal/rust","packages/common/smithy-output/api-portal/rust-server","packages/common/smithy-output/api-status/rust","packages/common/smithy-output/api-status/rust-server","packages/common/smithy-output/api-traefik-provider/rust","packages/common/smithy-output/api-traefik-provider/rust-server","packages/common/test","packages/common/test-images","packages/common/types-proto/build","packages/common/types-proto/core","packages/common/util/core","packages/common/util/macros","packages/common/util/search","packages/infra/client/actor-kv","packages/infra/client/config","packages/infra/client/container-runner","packages/infra/client/echo","packages/infra/client/isolate-v8-runner","packages/infra/client/logs","packages/infra/client/manager","packages/infra/legacy/job-runner","packages/infra/schema-generator","packages/infra/server","packages/services/build","packages/services/build/ops/create","packages/services/build/ops/get","packages/services/build/ops/list-for-env","packages/services/build/ops/list-for-game","packages/services/build/standalone/default-create","packages/services/build/util","packages/services/captcha/ops/hcaptcha-config-get","packages/services/captcha/ops/hcaptcha-verify","packages/services/captcha/ops/request","packages/services/captcha/ops/turnstile-config-get","packages/services/captcha/ops/turnstile-verify","packages/services/captcha/ops/verify","packages/services/captcha/util","packages/services/cdn/ops/namespace-auth-user-remove","packages/services/cdn/ops/namespace-auth-user-update","packages/services/cdn/ops/namespace-create","packages/services/cdn/ops/namespace-domain-create","packages/services/cdn/ops/namespace-domain-remove","packages/services/cdn/ops/namespace-get","packages/services/cdn/ops/namespace-resolve-domain","packages/services/cdn/ops/ns-auth-type-set","packages/services/cdn/ops/ns-enable-domain-public-auth-set","packages/services/cdn/ops/site-create","packages/services/cdn/ops/site-get","packages/services/cdn/ops/site-list-for-game","packages/services/cdn/ops/version-get","packages/services/cdn/ops/version-prepare","packages/services/cdn/ops/version-publish","packages/services/cdn/util","packages/services/cdn/worker","packages/services/cf-custom-hostname/ops/get","packages/services/cf-custom-hostname/ops/list-for-namespace-id","packages/services/cf-custom-hostname/ops/resolve-hostname","packages/services/cf-custom-hostname/worker","packages/services/cloud/ops/device-link-create","packages/services/cloud/ops/game-config-create","packages/services/cloud/ops/game-config-get","packages/services/cloud/ops/game-token-create","packages/services/cloud/ops/namespace-create","packages/services/cloud/ops/namespace-get","packages/services/cloud/ops/namespace-token-development-create","packages/services/cloud/ops/namespace-token-public-create","packages/services/cloud/ops/version-get","packages/services/cloud/ops/version-publish","packages/services/cloud/standalone/default-create","packages/services/cloud/worker","packages/services/cluster","packages/services/cluster/standalone/datacenter-tls-renew","packages/services/cluster/standalone/default-update","packages/services/cluster/standalone/gc","packages/services/cluster/standalone/metrics-publish","packages/services/custom-user-avatar/ops/list-for-game","packages/services/custom-user-avatar/ops/upload-complete","packages/services/debug/ops/email-res","packages/services/ds","packages/services/ds-log/ops/export","packages/services/ds-log/ops/read","packages/services/dynamic-config","packages/services/email-verification/ops/complete","packages/services/email-verification/ops/create","packages/services/email/ops/send","packages/services/external/ops/request-validate","packages/services/external/worker","packages/services/faker","packages/services/faker/ops/build","packages/services/faker/ops/cdn-site","packages/services/faker/ops/game","packages/services/faker/ops/game-namespace","packages/services/faker/ops/game-version","packages/services/faker/ops/job-run","packages/services/faker/ops/job-template","packages/services/faker/ops/mm-lobby","packages/services/faker/ops/mm-lobby-row","packages/services/faker/ops/mm-player","packages/services/faker/ops/region","packages/services/faker/ops/team","packages/services/game/ops/banner-upload-complete","packages/services/game/ops/create","packages/services/game/ops/get","packages/services/game/ops/list-all","packages/services/game/ops/list-for-team","packages/services/game/ops/logo-upload-complete","packages/services/game/ops/namespace-create","packages/services/game/ops/namespace-get","packages/services/game/ops/namespace-list","packages/services/game/ops/namespace-resolve-name-id","packages/services/game/ops/namespace-resolve-url","packages/services/game/ops/namespace-validate","packages/services/game/ops/namespace-version-history-list","packages/services/game/ops/namespace-version-set","packages/services/game/ops/recommend","packages/services/game/ops/resolve-name-id","packages/services/game/ops/resolve-namespace-id","packages/services/game/ops/token-development-validate","packages/services/game/ops/validate","packages/services/game/ops/version-create","packages/services/game/ops/version-get","packages/services/game/ops/version-list","packages/services/game/ops/version-validate","packages/services/ip/ops/info","packages/services/job-log/ops/read","packages/services/job-log/worker","packages/services/job-run","packages/services/job/standalone/gc","packages/services/job/util","packages/services/linode","packages/services/linode/standalone/gc","packages/services/load-test/standalone/api-cloud","packages/services/load-test/standalone/mm","packages/services/load-test/standalone/mm-sustain","packages/services/load-test/standalone/sqlx","packages/services/load-test/standalone/watch-requests","packages/services/loops","packages/services/mm-config/ops/game-get","packages/services/mm-config/ops/game-upsert","packages/services/mm-config/ops/lobby-group-get","packages/services/mm-config/ops/lobby-group-resolve-name-id","packages/services/mm-config/ops/lobby-group-resolve-version","packages/services/mm-config/ops/namespace-config-set","packages/services/mm-config/ops/namespace-config-validate","packages/services/mm-config/ops/namespace-create","packages/services/mm-config/ops/namespace-get","packages/services/mm-config/ops/version-get","packages/services/mm-config/ops/version-prepare","packages/services/mm-config/ops/version-publish","packages/services/mm/ops/dev-player-token-create","packages/services/mm/ops/lobby-find-fail","packages/services/mm/ops/lobby-find-lobby-query-list","packages/services/mm/ops/lobby-find-try-complete","packages/services/mm/ops/lobby-for-run-id","packages/services/mm/ops/lobby-get","packages/services/mm/ops/lobby-history","packages/services/mm/ops/lobby-idle-update","packages/services/mm/ops/lobby-list-for-namespace","packages/services/mm/ops/lobby-list-for-user-id","packages/services/mm/ops/lobby-player-count","packages/services/mm/ops/lobby-runtime-aggregate","packages/services/mm/ops/lobby-state-get","packages/services/mm/ops/player-count-for-namespace","packages/services/mm/ops/player-get","packages/services/mm/standalone/gc","packages/services/mm/util","packages/services/mm/worker","packages/services/monolith/standalone/worker","packages/services/monolith/standalone/workflow-worker","packages/services/nomad/standalone/monitor","packages/services/pegboard","packages/services/pegboard/standalone/dc-init","packages/services/pegboard/standalone/gc","packages/services/pegboard/standalone/metrics-publish","packages/services/pegboard/standalone/ws","packages/services/region/ops/get","packages/services/region/ops/list","packages/services/region/ops/list-for-game","packages/services/region/ops/recommend","packages/services/region/ops/resolve","packages/services/region/ops/resolve-for-game","packages/services/server-spec","packages/services/team-invite/ops/get","packages/services/team-invite/worker","packages/services/team/ops/avatar-upload-complete","packages/services/team/ops/get","packages/services/team/ops/join-request-list","packages/services/team/ops/member-count","packages/services/team/ops/member-get","packages/services/team/ops/member-list","packages/services/team/ops/member-relationship-get","packages/services/team/ops/profile-validate","packages/services/team/ops/recommend","packages/services/team/ops/resolve-display-name","packages/services/team/ops/user-ban-get","packages/services/team/ops/user-ban-list","packages/services/team/ops/validate","packages/services/team/util","packages/services/team/worker","packages/services/telemetry/standalone/beacon","packages/services/tier","packages/services/token/ops/create","packages/services/token/ops/exchange","packages/services/token/ops/get","packages/services/token/ops/revoke","packages/services/upload/ops/complete","packages/services/upload/ops/file-list","packages/services/upload/ops/get","packages/services/upload/ops/list-for-user","packages/services/upload/ops/prepare","packages/services/upload/worker","packages/services/user","packages/services/user/standalone/delete-pending","packages/services/user/standalone/workflow-create","packages/services/workflow/standalone/gc","packages/services/workflow/standalone/metrics-publish","packages/toolchain/actors-sdk-embed","packages/toolchain/cli","packages/toolchain/js-utils-embed","packages/toolchain/toolchain","sdks/api/full/rust"] [workspace.package] version = "25.1.1" @@ -950,6 +950,9 @@ path = "packages/services/user" [workspace.dependencies.user-delete-pending] path = "packages/services/user/standalone/delete-pending" +[workspace.dependencies.user-workflow-create] +path = "packages/services/user/standalone/workflow-create" + [workspace.dependencies.workflow-gc] path = "packages/services/workflow/standalone/gc" diff --git a/packages/api/auth/src/route/tokens.rs b/packages/api/auth/src/route/tokens.rs index 0d3e69cf66..0e17fdb42c 100644 --- a/packages/api/auth/src/route/tokens.rs +++ b/packages/api/auth/src/route/tokens.rs @@ -190,6 +190,7 @@ async fn fallback_user( ::user::workflows::user::Input { user_id, display_name: None, + is_already_in_db: false } ) .await? diff --git a/packages/infra/server/Cargo.toml b/packages/infra/server/Cargo.toml index bf37bde742..d3387d2623 100644 --- a/packages/infra/server/Cargo.toml +++ b/packages/infra/server/Cargo.toml @@ -76,6 +76,7 @@ pegboard-dc-init.workspace = true rivet-cache.workspace = true rivet-config.workspace = true rivet-connection.workspace = true +user-workflow-create.workspace = true [dependencies.sqlx] workspace = true diff --git a/packages/infra/server/src/run_config.rs b/packages/infra/server/src/run_config.rs index 0707d75e11..f3a46e9fa7 100644 --- a/packages/infra/server/src/run_config.rs +++ b/packages/infra/server/src/run_config.rs @@ -104,6 +104,11 @@ pub fn config(rivet_config: rivet_config::Config) -> Result { ServiceKind::Singleton, |config, pools| Box::pin(pegboard_metrics_publish::start(config, pools)), ), + Service::new( + "user_workflow_create", + ServiceKind::Oneshot, + |config, pools| Box::pin(user_workflow_create::start(config, pools)) + ) ]; if server_config.is_tls_enabled() { diff --git a/packages/services/cloud/standalone/default-create/src/lib.rs b/packages/services/cloud/standalone/default-create/src/lib.rs index 2179c78065..20df800c81 100644 --- a/packages/services/cloud/standalone/default-create/src/lib.rs +++ b/packages/services/cloud/standalone/default-create/src/lib.rs @@ -42,6 +42,7 @@ pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> G ::user::workflows::user::Input { user_id, display_name: Some(dev_defaults::USER_NAME.into()), + is_already_in_db: false } ) .tag("user_id", user_id) diff --git a/packages/services/faker/src/ops/user.rs b/packages/services/faker/src/ops/user.rs index d0d5c85c40..85abf24711 100644 --- a/packages/services/faker/src/ops/user.rs +++ b/packages/services/faker/src/ops/user.rs @@ -22,6 +22,7 @@ pub async fn user(ctx: &OperationCtx, input: &Input) -> GlobalResult { user::workflows::user::Input { user_id, display_name: None, + is_already_in_db: false } ) .await? diff --git a/packages/services/user/src/workflows/user/mod.rs b/packages/services/user/src/workflows/user/mod.rs index d196bdb8a3..3dee78569a 100644 --- a/packages/services/user/src/workflows/user/mod.rs +++ b/packages/services/user/src/workflows/user/mod.rs @@ -21,25 +21,28 @@ const UPLOAD_BATCH_SIZE: usize = 256; pub struct Input { pub user_id: Uuid, pub display_name: Option, + pub is_already_in_db: bool } #[workflow] pub async fn user(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> { - let (display_name, _account_number) = ctx.activity(InsertDbInput { - user_id: input.user_id, - display_name: input.display_name.clone(), - }).await?; + if !input.is_already_in_db { + let (display_name, _account_number) = ctx.activity(InsertDbInput { + user_id: input.user_id, + display_name: input.display_name.clone(), + }).await?; + + ctx.activity(PublishCreationAnalyticsInput { + user_id: input.user_id, + display_name, + }).await?; + } ctx.msg(CreateComplete {}) .tag("user_id", input.user_id) .send() .await?; - ctx.activity(PublishCreationAnalyticsInput { - user_id: input.user_id, - display_name, - }).await?; - ctx.repeat(|ctx| { let user_id = input.user_id; diff --git a/packages/services/user/standalone/workflow-create/Cargo.toml b/packages/services/user/standalone/workflow-create/Cargo.toml new file mode 100644 index 0000000000..6ea3ab3bbb --- /dev/null +++ b/packages/services/user/standalone/workflow-create/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "user-workflow-create" +version.workspace = true +authors.workspace = true +license.workspace = true +edition.workspace = true + +[dependencies] +chirp-client.workspace = true +chirp-workflow.workspace = true +futures-util = "0.3" +rivet-connection.workspace = true +rivet-config.workspace = true +rivet-pools.workspace = true +tokio = { version = "1.40", features = ["full"] } +uuid = { version = "1", features = ["v4"] } +user.workspace = true diff --git a/packages/services/user/standalone/workflow-create/src/lib.rs b/packages/services/user/standalone/workflow-create/src/lib.rs new file mode 100644 index 0000000000..732507ee5e --- /dev/null +++ b/packages/services/user/standalone/workflow-create/src/lib.rs @@ -0,0 +1,96 @@ +use chirp_workflow::prelude::*; + +const USER_BATCH_SIZE: usize = 128; + +#[tracing::instrument(skip_all)] +pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> GlobalResult<()> { + let client = + chirp_client::SharedClient::from_env(pools.clone())?.wrap_new("user-workflow-create"); + let cache = rivet_cache::CacheInner::from_env(pools.clone())?; + let ctx = StandaloneCtx::new( + chirp_workflow::compat::db_from_pools(&pools).await?, + config, + rivet_connection::Connection::new(client, pools, cache), + "user-workflow-create", + ) + .await?; + + let (user_count,) = sql_fetch_one!( + [ctx, (i64,)] + " + SELECT COUNT(*) + FROM db_users.users + WHERE EXISTS( + SELECT 1 + FROM db_user_identity.emails as e + WHERE e.user_id = users.user_id + ) + AND NOT EXISTS( + SELECT 1 + FROM db_workflow.workflows + WHERE + workflow_name = 'user' AND + (tags->>'user_id')::UUID = users.user_id + ) + ", + ) + .await?; + + if user_count == 0 { + return Ok(()) + } + + for offset in (0..user_count).step_by(USER_BATCH_SIZE) { + tracing::debug!(?offset, "creating users"); + + let user_ids = sql_fetch_all!( + [ctx, (Uuid,)] + " + SELECT user_id + FROM db_user.users + WHERE EXISTS( + SELECT 1 + FROM db_user_identity.emails as e + WHERE e.user_id = users.user_id + ) + AND NOT EXISTS( + SELECT 1 + FROM db_workflow.workflows + WHERE + workflow_name = 'user' AND + (tags->>'user_id')::UUID = users.user_id + ) + LIMIT $1 OFFSET $2 + ", + offset, + USER_BATCH_SIZE as i64 + ) + .await? + .into_iter() + .map(|(user_id,)| user_id) + .collect::>(); + + if user_ids.len() == 0 { + continue; + } + + for user_id in user_ids { + let mut sub = ctx.subscribe::< + user::workflows::user::CreateComplete + >(("user_id", user_id)).await?; + + let _ = ctx.workflow(user::workflows::user::Input { + user_id, + display_name: None, + is_already_in_db: true + }) + .tag("user_id", user_id) + .dispatch(); + + // Await creation completion + sub.next().await?; + } + } + + Ok(()) +}