-
Notifications
You must be signed in to change notification settings - Fork 142
Update Core, configure worker types, send plugin names to Core #1157
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Sushisource
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is looking good to me, only thing is the default interval
Sushisource
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice!
…untime client identity requirement
c276572 to
e83a260
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: Runtime Comparison Needs Proper Instance Resolution
The runtime comparison uses identity (is not) but doesn't properly handle when bridge_client.config.runtime is None. When the original client uses the default runtime (via None), self._runtime stores the actual default runtime object, but if a new client also has runtime=None in its config, the comparison self._runtime is not None incorrectly raises an error even though both clients use the same default runtime. The comparison should resolve both sides to actual runtime instances before comparing.
temporalio/worker/_worker.py#L648-L652
sdk-python/temporalio/worker/_worker.py
Lines 648 to 652 in bd5a404
| bridge_client = _extract_bridge_client_for_worker(value) | |
| if self._runtime is not bridge_client.config.runtime: | |
| raise ValueError( | |
| "New client is not on the same runtime as the existing client" | |
| ) |
f56d819 to
5862a1e
Compare
| "temporalio_sdk", | ||
| ] | ||
| parts = [self.other_level] | ||
| parts.extend(f"{target}={self.core_level}" for target in targets) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: Outdated Telemetry Target Breaks Rust Log Filtering
The TelemetryFilter.formatted() method includes "temporalio_sdk" as a target, but this doesn't match any actual Rust crate name. The crates were renamed from temporal_sdk_core, temporal_client, etc. to temporalio_sdk_core, temporalio_client, and temporalio_common. The target "temporalio_sdk" will never match any log output from the Rust code, preventing those logs from being filtered at the configured level. This should likely be "temporalio_sdk_core" (which is already in the list) or removed entirely.
e9565ab to
6ef144a
Compare
| nonsticky_to_sticky_poll_ratio: f32, | ||
| activity_task_poller_behavior: PollerBehavior, | ||
| no_remote_activities: bool, | ||
| task_types: WorkerTaskTypes, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure it deserves a whole new single-use Python type here vs just three booleans inlined, but not a big deal
tests/worker/test_workflow.py
Outdated
| # Terminate both | ||
| await handle1.terminate() | ||
| await handle2.terminate() | ||
| async def test_workflow_replace_worker_client(client: Client): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you provide some background why the previous test was removed and replaced with this? We want to test that replacing a client works, not that it raises when a runtime is different (if we want the latter, that's a different test).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Previously thought the error I was hitting was due to using different runtimes, but just took another look into things and found the real fix, enter_sync!(self.runtime);, which is now needed bc replacing the client sometimes creates a new workflow worker, which spawns a tokio task.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for pointing out
cretz
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nothing blocking, but did notice some two things we should fix (maybe separately) about the plugin impl while reviewing:
- We should not make a public attribute,
plugins, on our worker IMO Client.config()andWorker.config()need to represent immutable copies of the exact kwargs passed in to constructor or at least be sure they can be passed in to another constructor
temporalio/worker/_worker.py
Outdated
| f"The same plugin type {type(client_plugin)} is present from both client and worker. It may run twice and may not be the intended behavior." | ||
| ) | ||
| plugins = plugins_from_client + list(plugins) | ||
| config["plugins"] = plugins |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I think this may be a problem we need to address at some point. The purpose of config was to match kwargs so someone could easily re-initialize a worker by splatting them.
What was changed
WorkerTaskTypesto core, based on workflows, activities, and nexus handlers registered to the worker, as well asno_remote_activitiesconfiguration.Why?
Worker heartbeating
Checklist
Closes [Feature Request] Enable Worker Heartbeating #1196
How was this tested:
Note
Upgrades to latest Core and protos, adds RuntimeOptions (incl. worker heartbeat), sends worker task types and plugin names to Core, exposes new service RPCs (e.g., DescribeWorker) and cloud endpoints, and tightens client/worker runtime compatibility with accompanying tests.
RuntimeOptions(incl.worker_heartbeat_interval_millis) and switchRuntimeinit to use options.temporalio-*crates; update imports and Cargo deps.WorkerTaskTypesand pass enabled types (workflows/local/remote activities/nexus) to Core.DescribeWorkerandSetWorkerDeploymentManager; extend requests (e.g., allow_no_pollers, eager worker options).SetServiceAccountNamespaceAccessandValidateAccountAuditLogSink; introduceAuditLogSinkSpec,KinesisSpec,PubSubSpec.no_remote_activities, and client replacement runtime validation.Written by Cursor Bugbot for commit e0d0a25. This will update automatically on new commits. Configure here.