Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions ci/nightly/pipeline.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,19 @@ steps:
args: [-m=long, test/cloudtest/test_upgrade.py, --no-test-parallelism]
sanitizer: skip
skip: "TODO(def-) Reenable in one version when labels are fixed in old version"
- id: checks-self-managed-25-2-upgrade
label: "Checks Self-Managed upgrade across all v25.2 patch releases, whole-Mz restart"
depends_on: build-x86_64
timeout_in_minutes: 180
# Sometimes runs into query timeouts or entire test timeouts with parallelism 1, too much state, same in all other platform-checks
parallelism: 3
agents:
# A larger instance is needed due to frequent OOMs, same in all other platform-checks
queue: hetzner-x86-64-8cpu-16gb
plugins:
- ./ci/plugins/mzcompose:
composition: platform-checks
args: [--scenario=SelfManagedv25_2_Upgrade, "--seed=$BUILDKITE_JOB_ID", --features=azurite]

- group: "K8s node recovery cloudtest"
key: k8s-node-recovery
Expand Down
50 changes: 50 additions & 0 deletions misc/python/materialize/checks/all_checks/password_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from textwrap import dedent

from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check


class PasswordAuth(Check):
def manipulate(self) -> list[Testdrive]:
return [
Testdrive(dedent(s))
for s in [
# Create a user with a password `user1` and grant them privileges to select from table materialize.schema1.t1
"""
> DROP SCHEMA IF EXISTS schema1 CASCADE;
> CREATE SCHEMA schema1;
> SET SCHEMA = schema1;
> CREATE ROLE user1 WITH LOGIN PASSWORD 'password';
> CREATE TABLE t1 (c int);
> GRANT USAGE ON SCHEMA schema1 TO user1;
> GRANT SELECT ON TABLE t1 TO user1;

# Validate that the user can select from the table
$ postgres-execute connection=postgres://user1:password@${testdrive.materialize-password-sql-addr}
SELECT * FROM materialize.schema1.t1
""",
# Change the role's password to `password2`
"""
> ALTER ROLE user1 PASSWORD 'password2';
""",
]
]

def validate(self) -> Testdrive:
return Testdrive(
# Test that user1 can login and select from the table they have privileges to select from
dedent(
"""
$ postgres-execute connection=postgres://user1:password2@${testdrive.materialize-password-sql-addr}
SELECT * FROM materialize.schema1.t1
"""
)
)
61 changes: 61 additions & 0 deletions misc/python/materialize/checks/scenarios_upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@


from materialize.checks.actions import Action, Initialize, Manipulate, Sleep, Validate
from materialize.checks.all_checks.password_auth import PasswordAuth
from materialize.checks.checks import Check
from materialize.checks.executors import Executor
from materialize.checks.features import Features
Expand All @@ -25,6 +26,7 @@
from materialize.mz_version import MzVersion
from materialize.mzcompose.services.materialized import LEADER_STATUS_HEALTHCHECK
from materialize.version_list import (
fetch_self_managed_versions,
get_published_minor_mz_versions,
get_self_managed_versions,
)
Expand Down Expand Up @@ -509,3 +511,62 @@ def actions(self) -> list[Action]:
),
Validate(self),
]


class SelfManagedv25_2_Upgrade(Scenario):
"""
Upgrade from the oldest v25.2 patch release to the latest v25.2 patch release.
"""

def __init__(
self,
checks: list[type[Check]],
executor: Executor,
features: Features,
seed: str | None = None,
):
self_managed_versions = fetch_self_managed_versions()
self.v25_2_versions = sorted(
[
v.version
for v in self_managed_versions
if v.helm_version.major == 25 and v.helm_version.minor == 2
]
)

super().__init__(checks, executor, features, seed)

def base_version(self) -> MzVersion:
return self.v25_2_versions[0]

def actions(self) -> list[Action]:
print(f"Upgrading from tag {self.base_version()}")

def upgrade_actions(version: MzVersion | None, generation: int) -> list[Action]:
service_name = f"mz_{generation}"
return [
start_mz_read_only(
self,
tag=version,
deploy_generation=generation,
mz_service=service_name,
),
WaitReadyMz(service_name),
PromoteMz(service_name),
Validate(self, mz_service=service_name),
]

actions = [
StartMz(
self,
tag=self.base_version(),
),
Initialize(self),
Manipulate(self, phase=1),
Manipulate(self, phase=2),
]

for generation, version in enumerate(self.v25_2_versions[1:] + [None]):
actions.extend(upgrade_actions(version, generation + 1))

return actions
6 changes: 5 additions & 1 deletion misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,11 @@ def get_variable_system_parameters(
# force-enabled, the in-between, and the production value
["0", "1048576", "314572800", "67108864"],
),
VariableSystemParameter(
"enable_password_auth",
"true",
["true", "false"],
),
VariableSystemParameter(
"kafka_default_metadata_fetch_interval",
"1s",
Expand Down Expand Up @@ -550,7 +555,6 @@ def get_default_system_parameters(
"plan_insights_notice_fast_path_clusters_optimize_duration",
"enable_continual_task_builtins",
"enable_expression_cache",
"enable_password_auth",
"mz_metrics_lgalloc_map_refresh_interval",
"mz_metrics_lgalloc_refresh_interval",
"mz_metrics_rusage_refresh_interval",
Expand Down
2 changes: 1 addition & 1 deletion misc/python/materialize/mzcompose/composition.py
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ def sql_connection(
if reuse_connection and key in self.conns:
return self.conns[key]
conn = psycopg.connect(
host="localhost",
host="127.0.0.1",
dbname=database,
user=user,
password=password,
Expand Down
5 changes: 3 additions & 2 deletions misc/python/materialize/mzcompose/services/materialized.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def __init__(
cluster_replica_size: dict[str, dict[str, Any]] | None = None,
bootstrap_replica_size: str | None = None,
default_replication_factor: int = 1,
listeners_config_path: str = f"{MZ_ROOT}/src/materialized/ci/listener_configs/no_auth.json",
listeners_config_path: str = f"{MZ_ROOT}/src/materialized/ci/listener_configs/testdrive.json",
support_external_clusterd: bool = False,
networks: (
dict[str, dict[str, list[str]]] | dict[str, dict[str, str]] | None
Expand Down Expand Up @@ -132,6 +132,7 @@ def __init__(
"MZ_INTERNAL_PERSIST_PUBSUB_LISTEN_ADDR=0.0.0.0:6879",
"MZ_PERSIST_PUBSUB_URL=http://127.0.0.1:6879",
"MZ_AWS_CONNECTION_ROLE_ARN=arn:aws:iam::123456789000:role/MaterializeConnection",
"MZ_EXTERNAL_LOGIN_PASSWORD_MZ_SYSTEM=password",
"MZ_AWS_EXTERNAL_ID_PREFIX=eb5cb59b-e2fe-41f3-87ca-d2176a495345",
# Always use the persist catalog if the version has multiple implementations.
"MZ_CATALOG_STORE=persist",
Expand Down Expand Up @@ -347,7 +348,7 @@ def __init__(
{
"depends_on": depends_graph,
"command": command,
"ports": [6875, 6876, 6877, 6878, 26257],
"ports": [6875, 6876, 6877, 6878, 6880, 26257],
"environment": environment,
"volumes": volumes,
"tmpfs": ["/tmp"],
Expand Down
43 changes: 33 additions & 10 deletions misc/python/materialize/version_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import os
from collections.abc import Callable
from dataclasses import dataclass
from pathlib import Path

import frontmatter
Expand All @@ -30,21 +31,37 @@
MZ_ROOT = Path(os.environ["MZ_ROOT"])


@dataclass
class SelfManagedVersion:
helm_version: MzVersion
version: MzVersion


def fetch_self_managed_versions() -> list[SelfManagedVersion]:
return [
SelfManagedVersion(
MzVersion.parse_mz(entry["version"]),
MzVersion.parse_mz(entry["appVersion"]),
)
for entry in yaml.safe_load(
requests.get("https://materializeinc.github.io/materialize/index.yaml").text
)["entries"]["materialize-operator"]
if MzVersion.parse_mz(entry["appVersion"]) not in INVALID_VERSIONS
]


def get_self_managed_versions() -> list[MzVersion]:
prefixes = set()
result = set()
for entry in yaml.safe_load(
requests.get("https://materializeinc.github.io/materialize/index.yaml").text
)["entries"]["materialize-operator"]:
helm_version = MzVersion.parse_mz(entry["version"])
version = MzVersion.parse_mz(entry["appVersion"])
prefix = (version.major, version.minor)
self_managed_versions = fetch_self_managed_versions()
for version_info in self_managed_versions:
prefix = (version_info.version.major, version_info.version.minor)
if (
not version.prerelease
not version_info.version.prerelease
and prefix not in prefixes
and not helm_version.prerelease
and not version_info.helm_version.prerelease
):
result.add(version)
result.add(version_info.version)
prefixes.add(prefix)
return sorted(result)

Expand All @@ -71,6 +88,12 @@ def get_self_managed_versions() -> list[MzVersion]:
MzVersion.parse_mz("v0.93.0"), # accidental release
MzVersion.parse_mz("v0.99.1"), # incompatible for upgrades
MzVersion.parse_mz("v0.113.1"), # incompatible for upgrades
MzVersion.parse_mz(
"v0.147.7"
), # Incompatible for upgrades because it clears login attribute for roles due to catalog migration
MzVersion.parse_mz(
"v0.147.14"
), # Incompatible for upgrades because it clears login attribute for roles due to catalog migration
MzVersion.parse_mz("v0.157.0"),
}

Expand Down Expand Up @@ -101,7 +124,7 @@ def resolve_ancestor_image_tag(ancestor_overrides: dict[str, MzVersion]) -> str:


def _create_ancestor_image_resolution(
ancestor_overrides: dict[str, MzVersion]
ancestor_overrides: dict[str, MzVersion],
) -> AncestorImageResolutionBase:
if buildkite.is_in_buildkite():
return AncestorImageResolutionInBuildkite(ancestor_overrides)
Expand Down
1 change: 1 addition & 0 deletions misc/shlib/shlib.bash
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ is_truthy() {
trufflehog_jq_filter_common() {
jq -c '
select(
(.Raw | contains("user1:password") | not) and
.Raw != "postgres://mz_system:materialize@materialized:5432" and
.Raw != "postgres://materialize:materialize@materialized:6875" and
.Raw != "postgres://mz_system:materialize@materialized:6877" and
Expand Down
3 changes: 3 additions & 0 deletions src/adapter/src/catalog/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2268,6 +2268,9 @@ impl CatalogState {
gids.remove(&gid.to_string());
}
}
// We exclude role_auth_by_id because it contains password information
// which should not be included in the dump.
dump_obj.remove("role_auth_by_id");

// Emit as pretty-printed JSON.
Ok(serde_json::to_string_pretty(&dump).expect("cannot fail on serde_json::Value"))
Expand Down
50 changes: 50 additions & 0 deletions src/materialized/ci/listener_configs/testdrive.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
{
"sql": {
"external": {
"addr": "0.0.0.0:6875",
"authenticator_kind": "None",
"allowed_roles": "Normal",
"enable_tls": false
},
"internal": {
"addr": "0.0.0.0:6877",
"authenticator_kind": "None",
"allowed_roles": "Internal",
"enable_tls": false
},
"password": {
"addr": "0.0.0.0:6880",
"authenticator_kind": "Password",
"allowed_roles": "NormalAndInternal",
"enable_tls": false
}
},
"http": {
"external": {
"addr": "0.0.0.0:6876",
"authenticator_kind": "None",
"allowed_roles": "NormalAndInternal",
"enable_tls": false,
"routes": {
"base": true,
"webhook": true,
"internal": false,
"metrics": false,
"profiling": false
}
},
"internal": {
"addr": "0.0.0.0:6878",
"authenticator_kind": "None",
"allowed_roles": "NormalAndInternal",
"enable_tls": false,
"routes": {
"base": true,
"webhook": true,
"internal": true,
"metrics": true,
"profiling": true
}
}
}
}
14 changes: 14 additions & 0 deletions src/testdrive/src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ pub struct Config {
/// The port for the internal endpoints of the materialize instance that
/// testdrive will connect to via HTTP.
pub materialize_internal_http_port: u16,
/// The port for the password endpoints of the materialize instance that
/// testdrive will connect to via HTTP.
pub materialize_password_sql_port: u16,
/// Session parameters to set after connecting to materialize.
pub materialize_params: Vec<(String, String)>,
/// An optional catalog configuration.
Expand Down Expand Up @@ -191,6 +194,7 @@ pub struct MaterializeState {
http_addr: String,
internal_sql_addr: String,
internal_http_addr: String,
password_sql_addr: String,
user: String,
pgclient: tokio_postgres::Client,
environment_id: EnvironmentId,
Expand Down Expand Up @@ -330,6 +334,10 @@ impl State {
"testdrive.materialize-internal-sql-addr".into(),
self.materialize.internal_sql_addr.clone(),
);
self.cmd_vars.insert(
"testdrive.materialize-password-sql-addr".into(),
self.materialize.password_sql_addr.clone(),
);
self.cmd_vars.insert(
"testdrive.materialize-user".into(),
self.materialize.user.clone(),
Expand Down Expand Up @@ -1125,6 +1133,11 @@ async fn create_materialize_state(
materialize_internal_url.host_str().unwrap(),
materialize_internal_url.port().unwrap()
);
let materialize_password_sql_addr = format!(
"{}:{}",
materialize_url.host_str().unwrap(),
config.materialize_password_sql_port
);
let materialize_internal_http_addr = format!(
"{}:{}",
materialize_internal_url.host_str().unwrap(),
Expand All @@ -1151,6 +1164,7 @@ async fn create_materialize_state(
http_addr: materialize_http_addr,
internal_sql_addr: materialize_internal_sql_addr,
internal_http_addr: materialize_internal_http_addr,
password_sql_addr: materialize_password_sql_addr,
user: materialize_user,
pgclient,
environment_id,
Expand Down
Loading