@@ -29,8 +29,8 @@ namespace datalake {
29
29
30
30
namespace {
31
31
32
- static std::unique_ptr<type_resolver>
33
- make_type_resolver ( model::iceberg_mode mode, schema::registry& sr) {
32
+ static std::unique_ptr<type_resolver> make_type_resolver (
33
+ model::iceberg_mode mode, schema::registry& sr, schema_cache_t & cache ) {
34
34
switch (mode) {
35
35
case model::iceberg_mode::disabled:
36
36
vassert (
@@ -39,7 +39,7 @@ make_type_resolver(model::iceberg_mode mode, schema::registry& sr) {
39
39
case model::iceberg_mode::key_value:
40
40
return std::make_unique<binary_type_resolver>();
41
41
case model::iceberg_mode::value_schema_id_prefix:
42
- return std::make_unique<record_schema_resolver>(sr);
42
+ return std::make_unique<record_schema_resolver>(sr, cache );
43
43
}
44
44
}
45
45
@@ -89,6 +89,10 @@ datalake_manager::datalake_manager(
89
89
, _schema_registry(schema::registry::make_default(sr_api))
90
90
, _catalog_factory(std::move(catalog_factory))
91
91
, _type_resolver(std::make_unique<record_schema_resolver>(*_schema_registry))
92
+ // TODO: The cache size is currently arbitrary. Figure out a more reasoned
93
+ // size and allocate a share of the datalake memory semaphore to this cache.
94
+ , _schema_cache(std::make_unique<schema_cache_t >(
95
+ schema_cache_t ::config{.cache_size = 100 , .small_size = 10 }))
92
96
, _as(as)
93
97
, _sg(sg)
94
98
, _effective_max_translator_buffered_data(
@@ -251,7 +255,7 @@ void datalake_manager::start_translator(
251
255
_features,
252
256
&_cloud_data_io,
253
257
_schema_mgr.get (),
254
- make_type_resolver (mode, *_schema_registry),
258
+ make_type_resolver (mode, *_schema_registry, *_schema_cache ),
255
259
make_record_translator (mode),
256
260
translation_interval_ms (),
257
261
_sg,
0 commit comments