From 25ee9c17f6b1080de0caa0fa88d4a36629fd7360 Mon Sep 17 00:00:00 2001 From: Nikolai Kondrashov Date: Wed, 27 Nov 2024 18:41:04 +0200 Subject: [PATCH 1/3] db: Support packing loaded data in-place --- kcidb/db/__init__.py | 8 ++++++-- kcidb/db/abstract.py | 12 ++++++++---- kcidb/db/bigquery/v04_00.py | 24 +++++++++++++++++------- kcidb/db/json.py | 2 +- kcidb/db/mux.py | 12 ++++++++++-- kcidb/db/null.py | 11 +++++++---- kcidb/db/postgresql/v04_00.py | 7 ++++++- kcidb/db/schematic.py | 15 ++++++++++++--- kcidb/db/sqlite/v04_00.py | 7 ++++++- 9 files changed, 73 insertions(+), 25 deletions(-) diff --git a/kcidb/db/__init__.py b/kcidb/db/__init__.py index 87de6ca5..25e56eaf 100644 --- a/kcidb/db/__init__.py +++ b/kcidb/db/__init__.py @@ -622,7 +622,7 @@ def oo_query(self, pattern_set): LOGGER.debug("OO Query: %r", pattern_set) return self.driver.oo_query(pattern_set) - def load(self, data, with_metadata=False): + def load(self, data, with_metadata=False, copy=True): """ Load data into the database. @@ -630,17 +630,21 @@ def load(self, data, with_metadata=False): data: The JSON data to load into the database. Must adhere to the database's supported I/O schema version, or an earlier one. + Will be modified, if "copy" is False. with_metadata: True if any metadata in the data should also be loaded into the database. False if it should be discarded and the database should generate its metadata itself. + copy: True, if the loaded data should be copied before + packing. False, if the loaded data should be + packed in-place. """ assert LIGHT_ASSERTS or self.is_initialized() io_schema = self.get_schema()[1] assert io_schema.is_compatible_directly(data) assert LIGHT_ASSERTS or io_schema.is_valid_exactly(data) assert isinstance(with_metadata, bool) - self.driver.load(data, with_metadata=with_metadata) + self.driver.load(data, with_metadata=with_metadata, copy=copy) class DBHelpAction(argparse.Action): diff --git a/kcidb/db/abstract.py b/kcidb/db/abstract.py index e885c837..ad953221 100644 --- a/kcidb/db/abstract.py +++ b/kcidb/db/abstract.py @@ -331,22 +331,26 @@ def oo_query(self, pattern_set): assert self.is_initialized() @abstractmethod - def load(self, data, with_metadata): + def load(self, data, with_metadata, copy): """ Load data into the database. The database must be initialized. Args: - data: The JSON data to load into the database. - Must adhere to the current database schema's - version of the I/O schema. + data: The JSON data to load into the database. Must + adhere to the I/O version of the database schema. + Will be modified, if "copy" is False. with_metadata: True if any metadata in the data should also be loaded into the database. False if it should be discarded and the database should generate its metadata itself. + copy: True, if the loaded data should be copied before + packing. False, if the loaded data should be + packed in-place. """ assert self.is_initialized() io_schema = self.get_schema()[1] assert io_schema.is_compatible_directly(data) assert LIGHT_ASSERTS or io_schema.is_valid_exactly(data) assert isinstance(with_metadata, bool) + assert isinstance(copy, bool) diff --git a/kcidb/db/bigquery/v04_00.py b/kcidb/db/bigquery/v04_00.py index 1ed9d010..f6f1b0b3 100644 --- a/kcidb/db/bigquery/v04_00.py +++ b/kcidb/db/bigquery/v04_00.py @@ -1148,7 +1148,7 @@ def oo_query(self, pattern_set): return objs @classmethod - def _pack_node(cls, node, with_metadata): + def _pack_node(cls, node, with_metadata, copy): """ Pack a loaded data node (and all its children) to the BigQuery storage-compatible representation. @@ -1157,16 +1157,20 @@ def _pack_node(cls, node, with_metadata): node: The node to pack. with_metadata: True, if meta fields (with leading underscore "_") should be preserved. False, if omitted. + copy: True, if the data should be copied before packing. + False, if the data should be packed in-place. Returns: The packed node. """ if isinstance(node, list): - node = node.copy() + if copy: + node = node.copy() for index, value in enumerate(node): - node[index] = cls._pack_node(value, with_metadata) + node[index] = cls._pack_node(value, with_metadata, copy) elif isinstance(node, dict): - node = node.copy() + if copy: + node = node.copy() for key, value in list(node.items()): # Flatten the "misc" fields if key == "misc": @@ -1176,29 +1180,35 @@ def _pack_node(cls, node, with_metadata): del node[key] # Pack everything else else: - node[key] = cls._pack_node(value, with_metadata) + node[key] = cls._pack_node(value, with_metadata, copy) return node - def load(self, data, with_metadata): + def load(self, data, with_metadata, copy): """ Load data into the database. Args: data: The JSON data to load into the database. Must adhere to the I/O version of the database schema. + Will be modified, if "copy" is False. with_metadata: True if any metadata in the data should also be loaded into the database. False if it should be discarded and the database should generate its metadata itself. + copy: True, if the loaded data should be copied before + packing. False, if the loaded data should be + packed in-place. """ assert self.io.is_compatible_directly(data) assert LIGHT_ASSERTS or self.io.is_valid_exactly(data) assert isinstance(with_metadata, bool) + assert isinstance(copy, bool) # Load the data for obj_list_name, table_schema in self.TABLE_MAP.items(): if obj_list_name in data: - obj_list = self._pack_node(data[obj_list_name], with_metadata) + obj_list = self._pack_node(data[obj_list_name], + with_metadata, copy) if not LIGHT_ASSERTS: validate_json_obj_list(table_schema, obj_list) job_config = bigquery.job.LoadJobConfig( diff --git a/kcidb/db/json.py b/kcidb/db/json.py index 04ce8faf..03f359fc 100644 --- a/kcidb/db/json.py +++ b/kcidb/db/json.py @@ -45,4 +45,4 @@ def __init__(self, params): io_schema = self.get_schema()[1] for data in kcidb.misc.json_load_stream_fd(json_file.fileno()): data = io_schema.upgrade(io_schema.validate(data), copy=False) - self.load(data, with_metadata=True) + self.load(data, with_metadata=True, copy=False) diff --git a/kcidb/db/mux.py b/kcidb/db/mux.py index 9b4e61ea..2db98e03 100644 --- a/kcidb/db/mux.py +++ b/kcidb/db/mux.py @@ -481,7 +481,7 @@ def oo_query(self, pattern_set): """ return self.drivers[0].oo_query(pattern_set) - def load(self, data, with_metadata): + def load(self, data, with_metadata, copy): """ Load data into the databases. The databases must be initialized. @@ -490,15 +490,20 @@ def load(self, data, with_metadata): data: The JSON data to load into the databases. Must adhere to the current database schema's version of the I/O schema. + Will be modified, if "copy" is False. with_metadata: True if any metadata in the data should also be loaded into the databases. False if it should be discarded and the databases should generate their metadata themselves. + copy: True, if the loaded data should be copied before + packing. False, if the loaded data should be + packed in-place. """ # The mux driver I/O schema is the oldest across member drivers io_schema = self.get_schema()[1] assert io_schema.is_compatible_directly(data) assert isinstance(with_metadata, bool) + assert isinstance(copy, bool) # Load data into every driver for driver in self.drivers: # Only copy if we need to upgrade @@ -506,5 +511,8 @@ def load(self, data, with_metadata): driver.load( driver_io_schema.upgrade(data) if driver_io_schema != io_schema else data, - with_metadata=with_metadata + with_metadata=with_metadata, + copy=copy ) + # We don't want to pack packed data again + copy = True diff --git a/kcidb/db/null.py b/kcidb/db/null.py index 1427821e..fe09ce10 100644 --- a/kcidb/db/null.py +++ b/kcidb/db/null.py @@ -235,16 +235,19 @@ def oo_query(self, pattern_set): del pattern_set return {} - def load(self, data, with_metadata): + def load(self, data, with_metadata, copy): """ Load data into the database. Args: - data: The JSON data to load into the database. - Must adhere to the current database schema's - version of the I/O schema. + data: The JSON data to load into the database. Must + adhere to the I/O version of the database schema. + Will be modified, if "copy" is False. with_metadata: True if any metadata in the data should also be loaded into the database. False if it should be discarded and the database should generate its metadata itself. + copy: True, if the loaded data should be copied before + packing. False, if the loaded data should be + packed in-place. """ diff --git a/kcidb/db/postgresql/v04_00.py b/kcidb/db/postgresql/v04_00.py index 029239ee..f57ccc60 100644 --- a/kcidb/db/postgresql/v04_00.py +++ b/kcidb/db/postgresql/v04_00.py @@ -916,21 +916,26 @@ def oo_query(self, pattern_set): assert LIGHT_ASSERTS or orm.data.SCHEMA.is_valid(objs) return objs - def load(self, data, with_metadata): + def load(self, data, with_metadata, copy): """ Load data into the database. Args: data: The JSON data to load into the database. Must adhere to the I/O version of the database schema. + Will be modified, if "copy" is False. with_metadata: True if any metadata in the data should also be loaded into the database. False if it should be discarded and the database should generate its metadata itself. + copy: True, if the loaded data should be copied before + packing. False, if the loaded data should be + packed in-place. """ assert self.io.is_compatible_directly(data) assert LIGHT_ASSERTS or self.io.is_valid_exactly(data) assert isinstance(with_metadata, bool) + assert isinstance(copy, bool) with self.conn, self.conn.cursor() as cursor: for table_name, table_schema in self.TABLES.items(): if table_name in data: diff --git a/kcidb/db/schematic.py b/kcidb/db/schematic.py index 9e8f99b2..5cefeae9 100644 --- a/kcidb/db/schematic.py +++ b/kcidb/db/schematic.py @@ -353,17 +353,21 @@ def oo_query(self, pattern_set): for r in pattern_set) @abstractmethod - def load(self, data, with_metadata): + def load(self, data, with_metadata, copy): """ Load data into the database. Args: data: The JSON data to load into the database. Must adhere to the schema's version of the I/O schema. + Will be modified, if "copy" is False. with_metadata: True if any metadata in the data should also be loaded into the database. False if it should be discarded and the database should generate its metadata itself. + copy: True, if the loaded data should be copied before + packing. False, if the loaded data should be + packed in-place. """ # Relying on the driver to check compatibility/validity @@ -773,7 +777,7 @@ def oo_query(self, pattern_set): assert self.is_initialized() return self.schema.oo_query(pattern_set) - def load(self, data, with_metadata): + def load(self, data, with_metadata, copy): """ Load data into the database. The database must be initialized. @@ -782,13 +786,18 @@ def load(self, data, with_metadata): data: The JSON data to load into the database. Must adhere to the current database schema's version of the I/O schema. + Will be modified, if "copy" is False. with_metadata: True if any metadata in the data should also be loaded into the database. False if it should be discarded and the database should generate its metadata itself. + copy: True, if the loaded data should be copied before + packing. False, if the loaded data should be + packed in-place. """ assert self.is_initialized() assert self.schema.io.is_compatible_directly(data) assert LIGHT_ASSERTS or self.schema.io.is_valid_exactly(data) assert isinstance(with_metadata, bool) - self.schema.load(data, with_metadata=with_metadata) + assert isinstance(copy, bool) + self.schema.load(data, with_metadata=with_metadata, copy=copy) diff --git a/kcidb/db/sqlite/v04_00.py b/kcidb/db/sqlite/v04_00.py index 1ca73d2b..79d827de 100644 --- a/kcidb/db/sqlite/v04_00.py +++ b/kcidb/db/sqlite/v04_00.py @@ -864,21 +864,26 @@ def oo_query(self, pattern_set): assert LIGHT_ASSERTS or orm.data.SCHEMA.is_valid(objs) return objs - def load(self, data, with_metadata): + def load(self, data, with_metadata, copy): """ Load data into the database. Args: data: The JSON data to load into the database. Must adhere to the I/O version of the database schema. + Will be modified, if "copy" is False. with_metadata: True if any metadata in the data should also be loaded into the database. False if it should be discarded and the database should generate its metadata itself. + copy: True, if the loaded data should be copied before + packing. False, if the loaded data should be + packed in-place. """ assert self.io.is_compatible_directly(data) assert LIGHT_ASSERTS or self.io.is_valid_exactly(data) assert isinstance(with_metadata, bool) + assert isinstance(copy, bool) with self.conn: cursor = self.conn.cursor() try: From 7695dfb39c46c92450d9a68aee7554f237282d5b Mon Sep 17 00:00:00 2001 From: Nikolai Kondrashov Date: Wed, 27 Nov 2024 19:27:25 +0200 Subject: [PATCH 2/3] cloud: Pack archived data in-place to save RAM --- main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.py b/main.py index 11b8c4a9..5c437c8d 100644 --- a/main.py +++ b/main.py @@ -549,7 +549,7 @@ def kcidb_archive(event, context): after=after, until=next_after) count = kcidb.io.SCHEMA.count(data) LOGGER.info("LOADING %u objects into archive database", count) - ar_client.load(data, with_metadata=True) + ar_client.load(data, with_metadata=True, copy=False) LOGGER.info("ARCHIVED %u objects in (%s, %s] range", count, min_after_str, next_min_after_str) for obj_list_name in after: From f92408b0719cf89b288eded24f1347d2f887306b Mon Sep 17 00:00:00 2001 From: Nikolai Kondrashov Date: Wed, 27 Nov 2024 19:29:15 +0200 Subject: [PATCH 3/3] db: Pack loaded data in-place to save RAM --- kcidb/db/__init__.py | 2 +- kcidb/test_db.py | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/kcidb/db/__init__.py b/kcidb/db/__init__.py index 25e56eaf..1874e087 100644 --- a/kcidb/db/__init__.py +++ b/kcidb/db/__init__.py @@ -983,7 +983,7 @@ def load_main(): for data in kcidb.misc.json_load_stream_fd(sys.stdin.fileno(), seq=args.seq_in): data = io_schema.upgrade(io_schema.validate(data), copy=False) - client.load(data, with_metadata=args.with_metadata) + client.load(data, with_metadata=args.with_metadata, copy=False) def schemas_main(): diff --git a/kcidb/test_db.py b/kcidb/test_db.py index 801dbd87..f698deec 100644 --- a/kcidb/test_db.py +++ b/kcidb/test_db.py @@ -210,7 +210,8 @@ def test_load_main(): status = function() Client.assert_called_once_with("bigquery:project.dataset") client.load.assert_called_once_with({repr(empty)}, - with_metadata=False) + with_metadata=False, + copy=False) return status """) assert_executes(json.dumps(empty), *argv, @@ -227,8 +228,8 @@ def test_load_main(): Client.assert_called_once_with("bigquery:project.dataset") assert client.load.call_count == 2 client.load.assert_has_calls([ - call({repr(empty)}, with_metadata=False), - call({repr(empty)}, with_metadata=False) + call({repr(empty)}, with_metadata=False, copy=False), + call({repr(empty)}, with_metadata=False, copy=False) ]) return status """)