From 78f0d624ed48387dc7cea0e45bf1f5134800b733 Mon Sep 17 00:00:00 2001
From: Nikolai Kondrashov
Date: Thu, 31 Oct 2024 21:44:10 +0200
Subject: [PATCH] db: Support dumping a time range

---
 kcidb/db/__init__.py          | 69 ++++++++++++++++++++++++---
 kcidb/db/abstract.py          | 20 +++++++-
 kcidb/db/bigquery/v04_00.py   | 48 ++++++++++++++++---
 kcidb/db/misc.py              |  4 ++
 kcidb/db/mux.py               | 18 ++++++-
 kcidb/db/null.py              | 19 +++++++-
 kcidb/db/postgresql/schema.py |  7 ++-
 kcidb/db/postgresql/v04_00.py | 21 +++++++--
 kcidb/db/postgresql/v04_02.py |  3 +-
 kcidb/db/postgresql/v04_05.py |  5 +-
 kcidb/db/schematic.py         | 43 +++++++++++++++--
 kcidb/db/sql/schema.py        | 62 +++++++++++++++++++++---
 kcidb/db/sqlite/schema.py     |  7 ++-
 kcidb/db/sqlite/v04_00.py     | 19 +++++++-
 kcidb/db/sqlite/v04_02.py     |  3 +-
 kcidb/test_db.py              | 89 +++++++++++++++++++++++++++++++++++
 16 files changed, 397 insertions(+), 40 deletions(-)

diff --git a/kcidb/db/__init__.py b/kcidb/db/__init__.py
index 00dd1ca6..1971719c 100644
--- a/kcidb/db/__init__.py
+++ b/kcidb/db/__init__.py
@@ -11,6 +11,8 @@
 from kcidb.db import abstract, schematic, mux, \
     bigquery, postgresql, sqlite, json, null, misc  # noqa: F401
 
+# It's OK for now, pylint: disable=too-many-lines
+
 # Module's logger
 LOGGER = logging.getLogger(__name__)
 
@@ -274,7 +276,8 @@ def upgrade(self, target_version=None):
             "Target schema is older than the current schema"
         self.driver.upgrade(target_version)
 
-    def dump_iter(self, objects_per_report=0, with_metadata=True):
+    def dump_iter(self, objects_per_report=0, with_metadata=True,
+                  after=None, until=None):
         """
         Dump all data from the database in object number-limited chunks.
 
@@ -283,37 +286,76 @@ def dump_iter(self, objects_per_report=0, with_metadata=True):
                                 report data, or zero for no limit.
             with_metadata:      True, if metadata fields should be dumped
                                 as well. False, if not.
+            after:              An "aware" datetime.datetime object specifying
+                                the latest (database server) time the data to
+                                be excluded from the dump should've arrived.
+                                The data after this time will be dumped.
+                                Can be None to have no limit on older data.
+            until:              An "aware" datetime.datetime object specifying
+                                the latest (database server) time the data to
+                                be dumped should've arrived.
+                                The data after this time will not be dumped.
+                                Can be None to have no limit on newer data.
 
         Returns:
             An iterator returning report JSON data adhering to the current
             I/O schema version, each containing at most the specified number
             of objects.
+
+        Raises:
+            NoTimestamps - Either "after" or "until" is not None, and
+                           the database doesn't have row timestamps.
         """
-        assert self.is_initialized()
         assert isinstance(objects_per_report, int)
         assert objects_per_report >= 0
         assert isinstance(with_metadata, bool)
+        assert after is None or \
+            isinstance(after, datetime.datetime) and after.tzinfo
+        assert until is None or \
+            isinstance(until, datetime.datetime) and until.tzinfo
+        assert self.is_initialized()
         yield from self.driver.dump_iter(
             objects_per_report=objects_per_report,
-            with_metadata=with_metadata
+            with_metadata=with_metadata,
+            after=after, until=until
         )
 
-    def dump(self, with_metadata=True):
+    def dump(self, with_metadata=True, after=None, until=None):
         """
         Dump all data from the database.
 
         Args:
             with_metadata:  True, if metadata fields should be dumped as
                             well. False, if not.
+            after:          An "aware" datetime.datetime object specifying
+                            the latest (database server) time the data to
+                            be excluded from the dump should've arrived.
+                            The data after this time will be dumped.
+                            Can be None to have no limit on older data.
+            until:          An "aware" datetime.datetime object specifying
+                            the latest (database server) time the data to
+                            be dumped should've arrived.
+                            The data after this time will not be dumped.
+                            Can be None to have no limit on newer data.
 
         Returns:
             The JSON data from the database adhering to the current I/O
             schema version.
+
+        Raises:
+            NoTimestamps - Either "after" or "until" is not None, and
+                           the database doesn't have row timestamps.
         """
+        assert isinstance(with_metadata, bool)
+        assert after is None or \
+            isinstance(after, datetime.datetime) and after.tzinfo
+        assert until is None or \
+            isinstance(until, datetime.datetime) and until.tzinfo
         assert self.is_initialized()
         try:
             return next(self.dump_iter(objects_per_report=0,
-                                       with_metadata=with_metadata))
+                                       with_metadata=with_metadata,
+                                       after=after, until=until))
         except StopIteration:
             return self.get_schema()[1].new()
 
@@ -776,13 +818,28 @@ def dump_main():
         help='Do not dump metadata fields',
         action='store_true'
     )
+    parser.add_argument(
+        '--after',
+        metavar='AFTER',
+        type=kcidb.misc.iso_timestamp,
+        help="An ISO-8601 timestamp specifying the latest time the data to "
+             "be *excluded* from the dump should've arrived."
+    )
+    parser.add_argument(
+        '--until',
+        metavar='UNTIL',
+        type=kcidb.misc.iso_timestamp,
+        help="An ISO-8601 timestamp specifying the latest time the data to "
+             "be *included* into the dump should've arrived."
+    )
     args = parser.parse_args()
     client = Client(args.database)
     if not client.is_initialized():
         raise Exception(f"Database {args.database!r} is not initialized")
     kcidb.misc.json_dump_stream(
         client.dump_iter(objects_per_report=args.objects_per_report,
-                         with_metadata=not args.without_metadata),
+                         with_metadata=not args.without_metadata,
+                         after=args.after, until=args.until),
         sys.stdout, indent=args.indent, seq=args.seq_out
     )
diff --git a/kcidb/db/abstract.py b/kcidb/db/abstract.py
index aded832d..b4915653 100644
--- a/kcidb/db/abstract.py
+++ b/kcidb/db/abstract.py
@@ -170,7 +170,7 @@ def upgrade(self, target_version):
             "Target schema is older than the current schema"
 
     @abstractmethod
-    def dump_iter(self, objects_per_report, with_metadata):
+    def dump_iter(self, objects_per_report, with_metadata, after, until):
         """
         Dump all data from the database in object number-limited chunks.
         The database must be initialized.
@@ -180,15 +180,33 @@ def dump_iter(self, objects_per_report, with_metadata):
                                 report data, or zero for no limit.
             with_metadata:      True, if metadata fields should be dumped
                                 as well. False, if not.
+            after:              An "aware" datetime.datetime object specifying
+                                the latest (database server) time the data to
+                                be excluded from the dump should've arrived.
+                                The data after this time will be dumped.
+                                Can be None to have no limit on older data.
+            until:              An "aware" datetime.datetime object specifying
+                                the latest (database server) time the data to
+                                be dumped should've arrived.
+                                The data after this time will not be dumped.
+                                Can be None to have no limit on newer data.
 
         Returns:
             An iterator returning report JSON data adhering to the current
             database schema's I/O schema version, each containing at most
             the specified number of objects.
+
+        Raises:
+            NoTimestamps - Either "after" or "until" is not None, and
+                           the database doesn't have row timestamps.
""" assert isinstance(objects_per_report, int) assert objects_per_report >= 0 assert isinstance(with_metadata, bool) + assert after is None or \ + isinstance(after, datetime.datetime) and after.tzinfo + assert until is None or \ + isinstance(until, datetime.datetime) and until.tzinfo assert self.is_initialized() # No, it's not, pylint: disable=too-many-return-statements diff --git a/kcidb/db/bigquery/v04_00.py b/kcidb/db/bigquery/v04_00.py index f3188a9c..a80506a4 100644 --- a/kcidb/db/bigquery/v04_00.py +++ b/kcidb/db/bigquery/v04_00.py @@ -17,7 +17,7 @@ from kcidb.db.schematic import \ Schema as AbstractSchema, \ Connection as AbstractConnection -from kcidb.db.misc import NotFound +from kcidb.db.misc import NotFound, NoTimestamps from kcidb.db.bigquery.schema import validate_json_obj_list # We'll manage for now, pylint: disable=too-many-lines @@ -749,7 +749,7 @@ def _unpack_node(cls, node, drop_null=True): node[key] = cls._unpack_node(value) return node - def dump_iter(self, objects_per_report, with_metadata): + def dump_iter(self, objects_per_report, with_metadata, after, until): """ Dump all data from the database in object number-limited chunks. @@ -758,11 +758,25 @@ def dump_iter(self, objects_per_report, with_metadata): report data, or zero for no limit. with_metadata: True, if metadata fields should be dumped as well. False, if not. + after: An "aware" datetime.datetime object specifying + the latest (database server) time the data to + be excluded from the dump should've arrived. + The data after this time will be dumped. + Can be None to have no limit on older data. + until: An "aware" datetime.datetime object specifying + the latest (database server) time the data to + be dumped should've arrived. + The data after this time will not be dumped. + Can be None to have no limit on newer data. Returns: An iterator returning report JSON data adhering to the I/O version of the database schema, each containing at most the specified number of objects. + + Raises: + NoTimestamps - Either "after" or "until" are not None, and + the database doesn't have row timestamps. """ assert isinstance(objects_per_report, int) assert objects_per_report >= 0 @@ -771,14 +785,34 @@ def dump_iter(self, objects_per_report, with_metadata): obj_num = 0 data = self.io.new() for obj_list_name, table_schema in self.TABLE_MAP.items(): - query_string = \ - "SELECT " + \ + ts_field = next( + (f for f in table_schema if f.name == "_timestamp"), + None + ) + if (after or until) and not ts_field: + raise NoTimestamps( + f"Table {obj_list_name!r} has no {ts_field.name!r} column" + ) + + query_string = ( + "SELECT " + ", ".join( f"`{f.name}`" for f in table_schema if with_metadata or f.name[0] != '_' - ) + \ - f" FROM `{obj_list_name}`" - query_job = self.conn.query_create(query_string) + ) + + f" FROM `{obj_list_name}`" + + (( + " WHERE " + " AND ".join( + f"{ts_field.name} {op} ?" 
+                        for op, v in ((">", after), ("<=", until)) if v
+                    )
+                ) if (after or until) else "")
+            )
+            query_parameters = [
+                bigquery.ScalarQueryParameter(None, ts_field.field_type, v)
+                for v in (after, until) if v
+            ]
+            query_job = self.conn.query_create(query_string, query_parameters)
             obj_list = None
             for row in query_job:
                 if obj_list is None:
diff --git a/kcidb/db/misc.py b/kcidb/db/misc.py
index 0e0d6ce2..a7738bdb 100644
--- a/kcidb/db/misc.py
+++ b/kcidb/db/misc.py
@@ -33,6 +33,10 @@ class UnsupportedSchema(Error):
     """Database schema version is not supported"""
 
 
+class NoTimestamps(Error):
+    """Row timestamps required for the operation don't exist"""
+
+
 def format_spec_list(specs):
     """
     Format a database specification list string out of a list of specification
diff --git a/kcidb/db/mux.py b/kcidb/db/mux.py
index 97e3bc25..8c1332fa 100644
--- a/kcidb/db/mux.py
+++ b/kcidb/db/mux.py
@@ -355,7 +355,7 @@ def upgrade(self, target_version):
                 driver.upgrade(driver_version)
         self.version = version
 
-    def dump_iter(self, objects_per_report, with_metadata):
+    def dump_iter(self, objects_per_report, with_metadata, after, until):
         """
         Dump all data from the first database in object number-limited
         chunks.
 
@@ -364,14 +364,28 @@ def dump_iter(self, objects_per_report, with_metadata):
                                 report data, or zero for no limit.
             with_metadata:      True, if metadata fields should be dumped
                                 as well. False, if not.
+            after:              An "aware" datetime.datetime object specifying
+                                the latest (database server) time the data to
+                                be excluded from the dump should've arrived.
+                                The data after this time will be dumped.
+                                Can be None to have no limit on older data.
+            until:              An "aware" datetime.datetime object specifying
+                                the latest (database server) time the data to
+                                be dumped should've arrived.
+                                The data after this time will not be dumped.
+                                Can be None to have no limit on newer data.
 
         Returns:
             An iterator returning report JSON data adhering to the current
             I/O schema version, each containing at most the specified number
             of objects.
+
+        Raises:
+            NoTimestamps - Either "after" or "until" is not None, and
+                           the database doesn't have row timestamps.
         """
         yield from self.drivers[0].dump_iter(objects_per_report,
-                                             with_metadata)
+                                             with_metadata, after, until)
 
     # We can live with this for now, pylint: disable=too-many-arguments
     # Or if you prefer, pylint: disable=too-many-positional-arguments
diff --git a/kcidb/db/null.py b/kcidb/db/null.py
index fa1d71db..8e9c2fe9 100644
--- a/kcidb/db/null.py
+++ b/kcidb/db/null.py
@@ -129,7 +129,7 @@ def get_last_modified(self):
         """
         return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)
 
-    def dump_iter(self, objects_per_report, with_metadata):
+    def dump_iter(self, objects_per_report, with_metadata, after, until):
         """
         Dump all data from the database in object number-limited chunks.
 
@@ -138,13 +138,30 @@ def dump_iter(self, objects_per_report, with_metadata):
                                 report data, or zero for no limit.
             with_metadata:      True, if metadata fields should be dumped
                                 as well. False, if not.
+            after:              An "aware" datetime.datetime object specifying
+                                the latest (database server) time the data to
+                                be excluded from the dump should've arrived.
+                                The data after this time will be dumped.
+                                Can be None to have no limit on older data.
+            until:              An "aware" datetime.datetime object specifying
+                                the latest (database server) time the data to
+                                be dumped should've arrived.
+                                The data after this time will not be dumped.
+                                Can be None to have no limit on newer data.
 
         Returns:
             An iterator returning report JSON data adhering to the current
             I/O schema version, each containing at most the specified number
             of objects.
+
+        Raises:
+            NoTimestamps - Either "after" or "until" is not None, and
+                           the database doesn't have row timestamps.
         """
         del objects_per_report
+        del with_metadata
+        del after
+        del until
         yield io.SCHEMA.new()
 
     # We can live with this for now, pylint: disable=too-many-arguments
diff --git a/kcidb/db/postgresql/schema.py b/kcidb/db/postgresql/schema.py
index 414819a8..ee9cfcd0 100644
--- a/kcidb/db/postgresql/schema.py
+++ b/kcidb/db/postgresql/schema.py
@@ -237,7 +237,7 @@ def __init__(self, constraint=None,
 class Table(_SQLTable):
     """A table schema"""
 
-    def __init__(self, columns, primary_key=None):
+    def __init__(self, columns, primary_key=None, timestamp=None):
         """
         Initialize the table schema.
 
@@ -249,9 +249,12 @@ def __init__(self, columns, primary_key=None):
             primary_key:    A list of names of columns constituting the
                             primary key. None or an empty list to use the
                             column with the PRIMARY_KEY constraint instead.
+            timestamp:      The name of the column containing last row change
+                            timestamp. Must exist in "columns".
         """
         # TODO: Switch to hardcoding "_" key_sep in base class
-        super().__init__("%s", columns, primary_key, key_sep="_")
+        super().__init__("%s", columns, primary_key, key_sep="_",
+                         timestamp=timestamp)
 
 
 class Index(_SQLIndex):
diff --git a/kcidb/db/postgresql/v04_00.py b/kcidb/db/postgresql/v04_00.py
index 97ffc842..aecad66a 100644
--- a/kcidb/db/postgresql/v04_00.py
+++ b/kcidb/db/postgresql/v04_00.py
@@ -569,7 +569,7 @@ def empty(self):
             for name, schema in self.TABLES.items():
                 cursor.execute(schema.format_delete(name))
 
-    def dump_iter(self, objects_per_report, with_metadata):
+    def dump_iter(self, objects_per_report, with_metadata, after, until):
         """
         Dump all data from the database in object number-limited chunks.
 
@@ -578,11 +578,25 @@ def dump_iter(self, objects_per_report, with_metadata):
                                 report data, or zero for no limit.
             with_metadata:      True, if metadata fields should be dumped
                                 as well. False, if not.
+            after:              An "aware" datetime.datetime object specifying
+                                the latest (database server) time the data to
+                                be excluded from the dump should've arrived.
+                                The data after this time will be dumped.
+                                Can be None to have no limit on older data.
+            until:              An "aware" datetime.datetime object specifying
+                                the latest (database server) time the data to
+                                be dumped should've arrived.
+                                The data after this time will not be dumped.
+                                Can be None to have no limit on newer data.
 
         Returns:
             An iterator returning report JSON data adhering to the I/O
             version of the database schema, each containing at most the
             specified number of objects.
+
+        Raises:
+            NoTimestamps - Either "after" or "until" is not None, and
+                           the database doesn't have row timestamps.
""" assert isinstance(objects_per_report, int) assert objects_per_report >= 0 @@ -593,8 +607,9 @@ def dump_iter(self, objects_per_report, with_metadata): with self.conn, self.conn.cursor() as cursor: for table_name, table_schema in self.TABLES.items(): obj_list = None - cursor.execute(table_schema.format_dump(table_name, - with_metadata)) + cursor.execute(*table_schema.format_dump(table_name, + with_metadata, + after, until)) for obj in table_schema.unpack_iter(cursor, with_metadata): if obj_list is None: obj_list = [] diff --git a/kcidb/db/postgresql/v04_02.py b/kcidb/db/postgresql/v04_02.py index 7d74931d..a07ad741 100644 --- a/kcidb/db/postgresql/v04_02.py +++ b/kcidb/db/postgresql/v04_02.py @@ -31,7 +31,8 @@ class Schema(PreviousSchema): TABLES_ARGS = { name: merge_dicts( args, - columns=dict(_timestamp=TIMESTAMP_COLUMN, **args["columns"]) + columns=dict(_timestamp=TIMESTAMP_COLUMN, **args["columns"]), + timestamp="_timestamp", ) for name, args in PreviousSchema.TABLES_ARGS.items() } diff --git a/kcidb/db/postgresql/v04_05.py b/kcidb/db/postgresql/v04_05.py index 4f234f2a..e3949d3a 100644 --- a/kcidb/db/postgresql/v04_05.py +++ b/kcidb/db/postgresql/v04_05.py @@ -52,11 +52,12 @@ class Schema(PreviousSchema): # For use by descendants TABLES_ARGS = merge_dicts( PreviousSchema.TABLES_ARGS, - tests=dict( + tests=merge_dicts( + PreviousSchema.TABLES_ARGS["tests"], columns=merge_dicts( PreviousSchema.TABLES_ARGS["tests"]["columns"], status=Column("STATUS"), - ) + ), ), ) diff --git a/kcidb/db/schematic.py b/kcidb/db/schematic.py index 5d1eb208..870d3def 100644 --- a/kcidb/db/schematic.py +++ b/kcidb/db/schematic.py @@ -274,7 +274,7 @@ def purge(self, before): return False @abstractmethod - def dump_iter(self, objects_per_report, with_metadata): + def dump_iter(self, objects_per_report, with_metadata, after, until): """ Dump all data from the database in object number-limited chunks. The database must be initialized. @@ -284,15 +284,33 @@ def dump_iter(self, objects_per_report, with_metadata): report data, or zero for no limit. with_metadata: True, if metadata fields should be dumped as well. False, if not. + after: An "aware" datetime.datetime object specifying + the latest (database server) time the data to + be excluded from the dump should've arrived. + The data after this time will be dumped. + Can be None to have no limit on older data. + until: An "aware" datetime.datetime object specifying + the latest (database server) time the data to + be dumped should've arrived. + The data after this time will not be dumped. + Can be None to have no limit on newer data. Returns: An iterator returning report JSON data adhering to the current database schema's I/O schema version, each containing at most the specified number of objects. + + Raises: + NoTimestamps - Either "after" or "until" are not None, and + the database doesn't have row timestamps. 
""" assert isinstance(objects_per_report, int) assert objects_per_report >= 0 assert isinstance(with_metadata, bool) + assert after is None or \ + isinstance(after, datetime.datetime) and after.tzinfo + assert until is None or \ + isinstance(until, datetime.datetime) and until.tzinfo # We can live with this for now, pylint: disable=too-many-arguments # Or if you prefer, pylint: disable=too-many-positional-arguments @@ -605,7 +623,7 @@ def upgrade(self, target_version): self.conn.set_schema_version(schema.version) self.schema = schema(self.conn) - def dump_iter(self, objects_per_report, with_metadata): + def dump_iter(self, objects_per_report, with_metadata, after, until): """ Dump all data from the database in object number-limited chunks. The database must be initialized. @@ -615,17 +633,36 @@ def dump_iter(self, objects_per_report, with_metadata): report data, or zero for no limit. with_metadata: True, if metadata fields should be dumped as well. False, if not. + after: An "aware" datetime.datetime object specifying + the latest (database server) time the data to + be excluded from the dump should've arrived. + The data after this time will be dumped. + Can be None to have no limit on older data. + until: An "aware" datetime.datetime object specifying + the latest (database server) time the data to + be dumped should've arrived. + The data after this time will not be dumped. + Can be None to have no limit on newer data. Returns: An iterator returning report JSON data adhering to the schema's I/O schema version, each containing at most the specified number of objects. + + Raises: + NoTimestamps - Either "after" or "until" are not None, and + the database doesn't have row timestamps. """ assert isinstance(objects_per_report, int) assert objects_per_report >= 0 assert isinstance(with_metadata, bool) + assert after is None or \ + isinstance(after, datetime.datetime) and after.tzinfo + assert until is None or \ + isinstance(until, datetime.datetime) and until.tzinfo assert self.is_initialized() - return self.schema.dump_iter(objects_per_report, with_metadata) + return self.schema.dump_iter(objects_per_report, with_metadata, + after, until) # We can live with this for now, pylint: disable=too-many-arguments # Or if you prefer, pylint: disable=too-many-positional-arguments diff --git a/kcidb/db/sql/schema.py b/kcidb/db/sql/schema.py index 249996a8..7cc2d1d4 100644 --- a/kcidb/db/sql/schema.py +++ b/kcidb/db/sql/schema.py @@ -2,8 +2,10 @@ Kernel CI report database - generic SQL schema definitions """ +import datetime import re from enum import Enum +from kcidb.db.misc import NoTimestamps class Constraint(Enum): @@ -149,7 +151,10 @@ def format_def(self): class Table: """A table schema""" - def __init__(self, placeholder, columns, primary_key=None, key_sep="_"): + # It's OK, pylint: disable=too-many-arguments + # Or, if you wish, pylint: disable=too-many-positional-arguments + def __init__(self, placeholder, columns, primary_key=None, key_sep="_", + timestamp=None): """ Initialize the table schema. @@ -164,7 +169,10 @@ def __init__(self, placeholder, columns, primary_key=None, key_sep="_"): primary key. None or an empty list to use the column with the PRIMARY_KEY constraint instead. key_sep: String used to replace dots in column names ("key" - separator) + separator). + timestamp The name of the column containing last row change + timestamp. Must exist in "columns". None, if the + table has no such column. 
""" assert isinstance(placeholder, str) and str assert isinstance(columns, dict) @@ -173,6 +181,8 @@ def __init__(self, placeholder, columns, primary_key=None, key_sep="_"): isinstance(column, Column) for name, column in columns.items() ) + assert timestamp is None or timestamp in columns + # The number of columns with PRIMARY_KEY constraint set primary_key_constraints = sum( c.constraint == Constraint.PRIMARY_KEY for c in columns.values() @@ -199,6 +209,8 @@ def __init__(self, placeholder, columns, primary_key=None, key_sep="_"): } # A list of columns in the explicitly-specified primary key self.primary_key = [self.columns[name] for name in primary_key] + # The timestamp table column + self.timestamp = timestamp and self.columns[timestamp] def format_create(self, name): """ @@ -273,7 +285,7 @@ def format_insert(self, name, prio_db, with_metadata): c not in self.primary_key ) - def format_dump(self, name, with_metadata): + def format_dump(self, name, with_metadata, after, until): """ Format the "SELECT" command for dumping the table contents, returning data suitable for unpacking with unpack*() methods. @@ -282,18 +294,54 @@ def format_dump(self, name, with_metadata): name: The name of the target table of the command. with_metadata: True, if metadata fields should be dumped too. False, if not. + after: An "aware" datetime.datetime object specifying + the latest (database server) time the data to be + excluded from the dump should've arrived. The data + after this time will be dumped. Can be None to + have no limit on older data. + until: An "aware" datetime.datetime object specifying + the latest (database server) time the data to be + dumped should've arrived. The data after this time + will not be dumped. Can be None to have no limit + on newer data. Returns: - The formatted "SELECT" command. + The formatted "SELECT" command, and its parameter container. + + Raises: + NoTimestamps - Either "after" or "until" are not None, and + the database doesn't have row timestamps. """ assert isinstance(name, str) assert isinstance(with_metadata, bool) - return "SELECT " + \ + assert after is None or \ + isinstance(after, datetime.datetime) and after.tzinfo + assert until is None or \ + isinstance(until, datetime.datetime) and until.tzinfo + + if (after or until) and not self.timestamp: + raise NoTimestamps("Table has no timestamp column") + + return ( + "SELECT " + ", ".join( c.name for c in self.columns.values() if with_metadata or not c.schema.metadata_expr - ) + \ - f" FROM {name}" + ) + + f" FROM {name}" + + (( + " WHERE " + " AND ".join( + f"{self.timestamp.name} {op} {self.placeholder}" + for op, v in ((">", after), ("<=", until)) if v + ) + ) if (after or until) else ""), + [ + self.timestamp.schema.pack( + v.isoformat(timespec='microseconds') + ) + for v in (after, until) if v + ] + ) def format_delete(self, name): """ diff --git a/kcidb/db/sqlite/schema.py b/kcidb/db/sqlite/schema.py index 4f345e95..78a6cb4d 100644 --- a/kcidb/db/sqlite/schema.py +++ b/kcidb/db/sqlite/schema.py @@ -174,7 +174,7 @@ def __init__(self, constraint=None, class Table(_SQLTable): """A table schema""" - def __init__(self, columns, primary_key=None): + def __init__(self, columns, primary_key=None, timestamp=None): """ Initialize the table schema. @@ -186,9 +186,12 @@ def __init__(self, columns, primary_key=None): primary_key: A list of names of columns constituting the primary key. None or an empty list to use the column with the PRIMARY_KEY constraint instead. 
+            timestamp:      The name of the column containing last row change
+                            timestamp. Must exist in "columns".
         """
         # TODO: Switch to using "_" key_sep, and hardcoding it in base class
-        super().__init__("?", columns, primary_key, key_sep=".")
+        super().__init__("?", columns, primary_key, key_sep=".",
+                         timestamp=timestamp)
 
     def format_create(self, name):
         """
diff --git a/kcidb/db/sqlite/v04_00.py b/kcidb/db/sqlite/v04_00.py
index 689551bd..398e429c 100644
--- a/kcidb/db/sqlite/v04_00.py
+++ b/kcidb/db/sqlite/v04_00.py
@@ -497,7 +497,7 @@ def empty(self):
         finally:
             cursor.close()
 
-    def dump_iter(self, objects_per_report, with_metadata):
+    def dump_iter(self, objects_per_report, with_metadata, after, until):
         """
         Dump all data from the database in object number-limited chunks.
 
@@ -506,11 +506,25 @@ def dump_iter(self, objects_per_report, with_metadata):
                                 report data, or zero for no limit.
             with_metadata:      True, if metadata fields should be dumped
                                 as well. False, if not.
+            after:              An "aware" datetime.datetime object specifying
+                                the latest (database server) time the data to
+                                be excluded from the dump should've arrived.
+                                The data after this time will be dumped.
+                                Can be None to have no limit on older data.
+            until:              An "aware" datetime.datetime object specifying
+                                the latest (database server) time the data to
+                                be dumped should've arrived.
+                                The data after this time will not be dumped.
+                                Can be None to have no limit on newer data.
 
         Returns:
             An iterator returning report JSON data adhering to the I/O
             version of the database schema, each containing at most the
             specified number of objects.
+
+        Raises:
+            NoTimestamps - Either "after" or "until" is not None, and
+                           the database doesn't have row timestamps.
         """
         assert isinstance(objects_per_report, int)
         assert objects_per_report >= 0
@@ -523,7 +537,8 @@ def dump_iter(self, objects_per_report, with_metadata):
         try:
             for table_name, table_schema in self.TABLES.items():
                 result = cursor.execute(
-                    table_schema.format_dump(table_name, with_metadata)
+                    *table_schema.format_dump(table_name, with_metadata,
+                                              after, until)
                 )
                 obj_list = None
                 for obj in table_schema.unpack_iter(result,
diff --git a/kcidb/db/sqlite/v04_02.py b/kcidb/db/sqlite/v04_02.py
index 407520d4..45d699c7 100644
--- a/kcidb/db/sqlite/v04_02.py
+++ b/kcidb/db/sqlite/v04_02.py
@@ -31,7 +31,8 @@ class Schema(PreviousSchema):
     TABLES_ARGS = {
         name: merge_dicts(
             args,
-            columns=dict(_timestamp=TIMESTAMP_COLUMN, **args["columns"])
+            columns=dict(_timestamp=TIMESTAMP_COLUMN, **args["columns"]),
+            timestamp="_timestamp",
         )
         for name, args in PreviousSchema.TABLES_ARGS.items()
     }
diff --git a/kcidb/test_db.py b/kcidb/test_db.py
index cd8db7f4..dbe5264e 100644
--- a/kcidb/test_db.py
+++ b/kcidb/test_db.py
@@ -1994,3 +1994,92 @@ def test_purge(empty_database):
         assert client.dump() == kcidb.io.SCHEMA.new()
     else:
         assert not client.purge(None)
+
+
+def test_dump_limits(empty_database):
+    """Test the dump() method observes time limits"""
+    # It's OK, pylint: disable=too-many-locals
+    client = empty_database
+
+    # Check we can do a basic dump
+    io_schema = client.get_schema()[1]
+    assert client.dump(with_metadata=False) == io_schema.new()
+
+    drivers = [*client.driver.drivers] \
+        if isinstance(client.driver, kcidb.db.mux.Driver) \
+        else [client.driver]
+
+    # If this is a database and schema which *should* support
+    # time-limited dumps
+    if all(
+        isinstance(driver,
+                   (kcidb.db.bigquery.Driver,
+                    kcidb.db.postgresql.Driver,
+                    kcidb.db.sqlite.Driver)) and
+        driver.get_schema()[0] >= (4, 2)
+        for driver in drivers
+    ):
+        io_data_1 = deepcopy(COMPREHENSIVE_IO_DATA)
+        io_data_2 = deepcopy(COMPREHENSIVE_IO_DATA)
+        for obj_list_name in kcidb.io.SCHEMA.graph:
+            if obj_list_name:
+                for obj in io_data_2[obj_list_name]:
+                    obj["id"] = "origin:2"
+
+        client.load(io_data_1)
+        assert client.dump(with_metadata=False) == io_data_1
+        time.sleep(1)
+        dump = client.dump(with_metadata=True)
+        first_load_timestamps = [
+            dateutil.parser.isoparse(obj["_timestamp"])
+            for obj_list_name in io_schema.id_fields
+            for obj in dump[obj_list_name]
+        ]
+        before_first_load = min(first_load_timestamps) - \
+            datetime.timedelta(microseconds=1)
+        latest_first_load = max(first_load_timestamps)
+        time.sleep(1)
+        client.load(io_data_2)
+        dump = client.dump(with_metadata=True)
+        second_load_timestamps = [
+            dateutil.parser.isoparse(obj["_timestamp"])
+            for obj_list_name in io_schema.id_fields
+            for obj in dump[obj_list_name]
+        ]
+        latest_second_load = max(second_load_timestamps)
+
+        # Check both datasets are in the database
+        # regardless of the object order
+        for dump in [
+            client.dump(with_metadata=False),
+            client.dump(with_metadata=False,
+                        after=None,
+                        until=None),
+            client.dump(with_metadata=False,
+                        after=before_first_load,
+                        until=latest_second_load),
+        ]:
+            for io_data in (io_data_1, io_data_2):
+                for obj_list_name in io_schema.id_fields:
+                    assert obj_list_name in dump
+                    for obj in io_data[obj_list_name]:
+                        assert obj in dump[obj_list_name]
+
+        assert client.dump(with_metadata=False,
+                           until=latest_first_load) == io_data_1
+        assert client.dump(with_metadata=False,
+                           after=latest_first_load) == io_data_2
+        assert client.dump(with_metadata=False,
+                           until=before_first_load) == io_schema.new()
+        assert client.dump(with_metadata=False,
+                           after=latest_second_load) == io_schema.new()
+        time.sleep(1)
+        assert client.purge(client.get_current_time())
+        assert client.dump() == io_schema.new()
+    else:
+        now = datetime.datetime.now(datetime.timezone.utc)
+        with pytest.raises(AssertionError):
+            client.dump(after=now)
+        with pytest.raises(AssertionError):
+            client.dump(until=now)
+        with pytest.raises(AssertionError):
+            client.dump(after=now, until=now)
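
Usage note (not part of the patch): both bounds must be timezone-aware
datetime objects, and together they select the half-open window
(after, until] over each row's arrival timestamp, i.e. rows with
after < _timestamp <= until. Consecutive dumps that pass the previous
"until" as the next "after" therefore neither duplicate nor drop rows.
Below is a minimal sketch of driving the new interface; the
"sqlite:kcidb.sqlite3" database spec and the one-day window are
illustrative assumptions, and the database is assumed to be initialized.

    # Illustrative sketch of the new time-range dump interface.
    import datetime
    import kcidb

    client = kcidb.db.Client("sqlite:kcidb.sqlite3")

    # Dump everything that arrived within the last day:
    # the selected window is (after, until].
    until = datetime.datetime.now(datetime.timezone.utc)
    after = until - datetime.timedelta(days=1)

    try:
        data = client.dump(with_metadata=False, after=after, until=until)
    except kcidb.db.misc.NoTimestamps:
        # Schemas older than v4.2 have no per-row "_timestamp" columns
        # and cannot be dumped with time limits.
        data = None

The same window is available from the command line through the new
--after/--until options added to the dump tool, which accept ISO-8601
timestamps.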