diff --git a/Cargo.toml b/Cargo.toml index e2197605..95fb4b5e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ default = ["protobuf"] gen = ["protobuf-codegen-pure"] nightly = ["libc"] process = ["libc", "procfs"] -push = ["reqwest", "libc", "protobuf"] +push = ["reqwest", "libc", "protobuf", "snap"] [dependencies] cfg-if = "^1.0" @@ -31,6 +31,7 @@ lazy_static = "^1.4" libc = { version = "^0.2", optional = true } parking_lot = "^0.12" protobuf = { version = "^2.0", optional = true } +snap = { version = "^1.1", optional = true } memchr = "^2.3" reqwest = { version = "^0.11", features = ["blocking"], optional = true } thiserror = "^1.0" diff --git a/src/lib.rs b/src/lib.rs index 55cbea35..23465bca 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -224,7 +224,7 @@ pub use self::pulling_gauge::PullingGauge; #[cfg(feature = "push")] pub use self::push::{ hostname_grouping_key, push_add_collector, push_add_metrics, push_collector, push_metrics, - BasicAuthentication, + push_raw, BasicAuthentication, }; pub use self::registry::Registry; pub use self::registry::{default_registry, gather, register, unregister}; diff --git a/src/push.rs b/src/push.rs index 525b3421..5594aa7c 100644 --- a/src/push.rs +++ b/src/push.rs @@ -173,6 +173,62 @@ fn push( } } +/// Pushes snappy compressed data to url as is. +pub fn push_raw( + url: &str, + mfs: Vec, + method: &str, + basic_auth: Option, +) -> Result<()> { + let mut push_url = if url.contains("://") { + url.to_owned() + } else { + format!("http://{}", url) + }; + + if push_url.ends_with('/') { + push_url.pop(); + } + + let encoder = ProtobufEncoder::new(); + let mut proto_buf = Vec::new(); + + for mf in mfs { + // Ignore error, `no metrics` and `no name`. + let _ = encoder.encode(&[mf], &mut proto_buf); + } + + let buf = snap::raw::Encoder::new() + .compress_vec(&proto_buf) + .expect("msg"); + + let mut builder = HTTP_CLIENT + .request( + Method::from_str(method).unwrap(), + Url::from_str(&push_url).unwrap(), + ) + .header(CONTENT_TYPE, encoder.format_type()) + .header("Content-Encoding", "snappy") + .body(buf); + + if let Some(BasicAuthentication { username, password }) = basic_auth { + builder = builder.basic_auth(username, Some(password)); + } + + let response = builder.send().map_err(|e| Error::Msg(format!("{}", e)))?; + + match response.status() { + StatusCode::ACCEPTED => Ok(()), + StatusCode::OK => Ok(()), + _ => Err(Error::Msg(format!( + "unexpected status code {} while pushing to {} {}", + response.status(), + push_url, + response.text().map(|text| format!("with body `{}`", text)).unwrap_or_default() + ))), + } +} + fn push_from_collector( job: &str, grouping: HashMap,