Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ derive_more = { version = "2.0", features = [
"try_into",
] }
thiserror = "2"
tonic = "0.13"
tonic-build = "0.13"
opentelemetry = { version = "0.30", features = ["metrics"] }
prost = "0.13"
prost-types = "0.13"
tonic = "0.14"
tonic-prost = "0.14"
tonic-prost-build = "0.14"
opentelemetry = { version = "0.31", features = ["metrics"] }
prost = "0.14"
prost-types = { version = "0.7", package = "prost-wkt-types" }

[workspace.lints.rust]
unreachable_pub = "warn"
Expand Down
2 changes: 1 addition & 1 deletion core-c-bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,4 @@ thiserror = { workspace = true }
cbindgen = { version = "0.29", default-features = false }

[features]
xz2-static = ["xz2/static"]
xz2-static = ["xz2/static"]
6 changes: 3 additions & 3 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ itertools = "0.14"
lru = "0.16"
mockall = "0.13"
opentelemetry = { workspace = true, features = ["metrics"], optional = true }
opentelemetry_sdk = { version = "0.30", features = [
opentelemetry_sdk = { version = "0.31", features = [
"rt-tokio",
"metrics",
"spec_unstable_metrics_views",
], optional = true }
opentelemetry-otlp = { version = "0.30", features = [
opentelemetry-otlp = { version = "0.31", features = [
"tokio",
"metrics",
"tls",
Expand All @@ -81,7 +81,7 @@ pid = "4.0"
pin-project = "1.1"
prometheus = { version = "0.14", optional = true }
prost = { workspace = true }
prost-types = { version = "0.6", package = "prost-wkt-types" }
prost-types = { workspace = true }
rand = "0.9"
reqwest = { version = "0.12", features = [
"json",
Expand Down
4 changes: 2 additions & 2 deletions core/src/core_tests/updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,10 @@ async fn initial_request_sent_back(#[values(false, true)] reject: bool) {
.returning(move |mut resp| {
let msg = resp.messages.pop().unwrap();
let orig_req = if reject {
let acceptance = msg.body.unwrap().unpack_as(Rejection::default()).unwrap();
let acceptance = msg.body.unwrap().to_msg::<Rejection>().unwrap();
acceptance.rejected_request.unwrap()
} else {
let acceptance = msg.body.unwrap().unpack_as(Acceptance::default()).unwrap();
let acceptance = msg.body.unwrap().to_msg::<Acceptance>().unwrap();
acceptance.accepted_request.unwrap()
};
assert_eq!(orig_req, upd_req_body);
Expand Down
15 changes: 4 additions & 11 deletions core/src/protosext/protocol_messages.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::{anyhow, bail};
use anyhow::anyhow;
use std::collections::HashMap;
use temporal_sdk_core_protos::temporal::api::{
common::v1::Payload,
Expand Down Expand Up @@ -108,16 +108,9 @@ impl TryFrom<Option<prost_types::Any>> for IncomingProtocolMessageBody {

fn try_from(v: Option<prost_types::Any>) -> Result<Self, Self::Error> {
let v = v.ok_or_else(|| anyhow!("Protocol message body must be populated"))?;
// Undo explicit type url checks when https://github.com/fdeantoni/prost-wkt/issues/48 is
// fixed
Ok(match v.type_url.as_str() {
"type.googleapis.com/temporal.api.update.v1.Request" => {
IncomingProtocolMessageBody::UpdateRequest(
v.unpack_as(update::v1::Request::default())?.try_into()?,
)
}
o => bail!("Could not understand protocol message type {o}"),
})
Ok(IncomingProtocolMessageBody::UpdateRequest(
v.to_msg::<update::v1::Request>()?.try_into()?,
))
}
}

Expand Down
11 changes: 8 additions & 3 deletions core/src/worker/workflow/workflow_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ impl WFStream {
#[derive(derive_more::From, Debug)]
enum WFStreamInput {
NewWft(Box<PermittedWFT>),
Local(LocalInput),
Local(Box<LocalInput>),
/// The stream given to us which represents the poller (or a mock) terminated.
PollerDead,
/// The stream given to us which represents the poller (or a mock) encountered a non-retryable
Expand All @@ -590,6 +590,11 @@ enum WFStreamInput {
auto_reply_fail_tt: Option<TaskToken>,
},
}
impl From<LocalInput> for WFStreamInput {
fn from(input: LocalInput) -> Self {
WFStreamInput::Local(Box::new(input))
}
}

/// A non-poller-received input to the [WFStream]
#[derive(derive_more::Debug)]
Expand Down Expand Up @@ -672,10 +677,10 @@ impl From<ExternalPollerInputs> for WFStreamInput {
paginator,
update,
span,
} => WFStreamInput::Local(LocalInput {
} => WFStreamInput::Local(Box::new(LocalInput {
input: LocalInputs::FetchedPageCompletion { paginator, update },
span,
}),
})),
}
}
}
Expand Down
9 changes: 4 additions & 5 deletions sdk-core-protos/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,18 @@ anyhow = "1.0"
base64 = "0.22"
derive_more = { workspace = true }
prost = { workspace = true }
prost-wkt = "0.6"
prost-wkt-types = "0.6"
prost-wkt = "0.7"
prost-types = { workspace = true }
rand = { version = "0.9", optional = true }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = { workspace = true }
tonic = { workspace = true }
tonic-prost = { workspace = true }
uuid = { version = "1.18", features = ["v4"], optional = true }

[build-dependencies]
tonic-build = { workspace = true }
prost-build = "0.13"
prost-wkt-build = "0.6"
tonic-prost-build = { workspace = true }

[lints]
workspace = true
33 changes: 10 additions & 23 deletions sdk-core-protos/build.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use std::{env, path::PathBuf};

use tonic_prost_build::Config;

static ALWAYS_SERDE: &str = "#[cfg_attr(not(feature = \"serde_serialize\"), \
derive(::serde::Serialize, ::serde::Deserialize))]";

fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("cargo:rerun-if-changed=./protos");
let out = PathBuf::from(env::var("OUT_DIR").unwrap());
let descriptor_file = out.join("descriptors.bin");
tonic_build::configure()
tonic_prost_build::configure()
// We don't actually want to build the grpc definitions - we don't need them (for now).
// Just build the message structs.
.build_server(false)
Expand Down Expand Up @@ -96,29 +98,14 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
"coresdk.external_data.LocalActivityMarkerData.backoff",
"#[serde(with = \"opt_duration\")]",
)
.extern_path(
".google.protobuf.Any",
"::prost_wkt_types::Any"
)
.extern_path(
".google.protobuf.Timestamp",
"::prost_wkt_types::Timestamp"
)
.extern_path(
".google.protobuf.Duration",
"::prost_wkt_types::Duration"
)
.extern_path(
".google.protobuf.Value",
"::prost_wkt_types::Value"
)
.extern_path(
".google.protobuf.FieldMask",
"::prost_wkt_types::FieldMask"
)
.file_descriptor_set_path(descriptor_file)
.skip_debug("temporal.api.common.v1.Payload")
.compile_protos(
.skip_debug(["temporal.api.common.v1.Payload"])
.compile_with_config(
{
let mut c = Config::new();
c.enable_type_names();
c
},
&[
"./protos/local/temporal/sdk/core/core_interface.proto",
"./protos/api_upstream/temporal/api/workflowservice/v1/service.proto",
Expand Down
2 changes: 1 addition & 1 deletion sdk-core-protos/src/history_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
},
};
use anyhow::bail;
use prost_wkt_types::Timestamp;
use prost_types::Timestamp;
use std::{
collections::HashMap,
time::{Duration, SystemTime},
Expand Down
11 changes: 5 additions & 6 deletions sdk-core-protos/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ pub mod coresdk {
}

pub mod external_data {
use prost_wkt_types::{Duration, Timestamp};
use prost_types::{Duration, Timestamp};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
tonic::include_proto!("coresdk.external_data");

Expand Down Expand Up @@ -494,7 +494,7 @@ pub mod coresdk {
query::v1::WorkflowQuery,
},
};
use prost_wkt_types::Timestamp;
use prost_types::Timestamp;
use std::fmt::{Display, Formatter};

tonic::include_proto!("coresdk.workflow_activation");
Expand Down Expand Up @@ -2151,8 +2151,7 @@ pub mod temporal {
enums::v1::EventType, history::v1::history_event::Attributes,
};
use anyhow::bail;
use prost::alloc::fmt::Formatter;
use std::fmt::Display;
use std::fmt::{Display, Formatter};

tonic::include_proto!("temporal.api.history.v1");

Expand Down Expand Up @@ -2693,8 +2692,8 @@ pub mod temporal {
}

fn elapsed_between_prost_times(
from: prost_wkt_types::Timestamp,
to: prost_wkt_types::Timestamp,
from: prost_types::Timestamp,
to: prost_types::Timestamp,
) -> Option<Option<Duration>> {
let from: Result<SystemTime, _> = from.try_into();
let to: Result<SystemTime, _> = to.try_into();
Expand Down
7 changes: 2 additions & 5 deletions sdk-core-protos/src/utilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,10 @@ where
/// Use to encode an message into a proto `Any`.
///
/// Delete this once `prost_wkt_types` supports `prost` `0.12.x` which has built-in any packing.
pub fn pack_any<T: Message>(
type_url: String,
msg: &T,
) -> Result<prost_wkt_types::Any, EncodeError> {
pub fn pack_any<T: Message>(type_url: String, msg: &T) -> Result<prost_types::Any, EncodeError> {
let mut value = Vec::new();
Message::encode(msg, &mut value)?;
Ok(prost_wkt_types::Any { type_url, value })
Ok(prost_types::Any { type_url, value })
}

/// Given a header map, lowercase all the keys and return it as a new map.
Expand Down
10 changes: 8 additions & 2 deletions sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,15 @@ anyhow = "1.0"
derive_more = { workspace = true }
futures-util = { version = "0.3", default-features = false }
parking_lot = { version = "0.12", features = ["send_guard"] }
prost-types = { version = "0.6", package = "prost-wkt-types" }
prost-types = { workspace = true }
serde = "1.0"
tokio = { version = "1.47", features = ["rt", "rt-multi-thread", "parking_lot", "time", "fs"] }
tokio = { version = "1.47", features = [
"rt",
"rt-multi-thread",
"parking_lot",
"time",
"fs",
] }
tokio-util = { version = "0.7" }
tokio-stream = "0.1"
tracing = "0.1"
Expand Down