Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pack loaded BigQuery data in-place #617

Merged
merged 3 commits
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions kcidb/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,25 +622,29 @@ def oo_query(self, pattern_set):
LOGGER.debug("OO Query: %r", pattern_set)
return self.driver.oo_query(pattern_set)

def load(self, data, with_metadata=False):
def load(self, data, with_metadata=False, copy=True):
"""
Load data into the database.

Args:
data: The JSON data to load into the database.
Must adhere to the database's supported I/O schema
version, or an earlier one.
Will be modified, if "copy" is False.
with_metadata: True if any metadata in the data should
also be loaded into the database. False if it
should be discarded and the database should
generate its metadata itself.
copy: True, if the loaded data should be copied before
packing. False, if the loaded data should be
packed in-place.
"""
assert LIGHT_ASSERTS or self.is_initialized()
io_schema = self.get_schema()[1]
assert io_schema.is_compatible_directly(data)
assert LIGHT_ASSERTS or io_schema.is_valid_exactly(data)
assert isinstance(with_metadata, bool)
self.driver.load(data, with_metadata=with_metadata)
self.driver.load(data, with_metadata=with_metadata, copy=copy)


class DBHelpAction(argparse.Action):
Expand Down Expand Up @@ -979,7 +983,7 @@ def load_main():
for data in kcidb.misc.json_load_stream_fd(sys.stdin.fileno(),
seq=args.seq_in):
data = io_schema.upgrade(io_schema.validate(data), copy=False)
client.load(data, with_metadata=args.with_metadata)
client.load(data, with_metadata=args.with_metadata, copy=False)


def schemas_main():
Expand Down
12 changes: 8 additions & 4 deletions kcidb/db/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,22 +331,26 @@ def oo_query(self, pattern_set):
assert self.is_initialized()

@abstractmethod
def load(self, data, with_metadata):
def load(self, data, with_metadata, copy):
"""
Load data into the database.
The database must be initialized.

Args:
data: The JSON data to load into the database.
Must adhere to the current database schema's
version of the I/O schema.
data: The JSON data to load into the database. Must
adhere to the database schema's version of the I/O schema.
Will be modified, if "copy" is False.
with_metadata: True if any metadata in the data should
also be loaded into the database. False if it
should be discarded and the database should
generate its metadata itself.
copy: True, if the loaded data should be copied before
packing. False, if the loaded data should be
packed in-place.
"""
assert self.is_initialized()
io_schema = self.get_schema()[1]
assert io_schema.is_compatible_directly(data)
assert LIGHT_ASSERTS or io_schema.is_valid_exactly(data)
assert isinstance(with_metadata, bool)
assert isinstance(copy, bool)
24 changes: 17 additions & 7 deletions kcidb/db/bigquery/v04_00.py
Original file line number Diff line number Diff line change
Expand Up @@ -1148,7 +1148,7 @@ def oo_query(self, pattern_set):
return objs

@classmethod
def _pack_node(cls, node, with_metadata):
def _pack_node(cls, node, with_metadata, copy):
"""
Pack a loaded data node (and all its children) to
the BigQuery storage-compatible representation.
Expand All @@ -1157,16 +1157,20 @@ def _pack_node(cls, node, with_metadata):
node: The node to pack.
with_metadata: True, if meta fields (with leading underscore "_")
should be preserved. False, if omitted.
copy: True, if the data should be copied before packing.
False, if the data should be packed in-place.

Returns:
The packed node.
"""
if isinstance(node, list):
node = node.copy()
if copy:
node = node.copy()
for index, value in enumerate(node):
node[index] = cls._pack_node(value, with_metadata)
node[index] = cls._pack_node(value, with_metadata, copy)
elif isinstance(node, dict):
node = node.copy()
if copy:
node = node.copy()
for key, value in list(node.items()):
# Flatten the "misc" fields
if key == "misc":
Expand All @@ -1176,29 +1180,35 @@ def _pack_node(cls, node, with_metadata):
del node[key]
# Pack everything else
else:
node[key] = cls._pack_node(value, with_metadata)
node[key] = cls._pack_node(value, with_metadata, copy)
return node

def load(self, data, with_metadata):
def load(self, data, with_metadata, copy):
"""
Load data into the database.

Args:
data: The JSON data to load into the database. Must
adhere to the database schema's version of the I/O schema.
Will be modified, if "copy" is False.
with_metadata: True if any metadata in the data should
also be loaded into the database. False if it
should be discarded and the database should
generate its metadata itself.
copy: True, if the loaded data should be copied before
packing. False, if the loaded data should be
packed in-place.
"""
assert self.io.is_compatible_directly(data)
assert LIGHT_ASSERTS or self.io.is_valid_exactly(data)
assert isinstance(with_metadata, bool)
assert isinstance(copy, bool)

# Load the data
for obj_list_name, table_schema in self.TABLE_MAP.items():
if obj_list_name in data:
obj_list = self._pack_node(data[obj_list_name], with_metadata)
obj_list = self._pack_node(data[obj_list_name],
with_metadata, copy)
if not LIGHT_ASSERTS:
validate_json_obj_list(table_schema, obj_list)
job_config = bigquery.job.LoadJobConfig(
Expand Down
2 changes: 1 addition & 1 deletion kcidb/db/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,4 @@ def __init__(self, params):
io_schema = self.get_schema()[1]
for data in kcidb.misc.json_load_stream_fd(json_file.fileno()):
data = io_schema.upgrade(io_schema.validate(data), copy=False)
self.load(data, with_metadata=True)
self.load(data, with_metadata=True, copy=False)
12 changes: 10 additions & 2 deletions kcidb/db/mux.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ def oo_query(self, pattern_set):
"""
return self.drivers[0].oo_query(pattern_set)

def load(self, data, with_metadata):
def load(self, data, with_metadata, copy):
"""
Load data into the databases.
The databases must be initialized.
Expand All @@ -490,21 +490,29 @@ def load(self, data, with_metadata):
data: The JSON data to load into the databases.
Must adhere to the current database schema's
version of the I/O schema.
Will be modified, if "copy" is False.
with_metadata: True if any metadata in the data should
also be loaded into the databases. False if it
should be discarded and the databases should
generate their metadata themselves.
copy: True, if the loaded data should be copied before
packing. False, if the loaded data should be
packed in-place.
"""
# The mux driver I/O schema is the oldest across member drivers
io_schema = self.get_schema()[1]
assert io_schema.is_compatible_directly(data)
assert isinstance(with_metadata, bool)
assert isinstance(copy, bool)
# Load data into every driver
for driver in self.drivers:
# Only copy if we need to upgrade
driver_io_schema = driver.get_schema()[1]
driver.load(
driver_io_schema.upgrade(data)
if driver_io_schema != io_schema else data,
with_metadata=with_metadata
with_metadata=with_metadata,
copy=copy
)
# We don't want to pack packed data again
copy = True
11 changes: 7 additions & 4 deletions kcidb/db/null.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,16 +235,19 @@ def oo_query(self, pattern_set):
del pattern_set
return {}

def load(self, data, with_metadata):
def load(self, data, with_metadata, copy):
"""
Load data into the database.

Args:
data: The JSON data to load into the database.
Must adhere to the current database schema's
version of the I/O schema.
data: The JSON data to load into the database. Must
adhere to the database schema's version of the I/O schema.
Will be modified, if "copy" is False.
with_metadata: True if any metadata in the data should
also be loaded into the database. False if it
should be discarded and the database should
generate its metadata itself.
copy: True, if the loaded data should be copied before
packing. False, if the loaded data should be
packed in-place.
"""
7 changes: 6 additions & 1 deletion kcidb/db/postgresql/v04_00.py
Original file line number Diff line number Diff line change
Expand Up @@ -916,21 +916,26 @@ def oo_query(self, pattern_set):
assert LIGHT_ASSERTS or orm.data.SCHEMA.is_valid(objs)
return objs

def load(self, data, with_metadata):
def load(self, data, with_metadata, copy):
"""
Load data into the database.

Args:
data: The JSON data to load into the database. Must
adhere to the database schema's version of the I/O schema.
Will be modified, if "copy" is False.
with_metadata: True if any metadata in the data should
also be loaded into the database. False if it
should be discarded and the database should
generate its metadata itself.
copy: True, if the loaded data should be copied before
packing. False, if the loaded data should be
packed in-place.
"""
assert self.io.is_compatible_directly(data)
assert LIGHT_ASSERTS or self.io.is_valid_exactly(data)
assert isinstance(with_metadata, bool)
assert isinstance(copy, bool)
with self.conn, self.conn.cursor() as cursor:
for table_name, table_schema in self.TABLES.items():
if table_name in data:
Expand Down
15 changes: 12 additions & 3 deletions kcidb/db/schematic.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,17 +353,21 @@ def oo_query(self, pattern_set):
for r in pattern_set)

@abstractmethod
def load(self, data, with_metadata):
def load(self, data, with_metadata, copy):
"""
Load data into the database.

Args:
data: The JSON data to load into the database. Must
adhere to the schema's version of the I/O schema.
Will be modified, if "copy" is False.
with_metadata: True if any metadata in the data should
also be loaded into the database. False if it
should be discarded and the database should
generate its metadata itself.
copy: True, if the loaded data should be copied before
packing. False, if the loaded data should be
packed in-place.
"""
# Relying on the driver to check compatibility/validity

Expand Down Expand Up @@ -773,7 +777,7 @@ def oo_query(self, pattern_set):
assert self.is_initialized()
return self.schema.oo_query(pattern_set)

def load(self, data, with_metadata):
def load(self, data, with_metadata, copy):
"""
Load data into the database.
The database must be initialized.
Expand All @@ -782,13 +786,18 @@ def load(self, data, with_metadata):
data: The JSON data to load into the database.
Must adhere to the current database schema's
version of the I/O schema.
Will be modified, if "copy" is False.
with_metadata: True if any metadata in the data should
also be loaded into the database. False if it
should be discarded and the database should
generate its metadata itself.
copy: True, if the loaded data should be copied before
packing. False, if the loaded data should be
packed in-place.
"""
assert self.is_initialized()
assert self.schema.io.is_compatible_directly(data)
assert LIGHT_ASSERTS or self.schema.io.is_valid_exactly(data)
assert isinstance(with_metadata, bool)
self.schema.load(data, with_metadata=with_metadata)
assert isinstance(copy, bool)
self.schema.load(data, with_metadata=with_metadata, copy=copy)
7 changes: 6 additions & 1 deletion kcidb/db/sqlite/v04_00.py
Original file line number Diff line number Diff line change
Expand Up @@ -864,21 +864,26 @@ def oo_query(self, pattern_set):
assert LIGHT_ASSERTS or orm.data.SCHEMA.is_valid(objs)
return objs

def load(self, data, with_metadata):
def load(self, data, with_metadata, copy):
"""
Load data into the database.

Args:
data: The JSON data to load into the database. Must
adhere to the database schema's version of the I/O schema.
Will be modified, if "copy" is False.
with_metadata: True if any metadata in the data should
also be loaded into the database. False if it
should be discarded and the database should
generate its metadata itself.
copy: True, if the loaded data should be copied before
packing. False, if the loaded data should be
packed in-place.
"""
assert self.io.is_compatible_directly(data)
assert LIGHT_ASSERTS or self.io.is_valid_exactly(data)
assert isinstance(with_metadata, bool)
assert isinstance(copy, bool)
with self.conn:
cursor = self.conn.cursor()
try:
Expand Down
7 changes: 4 additions & 3 deletions kcidb/test_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ def test_load_main():
status = function()
Client.assert_called_once_with("bigquery:project.dataset")
client.load.assert_called_once_with({repr(empty)},
with_metadata=False)
with_metadata=False,
copy=False)
return status
""")
assert_executes(json.dumps(empty), *argv,
Expand All @@ -227,8 +228,8 @@ def test_load_main():
Client.assert_called_once_with("bigquery:project.dataset")
assert client.load.call_count == 2
client.load.assert_has_calls([
call({repr(empty)}, with_metadata=False),
call({repr(empty)}, with_metadata=False)
call({repr(empty)}, with_metadata=False, copy=False),
call({repr(empty)}, with_metadata=False, copy=False)
])
return status
""")
Expand Down
2 changes: 1 addition & 1 deletion main.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ def kcidb_archive(event, context):
after=after, until=next_after)
count = kcidb.io.SCHEMA.count(data)
LOGGER.info("LOADING %u objects into archive database", count)
ar_client.load(data, with_metadata=True)
ar_client.load(data, with_metadata=True, copy=False)
LOGGER.info("ARCHIVED %u objects in (%s, %s] range",
count, min_after_str, next_min_after_str)
for obj_list_name in after:
Expand Down