Skip to content

Commit

Permalink
[monitoring] Adding influxDB 2.x version support openwisp#274
Browse files Browse the repository at this point in the history
  • Loading branch information
praptisharma28 committed May 31, 2024
1 parent 510c61d commit e47cecd
Show file tree
Hide file tree
Showing 11 changed files with 955 additions and 20 deletions.
17 changes: 15 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,25 +1,38 @@
FROM python:3.9.19-slim-bullseye

# Install system dependencies
RUN apt update && \
apt install --yes zlib1g-dev libjpeg-dev gdal-bin libproj-dev \
libgeos-dev libspatialite-dev libsqlite3-mod-spatialite \
sqlite3 libsqlite3-dev openssl libssl-dev fping && \
rm -rf /var/lib/apt/lists/* /root/.cache/pip/* /tmp/*

# Upgrade pip and install Python dependencies
RUN pip install -U pip setuptools wheel

# Copy and install project dependencies
COPY requirements-test.txt requirements.txt /opt/openwisp/
RUN pip install -r /opt/openwisp/requirements.txt && \
pip install -r /opt/openwisp/requirements-test.txt && \
rm -rf /var/lib/apt/lists/* /root/.cache/pip/* /tmp/*

# Copy project files and install the project
ADD . /opt/openwisp
RUN pip install -U /opt/openwisp && \
rm -rf /var/lib/apt/lists/* /root/.cache/pip/* /tmp/*

# Set working directory
WORKDIR /opt/openwisp/tests/

# Set environment variables
ENV NAME=openwisp-monitoring \
PYTHONBUFFERED=1 \
INFLUXDB_HOST=influxdb \
INFLUXDB1_HOST=influxdb \
INFLUXDB2_HOST=influxdb2 \
REDIS_HOST=redis
CMD ["sh", "docker-entrypoint.sh"]

# Expose the application port
EXPOSE 8000

# Command to run the application
CMD ["sh", "docker-entrypoint.sh"]
16 changes: 16 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ services:
- "8000:8000"
depends_on:
- influxdb
- influxdb2
- redis

influxdb:
Expand All @@ -28,6 +29,20 @@ services:
INFLUXDB_USER: openwisp
INFLUXDB_USER_PASSWORD: openwisp

influxdb2:
image: influxdb:2.0-alpine
volumes:
- influxdb2-data:/var/lib/influxdb2
ports:
- "8087:8086"
environment:
DOCKER_INFLUXDB_INIT_MODE: setup
DOCKER_INFLUXDB_INIT_USERNAME: openwisp
DOCKER_INFLUXDB_INIT_PASSWORD: openwisp
DOCKER_INFLUXDB_INIT_ORG: openwisp
DOCKER_INFLUXDB_INIT_BUCKET: openwisp2
DOCKER_INFLUXDB_INIT_RETENTION: 1w

redis:
image: redis:5.0-alpine
ports:
Expand All @@ -36,3 +51,4 @@ services:

volumes:
influxdb-data: {}
influxdb2-data: {}
16 changes: 12 additions & 4 deletions openwisp_monitoring/db/backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,16 @@ def load_backend_module(backend_name=TIMESERIES_DB['BACKEND'], module=None):
"""
try:
assert 'BACKEND' in TIMESERIES_DB, 'BACKEND'
assert 'USER' in TIMESERIES_DB, 'USER'
assert 'PASSWORD' in TIMESERIES_DB, 'PASSWORD'
assert 'NAME' in TIMESERIES_DB, 'NAME'
if 'BACKEND' in TIMESERIES_DB and '2' in TIMESERIES_DB['BACKEND']:
# InfluxDB 2.x specific checks
assert 'TOKEN' in TIMESERIES_DB, 'TOKEN'
assert 'ORG' in TIMESERIES_DB, 'ORG'
assert 'BUCKET' in TIMESERIES_DB, 'BUCKET'
else:
# InfluxDB 1.x specific checks
assert 'USER' in TIMESERIES_DB, 'USER'
assert 'PASSWORD' in TIMESERIES_DB, 'PASSWORD'
assert 'NAME' in TIMESERIES_DB, 'NAME'
assert 'HOST' in TIMESERIES_DB, 'HOST'
assert 'PORT' in TIMESERIES_DB, 'PORT'
if module:
Expand All @@ -48,7 +55,8 @@ def load_backend_module(backend_name=TIMESERIES_DB['BACKEND'], module=None):
except ImportError as e:
# The database backend wasn't found. Display a helpful error message
# listing all built-in database backends.
builtin_backends = ['influxdb']
builtin_backends = ['influxdb', 'influxdb2']
raise e
if backend_name not in [
f'openwisp_monitoring.db.backends.{b}' for b in builtin_backends
]:
Expand Down
42 changes: 42 additions & 0 deletions openwisp_monitoring/db/backends/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import logging

from django.utils.functional import cached_property

from openwisp_monitoring.utils import retry

logger = logging.getLogger(__name__)


class BaseDatabaseClient:
def __init__(self, db_name=None):
self._db = None
self.db_name = db_name

@cached_property
def db(self):
raise NotImplementedError("Subclasses must implement `db` method")

@retry
def create_database(self):
raise NotImplementedError("Subclasses must implement `create_database` method")

@retry
def drop_database(self):
raise NotImplementedError("Subclasses must implement `drop_database` method")

@retry
def query(self, query):
raise NotImplementedError("Subclasses must implement `query` method")

def write(self, name, values, **kwargs):
raise NotImplementedError("Subclasses must implement `write` method")

def get_list_retention_policies(self, name=None):
raise NotImplementedError(
"Subclasses must implement `get_list_retention_policies` method"
)

def create_or_alter_retention_policy(self, name, duration):
raise NotImplementedError(
"Subclasses must implement `create_or_alter_retention_policy` method"
)
78 changes: 78 additions & 0 deletions openwisp_monitoring/db/backends/influxdb2/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import logging

from django.utils.functional import cached_property
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.exceptions import InfluxDBError
from influxdb_client.client.write_api import SYNCHRONOUS

from openwisp_monitoring.utils import retry

from ...exceptions import TimeseriesWriteException
from .. import TIMESERIES_DB
from ..base import BaseDatabaseClient

logger = logging.getLogger(__name__)


class DatabaseClient(BaseDatabaseClient):
backend_name = 'influxdb2'

def __init__(self, db_name=None):
super().__init__(db_name)
self.client_error = InfluxDBError

@cached_property
def db(self):
return InfluxDBClient(
url=f"http://{TIMESERIES_DB['HOST']}:{TIMESERIES_DB['PORT']}",
token=TIMESERIES_DB['TOKEN'],
org=TIMESERIES_DB['ORG'],
bucket=self.db_name,
)

@retry
def create_database(self):
self.write_api = self.db.write_api(write_options=SYNCHRONOUS)
self.query_api = self.db.query_api()
logger.debug('Initialized APIs for InfluxDB 2.0')

@retry
def drop_database(self):
pass # Implement as needed for InfluxDB 2.0

@retry
def query(self, query):
return self.query_api.query(query)

def write(self, name, values, **kwargs):
point = Point(name).time(self._get_timestamp(kwargs.get('timestamp')))
tags = kwargs.get('tags', {})
for tag, value in tags.items():
point.tag(tag, value)
for field, value in values.items():
point.field(field, value)
try:
self.write_api.write(bucket=self.db_name, record=point)
except InfluxDBError as e:
raise TimeseriesWriteException(str(e))

@retry
def get_list_retention_policies(self, name=None):
bucket = self.db.buckets_api().find_bucket_by_name(name)
if bucket:
return bucket.retention_rules
return []

@retry
def create_or_alter_retention_policy(self, name, duration):
bucket = self.db.buckets_api().find_bucket_by_name(name)
retention_rules = [{"type": "expire", "everySeconds": duration}]
if bucket:
bucket.retention_rules = retention_rules
self.db.buckets_api().update_bucket(bucket=bucket)
else:
self.db.buckets_api().create_bucket(
bucket_name=name,
retention_rules=retention_rules,
org=TIMESERIES_DB["ORG"],
)
Loading

0 comments on commit e47cecd

Please sign in to comment.