Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/v/cluster/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ v_cc_library(
node_status_backend.cc
node_status_rpc_handler.cc
bootstrap_service.cc
cluster_uuid.cc
bootstrap_backend.cc
DEPS
Seastar::seastar
bootstrap_rpc
Expand Down
102 changes: 102 additions & 0 deletions src/v/cluster/bootstrap_backend.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright 2022 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#include "cluster/bootstrap_backend.h"

#include "cluster/commands.h"
#include "cluster/logger.h"
#include "cluster/types.h"
#include "security/credential_store.h"

namespace cluster {

bootstrap_backend::bootstrap_backend(
const std::optional<cluster_uuid>& cluster_uuid,
ss::sharded<security::credential_store>& credentials,
ss::sharded<storage::api>& storage)
: _cluster_uuid(cluster_uuid)
, _credentials(credentials)
, _storage(storage) {}

namespace {

std::error_code
do_apply(user_and_credential cred, security::credential_store& store) {
if (store.contains(cred.username)) return errc::user_exists;
store.put(cred.username, std::move(cred.credential));
return errc::success;
}

template<typename Cmd>
ss::future<std::error_code> dispatch_updates_to_cores(
Cmd cmd, ss::sharded<security::credential_store>& sharded_service) {
auto res = co_await sharded_service.map_reduce0(
[cmd](security::credential_store& service) {
return do_apply(std::move(cmd), service);
},
std::optional<std::error_code>{},
[](std::optional<std::error_code> result, std::error_code error_code) {
if (!result.has_value()) {
result = error_code;
} else {
vassert(
result.value() == error_code,
"State inconsistency across shards detected, "
"expected result: {}, have: {}",
result->value(),
error_code);
}
return result.value();
});
co_return res.value();
}

} // namespace

ss::future<std::error_code>
bootstrap_backend::apply_update(model::record_batch b) {
vlog(clusterlog.info, "Applying update to bootstrap_manager");

// handle node managements command
static constexpr auto accepted_commands
= make_commands_list<bootstrap_cluster_cmd>();
auto cmd = co_await cluster::deserialize(std::move(b), accepted_commands);

co_return co_await ss::visit(
cmd, [this](bootstrap_cluster_cmd cmd) -> ss::future<std::error_code> {
if (_cluster_uuid.has_value()) {
vlog(
clusterlog.debug,
"Skipping bootstrap_cluster_cmd {}, current cluster_uuid: {}",
cmd.value.uuid,
*_cluster_uuid);
co_return errc::success;
}

if (cmd.value.bootstrap_user_cred) {
const std::error_code res
= co_await dispatch_updates_to_cores<user_and_credential>(
*cmd.value.bootstrap_user_cred, _credentials);
if (res != errc::success) co_return res;
vlog(
clusterlog.info,
"Bootstrap user {} created",
cmd.value.bootstrap_user_cred->username);
}

_cluster_uuid = cmd.value.uuid;
co_await write_stored_cluster_uuid(_storage.local(), cmd.value.uuid);
vlog(clusterlog.info, "Cluster {} created", cmd.value.uuid);
co_return errc::success;
});
}

} // namespace cluster
60 changes: 60 additions & 0 deletions src/v/cluster/bootstrap_backend.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2022 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#pragma once

#include "cluster/cluster_uuid.h"
#include "cluster/fwd.h"
#include "model/record.h"
#include "model/record_batch_types.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/sharded.hh>

#include <optional>

namespace security {
class credential_store;
}

namespace cluster {

/**
* This class applies the cluster intitalization message to
* - itself, storing the cluster UUID value, also duplicating it to the kvstore
* - credintial_store, to initialize the bootstrap user
* - TODO: apply the initial licence
*/
class bootstrap_backend final {
public:
bootstrap_backend(
const std::optional<cluster_uuid>&,
ss::sharded<security::credential_store>&,
ss::sharded<storage::api>&);

ss::future<std::error_code> apply_update(model::record_batch);

bool is_batch_applicable(const model::record_batch& b) {
return b.header().type
== model::record_batch_type::cluster_bootstrap_cmd;
}

const std::optional<cluster_uuid>& get_cluster_uuid() const {
return _cluster_uuid;
}

private:
std::optional<cluster_uuid> _cluster_uuid;
ss::sharded<security::credential_store>& _credentials;
ss::sharded<storage::api>& _storage;
};

} // namespace cluster
17 changes: 16 additions & 1 deletion src/v/cluster/bootstrap_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,29 @@
#include "cluster/bootstrap_service.h"

#include "cluster/bootstrap_types.h"
#include "cluster/cluster_utils.h"
#include "cluster/logger.h"
#include "config/node_config.h"
#include "features/feature_table.h"

namespace cluster {

ss::future<cluster_bootstrap_info_reply>
bootstrap_service::cluster_bootstrap_info(
cluster_bootstrap_info_request&&, rpc::streaming_context&) {
cluster_bootstrap_info_reply r{};
// TODO: add fields!
r.broker = make_self_broker(config::node());
r.version = features::feature_table::get_latest_logical_version();
const std::vector<config::seed_server>& seed_servers
= config::node().seed_servers();
r.seed_servers.reserve(seed_servers.size());
std::transform(
seed_servers.cbegin(),
seed_servers.cend(),
std::back_inserter(r.seed_servers),
[](const config::seed_server& seed_server) { return seed_server.addr; });
r.empty_seed_starts_cluster = config::node().empty_seed_starts_cluster();
vlog(clusterlog.debug, "Replying cluster_bootstrap_info: {}", r);
co_return r;
}

Expand Down
21 changes: 18 additions & 3 deletions src/v/cluster/bootstrap_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// by the Apache License, Version 2.0
#pragma once

#include "cluster/types.h"
#include "model/fundamental.h"
#include "serde/serde.h"

Expand All @@ -34,13 +35,27 @@ struct cluster_bootstrap_info_reply
: serde::envelope<cluster_bootstrap_info_reply, serde::version<0>> {
using rpc_adl_exempt = std::true_type;

model::broker broker;
cluster_version version;
std::vector<net::unresolved_address> seed_servers;
bool empty_seed_starts_cluster;
// TODO: add fields!
// - node UUID?

auto serde_fields() { return std::tie(); }
auto serde_fields() {
return std::tie(
broker, version, seed_servers, empty_seed_starts_cluster);
}

friend std::ostream&
operator<<(std::ostream& o, const cluster_bootstrap_info_reply&) {
fmt::print(o, "{{}}");
operator<<(std::ostream& o, const cluster_bootstrap_info_reply& v) {
fmt::print(
o,
"{{broker: {}, version: {}, seed_servers: {}, ESCB: {}}}",
v.broker,
v.version,
v.seed_servers,
v.empty_seed_starts_cluster);
return o;
}
};
Expand Down
Loading