diff --git a/src/v/cluster/CMakeLists.txt b/src/v/cluster/CMakeLists.txt index c3b4e6d0197d4..111980a82d9fb 100644 --- a/src/v/cluster/CMakeLists.txt +++ b/src/v/cluster/CMakeLists.txt @@ -121,6 +121,8 @@ v_cc_library( node_status_backend.cc node_status_rpc_handler.cc bootstrap_service.cc + cluster_uuid.cc + bootstrap_backend.cc DEPS Seastar::seastar bootstrap_rpc diff --git a/src/v/cluster/bootstrap_backend.cc b/src/v/cluster/bootstrap_backend.cc new file mode 100644 index 0000000000000..ad079d62491f9 --- /dev/null +++ b/src/v/cluster/bootstrap_backend.cc @@ -0,0 +1,102 @@ +/* + * Copyright 2022 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "cluster/bootstrap_backend.h" + +#include "cluster/commands.h" +#include "cluster/logger.h" +#include "cluster/types.h" +#include "security/credential_store.h" + +namespace cluster { + +bootstrap_backend::bootstrap_backend( + const std::optional& cluster_uuid, + ss::sharded& credentials, + ss::sharded& storage) + : _cluster_uuid(cluster_uuid) + , _credentials(credentials) + , _storage(storage) {} + +namespace { + +std::error_code +do_apply(user_and_credential cred, security::credential_store& store) { + if (store.contains(cred.username)) return errc::user_exists; + store.put(cred.username, std::move(cred.credential)); + return errc::success; +} + +template +ss::future dispatch_updates_to_cores( + Cmd cmd, ss::sharded& sharded_service) { + auto res = co_await sharded_service.map_reduce0( + [cmd](security::credential_store& service) { + return do_apply(std::move(cmd), service); + }, + std::optional{}, + [](std::optional result, std::error_code error_code) { + if (!result.has_value()) { + result = error_code; + } else { + vassert( + result.value() 
== error_code, + "State inconsistency across shards detected, " + "expected result: {}, have: {}", + result->value(), + error_code); + } + return result.value(); + }); + co_return res.value(); +} + +} // namespace + +ss::future +bootstrap_backend::apply_update(model::record_batch b) { + vlog(clusterlog.info, "Applying update to bootstrap_manager"); + + // handle cluster bootstrap commands + static constexpr auto accepted_commands + = make_commands_list(); + auto cmd = co_await cluster::deserialize(std::move(b), accepted_commands); + + co_return co_await ss::visit( + cmd, [this](bootstrap_cluster_cmd cmd) -> ss::future { + if (_cluster_uuid.has_value()) { + vlog( + clusterlog.debug, + "Skipping bootstrap_cluster_cmd {}, current cluster_uuid: {}", + cmd.value.uuid, + *_cluster_uuid); + co_return errc::success; + } + + if (cmd.value.bootstrap_user_cred) { + const std::error_code res + = co_await dispatch_updates_to_cores( + *cmd.value.bootstrap_user_cred, _credentials); + if (res != errc::success) co_return res; + vlog( + clusterlog.info, + "Bootstrap user {} created", + cmd.value.bootstrap_user_cred->username); + } + + _cluster_uuid = cmd.value.uuid; + co_await write_stored_cluster_uuid(_storage.local(), cmd.value.uuid); + vlog(clusterlog.info, "Cluster {} created", cmd.value.uuid); + co_return errc::success; + }); +} + +} // namespace cluster diff --git a/src/v/cluster/bootstrap_backend.h b/src/v/cluster/bootstrap_backend.h new file mode 100644 index 0000000000000..c2d2cf9157229 --- /dev/null +++ b/src/v/cluster/bootstrap_backend.h @@ -0,0 +1,60 @@ +/* + * Copyright 2022 Redpanda Data, Inc. 
+ * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include "cluster/cluster_uuid.h" +#include "cluster/fwd.h" +#include "model/record.h" +#include "model/record_batch_types.h" + +#include +#include + +#include + +namespace security { +class credential_store; +} + +namespace cluster { + +/** + * This class applies the cluster initialization message to + * - itself, storing the cluster UUID value, also duplicating it to the kvstore + * - credential_store, to initialize the bootstrap user + * - TODO: apply the initial licence + */ +class bootstrap_backend final { +public: + bootstrap_backend( + const std::optional&, + ss::sharded&, + ss::sharded&); + + ss::future apply_update(model::record_batch); + + bool is_batch_applicable(const model::record_batch& b) { + return b.header().type + == model::record_batch_type::cluster_bootstrap_cmd; + } + + const std::optional& get_cluster_uuid() const { + return _cluster_uuid; + } + +private: + std::optional _cluster_uuid; + ss::sharded& _credentials; + ss::sharded& _storage; +}; + +} // namespace cluster diff --git a/src/v/cluster/bootstrap_service.cc b/src/v/cluster/bootstrap_service.cc index a78e2e3689342..67d7b27ba46f8 100644 --- a/src/v/cluster/bootstrap_service.cc +++ b/src/v/cluster/bootstrap_service.cc @@ -10,6 +10,10 @@ #include "cluster/bootstrap_service.h" #include "cluster/bootstrap_types.h" +#include "cluster/cluster_utils.h" +#include "cluster/logger.h" +#include "config/node_config.h" +#include "features/feature_table.h" namespace cluster { @@ -17,7 +21,18 @@ ss::future bootstrap_service::cluster_bootstrap_info( cluster_bootstrap_info_request&&, rpc::streaming_context&) { cluster_bootstrap_info_reply r{}; - // TODO: add fields! 
+ r.broker = make_self_broker(config::node()); + r.version = features::feature_table::get_latest_logical_version(); + const std::vector& seed_servers + = config::node().seed_servers(); + r.seed_servers.reserve(seed_servers.size()); + std::transform( + seed_servers.cbegin(), + seed_servers.cend(), + std::back_inserter(r.seed_servers), + [](const config::seed_server& seed_server) { return seed_server.addr; }); + r.empty_seed_starts_cluster = config::node().empty_seed_starts_cluster(); + vlog(clusterlog.debug, "Replying cluster_bootstrap_info: {}", r); co_return r; } diff --git a/src/v/cluster/bootstrap_types.h b/src/v/cluster/bootstrap_types.h index 39278e73486d5..bb45d5714657a 100644 --- a/src/v/cluster/bootstrap_types.h +++ b/src/v/cluster/bootstrap_types.h @@ -8,6 +8,7 @@ // by the Apache License, Version 2.0 #pragma once +#include "cluster/types.h" #include "model/fundamental.h" #include "serde/serde.h" @@ -34,13 +35,27 @@ struct cluster_bootstrap_info_reply : serde::envelope> { using rpc_adl_exempt = std::true_type; + model::broker broker; + cluster_version version; + std::vector seed_servers; + bool empty_seed_starts_cluster; // TODO: add fields! + // - node UUID? 
- auto serde_fields() { return std::tie(); } + auto serde_fields() { + return std::tie( + broker, version, seed_servers, empty_seed_starts_cluster); + } friend std::ostream& - operator<<(std::ostream& o, const cluster_bootstrap_info_reply&) { - fmt::print(o, "{{}}"); + operator<<(std::ostream& o, const cluster_bootstrap_info_reply& v) { + fmt::print( + o, + "{{broker: {}, version: {}, seed_servers: {}, ESCB: {}}}", + v.broker, + v.version, + v.seed_servers, + v.empty_seed_starts_cluster); return o; } }; diff --git a/src/v/cluster/cluster_discovery.cc b/src/v/cluster/cluster_discovery.cc index 8747d9ea6b17f..9fb521807d33c 100644 --- a/src/v/cluster/cluster_discovery.cc +++ b/src/v/cluster/cluster_discovery.cc @@ -9,10 +9,13 @@ #include "cluster/cluster_discovery.h" +#include "cluster/bootstrap_types.h" +#include "cluster/cluster_bootstrap_service.h" #include "cluster/cluster_utils.h" #include "cluster/controller_service.h" #include "cluster/logger.h" #include "config/node_config.h" +#include "features/feature_table.h" #include "model/fundamental.h" #include "model/metadata.h" #include "seastarx.h" @@ -26,6 +29,19 @@ using std::vector; namespace cluster { +namespace { + +void verify_duplicate_seed_servers() { + std::vector s = config::node().seed_servers(); + std::sort(s.begin(), s.end()); + const auto s_dupe_i = std::adjacent_find(s.cbegin(), s.cend()); + if (s_dupe_i != s.cend()) + throw std::runtime_error(fmt_with_ctx( + fmt::format, "Duplicate items in seed_servers: {}", *s_dupe_i)); +} + +} // namespace + cluster_discovery::cluster_discovery( const model::node_uuid& node_uuid, storage::kvstore& kvstore, @@ -34,7 +50,9 @@ cluster_discovery::cluster_discovery( , _join_retry_jitter(config::shard_local_cfg().join_retry_timeout_ms()) , _join_timeout(std::chrono::seconds(2)) , _kvstore(kvstore) - , _as(as) {} + , _as(as) { + verify_duplicate_seed_servers(); +} ss::future cluster_discovery::determine_node_id() { // TODO: read from disk if empty. 
@@ -46,18 +64,16 @@ ss::future cluster_discovery::determine_node_id() { static const bytes invariants_key("configuration_invariants"); auto invariants_buf = _kvstore.get( storage::kvstore::key_space::controller, invariants_key); - if (invariants_buf) { auto invariants = reflection::from_iobuf( std::move(*invariants_buf)); co_return invariants.node_id; } - // TODO: once is_cluster_founder() refers to all seeds, verify that all the - // seeds' seed_servers lists match and assign node IDs based on the - // ordering. - if (is_cluster_founder()) { - // If this is the root node, assign node 0. - co_return model::node_id(0); + + if (auto cf_node_id = get_cluster_founder_node_id(); cf_node_id) { + // TODO: verify that all the seeds' seed_servers lists match + clusterlog.info("Using index based node ID {}", *cf_node_id); + co_return *cf_node_id; } model::node_id assigned_node_id; co_await ss::repeat([this, &assigned_node_id] { @@ -66,14 +82,26 @@ ss::future cluster_discovery::determine_node_id() { co_return assigned_node_id; } -vector cluster_discovery::initial_raft0_brokers() const { +ss::future +cluster_discovery::initial_seed_brokers(const bool cluster_exists) const { + // If the cluster has been formed, return empty + if (cluster_exists) co_return brokers{}; // If configured as the root node, we'll want to start the cluster with // just this node as the initial seed. - if (is_cluster_founder()) { - // TODO: we should only return non-empty seed list if our log is empty. 
- return {make_self_broker(config::node())}; + if (config::node().empty_seed_starts_cluster()) { + if (config::node().seed_servers().empty()) + co_return brokers{make_self_broker(config::node())}; + // Not a root + co_return brokers{}; + } + if (get_node_index_in_seed_servers()) { + std::vector seed_brokers + = co_await request_seed_brokers(); + seed_brokers.push_back(make_self_broker(config::node())); + co_return std::move(seed_brokers); } - return {}; + // Non-seed server + co_return brokers{}; } ss::future @@ -131,8 +159,167 @@ cluster_discovery::dispatch_node_uuid_registration_to_seeds( co_return ss::stop_iteration::no; } -bool cluster_discovery::is_cluster_founder() const { - return config::node().seed_servers().empty(); +ss::future +cluster_discovery::request_cluster_bootstrap_info( + const net::unresolved_address addr) const { + vlog(clusterlog.info, "Requesting cluster bootstrap info from {}", addr); + cluster_bootstrap_info_reply reply; + co_await ss::repeat( + ss::coroutine::lambda([&reply, addr]() -> ss::future { + result reply_result(std::errc::connection_refused); + try { + reply_result = co_await do_with_client_one_shot< + cluster_bootstrap_client_protocol>( + addr, + config::node().rpc_server_tls(), + 2s, + [](cluster_bootstrap_client_protocol c) { + return c + .cluster_bootstrap_info( + cluster_bootstrap_info_request{}, + rpc::client_opts(rpc::clock_type::now() + 2s)) + .then(&rpc::get_ctx_data); + }); + } catch (...) 
{ + co_return ss::stop_iteration::no; + } + if (reply_result) { + reply = std::move(reply_result.value()); + co_return ss::stop_iteration::yes; + } + co_await ss::sleep_abortable(1s); + vlog( + clusterlog.trace, "Retrying cluster bootstrap info from {}", addr); + co_return ss::stop_iteration::no; + })); + + vlog(clusterlog.info, "Obtained cluster bootstrap info from {}", addr); + vlog(clusterlog.debug, "{}", reply); + co_return std::move(reply); +} + +namespace { + +bool equal( + const std::vector& lhs, + const std::vector& rhs) { + return std::equal( + lhs.cbegin(), + lhs.cend(), + rhs.cbegin(), + rhs.cend(), + [](const net::unresolved_address& lhs, const config::seed_server& rhs) { + return lhs == rhs.addr; + }); +} + +} // namespace + +ss::future> +cluster_discovery::request_seed_brokers() const { + const net::unresolved_address& self_addr + = config::node().advertised_rpc_api(); + const std::vector& self_seed_servers + = config::node().seed_servers(); + + std::vector< + std::pair> + peers; + peers.reserve(self_seed_servers.size()); + for (const config::seed_server& seed_server : self_seed_servers) { + // do not call oneself + if (seed_server.addr == self_addr) continue; + peers.emplace_back(seed_server.addr, cluster_bootstrap_info_reply{}); + } + co_await ss::parallel_for_each(peers, [this](auto& peer) -> ss::future<> { + peer.second = co_await request_cluster_bootstrap_info(peer.first); + co_return; + }); + + bool failed = false; + std::vector seed_brokers; + seed_brokers.reserve(peers.size()); + for (auto& peer : peers) { + if ( + peer.second.version + != features::feature_table::get_latest_logical_version()) { + vlog( + clusterlog.error, + "Cluster setup error: logical version mismatch, local: {}, {}: " + "{}", + features::feature_table::get_latest_logical_version(), + peer.first, + peer.second.version); + failed = true; + } + if (!equal(peer.second.seed_servers, self_seed_servers)) { + vlog( + clusterlog.error, + "Cluster configuration error: seed server 
list mismatch, " + "local: " + "[{}], {}: [{}]", + self_seed_servers, + peer.first, + peer.second.seed_servers); + failed = true; + } + if ( + peer.second.empty_seed_starts_cluster + != config::node().empty_seed_starts_cluster()) { + vlog( + clusterlog.error, + "Cluster configuration error: empty_seed_starts_cluster " + "mismatch, local: {}, {}: {}", + config::node().empty_seed_starts_cluster(), + peer.first, + peer.second.empty_seed_starts_cluster); + failed = true; + } + seed_brokers.push_back(std::move(peer.second.broker)); + } + if (failed) + throw std::runtime_error(fmt_with_ctx( + fmt::format, + "Cannot bootstrap a cluster due to seed servers mismatch, check " + "the " + "log for details")); + vlog(clusterlog.debug, "Seed brokers: [{}]", seed_brokers); + co_return std::move(seed_brokers); +} + +/*static*/ std::optional +cluster_discovery::get_cluster_founder_node_id() { + if (config::node().empty_seed_starts_cluster()) { + if (config::node().seed_servers().empty()) + return node_id{0}; + else + return {}; + } else { + if (auto idx = get_node_index_in_seed_servers(); idx) + return node_id{*idx}; + else + return {}; + } +} + +/*static*/ std::optional +cluster_discovery::get_node_index_in_seed_servers() { + const std::vector& seed_servers + = config::node().seed_servers(); + vassert( + !seed_servers.empty(), + "Configuration error: seed_servers cannot be empty when " + "empty_seed_starts_cluster is false"); + const auto it = std::find_if( + seed_servers.cbegin(), + seed_servers.cend(), + [rpc_addr = config::node().advertised_rpc_api()]( + const config::seed_server& seed_server) { + return rpc_addr == seed_server.addr; + }); + if (it == seed_servers.cend()) return {}; + return { + boost::numeric_cast(std::distance(seed_servers.cbegin(), it))}; } } // namespace cluster diff --git a/src/v/cluster/cluster_discovery.h b/src/v/cluster/cluster_discovery.h index 44233adb0c566..e9f825f6dea6c 100644 --- a/src/v/cluster/cluster_discovery.h +++ 
b/src/v/cluster/cluster_discovery.h @@ -24,6 +24,7 @@ class kvstore; } // namespace storage namespace cluster { +struct cluster_bootstrap_info_reply; // Provides metadata pertaining to initial cluster discovery. It is the // entrypoint into the steps to join a cluster. @@ -60,6 +61,8 @@ namespace cluster { // TODO: reconcile the RPC dispatch logic here with that in members_manager. class cluster_discovery { public: + using brokers = std::vector; + cluster_discovery( const model::node_uuid& node_uuid, storage::kvstore& kvstore, @@ -74,29 +77,36 @@ class cluster_discovery { // assign it a node ID. ss::future determine_node_id(); - // If configured as the root, return this broker as the sole initial Raft0 - // broker. - // - // TODO: implement the below behavior. - // // Returns brokers to be used to form a Raft group for a new cluster. // - // If this node is a seed server, returns all seed servers, assuming seeds - // are configured with identical seed servers. - // - // If this node is not a seed server returns an empty list. - std::vector initial_raft0_brokers() const; + // If this node is a cluster founder, returns all seed servers, assuming + // all founders are configured with identical seed servers list. + // If this node is not a cluster founder, or if a cluster has already been + // created, returns an empty list. + // In case of Empty Seed Cluster Bootstrap, this is a list holding only the + // root broker on the root node if no cluster exists yet, empty otherwise. + ss::future initial_seed_brokers(bool cluster_exists) const; private: - // Returns whether this node is the root node. - // - // TODO: implement the below behavior. - // - // Returns true if the local node is a founding member of the cluster, as - // indicated by either us having an empty seed server (we are the root node - // in a legacy config) or our node UUID matching one of those returned by - // the seed servers. 
- bool is_cluster_founder() const; + // Returns index-based node_id if the local node is a founding member + // of the cluster, as indicated by either us having an empty seed server + // (we are the root node in a legacy config), or our advertised RPC address + // listed as one of the seed servers. + static std::optional get_cluster_founder_node_id(); + + /** + * Search for the current node's advertised RPC address in the seed_servers. + * Precondition: empty_seed_starts_cluster=false + * \return Index of this node in seed_servers list if found, or empty if + * not. + * \throw vasserts if seed_servers is empty + */ + static std::optional get_node_index_in_seed_servers(); + + ss::future> request_seed_brokers() const; + + ss::future + request_cluster_bootstrap_info(net::unresolved_address) const; // Sends requests to each seed server to register the local node UUID until // one succeeds. Upon success, sets `node_id` to the assigned node ID and diff --git a/src/v/cluster/cluster_utils.h b/src/v/cluster/cluster_utils.h index d1cce4d7956dd..51dd0a2f98b6a 100644 --- a/src/v/cluster/cluster_utils.h +++ b/src/v/cluster/cluster_utils.h @@ -240,7 +240,8 @@ ss::future replicate_and_wait( } vassert( use_serde_serialization, - "Requested to ADL serialize a serde-only type"); + "serde_raft_0 feature not enabled while serializing a serde-only " + "controller command"); auto b = serde_serialize_cmd(std::forward(cmd)); return stm.replicate_and_wait( std::move(b), timeout, as.local(), term); diff --git a/src/v/cluster/cluster_uuid.cc b/src/v/cluster/cluster_uuid.cc new file mode 100644 index 0000000000000..e26573b9b0201 --- /dev/null +++ b/src/v/cluster/cluster_uuid.cc @@ -0,0 +1,38 @@ +/* + * Copyright 2022 Redpanda Data, Inc. 
+ * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "cluster/cluster_uuid.h" + +#include "storage/api.h" + +namespace cluster { + +const bytes cluster_uuid_key = "cluster_uuid"; +const storage::kvstore::key_space cluster_uuid_key_space + = storage::kvstore::key_space::controller; + +std::optional read_stored_cluster_uuid(storage::api& storage) { + std::optional cluster_uuid_buf = storage.kvs().get( + cluster_uuid_key_space, cluster_uuid_key); + if (cluster_uuid_buf) { + return cluster_uuid{ + serde::from_iobuf(std::move(*cluster_uuid_buf))}; + } + return {}; +} + +ss::future<> +write_stored_cluster_uuid(storage::api& storage, const cluster_uuid& value) { + return storage.kvs().put( + cluster_uuid_key_space, cluster_uuid_key, serde::to_iobuf(value)); +} + +} // namespace cluster diff --git a/src/v/cluster/cluster_uuid.h b/src/v/cluster/cluster_uuid.h new file mode 100644 index 0000000000000..607f0ffd0749d --- /dev/null +++ b/src/v/cluster/cluster_uuid.h @@ -0,0 +1,32 @@ +/* + * Copyright 2022 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include "bytes/bytes.h" +#include "storage/fwd.h" +#include "utils/uuid.h" + +namespace cluster { + +// TODO: going to fundamental.h? 
+using cluster_uuid = named_type; + +/** + * Read the stored cluster uuid from the kvstore + * \return Cluster UUID if stored in kvstore, empty otherwise + */ +std::optional read_stored_cluster_uuid(storage::api& storage); + +ss::future<> +write_stored_cluster_uuid(storage::api& storage, const cluster_uuid&); + +} // namespace cluster diff --git a/src/v/cluster/commands.h b/src/v/cluster/commands.h index ff5f3da3a4bb4..66a2b876535d5 100644 --- a/src/v/cluster/commands.h +++ b/src/v/cluster/commands.h @@ -108,6 +108,7 @@ static constexpr int8_t recommission_node_cmd_type = 1; static constexpr int8_t finish_reallocations_cmd_type = 2; static constexpr int8_t maintenance_mode_cmd_type = 3; static constexpr int8_t register_node_uuid_cmd_type = 4; +static constexpr int8_t bootstrap_cluster_cmd_type = 5; // cluster config commands static constexpr int8_t cluster_config_delta_cmd_type = 0; @@ -282,6 +283,13 @@ using feature_update_license_update_cmd = controller_command< model::record_batch_type::feature_update, serde_opts::serde_only>; +// Cluster bootstrap +using bootstrap_cluster_cmd = controller_command< + int8_t, // unused, always 0 + bootstrap_cluster_cmd_data, + bootstrap_cluster_cmd_type, + model::record_batch_type::cluster_bootstrap_cmd>; + // typelist utils template concept ControllerCommand = requires(T cmd) { diff --git a/src/v/cluster/controller.cc b/src/v/cluster/controller.cc index d5805e87899a4..13e3ed70162c6 100644 --- a/src/v/cluster/controller.cc +++ b/src/v/cluster/controller.cc @@ -9,6 +9,7 @@ #include "cluster/controller.h" +#include "cluster/bootstrap_backend.h" #include "cluster/cluster_utils.h" #include "cluster/config_frontend.h" #include "cluster/controller_api.h" @@ -50,6 +51,8 @@ #include #include +#include + namespace cluster { controller::controller( @@ -98,8 +101,10 @@ ss::future<> controller::wire_up() { .then([this] { _probe.start(); }); } -ss::future<> -controller::start(std::vector initial_raft0_brokers) { +ss::future<> 
controller::start( + std::vector&& initial_raft0_brokers, + const std::optional& stored_cluster_uuid) { + const bool local_node_is_seed_server = !initial_raft0_brokers.empty(); return create_raft0( _partition_manager, _shard_table, @@ -128,6 +133,10 @@ controller::start(std::vector initial_raft0_brokers) { .then([this] { return _feature_backend.start_single(std::ref(_feature_table)); }) + .then([this, stored_cluster_uuid] { + return _bootstrap_backend.start_single( + stored_cluster_uuid, std::ref(_credentials), std::ref(_storage)); + }) .then([this] { return _config_frontend.start( std::ref(_stm), @@ -155,7 +164,8 @@ controller::start(std::vector initial_raft0_brokers) { std::ref(_members_manager), std::ref(_data_policy_manager), std::ref(_config_manager), - std::ref(_feature_backend)); + std::ref(_feature_backend), + std::ref(_bootstrap_backend)); }) .then([this] { return _members_frontend.start( @@ -235,7 +245,11 @@ controller::start(std::vector initial_raft0_brokers) { return stm.wait(stm.bootstrap_last_applied(), model::no_timeout); }); }) - .then([this] { return cluster_creation_hook(); }) + .then([this, local_node_is_seed_server] { + if (_bootstrap_backend.local().get_cluster_uuid()) + return ss::make_ready_future(); + return cluster_creation_hook(local_node_is_seed_server); + }) .then( [this] { return _backend.invoke_on_all(&controller_backend::start); }) .then([this] { @@ -406,6 +420,7 @@ ss::future<> controller::stop() { .then([this] { return _config_frontend.stop(); }) .then([this] { return _feature_backend.stop(); }) .then([this] { return _stm.stop(); }) + .then([this] { return _bootstrap_backend.stop(); }) .then([this] { return _authorizer.stop(); }) .then([this] { return _credentials.stop(); }) .then([this] { return _tp_state.stop(); }) @@ -419,39 +434,93 @@ ss::future<> controller::stop() { }); } +ss::future<> controller::create_cluster() { + bootstrap_cluster_cmd_data cmd_data; + cmd_data.uuid = cluster_uuid( + 
uuid_t(boost::uuids::random_generator_mt19937()())); + cmd_data.bootstrap_user_cred + = security_frontend::get_bootstrap_user_creds_from_env(); + vlog(clusterlog.info, "Creating cluster {}", cmd_data.uuid); + + // cluster::replicate_and_wait() cannot be used here because it checks + // for serde_raft_0 feature to be enabled. Before a cluster is here, + // no feature is active, however the bootstrap_cluster_cmd is serde-only + return _stm + .invoke_on( + controller_stm_shard, + [this, cmd_data = std::move(cmd_data)](controller_stm& stm) mutable { + model::record_batch b = serde_serialize_cmd( + bootstrap_cluster_cmd{0, std::move(cmd_data)}); + return stm.replicate_and_wait( + std::move(b), + model::timeout_clock::now() + 30s, + _as.local(), + std::nullopt); + }) + .then([](std::error_code errc) { + vassert(errc == errc::success, "TODO: failed with {}", errc); + }); +} + /** * This function provides for writing the controller log immediately * after it has been created, before anything else has been written * to it, and before we have started communicating with peers. 
*/ -ss::future<> controller::cluster_creation_hook() { - if (!config::node().seed_servers().empty()) { - // We are not on the root node - co_return; - } else if ( - _raft0->last_visible_index() > model::offset{} - || _raft0->config().brokers().size() > 1) { - // The controller log has already been written to +ss::future<> +controller::cluster_creation_hook(const bool local_node_is_seed_server) { + if (config::node().empty_seed_starts_cluster()) { + // This is the legacy empty-seed-starts-cluster mode + + if (!config::node().seed_servers().empty()) { + // We are not on the root node + co_return; + } + if ( + _raft0->last_visible_index() > model::offset{} + || _raft0->config().brokers().size() > 1) { + // The controller log has already been written to + co_return; + } + + // Internal RPC does not start until after controller startup + // is complete (we are called during controller startup), so + // it is guaranteed that if we were single node/empty controller + // log at start of this function, we will still be in that state + // here. The wait for leadership is really just a wait for the + // consensus object to finish writing its last_voted_for from + // its self-vote. + while (!_raft0->is_leader()) { + co_await ss::sleep(100ms); + } + + auto err + = co_await _security_frontend.local().maybe_create_bootstrap_user(); + vassert( + err == errc::success, + "Controller write should always succeed in single replica state " + "during " + "creation"); co_return; } - // Internal RPC does not start until after controller startup - // is complete (we are called during controller startup), so - // it is guaranteed that if we were single node/empty controller - // log at start of this function, we will still be in that state - // here. The wait for leadership is really just a wait for the - // consensus object to finish writing its last_voted_for from - // its self-vote. 
- while (!_raft0->is_leader()) { + // The new cluster creation is done by the leader of raft0 consisting of + // seed servers, once elected + if (!local_node_is_seed_server) co_return; + + while (!_bootstrap_backend.local().get_cluster_uuid() + && !_raft0->is_leader()) { co_await ss::sleep(100ms); } - auto err - = co_await _security_frontend.local().maybe_create_bootstrap_user(); - vassert( - err == errc::success, - "Controller write should always succeed in single replica state during " - "creation"); + if (_bootstrap_backend.local().get_cluster_uuid()) { + vlog( + clusterlog.info, + "Cluster is {}", + *_bootstrap_backend.local().get_cluster_uuid()); + co_return; + } + co_return co_await create_cluster(); } /** diff --git a/src/v/cluster/controller.h b/src/v/cluster/controller.h index a73ca24c627b1..994182c0e4af5 100644 --- a/src/v/cluster/controller.h +++ b/src/v/cluster/controller.h @@ -118,7 +118,17 @@ class controller { ss::future<> wire_up(); - ss::future<> start(std::vector); + /** + * Create raft0, and start the services that the \c controller owns. + * \param initial_raft0_brokers Brokers to start raft0 with. Empty for + * non-seeds. + * \param stored_cluster_uuid Cluster UUID as it appears in the kvstore, + * or empty if it does not. 
+ */ + ss::future<> start( + std::vector&& initial_raft0_brokers, + const std::optional& stored_cluster_uuid); + // prevents controller from accepting new requests ss::future<> shutdown_input(); ss::future<> stop(); @@ -126,7 +136,8 @@ class controller { private: friend controller_probe; - ss::future<> cluster_creation_hook(); + ss::future<> create_cluster(); + ss::future<> cluster_creation_hook(bool local_node_is_seed_server); config_manager::preload_result _config_preload; ss::sharded _as; // instance per core @@ -171,6 +182,7 @@ class controller { consensus_ptr _raft0; ss::sharded& _cloud_storage_api; controller_probe _probe; + ss::sharded _bootstrap_backend; // single instance }; } // namespace cluster diff --git a/src/v/cluster/controller_stm.h b/src/v/cluster/controller_stm.h index 6a780898ab3d0..59c9a947f4a92 100644 --- a/src/v/cluster/controller_stm.h +++ b/src/v/cluster/controller_stm.h @@ -11,6 +11,7 @@ #pragma once +#include "cluster/bootstrap_backend.h" #include "cluster/config_manager.h" #include "cluster/data_policy_manager.h" #include "cluster/feature_backend.h" @@ -29,7 +30,8 @@ using controller_stm = raft::mux_state_machine< members_manager, data_policy_manager, config_manager, - feature_backend>; + feature_backend, + bootstrap_backend>; static constexpr ss::shard_id controller_stm_shard = 0; diff --git a/src/v/cluster/security_frontend.cc b/src/v/cluster/security_frontend.cc index d351f1a0b5ef8..919563e535f74 100644 --- a/src/v/cluster/security_frontend.cc +++ b/src/v/cluster/security_frontend.cc @@ -309,22 +309,14 @@ security_frontend::dispatch_delete_acls_to_leader( }); } -/** - * For use during cluster creation, if RP_BOOTSTRAP_USER is set - * then write a user creation message to the controller log. - * - * @returns an error code if controller log write failed. If the - * environment variable is missing or malformed this is - * not considered an error. 
- * - */ -ss::future security_frontend::maybe_create_bootstrap_user() { - static const ss::sstring bootstrap_user_env_key{"RP_BOOTSTRAP_USER"}; +static const ss::sstring bootstrap_user_env_key{"RP_BOOTSTRAP_USER"}; +/*static*/ std::optional +security_frontend::get_bootstrap_user_creds_from_env() { auto creds_str_ptr = std::getenv(bootstrap_user_env_key.c_str()); if (creds_str_ptr == nullptr) { // Environment variable is not set - co_return errc::success; + return {}; } ss::sstring creds_str = creds_str_ptr; @@ -336,16 +328,25 @@ ss::future security_frontend::maybe_create_bootstrap_user() { clusterlog.warn, "Invalid value of {} (expected \"username:password\")", bootstrap_user_env_key); - co_return errc::success; + return {}; } auto username = security::credential_user{creds_str.substr(0, colon)}; auto password = creds_str.substr(colon + 1); auto credentials = security::scram_sha256::make_credentials( password, security::scram_sha256::min_iterations); + return std::optional( + std::in_place, std::move(username), std::move(credentials)); +} + +ss::future security_frontend::maybe_create_bootstrap_user() { + const auto bootstrap_user_creds = get_bootstrap_user_creds_from_env(); + if (!bootstrap_user_creds) co_return errc::success; auto err = co_await create_user( - username, credentials, model::timeout_clock::now() + 5s); + bootstrap_user_creds->username, + bootstrap_user_creds->credential, + model::timeout_clock::now() + 5s); if (err) { vlog( @@ -357,7 +358,7 @@ ss::future security_frontend::maybe_create_bootstrap_user() { vlog( clusterlog.info, "Created user '{}' via {}", - username, + bootstrap_user_creds->username, bootstrap_user_env_key); } diff --git a/src/v/cluster/security_frontend.h b/src/v/cluster/security_frontend.h index 5b9c7f492dd23..ea3f3052505be 100644 --- a/src/v/cluster/security_frontend.h +++ b/src/v/cluster/security_frontend.h @@ -54,8 +54,26 @@ class security_frontend final { std::vector, model::timeout_clock::duration); + /** + * For use during 
cluster creation, if RP_BOOTSTRAP_USER is set + * then write a user creation message to the controller log. + * + * @returns an error code if controller log write failed. If the + * environment variable is missing or malformed this is + * not considered an error. + */ ss::future maybe_create_bootstrap_user(); + /** + * For use during cluster creation, if RP_BOOTSTRAP_USER is set + * then returns the user creds specified in it + * + * @returns empty value if RP_BOOTSTRAP_USER is not set or user creds + * cannot be parsed + */ + static std::optional + get_bootstrap_user_creds_from_env(); + private: ss::future> do_create_acls( std::vector, model::timeout_clock::duration); diff --git a/src/v/cluster/security_manager.h b/src/v/cluster/security_manager.h index 3b0933f92d769..1f6cafd9e420b 100644 --- a/src/v/cluster/security_manager.h +++ b/src/v/cluster/security_manager.h @@ -42,8 +42,9 @@ class security_manager final { } private: - template - ss::future dispatch_updates_to_cores(Cmd, ss::sharded&); + template + ss::future + dispatch_updates_to_cores(Cmd, ss::sharded&); ss::sharded& _credentials; ss::sharded& _authorizer; diff --git a/src/v/cluster/tests/cluster_test_fixture.h b/src/v/cluster/tests/cluster_test_fixture.h index 9b849fc5908c6..29024dc67181f 100644 --- a/src/v/cluster/tests/cluster_test_fixture.h +++ b/src/v/cluster/tests/cluster_test_fixture.h @@ -73,7 +73,10 @@ class cluster_test_fixture { int16_t schema_reg_port, int16_t coproc_supervisor_port, std::vector seeds, - configure_node_id use_node_id = configure_node_id::yes) { + configure_node_id use_node_id = configure_node_id::yes, + empty_seed_starts_cluster empty_seed_starts_cluster_val + = empty_seed_starts_cluster::yes) { + cluster::clusterlog.info("AWONG emplacing"); _instances.emplace( node_id, std::make_unique( @@ -87,7 +90,8 @@ class cluster_test_fixture { ssx::sformat("{}.{}", _base_dir, node_id()), _sgroups, false, - use_node_id)); + use_node_id, + empty_seed_starts_cluster_val)); } application* 
get_node_application(model::node_id id) { @@ -114,9 +118,12 @@ class cluster_test_fixture { int proxy_port_base = 8082, int schema_reg_port_base = 8081, int coproc_supervisor_port_base = 43189, - configure_node_id use_node_id = configure_node_id::yes) { + configure_node_id use_node_id = configure_node_id::yes, + empty_seed_starts_cluster empty_seed_starts_cluster_val + = empty_seed_starts_cluster::yes) { + cluster::clusterlog.info("AWONG creating"); std::vector seeds = {}; - if (node_id != 0) { + if (!empty_seed_starts_cluster_val || node_id != 0) { seeds.push_back( {.addr = net::unresolved_address("127.0.0.1", 11000)}); } @@ -128,14 +135,25 @@ class cluster_test_fixture { schema_reg_port_base + node_id(), coproc_supervisor_port_base + node_id(), std::move(seeds), - use_node_id); + use_node_id, + empty_seed_starts_cluster_val); return get_node_application(node_id); } application* create_node_application( - model::node_id node_id, configure_node_id use_node_id) { + model::node_id node_id, + configure_node_id use_node_id, + empty_seed_starts_cluster empty_seed_starts_cluster_val + = empty_seed_starts_cluster::yes) { return create_node_application( - node_id, 9092, 11000, 8082, 8081, 43189, use_node_id); + node_id, + 9092, + 11000, + 8082, + 8081, + 43189, + use_node_id, + empty_seed_starts_cluster_val); } void remove_node_application(model::node_id node_id) { diff --git a/src/v/cluster/tests/cluster_tests.cc b/src/v/cluster/tests/cluster_tests.cc index 4ae7f113f9706..3abebb62a2833 100644 --- a/src/v/cluster/tests/cluster_tests.cc +++ b/src/v/cluster/tests/cluster_tests.cc @@ -88,3 +88,18 @@ FIXTURE_TEST(test_auto_assign_with_explicit_node_id, cluster_test_fixture) { wait_for_all_members(3s).get(); } + +FIXTURE_TEST( + test_seed_driven_cluster_bootstrap_single_node, cluster_test_fixture) { + const model::node_id id0{0}; + create_node_application( + id0, configure_node_id::no, empty_seed_starts_cluster::no); + BOOST_REQUIRE_EQUAL(0, *config::node().node_id()); + 
wait_for_controller_leadership(id0).get(); + wait_for_all_members(3s).get(); + auto brokers = get_local_cache(id0).all_brokers(); + + // single broker + BOOST_REQUIRE_EQUAL(brokers.size(), 1); + BOOST_REQUIRE_EQUAL(brokers[0]->id(), id0); +} diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h index e4b8f32c12070..40d8437f1a5cd 100644 --- a/src/v/cluster/types.h +++ b/src/v/cluster/types.h @@ -11,6 +11,7 @@ #pragma once +#include "cluster/cluster_uuid.h" #include "cluster/errc.h" #include "cluster/fwd.h" #include "kafka/types.h" @@ -22,7 +23,9 @@ #include "model/timeout_clock.h" #include "raft/types.h" #include "security/acl.h" +#include "security/credential_store.h" #include "security/license.h" +#include "security/scram_credential.h" #include "serde/envelope.h" #include "serde/serde.h" #include "storage/ntp_config.h" @@ -1939,6 +1942,41 @@ struct feature_update_license_update_cmd_data operator<<(std::ostream&, const feature_update_license_update_cmd_data&); }; +struct user_and_credential + : serde::envelope> { + using rpc_adl_exempt = std::true_type; + static constexpr int8_t current_version = 0; + + user_and_credential() = default; + user_and_credential( + security::credential_user&& username_, + security::scram_credential&& credential_) + : username(std::move(username_)) + , credential(std::move(credential_)) {} + friend bool + operator==(const user_and_credential&, const user_and_credential&) + = default; + auto serde_fields() { return std::tie(username, credential); } + + security::credential_user username; + security::scram_credential credential; +}; + +struct bootstrap_cluster_cmd_data + : serde::envelope> { + using rpc_adl_exempt = std::true_type; + static constexpr int8_t current_version = 0; + + friend bool operator==( + const bootstrap_cluster_cmd_data&, const bootstrap_cluster_cmd_data&) + = default; + + auto serde_fields() { return std::tie(uuid, bootstrap_user_cred); } + + cluster_uuid uuid; + std::optional bootstrap_user_cred; +}; + enum class 
reconciliation_status : int8_t { done, in_progress, diff --git a/src/v/config/node_config.cc b/src/v/config/node_config.cc index 9784ac2431736..41b92e3aa5fee 100644 --- a/src/v/config/node_config.cc +++ b/src/v/config/node_config.cc @@ -47,6 +47,14 @@ node_config::node_config() noexcept "form a new cluster", {.visibility = visibility::user}, {}) + , empty_seed_starts_cluster( + *this, + "empty_seed_starts_cluster", + "If enabled, requires exactly one node in a cluster-to-be to have its " + "seed_servers list empty. Otherwise, seed_servers list cannot be empty, " + "and must be the same in each node from that list", + {.visibility = visibility::user}, + true) , rpc_server( *this, "rpc_server", diff --git a/src/v/config/node_config.h b/src/v/config/node_config.h index faf6b2d395989..bce14bc5b75bf 100644 --- a/src/v/config/node_config.h +++ b/src/v/config/node_config.h @@ -34,6 +34,7 @@ struct node_config final : public config_store { property> rack; property> seed_servers; + property empty_seed_starts_cluster; // Internal RPC listener property rpc_server; diff --git a/src/v/config/seed_server.h b/src/v/config/seed_server.h index 8249e9d0c741d..8583f28e1c151 100644 --- a/src/v/config/seed_server.h +++ b/src/v/config/seed_server.h @@ -25,6 +25,9 @@ struct seed_server { net::unresolved_address addr; bool operator==(const seed_server&) const = default; + friend bool operator<(const seed_server& lhs, const seed_server& rhs) { + return lhs.addr < rhs.addr; + } friend std::ostream& operator<<(std::ostream& o, const config::seed_server& s) { diff --git a/src/v/model/model.cc b/src/v/model/model.cc index 71c0c8a764a62..fb1bce97ea5d6 100644 --- a/src/v/model/model.cc +++ b/src/v/model/model.cc @@ -352,6 +352,8 @@ std::ostream& operator<<(std::ostream& o, record_batch_type bt) { return o << "batch_type::cluster_config_cmd"; case record_batch_type::feature_update: return o << "batch_type::feature_update"; + case record_batch_type::cluster_bootstrap_cmd: + return o << 
"batch_type::cluster_bootstrap_cmd"; } return o << "batch_type::unknown{" << static_cast(bt) << "}"; diff --git a/src/v/model/record_batch_types.h b/src/v/model/record_batch_types.h index 545f45f18b431..79a6f1df2be9b 100644 --- a/src/v/model/record_batch_types.h +++ b/src/v/model/record_batch_types.h @@ -39,6 +39,7 @@ enum class record_batch_type : int8_t { archival_metadata = 19, // archival metadata updates cluster_config_cmd = 20, // cluster config deltas and status feature_update = 21, // Node logical versions updates + cluster_bootstrap_cmd = 22, // cluster bootstrap command }; std::ostream& operator<<(std::ostream& o, record_batch_type bt); diff --git a/src/v/net/unresolved_address.h b/src/v/net/unresolved_address.h index f1fd003b0f3b5..859fb1e34a076 100644 --- a/src/v/net/unresolved_address.h +++ b/src/v/net/unresolved_address.h @@ -43,6 +43,8 @@ class unresolved_address inet_family family() const { return _family; } bool operator==(const unresolved_address& other) const = default; + friend bool operator<(const unresolved_address&, const unresolved_address&) + = default; auto serde_fields() { return std::tie(_host, _port, _family); } diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index c1be3195b2264..7e2a57c69d27a 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -19,6 +19,7 @@ #include "cluster/bootstrap_service.h" #include "cluster/cluster_discovery.h" #include "cluster/cluster_utils.h" +#include "cluster/cluster_uuid.h" #include "cluster/controller.h" #include "cluster/fwd.h" #include "cluster/id_allocator.h" @@ -1401,7 +1402,14 @@ void application::wire_up_and_start(::stop_signal& app_signal, bool test_mode) { std::make_optional(node_id)); } - vlog(_log.info, "Starting Redpanda with node_id {}", node_id); + const auto cluster_uuid = cluster::read_stored_cluster_uuid( + storage.local()); + + vlog( + _log.info, + "Starting Redpanda with node_id {}, cluster UUID {}", + node_id, + cluster_uuid); 
wire_up_runtime_services(node_id); @@ -1416,7 +1424,7 @@ void application::wire_up_and_start(::stop_signal& app_signal, bool test_mode) { .get(); } - start_runtime_services(cd, app_signal); + start_runtime_services(app_signal, cd, cluster_uuid); if (_proxy_config) { _proxy.invoke_on_all(&pandaproxy::rest::proxy::start).get(); @@ -1443,8 +1451,9 @@ void application::wire_up_and_start(::stop_signal& app_signal, bool test_mode) { } void application::start_runtime_services( - const cluster::cluster_discovery& cluster_discovery, - ::stop_signal& app_signal) { + ::stop_signal& app_signal, + const cluster::cluster_discovery& cd, + const std::optional& stored_cluster_uuid) { ssx::background = _feature_table.invoke_on_all( [this](features::feature_table& ft) -> ss::future<> { try { @@ -1485,8 +1494,29 @@ void application::start_runtime_services( _group_manager.invoke_on_all(&kafka::group_manager::start).get(); _co_group_manager.invoke_on_all(&kafka::group_manager::start).get(); + // Initialize the Raft RPC endpoint before the rest of the runtime RPC + // services so the cluster seeds can elect a leader and write a cluster + // UUID before proceeding with the rest of bootstrap. 
+ _rpc + .invoke_on_all([this](rpc::rpc_server& s) { + std::vector> runtime_services; + runtime_services.push_back( + std::make_unique< + raft::service>( + _scheduling_groups.raft_sg(), + smp_service_groups.raft_smp_sg(), + partition_manager, + shard_table.local(), + config::shard_local_cfg().raft_heartbeat_interval_ms())); + s.add_services(std::move(runtime_services)); + }) + .get(); syschecks::systemd_message("Starting controller").get(); - controller->start(cluster_discovery.initial_raft0_brokers()).get0(); + controller + ->start( + cd.initial_seed_brokers(stored_cluster_uuid.has_value()).get(), + stored_cluster_uuid) + .get0(); kafka_group_migration = ss::make_lw_shared( *controller, group_router); @@ -1513,14 +1543,6 @@ void application::start_runtime_services( std::ref(tx_gateway_frontend), _rm_group_proxy.get(), std::ref(rm_partition_frontend))); - runtime_services.push_back( - std::make_unique< - raft::service>( - _scheduling_groups.raft_sg(), - smp_service_groups.raft_smp_sg(), - partition_manager, - shard_table.local(), - config::shard_local_cfg().raft_heartbeat_interval_ms())); runtime_services.push_back(std::make_unique( _scheduling_groups.cluster_sg(), smp_service_groups.cluster_smp_sg(), diff --git a/src/v/redpanda/application.h b/src/v/redpanda/application.h index d8e696ce1f0fe..1f2f51ba947b2 100644 --- a/src/v/redpanda/application.h +++ b/src/v/redpanda/application.h @@ -131,8 +131,10 @@ class application { // Starts the services meant for Redpanda runtime. Must be called after // having constructed the subsystems via the corresponding `wire_up` calls. 
- void - start_runtime_services(const cluster::cluster_discovery&, ::stop_signal&); + void start_runtime_services( + ::stop_signal&, + const cluster::cluster_discovery&, + const std::optional&); void start_kafka(const model::node_id&, ::stop_signal&); // All methods are calleds from Seastar thread diff --git a/src/v/redpanda/tests/fixture.h b/src/v/redpanda/tests/fixture.h index 6d8d3d33d10de..38e6736ba965b 100644 --- a/src/v/redpanda/tests/fixture.h +++ b/src/v/redpanda/tests/fixture.h @@ -52,6 +52,8 @@ // NOTE: several fixtures may still require a node ID be supplied for the sake // of differentiating ports, data directories, loggers, etc. using configure_node_id = ss::bool_class; +using empty_seed_starts_cluster + = ss::bool_class; class redpanda_thread_fixture { public: @@ -68,7 +70,9 @@ class redpanda_thread_fixture { ss::sstring base_dir, std::optional sch_groups, bool remove_on_shutdown, - configure_node_id use_node_id = configure_node_id::yes) + configure_node_id use_node_id = configure_node_id::yes, + const empty_seed_starts_cluster empty_seed_starts_cluster_val + = empty_seed_starts_cluster::yes) : app(ssx::sformat("redpanda-{}", node_id())) , proxy_port(proxy_port) , schema_reg_port(schema_reg_port) @@ -81,7 +85,8 @@ class redpanda_thread_fixture { rpc_port, coproc_supervisor_port, std::move(seed_servers), - use_node_id); + use_node_id, + empty_seed_starts_cluster_val); app.initialize( proxy_config(proxy_port), proxy_client_config(kafka_port), @@ -164,7 +169,8 @@ class redpanda_thread_fixture { int32_t rpc_port, int32_t coproc_supervisor_port, std::vector seed_servers, - configure_node_id use_node_id) { + configure_node_id use_node_id, + const empty_seed_starts_cluster empty_seed_starts_cluster_val) { auto base_path = std::filesystem::path(data_dir); ss::smp::invoke_on_all([node_id, kafka_port, @@ -172,7 +178,9 @@ class redpanda_thread_fixture { coproc_supervisor_port, seed_servers = std::move(seed_servers), base_path, - use_node_id]() mutable { + 
use_node_id, + + empty_seed_starts_cluster_val]() mutable { auto& config = config::shard_local_cfg(); config.get("enable_pid_file").set_value(false); @@ -188,6 +196,8 @@ class redpanda_thread_fixture { node_config.get("node_id").set_value( use_node_id ? std::make_optional(node_id) : std::optional(std::nullopt)); + node_config.get("empty_seed_starts_cluster") + .set_value(bool(empty_seed_starts_cluster_val)); node_config.get("rack").set_value( std::make_optional(model::rack_id(rack_name))); node_config.get("seed_servers").set_value(seed_servers); diff --git a/tests/rptest/services/redpanda.py b/tests/rptest/services/redpanda.py index 896370bd5c143..33f441969c6ba 100644 --- a/tests/rptest/services/redpanda.py +++ b/tests/rptest/services/redpanda.py @@ -581,6 +581,12 @@ def __init__(self, # stash a copy here so that we can quickly look up e.g. addresses later. self._node_configs = {} + self._seed_servers = [self.nodes[0]] + + def set_seed_servers(self, node_list): + assert len(node_list) > 0 + self._seed_servers = node_list + def set_environment(self, environment: dict[str, str]): self._environment = environment @@ -742,7 +748,8 @@ def start(self, start_si=True, parallel: bool = True, expect_fail: bool = False, - auto_assign_node_id: bool = False): + auto_assign_node_id: bool = False, + omit_seeds_on_idx_one: bool = True): """ Start the service on all nodes. @@ -798,7 +805,8 @@ def start_one(node): self.start_node(node, first_start=first_start, expect_fail=expect_fail, - auto_assign_node_id=auto_assign_node_id) + auto_assign_node_id=auto_assign_node_id, + omit_seeds_on_idx_one=omit_seeds_on_idx_one) self._for_nodes(to_start, start_one, parallel=parallel) @@ -1003,7 +1011,8 @@ def start_node(self, write_config=True, first_start=False, expect_fail: bool = False, - auto_assign_node_id: bool = False): + auto_assign_node_id: bool = False, + omit_seeds_on_idx_one: bool = True): """ Start a single instance of redpanda. 
This function will not return until redpanda appears to have started successfully. If redpanda does not @@ -1014,9 +1023,11 @@ def start_node(self, node.account.mkdirs(os.path.dirname(RedpandaService.NODE_CONFIG_FILE)) if write_config: - self.write_node_conf_file(node, - override_cfg_params, - auto_assign_node_id=auto_assign_node_id) + self.write_node_conf_file( + node, + override_cfg_params, + auto_assign_node_id=auto_assign_node_id, + omit_seeds_on_idx_one=omit_seeds_on_idx_one) if timeout is None: timeout = self.node_ready_timeout_s @@ -1554,7 +1565,8 @@ def started_nodes(self): def write_node_conf_file(self, node, override_cfg_params=None, - auto_assign_node_id=False): + auto_assign_node_id=False, + omit_seeds_on_idx_one=True): """ Write the node config file for a redpanda node: this is the YAML representation of Redpanda's `node_config` class. Distinct from Redpanda's _cluster_ configuration @@ -1562,8 +1574,11 @@ def write_node_conf_file(self, """ node_info = {self.idx(n): n for n in self.nodes} + include_seed_servers = True node_id = self.idx(node) - include_seed_servers = node_id > 1 + if omit_seeds_on_idx_one and node_id == 1: + include_seed_servers = False + if auto_assign_node_id: # Supply None so it's omitted from the config. 
node_id = None @@ -1578,6 +1593,7 @@ def write_node_conf_file(self, nodes=node_info, node_id=node_id, include_seed_servers=include_seed_servers, + seed_servers=self._seed_servers, node_ip=node_ip, kafka_alternate_port=self.KAFKA_ALTERNATE_PORT, admin_alternate_port=self.ADMIN_ALTERNATE_PORT, diff --git a/tests/rptest/services/templates/redpanda.yaml b/tests/rptest/services/templates/redpanda.yaml index 5f62b52287977..cf7bbf3396895 100644 --- a/tests/rptest/services/templates/redpanda.yaml +++ b/tests/rptest/services/templates/redpanda.yaml @@ -43,9 +43,11 @@ redpanda: {% if include_seed_servers %} seed_servers: + {% for node in seed_servers %} - host: - address: {{nodes[1].account.hostname}} + address: {{node.account.hostname}} port: 33145 + {% endfor %} {% endif %} {% if enable_pp %} diff --git a/tests/rptest/tests/cluster_bootstrap_test.py b/tests/rptest/tests/cluster_bootstrap_test.py new file mode 100644 index 0000000000000..7d9e562864641 --- /dev/null +++ b/tests/rptest/tests/cluster_bootstrap_test.py @@ -0,0 +1,51 @@ +# Copyright 2022 Redpanda Data, Inc. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 + +from ducktape.mark import matrix +from rptest.tests.redpanda_test import RedpandaTest +from rptest.services.cluster import cluster + + +class ClusterBootstrapTest(RedpandaTest): + def __init__(self, test_context): + super(ClusterBootstrapTest, + self).__init__(test_context=test_context, + num_brokers=3, + extra_node_conf={ + "empty_seed_starts_cluster": False, + }) + self.admin = self.redpanda._admin + + def setUp(self): + # Defer startup to test body. 
+ pass + + @cluster(num_nodes=3) + @matrix(num_seeds=[1, 2, 3], auto_assign_node_ids=[False, True]) + def test_three_node_bootstrap(self, num_seeds, auto_assign_node_ids): + seeds = [self.redpanda.nodes[i] for i in range(num_seeds)] + self.redpanda.set_seed_servers(seeds) + self.redpanda.start(auto_assign_node_id=auto_assign_node_ids, + omit_seeds_on_idx_one=False) + node_ids_per_idx = {} + for n in self.redpanda.nodes: + idx = self.redpanda.idx(n) + node_ids_per_idx[idx] = self.redpanda.node_id(n) + + brokers = self.admin.get_brokers() + assert 3 == len(brokers), f"Got {len(brokers)} brokers" + + # Restart our nodes and make sure our node IDs persist across restarts. + self.redpanda.restart_nodes(self.redpanda.nodes, + auto_assign_node_id=auto_assign_node_ids) + for idx in node_ids_per_idx: + n = self.redpanda.get_node(idx) + expected_node_id = node_ids_per_idx[idx] + node_id = self.redpanda.node_id(n) + assert expected_node_id == node_id, f"Expected {expected_node_id} but got {node_id}"