diff --git a/docker-compose.yml b/docker-compose.yml index 6386355ed..4ef18ee58 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -17,7 +17,7 @@ services: volumes: - influxdb-data:/var/lib/influxdb ports: - - "8086:8086" + - "8085:8085" environment: INFLUXDB_DB: openwisp2 INFLUXDB_USER: openwisp diff --git a/openwisp_monitoring/db/backends/influxdb2/client.py b/openwisp_monitoring/db/backends/influxdb2/client.py index cd1074a5c..83ad91eeb 100644 --- a/openwisp_monitoring/db/backends/influxdb2/client.py +++ b/openwisp_monitoring/db/backends/influxdb2/client.py @@ -9,8 +9,9 @@ from django.utils.functional import cached_property from django.utils.timezone import now from django.utils.translation import gettext_lazy as _ + from influxdb_client import InfluxDBClient, Point -from influxdb_client.client.exceptions import InfluxDBError +from influxdb_client.client.exceptions import InfluxDBError, BucketRetentionRules from influxdb_client.client.write_api import SYNCHRONOUS from openwisp_monitoring.utils import retry @@ -25,7 +26,6 @@ class DatabaseClient(object): def __init__(self, db_name=None): self._db = None - self.db_name = db_name or TIMESERIES_DB['NAME'] self.client_error = InfluxDBError self.write_api = None self.query_api = None @@ -41,25 +41,15 @@ def db(self): @retry def create_database(self): - "creates a new bucket if not already found" - bucket = self.db.buckets_api().find_bucket_by_name(self.db_name) - if bucket is None: - bucket = self.db.buckets_api().create_bucket(bucket_name=self.db_name, org=TIMESERIES_DB["ORG"]) - logger.debug(f'Created InfluxDB2 bucket "{self.db_name}"') - else: - logger.debug(f'Bucket named "{self.db_name}" found') + "initialize APIs required for writing and querying data from influxdb database" + logger.debug(f'quert_api and write_api for {str(self.db)} initiated') self.write_api = self.db.write_api(write_options=SYNCHRONOUS) self.query_api = self.db.query_api() @retry def drop_database(self): - "deletes a bucket if it exists" - bucket = self.db.buckets_api().find_bucket_by_name(self.db_name) - if bucket is None: - logger.debug(f'No such bucket: "{self.db_name}"') - else: - self.db.buckets_api().delete_bucket(bucket) - logger.debug(f'Bucket named "{self.db_name}" deleted') + "deletes all buckets" + pass @retry def query(self, query): @@ -118,3 +108,28 @@ def write(self, name, values, **kwargs): ): return raise TimeseriesWriteException + + @retry + def get_list_retention_policies(self, name=None): + if name is None: + logger.warning(f'no bucket name provided') + return None + bucket = self.db.buckets_api().find_bucket_by_name(name) + if bucket is None: + logger.warning(f'Bucket with name - {name} not found') + return None + return bucket.retention_rules + + @retry + def create_or_alter_retention_policy(self, name, duration): + """alters the retention policy if bucket with the given name exists, + otherwise create a new bucket""" + bucket = self.db.buckets_api().find_bucket_by_name(name) + retention_rules = BucketRetentionRules(type="expire", every_seconds=duration) + if not bucket is None: + bucket.retention_rules = retention_rules + self.db.buckets_api().update_bucket(bucket=bucket) + logger.warning(f'Retention policies for bucket '{name}' have been updated') + else: + bucket = buckets_api.create_bucket(bucket_name=name, retention_rules=retention_rules, org=TIMESERIES_DB["ORG"]) + logger.warning(f'Bucket '{name}' with specified retention polcies has been created')