Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
3e9f5e9
feat(engine): wire extensions and capabilities into runtime pipeline
gouslu May 6, 2026
0ac34d0
refactor(config): unify URN format and remove NodeKind::Extension
gouslu May 6, 2026
ebd985f
Merge remote-tracking branch 'origin/main' into gouslu/extension-syst…
gouslu May 6, 2026
37d9222
docs(macros): tighten capability receiver-shape docs
gouslu May 6, 2026
7490ae4
refactor(engine): extract ExtensionLifecycle from runtime_pipeline
gouslu May 6, 2026
7c79d84
Fix recordset_kql tests; clarify supervisor select intent
gouslu May 6, 2026
040ae95
Merge remote-tracking branch 'origin/main' into gouslu/extension-syst…
gouslu May 7, 2026
7c5a924
Merge branch 'main' into gouslu/extension-system-p1-pr4
gouslu May 8, 2026
c84f138
Fix host_metrics_receiver factory signature
gouslu May 8, 2026
916d24e
fix(comparison_dashboard): rename otap-otap.yaml to .j2 to fix config…
gouslu May 8, 2026
44ca58f
Merge branch 'main' into gouslu/extension-system-p1-pr4
gouslu May 8, 2026
19f9a2f
Merge branch 'main' into gouslu/extension-system-p1-pr4
gouslu May 11, 2026
c4818b7
Merge branch 'main' into gouslu/extension-system-p1-pr4
gouslu May 11, 2026
64aadeb
refactor(engine): move NoOp capabilities into testing module
gouslu May 12, 2026
fb0a711
Merge branch 'main' into gouslu/extension-system-p1-pr4
gouslu May 12, 2026
845768b
test(engine): scope dead_code allow to the unread Arc field
gouslu May 12, 2026
1ba8f43
test(engine): drop vestigial Arc field from ConstructedImpl
gouslu May 12, 2026
17a948a
Merge branch 'main' into gouslu/extension-system-p1-pr4
gouslu May 12, 2026
cae139a
macro(capability): reject method-level generics and destructured params
gouslu May 12, 2026
aa8d0f8
docs(extensions): clarify Phase 1 `shared` extension sharing boundary
gouslu May 12, 2026
d17b453
engine(runtime): broadcast Shutdown to extensions on error path with …
gouslu May 12, 2026
f9a67ea
controller(startup): statically validate extension URN + config
gouslu May 12, 2026
482feb2
configs: remove orphan fake-with-extension.yaml
gouslu May 12, 2026
e56d2f3
docs(extensions): document the per-core runtime cost of start()
gouslu May 12, 2026
7ae2cc4
docs(extensions): align table columns to satisfy MD060
gouslu May 13, 2026
77dd301
Merge branch 'main' into gouslu/extension-system-p1-pr4
gouslu May 13, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rust/otap-dataflow/configs/fake-with-extension.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ groups:
main:
extensions:
sample_kv_store:
type: "urn:otap:sample_shared_key_value_store"
type: "urn:otap:extension:sample_shared_key_value_store"
Comment thread
gouslu marked this conversation as resolved.
Outdated

nodes:
generator:
Expand Down
4 changes: 2 additions & 2 deletions rust/otap-dataflow/crates/config/src/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ mod tests {
#[test]
fn test_extension_user_config_deserialize() {
let yaml = r#"
type: "urn:otap:sample_kv_store"
type: "urn:otap:extension:sample_kv_store"
config:
capacity: 100
"#;
Expand All @@ -74,7 +74,7 @@ config:
#[test]
fn test_extension_user_config_rejects_capabilities() {
let yaml = r#"
type: "urn:otap:auth"
type: "urn:otap:extension:auth"
capabilities:
some_cap: "ext"
"#;
Expand Down
166 changes: 75 additions & 91 deletions rust/otap-dataflow/crates/config/src/extension_urn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,38 @@

//! Extension URN parsing and validation.
//!
//! Extension URNs use a simpler 3-segment format (`urn:<namespace>:<id>`)
//! compared to the 4-segment node URN format (`urn:<namespace>:<kind>:<id>`).
//! The `<kind>` segment is unnecessary because extensions are always
//! identified by their position in the `extensions:` config section.
//! Extension URNs follow the canonical form `urn:<namespace>:extension:<id>`,
//! mirroring the `urn:<namespace>:<kind>:<id>` shape used by node URNs
//! (with the kind segment fixed to the literal `extension`). The shortcut
//! `extension:<id>` expands to `urn:otel:extension:<id>`.
//!
//! Extensions are intentionally NOT modeled as a node kind, and
//! [`ExtensionUrn`] is a distinct type from [`crate::node_urn::NodeUrn`]
//! so the two cannot be confused. The underlying parsing primitives are
//! shared via the private [`crate::urn`] module.

use crate::error::Error;
use crate::urn::{build_canonical_urn, parse_kinded_urn};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::ops::Range;

/// The single kind segment accepted by extension URNs.
const EXTENSION_KINDS: &[&str] = &["extension"];

/// Human-friendly URN label used in error messages.
const URN_LABEL: &str = "extension URN";

/// Extension type URN with zero-copy access to namespace and id segments.
///
/// Format: `urn:<namespace>:<id>` (e.g., `urn:otap:sample_kv_store`)
/// Canonical form: `urn:<namespace>:extension:<id>` (e.g.,
/// `urn:microsoft:extension:azure_identity_auth`). The kind segment is
/// fixed to the literal `extension`, mirroring the
/// `urn:<namespace>:<kind>:<id>` convention used by receivers,
/// processors, and exporters.
///
/// Unlike [`NodeUrn`](crate::node_urn::NodeUrn), extension URNs have no
/// `<kind>` segment — the kind is implicit from the `extensions:` config
/// section.
///
/// Short form `<id>` is also accepted and expanded to `urn:otel:<id>`.
/// Short form `extension:<id>` is also accepted and expanded to
/// `urn:otel:extension:<id>`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, JsonSchema)]
#[serde(try_from = "String", into = "String")]
#[schemars(with = "String")]
Expand Down Expand Up @@ -51,74 +64,15 @@ impl ExtensionUrn {
}

/// Parses an extension URN.
///
/// Accepted formats:
/// - `urn:<namespace>:<id>` (canonical)
/// - `<id>` (short form, expanded to `urn:otel:<id>`)
pub fn parse(raw: &str) -> Result<Self, Error> {
let raw = raw.trim();
let parts: Vec<&str> = raw.split(':').collect();

match parts.as_slice() {
// Short form: just <id> → urn:otel:<id>
[id] => {
validate_segment(raw, id, "id")?;
Ok(build_extension_urn("otel", id))
}
// Full form: urn:<namespace>:<id>
[scheme, namespace, id] => {
if !scheme.eq_ignore_ascii_case("urn") {
return Err(Error::InvalidUserConfig {
error: format!(
"Invalid extension URN `{raw}`: expected `urn:<namespace>:<id>` \
or `<id>`"
),
});
}
validate_segment(raw, namespace, "namespace")?;
validate_segment(raw, id, "id")?;
let namespace = namespace.to_ascii_lowercase();
Ok(build_extension_urn(&namespace, id))
}
_ => Err(Error::InvalidUserConfig {
error: format!(
"Invalid extension URN `{raw}`: expected `urn:<namespace>:<id>` or `<id>`"
),
}),
}
}
}

fn validate_segment(raw: &str, segment: &str, label: &str) -> Result<(), Error> {
if segment.is_empty() {
return Err(Error::InvalidUserConfig {
error: format!("Invalid extension URN `{raw}`: {label} segment is empty"),
});
}
if !segment
.chars()
.all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-')
{
return Err(Error::InvalidUserConfig {
error: format!(
"Invalid extension URN `{raw}`: {label} segment `{segment}` \
contains invalid characters (only alphanumeric, `_`, `-` allowed)"
),
});
}
Ok(())
}

fn build_extension_urn(namespace: &str, id: &str) -> ExtensionUrn {
let raw = format!("urn:{namespace}:{id}");
let ns_start = 4; // "urn:".len()
let ns_end = ns_start + namespace.len();
let id_start = ns_end + 1; // ":"
let id_end = id_start + id.len();
ExtensionUrn {
raw,
namespace_range: ns_start..ns_end,
id_range: id_start..id_end,
let parsed = parse_kinded_urn(raw, EXTENSION_KINDS, URN_LABEL)?;
let (raw, namespace_range, id_range) =
build_canonical_urn(&parsed.namespace, parsed.kind, &parsed.id);
Ok(ExtensionUrn {
raw,
namespace_range,
id_range,
})
}
}

Expand All @@ -134,6 +88,12 @@ impl AsRef<str> for ExtensionUrn {
}
}

impl std::borrow::Borrow<str> for ExtensionUrn {
fn borrow(&self) -> &str {
self.as_str()
}
}

impl From<ExtensionUrn> for String {
fn from(value: ExtensionUrn) -> Self {
value.raw
Expand All @@ -142,8 +102,9 @@ impl From<ExtensionUrn> for String {

impl TryFrom<String> for ExtensionUrn {
type Error = Error;

fn try_from(value: String) -> Result<Self, Self::Error> {
Self::parse(&value)
Self::parse(value.as_str())
}
}

Expand All @@ -158,31 +119,54 @@ mod tests {
use super::*;

#[test]
fn test_parse_full_urn() {
let urn = ExtensionUrn::parse("urn:otap:sample_kv_store").unwrap();
fn test_parse_canonical_4_segment_urn() {
let urn = ExtensionUrn::parse("urn:otap:extension:sample_kv_store").unwrap();
assert_eq!(urn.namespace(), "otap");
assert_eq!(urn.id(), "sample_kv_store");
assert_eq!(urn.as_str(), "urn:otap:sample_kv_store");
assert_eq!(urn.as_str(), "urn:otap:extension:sample_kv_store");
}

#[test]
fn test_parse_short_form() {
let urn = ExtensionUrn::parse("my_auth").unwrap();
let urn = ExtensionUrn::parse("extension:my_auth").unwrap();
assert_eq!(urn.namespace(), "otel");
assert_eq!(urn.id(), "my_auth");
assert_eq!(urn.as_str(), "urn:otel:my_auth");
assert_eq!(urn.as_str(), "urn:otel:extension:my_auth");
}

#[test]
fn test_parse_short_form_rejects_other_kinds() {
// The short form's kind segment must be `extension`.
assert!(ExtensionUrn::parse("receiver:my_thing").is_err());
assert!(ExtensionUrn::parse("processor:my_thing").is_err());
assert!(ExtensionUrn::parse("exporter:my_thing").is_err());
}

#[test]
fn test_parse_case_insensitive_scheme() {
let urn = ExtensionUrn::parse("URN:Microsoft:azure_auth").unwrap();
fn test_parse_case_insensitive_scheme_and_kind() {
let urn = ExtensionUrn::parse("URN:Microsoft:Extension:azure_auth").unwrap();
assert_eq!(urn.namespace(), "microsoft");
assert_eq!(urn.id(), "azure_auth");
}

#[test]
fn test_parse_rejects_node_urn_format() {
assert!(ExtensionUrn::parse("urn:otap:extension:sample_kv_store").is_err());
fn test_parse_rejects_3_segment_form() {
// The pre-existing 3-segment form is no longer accepted; users
// must use the 4-segment canonical form.
assert!(ExtensionUrn::parse("urn:otap:sample_kv_store").is_err());
}

#[test]
fn test_parse_rejects_4_segment_with_other_kind() {
// Any kind other than `extension` is rejected.
assert!(ExtensionUrn::parse("urn:otap:receiver:foo").is_err());
assert!(ExtensionUrn::parse("urn:otap:processor:foo").is_err());
assert!(ExtensionUrn::parse("urn:otap:exporter:foo").is_err());
}

#[test]
fn test_parse_rejects_bare_id() {
assert!(ExtensionUrn::parse("my_auth").is_err());
}

#[test]
Expand All @@ -192,20 +176,20 @@ mod tests {

#[test]
fn test_parse_rejects_invalid_chars() {
assert!(ExtensionUrn::parse("urn:otap:bad name").is_err());
assert!(ExtensionUrn::parse("urn:otap:extension:bad name").is_err());
}

#[test]
fn test_from_static_str() {
let urn: ExtensionUrn = "urn:otap:test_ext".into();
let urn: ExtensionUrn = "urn:otap:extension:test_ext".into();
assert_eq!(urn.id(), "test_ext");
}

#[test]
fn test_serde_roundtrip() {
let urn = ExtensionUrn::parse("urn:otap:my_ext").unwrap();
let urn = ExtensionUrn::parse("urn:otap:extension:my_ext").unwrap();
let json = serde_json::to_string(&urn).unwrap();
assert_eq!(json, "\"urn:otap:my_ext\"");
assert_eq!(json, "\"urn:otap:extension:my_ext\"");
let parsed: ExtensionUrn = serde_json::from_str(&json).unwrap();
assert_eq!(parsed, urn);
}
Expand Down
2 changes: 2 additions & 0 deletions rust/otap-dataflow/crates/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ pub mod topic;
pub mod transport_headers;
/// Transport header capture and propagation policy declarations.
pub mod transport_headers_policy;
/// Shared URN parsing primitives used by [`node_urn`] and [`extension_urn`].
mod urn;
pub use topic::{
SubscriptionGroupName, TopicAckPropagationMode, TopicAckPropagationPolicies, TopicBackendKind,
TopicBroadcastOnLagPolicy, TopicImplSelectionPolicy, TopicName,
Expand Down
5 changes: 0 additions & 5 deletions rust/otap-dataflow/crates/config/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,6 @@ pub enum NodeKind {
Processor,
/// A sink of signals
Exporter,
/// A provider of shared capabilities (e.g., auth, service discovery).
Extension,

// ToDo(LQ) : Add more node kinds as needed.
// A connector between two pipelines
Expand All @@ -185,7 +183,6 @@ impl From<NodeKind> for Cow<'static, str> {
NodeKind::Receiver => "receiver".into(),
NodeKind::Processor => "processor".into(),
NodeKind::Exporter => "exporter".into(),
NodeKind::Extension => "extension".into(),
NodeKind::ProcessorChain => "processor_chain".into(),
}
}
Expand Down Expand Up @@ -292,7 +289,6 @@ impl NodeUserConfig {
NodeKind::Processor => "processor",
NodeKind::Exporter => "exporter",
NodeKind::ProcessorChain => "processor_chain",
NodeKind::Extension => "extension",
NodeKind::Receiver => unreachable!(),
}
),
Expand All @@ -308,7 +304,6 @@ impl NodeUserConfig {
NodeKind::Receiver => "receiver",
NodeKind::Processor => "processor",
NodeKind::ProcessorChain => "processor_chain",
NodeKind::Extension => "extension",
NodeKind::Exporter => unreachable!(),
}
),
Expand Down
Loading
Loading