diff --git a/Cargo.toml b/Cargo.toml index 48b1b3c6f..a4eef1f75 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,11 +25,12 @@ derive_more = { version = "2.0", features = [ "try_into", ] } thiserror = "2" -tonic = "0.13" -tonic-build = "0.13" -opentelemetry = { version = "0.30", features = ["metrics"] } -prost = "0.13" -prost-types = "0.13" +tonic = "0.14" +tonic-prost = "0.14" +tonic-prost-build = "0.14" +opentelemetry = { version = "0.31", features = ["metrics"] } +prost = "0.14" +prost-types = { version = "0.7", package = "prost-wkt-types" } [workspace.lints.rust] unreachable_pub = "warn" diff --git a/core-c-bridge/Cargo.toml b/core-c-bridge/Cargo.toml index 958f47d07..e2e361aee 100644 --- a/core-c-bridge/Cargo.toml +++ b/core-c-bridge/Cargo.toml @@ -53,4 +53,4 @@ thiserror = { workspace = true } cbindgen = { version = "0.29", default-features = false } [features] -xz2-static = ["xz2/static"] \ No newline at end of file +xz2-static = ["xz2/static"] diff --git a/core/Cargo.toml b/core/Cargo.toml index 017f451b3..c9ceccda7 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -64,12 +64,12 @@ itertools = "0.14" lru = "0.16" mockall = "0.13" opentelemetry = { workspace = true, features = ["metrics"], optional = true } -opentelemetry_sdk = { version = "0.30", features = [ +opentelemetry_sdk = { version = "0.31", features = [ "rt-tokio", "metrics", "spec_unstable_metrics_views", ], optional = true } -opentelemetry-otlp = { version = "0.30", features = [ +opentelemetry-otlp = { version = "0.31", features = [ "tokio", "metrics", "tls", @@ -81,7 +81,7 @@ pid = "4.0" pin-project = "1.1" prometheus = { version = "0.14", optional = true } prost = { workspace = true } -prost-types = { version = "0.6", package = "prost-wkt-types" } +prost-types = { workspace = true } rand = "0.9" reqwest = { version = "0.12", features = [ "json", diff --git a/core/src/core_tests/updates.rs b/core/src/core_tests/updates.rs index 9beedc46f..9f6bdb954 100644 --- a/core/src/core_tests/updates.rs +++ b/core/src/core_tests/updates.rs @@ -114,10 +114,10 @@ async fn initial_request_sent_back(#[values(false, true)] reject: bool) { .returning(move |mut resp| { let msg = resp.messages.pop().unwrap(); let orig_req = if reject { - let acceptance = msg.body.unwrap().unpack_as(Rejection::default()).unwrap(); + let acceptance = msg.body.unwrap().to_msg::().unwrap(); acceptance.rejected_request.unwrap() } else { - let acceptance = msg.body.unwrap().unpack_as(Acceptance::default()).unwrap(); + let acceptance = msg.body.unwrap().to_msg::().unwrap(); acceptance.accepted_request.unwrap() }; assert_eq!(orig_req, upd_req_body); diff --git a/core/src/protosext/protocol_messages.rs b/core/src/protosext/protocol_messages.rs index af2aab47d..e267193db 100644 --- a/core/src/protosext/protocol_messages.rs +++ b/core/src/protosext/protocol_messages.rs @@ -1,4 +1,4 @@ -use anyhow::{anyhow, bail}; +use anyhow::anyhow; use std::collections::HashMap; use temporal_sdk_core_protos::temporal::api::{ common::v1::Payload, @@ -108,16 +108,9 @@ impl TryFrom> for IncomingProtocolMessageBody { fn try_from(v: Option) -> Result { let v = v.ok_or_else(|| anyhow!("Protocol message body must be populated"))?; - // Undo explicit type url checks when https://github.com/fdeantoni/prost-wkt/issues/48 is - // fixed - Ok(match v.type_url.as_str() { - "type.googleapis.com/temporal.api.update.v1.Request" => { - IncomingProtocolMessageBody::UpdateRequest( - v.unpack_as(update::v1::Request::default())?.try_into()?, - ) - } - o => bail!("Could not understand protocol message type {o}"), - }) + Ok(IncomingProtocolMessageBody::UpdateRequest( + v.to_msg::()?.try_into()?, + )) } } diff --git a/core/src/worker/workflow/workflow_stream.rs b/core/src/worker/workflow/workflow_stream.rs index b85e08953..0cce1ee90 100644 --- a/core/src/worker/workflow/workflow_stream.rs +++ b/core/src/worker/workflow/workflow_stream.rs @@ -578,7 +578,7 @@ impl WFStream { #[derive(derive_more::From, Debug)] enum WFStreamInput { NewWft(Box), - Local(LocalInput), + Local(Box), /// The stream given to us which represents the poller (or a mock) terminated. PollerDead, /// The stream given to us which represents the poller (or a mock) encountered a non-retryable @@ -590,6 +590,11 @@ enum WFStreamInput { auto_reply_fail_tt: Option, }, } +impl From for WFStreamInput { + fn from(input: LocalInput) -> Self { + WFStreamInput::Local(Box::new(input)) + } +} /// A non-poller-received input to the [WFStream] #[derive(derive_more::Debug)] @@ -672,10 +677,10 @@ impl From for WFStreamInput { paginator, update, span, - } => WFStreamInput::Local(LocalInput { + } => WFStreamInput::Local(Box::new(LocalInput { input: LocalInputs::FetchedPageCompletion { paginator, update }, span, - }), + })), } } } diff --git a/sdk-core-protos/Cargo.toml b/sdk-core-protos/Cargo.toml index d9a77052d..4ce0bbffa 100644 --- a/sdk-core-protos/Cargo.toml +++ b/sdk-core-protos/Cargo.toml @@ -20,19 +20,18 @@ anyhow = "1.0" base64 = "0.22" derive_more = { workspace = true } prost = { workspace = true } -prost-wkt = "0.6" -prost-wkt-types = "0.6" +prost-wkt = "0.7" +prost-types = { workspace = true } rand = { version = "0.9", optional = true } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = { workspace = true } tonic = { workspace = true } +tonic-prost = { workspace = true } uuid = { version = "1.18", features = ["v4"], optional = true } [build-dependencies] -tonic-build = { workspace = true } -prost-build = "0.13" -prost-wkt-build = "0.6" +tonic-prost-build = { workspace = true } [lints] workspace = true diff --git a/sdk-core-protos/build.rs b/sdk-core-protos/build.rs index 68b964fd1..3f2fe8a3c 100644 --- a/sdk-core-protos/build.rs +++ b/sdk-core-protos/build.rs @@ -1,5 +1,7 @@ use std::{env, path::PathBuf}; +use tonic_prost_build::Config; + static ALWAYS_SERDE: &str = "#[cfg_attr(not(feature = \"serde_serialize\"), \ derive(::serde::Serialize, ::serde::Deserialize))]"; @@ -7,7 +9,7 @@ fn main() -> Result<(), Box> { println!("cargo:rerun-if-changed=./protos"); let out = PathBuf::from(env::var("OUT_DIR").unwrap()); let descriptor_file = out.join("descriptors.bin"); - tonic_build::configure() + tonic_prost_build::configure() // We don't actually want to build the grpc definitions - we don't need them (for now). // Just build the message structs. .build_server(false) @@ -96,29 +98,14 @@ fn main() -> Result<(), Box> { "coresdk.external_data.LocalActivityMarkerData.backoff", "#[serde(with = \"opt_duration\")]", ) - .extern_path( - ".google.protobuf.Any", - "::prost_wkt_types::Any" - ) - .extern_path( - ".google.protobuf.Timestamp", - "::prost_wkt_types::Timestamp" - ) - .extern_path( - ".google.protobuf.Duration", - "::prost_wkt_types::Duration" - ) - .extern_path( - ".google.protobuf.Value", - "::prost_wkt_types::Value" - ) - .extern_path( - ".google.protobuf.FieldMask", - "::prost_wkt_types::FieldMask" - ) .file_descriptor_set_path(descriptor_file) - .skip_debug("temporal.api.common.v1.Payload") - .compile_protos( + .skip_debug(["temporal.api.common.v1.Payload"]) + .compile_with_config( + { + let mut c = Config::new(); + c.enable_type_names(); + c + }, &[ "./protos/local/temporal/sdk/core/core_interface.proto", "./protos/api_upstream/temporal/api/workflowservice/v1/service.proto", diff --git a/sdk-core-protos/src/history_builder.rs b/sdk-core-protos/src/history_builder.rs index 9aef52737..c197dd684 100644 --- a/sdk-core-protos/src/history_builder.rs +++ b/sdk-core-protos/src/history_builder.rs @@ -23,7 +23,7 @@ use crate::{ }, }; use anyhow::bail; -use prost_wkt_types::Timestamp; +use prost_types::Timestamp; use std::{ collections::HashMap, time::{Duration, SystemTime}, diff --git a/sdk-core-protos/src/lib.rs b/sdk-core-protos/src/lib.rs index b6ca0048d..8b9242c3e 100644 --- a/sdk-core-protos/src/lib.rs +++ b/sdk-core-protos/src/lib.rs @@ -396,7 +396,7 @@ pub mod coresdk { } pub mod external_data { - use prost_wkt_types::{Duration, Timestamp}; + use prost_types::{Duration, Timestamp}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; tonic::include_proto!("coresdk.external_data"); @@ -494,7 +494,7 @@ pub mod coresdk { query::v1::WorkflowQuery, }, }; - use prost_wkt_types::Timestamp; + use prost_types::Timestamp; use std::fmt::{Display, Formatter}; tonic::include_proto!("coresdk.workflow_activation"); @@ -2151,8 +2151,7 @@ pub mod temporal { enums::v1::EventType, history::v1::history_event::Attributes, }; use anyhow::bail; - use prost::alloc::fmt::Formatter; - use std::fmt::Display; + use std::fmt::{Display, Formatter}; tonic::include_proto!("temporal.api.history.v1"); @@ -2693,8 +2692,8 @@ pub mod temporal { } fn elapsed_between_prost_times( - from: prost_wkt_types::Timestamp, - to: prost_wkt_types::Timestamp, + from: prost_types::Timestamp, + to: prost_types::Timestamp, ) -> Option> { let from: Result = from.try_into(); let to: Result = to.try_into(); diff --git a/sdk-core-protos/src/utilities.rs b/sdk-core-protos/src/utilities.rs index 715572b43..9a8decd9d 100644 --- a/sdk-core-protos/src/utilities.rs +++ b/sdk-core-protos/src/utilities.rs @@ -20,13 +20,10 @@ where /// Use to encode an message into a proto `Any`. /// /// Delete this once `prost_wkt_types` supports `prost` `0.12.x` which has built-in any packing. -pub fn pack_any( - type_url: String, - msg: &T, -) -> Result { +pub fn pack_any(type_url: String, msg: &T) -> Result { let mut value = Vec::new(); Message::encode(msg, &mut value)?; - Ok(prost_wkt_types::Any { type_url, value }) + Ok(prost_types::Any { type_url, value }) } /// Given a header map, lowercase all the keys and return it as a new map. diff --git a/sdk/Cargo.toml b/sdk/Cargo.toml index 97039c2cb..f6f70de5a 100644 --- a/sdk/Cargo.toml +++ b/sdk/Cargo.toml @@ -16,9 +16,15 @@ anyhow = "1.0" derive_more = { workspace = true } futures-util = { version = "0.3", default-features = false } parking_lot = { version = "0.12", features = ["send_guard"] } -prost-types = { version = "0.6", package = "prost-wkt-types" } +prost-types = { workspace = true } serde = "1.0" -tokio = { version = "1.47", features = ["rt", "rt-multi-thread", "parking_lot", "time", "fs"] } +tokio = { version = "1.47", features = [ + "rt", + "rt-multi-thread", + "parking_lot", + "time", + "fs", +] } tokio-util = { version = "0.7" } tokio-stream = "0.1" tracing = "0.1"