Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/prometheus_remote_write_path.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added `path` configuration option to `prometheus_remote_write` source to allow accepting metrics on custom URL paths instead of only the root path. This enables configuration of endpoints like `/api/v1/write` to match standard Prometheus remote write conventions.

authors: elohmeier
106 changes: 105 additions & 1 deletion src/sources/prometheus/remote_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ pub struct PrometheusRemoteWriteConfig {
#[configurable(metadata(docs::examples = "0.0.0.0:9090"))]
address: SocketAddr,

/// The URL path on which metric POST requests are accepted.
#[serde(default = "default_path")]
#[configurable(metadata(docs::examples = "/api/v1/write"))]
#[configurable(metadata(docs::examples = "/remote-write"))]
path: String,

#[configurable(derived)]
tls: Option<TlsEnableableConfig>,

Expand Down Expand Up @@ -66,6 +72,7 @@ impl PrometheusRemoteWriteConfig {
pub fn from_address(address: SocketAddr) -> Self {
Self {
address,
path: default_path(),
tls: None,
auth: None,
acknowledgements: false.into(),
Expand All @@ -75,10 +82,15 @@ impl PrometheusRemoteWriteConfig {
}
}

fn default_path() -> String {
"/".to_string()
}

impl GenerateConfig for PrometheusRemoteWriteConfig {
fn generate_config() -> toml::Value {
toml::Value::try_from(Self {
address: "127.0.0.1:9090".parse().unwrap(),
path: default_path(),
tls: None,
auth: None,
acknowledgements: SourceAcknowledgementsConfig::default(),
Expand All @@ -98,7 +110,7 @@ impl SourceConfig for PrometheusRemoteWriteConfig {
};
source.run(
self.address,
"",
self.path.as_str(),
HttpMethod::Post,
StatusCode::OK,
true,
Expand Down Expand Up @@ -203,6 +215,7 @@ mod test {
.http_protocol_name();
let source = PrometheusRemoteWriteConfig {
address,
path: default_path(),
auth: None,
tls: tls.clone(),
acknowledgements: SourceAcknowledgementsConfig::default(),
Expand Down Expand Up @@ -317,6 +330,7 @@ mod test {

let source = PrometheusRemoteWriteConfig {
address,
path: default_path(),
auth: None,
tls: None,
acknowledgements: SourceAcknowledgementsConfig::default(),
Expand Down Expand Up @@ -387,6 +401,7 @@ mod test {

let source = PrometheusRemoteWriteConfig {
address,
path: default_path(),
auth: None,
tls: None,
acknowledgements: SourceAcknowledgementsConfig::default(),
Expand Down Expand Up @@ -456,6 +471,7 @@ mod test {

let source = PrometheusRemoteWriteConfig {
address,
path: default_path(),
auth: None,
tls: None,
acknowledgements: SourceAcknowledgementsConfig::default(),
Expand Down Expand Up @@ -531,6 +547,93 @@ mod test {
assert_eq!(valid_metric.name(), "test_metric_valid");
assert_eq!(valid_metric.value(), &MetricValue::Gauge { value: 42.0 });
}

#[tokio::test]
async fn receives_metrics_on_custom_path() {
let address = test_util::next_addr();
let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);

let source = PrometheusRemoteWriteConfig {
address,
path: "/api/v1/write".to_string(),
auth: None,
tls: None,
acknowledgements: SourceAcknowledgementsConfig::default(),
keepalive: KeepaliveConfig::default(),
skip_nan_values: false,
};
let source = source
.build(SourceContext::new_test(tx, None))
.await
.unwrap();
tokio::spawn(source);
wait_for_tcp(address).await;

let sink = RemoteWriteConfig {
endpoint: format!("http://localhost:{}/api/v1/write", address.port()),
..Default::default()
};
let (sink, _) = sink
.build(SinkContext::default())
.await
.expect("Error building config.");

let events = make_events();
let events_copy = events.clone();
let mut output = test_util::spawn_collect_ready(
async move {
sink.run_events(events_copy).await.unwrap();
},
rx,
1,
)
.await;

// The MetricBuffer used by the sink may reorder the metrics, so
// put them back into order before comparing.
output.sort_unstable_by_key(|event| event.as_metric().name().to_owned());

vector_lib::assert_event_data_eq!(events, output);
}

#[tokio::test]
async fn rejects_metrics_on_wrong_path() {
let address = test_util::next_addr();
let (tx, _rx) = SourceSender::new_test_finalize(EventStatus::Delivered);

let source = PrometheusRemoteWriteConfig {
address,
path: "/api/v1/write".to_string(),
auth: None,
tls: None,
acknowledgements: SourceAcknowledgementsConfig::default(),
keepalive: KeepaliveConfig::default(),
skip_nan_values: false,
};
let source = source
.build(SourceContext::new_test(tx, None))
.await
.unwrap();
tokio::spawn(source);
wait_for_tcp(address).await;

// Try to send to the root path, which should be rejected
let client = reqwest::Client::new();
let response = client
.post(format!("http://localhost:{}/wrong/path", address.port()))
.header("Content-Type", "application/x-protobuf")
.body(vec![])
.send()
.await
.unwrap();

// Should return an error status code since we're sending to the wrong path
assert!(
response.status().is_client_error(),
"Expected 4xx error, got {}",
response.status()
);
}
}

#[cfg(all(test, feature = "prometheus-integration-tests"))]
Expand Down Expand Up @@ -565,6 +668,7 @@ mod integration_tests {
// maybe there's a way to do a one-shot remote write from Prometheus? Not sure.
let config = PrometheusRemoteWriteConfig {
address: source_receive_address(),
path: default_path(),
auth: None,
tls: None,
acknowledgements: SourceAcknowledgementsConfig::default(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ generated: components: sources: prometheus_remote_write: configuration: {
}
}
}
path: {
description: "The URL path on which metric POST requests are accepted."
required: false
type: string: {
default: "/"
examples: ["/api/v1/write", "/remote-write"]
}
}
skip_nan_values: {
description: """
Whether to skip/discard received samples with NaN values.
Expand Down
Loading