Skip to content
This repository was archived by the owner on Dec 5, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions tembo-operator/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion tembo-operator/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "controller"
description = "Tembo Operator for Postgres"
version = "0.21.2"
version = "0.21.3"
edition = "2021"
default-run = "controller"
license = "Apache-2.0"
Expand Down
12 changes: 0 additions & 12 deletions tembo-operator/rustfmt.toml

This file was deleted.

25 changes: 20 additions & 5 deletions tembo-operator/src/apis/coredb_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ use k8s_openapi::{
apimachinery::pkg::{api::resource::Quantity, apis::meta::v1::ObjectMeta},
};

use crate::cloudnativepg::poolers::{PoolerPgbouncerPoolMode, PoolerTemplateSpecContainersResources};
use crate::cloudnativepg::poolers::{
PoolerPgbouncerPoolMode, PoolerTemplateSpecContainersResources,
};
use chrono::{DateTime, Utc};
use kube::CustomResource;
use schemars::JsonSchema;
Expand All @@ -35,7 +37,11 @@ pub struct ServiceAccountTemplate {

#[derive(Serialize, Deserialize, Clone, Debug, Default, JsonSchema)]
pub struct S3Credentials {
#[serde(default, skip_serializing_if = "Option::is_none", rename = "accessKeyId")]
#[serde(
default,
skip_serializing_if = "Option::is_none",
rename = "accessKeyId"
)]
pub access_key_id: Option<S3CredentialsAccessKeyId>,
#[serde(
default,
Expand All @@ -45,9 +51,17 @@ pub struct S3Credentials {
pub inherit_from_iam_role: Option<bool>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub region: Option<S3CredentialsRegion>,
#[serde(default, skip_serializing_if = "Option::is_none", rename = "secretAccessKey")]
#[serde(
default,
skip_serializing_if = "Option::is_none",
rename = "secretAccessKey"
)]
pub secret_access_key: Option<S3CredentialsSecretAccessKey>,
#[serde(default, skip_serializing_if = "Option::is_none", rename = "sessionToken")]
#[serde(
default,
skip_serializing_if = "Option::is_none",
rename = "sessionToken"
)]
pub session_token: Option<S3CredentialsSessionToken>,
}

Expand Down Expand Up @@ -237,7 +251,8 @@ impl CoreDBSpec {
}
}

let shared_preload_from_extensions = ConfigValue::Multiple(include_with_shared_preload_libraries);
let shared_preload_from_extensions =
ConfigValue::Multiple(include_with_shared_preload_libraries);
let extension_settings_config = vec![PgConfig {
name: "shared_preload_libraries".to_string(),
value: shared_preload_from_extensions,
Expand Down
17 changes: 12 additions & 5 deletions tembo-operator/src/apis/postgres_parameters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ pub const MULTI_VAL_CONFIGS: [&str; 5] = [
// This array defines the priority order for any multi-value config
// This defines any required order for shared_preload_libraries, otherwise alphabetical
// TODO: move this to a trunk endpoint
pub const MULTI_VAL_CONFIGS_PRIORITY_LIST: [&str; 3] = ["citus", "pg_stat_statements", "pg_stat_kcache"];
pub const MULTI_VAL_CONFIGS_PRIORITY_LIST: [&str; 3] =
["citus", "pg_stat_statements", "pg_stat_kcache"];

// configurations that are not allowed to be set by the user
pub const DISALLOWED_CONFIGS: [&str; 66] = [
Expand Down Expand Up @@ -223,7 +224,8 @@ impl JsonSchema for ConfigValue {

fn json_schema(_: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema {
let mut schema_object = SchemaObject::default();
schema_object.metadata().description = Some("A postgresql.conf configuration value".to_owned());
schema_object.metadata().description =
Some("A postgresql.conf configuration value".to_owned());
schema_object.metadata().read_only = false;
// overriding the enums to be a string
schema_object.instance_type = Some(schemars::schema::InstanceType::String.into());
Expand Down Expand Up @@ -359,7 +361,8 @@ impl<'de> Deserialize<'de> for PgConfig {
let raw_value = value.ok_or_else(|| M::Error::custom("key 'value' not found"))?;

let value = if MULTI_VAL_CONFIGS.contains(&name.as_str()) {
let set: BTreeSet<String> = raw_value.split(',').map(|s| s.trim().to_string()).collect();
let set: BTreeSet<String> =
raw_value.split(',').map(|s| s.trim().to_string()).collect();
ConfigValue::Multiple(set)
} else {
ConfigValue::Single(raw_value)
Expand Down Expand Up @@ -435,7 +438,10 @@ mod pg_param_tests {
};
let mut requires_load: BTreeMap<String, String> = BTreeMap::new();
requires_load.insert("pg_cron".to_string(), "pg_cron".to_string());
requires_load.insert("pg_stat_statements".to_string(), "pg_stat_statements".to_string());
requires_load.insert(
"pg_stat_statements".to_string(),
"pg_stat_statements".to_string(),
);
let pg_configs = spec
.get_pg_configs(requires_load)
.expect("failed to get pg configs")
Expand Down Expand Up @@ -538,7 +544,8 @@ mod pg_param_tests {
serialized,
"{\"name\":\"shared_preload_libraries\",\"value\":\"a,b,c\"}"
);
let deserialized: PgConfig = serde_json::from_str(&serialized).expect("failed to deserialize");
let deserialized: PgConfig =
serde_json::from_str(&serialized).expect("failed to deserialize");
match deserialized.value {
ConfigValue::Multiple(set) => {
assert_eq!(set.len(), 3);
Expand Down
35 changes: 26 additions & 9 deletions tembo-operator/src/app_service/ingress.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::{
ingress_route_crd::{
IngressRoute, IngressRouteRoutes, IngressRouteRoutesKind, IngressRouteRoutesMiddlewares,
IngressRouteRoutesServices, IngressRouteRoutesServicesKind, IngressRouteSpec, IngressRouteTls,
IngressRouteRoutesServices, IngressRouteRoutesServicesKind, IngressRouteSpec,
IngressRouteTls,
},
traefik::middlewares_crd::{
Middleware as TraefikMiddleware, MiddlewareHeaders, MiddlewareReplacePathRegex, MiddlewareSpec,
MiddlewareStripPrefix,
Middleware as TraefikMiddleware, MiddlewareHeaders, MiddlewareReplacePathRegex,
MiddlewareSpec, MiddlewareStripPrefix,
},
Result,
};
Expand Down Expand Up @@ -220,7 +221,8 @@ pub async fn reconcile_ingress(
let ingress_api: Api<IngressRoute> = Api::namespaced(client.clone(), ns);

let middleware_api: Api<TraefikMiddleware> = Api::namespaced(client.clone(), ns);
let desired_middlewares = generate_middlewares(coredb_name, ns, oref.clone(), desired_middlewares);
let desired_middlewares =
generate_middlewares(coredb_name, ns, oref.clone(), desired_middlewares);
let actual_mw_names = get_middlewares(client.clone(), ns, coredb_name).await?;
let desired_mw_names = desired_middlewares
.iter()
Expand All @@ -233,15 +235,21 @@ pub async fn reconcile_ingress(
debug!("ns: {}, successfully deleted Middleware: {}", ns, d);
}
Err(e) => {
error!("ns: {}, Failed to delete Middleware: {}, error: {}", ns, d, e);
error!(
"ns: {}, Failed to delete Middleware: {}, error: {}",
ns, d, e
);
}
}
}
}
for desired_mw in desired_middlewares {
match apply_middleware(middleware_api.clone(), &desired_mw.name, &desired_mw.mw).await {
Ok(_) => {
debug!("ns: {}, successfully applied Middleware: {}", ns, desired_mw.name);
debug!(
"ns: {}, successfully applied Middleware: {}",
ns, desired_mw.name
);
}
Err(e) => {
error!(
Expand Down Expand Up @@ -295,7 +303,9 @@ async fn apply_middleware(
mw: &TraefikMiddleware,
) -> Result<TraefikMiddleware, kube::Error> {
let patch_parameters = PatchParams::apply("cntrlr").force();
mw_api.patch(mw_name, &patch_parameters, &Patch::Apply(&mw)).await
mw_api
.patch(mw_name, &patch_parameters, &Patch::Apply(&mw))
.await
}

async fn apply_ingress_route(
Expand All @@ -305,7 +315,11 @@ async fn apply_ingress_route(
) -> Result<IngressRoute, kube::Error> {
let patch_parameters = PatchParams::apply("cntrlr").force();
ingress_api
.patch(ingress_name, &patch_parameters, &Patch::Apply(&ingress_route))
.patch(
ingress_name,
&patch_parameters,
&Patch::Apply(&ingress_route),
)
.await
}

Expand All @@ -314,7 +328,10 @@ async fn get_middlewares(
namespace: &str,
coredb_name: &str,
) -> Result<Vec<String>, kube::Error> {
let label_selector = format!("component={},coredb.io/name={}", COMPONENT_NAME, coredb_name);
let label_selector = format!(
"component={},coredb.io/name={}",
COMPONENT_NAME, coredb_name
);
let deployent_api: Api<TraefikMiddleware> = Api::namespaced(client, namespace);
let lp = ListParams::default().labels(&label_selector).timeout(10);
let deployments = deployent_api.list(&lp).await?;
Expand Down
27 changes: 21 additions & 6 deletions tembo-operator/src/app_service/manager.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use crate::{apis::coredb_types::CoreDB, ingress_route_crd::IngressRouteRoutes, Context, Error, Result};
use crate::{
apis::coredb_types::CoreDB, ingress_route_crd::IngressRouteRoutes, Context, Error, Result,
};
use k8s_openapi::{
api::{
apps::v1::{Deployment, DeploymentSpec},
core::v1::{
Capabilities, Container, ContainerPort, EnvVar, EnvVarSource, HTTPGetAction, PodSpec,
PodTemplateSpec, Probe, SecretKeySelector, SecurityContext, Service, ServicePort, ServiceSpec,
PodTemplateSpec, Probe, SecretKeySelector, SecurityContext, Service, ServicePort,
ServiceSpec,
},
},
apimachinery::pkg::{
Expand Down Expand Up @@ -352,7 +355,10 @@ async fn get_appservice_deployments(
namespace: &str,
coredb_name: &str,
) -> Result<Vec<String>, Error> {
let label_selector = format!("component={},coredb.io/name={}", COMPONENT_NAME, coredb_name);
let label_selector = format!(
"component={},coredb.io/name={}",
COMPONENT_NAME, coredb_name
);
let deployent_api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
let lp = ListParams::default().labels(&label_selector).timeout(10);
let deployments = deployent_api.list(&lp).await.map_err(Error::KubeError)?;
Expand All @@ -370,7 +376,10 @@ async fn get_appservice_services(
namespace: &str,
coredb_name: &str,
) -> Result<Vec<String>, Error> {
let label_selector = format!("component={},coredb.io/name={}", COMPONENT_NAME, coredb_name);
let label_selector = format!(
"component={},coredb.io/name={}",
COMPONENT_NAME, coredb_name
);
let deployent_api: Api<Service> = Api::namespaced(client.clone(), namespace);
let lp = ListParams::default().labels(&label_selector).timeout(10);
let services = deployent_api.list(&lp).await.map_err(Error::KubeError)?;
Expand Down Expand Up @@ -516,7 +525,10 @@ pub async fn reconcile_app_services(cdb: &CoreDB, ctx: Arc<Context>) -> Result<(
}
Err(e) => {
has_errors = true;
error!("ns: {}, Failed to delete AppService: {}, error: {}", ns, d, e);
error!(
"ns: {}, Failed to delete AppService: {}, error: {}",
ns, d, e
);
}
}
}
Expand All @@ -531,7 +543,10 @@ pub async fn reconcile_app_services(cdb: &CoreDB, ctx: Arc<Context>) -> Result<(
}
Err(e) => {
has_errors = true;
error!("ns: {}, Failed to delete AppService: {}, error: {}", ns, d, e);
error!(
"ns: {}, Failed to delete AppService: {}, error: {}",
ns, d, e
);
}
}
}
Expand Down
Loading