Skip to content

Commit

Permalink
[monitoring] added code for retention policies in influxdb2.x openwis…
Browse files Browse the repository at this point in the history
  • Loading branch information
AbhigyaShridhar committed Apr 14, 2022
1 parent 70289c2 commit fd9d21f
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 17 deletions.
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ services:
volumes:
- influxdb-data:/var/lib/influxdb
ports:
- "8086:8086"
- "8085:8085"
environment:
INFLUXDB_DB: openwisp2
INFLUXDB_USER: openwisp
Expand Down
47 changes: 31 additions & 16 deletions openwisp_monitoring/db/backends/influxdb2/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
from django.utils.functional import cached_property
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.exceptions import InfluxDBError
from influxdb_client.client.exceptions import InfluxDBError, BucketRetentionRules
from influxdb_client.client.write_api import SYNCHRONOUS

from openwisp_monitoring.utils import retry
Expand All @@ -25,7 +26,6 @@ class DatabaseClient(object):

def __init__(self, db_name=None):
self._db = None
self.db_name = db_name or TIMESERIES_DB['NAME']
self.client_error = InfluxDBError
self.write_api = None
self.query_api = None
Expand All @@ -41,25 +41,15 @@ def db(self):

@retry
def create_database(self):
"creates a new bucket if not already found"
bucket = self.db.buckets_api().find_bucket_by_name(self.db_name)
if bucket is None:
bucket = self.db.buckets_api().create_bucket(bucket_name=self.db_name, org=TIMESERIES_DB["ORG"])
logger.debug(f'Created InfluxDB2 bucket "{self.db_name}"')
else:
logger.debug(f'Bucket named "{self.db_name}" found')
"initialize APIs required for writing and querying data from influxdb database"
logger.debug(f'quert_api and write_api for {str(self.db)} initiated')
self.write_api = self.db.write_api(write_options=SYNCHRONOUS)
self.query_api = self.db.query_api()

@retry
def drop_database(self):
"deletes a bucket if it exists"
bucket = self.db.buckets_api().find_bucket_by_name(self.db_name)
if bucket is None:
logger.debug(f'No such bucket: "{self.db_name}"')
else:
self.db.buckets_api().delete_bucket(bucket)
logger.debug(f'Bucket named "{self.db_name}" deleted')
"deletes all buckets"
pass

@retry
def query(self, query):
Expand Down Expand Up @@ -118,3 +108,28 @@ def write(self, name, values, **kwargs):
):
return
raise TimeseriesWriteException

@retry
def get_list_retention_policies(self, name=None):
if name is None:
logger.warning(f'no bucket name provided')
return None
bucket = self.db.buckets_api().find_bucket_by_name(name)
if bucket is None:
logger.warning(f'Bucket with name - {name} not found')
return None
return bucket.retention_rules

@retry
def create_or_alter_retention_policy(self, name, duration):
"""alters the retention policy if bucket with the given name exists,
otherwise create a new bucket"""
bucket = self.db.buckets_api().find_bucket_by_name(name)
retention_rules = BucketRetentionRules(type="expire", every_seconds=duration)
if not bucket is None:
bucket.retention_rules = retention_rules
self.db.buckets_api().update_bucket(bucket=bucket)
logger.warning(f'Retention policies for bucket '{name}' have been updated')
else:
bucket = buckets_api.create_bucket(bucket_name=name, retention_rules=retention_rules, org=TIMESERIES_DB["ORG"])
logger.warning(f'Bucket '{name}' with specified retention polcies has been created')

0 comments on commit fd9d21f

Please sign in to comment.