diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index ef01d83fb5..3ecbb57643 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -49,7 +49,7 @@ env: CARGO_INCREMENTAL: 0 CARGO_NET_RETRY: 10 RUSTFLAGS: "-D warnings -A deprecated" - RUSTUP_MAX_RETRIES: 10 + RUSTUP_MAX_RETRIES: 11 concurrency: group: ${{ github.workflow }}-${{ github.head_ref }} diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000000..13566b81b0 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/linkerd2-proxy.iml b/.idea/linkerd2-proxy.iml new file mode 100644 index 0000000000..d6ebd48059 --- /dev/null +++ b/.idea/linkerd2-proxy.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000000..a38bd29c15 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000000..35eb1ddfbb --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/linkerd/app/admin/src/stack.rs b/linkerd/app/admin/src/stack.rs index 3c4874afd9..c5b772b1d6 100644 --- a/linkerd/app/admin/src/stack.rs +++ b/linkerd/app/admin/src/stack.rs @@ -268,8 +268,6 @@ impl Param for Permitted { fn param(&self) -> metrics::EndpointLabels { metrics::InboundEndpointLabels { tls: self.http.tcp.tls.clone(), - authority: None, - target_addr: self.http.tcp.addr.into(), policy: self.permit.labels.clone(), } .into() diff --git a/linkerd/app/core/src/metrics.rs b/linkerd/app/core/src/metrics.rs index 3fd374031b..fd0f434938 100644 --- a/linkerd/app/core/src/metrics.rs +++ b/linkerd/app/core/src/metrics.rs @@ -19,7 +19,6 @@ pub use linkerd_metrics::*; use linkerd_proxy_server_policy as policy; use std::{ fmt::{self, Write}, - net::SocketAddr, sync::Arc, time::Duration, }; @@ -66,8 +65,6 @@ pub enum EndpointLabels { #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct InboundEndpointLabels { pub tls: tls::ConditionalServerTls, - pub authority: Option, - pub target_addr: SocketAddr, pub policy: RouteAuthzLabels, } @@ -99,9 +96,7 @@ pub struct RouteAuthzLabels { #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct OutboundEndpointLabels { pub server_id: tls::ConditionalClientTls, - pub authority: Option, pub labels: Option, - pub target_addr: SocketAddr, } #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -139,6 +134,22 @@ where } Some(out) } +pub fn prefix_outbound_endpoint_labels<'i, I>(prefix: &str, mut labels_iter: I) -> Option +where + I: Iterator, +{ + let (k0, v0) = labels_iter.next()?; + let mut out = format!("{}_{}=\"{}\"", prefix, k0, v0); + + for (k, v) in labels_iter { + if k == "pod" || k == "pod_template_hash" { + continue; + } + + write!(out, ",{}_{}=\"{}\"", prefix, k, v).expect("label concat must succeed"); + } + Some(out) +} // === impl Metrics === @@ -291,17 +302,7 @@ impl FmtLabels for EndpointLabels { impl FmtLabels for InboundEndpointLabels { fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - if let Some(a) = self.authority.as_ref() { - Authority(a).fmt_labels(f)?; - write!(f, ",")?; - } - - ( - (TargetAddr(self.target_addr), TlsAccept::from(&self.tls)), - &self.policy, - ) - .fmt_labels(f)?; - + ((TlsAccept::from(&self.tls)), &self.policy).fmt_labels(f)?; Ok(()) } } @@ -359,14 +360,8 @@ impl FmtLabels for RouteAuthzLabels { impl FmtLabels for OutboundEndpointLabels { fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - if let Some(a) = self.authority.as_ref() { - Authority(a).fmt_labels(f)?; - write!(f, ",")?; - } - - let ta = TargetAddr(self.target_addr); let tls = TlsConnect::from(&self.server_id); - (ta, tls).fmt_labels(f)?; + (tls).fmt_labels(f)?; if let Some(labels) = self.labels.as_ref() { write!(f, ",{}", labels)?; diff --git a/linkerd/app/inbound/src/http/router.rs b/linkerd/app/inbound/src/http/router.rs index 76274a4abb..11d8700a0d 100644 --- a/linkerd/app/inbound/src/http/router.rs +++ b/linkerd/app/inbound/src/http/router.rs @@ -383,8 +383,6 @@ impl Param for Logical { fn param(&self) -> metrics::EndpointLabels { metrics::InboundEndpointLabels { tls: self.tls.clone(), - authority: self.logical.as_ref().map(|d| d.as_http_authority()), - target_addr: self.addr.into(), policy: self.permit.labels.clone(), } .into() diff --git a/linkerd/app/inbound/src/http/tests.rs b/linkerd/app/inbound/src/http/tests.rs index f5c9351240..d43b590664 100644 --- a/linkerd/app/inbound/src/http/tests.rs +++ b/linkerd/app/inbound/src/http/tests.rs @@ -522,8 +522,6 @@ async fn grpc_response_class() { .get_response_total( &metrics::EndpointLabels::Inbound(metrics::InboundEndpointLabels { tls: Target::meshed_h2().1, - authority: Some("foo.svc.cluster.local:5550".parse().unwrap()), - target_addr: "127.0.0.1:80".parse().unwrap(), policy: metrics::RouteAuthzLabels { route: metrics::RouteLabels { server: metrics::ServerLabel(Arc::new(policy::Meta::Resource { diff --git a/linkerd/app/integration/src/tests/discovery.rs b/linkerd/app/integration/src/tests/discovery.rs index 67caf75c73..cba50e513a 100644 --- a/linkerd/app/integration/src/tests/discovery.rs +++ b/linkerd/app/integration/src/tests/discovery.rs @@ -454,14 +454,12 @@ mod http2 { // Simulate the first server falling over without discovery // knowing about it... tracing::info!(%alpha.addr, "Stopping"); - let alpha_addr = alpha.addr; alpha.join().await; // Wait until the proxy has seen the `alpha` disconnect... metrics::metric("tcp_close_total") .label("peer", "dst") .label("direction", "outbound") - .label("target_addr", alpha_addr.to_string()) .value(1u64) .assert_in(&metrics) .await; diff --git a/linkerd/app/integration/src/tests/telemetry.rs b/linkerd/app/integration/src/tests/telemetry.rs index 3b466a0a0c..39eb75fc6c 100644 --- a/linkerd/app/integration/src/tests/telemetry.rs +++ b/linkerd/app/integration/src/tests/telemetry.rs @@ -56,10 +56,8 @@ impl Fixture { let client = client::new(proxy.inbound, "tele.test.svc.cluster.local"); let tcp_dst_labels = metrics::labels().label("direction", "inbound"); - let tcp_src_labels = tcp_dst_labels.clone().label("target_addr", orig_dst); - let labels = tcp_dst_labels - .clone() - .label("authority", "tele.test.svc.cluster.local"); + let tcp_src_labels = tcp_dst_labels.clone(); + let labels = tcp_dst_labels.clone(); let tcp_src_labels = tcp_src_labels.label("peer", "src"); let tcp_dst_labels = tcp_dst_labels.label("peer", "dst"); Fixture { @@ -96,9 +94,7 @@ impl Fixture { let metrics = client::http1(proxy.admin, "localhost"); let client = client::new(proxy.outbound, "tele.test.svc.cluster.local"); - let tcp_labels = metrics::labels() - .label("direction", "outbound") - .label("target_addr", orig_dst); + let tcp_labels = metrics::labels().label("direction", "outbound"); let labels = tcp_labels.clone(); let tcp_src_labels = tcp_labels.clone().label("peer", "src"); let tcp_dst_labels = tcp_labels.label("peer", "dst"); @@ -193,8 +189,7 @@ impl TcpFixture { .label("direction", "outbound") .label("peer", "src") .label("tls", "no_identity") - .label("no_tls_reason", "loopback") - .label("target_addr", orig_dst); + .label("no_tls_reason", "loopback"); let dst_labels = metrics::labels() .label("direction", "outbound") .label("peer", "dst"); @@ -218,7 +213,6 @@ async fn admin_request_count() { let metrics = fixture.metrics; let metric = metrics::metric("request_total") .label("direction", "inbound") - .label("target_addr", metrics.target_addr()) .value(1usize); // We can't assert that the metric is not present, since `GET /metrics` @@ -233,7 +227,6 @@ async fn admin_transport_metrics() { let metrics = fixture.metrics; let labels = metrics::labels() .label("direction", "inbound") - .label("target_addr", metrics.target_addr()) .label("peer", "src"); let mut open_total = labels.metric("tcp_open_total").value(1usize); @@ -308,14 +301,11 @@ async fn test_http_count(metric: &str, fixture: impl Future) { } = fixture.await; let metric = labels.metric(metric); - - assert!(metric.is_not_in(metrics.get("/metrics").await)); - info!("client.get(/)"); assert_eq!(client.get("/").await, "hello"); - // after seeing a request, the request count should be 1. - metric.value(1u64).assert_in(&metrics).await; + // after seeing a request, the request carry the correct labels + metric.assert_in(&metrics).await; } mod response_classification { @@ -403,7 +393,6 @@ mod response_classification { "success" }, ) - .value(1u64) .assert_in(&metrics) .await; } @@ -551,9 +540,7 @@ mod outbound_dst_labels { let metrics = client::http1(proxy.admin, "localhost"); let client = client::new(proxy.outbound, host); - let tcp_labels = metrics::labels() - .label("direction", "outbound") - .label("target_addr", addr); + let tcp_labels = metrics::labels().label("direction", "outbound"); let labels = tcp_labels.clone(); let f = Fixture { client, @@ -697,140 +684,6 @@ mod outbound_dst_labels { labels.metric(metric).assert_in(&metrics).await; } } - - // XXX(ver) This test is broken and/or irrelevant. linkerd/linkerd2#751. - #[tokio::test] - #[ignore] - async fn controller_updates_addr_labels() { - let _trace = trace_init(); - info!("running test server"); - - let ( - Fixture { - client, - metrics, - proxy: _proxy, - _profile, - dst_tx, - labels, - .. - }, - addr, - ) = fixture("labeled.test.svc.cluster.local").await; - let dst_tx = dst_tx.unwrap(); - dst_tx.send( - controller::destination_add(addr) - .addr_label("addr_label", "foo") - .set_label("set_label", "unchanged"), - ); - - let labels1 = labels - .clone() - .label("dst_addr_label", "foo") - .label("dst_set_label", "unchanged"); - - info!("client.get(/)"); - assert_eq!(client.get("/").await, "hello"); - - // the first request should be labeled with `dst_addr_label="foo"` - for &metric in &[ - "request_total", - "response_total", - "response_latency_ms_count", - ] { - labels1.metric(metric).value(1u64).assert_in(&metrics).await; - } - - dst_tx.send( - controller::destination_add(addr) - .addr_label("addr_label", "bar") - .set_label("set_label", "unchanged"), - ); - - let labels2 = labels - .label("dst_addr_label", "bar") - .label("dst_set_label", "unchanged"); - - info!("client.get(/)"); - assert_eq!(client.get("/").await, "hello"); - - // the second request should increment stats labeled with `dst_addr_label="bar"` - // the first request should be labeled with `dst_addr_label="foo"` - for &metric in &[ - "request_total", - "response_total", - "response_latency_ms_count", - ] { - labels1.metric(metric).value(1u64).assert_in(&metrics).await; - } - - // stats recorded from the first request should still be present. - // the first request should be labeled with `dst_addr_label="foo"` - for &metric in &[ - "request_total", - "response_total", - "response_latency_ms_count", - ] { - labels2.metric(metric).value(1u64).assert_in(&metrics).await; - } - } - - // XXX(ver) This test is broken and/or irrelevant. linkerd/linkerd2#751. - #[ignore] - #[tokio::test] - async fn controller_updates_set_labels() { - let _trace = trace_init(); - info!("running test server"); - let ( - Fixture { - client, - metrics, - proxy: _proxy, - _profile, - dst_tx, - labels, - .. - }, - addr, - ) = fixture("labeled.test.svc.cluster.local").await; - let dst_tx = dst_tx.unwrap(); - dst_tx.send(controller::destination_add(addr).set_label("set_label", "foo")); - - let labels1 = labels.clone().label("dst_set_label", "foo"); - - info!("client.get(/)"); - assert_eq!(client.get("/").await, "hello"); - // the first request should be labeled with `dst_addr_label="foo" - for &metric in &[ - "request_total", - "response_total", - "response_latency_ms_count", - ] { - labels1.metric(metric).value(1u64).assert_in(&client).await; - } - - dst_tx.send(controller::destination_add(addr).set_label("set_label", "bar")); - let labels2 = labels.label("dst_set_label", "bar"); - - info!("client.get(/)"); - assert_eq!(client.get("/").await, "hello"); - // the second request should increment stats labeled with `dst_addr_label="bar"` - for &metric in &[ - "request_total", - "response_total", - "response_latency_ms_count", - ] { - labels2.metric(metric).value(1u64).assert_in(&metrics).await; - } - // stats recorded from the first request should still be present. - for &metric in &[ - "request_total", - "response_total", - "response_latency_ms_count", - ] { - labels1.metric(metric).value(1u64).assert_in(&metrics).await; - } - } } #[tokio::test] @@ -1328,10 +1181,9 @@ async fn metrics_compression() { info!("client.get(/)"); assert_eq!(client.get("/").await, "hello"); - let mut metric = labels + let metric = labels .metric("response_latency_ms_count") - .label("status_code", 200) - .value(1u64); + .label("status_code", 200); for &encoding in encodings { assert_eventually_contains!(do_scrape(encoding).await, &metric); @@ -1341,6 +1193,6 @@ async fn metrics_compression() { assert_eq!(client.get("/").await, "hello"); for &encoding in encodings { - assert_eventually_contains!(do_scrape(encoding).await, metric.set_value(2u64)); + assert_eventually_contains!(do_scrape(encoding).await, metric); } } diff --git a/linkerd/app/integration/src/tests/telemetry/tcp_errors.rs b/linkerd/app/integration/src/tests/telemetry/tcp_errors.rs index 6aed1daa46..38b00db0d4 100644 --- a/linkerd/app/integration/src/tests/telemetry/tcp_errors.rs +++ b/linkerd/app/integration/src/tests/telemetry/tcp_errors.rs @@ -103,8 +103,8 @@ impl Test { } } -fn metric(proxy: &proxy::Listening) -> metrics::MetricMatch { - metrics::metric(METRIC).label("target_addr", proxy.inbound_server.as_ref().unwrap().addr) +fn metric(_proxy: &proxy::Listening) -> metrics::MetricMatch { + metrics::metric(METRIC) } /// Tests that the detect metric is labeled and incremented on timeout. @@ -246,7 +246,7 @@ async fn inbound_direct_multi() { let (proxy, metrics) = Test::new(proxy).run().await; let client = client::tcp(proxy.inbound); - let metric = metrics::metric(METRIC).label("target_addr", proxy.inbound); + let metric = metrics::metric(METRIC); let timeout_metric = metric.clone().label("error", "tls detection timeout"); let no_tls_metric = metric.clone().label("error", "unexpected"); @@ -292,9 +292,7 @@ async fn inbound_invalid_ip() { .await; let client = client::tcp(proxy.inbound); - let metric = metric(&proxy) - .label("error", "unexpected") - .label("target_addr", fake_ip); + let metric = metric(&proxy).label("error", "unexpected"); let tcp_client = client.connect().await; tcp_client.write(TcpFixture::HELLO_MSG).await; @@ -357,7 +355,6 @@ async fn inbound_direct_success() { let no_tls_client = client::tcp(proxy1.inbound); let metric = metrics::metric(METRIC) - .label("target_addr", proxy1.inbound) .label("error", "tls detection timeout") .value(1u64); diff --git a/linkerd/app/outbound/src/http/concrete.rs b/linkerd/app/outbound/src/http/concrete.rs index 168cca5756..3378d6eec4 100644 --- a/linkerd/app/outbound/src/http/concrete.rs +++ b/linkerd/app/outbound/src/http/concrete.rs @@ -202,10 +202,8 @@ where { fn param(&self) -> OutboundEndpointLabels { OutboundEndpointLabels { - authority: self.parent.param(), labels: prefix_labels("dst", self.metadata.labels().iter()), server_id: self.param(), - target_addr: self.addr.into(), } } } diff --git a/linkerd/app/outbound/src/http/endpoint/tests.rs b/linkerd/app/outbound/src/http/endpoint/tests.rs index fa512740a2..59233b58fd 100644 --- a/linkerd/app/outbound/src/http/endpoint/tests.rs +++ b/linkerd/app/outbound/src/http/endpoint/tests.rs @@ -298,10 +298,8 @@ impl svc::Param for Endpoint { impl svc::Param for Endpoint { fn param(&self) -> metrics::OutboundEndpointLabels { metrics::OutboundEndpointLabels { - authority: None, labels: None, server_id: self.param(), - target_addr: self.addr.into(), } } } diff --git a/linkerd/app/outbound/src/opaq/concrete.rs b/linkerd/app/outbound/src/opaq/concrete.rs index 1c01fd7f30..2d2ec82d0d 100644 --- a/linkerd/app/outbound/src/opaq/concrete.rs +++ b/linkerd/app/outbound/src/opaq/concrete.rs @@ -315,16 +315,9 @@ where T: svc::Param>, { fn param(&self) -> metrics::OutboundEndpointLabels { - let authority = self - .parent - .param() - .as_ref() - .map(|profiles::LogicalAddr(a)| a.as_http_authority()); metrics::OutboundEndpointLabels { - authority, labels: metrics::prefix_labels("dst", self.metadata.labels().iter()), server_id: self.param(), - target_addr: self.addr.into(), } } }