Skip to content

Commit

Permalink
Merge pull request #13878 from rockwotj/wasm-storage
Browse files Browse the repository at this point in the history
  • Loading branch information
rockwotj authored Oct 3, 2023
2 parents 0eecc50 + e03e65a commit 0390350
Show file tree
Hide file tree
Showing 12 changed files with 828 additions and 47 deletions.
5 changes: 5 additions & 0 deletions src/v/cluster/health_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ ss::future<> health_manager::do_tick() {
model::kafka_namespace, model::schema_registry_internal_tp.topic};
ok = co_await ensure_topic_replication(schema_registry_nt);
}

if (ok) {
ok = co_await ensure_topic_replication(
model::topic_namespace_view(model::wasm_binaries_internal_ntp));
}
}

_timer.arm(_tick_interval);
Expand Down
5 changes: 5 additions & 0 deletions src/v/model/ktp.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ class ktp {
return {kafka_namespace, get_topic()};
}

/**
* An explicit conversion operator of `as_tn_view`.
*/
explicit operator topic_namespace_view() const { return as_tn_view(); }

/**
* @brief Return a topic partition view corresponding to this object.
*
Expand Down
3 changes: 3 additions & 0 deletions src/v/model/namespace.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,7 @@ inline const model::ntp tx_registry_ntp(
inline const model::topic_partition schema_registry_internal_tp{
model::topic{"_schemas"}, model::partition_id{0}};

inline const model::ntp wasm_binaries_internal_ntp(
model::redpanda_ns, model::topic("wasm_binaries"), model::partition_id(0));

} // namespace model
176 changes: 176 additions & 0 deletions src/v/transform/rpc/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <seastar/core/semaphore.hh>
#include <seastar/core/smp.hh>
#include <seastar/core/when_all.hh>
#include <seastar/coroutine/as_future.hh>
#include <seastar/util/noncopyable_function.hh>

#include <absl/container/btree_map.h>
Expand Down Expand Up @@ -90,10 +91,12 @@ cluster::errc map_errc(std::error_code ec) {
client::client(
model::node_id self,
std::unique_ptr<partition_leader_cache> l,
std::unique_ptr<topic_creator> t,
ss::sharded<::rpc::connection_cache>* c,
ss::sharded<local_service>* s)
: _self(self)
, _leaders(std::move(l))
, _topic_creator(std::move(t))
, _connections(c)
, _local_service(s) {}

Expand Down Expand Up @@ -154,4 +157,177 @@ client::do_remote_produce(model::node_id node, produce_request req) {
}
co_return std::move(resp).value();
}

ss::future<result<stored_wasm_binary_metadata, cluster::errc>>
client::store_wasm_binary(iobuf data, model::timeout_clock::duration timeout) {
auto leader = co_await compute_wasm_binary_ntp_leader();
if (!leader) {
co_return cluster::errc::not_leader;
}
co_return co_await (
leader == _self
? do_local_store_wasm_binary(std::move(data), timeout)
: do_remote_store_wasm_binary(*leader, std::move(data), timeout));
}

ss::future<result<stored_wasm_binary_metadata, cluster::errc>>
client::do_local_store_wasm_binary(
iobuf data, model::timeout_clock::duration timeout) {
return _local_service->local().store_wasm_binary(std::move(data), timeout);
}

ss::future<result<stored_wasm_binary_metadata, cluster::errc>>
client::do_remote_store_wasm_binary(
model::node_id node, iobuf data, model::timeout_clock::duration timeout) {
auto resp = co_await _connections->local()
.with_node_client<impl::transform_rpc_client_protocol>(
_self,
ss::this_shard_id(),
node,
timeout,
[timeout, data = std::move(data)](
impl::transform_rpc_client_protocol proto) mutable {
return proto.store_wasm_binary(
store_wasm_binary_request(std::move(data), timeout),
::rpc::client_opts(
model::timeout_clock::now() + timeout));
})
.then(&::rpc::get_ctx_data<store_wasm_binary_reply>);
if (resp.has_error()) {
co_return map_errc(resp.assume_error());
}
auto reply = resp.value();
if (reply.ec != cluster::errc::success) {
co_return reply.ec;
}
co_return reply.stored;
}

ss::future<cluster::errc>
client::delete_wasm_binary(uuid_t key, model::timeout_clock::duration timeout) {
auto leader = co_await compute_wasm_binary_ntp_leader();
if (!leader) {
co_return cluster::errc::not_leader;
}
co_return co_await (
leader == _self ? do_local_delete_wasm_binary(key, timeout)
: do_remote_delete_wasm_binary(*leader, key, timeout));
}

ss::future<cluster::errc> client::do_local_delete_wasm_binary(
uuid_t key, model::timeout_clock::duration timeout) {
return _local_service->local().delete_wasm_binary(key, timeout);
}

ss::future<cluster::errc> client::do_remote_delete_wasm_binary(
model::node_id node, uuid_t key, model::timeout_clock::duration timeout) {
auto resp
= co_await _connections->local()
.with_node_client<impl::transform_rpc_client_protocol>(
_self,
ss::this_shard_id(),
node,
timeout,
[timeout, key](impl::transform_rpc_client_protocol proto) mutable {
return proto.delete_wasm_binary(
delete_wasm_binary_request(key, timeout),
::rpc::client_opts(model::timeout_clock::now() + timeout));
})
.then(&::rpc::get_ctx_data<delete_wasm_binary_reply>);
if (resp.has_error()) {
co_return map_errc(resp.assume_error());
}
co_return resp.value().ec;
}

ss::future<result<iobuf, cluster::errc>> client::load_wasm_binary(
model::offset offset, model::timeout_clock::duration timeout) {
auto leader = co_await compute_wasm_binary_ntp_leader();
if (!leader) {
co_return cluster::errc::not_leader;
}
co_return co_await (
leader == _self ? do_local_load_wasm_binary(offset, timeout)
: do_remote_load_wasm_binary(*leader, offset, timeout));
}

ss::future<result<iobuf, cluster::errc>> client::do_local_load_wasm_binary(
model::offset offset, model::timeout_clock::duration timeout) {
return _local_service->local().load_wasm_binary(offset, timeout);
}

ss::future<result<iobuf, cluster::errc>> client::do_remote_load_wasm_binary(
model::node_id node,
model::offset offset,
model::timeout_clock::duration timeout) {
auto resp = co_await _connections->local()
.with_node_client<impl::transform_rpc_client_protocol>(
_self,
ss::this_shard_id(),
node,
timeout,
[timeout, offset](
impl::transform_rpc_client_protocol proto) mutable {
return proto.load_wasm_binary(
load_wasm_binary_request(offset, timeout),
::rpc::client_opts(
model::timeout_clock::now() + timeout));
})
.then(&::rpc::get_ctx_data<load_wasm_binary_reply>);
if (resp.has_error()) {
co_return map_errc(resp.assume_error());
}
auto reply = std::move(resp).value();
if (reply.ec != cluster::errc::success) {
co_return reply.ec;
}
co_return std::move(reply.data);
}

ss::future<bool> client::try_create_wasm_binary_ntp() {
cluster::topic_properties topic_props;
// TODO: This should be configurable
constexpr size_t wasm_binaries_max_bytes = 10_MiB;
topic_props.batch_max_bytes = wasm_binaries_max_bytes;
// Mark all these as disabled
topic_props.retention_bytes = tristate<size_t>();
topic_props.retention_local_target_bytes = tristate<size_t>();
topic_props.retention_duration = tristate<std::chrono::milliseconds>();
topic_props.retention_local_target_ms
= tristate<std::chrono::milliseconds>();
topic_props.cleanup_policy_bitflags
= model::cleanup_policy_bitflags::compaction;

auto fut = co_await ss::coroutine::as_future<cluster::errc>(
_topic_creator->create_topic(
model::topic_namespace_view(model::wasm_binaries_internal_ntp),
1,
topic_props));
if (fut.failed()) {
vlog(
log.warn,
"unable to create internal wasm binary topic: {}",
std::move(fut).get_exception());
co_return false;
}
cluster::errc ec = fut.get();
if (ec != cluster::errc::success) {
vlog(log.warn, "unable to create internal wasm binary topic: {}", ec);
co_return false;
}
co_return true;
}

ss::future<std::optional<model::node_id>>
client::compute_wasm_binary_ntp_leader() {
auto leader = _leaders->get_leader_node(model::wasm_binaries_internal_ntp);
if (!leader.has_value()) {
bool success = co_await try_create_wasm_binary_ntp();
if (!success) {
co_return std::nullopt;
}
leader = _leaders->get_leader_node(model::wasm_binaries_internal_ntp);
}
co_return leader;
}
} // namespace transform::rpc
30 changes: 30 additions & 0 deletions src/v/transform/rpc/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class client {
client(
model::node_id self,
std::unique_ptr<partition_leader_cache>,
std::unique_ptr<topic_creator>,
ss::sharded<::rpc::connection_cache>*,
ss::sharded<local_service>*);
client(client&&) = delete;
Expand All @@ -53,16 +54,45 @@ class client {
ss::future<cluster::errc>
produce(model::topic_partition, ss::chunked_fifo<model::record_batch>);

ss::future<result<stored_wasm_binary_metadata, cluster::errc>>
store_wasm_binary(iobuf, model::timeout_clock::duration timeout);

ss::future<cluster::errc>
delete_wasm_binary(uuid_t key, model::timeout_clock::duration timeout);

ss::future<result<iobuf, cluster::errc>>
load_wasm_binary(model::offset, model::timeout_clock::duration timeout);

ss::future<> stop();

private:
ss::future<produce_reply> do_local_produce(produce_request);
ss::future<produce_reply>
do_remote_produce(model::node_id, produce_request);

ss::future<result<stored_wasm_binary_metadata, cluster::errc>>
do_local_store_wasm_binary(iobuf, model::timeout_clock::duration timeout);
ss::future<result<stored_wasm_binary_metadata, cluster::errc>>
do_remote_store_wasm_binary(
model::node_id, iobuf, model::timeout_clock::duration timeout);

ss::future<cluster::errc> do_local_delete_wasm_binary(
uuid_t key, model::timeout_clock::duration timeout);
ss::future<cluster::errc> do_remote_delete_wasm_binary(
model::node_id, uuid_t key, model::timeout_clock::duration timeout);

ss::future<result<iobuf, cluster::errc>> do_local_load_wasm_binary(
model::offset, model::timeout_clock::duration timeout);
ss::future<result<iobuf, cluster::errc>> do_remote_load_wasm_binary(
model::node_id, model::offset, model::timeout_clock::duration timeout);

ss::future<std::optional<model::node_id>> compute_wasm_binary_ntp_leader();
ss::future<bool> try_create_wasm_binary_ntp();

model::node_id _self;
// need partition_leaders_table to know which node owns the partitions
std::unique_ptr<partition_leader_cache> _leaders;
std::unique_ptr<topic_creator> _topic_creator;
ss::sharded<::rpc::connection_cache>* _connections;
ss::sharded<local_service>* _local_service;
};
Expand Down
47 changes: 47 additions & 0 deletions src/v/transform/rpc/deps.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,23 @@

#include "transform/rpc/deps.h"

#include "cluster/controller.h"
#include "cluster/fwd.h"
#include "cluster/metadata_cache.h"
#include "cluster/partition_manager.h"
#include "cluster/shard_table.h"
#include "cluster/topics_frontend.h"
#include "cluster/types.h"
#include "config/configuration.h"
#include "kafka/server/partition_proxy.h"
#include "model/ktp.h"
#include "transform/rpc/logger.h"

#include <seastar/core/do_with.hh>
#include <seastar/core/future.hh>

#include <memory>

namespace transform::rpc {

namespace {
Expand Down Expand Up @@ -111,6 +118,40 @@ class partition_manager_impl final : public partition_manager {
ss::sharded<cluster::shard_table>* _table;
ss::sharded<cluster::partition_manager>* _manager;
};

class topic_creator_impl : public topic_creator {
public:
explicit topic_creator_impl(cluster::controller* controller)
: _controller(controller) {}

ss::future<cluster::errc> create_topic(
model::topic_namespace_view tp_ns,
int32_t partition_count,
cluster::topic_properties properties) final {
cluster::topic_configuration topic_cfg(
tp_ns.ns,
tp_ns.tp,
partition_count,
_controller->internal_topic_replication());
topic_cfg.properties = properties;

try {
auto res = co_await _controller->get_topics_frontend()
.local()
.autocreate_topics(
{std::move(topic_cfg)},
config::shard_local_cfg().create_topic_timeout_ms());
vassert(res.size() == 1, "expected a single result");
co_return res[0].ec;
} catch (const std::exception& ex) {
vlog(log.warn, "unable to create topic {}: {}", tp_ns, ex);
co_return cluster::errc::topic_operation_error;
}
}

private:
cluster::controller* _controller;
};
} // namespace

std::unique_ptr<partition_leader_cache>
Expand Down Expand Up @@ -141,6 +182,7 @@ std::optional<ss::shard_id>
partition_manager::shard_owner(const model::ktp& ktp) {
return shard_owner(ktp.to_ntp());
}

ss::future<cluster::errc> partition_manager::invoke_on_shard(
ss::shard_id shard_id,
const model::ktp& ktp,
Expand All @@ -149,4 +191,9 @@ ss::future<cluster::errc> partition_manager::invoke_on_shard(
auto ntp = ktp.to_ntp();
co_return co_await invoke_on_shard(shard_id, ntp, std::move(fn));
}

std::unique_ptr<topic_creator>
topic_creator::make_default(cluster::controller* controller) {
return std::make_unique<topic_creator_impl>(controller);
}
} // namespace transform::rpc
24 changes: 24 additions & 0 deletions src/v/transform/rpc/deps.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,30 @@ class topic_metadata_cache {
virtual uint32_t get_default_batch_max_bytes() const = 0;
};

/**
* A component that can create topics.
*/
class topic_creator {
public:
topic_creator() = default;
topic_creator(const topic_creator&) = default;
topic_creator(topic_creator&&) = delete;
topic_creator& operator=(const topic_creator&) = default;
topic_creator& operator=(topic_creator&&) = delete;
virtual ~topic_creator() = default;

static std::unique_ptr<topic_creator> make_default(cluster::controller*);

/**
* Create a topic.
*/
virtual ss::future<cluster::errc> create_topic(
model::topic_namespace_view,
int32_t partition_count,
cluster::topic_properties)
= 0;
};

/**
* Handles routing for shard local partitions.
*/
Expand Down
Loading

0 comments on commit 0390350

Please sign in to comment.