From 1670d8c0f9f89168fb4bafc7bcf746265269807c Mon Sep 17 00:00:00 2001 From: Tiago Castro Date: Wed, 6 Nov 2024 23:29:09 +0000 Subject: [PATCH 1/2] fix(k8s): use provided kube-config path Ensure we always use the provided kube-config path. Signed-off-by: Tiago Castro --- k8s/forward/examples/http-forward.rs | 3 +-- k8s/forward/examples/port-forward.rs | 2 +- k8s/forward/src/http_forward.rs | 13 ++++++----- k8s/forward/src/port_forward.rs | 13 ++++------- k8s/proxy/src/proxy.rs | 34 +++++++++++++++++++--------- 5 files changed, 37 insertions(+), 28 deletions(-) diff --git a/k8s/forward/examples/http-forward.rs b/k8s/forward/examples/http-forward.rs index ded2d3664..ae0756929 100644 --- a/k8s/forward/examples/http-forward.rs +++ b/k8s/forward/examples/http-forward.rs @@ -11,8 +11,7 @@ async fn main() -> anyhow::Result<()> { let selector = kube_forward::TargetSelector::svc_label("app", "api-rest"); let target = kube_forward::Target::new(selector, "http", "mayastor"); - let uri = kube_forward::HttpForward::new(target, None) - .await? + let uri = kube_forward::HttpForward::new(target, None, kube::Client::try_default().await?) .uri() .await?; diff --git a/k8s/forward/examples/port-forward.rs b/k8s/forward/examples/port-forward.rs index b6be97e68..9edd1fb48 100644 --- a/k8s/forward/examples/port-forward.rs +++ b/k8s/forward/examples/port-forward.rs @@ -4,7 +4,7 @@ async fn main() -> anyhow::Result<()> { let selector = kube_forward::TargetSelector::svc_label("app", "api-rest"); let target = kube_forward::Target::new(selector, "http", "mayastor"); - let pf = kube_forward::PortForward::new(target, 30011).await?; + let pf = kube_forward::PortForward::new(target, 30011, kube::Client::try_default().await?); let (_, handle) = pf.port_forward().await?; handle.await?; diff --git a/k8s/forward/src/http_forward.rs b/k8s/forward/src/http_forward.rs index cbb98417b..73093fc4d 100644 --- a/k8s/forward/src/http_forward.rs +++ b/k8s/forward/src/http_forward.rs @@ -32,19 +32,19 @@ impl HttpForward { /// Return a new `Self`. /// # Arguments /// * `target` - the target we'll forward to - pub async fn new>>( + pub fn new>>( target: crate::Target, scheme: SO, - ) -> anyhow::Result { - let client = kube::Client::try_default().await?; + client: kube::Client, + ) -> Self { let namespace = target.namespace.name_any(); - Ok(Self { + Self { target, pod_api: Api::namespaced(client.clone(), &namespace), svc_api: Api::namespaced(client, &namespace), scheme: scheme.into().unwrap_or(Scheme::HTTP), - }) + } } /// Returns the `hyper::Uri` that can be used to proxy with the kubeapi server. @@ -70,7 +70,8 @@ impl HttpForward { /// ```ignore /// let selector = kube_forward::TargetSelector::svc_label("app", "api-rest"); /// let target = kube_forward::Target::new(selector, "http", "mayastor"); -/// let pf = kube_forward::HttpForward::new(target, None).await?; +/// let client = kube::Client::try_default().await?; +/// let pf = kube_forward::HttpForward::new(target, None, client).await?; /// /// let uri = pf.uri().await?; /// tracing::info!(%uri, "generated kube-api"); diff --git a/k8s/forward/src/port_forward.rs b/k8s/forward/src/port_forward.rs index fb692edf5..5859bcb70 100644 --- a/k8s/forward/src/port_forward.rs +++ b/k8s/forward/src/port_forward.rs @@ -18,7 +18,8 @@ use kube::{ /// ```ignore /// let selector = kube_forward::TargetSelector::pod_label("app", "etcd"); /// let target = kube_forward::Target::new(selector, "client", "mayastor"); -/// let pf = kube_forward::PortForward::new(target, 35003).await?; +/// let client = kube::Client::try_default().await?; +/// let pf = kube_forward::PortForward::new(target, 35003, client).await?; /// /// let (_port, handle) = pf.port_forward().await?; /// handle.await?; @@ -36,19 +37,15 @@ impl PortForward { /// # Arguments /// * `target` - the target we'll forward to /// * `local_port` - specific local port to use, if Some - pub async fn new( - target: crate::Target, - local_port: impl Into>, - ) -> anyhow::Result { - let client = Client::try_default().await?; + pub fn new(target: crate::Target, local_port: impl Into>, client: Client) -> Self { let namespace = target.namespace.name_any(); - Ok(Self { + Self { target, local_port: local_port.into(), pod_api: Api::namespaced(client.clone(), &namespace), svc_api: Api::namespaced(client, &namespace), - }) + } } /// The specified local port, or 0. diff --git a/k8s/proxy/src/proxy.rs b/k8s/proxy/src/proxy.rs index 3b6976243..45a153574 100644 --- a/k8s/proxy/src/proxy.rs +++ b/k8s/proxy/src/proxy.rs @@ -190,12 +190,13 @@ impl ConfigBuilder { } /// Tries to build an HTTP `Configuration` from the current self. async fn build_http(self) -> anyhow::Result { - let uri = kube_forward::HttpForward::new(self.target, Some(self.scheme.into())) - .await? - .uri() - .await?; let config = super::config_from_kubeconfig(self.kube_config).await?; let client = kube::Client::try_from(config)?; + let uri = + kube_forward::HttpForward::new(self.target, Some(self.scheme.into()), client.clone()) + .uri() + .await?; + let proxy = kube_forward::HttpProxy::new(client); let config = Configuration::builder() @@ -208,7 +209,10 @@ impl ConfigBuilder { } /// Tries to build a TCP `Configuration` from the current self. async fn build_tcp(self) -> anyhow::Result { - let pf = kube_forward::PortForward::new(self.target, None).await?; + let config = super::config_from_kubeconfig(self.kube_config).await?; + let client = kube::Client::try_from(config)?; + + let pf = kube_forward::PortForward::new(self.target, None, client); let (port, _handle) = pf.port_forward().await?; @@ -235,7 +239,10 @@ impl ConfigBuilder { impl ConfigBuilder { /// Tries to build a TCP `Configuration` from the current self. pub async fn build(self) -> anyhow::Result { - let pf = kube_forward::PortForward::new(self.target, None).await?; + let config = super::config_from_kubeconfig(self.kube_config).await?; + let client = kube::Client::try_from(config)?; + + let pf = kube_forward::PortForward::new(self.target, None, client); let (port, _handle) = pf.port_forward().await?; @@ -279,12 +286,14 @@ impl ConfigBuilder { } /// Tries to build an HTTP `Configuration` from the current self. async fn build_http(self) -> anyhow::Result<(Uri, LokiClient)> { - let uri = kube_forward::HttpForward::new(self.target, Some(self.scheme.into())) - .await? - .uri() - .await?; let config = super::config_from_kubeconfig(self.kube_config).await?; let client = kube::Client::try_from(config)?; + + let uri = + kube_forward::HttpForward::new(self.target, Some(self.scheme.into()), client.clone()) + .uri() + .await?; + let proxy = kube_forward::HttpProxy::new(client); let service = tower::ServiceBuilder::new() @@ -294,7 +303,10 @@ impl ConfigBuilder { } /// Tries to build a TCP `Configuration` from the current self. async fn build_tcp(self) -> anyhow::Result<(Uri, LokiClient)> { - let pf = kube_forward::PortForward::new(self.target, None).await?; + let config = super::config_from_kubeconfig(self.kube_config).await?; + let client = kube::Client::try_from(config)?; + + let pf = kube_forward::PortForward::new(self.target, None, client); let (port, _handle) = pf.port_forward().await?; From 401e460775c0c5d7b5d404f9228085bca5ab45b0 Mon Sep 17 00:00:00 2001 From: Tiago Castro Date: Thu, 7 Nov 2024 00:38:38 +0000 Subject: [PATCH 2/2] refactor(kube-proxy): expose error to make it extensible The plugin can then add a more human error message. Signed-off-by: Tiago Castro --- Cargo.lock | 3 ++ k8s/forward/Cargo.toml | 1 + k8s/forward/src/error.rs | 38 +++++++++++++++++++++ k8s/forward/src/http_forward.rs | 14 +++++--- k8s/forward/src/lib.rs | 14 +++++--- k8s/forward/src/port_forward.rs | 30 +++++++++-------- k8s/proxy/Cargo.toml | 2 ++ k8s/proxy/src/error.rs | 47 ++++++++++++++++++++++++++ k8s/proxy/src/lib.rs | 58 +++++++++++++-------------------- k8s/proxy/src/proxy.rs | 32 ++++++++---------- 10 files changed, 163 insertions(+), 76 deletions(-) create mode 100644 k8s/forward/src/error.rs create mode 100644 k8s/proxy/src/error.rs diff --git a/Cargo.lock b/Cargo.lock index ebd36df03..c0d24f947 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2442,6 +2442,7 @@ dependencies = [ "kube", "serde_json", "shutdown", + "thiserror", "tokio", "tokio-stream", "tower 0.5.1", @@ -2461,8 +2462,10 @@ dependencies = [ "kube", "kube-forward", "openapi", + "thiserror", "tonic", "tower 0.5.1", + "url", "utils", ] diff --git a/k8s/forward/Cargo.toml b/k8s/forward/Cargo.toml index 60d93fc4d..bf3812b92 100644 --- a/k8s/forward/Cargo.toml +++ b/k8s/forward/Cargo.toml @@ -20,6 +20,7 @@ tower = "0.5.1" tower-http = { version = "0.6.1", features = ["map-response-body"] } hyper = { version = "1.5.0", features = ["client", "http1", "http2"] } hyper-body = { path = "../../utils/hyper-body" } +thiserror = "1.0.68" [dev-dependencies] tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } diff --git a/k8s/forward/src/error.rs b/k8s/forward/src/error.rs new file mode 100644 index 000000000..0bb040d23 --- /dev/null +++ b/k8s/forward/src/error.rs @@ -0,0 +1,38 @@ +/// A kube-forward error. +#[derive(Debug, thiserror::Error)] +#[allow(missing_docs)] +pub enum Error { + #[error("Service '{selector}' not found on '{namespace:?}'")] + ServiceNotFound { + selector: String, + namespace: crate::NameSpace, + }, + #[error("{source}")] + Kube { source: kube::Error }, + #[error("{source}")] + AnyHow { source: anyhow::Error }, + #[error("Invalid uri: {source}")] + InvalidUri { + source: hyper::http::uri::InvalidUri, + }, + #[error("{source}")] + Io { source: std::io::Error }, +} + +impl From for Error { + fn from(source: anyhow::Error) -> Self { + Self::AnyHow { source } + } +} + +impl From for Error { + fn from(source: kube::Error) -> Self { + Self::Kube { source } + } +} + +impl From for Error { + fn from(source: hyper::http::uri::InvalidUri) -> Self { + Self::InvalidUri { source } + } +} diff --git a/k8s/forward/src/http_forward.rs b/k8s/forward/src/http_forward.rs index 73093fc4d..e839d6d13 100644 --- a/k8s/forward/src/http_forward.rs +++ b/k8s/forward/src/http_forward.rs @@ -1,4 +1,5 @@ use crate::{ + error::Error, pod_selection::{AnyReady, PodSelection}, vx::{Pod, Service}, }; @@ -48,10 +49,10 @@ impl HttpForward { } /// Returns the `hyper::Uri` that can be used to proxy with the kubeapi server. - pub async fn uri(self) -> anyhow::Result { + pub async fn uri(self) -> Result { let target = self.finder().find(&self.target).await?; let uri = hyper::Uri::try_from(target.with_scheme(self.scheme))?; - tracing::info!(%uri, "generated kube-api"); + tracing::debug!(%uri, "generated kube-api"); Ok(uri) } @@ -101,7 +102,7 @@ impl HttpProxy { Self { client } } /// Tries to return a default `HttpProxy` with a default `kube::Client`. - pub async fn try_default() -> anyhow::Result { + pub async fn try_default() -> Result { Ok(Self { client: kube::Client::try_default().await?, }) @@ -167,7 +168,7 @@ impl<'a> TargetFinder<'a> { /// Finds the `HttpTarget` according to the specified target. /// # Arguments /// * `target` - the target to be found - async fn find(&self, target: &crate::Target) -> anyhow::Result { + async fn find(&self, target: &crate::Target) -> Result { let pod_api = self.pod_api; let svc_api = self.svc_api; @@ -192,7 +193,10 @@ impl<'a> TargetFinder<'a> { let services = svc_api.list(&Self::svc_params(&selector)).await?; let service = match services.items.into_iter().next() { Some(service) => Ok(service), - None => Err(anyhow::anyhow!("Service '{}' not found", selector)), + None => Err(Error::ServiceNotFound { + selector, + namespace: namespace.clone(), + }), }?; Ok(HttpTarget::new( diff --git a/k8s/forward/src/lib.rs b/k8s/forward/src/lib.rs index 8a3b654a3..0bb7a0b94 100644 --- a/k8s/forward/src/lib.rs +++ b/k8s/forward/src/lib.rs @@ -6,6 +6,7 @@ //! //! If you're looking at a higher-level construct, please take a look at kube-proxy. +mod error; mod http_forward; mod pod_selection; mod port_forward; @@ -23,6 +24,9 @@ use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString; use kube::ResourceExt; use vx::Pod; +/// The error exposed. +pub use crate::error::Error; + /// Different types of target selectors. #[derive(Clone)] pub enum TargetSelector { @@ -104,8 +108,8 @@ pub struct Target { /// A kubernetes namespace. /// If None, the default is "default". -#[derive(Clone)] -pub(crate) struct NameSpace(Option); +#[derive(Debug, Clone)] +pub struct NameSpace(Option); impl NameSpace { /// Returns the configured namespace or the default. pub(crate) fn name_any(&self) -> String { @@ -121,7 +125,7 @@ pub(crate) struct TargetPod { port_number: u16, } impl TargetPod { - fn new(pod_name: String, port_number: i32) -> anyhow::Result { + fn new(pod_name: String, port_number: i32) -> Result { let port_number = u16::try_from(port_number).context("Port not valid")?; Ok(Self { pod_name, @@ -140,6 +144,8 @@ impl Target { /// * `selector` - target selector /// * `port` - target port /// * `namespace` - target namespace + /// + /// TODO: this namespace api is not bad, needs refactoring... pub fn new>, T: Into, P: Into>( selector: TargetSelector, port: P, @@ -177,7 +183,7 @@ impl Target { } /// Returns the `TargetPod` for the given pod/port or pod/self.port. - pub(crate) fn find(&self, pod: &Pod, port: Option) -> anyhow::Result { + pub(crate) fn find(&self, pod: &Pod, port: Option) -> Result { let port = match &port { None => &self.port, Some(port) => port, diff --git a/k8s/forward/src/port_forward.rs b/k8s/forward/src/port_forward.rs index 5859bcb70..e431aaed5 100644 --- a/k8s/forward/src/port_forward.rs +++ b/k8s/forward/src/port_forward.rs @@ -7,6 +7,7 @@ use tokio_stream::wrappers::TcpListenerStream; use crate::{ pod_selection::{AnyReady, PodSelection}, vx::{Pod, Service}, + Error, }; use kube::{ api::{Api, ListParams}, @@ -55,11 +56,16 @@ impl PortForward { } /// Runs the port forwarding proxy until a SIGINT signal is received. - pub async fn port_forward(self) -> anyhow::Result<(u16, tokio::task::JoinHandle<()>)> { + pub async fn port_forward(self) -> Result<(u16, tokio::task::JoinHandle<()>), Error> { let addr = SocketAddr::from(([127, 0, 0, 1], self.local_port())); - let bind = TcpListener::bind(addr).await?; - let port = bind.local_addr()?.port(); + let bind = TcpListener::bind(addr) + .await + .map_err(|source| Error::Io { source })?; + let port = bind + .local_addr() + .map_err(|source| Error::Io { source })? + .port(); tracing::trace!(port, "Bound to local port"); let server = TcpListenerStream::new(bind) @@ -74,11 +80,8 @@ impl PortForward { } tokio::spawn(async move { - if let Err(e) = pf.forward_connection(client_conn).await { - tracing::error!( - error = e.as_ref() as &dyn std::error::Error, - "failed to forward connection" - ); + if let Err(error) = pf.forward_connection(client_conn).await { + tracing::error!(%error, "failed to forward connection"); } }); @@ -96,10 +99,7 @@ impl PortForward { }), )) } - async fn forward_connection( - self, - mut client_conn: tokio::net::TcpStream, - ) -> anyhow::Result<()> { + async fn forward_connection(self, mut client_conn: tokio::net::TcpStream) -> Result<(), Error> { let target = self.finder().find(&self.target).await?; let (pod_name, pod_port) = target.into_parts(); @@ -120,7 +120,9 @@ impl PortForward { } drop(upstream_conn); - forwarder.join().await?; + forwarder.join().await.map_err(|error| Error::AnyHow { + source: error.into(), + })?; tracing::debug!(local_port, pod_port, pod_name, "connection closed"); Ok(()) } @@ -143,7 +145,7 @@ impl<'a> TargetPodFinder<'a> { /// Finds the name and port of the target pod specified by the selector. /// # Arguments /// * `target` - the target to be found - pub(crate) async fn find(&self, target: &crate::Target) -> anyhow::Result { + pub(crate) async fn find(&self, target: &crate::Target) -> Result { let pod_api = self.pod_api; let svc_api = self.svc_api; let ready_pod = AnyReady {}; diff --git a/k8s/proxy/Cargo.toml b/k8s/proxy/Cargo.toml index d7e879027..9d9c46e71 100644 --- a/k8s/proxy/Cargo.toml +++ b/k8s/proxy/Cargo.toml @@ -24,3 +24,5 @@ hyper = { version = "1.5.0", features = ["client", "http1", "http2"] } hyper-body = { path = "../../utils/hyper-body" } anyhow = "1.0.92" +thiserror = "1.0.68" +url = "2.5.2" diff --git a/k8s/proxy/src/error.rs b/k8s/proxy/src/error.rs new file mode 100644 index 000000000..b9487105a --- /dev/null +++ b/k8s/proxy/src/error.rs @@ -0,0 +1,47 @@ +/// A kube-proxy error. +#[derive(Debug, thiserror::Error)] +#[allow(missing_docs)] +pub enum Error { + #[error("{source}")] + Forward { source: kube_forward::Error }, + #[error("{source}")] + Kube { source: kube::Error }, + #[error("{source}")] + AnyHow { source: anyhow::Error }, + #[error("Invalid url: {source}")] + InvalidUrl { source: url::ParseError }, + #[error("Invalid uri: {source}")] + InvalidUri { + source: hyper::http::uri::InvalidUri, + }, +} + +impl From for Error { + fn from(source: kube_forward::Error) -> Self { + Self::Forward { source } + } +} + +impl From for Error { + fn from(source: anyhow::Error) -> Self { + Self::AnyHow { source } + } +} + +impl From for Error { + fn from(source: kube::Error) -> Self { + Self::Kube { source } + } +} + +impl From for Error { + fn from(source: url::ParseError) -> Self { + Self::InvalidUrl { source } + } +} + +impl From for Error { + fn from(source: hyper::http::uri::InvalidUri) -> Self { + Self::InvalidUri { source } + } +} diff --git a/k8s/proxy/src/lib.rs b/k8s/proxy/src/lib.rs index 7cf1114cb..631e87c6c 100644 --- a/k8s/proxy/src/lib.rs +++ b/k8s/proxy/src/lib.rs @@ -2,13 +2,14 @@ //! A utility library to facilitate connections to a kubernetes cluster via //! the k8s-proxy library. -use std::{ - env, - path::{Path, PathBuf}, -}; +use std::path::PathBuf; +mod error; mod proxy; +/// A [`error::Error`]. +pub use error::Error; +use kube::config::KubeConfigOptions; /// OpenApi client helpers. pub use proxy::{ConfigBuilder, ForwardingProxy, LokiClient, Scheme}; @@ -16,38 +17,25 @@ pub use proxy::{ConfigBuilder, ForwardingProxy, LokiClient, Scheme}; pub async fn config_from_kubeconfig( kube_config_path: Option, ) -> anyhow::Result { - let file = match kube_config_path { - Some(config_path) => config_path, - None => { - let file_path = match env::var("KUBECONFIG") { - Ok(value) => Some(value), - Err(_) => { - // Look for kubeconfig file in default location. - #[cfg(any(target_os = "linux", target_os = "macos"))] - let default_path = format!("{}/.kube/config", env::var("HOME")?); - #[cfg(target_os = "windows")] - let default_path = format!("{}/.kube/config", env::var("USERPROFILE")?); - match Path::new(&default_path).exists() { - true => Some(default_path), - false => None, - } - } - }; - if file_path.is_none() { - return Err(anyhow::anyhow!( - "kubeconfig file not found in default location" - )); - } - let mut path = PathBuf::new(); - path.push(file_path.unwrap_or_default()); - path + let mut config = match kube_config_path { + Some(config_path) => { + // NOTE: Kubeconfig file may hold multiple contexts to communicate + // with different kubernetes clusters. We have to pick master + // address of current-context config only + let kube_config = kube::config::Kubeconfig::read_from(&config_path)?; + kube::Config::from_custom_kubeconfig(kube_config, &Default::default()).await? } + None => kube::Config::from_kubeconfig(&KubeConfigOptions::default()).await?, }; - - // NOTE: Kubeconfig file may hold multiple contexts to communicate - // with different kubernetes clusters. We have to pick master - // address of current-context config only - let kube_config = kube::config::Kubeconfig::read_from(&file)?; - let config = kube::Config::from_custom_kubeconfig(kube_config, &Default::default()).await?; + config.apply_debug_overrides(); Ok(config) } + +/// Get the `kube::Client` from the given kubeconfig file, or the default. +pub async fn client_from_kubeconfig( + kube_config_path: Option, +) -> anyhow::Result { + Ok(kube::Client::try_from( + config_from_kubeconfig(kube_config_path).await?, + )?) +} diff --git a/k8s/proxy/src/proxy.rs b/k8s/proxy/src/proxy.rs index 45a153574..7754c43e6 100644 --- a/k8s/proxy/src/proxy.rs +++ b/k8s/proxy/src/proxy.rs @@ -1,3 +1,4 @@ +use crate::Error; use anyhow::anyhow; use openapi::{ apis::Url, @@ -182,16 +183,15 @@ impl ConfigBuilder { } /// Tries to build a `Configuration` from the current self. - pub async fn build(self) -> anyhow::Result { + pub async fn build(self) -> Result { match self.method { ForwardingProxy::HTTP => self.build_http().await, ForwardingProxy::TCP => self.build_tcp().await, } } /// Tries to build an HTTP `Configuration` from the current self. - async fn build_http(self) -> anyhow::Result { - let config = super::config_from_kubeconfig(self.kube_config).await?; - let client = kube::Client::try_from(config)?; + async fn build_http(self) -> Result { + let client = super::client_from_kubeconfig(self.kube_config).await?; let uri = kube_forward::HttpForward::new(self.target, Some(self.scheme.into()), client.clone()) .uri() @@ -208,9 +208,8 @@ impl ConfigBuilder { Ok(config) } /// Tries to build a TCP `Configuration` from the current self. - async fn build_tcp(self) -> anyhow::Result { - let config = super::config_from_kubeconfig(self.kube_config).await?; - let client = kube::Client::try_from(config)?; + async fn build_tcp(self) -> Result { + let client = super::client_from_kubeconfig(self.kube_config).await?; let pf = kube_forward::PortForward::new(self.target, None, client); @@ -232,15 +231,14 @@ impl ConfigBuilder { None => config, } .build_url(url) - .map_err(|e| anyhow!("Failed to Create OpenApi config: {:?}", e)) + .map_err(|e| anyhow!("Failed to Create OpenApi config: {:?}", e).into()) } } impl ConfigBuilder { /// Tries to build a TCP `Configuration` from the current self. - pub async fn build(self) -> anyhow::Result { - let config = super::config_from_kubeconfig(self.kube_config).await?; - let client = kube::Client::try_from(config)?; + pub async fn build(self) -> Result { + let client = super::client_from_kubeconfig(self.kube_config).await?; let pf = kube_forward::PortForward::new(self.target, None, client); @@ -278,16 +276,15 @@ impl ConfigBuilder { /// Tries to build a `LokiClient` from the current self. /// This is simply a boxed `tower::Service` so can be used for any HTTP requests. - pub async fn build(self) -> anyhow::Result<(Uri, LokiClient)> { + pub async fn build(self) -> Result<(Uri, LokiClient), Error> { match self.method { ForwardingProxy::HTTP => self.build_http().await, ForwardingProxy::TCP => self.build_tcp().await, } } /// Tries to build an HTTP `Configuration` from the current self. - async fn build_http(self) -> anyhow::Result<(Uri, LokiClient)> { - let config = super::config_from_kubeconfig(self.kube_config).await?; - let client = kube::Client::try_from(config)?; + async fn build_http(self) -> Result<(Uri, LokiClient), Error> { + let client = super::client_from_kubeconfig(self.kube_config).await?; let uri = kube_forward::HttpForward::new(self.target, Some(self.scheme.into()), client.clone()) @@ -302,9 +299,8 @@ impl ConfigBuilder { Ok((uri, LokiClient::new(service))) } /// Tries to build a TCP `Configuration` from the current self. - async fn build_tcp(self) -> anyhow::Result<(Uri, LokiClient)> { - let config = super::config_from_kubeconfig(self.kube_config).await?; - let client = kube::Client::try_from(config)?; + async fn build_tcp(self) -> Result<(Uri, LokiClient), Error> { + let client = super::client_from_kubeconfig(self.kube_config).await?; let pf = kube_forward::PortForward::new(self.target, None, client);