Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rust/otap-dataflow/configs/fake-with-extension.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ groups:
main:
extensions:
sample_kv_store:
type: "urn:otap:sample_shared_key_value_store"
type: "urn:otap:extension:sample_shared_key_value_store"

nodes:
generator:
Expand Down
4 changes: 2 additions & 2 deletions rust/otap-dataflow/crates/config/src/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ mod tests {
#[test]
fn test_extension_user_config_deserialize() {
let yaml = r#"
type: "urn:otap:sample_kv_store"
type: "urn:otap:extension:sample_kv_store"
config:
capacity: 100
"#;
Expand All @@ -74,7 +74,7 @@ config:
#[test]
fn test_extension_user_config_rejects_capabilities() {
let yaml = r#"
type: "urn:otap:auth"
type: "urn:otap:extension:auth"
capabilities:
some_cap: "ext"
"#;
Expand Down
166 changes: 75 additions & 91 deletions rust/otap-dataflow/crates/config/src/extension_urn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,38 @@

//! Extension URN parsing and validation.
//!
//! Extension URNs use a simpler 3-segment format (`urn:<namespace>:<id>`)
//! compared to the 4-segment node URN format (`urn:<namespace>:<kind>:<id>`).
//! The `<kind>` segment is unnecessary because extensions are always
//! identified by their position in the `extensions:` config section.
//! Extension URNs follow the canonical form `urn:<namespace>:extension:<id>`,
//! mirroring the `urn:<namespace>:<kind>:<id>` shape used by node URNs
//! (with the kind segment fixed to the literal `extension`). The shortcut
//! `extension:<id>` expands to `urn:otel:extension:<id>`.
//!
//! Extensions are intentionally NOT modeled as a node kind, and
//! [`ExtensionUrn`] is a distinct type from [`crate::node_urn::NodeUrn`]
//! so the two cannot be confused. The underlying parsing primitives are
//! shared via the private [`crate::urn`] module.

use crate::error::Error;
use crate::urn::{build_canonical_urn, parse_kinded_urn};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::ops::Range;

/// The single kind segment accepted by extension URNs.
///
/// Passed to `parse_kinded_urn` by `ExtensionUrn::parse` as the list of
/// permitted kinds; it contains only the literal `extension`.
const EXTENSION_KINDS: &[&str] = &["extension"];

/// Human-friendly URN label used in error messages.
///
/// Passed to `parse_kinded_urn` together with the raw input
/// (presumably interpolated into its diagnostics — confirm in `crate::urn`).
const URN_LABEL: &str = "extension URN";

/// Extension type URN with zero-copy access to namespace and id segments.
///
/// Format: `urn:<namespace>:<id>` (e.g., `urn:otap:sample_kv_store`)
/// Canonical form: `urn:<namespace>:extension:<id>` (e.g.,
/// `urn:microsoft:extension:azure_identity_auth`). The kind segment is
/// fixed to the literal `extension`, mirroring the
/// `urn:<namespace>:<kind>:<id>` convention used by receivers,
/// processors, and exporters.
///
/// Unlike [`NodeUrn`](crate::node_urn::NodeUrn), extension URNs have no
/// `<kind>` segment — the kind is implicit from the `extensions:` config
/// section.
///
/// Short form `<id>` is also accepted and expanded to `urn:otel:<id>`.
/// Short form `extension:<id>` is also accepted and expanded to
/// `urn:otel:extension:<id>`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, JsonSchema)]
#[serde(try_from = "String", into = "String")]
#[schemars(with = "String")]
Expand Down Expand Up @@ -51,74 +64,15 @@ impl ExtensionUrn {
}

/// Parses an extension URN.
///
/// Accepted formats:
/// - `urn:<namespace>:<id>` (canonical)
/// - `<id>` (short form, expanded to `urn:otel:<id>`)
pub fn parse(raw: &str) -> Result<Self, Error> {
let raw = raw.trim();
let parts: Vec<&str> = raw.split(':').collect();

match parts.as_slice() {
// Short form: just <id> → urn:otel:<id>
[id] => {
validate_segment(raw, id, "id")?;
Ok(build_extension_urn("otel", id))
}
// Full form: urn:<namespace>:<id>
[scheme, namespace, id] => {
if !scheme.eq_ignore_ascii_case("urn") {
return Err(Error::InvalidUserConfig {
error: format!(
"Invalid extension URN `{raw}`: expected `urn:<namespace>:<id>` \
or `<id>`"
),
});
}
validate_segment(raw, namespace, "namespace")?;
validate_segment(raw, id, "id")?;
let namespace = namespace.to_ascii_lowercase();
Ok(build_extension_urn(&namespace, id))
}
_ => Err(Error::InvalidUserConfig {
error: format!(
"Invalid extension URN `{raw}`: expected `urn:<namespace>:<id>` or `<id>`"
),
}),
}
}
}

fn validate_segment(raw: &str, segment: &str, label: &str) -> Result<(), Error> {
if segment.is_empty() {
return Err(Error::InvalidUserConfig {
error: format!("Invalid extension URN `{raw}`: {label} segment is empty"),
});
}
if !segment
.chars()
.all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-')
{
return Err(Error::InvalidUserConfig {
error: format!(
"Invalid extension URN `{raw}`: {label} segment `{segment}` \
contains invalid characters (only alphanumeric, `_`, `-` allowed)"
),
});
}
Ok(())
}

fn build_extension_urn(namespace: &str, id: &str) -> ExtensionUrn {
let raw = format!("urn:{namespace}:{id}");
let ns_start = 4; // "urn:".len()
let ns_end = ns_start + namespace.len();
let id_start = ns_end + 1; // ":"
let id_end = id_start + id.len();
ExtensionUrn {
raw,
namespace_range: ns_start..ns_end,
id_range: id_start..id_end,
let parsed = parse_kinded_urn(raw, EXTENSION_KINDS, URN_LABEL)?;
let (raw, namespace_range, id_range) =
build_canonical_urn(&parsed.namespace, parsed.kind, &parsed.id);
Ok(ExtensionUrn {
raw,
namespace_range,
id_range,
})
}
}

Expand All @@ -134,6 +88,12 @@ impl AsRef<str> for ExtensionUrn {
}
}

// Exposes the canonical URN text as a borrowed `str`.
//
// NOTE(review): `Borrow` requires `Hash`, `Eq` and `Ord` of the borrowed
// value to agree with those of the owning type. `ExtensionUrn` appears to
// derive `Hash` over `raw` plus the namespace/id ranges, which would not
// hash like a plain `str`; if so, `&str` lookups in a
// `HashMap<ExtensionUrn, _>` / `HashSet<ExtensionUrn>` would silently miss.
// TODO confirm against the struct definition and, if needed, hash `raw` only.
impl std::borrow::Borrow<str> for ExtensionUrn {
/// Returns the URN text by delegating to `as_str`.
fn borrow(&self) -> &str {
self.as_str()
}
}

impl From<ExtensionUrn> for String {
fn from(value: ExtensionUrn) -> Self {
value.raw
Expand All @@ -142,8 +102,9 @@ impl From<ExtensionUrn> for String {

impl TryFrom<String> for ExtensionUrn {
type Error = Error;

fn try_from(value: String) -> Result<Self, Self::Error> {
Self::parse(&value)
Self::parse(value.as_str())
}
}

Expand All @@ -158,31 +119,54 @@ mod tests {
use super::*;

#[test]
fn test_parse_full_urn() {
let urn = ExtensionUrn::parse("urn:otap:sample_kv_store").unwrap();
fn test_parse_canonical_4_segment_urn() {
let urn = ExtensionUrn::parse("urn:otap:extension:sample_kv_store").unwrap();
assert_eq!(urn.namespace(), "otap");
assert_eq!(urn.id(), "sample_kv_store");
assert_eq!(urn.as_str(), "urn:otap:sample_kv_store");
assert_eq!(urn.as_str(), "urn:otap:extension:sample_kv_store");
}

#[test]
fn test_parse_short_form() {
let urn = ExtensionUrn::parse("my_auth").unwrap();
let urn = ExtensionUrn::parse("extension:my_auth").unwrap();
assert_eq!(urn.namespace(), "otel");
assert_eq!(urn.id(), "my_auth");
assert_eq!(urn.as_str(), "urn:otel:my_auth");
assert_eq!(urn.as_str(), "urn:otel:extension:my_auth");
}

#[test]
fn test_parse_short_form_rejects_other_kinds() {
    // In the `<kind>:<id>` short form only the `extension` kind is valid;
    // node kinds must be refused.
    for raw in ["receiver:my_thing", "processor:my_thing", "exporter:my_thing"] {
        assert!(ExtensionUrn::parse(raw).is_err());
    }
}

#[test]
fn test_parse_case_insensitive_scheme() {
let urn = ExtensionUrn::parse("URN:Microsoft:azure_auth").unwrap();
fn test_parse_case_insensitive_scheme_and_kind() {
let urn = ExtensionUrn::parse("URN:Microsoft:Extension:azure_auth").unwrap();
assert_eq!(urn.namespace(), "microsoft");
assert_eq!(urn.id(), "azure_auth");
}

#[test]
fn test_parse_rejects_node_urn_format() {
assert!(ExtensionUrn::parse("urn:otap:extension:sample_kv_store").is_err());
fn test_parse_rejects_3_segment_form() {
// The pre-existing 3-segment form is no longer accepted; users
// must use the 4-segment canonical form.
assert!(ExtensionUrn::parse("urn:otap:sample_kv_store").is_err());
}

#[test]
fn test_parse_rejects_4_segment_with_other_kind() {
    // A well-formed 4-segment URN is still refused when its kind segment
    // is anything other than `extension`.
    for raw in [
        "urn:otap:receiver:foo",
        "urn:otap:processor:foo",
        "urn:otap:exporter:foo",
    ] {
        assert!(ExtensionUrn::parse(raw).is_err());
    }
}

#[test]
fn test_parse_rejects_bare_id() {
    // An id with no kind segment at all is not a valid extension URN.
    let outcome = ExtensionUrn::parse("my_auth");
    assert!(outcome.is_err());
}

#[test]
Expand All @@ -192,20 +176,20 @@ mod tests {

#[test]
fn test_parse_rejects_invalid_chars() {
assert!(ExtensionUrn::parse("urn:otap:bad name").is_err());
assert!(ExtensionUrn::parse("urn:otap:extension:bad name").is_err());
}

#[test]
fn test_from_static_str() {
let urn: ExtensionUrn = "urn:otap:test_ext".into();
let urn: ExtensionUrn = "urn:otap:extension:test_ext".into();
assert_eq!(urn.id(), "test_ext");
}

#[test]
fn test_serde_roundtrip() {
let urn = ExtensionUrn::parse("urn:otap:my_ext").unwrap();
let urn = ExtensionUrn::parse("urn:otap:extension:my_ext").unwrap();
let json = serde_json::to_string(&urn).unwrap();
assert_eq!(json, "\"urn:otap:my_ext\"");
assert_eq!(json, "\"urn:otap:extension:my_ext\"");
let parsed: ExtensionUrn = serde_json::from_str(&json).unwrap();
assert_eq!(parsed, urn);
}
Expand Down
2 changes: 2 additions & 0 deletions rust/otap-dataflow/crates/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ pub mod topic;
pub mod transport_headers;
/// Transport header capture and propagation policy declarations.
pub mod transport_headers_policy;
/// Shared URN parsing primitives used by [`node_urn`] and [`extension_urn`].
mod urn;
pub use topic::{
SubscriptionGroupName, TopicAckPropagationMode, TopicAckPropagationPolicies, TopicBackendKind,
TopicBroadcastOnLagPolicy, TopicImplSelectionPolicy, TopicName,
Expand Down
5 changes: 0 additions & 5 deletions rust/otap-dataflow/crates/config/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,6 @@ pub enum NodeKind {
Processor,
/// A sink of signals
Exporter,
/// A provider of shared capabilities (e.g., auth, service discovery).
Extension,

// ToDo(LQ) : Add more node kinds as needed.
// A connector between two pipelines
Expand All @@ -185,7 +183,6 @@ impl From<NodeKind> for Cow<'static, str> {
NodeKind::Receiver => "receiver".into(),
NodeKind::Processor => "processor".into(),
NodeKind::Exporter => "exporter".into(),
NodeKind::Extension => "extension".into(),
NodeKind::ProcessorChain => "processor_chain".into(),
}
}
Expand Down Expand Up @@ -292,7 +289,6 @@ impl NodeUserConfig {
NodeKind::Processor => "processor",
NodeKind::Exporter => "exporter",
NodeKind::ProcessorChain => "processor_chain",
NodeKind::Extension => "extension",
NodeKind::Receiver => unreachable!(),
}
),
Expand All @@ -308,7 +304,6 @@ impl NodeUserConfig {
NodeKind::Receiver => "receiver",
NodeKind::Processor => "processor",
NodeKind::ProcessorChain => "processor_chain",
NodeKind::Extension => "extension",
NodeKind::Exporter => unreachable!(),
}
),
Expand Down
Loading
Loading