Skip to content

Commit

Permalink
datalake: use schema cache by default in datalake_manager
Browse files Browse the repository at this point in the history
  • Loading branch information
ballard26 committed Dec 23, 2024
1 parent 684cc2e commit 6a69bdd
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
12 changes: 8 additions & 4 deletions src/v/datalake/datalake_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ namespace datalake {

namespace {

static std::unique_ptr<type_resolver>
make_type_resolver(model::iceberg_mode mode, schema::registry& sr) {
static std::unique_ptr<type_resolver> make_type_resolver(
model::iceberg_mode mode, schema::registry& sr, schema_cache_t& cache) {
switch (mode) {
case model::iceberg_mode::disabled:
vassert(
Expand All @@ -39,7 +39,7 @@ make_type_resolver(model::iceberg_mode mode, schema::registry& sr) {
case model::iceberg_mode::key_value:
return std::make_unique<binary_type_resolver>();
case model::iceberg_mode::value_schema_id_prefix:
return std::make_unique<record_schema_resolver>(sr);
return std::make_unique<record_schema_resolver>(sr, cache);
}
}

Expand Down Expand Up @@ -89,6 +89,10 @@ datalake_manager::datalake_manager(
, _schema_registry(schema::registry::make_default(sr_api))
, _catalog_factory(std::move(catalog_factory))
, _type_resolver(std::make_unique<record_schema_resolver>(*_schema_registry))
// TODO: The cache size is currently arbitrary. Figure out a more reasoned
// size and allocate a share of the datalake memory semaphore to this cache.
, _schema_cache(std::make_unique<schema_cache_t>(
schema_cache_t::config{.cache_size = 100, .small_size = 10}))
, _as(as)
, _sg(sg)
, _effective_max_translator_buffered_data(
Expand Down Expand Up @@ -247,7 +251,7 @@ void datalake_manager::start_translator(
_features,
&_cloud_data_io,
_schema_mgr.get(),
make_type_resolver(mode, *_schema_registry),
make_type_resolver(mode, *_schema_registry, *_schema_cache),
make_record_translator(mode),
translation_interval_ms(),
_sg,
Expand Down
2 changes: 2 additions & 0 deletions src/v/datalake/datalake_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "config/property.h"
#include "container/chunked_hash_map.h"
#include "datalake/fwd.h"
#include "datalake/record_schema_resolver.h"
#include "datalake/translation/partition_translator.h"
#include "features/fwd.h"
#include "pandaproxy/schema_registry/fwd.h"
Expand Down Expand Up @@ -92,6 +93,7 @@ class datalake_manager : public ss::peering_sharded_service<datalake_manager> {
std::unique_ptr<iceberg::catalog> _catalog;
std::unique_ptr<datalake::schema_manager> _schema_mgr;
std::unique_ptr<datalake::type_resolver> _type_resolver;
std::unique_ptr<datalake::schema_cache_t> _schema_cache;
ss::sharded<ss::abort_source>* _as;
ss::scheduling_group _sg;
ss::gate _gate;
Expand Down

0 comments on commit 6a69bdd

Please sign in to comment.