diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e24187c5f..60c350d95 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -62,8 +62,8 @@ jobs: - run: uv sync --all-extras - run: poe bridge-lint if: ${{ matrix.clippyLinter }} - - run: poe lint - run: poe build-develop + - run: poe lint - run: mkdir junit-xml - run: poe test ${{matrix.pytestExtraArgs}} -s --junit-xml=junit-xml/${{ matrix.python }}--${{ matrix.os }}.xml timeout-minutes: 15 @@ -153,8 +153,8 @@ jobs: - run: uv tool install poethepoet - run: uv lock --upgrade - run: uv sync --all-extras - - run: poe lint - run: poe build-develop + - run: poe lint - run: mkdir junit-xml - run: poe test -s --junit-xml=junit-xml/latest-deps.xml timeout-minutes: 10 diff --git a/temporalio/bridge/Cargo.lock b/temporalio/bridge/Cargo.lock index b40d5fc72..ee5e5e668 100644 --- a/temporalio/bridge/Cargo.lock +++ b/temporalio/bridge/Cargo.lock @@ -412,6 +412,12 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" +[[package]] +name = "dyn-clone" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" + [[package]] name = "either" version = "1.15.0" @@ -1328,9 +1334,9 @@ checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" [[package]] name = "opentelemetry" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aaf416e4cb72756655126f7dd7bb0af49c674f4c1b9903e80c009e0c37e552e6" +checksum = "b84bcd6ae87133e903af7ef497404dda70c60d0ea14895fc8a5e6722754fc2a0" dependencies = [ "futures-core", "futures-sink", @@ -1342,9 +1348,9 @@ dependencies = [ [[package]] name = "opentelemetry-http" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50f6639e842a97dbea8886e3439710ae463120091e2e064518ba8e716e6ac36d" +checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d" dependencies = [ "async-trait", "bytes", @@ -1355,9 +1361,9 @@ dependencies = [ [[package]] name = "opentelemetry-otlp" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbee664a43e07615731afc539ca60c6d9f1a9425e25ca09c57bc36c87c55852b" +checksum = "7a2366db2dca4d2ad033cad11e6ee42844fd727007af5ad04a1730f4cb8163bf" dependencies = [ "http", "opentelemetry", @@ -1374,21 +1380,22 @@ dependencies = [ [[package]] name = "opentelemetry-proto" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e046fd7660710fe5a05e8748e70d9058dc15c94ba914e7c4faa7c728f0e8ddc" +checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f" dependencies = [ "opentelemetry", "opentelemetry_sdk", "prost", "tonic", + "tonic-prost", ] [[package]] name = "opentelemetry_sdk" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11f644aa9e5e31d11896e024305d7e3c98a88884d9f8919dbf37a9991bc47a4b" +checksum = "e14ae4f5991976fd48df6d843de219ca6d31b01daaab2dad5af2badeded372bd" dependencies = [ "futures-channel", "futures-executor", @@ -1396,7 +1403,6 @@ dependencies = [ "opentelemetry", "percent-encoding", "rand 0.9.2", - "serde_json", "thiserror 2.0.15", "tokio", "tokio-stream", @@ -1589,9 +1595,9 @@ 
dependencies = [ [[package]] name = "prost" -version = "0.13.5" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d" dependencies = [ "bytes", "prost-derive", @@ -1599,9 +1605,9 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.13.5" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" +checksum = "ac6c3320f9abac597dcbc668774ef006702672474aad53c6d596b62e487b40b1" dependencies = [ "heck", "itertools", @@ -1612,6 +1618,8 @@ dependencies = [ "prettyplease", "prost", "prost-types", + "pulldown-cmark", + "pulldown-cmark-to-cmark", "regex", "syn", "tempfile", @@ -1619,9 +1627,9 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.13.5" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425" dependencies = [ "anyhow", "itertools", @@ -1632,18 +1640,18 @@ dependencies = [ [[package]] name = "prost-types" -version = "0.13.5" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" +checksum = "b9b4db3d6da204ed77bb26ba83b6122a73aeb2e87e25fbf7ad2e84c4ccbf8f72" dependencies = [ "prost", ] [[package]] name = "prost-wkt" -version = "0.6.1" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "497e1e938f0c09ef9cabe1d49437b4016e03e8f82fbbe5d1c62a9b61b9decae1" +checksum = "655944d0ce015e71b3ec21279437e6a09e58433e50c7b0677901f3d5235e74f5" dependencies = [ "chrono", "inventory", @@ -1656,9 +1664,9 @@ dependencies = [ [[package]] name = "prost-wkt-build" -version = "0.6.1" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07b8bf115b70a7aa5af1fd5d6e9418492e9ccb6e4785e858c938e28d132a884b" +checksum = "f869f1443fee474b785e935d92e1007f57443e485f51668ed41943fc01a321a2" dependencies = [ "heck", "prost", @@ -1669,9 +1677,9 @@ dependencies = [ [[package]] name = "prost-wkt-types" -version = "0.6.1" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8cdde6df0a98311c839392ca2f2f0bcecd545f86a62b4e3c6a49c336e970fe5" +checksum = "eeeffd6b9becd4600dd461399f3f71aeda2ff0848802a9ed526cf12e8f42902a" dependencies = [ "chrono", "prost", @@ -1705,6 +1713,26 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "pulldown-cmark" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e8bbe1a966bd2f362681a44f6edce3c2310ac21e4d5067a6e7ec396297a6ea0" +dependencies = [ + "bitflags", + "memchr", + "unicase", +] + +[[package]] +name = "pulldown-cmark-to-cmark" +version = "21.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5b6a0769a491a08b31ea5c62494a8f144ee0987d86d670a8af4df1e1b7cde75" +dependencies = [ + "pulldown-cmark", +] + [[package]] name = "pyo3" version = "0.25.1" @@ -2073,29 +2101,6 @@ version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" -[[package]] -name = "rustfsm" -version = 
"0.1.0" -dependencies = [ - "rustfsm_procmacro", - "rustfsm_trait", -] - -[[package]] -name = "rustfsm_procmacro" -version = "0.1.0" -dependencies = [ - "derive_more", - "proc-macro2", - "quote", - "rustfsm_trait", - "syn", -] - -[[package]] -name = "rustfsm_trait" -version = "0.1.0" - [[package]] name = "rustix" version = "1.0.8" @@ -2440,7 +2445,28 @@ dependencies = [ ] [[package]] -name = "temporal-client" +name = "temporal-sdk-bridge" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "futures", + "prost", + "pyo3", + "pyo3-async-runtimes", + "pythonize", + "temporalio-client", + "temporalio-common", + "temporalio-sdk-core", + "tokio", + "tokio-stream", + "tonic", + "tracing", + "url", +] + +[[package]] +name = "temporalio-client" version = "0.1.0" dependencies = [ "anyhow", @@ -2450,6 +2476,7 @@ dependencies = [ "bytes", "derive_builder", "derive_more", + "dyn-clone", "futures-retry", "futures-util", "http", @@ -2457,9 +2484,9 @@ dependencies = [ "hyper", "hyper-util", "parking_lot", + "rand 0.9.2", "slotmap", - "temporal-sdk-core-api", - "temporal-sdk-core-protos", + "temporalio-common", "thiserror 2.0.15", "tokio", "tonic", @@ -2470,29 +2497,45 @@ dependencies = [ ] [[package]] -name = "temporal-sdk-bridge" +name = "temporalio-common" version = "0.1.0" dependencies = [ "anyhow", "async-trait", - "futures", + "base64", + "derive_builder", + "derive_more", + "dirs", + "opentelemetry", "prost", - "pyo3", - "pyo3-async-runtimes", - "pythonize", - "temporal-client", - "temporal-sdk-core", - "temporal-sdk-core-api", - "temporal-sdk-core-protos", - "tokio", - "tokio-stream", + "prost-wkt", + "prost-wkt-types", + "rand 0.9.2", + "serde", + "serde_json", + "thiserror 2.0.15", + "toml", "tonic", + "tonic-prost", + "tonic-prost-build", "tracing", + "tracing-core", "url", + "uuid", +] + +[[package]] +name = "temporalio-macros" +version = "0.1.0" +dependencies = [ + "derive_more", + "proc-macro2", + "quote", + "syn", ] [[package]] -name = "temporal-sdk-core" +name = "temporalio-sdk-core" version = "0.1.0" dependencies = [ "anyhow", @@ -2528,16 +2571,15 @@ dependencies = [ "rand 0.9.2", "reqwest", "ringbuf", - "rustfsm", "serde", "serde_json", "siphasher", "slotmap", "sysinfo", "tar", - "temporal-client", - "temporal-sdk-core-api", - "temporal-sdk-core-protos", + "temporalio-client", + "temporalio-common", + "temporalio-macros", "thiserror 2.0.15", "tokio", "tokio-stream", @@ -2550,48 +2592,6 @@ dependencies = [ "zip", ] -[[package]] -name = "temporal-sdk-core-api" -version = "0.1.0" -dependencies = [ - "async-trait", - "derive_builder", - "derive_more", - "dirs", - "opentelemetry", - "prost", - "serde", - "serde_json", - "temporal-sdk-core-protos", - "thiserror 2.0.15", - "toml", - "tonic", - "tracing", - "tracing-core", - "url", -] - -[[package]] -name = "temporal-sdk-core-protos" -version = "0.1.0" -dependencies = [ - "anyhow", - "base64", - "derive_more", - "prost", - "prost-build", - "prost-wkt", - "prost-wkt-build", - "prost-wkt-types", - "rand 0.9.2", - "serde", - "serde_json", - "thiserror 2.0.15", - "tonic", - "tonic-build", - "uuid", -] - [[package]] name = "termtree" version = "0.5.1" @@ -2778,9 +2778,9 @@ checksum = "fcc842091f2def52017664b53082ecbbeb5c7731092bad69d2c63050401dfd64" [[package]] name = "tonic" -version = "0.13.1" +version = "0.14.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9" +checksum = "eb7613188ce9f7df5bfe185db26c5814347d110db17920415cf2fbcad85e7203" 
dependencies = [ "async-trait", "axum", @@ -2795,9 +2795,9 @@ dependencies = [ "hyper-util", "percent-encoding", "pin-project", - "prost", "rustls-native-certs", - "socket2 0.5.10", + "socket2 0.6.0", + "sync_wrapper", "tokio", "tokio-rustls", "tokio-stream", @@ -2809,9 +2809,32 @@ dependencies = [ [[package]] name = "tonic-build" -version = "0.13.1" +version = "0.14.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac6f67be712d12f0b41328db3137e0d0757645d8904b4cb7d51cd9c2279e847" +checksum = "4c40aaccc9f9eccf2cd82ebc111adc13030d23e887244bc9cfa5d1d636049de3" +dependencies = [ + "prettyplease", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tonic-prost" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66bd50ad6ce1252d87ef024b3d64fe4c3cf54a86fb9ef4c631fdd0ded7aeaa67" +dependencies = [ + "bytes", + "prost", + "tonic", +] + +[[package]] +name = "tonic-prost-build" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4a16cba4043dc3ff43fcb3f96b4c5c154c64cbd18ca8dce2ab2c6a451d058a2" dependencies = [ "prettyplease", "proc-macro2", @@ -2819,6 +2842,8 @@ dependencies = [ "prost-types", "quote", "syn", + "tempfile", + "tonic-build", ] [[package]] @@ -2955,6 +2980,12 @@ dependencies = [ "syn", ] +[[package]] +name = "unicase" +version = "2.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539" + [[package]] name = "unicode-ident" version = "1.0.18" @@ -2998,9 +3029,9 @@ checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" [[package]] name = "uuid" -version = "1.18.0" +version = "1.18.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f33196643e165781c20a5ead5582283a7dacbb87855d867fbc2df3f81eddc1be" +checksum = "2f87b8aa10b915a06587d0dec516c282ff295b475d94abf425d62b57710070a2" dependencies = [ "getrandom 0.3.3", "js-sys", diff --git a/temporalio/bridge/Cargo.toml b/temporalio/bridge/Cargo.toml index ad839c19b..b2d186b5e 100644 --- a/temporalio/bridge/Cargo.toml +++ b/temporalio/bridge/Cargo.toml @@ -19,7 +19,7 @@ crate-type = ["cdylib"] anyhow = "1.0" async-trait = "0.1" futures = "0.3" -prost = "0.13" +prost = "0.14" pyo3 = { version = "0.25", features = [ "extension-module", "abi3-py310", @@ -28,17 +28,16 @@ pyo3 = { version = "0.25", features = [ ] } pyo3-async-runtimes = { version = "0.25", features = ["tokio-runtime"] } pythonize = "0.25" -temporal-client = { version = "0.1.0", path = "./sdk-core/client" } -temporal-sdk-core = { version = "0.1.0", path = "./sdk-core/core", features = [ - "ephemeral-server", -] } -temporal-sdk-core-api = { version = "0.1.0", path = "./sdk-core/core-api", features = [ +temporalio-client = { version = "0.1.0", path = "./sdk-core/crates/client" } +temporalio-common = { version = "0.1.0", path = "./sdk-core/crates/common", features = [ "envconfig", +]} +temporalio-sdk-core = { version = "0.1.0", path = "./sdk-core/crates/sdk-core", features = [ + "ephemeral-server", ] } -temporal-sdk-core-protos = { version = "0.1.0", path = "./sdk-core/sdk-core-protos" } tokio = "1.26" tokio-stream = "0.1" -tonic = "0.13" +tonic = "0.14" tracing = "0.1" url = "2.2" diff --git a/temporalio/bridge/runtime.py b/temporalio/bridge/runtime.py index afc79f0f5..3583c1d46 100644 --- a/temporalio/bridge/runtime.py +++ b/temporalio/bridge/runtime.py @@ -23,9 +23,9 @@ def _raise_in_thread(thread_id: int, 
exc_type: Type[BaseException]) -> bool: thread_id, exc_type ) - def __init__(self, *, telemetry: TelemetryConfig) -> None: + def __init__(self, *, options: RuntimeOptions) -> None: """Create SDK Core runtime.""" - self._ref = temporalio.bridge.temporal_sdk_bridge.init_runtime(telemetry) + self._ref = temporalio.bridge.temporal_sdk_bridge.init_runtime(options) def retrieve_buffered_metrics(self, durations_as_seconds: bool) -> Sequence[Any]: """Get buffered metrics.""" @@ -91,6 +91,14 @@ class TelemetryConfig: metrics: Optional[MetricsConfig] +@dataclass(frozen=True) +class RuntimeOptions: + """Python representation of the Rust struct for runtime options.""" + + telemetry: TelemetryConfig + worker_heartbeat_interval_millis: Optional[int] = 60_000 # 60s + + # WARNING: This must match Rust runtime::BufferedLogEntry class BufferedLogEntry(Protocol): """A buffered log entry.""" diff --git a/temporalio/bridge/sdk-core b/temporalio/bridge/sdk-core index 682d441dd..0c5ee51a1 160000 --- a/temporalio/bridge/sdk-core +++ b/temporalio/bridge/sdk-core @@ -1 +1 @@ -Subproject commit 682d441dd3b830e1477af3edb7c2330b403c4c33 +Subproject commit 0c5ee51a1fc66507204eb0c5ae35b3b3b25216bb diff --git a/temporalio/bridge/src/client.rs b/temporalio/bridge/src/client.rs index abe4a2354..c955bb9e4 100644 --- a/temporalio/bridge/src/client.rs +++ b/temporalio/bridge/src/client.rs @@ -3,13 +3,14 @@ use pyo3::prelude::*; use std::collections::HashMap; use std::str::FromStr; use std::time::Duration; -use temporal_client::{ +use temporalio_client::{ ClientKeepAliveConfig as CoreClientKeepAliveConfig, ClientOptions, ClientOptionsBuilder, ConfiguredClient, HttpConnectProxyOptions, RetryClient, RetryConfig, - TemporalServiceClientWithMetrics, TlsConfig, + TlsConfig, TemporalServiceClient, }; -use tonic::metadata::{ - AsciiMetadataKey, AsciiMetadataValue, BinaryMetadataKey, BinaryMetadataValue, +use temporalio_client::tonic::{ + self, + metadata::{AsciiMetadataKey, AsciiMetadataValue, BinaryMetadataKey, BinaryMetadataValue}, }; use url::Url; @@ -17,7 +18,7 @@ use crate::runtime; pyo3::create_exception!(temporal_sdk_bridge, RPCError, PyException); -type Client = RetryClient>; +type Client = RetryClient>; #[pyclass] pub struct ClientRef { @@ -258,7 +259,7 @@ impl TryFrom for ClientOptions { } } -impl TryFrom for temporal_client::TlsConfig { +impl TryFrom for temporalio_client::TlsConfig { type Error = PyErr; fn try_from(conf: ClientTlsConfig) -> PyResult { @@ -268,7 +269,7 @@ impl TryFrom for temporal_client::TlsConfig { client_tls_config: match (conf.client_cert, conf.client_private_key) { (None, None) => None, (Some(client_cert), Some(client_private_key)) => { - Some(temporal_client::ClientTlsConfig { + Some(temporalio_client::ClientTlsConfig { client_cert, client_private_key, }) diff --git a/temporalio/bridge/src/client_rpc_generated.rs b/temporalio/bridge/src/client_rpc_generated.rs index 0b2d2ffa8..0a07dc082 100644 --- a/temporalio/bridge/src/client_rpc_generated.rs +++ b/temporalio/bridge/src/client_rpc_generated.rs @@ -16,7 +16,7 @@ impl ClientRef { call: RpcCall, ) -> PyResult> { self.runtime.assert_same_process("use client")?; - use temporal_client::WorkflowService; + use temporalio_client::WorkflowService; let mut retry_client = self.retry_client.clone(); self.runtime.future_into_py(py, async move { let bytes = match call.rpc.as_str() { @@ -568,7 +568,7 @@ impl ClientRef { call: RpcCall, ) -> PyResult> { self.runtime.assert_same_process("use client")?; - use temporal_client::OperatorService; + use 
temporalio_client::OperatorService; let mut retry_client = self.retry_client.clone(); self.runtime.future_into_py(py, async move { let bytes = match call.rpc.as_str() { @@ -631,7 +631,7 @@ impl ClientRef { fn call_cloud_service<'p>(&self, py: Python<'p>, call: RpcCall) -> PyResult> { self.runtime.assert_same_process("use client")?; - use temporal_client::CloudService; + use temporalio_client::CloudService; let mut retry_client = self.retry_client.clone(); self.runtime.future_into_py(py, async move { let bytes = match call.rpc.as_str() { @@ -846,7 +846,7 @@ impl ClientRef { fn call_test_service<'p>(&self, py: Python<'p>, call: RpcCall) -> PyResult> { self.runtime.assert_same_process("use client")?; - use temporal_client::TestService; + use temporalio_client::TestService; let mut retry_client = self.retry_client.clone(); self.runtime.future_into_py(py, async move { let bytes = match call.rpc.as_str() { @@ -886,7 +886,7 @@ impl ClientRef { fn call_health_service<'p>(&self, py: Python<'p>, call: RpcCall) -> PyResult> { self.runtime.assert_same_process("use client")?; - use temporal_client::HealthService; + use temporalio_client::HealthService; let mut retry_client = self.retry_client.clone(); self.runtime.future_into_py(py, async move { let bytes = match call.rpc.as_str() { diff --git a/temporalio/bridge/src/envconfig.rs b/temporalio/bridge/src/envconfig.rs index 9277a4588..fb0da290c 100644 --- a/temporalio/bridge/src/envconfig.rs +++ b/temporalio/bridge/src/envconfig.rs @@ -4,7 +4,7 @@ use pyo3::{ types::{PyBytes, PyDict}, }; use std::collections::HashMap; -use temporal_sdk_core_api::envconfig::{ +use temporalio_common::envconfig::{ load_client_config as core_load_client_config, load_client_config_profile as core_load_client_config_profile, ClientConfig as CoreClientConfig, ClientConfigCodec, diff --git a/temporalio/bridge/src/lib.rs b/temporalio/bridge/src/lib.rs index cbd5be10e..ee157fb18 100644 --- a/temporalio/bridge/src/lib.rs +++ b/temporalio/bridge/src/lib.rs @@ -82,8 +82,8 @@ fn new_metric_meter(runtime_ref: &runtime::RuntimeRef) -> Option PyResult { - runtime::init_runtime(telemetry_config) +fn init_runtime(options: runtime::RuntimeOptions) -> PyResult { + runtime::init_runtime(options) } #[pyfunction] diff --git a/temporalio/bridge/src/metric.rs b/temporalio/bridge/src/metric.rs index bc516a08a..0eb5afd60 100644 --- a/temporalio/bridge/src/metric.rs +++ b/temporalio/bridge/src/metric.rs @@ -4,7 +4,7 @@ use std::{collections::HashMap, sync::Arc}; use pyo3::prelude::*; use pyo3::{exceptions::PyTypeError, types::PyDict}; -use temporal_sdk_core_api::telemetry::metrics::{ +use temporalio_common::telemetry::metrics::{ self, BufferInstrumentRef, CustomMetricAttributes, MetricEvent, NewAttributes, }; diff --git a/temporalio/bridge/src/runtime.rs b/temporalio/bridge/src/runtime.rs index a75aeb3e3..3c5c7a4ea 100644 --- a/temporalio/bridge/src/runtime.rs +++ b/temporalio/bridge/src/runtime.rs @@ -9,13 +9,13 @@ use std::pin::Pin; use std::str::FromStr; use std::sync::Arc; use std::time::{Duration, UNIX_EPOCH}; -use temporal_sdk_core::telemetry::{ +use temporalio_sdk_core::telemetry::{ build_otlp_metric_exporter, start_prometheus_metric_exporter, CoreLogStreamConsumer, MetricsCallBuffer, }; -use temporal_sdk_core::{CoreRuntime, TokioRuntimeBuilder}; -use temporal_sdk_core_api::telemetry::metrics::{CoreMeter, MetricCallBufferer}; -use temporal_sdk_core_api::telemetry::{ +use temporalio_sdk_core::{CoreRuntime, RuntimeOptionsBuilder, TokioRuntimeBuilder}; +use 
temporalio_common::telemetry::metrics::{CoreMeter, MetricCallBufferer}; +use temporalio_common::telemetry::{ CoreLog, Logger, MetricTemporality, OtelCollectorOptionsBuilder, OtlpProtocol, PrometheusExporterOptionsBuilder, TelemetryOptionsBuilder, }; @@ -87,16 +87,27 @@ pub struct PrometheusConfig { histogram_bucket_overrides: Option>>, } +#[derive(FromPyObject)] +pub struct RuntimeOptions { + telemetry: TelemetryConfig, + worker_heartbeat_interval_millis: Option, +} + const FORWARD_LOG_BUFFER_SIZE: usize = 2048; const FORWARD_LOG_MAX_FREQ_MS: u64 = 10; -pub fn init_runtime(telemetry_config: TelemetryConfig) -> PyResult { +pub fn init_runtime(options: RuntimeOptions) -> PyResult { + let RuntimeOptions { + telemetry: TelemetryConfig { logging, metrics }, + worker_heartbeat_interval_millis, + } = options; + // Have to build/start telemetry config pieces let mut telemetry_build = TelemetryOptionsBuilder::default(); // Build logging config, capturing forwarding info to start later let mut log_forwarding: Option<(Receiver, PyObject)> = None; - if let Some(logging_conf) = telemetry_config.logging { + if let Some(logging_conf) = logging { telemetry_build.logging(if let Some(forward_to) = logging_conf.forward_to { // Note, actual log forwarding is started later let (consumer, stream) = CoreLogStreamConsumer::new(FORWARD_LOG_BUFFER_SIZE); @@ -114,26 +125,31 @@ pub fn init_runtime(telemetry_config: TelemetryConfig) -> PyResult { // Build metric config, but actual metrics instance is late-bound after // CoreRuntime is created since it needs Tokio runtime - if let Some(metrics_conf) = telemetry_config.metrics.as_ref() { + if let Some(metrics_conf) = metrics.as_ref() { telemetry_build.attach_service_name(metrics_conf.attach_service_name); if let Some(prefix) = &metrics_conf.metric_prefix { telemetry_build.metric_prefix(prefix.to_string()); } } + let runtime_options = RuntimeOptionsBuilder::default() + .telemetry_options( + telemetry_build + .build() + .map_err(|err| PyValueError::new_err(format!("Invalid telemetry config: {err}")))?, + ) + .heartbeat_interval(worker_heartbeat_interval_millis.map(Duration::from_millis)) + .build() + .map_err(|err| PyValueError::new_err(format!("Invalid runtime options: {err}")))?; + // Create core runtime which starts tokio multi-thread runtime - let mut core = CoreRuntime::new( - telemetry_build - .build() - .map_err(|err| PyValueError::new_err(format!("Invalid telemetry config: {err}")))?, - TokioRuntimeBuilder::default(), - ) - .map_err(|err| PyRuntimeError::new_err(format!("Failed initializing telemetry: {err}")))?; + let mut core = CoreRuntime::new(runtime_options, TokioRuntimeBuilder::default()) + .map_err(|err| PyRuntimeError::new_err(format!("Failed initializing runtime: {err}")))?; // We late-bind the metrics after core runtime is created since it needs // the Tokio handle let mut metrics_call_buffer: Option>> = None; - if let Some(metrics_conf) = telemetry_config.metrics { + if let Some(metrics_conf) = metrics { let _guard = core.tokio_handle().enter(); // If they want buffered, cannot have Prom/OTel and we make buffered if metrics_conf.buffered_with_size > 0 { @@ -367,7 +383,7 @@ impl TryFrom for Arc { } if let Some(overrides) = prom_conf.histogram_bucket_overrides { build.histogram_bucket_overrides( - temporal_sdk_core_api::telemetry::HistogramBucketOverrides { overrides }, + temporalio_common::telemetry::HistogramBucketOverrides { overrides }, ); } let prom_options = build.build().map_err(|err| { diff --git a/temporalio/bridge/src/testing.rs 
b/temporalio/bridge/src/testing.rs index 04eea1286..5df3ee24d 100644 --- a/temporalio/bridge/src/testing.rs +++ b/temporalio/bridge/src/testing.rs @@ -2,7 +2,7 @@ use std::time::Duration; use pyo3::exceptions::{PyRuntimeError, PyValueError}; use pyo3::prelude::*; -use temporal_sdk_core::ephemeral_server; +use temporalio_sdk_core::ephemeral_server; use crate::runtime; diff --git a/temporalio/bridge/src/worker.rs b/temporalio/bridge/src/worker.rs index 549f4268f..559e340d3 100644 --- a/temporalio/bridge/src/worker.rs +++ b/temporalio/bridge/src/worker.rs @@ -10,19 +10,20 @@ use std::collections::HashSet; use std::marker::PhantomData; use std::sync::{Arc, OnceLock}; use std::time::Duration; -use temporal_sdk_core::api::errors::PollError; -use temporal_sdk_core::replay::{HistoryForReplay, ReplayWorkerInput}; -use temporal_sdk_core_api::errors::WorkflowErrorType; -use temporal_sdk_core_api::worker::{ +use temporalio_common::errors::PollError; +use temporalio_sdk_core::replay::{HistoryForReplay, ReplayWorkerInput}; +use temporalio_common::errors::WorkflowErrorType; +use temporalio_common::worker::{ SlotInfo, SlotInfoTrait, SlotKind, SlotKindType, SlotMarkUsedContext, SlotReleaseContext, SlotReservationContext, SlotSupplier as SlotSupplierTrait, SlotSupplierPermit, }; -use temporal_sdk_core_api::Worker; -use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion; -use temporal_sdk_core_protos::coresdk::{ +use temporalio_common::Worker; +use temporalio_common::protos::coresdk::workflow_completion::WorkflowActivationCompletion; +use temporalio_common::protos::coresdk::{ nexus::NexusTaskCompletion, ActivityHeartbeat, ActivityTaskCompletion, }; -use temporal_sdk_core_protos::temporal::api::history::v1::History; +use temporalio_common::protos::temporal::api::history::v1::History; +use temporalio_common::protos::temporal::api::worker::v1::PluginInfo; use tokio::sync::mpsc::{channel, Sender}; use tokio_stream::wrappers::ReceiverStream; use tracing::error; @@ -34,7 +35,7 @@ pyo3::create_exception!(temporal_sdk_bridge, PollShutdownError, PyException); #[pyclass] pub struct WorkerRef { - worker: Option>, + worker: Option>, /// Set upon the call to `validate`, with the task locals for the event loop at that time, which /// is whatever event loop the user is running their worker in. This loop might be needed by /// other rust-created threads that want to run async python code. 
@@ -53,7 +54,7 @@ pub struct WorkerConfig { workflow_task_poller_behavior: PollerBehavior, nonsticky_to_sticky_poll_ratio: f32, activity_task_poller_behavior: PollerBehavior, - no_remote_activities: bool, + task_types: WorkerTaskTypes, sticky_queue_schedule_to_start_timeout_millis: u64, max_heartbeat_throttle_interval_millis: u64, default_heartbeat_throttle_interval_millis: u64, @@ -63,6 +64,7 @@ pub struct WorkerConfig { nondeterminism_as_workflow_fail: bool, nondeterminism_as_workflow_fail_for_types: HashSet, nexus_task_poller_behavior: PollerBehavior, + plugins: Vec, } #[derive(FromPyObject)] @@ -77,21 +79,21 @@ pub struct PollerBehaviorAutoscaling { pub initial: usize, } -/// Recreates [temporal_sdk_core_api::worker::PollerBehavior] +/// Recreates [temporalio_common::worker::PollerBehavior] #[derive(FromPyObject)] pub enum PollerBehavior { SimpleMaximum(PollerBehaviorSimpleMaximum), Autoscaling(PollerBehaviorAutoscaling), } -impl From for temporal_sdk_core_api::worker::PollerBehavior { +impl From for temporalio_common::worker::PollerBehavior { fn from(value: PollerBehavior) -> Self { match value { PollerBehavior::SimpleMaximum(simple) => { - temporal_sdk_core_api::worker::PollerBehavior::SimpleMaximum(simple.simple_maximum) + temporalio_common::worker::PollerBehavior::SimpleMaximum(simple.simple_maximum) } PollerBehavior::Autoscaling(auto) => { - temporal_sdk_core_api::worker::PollerBehavior::Autoscaling { + temporalio_common::worker::PollerBehavior::Autoscaling { minimum: auto.minimum, maximum: auto.maximum, initial: auto.initial, @@ -101,7 +103,7 @@ impl From for temporal_sdk_core_api::worker::PollerBehavior { } } -/// Recreates [temporal_sdk_core_api::worker::WorkerVersioningStrategy] +/// Recreates [temporalio_common::worker::WorkerVersioningStrategy] #[derive(FromPyObject)] pub enum WorkerVersioningStrategy { None(WorkerVersioningNone), @@ -114,7 +116,7 @@ pub struct WorkerVersioningNone { pub build_id_no_versioning: String, } -/// Recreates [temporal_sdk_core_api::worker::WorkerDeploymentOptions] +/// Recreates [temporalio_common::worker::WorkerDeploymentOptions] #[derive(FromPyObject)] pub struct WorkerDeploymentOptions { pub version: WorkerDeploymentVersion, @@ -128,15 +130,15 @@ pub struct LegacyBuildIdBased { pub build_id_with_versioning: String, } -/// Recreates [temporal_sdk_core_api::worker::WorkerDeploymentVersion] +/// Recreates [temporalio_common::worker::WorkerDeploymentVersion] #[derive(FromPyObject, IntoPyObject, Clone)] pub struct WorkerDeploymentVersion { pub deployment_name: String, pub build_id: String, } -impl From for WorkerDeploymentVersion { - fn from(version: temporal_sdk_core_api::worker::WorkerDeploymentVersion) -> Self { +impl From for WorkerDeploymentVersion { + fn from(version: temporalio_common::worker::WorkerDeploymentVersion) -> Self { WorkerDeploymentVersion { deployment_name: version.deployment_name, build_id: version.build_id, @@ -173,6 +175,23 @@ pub struct ResourceBasedSlotSupplier { tuner_config: ResourceBasedTunerConfig, } +#[derive(FromPyObject)] +pub struct WorkerTaskTypes { + enable_workflows: bool, + enable_activities: bool, + enable_nexus: bool, +} + +impl From<&WorkerTaskTypes> for temporalio_common::worker::WorkerTaskTypes { + fn from(t: &WorkerTaskTypes) -> Self { + Self { + enable_workflows: t.enable_workflows, + enable_activities: t.enable_activities, + enable_nexus: t.enable_nexus, + } + } +} + #[pyclass] pub struct SlotReserveCtx { #[pyo3(get)] @@ -462,7 +481,7 @@ pub struct ResourceBasedTunerConfig { macro_rules! 
enter_sync { ($runtime:expr) => { if let Some(subscriber) = $runtime.core.telemetry().trace_subscriber() { - temporal_sdk_core::telemetry::set_trace_subscriber_for_current_thread(subscriber); + temporalio_sdk_core::telemetry::set_trace_subscriber_for_current_thread(subscriber); } let _guard = $runtime.core.tokio_handle().enter(); }; @@ -477,7 +496,7 @@ pub fn new_worker( runtime_ref.runtime.assert_same_process("create worker")?; let event_loop_task_locals = Arc::new(OnceLock::new()); let config = convert_worker_config(config, event_loop_task_locals.clone())?; - let worker = temporal_sdk_core::init_worker( + let worker = temporalio_sdk_core::init_worker( &runtime_ref.runtime.core, config, client.retry_client.clone().into_inner(), @@ -504,7 +523,7 @@ pub fn new_replay_worker<'a>( let (history_pusher, stream) = HistoryPusher::new(runtime_ref.runtime.clone()); let worker = WorkerRef { worker: Some(Arc::new( - temporal_sdk_core::init_replay_worker(ReplayWorkerInput::new(config, stream)).map_err( + temporalio_sdk_core::init_replay_worker(ReplayWorkerInput::new(config, stream)).map_err( |err| PyValueError::new_err(format!("Failed creating replay worker: {err}")), )?, )), @@ -652,11 +671,12 @@ impl WorkerRef { Ok(()) } - fn replace_client(&self, client: &client::ClientRef) { + fn replace_client(&self, client: &client::ClientRef) -> PyResult<()> { self.worker .as_ref() .expect("missing worker") - .replace_client(client.retry_client.clone().into_inner()); + .replace_client(client.retry_client.clone().into_inner()) + .map_err(|err| PyValueError::new_err(format!("Failed replacing client: {err}"))) } fn initiate_shutdown(&self) -> PyResult<()> { @@ -684,10 +704,10 @@ impl WorkerRef { fn convert_worker_config( conf: WorkerConfig, task_locals: Arc>, -) -> PyResult { +) -> PyResult { let converted_tuner = convert_tuner_holder(conf.tuner, task_locals)?; let converted_versioning_strategy = convert_versioning_strategy(conf.versioning_strategy); - temporal_sdk_core::WorkerConfigBuilder::default() + temporalio_sdk_core::WorkerConfigBuilder::default() .namespace(conf.namespace) .task_queue(conf.task_queue) .versioning_strategy(converted_versioning_strategy) @@ -697,7 +717,7 @@ fn convert_worker_config( .tuner(Arc::new(converted_tuner)) .nonsticky_to_sticky_poll_ratio(conf.nonsticky_to_sticky_poll_ratio) .activity_task_poller_behavior(conf.activity_task_poller_behavior) - .no_remote_activities(conf.no_remote_activities) + .task_types(&conf.task_types) .sticky_queue_schedule_to_start_timeout(Duration::from_millis( conf.sticky_queue_schedule_to_start_timeout_millis, )) @@ -730,6 +750,15 @@ fn convert_worker_config( .collect::>>(), ) .nexus_task_poller_behavior(conf.nexus_task_poller_behavior) + .plugins( + conf.plugins + .into_iter() + .map(|name| PluginInfo { + name, + version: String::new(), + }) + .collect::>(), + ) .build() .map_err(|err| PyValueError::new_err(format!("Invalid worker config: {err}"))) } @@ -737,7 +766,7 @@ fn convert_worker_config( fn convert_tuner_holder( holder: TunerHolder, task_locals: Arc>, -) -> PyResult { +) -> PyResult { // Verify all resource-based options are the same if any are set let maybe_wf_resource_opts = if let SlotSupplier::ResourceBased(ref ss) = holder.workflow_slot_supplier { @@ -782,10 +811,10 @@ fn convert_tuner_holder( )); } - let mut options = temporal_sdk_core::TunerHolderOptionsBuilder::default(); + let mut options = temporalio_sdk_core::TunerHolderOptionsBuilder::default(); if let Some(first) = first { options.resource_based_options( - 
temporal_sdk_core::ResourceBasedSlotsOptionsBuilder::default() + temporalio_sdk_core::ResourceBasedSlotsOptionsBuilder::default() .target_mem_usage(first.target_memory_usage) .target_cpu_usage(first.target_cpu_usage) .build() @@ -819,19 +848,19 @@ fn convert_slot_supplier( supplier: SlotSupplier, task_locals: Arc>, -) -> PyResult> { +) -> PyResult> { Ok(match supplier { - SlotSupplier::FixedSize(fs) => temporal_sdk_core::SlotSupplierOptions::FixedSize { + SlotSupplier::FixedSize(fs) => temporalio_sdk_core::SlotSupplierOptions::FixedSize { slots: fs.num_slots, }, - SlotSupplier::ResourceBased(ss) => temporal_sdk_core::SlotSupplierOptions::ResourceBased( - temporal_sdk_core::ResourceSlotOptions::new( + SlotSupplier::ResourceBased(ss) => temporalio_sdk_core::SlotSupplierOptions::ResourceBased( + temporalio_sdk_core::ResourceSlotOptions::new( ss.minimum_slots, ss.maximum_slots, Duration::from_millis(ss.ramp_throttle_ms), ), ), - SlotSupplier::Custom(cs) => temporal_sdk_core::SlotSupplierOptions::Custom(Arc::new( + SlotSupplier::Custom(cs) => temporalio_sdk_core::SlotSupplierOptions::Custom(Arc::new( CustomSlotSupplierOfType:: { inner: cs.inner, event_loop_task_locals: task_locals, @@ -843,17 +872,17 @@ fn convert_versioning_strategy( strategy: WorkerVersioningStrategy, -) -> temporal_sdk_core_api::worker::WorkerVersioningStrategy { +) -> temporalio_common::worker::WorkerVersioningStrategy { match strategy { WorkerVersioningStrategy::None(vn) => { - temporal_sdk_core_api::worker::WorkerVersioningStrategy::None { + temporalio_common::worker::WorkerVersioningStrategy::None { build_id: vn.build_id_no_versioning, } } WorkerVersioningStrategy::DeploymentBased(options) => { - temporal_sdk_core_api::worker::WorkerVersioningStrategy::WorkerDeploymentBased( - temporal_sdk_core_api::worker::WorkerDeploymentOptions { - version: temporal_sdk_core_api::worker::WorkerDeploymentVersion { + temporalio_common::worker::WorkerVersioningStrategy::WorkerDeploymentBased( + temporalio_common::worker::WorkerDeploymentOptions { + version: temporalio_common::worker::WorkerDeploymentVersion { deployment_name: options.version.deployment_name, build_id: options.version.build_id, }, @@ -868,7 +897,7 @@ fn convert_versioning_strategy( ) } WorkerVersioningStrategy::LegacyBuildIdBased(lb) => { - temporal_sdk_core_api::worker::WorkerVersioningStrategy::LegacyBuildIdBased { + temporalio_common::worker::WorkerVersioningStrategy::LegacyBuildIdBased { build_id: lb.build_id_with_versioning, } } diff --git a/temporalio/bridge/worker.py b/temporalio/bridge/worker.py index 8e20b670a..d3be8e27b 100644 --- a/temporalio/bridge/worker.py +++ b/temporalio/bridge/worker.py @@ -54,7 +54,7 @@ class WorkerConfig: workflow_task_poller_behavior: PollerBehavior nonsticky_to_sticky_poll_ratio: float activity_task_poller_behavior: PollerBehavior - no_remote_activities: bool + task_types: WorkerTaskTypes sticky_queue_schedule_to_start_timeout_millis: int max_heartbeat_throttle_interval_millis: int default_heartbeat_throttle_interval_millis: int @@ -64,6 +64,7 @@ class WorkerConfig: nondeterminism_as_workflow_fail: bool nondeterminism_as_workflow_fail_for_types: Set[str] nexus_task_poller_behavior: PollerBehavior + plugins: Sequence[str] @dataclass @@ -169,6 +170,15 @@ class TunerHolder: nexus_slot_supplier: SlotSupplier +@dataclass +class WorkerTaskTypes: + """Python representation of the Rust struct for worker task types.""" + + enable_workflows: bool + enable_activities: bool + enable_nexus: bool + 
+ class Worker: """SDK Core worker.""" diff --git a/temporalio/client.py b/temporalio/client.py index 770a51392..336645375 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -350,6 +350,11 @@ def api_key(self, value: Optional[str]) -> None: self.service_client.config.api_key = value self.service_client.update_api_key(value) + @property + def plugins(self) -> Sequence[Plugin]: + """Plugins used by this client.""" + return self._config["plugins"] + # Overload for no-param workflow @overload async def start_workflow( diff --git a/temporalio/runtime.py b/temporalio/runtime.py index 345d7ca77..0768caaf5 100644 --- a/temporalio/runtime.py +++ b/temporalio/runtime.py @@ -117,14 +117,42 @@ def set_default(runtime: Runtime, *, error_if_already_set: bool = True) -> None: global _runtime_ref _runtime_ref.set_default(runtime, error_if_already_set=error_if_already_set) - def __init__(self, *, telemetry: TelemetryConfig) -> None: - """Create a default runtime with the given telemetry config. + def __init__( + self, + *, + telemetry: TelemetryConfig, + worker_heartbeat_interval: Optional[timedelta] = timedelta(seconds=60), + ) -> None: + """Create a runtime with the provided configuration. Each new runtime creates a new internal thread pool, so use sparingly. + + Args: + telemetry: Telemetry configuration for this runtime. + worker_heartbeat_interval: Interval for worker heartbeats. ``None`` + disables heartbeating. + + Raises: + ValueError: If ``worker_heartbeat_interval`` is zero or negative. """ - self._core_runtime = temporalio.bridge.runtime.Runtime( - telemetry=telemetry._to_bridge_config() + if worker_heartbeat_interval is None: + heartbeat_millis = None + else: + if worker_heartbeat_interval <= timedelta(0): + raise ValueError("worker_heartbeat_interval must be positive") + heartbeat_millis = int(worker_heartbeat_interval.total_seconds() * 1000) + if heartbeat_millis == 0: + heartbeat_millis = 1 + + self._heartbeat_millis = heartbeat_millis + + runtime_options = temporalio.bridge.runtime.RuntimeOptions( + telemetry=telemetry._to_bridge_config(), + worker_heartbeat_interval_millis=heartbeat_millis, ) + + self._core_runtime = temporalio.bridge.runtime.Runtime(options=runtime_options) if isinstance(telemetry.metrics, MetricBuffer): telemetry.metrics._runtime = self core_meter = temporalio.bridge.metric.MetricMeter.create(self._core_runtime) @@ -159,7 +187,15 @@ def formatted(self) -> str: """Return a formatted form of this filter.""" # We intentionally aren't using __str__ or __format__ so they can keep # their original dataclass impls - return f"{self.other_level},temporal_sdk_core={self.core_level},temporal_client={self.core_level},temporal_sdk={self.core_level}" + targets = [ + "temporalio_sdk_core", + # "temporal_sdk_bridge", + "temporalio_client", + "temporalio_sdk", + ] + parts = [self.other_level] + parts.extend(f"{target}={self.core_level}" for target in targets) + return ",".join(parts) @dataclass(frozen=True) diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py index 04a32be17..98e8150e8 100644 --- a/temporalio/worker/_replayer.py +++ b/temporalio/worker/_replayer.py @@ -18,6 +18,7 @@ import temporalio.converter import temporalio.runtime import temporalio.workflow +from temporalio.bridge.worker import WorkerTaskTypes from ..common import HeaderCodecBehavior from ._interceptor import Interceptor @@ -273,7 +274,7 @@ def on_eviction_hook( ), ), nonsticky_to_sticky_poll_ratio=1, - no_remote_activities=True, + 
task_types=WorkerTaskTypes(True, False, False), sticky_queue_schedule_to_start_timeout_millis=1000, max_heartbeat_throttle_interval_millis=1000, default_heartbeat_throttle_interval_millis=1000, @@ -293,6 +294,7 @@ def on_eviction_hook( nexus_task_poller_behavior=temporalio.bridge.worker.PollerBehaviorSimpleMaximum( 1 ), + plugins=[plugin.name() for plugin in self.plugins], ), ) # Start worker diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index fc2c2241d..1ff0f7263 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -29,6 +29,7 @@ import temporalio.common import temporalio.runtime import temporalio.service +from temporalio.bridge.worker import WorkerTaskTypes from temporalio.common import ( HeaderCodecBehavior, VersioningBehavior, @@ -124,7 +125,6 @@ def __init__( max_concurrent_workflow_task_polls: Optional[int] = None, nonsticky_to_sticky_poll_ratio: float = 0.2, max_concurrent_activity_task_polls: Optional[int] = None, - no_remote_activities: bool = False, sticky_queue_schedule_to_start_timeout: timedelta = timedelta(seconds=10), max_heartbeat_throttle_interval: timedelta = timedelta(seconds=60), default_heartbeat_throttle_interval: timedelta = timedelta(seconds=30), @@ -251,8 +251,6 @@ def __init__( If set, will override any value passed to ``activity_task_poller_behavior``. WARNING: Deprecated, use ``activity_task_poller_behavior`` instead - no_remote_activities: If true, this worker will only handle workflow - tasks and local activities, it will not poll for activity tasks. sticky_queue_schedule_to_start_timeout: How long a workflow task is allowed to sit on the sticky queue before it is timed out and moved to the non-sticky queue where it may be picked up by any @@ -346,7 +344,6 @@ def __init__( max_concurrent_workflow_task_polls=max_concurrent_workflow_task_polls, nonsticky_to_sticky_poll_ratio=nonsticky_to_sticky_poll_ratio, max_concurrent_activity_task_polls=max_concurrent_activity_task_polls, - no_remote_activities=no_remote_activities, sticky_queue_schedule_to_start_timeout=sticky_queue_schedule_to_start_timeout, max_heartbeat_throttle_interval=max_heartbeat_throttle_interval, default_heartbeat_throttle_interval=default_heartbeat_throttle_interval, @@ -376,6 +373,7 @@ def __init__( f"The same plugin type {type(client_plugin)} is present from both client and worker. It may run twice and may not be the intended behavior." ) plugins = plugins_from_client + list(plugins) + config["plugins"] = plugins self.plugins = plugins for plugin in plugins: @@ -555,6 +553,10 @@ def check_activity(activity): maximum=config["max_concurrent_activity_task_polls"] ) + worker_plugins = [plugin.name() for plugin in config.get("plugins", [])] + client_plugins = [plugin.name() for plugin in config["client"].plugins] + plugins = list(set(worker_plugins + client_plugins)) + # Create bridge worker last. We have empirically observed that if it is # created before an error is raised from the activity worker # constructor, a deadlock/hang will occur presumably while trying to @@ -571,11 +573,11 @@ def check_activity(activity): max_cached_workflows=config["max_cached_workflows"], tuner=bridge_tuner, nonsticky_to_sticky_poll_ratio=config["nonsticky_to_sticky_poll_ratio"], - # We have to disable remote activities if a user asks _or_ if we - # are not running an activity worker at all. Otherwise shutdown - # will not proceed properly. 
- no_remote_activities=config["no_remote_activities"] - or not config["activities"], + task_types=WorkerTaskTypes( + enable_workflows=self._workflow_worker is not None, + enable_activities=self._activity_worker is not None, + enable_nexus=self._nexus_worker is not None, + ), sticky_queue_schedule_to_start_timeout_millis=int( 1000 * config["sticky_queue_schedule_to_start_timeout"].total_seconds() @@ -609,6 +611,7 @@ def check_activity(activity): nexus_task_poller_behavior=config[ "nexus_task_poller_behavior" ]._to_bridge(), + plugins=plugins, ), ) @@ -884,7 +887,6 @@ class WorkerConfig(TypedDict, total=False): max_concurrent_workflow_task_polls: Optional[int] nonsticky_to_sticky_poll_ratio: float max_concurrent_activity_task_polls: Optional[int] - no_remote_activities: bool sticky_queue_schedule_to_start_timeout: timedelta max_heartbeat_throttle_interval: timedelta default_heartbeat_throttle_interval: timedelta @@ -902,6 +904,7 @@ class WorkerConfig(TypedDict, total=False): workflow_task_poller_behavior: PollerBehavior activity_task_poller_behavior: PollerBehavior nexus_task_poller_behavior: PollerBehavior + plugins: Sequence[Plugin] def _warn_if_activity_executor_max_workers_is_inconsistent( diff --git a/tests/conftest.py b/tests/conftest.py index c0f8bc5e0..805a4816d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -7,6 +7,8 @@ import pytest import pytest_asyncio +import temporalio.worker + from . import DEV_SERVER_DOWNLOAD_VERSION # If there is an integration test environment variable set, we must remove the diff --git a/tests/contrib/openai_agents/test_openai.py b/tests/contrib/openai_agents/test_openai.py index 02183d121..9146fe05b 100644 --- a/tests/contrib/openai_agents/test_openai.py +++ b/tests/contrib/openai_agents/test_openai.py @@ -2615,7 +2615,8 @@ async def test_split_workers(client: Client): # Workflow worker async with new_worker( - workflow_client, HelloWorldAgent, no_remote_activities=True + workflow_client, + HelloWorldAgent, ) as worker: activity_plugin = openai_agents.OpenAIAgentsPlugin( model_params=ModelActivityParameters( diff --git a/tests/test_plugins.py b/tests/test_plugins.py index 5571841b4..fba582c55 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -1,11 +1,13 @@ import dataclasses import uuid import warnings +from collections import Counter from contextlib import AbstractAsyncContextManager, asynccontextmanager from typing import AsyncIterator, Awaitable, Callable, Optional, cast import pytest +import temporalio.bridge.temporal_sdk_bridge import temporalio.client import temporalio.converter import temporalio.worker @@ -80,7 +82,7 @@ def configure_client(self, config: ClientConfig) -> ClientConfig: return config def configure_worker(self, config: WorkerConfig) -> WorkerConfig: - config["task_queue"] = "combined" + config["task_queue"] = "combined" + str(uuid.uuid4()) return config async def connect_service_client( @@ -112,7 +114,7 @@ def run_replayer( class MyWorkerPlugin(temporalio.worker.Plugin): def configure_worker(self, config: WorkerConfig) -> WorkerConfig: - config["task_queue"] = "replaced_queue" + config["task_queue"] = "replaced_queue" + str(uuid.uuid4()) runner = config.get("workflow_runner") if isinstance(runner, SandboxedWorkflowRunner): config["workflow_runner"] = dataclasses.replace( @@ -144,27 +146,72 @@ def run_replayer( async def test_worker_plugin_basic_config(client: Client) -> None: worker = Worker( client, - task_queue="queue", + task_queue="queue" + str(uuid.uuid4()), activities=[never_run_activity], 
plugins=[MyWorkerPlugin()], ) - assert worker.config().get("task_queue") == "replaced_queue" + task_queue = worker.config().get("task_queue") + assert task_queue is not None and task_queue.startswith("replaced_queue") + assert [p.name() for p in worker.config().get("plugins", [])] == [ + MyWorkerPlugin().name() + ] # Test client plugin propagation to worker plugins new_config = client.config() new_config["plugins"] = [MyCombinedPlugin()] client = Client(**new_config) - worker = Worker(client, task_queue="queue", activities=[never_run_activity]) - assert worker.config().get("task_queue") == "combined" + worker = Worker( + client, task_queue="queue" + str(uuid.uuid4()), activities=[never_run_activity] + ) + task_queue = worker.config().get("task_queue") + assert task_queue is not None and task_queue.startswith("combined") + assert [p.name() for p in worker.config().get("plugins", [])] == [ + MyCombinedPlugin().name() + ] # Test both. Client propagated plugins are called first, so the worker plugin overrides in this case worker = Worker( client, - task_queue="queue", + task_queue="queue" + str(uuid.uuid4()), activities=[never_run_activity], plugins=[MyWorkerPlugin()], ) - assert worker.config().get("task_queue") == "replaced_queue" + task_queue = worker.config().get("task_queue") + assert task_queue is not None and task_queue.startswith("replaced_queue") + assert [p.name() for p in worker.config().get("plugins", [])] == [ + MyCombinedPlugin().name(), + MyWorkerPlugin().name(), + ] + + +async def test_worker_plugin_names_forwarded_to_core( + client: Client, monkeypatch: pytest.MonkeyPatch +) -> None: + captured_plugins: list[str] = [] + + original_new_worker = temporalio.bridge.temporal_sdk_bridge.new_worker + + def new_worker_wrapper(runtime_ref, client_ref, config): + nonlocal captured_plugins + captured_plugins = list(config.plugins) + return original_new_worker(runtime_ref, client_ref, config) + + monkeypatch.setattr( + temporalio.bridge.temporal_sdk_bridge, + "new_worker", + new_worker_wrapper, + ) + + plugin1 = SimplePlugin("test-worker-plugin1") + plugin2 = SimplePlugin("test-worker-plugin2") + worker = Worker( + client, + task_queue="queue" + str(uuid.uuid4()), + activities=[never_run_activity], + plugins=[plugin1, plugin2], + ) + # Use counter to compare unordered lists + assert Counter(captured_plugins) == Counter([plugin1.name(), plugin2.name()]) async def test_worker_duplicated_plugin(client: Client) -> None: @@ -175,7 +222,7 @@ async def test_worker_duplicated_plugin(client: Client) -> None: with warnings.catch_warnings(record=True) as warning_list: worker = Worker( client, - task_queue="queue", + task_queue="queue" + str(uuid.uuid4()), activities=[never_run_activity], plugins=[MyCombinedPlugin()], ) @@ -188,7 +235,7 @@ async def test_worker_sandbox_restrictions(client: Client) -> None: with warnings.catch_warnings(record=True) as warning_list: worker = Worker( client, - task_queue="queue", + task_queue="queue" + str(uuid.uuid4()), activities=[never_run_activity], plugins=[MyWorkerPlugin()], ) @@ -290,7 +337,7 @@ async def test_simple_plugins(client: Client) -> None: # Test without plugin registered in client worker = Worker( client, - task_queue="queue", + task_queue="queue" + str(uuid.uuid4()), activities=[never_run_activity], workflows=[HelloWorkflow], plugins=[plugin], @@ -301,7 +348,7 @@ async def test_simple_plugins(client: Client) -> None: # Test with plugin registered in client worker = Worker( new_client, - task_queue="queue", + task_queue="queue" + str(uuid.uuid4()), 
activities=[never_run_activity], ) assert worker.config().get("workflows") == [HelloWorkflow2] @@ -338,7 +385,7 @@ def converter(old: Optional[DataConverter]): ) worker = Worker( client, - task_queue="queue", + task_queue="queue" + str(uuid.uuid4()), workflows=[HelloWorkflow], activities=[never_run_activity], plugins=[plugin], @@ -352,13 +399,17 @@ def __init__(self): def configure_worker(self, config: WorkerConfig) -> WorkerConfig: config = super().configure_worker(config) - config["task_queue"] = "override" + config["task_queue"] = "override" + str(uuid.uuid4()) return config async def test_medium_plugin(client: Client) -> None: plugin = MediumPlugin() worker = Worker( - client, task_queue="queue", plugins=[plugin], workflows=[HelloWorkflow] + client, + task_queue="queue" + str(uuid.uuid4()), + plugins=[plugin], + workflows=[HelloWorkflow], ) - assert worker.config().get("task_queue") == "override" + task_queue = worker.config().get("task_queue") + assert task_queue is not None and task_queue.startswith("override") diff --git a/tests/test_runtime.py b/tests/test_runtime.py index 9b318bbb7..e706fe3da 100644 --- a/tests/test_runtime.py +++ b/tests/test_runtime.py @@ -21,7 +21,12 @@ _RuntimeRef, ) from temporalio.worker import Worker -from tests.helpers import assert_eq_eventually, assert_eventually, find_free_port +from tests.helpers import ( + assert_eq_eventually, + assert_eventually, + find_free_port, + worker_versioning_enabled, +) @workflow.defn @@ -184,7 +189,9 @@ async def has_log() -> bool: # Check record record = next((l for l in log_queue_list if "Failing workflow task" in l.message)) assert record.levelno == logging.WARNING - assert record.name == f"{logger.name}-sdk_core::temporal_sdk_core::worker::workflow" + assert ( + record.name == f"{logger.name}-sdk_core::temporalio_sdk_core::worker::workflow" + ) assert record.temporal_log.fields["run_id"] == handle.result_run_id # type: ignore @@ -259,6 +266,30 @@ async def check_metrics() -> None: await assert_eventually(check_metrics) +def test_runtime_options_to_bridge_config() -> None: + runtime = Runtime(telemetry=TelemetryConfig()) + assert runtime._heartbeat_millis == 60_000 + + runtime = Runtime( + telemetry=TelemetryConfig(), + worker_heartbeat_interval=timedelta(seconds=10), + ) + assert runtime._heartbeat_millis == 10_000 + + runtime = Runtime( + telemetry=TelemetryConfig(), + worker_heartbeat_interval=None, + ) + assert runtime._heartbeat_millis is None + + +def test_runtime_options_invalid_heartbeat() -> None: + with pytest.raises(ValueError): + Runtime( + telemetry=TelemetryConfig(), worker_heartbeat_interval=timedelta(seconds=-5) + ) + + def test_runtime_ref_creates_default(): ref = _RuntimeRef() assert not ref._default_runtime diff --git a/tests/worker/test_worker.py b/tests/worker/test_worker.py index 9ad4be3af..1e336c1c8 100644 --- a/tests/worker/test_worker.py +++ b/tests/worker/test_worker.py @@ -19,6 +19,7 @@ from temporalio.api.workflowservice.v1 import ( DescribeWorkerDeploymentRequest, DescribeWorkerDeploymentResponse, + ListWorkersRequest, SetWorkerDeploymentCurrentVersionRequest, SetWorkerDeploymentCurrentVersionResponse, SetWorkerDeploymentRampingVersionRequest, @@ -30,7 +31,11 @@ TaskReachabilityType, ) from temporalio.common import PinnedVersioningOverride, RawValue, VersioningBehavior -from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig +from temporalio.runtime import ( + PrometheusConfig, + Runtime, + TelemetryConfig, +) from temporalio.service import RPCError from 
temporalio.testing import WorkflowEnvironment from temporalio.worker import ( @@ -219,64 +224,6 @@ def my_signal(self, value: str) -> None: workflow.logger.info(f"Signal: {value}") -async def test_worker_versioning(client: Client, env: WorkflowEnvironment): - if env.supports_time_skipping: - pytest.skip("Java test server does not support worker versioning") - if not await worker_versioning_enabled(client): - pytest.skip("This server does not have worker versioning enabled") - - task_queue = f"worker-versioning-{uuid.uuid4()}" - await client.update_worker_build_id_compatibility( - task_queue, BuildIdOpAddNewDefault("1.0") - ) - - async with new_worker( - client, - WaitOnSignalWorkflow, - activities=[say_hello], - task_queue=task_queue, - build_id="1.0", - use_worker_versioning=True, - ): - wf1 = await client.start_workflow( - WaitOnSignalWorkflow.run, - id=f"worker-versioning-1-{uuid.uuid4()}", - task_queue=task_queue, - ) - # Sleep for a beat, otherwise it's possible for new workflow to start on 2.0 - await asyncio.sleep(0.1) - await client.update_worker_build_id_compatibility( - task_queue, BuildIdOpAddNewDefault("2.0") - ) - wf2 = await client.start_workflow( - WaitOnSignalWorkflow.run, - id=f"worker-versioning-2-{uuid.uuid4()}", - task_queue=task_queue, - ) - async with new_worker( - client, - WaitOnSignalWorkflow, - activities=[say_hello], - task_queue=task_queue, - build_id="2.0", - use_worker_versioning=True, - ): - # Confirm reachability type parameter is respected. If it wasn't, list would have - # `OPEN_WORKFLOWS` in it. - reachability = await client.get_worker_task_reachability( - build_ids=["2.0"], - reachability_type=TaskReachabilityType.CLOSED_WORKFLOWS, - ) - assert reachability.build_id_reachability["2.0"].task_queue_reachability[ - task_queue - ] == [TaskReachabilityType.NEW_WORKFLOWS] - - await wf1.signal(WaitOnSignalWorkflow.my_signal, "finish") - await wf2.signal(WaitOnSignalWorkflow.my_signal, "finish") - await wf1.result() - await wf2.result() - - async def test_worker_validate_fail(client: Client, env: WorkflowEnvironment): if env.supports_time_skipping: pytest.skip("Java test server does not appear to fail on invalid namespace") @@ -1136,7 +1083,9 @@ async def test_can_run_autoscaling_polling_worker( activity_pollers = [l for l in matches if "activity_task" in l] assert len(activity_pollers) == 1 assert activity_pollers[0].endswith("2") - workflow_pollers = [l for l in matches if "workflow_task" in l] + workflow_pollers = [ + l for l in matches if "workflow_task" in l and w.task_queue in l + ] assert len(workflow_pollers) == 2 # There's sticky & non-sticky pollers, and they may have a count of 1 or 2 depending on # initialization timing. diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 220bb5e2b..ebfb3f04a 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -5210,57 +5210,19 @@ async def run(self) -> None: await asyncio.sleep(0.1) -async def test_workflow_replace_worker_client(client: Client, env: WorkflowEnvironment): - if env.supports_time_skipping: - pytest.skip("Only testing against two real servers") - # We are going to start a second ephemeral server and then replace the - # client. So we will start a no-cache ticking workflow with the current - # client and confirm it has accomplished at least one task. Then we will - # start another on the other client, and confirm it gets started too. Then - # we will terminate both. 
We have to use a ticking workflow with only one - # poller to force a quick re-poll to recognize our client change quickly (as - # opposed to just waiting the minute for poll timeout). - async with await WorkflowEnvironment.start_local( - dev_server_download_version=DEV_SERVER_DOWNLOAD_VERSION - ) as other_env: - # Start both workflows on different servers - task_queue = f"tq-{uuid.uuid4()}" - handle1 = await client.start_workflow( - TickingWorkflow.run, id=f"wf-{uuid.uuid4()}", task_queue=task_queue - ) - handle2 = await other_env.client.start_workflow( - TickingWorkflow.run, id=f"wf-{uuid.uuid4()}", task_queue=task_queue - ) - - async def any_task_completed(handle: WorkflowHandle) -> bool: - async for e in handle.fetch_history_events(): - if e.HasField("workflow_task_completed_event_attributes"): - return True - return False - - # Now start the worker on the first env - async with Worker( - client, - task_queue=task_queue, - workflows=[TickingWorkflow], - max_cached_workflows=0, - max_concurrent_workflow_task_polls=1, - ) as worker: - # Confirm the first ticking workflow has completed a task but not - # the second - await assert_eq_eventually(True, lambda: any_task_completed(handle1)) - assert not await any_task_completed(handle2) - - # Now replace the client, which should be used fairly quickly - # because we should have timer-done poll completions every 100ms - worker.client = other_env.client - - # Now confirm the other workflow has started - await assert_eq_eventually(True, lambda: any_task_completed(handle2)) - - # Terminate both - await handle1.terminate() - await handle2.terminate() +async def test_workflow_replace_worker_client(client: Client): + other_runtime = Runtime(telemetry=TelemetryConfig()) + other_client = await Client.connect( + client.service_client.config.target_host, + namespace=client.namespace, + runtime=other_runtime, + ) + async with new_worker(client, HelloWorkflow) as worker: + with pytest.raises( + ValueError, + match="New client is not on the same runtime as the existing client", + ): + worker.client = other_client @activity.defn(dynamic=True)