From 6616870e4697f32cd4f794f15f57b7af3bd0759e Mon Sep 17 00:00:00 2001 From: Gagan Deep Date: Tue, 31 Dec 2024 22:25:56 +0530 Subject: [PATCH] [fix] Fixed "migrate_timeseries" command #626 Fixes #626 --- .../db/backends/influxdb/client.py | 20 +++++++++++- .../influxdb/influxdb_alter_structure_0006.py | 31 +++++++------------ 2 files changed, 30 insertions(+), 21 deletions(-) diff --git a/openwisp_monitoring/db/backends/influxdb/client.py b/openwisp_monitoring/db/backends/influxdb/client.py index b6b9428d..6c398823 100644 --- a/openwisp_monitoring/db/backends/influxdb/client.py +++ b/openwisp_monitoring/db/backends/influxdb/client.py @@ -149,14 +149,32 @@ def query(self, query, precision=None, **kwargs): ) def _write(self, points, database, retention_policy): + """ + Write data points in the specified database. + + This method writes data points to the specified database. If the size of the data exceeds + the limit of a UDP packet, it falls back to using a TCP connection for writing data. + + Args: + points (list): The data points to be stored. + database (str): The name of the database where the data points will be stored. + retention_policy (str): The retention policy to be used for storing the data points. + + Returns: + bool: True if the data points were successfully written, False otherwise. + + Raises: + TimeseriesWriteException: If there is an error while writing the data points. + """ db = self.dbs['short'] if retention_policy else self.dbs['default'] # If the size of data exceeds the limit of the UDP packet, then # fallback to use TCP connection for writing data. lines = make_lines({'points': points}) if sys.getsizeof(lines) > 65000: + # Size exceeds UDP limit, write using TCP. db = self.dbs['__all__'] try: - db.write_points( + return db.write_points( points=lines.split('\n')[:-1], database=database, retention_policy=retention_policy, diff --git a/openwisp_monitoring/monitoring/migrations/influxdb/influxdb_alter_structure_0006.py b/openwisp_monitoring/monitoring/migrations/influxdb/influxdb_alter_structure_0006.py index 873e1d7c..1ec5aee1 100644 --- a/openwisp_monitoring/monitoring/migrations/influxdb/influxdb_alter_structure_0006.py +++ b/openwisp_monitoring/monitoring/migrations/influxdb/influxdb_alter_structure_0006.py @@ -89,7 +89,7 @@ def migrate_influxdb_data( configuration, new_measurement, read_query=READ_QUERY, - delete_query=DELETE_QUERY, + delete_query=None, ): Metric = load_model('monitoring', 'Metric') metric_qs = Metric.objects.filter(configuration=configuration, key=new_measurement) @@ -114,10 +114,10 @@ def migrate_influxdb_data( start = offset end = offset + min(write_data_count, SELECT_QUERY_LIMIT) response = retry_until_success( - timeseries_db.db.write_points, + timeseries_db._write, write_data, - tags=metric.tags, - batch_size=WRITE_BATCH_SIZE, + timeseries_db.db_name, + retention_policy=None, ) if response is True: logger.info( @@ -151,39 +151,26 @@ def migrate_influxdb_data( def migrate_wifi_clients(): read_query = f"{READ_QUERY} AND clients != ''" - # Lookup using fields not supported in WHERE clause during deletion. - # Hence, we cannot perform delete operation only for rows that - # contains clients. - delete_query = None migrate_influxdb_data( new_measurement='wifi_clients', configuration='clients', read_query=read_query, - delete_query=delete_query, ) logger.info('"wifi_clients" measurements successfully migrated.') def migrate_traffic_data(): - migrate_influxdb_data( - new_measurement='traffic', - configuration='traffic', - delete_query=f"{DELETE_QUERY} AND access_tech != ''", - ) + migrate_influxdb_data(new_measurement='traffic', configuration='traffic') logger.info('"traffic" measurements successfully migrated.') def migrate_signal_strength_data(): - migrate_influxdb_data( - new_measurement='signal', configuration='signal_strength', delete_query=None - ) + migrate_influxdb_data(new_measurement='signal', configuration='signal_strength') logger.info('"signal_strength" measurements successfully migrated.') def migrate_signal_quality_data(): - migrate_influxdb_data( - new_measurement='signal', configuration='signal_quality', delete_query=None - ) + migrate_influxdb_data(new_measurement='signal', configuration='signal_quality') logger.info('"signal_quality" measurements successfully migrated.') @@ -191,6 +178,7 @@ def migrate_access_tech_data(): migrate_influxdb_data( new_measurement='signal', configuration='access_tech', + delete_query=DELETE_QUERY, ) logger.info('"access_tech" measurements successfully migrated.') @@ -215,6 +203,9 @@ def migrate_influxdb_structure(): 'Timeseries data migration is already migrated. Skipping migration!' ) return + # IMPORTANT: Do not change order of below functions. + # Lookup using fields not supported in WHERE clause during deletion. + # "migrate_access_tech_data" at the end because it is deletes the metrics. migrate_wifi_clients() migrate_traffic_data() migrate_signal_strength_data()