[monitoring] Adding InfluxDB 2.x version support #274 #584

Draft · wants to merge 7 commits into master

Changes from 1 commit
[monitoring] Adding InfluxDB 2.x version support #274
Fixes #274
praptisharma28 committed May 31, 2024
commit 4a6ae397b86ae1e151b764ccc37d90c0e2066989
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -62,7 +62,7 @@ jobs:
pip install -U pip wheel setuptools

- name: Install npm dependencies
-run: sudo npm install -g install jshint stylelint
+run: sudo npm install -g jshint stylelint

- name: Start InfluxDB container
run: docker-compose up -d influxdb
17 changes: 15 additions & 2 deletions Dockerfile
@@ -1,25 +1,38 @@
FROM python:3.9.19-slim-bullseye

# Install system dependencies
RUN apt update && \
apt install --yes zlib1g-dev libjpeg-dev gdal-bin libproj-dev \
libgeos-dev libspatialite-dev libsqlite3-mod-spatialite \
sqlite3 libsqlite3-dev openssl libssl-dev fping && \
rm -rf /var/lib/apt/lists/* /root/.cache/pip/* /tmp/*

# Upgrade pip and install Python dependencies
RUN pip install -U pip setuptools wheel

# Copy and install project dependencies
COPY requirements-test.txt requirements.txt /opt/openwisp/
RUN pip install -r /opt/openwisp/requirements.txt && \
pip install -r /opt/openwisp/requirements-test.txt && \
rm -rf /var/lib/apt/lists/* /root/.cache/pip/* /tmp/*

# Copy project files and install the project
ADD . /opt/openwisp
RUN pip install -U /opt/openwisp && \
rm -rf /var/lib/apt/lists/* /root/.cache/pip/* /tmp/*

# Set working directory
WORKDIR /opt/openwisp/tests/

# Set environment variables
ENV NAME=openwisp-monitoring \
PYTHONUNBUFFERED=1 \
-INFLUXDB_HOST=influxdb \
+INFLUXDB1_HOST=influxdb \
+INFLUXDB2_HOST=influxdb2 \
REDIS_HOST=redis
-CMD ["sh", "docker-entrypoint.sh"]

# Expose the application port
EXPOSE 8000

# Command to run the application
CMD ["sh", "docker-entrypoint.sh"]
16 changes: 16 additions & 0 deletions docker-compose.yml
@@ -10,6 +10,7 @@ services:
- "8000:8000"
depends_on:
- influxdb
- influxdb2
- redis

influxdb:
@@ -28,6 +29,20 @@ services:
INFLUXDB_USER: openwisp
INFLUXDB_USER_PASSWORD: openwisp

influxdb2:
image: influxdb:2.0-alpine
volumes:
- influxdb2-data:/var/lib/influxdb2
ports:
- "8087:8086"
environment:
DOCKER_INFLUXDB_INIT_MODE: setup
DOCKER_INFLUXDB_INIT_USERNAME: openwisp
DOCKER_INFLUXDB_INIT_PASSWORD: openwisp
DOCKER_INFLUXDB_INIT_ORG: openwisp
DOCKER_INFLUXDB_INIT_BUCKET: openwisp2
DOCKER_INFLUXDB_INIT_RETENTION: 1w

redis:
image: redis:5.0-alpine
ports:
@@ -36,3 +51,4 @@ services:

volumes:
influxdb-data: {}
influxdb2-data: {}
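
The host port mapping above (8087:8086) means clients on the host reach the InfluxDB 2.x instance on port 8087, while the 1.x container keeps the default 8086. A minimal reachability check, sketched under the assumption that the stack above is running (the token value is a placeholder; the setup step generates the real one):

# Hypothetical smoke test, not part of the PR.
from influxdb_client import InfluxDBClient

client = InfluxDBClient(
    url='http://localhost:8087',  # host port mapped to the container's 8086
    token='<token created during DOCKER_INFLUXDB_INIT_MODE: setup>',
    org='openwisp',
)
print(client.ping())  # True once the server is reachable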
12 changes: 11 additions & 1 deletion openwisp_monitoring/db/backends/__init__.py
@@ -30,6 +30,16 @@ def load_backend_module(backend_name=TIMESERIES_DB['BACKEND'], module=None):
"""
try:
assert 'BACKEND' in TIMESERIES_DB, 'BACKEND'
if 'BACKEND' in TIMESERIES_DB and '2' in TIMESERIES_DB['BACKEND']:
# InfluxDB 2.x specific checks
assert 'TOKEN' in TIMESERIES_DB, 'TOKEN'
assert 'ORG' in TIMESERIES_DB, 'ORG'
assert 'BUCKET' in TIMESERIES_DB, 'BUCKET'
else:
# InfluxDB 1.x specific checks
assert 'USER' in TIMESERIES_DB, 'USER'
assert 'PASSWORD' in TIMESERIES_DB, 'PASSWORD'
assert 'NAME' in TIMESERIES_DB, 'NAME'
-assert 'USER' in TIMESERIES_DB, 'USER'
-assert 'PASSWORD' in TIMESERIES_DB, 'PASSWORD'
-assert 'NAME' in TIMESERIES_DB, 'NAME'
@@ -48,7 +58,7 @@ def load_backend_module(backend_name=TIMESERIES_DB['BACKEND'], module=None):
except ImportError as e:
# The database backend wasn't found. Display a helpful error message
# listing all built-in database backends.
-builtin_backends = ['influxdb']
+builtin_backends = ['influxdb', 'influxdb2']
if backend_name not in [
f'openwisp_monitoring.db.backends.{b}' for b in builtin_backends
]:
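
For reference, a TIMESERIES_DATABASE setting (which TIMESERIES_DB reflects) that would satisfy the 2.x branch of these assertions might look like the following; all values are placeholders, not part of this PR:

# Hypothetical settings sketch, values are placeholders.
TIMESERIES_DATABASE = {
    'BACKEND': 'openwisp_monitoring.db.backends.influxdb2',
    'TOKEN': '<api-token>',
    'ORG': 'openwisp',
    'BUCKET': 'openwisp2',
    'HOST': 'localhost',
    'PORT': '8086',
}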
42 changes: 42 additions & 0 deletions openwisp_monitoring/db/backends/base.py
@@ -0,0 +1,42 @@
import logging

from django.utils.functional import cached_property

from openwisp_monitoring.utils import retry

logger = logging.getLogger(__name__)


class BaseDatabaseClient:
def __init__(self, db_name=None):
self._db = None
self.db_name = db_name

@cached_property
def db(self):
raise NotImplementedError("Subclasses must implement `db` method")

@retry
def create_database(self):
raise NotImplementedError("Subclasses must implement `create_database` method")

@retry
def drop_database(self):
raise NotImplementedError("Subclasses must implement `drop_database` method")

@retry
def query(self, query):
raise NotImplementedError("Subclasses must implement `query` method")

def write(self, name, values, **kwargs):
raise NotImplementedError("Subclasses must implement `write` method")

def get_list_retention_policies(self, name=None):
raise NotImplementedError(
"Subclasses must implement `get_list_retention_policies` method"
)

def create_or_alter_retention_policy(self, name, duration):
raise NotImplementedError(
"Subclasses must implement `create_or_alter_retention_policy` method"
)
78 changes: 78 additions & 0 deletions openwisp_monitoring/db/backends/influxdb2/client.py
@@ -0,0 +1,78 @@
import logging

from django.utils.functional import cached_property
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.exceptions import InfluxDBError
from influxdb_client.client.write_api import SYNCHRONOUS

from openwisp_monitoring.utils import retry

from ...exceptions import TimeseriesWriteException
from .. import TIMESERIES_DB
from ..base import BaseDatabaseClient

logger = logging.getLogger(__name__)


class DatabaseClient(BaseDatabaseClient):
backend_name = 'influxdb2'

def __init__(self, db_name=None):
super().__init__(db_name)
self.client_error = InfluxDBError

@cached_property
def db(self):
return InfluxDBClient(
url=f"http://{TIMESERIES_DB['HOST']}:{TIMESERIES_DB['PORT']}",
token=TIMESERIES_DB['TOKEN'],
org=TIMESERIES_DB['ORG'],
bucket=self.db_name,
)

@retry
def create_database(self):
self.write_api = self.db.write_api(write_options=SYNCHRONOUS)
self.query_api = self.db.query_api()
logger.debug('Initialized APIs for InfluxDB 2.0')

@retry
def drop_database(self):
pass # Implement as needed for InfluxDB 2.0

@retry
def query(self, query):
return self.query_api.query(query)

def write(self, name, values, **kwargs):
point = Point(name).time(self._get_timestamp(kwargs.get('timestamp')))
tags = kwargs.get('tags', {})
for tag, value in tags.items():
point.tag(tag, value)
for field, value in values.items():
point.field(field, value)
try:
self.write_api.write(bucket=self.db_name, record=point)
except InfluxDBError as e:
raise TimeseriesWriteException(str(e))

@retry
def get_list_retention_policies(self, name=None):
bucket = self.db.buckets_api().find_bucket_by_name(name)
if bucket:
return bucket.retention_rules
return []

@retry
def create_or_alter_retention_policy(self, name, duration):
bucket = self.db.buckets_api().find_bucket_by_name(name)
retention_rules = [{"type": "expire", "everySeconds": duration}]
if bucket:
bucket.retention_rules = retention_rules
self.db.buckets_api().update_bucket(bucket=bucket)
else:
self.db.buckets_api().create_bucket(
bucket_name=name,
retention_rules=retention_rules,
org=TIMESERIES_DB["ORG"],
)
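
A rough usage sketch of this client, assuming settings like the above; note that create_database() must be called first, since it is what initializes write_api and query_api (bucket name and Flux query below are illustrative):

# Hypothetical usage, not part of the PR.
client = DatabaseClient(db_name='openwisp2')
client.create_database()  # sets up write_api and query_api
# ensure the bucket exists with a one-week retention (7 * 24 * 3600 seconds)
client.create_or_alter_retention_policy(name='openwisp2', duration=604800)
tables = client.query(
    'from(bucket: "openwisp2") |> range(start: -1h) '
    '|> filter(fn: (r) => r["_measurement"] == "ping")'
)
for table in tables:
    for record in table.records:
        print(record.get_time(), record.get_value())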
277 changes: 277 additions & 0 deletions openwisp_monitoring/db/backends/influxdb2/queries.py
@@ -0,0 +1,277 @@
chart_query = {
'uptime': {
'influxdb2': (
'from(bucket: "{key}") '
'|> range(start: {time}{end_date}) '
'|> filter(fn: (r) => r["_measurement"] == "{field_name}" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
'|> mean() '
'|> map(fn: (r) => ({ r with uptime: r._value * 100 }))'
)
},
'packet_loss': {
'influxdb2': (
'from(bucket: "{key}") '
'|> range(start: {time}{end_date}) '
'|> filter(fn: (r) => r["_measurement"] == "loss" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
'|> mean()'
)
},
'rtt': {
'influxdb2': (
'from(bucket: "{key}") '
'|> range(start: {time}{end_date}) '
'|> filter(fn: (r) => r["_measurement"] == "rtt_avg" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
'|> mean() '
'|> yield(name: "RTT_average") '
'from(bucket: "{key}") '
'|> range(start: {time}{end_date}) '
'|> filter(fn: (r) => r["_measurement"] == "rtt_max" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
'|> mean() '
'|> yield(name: "RTT_max") '
'from(bucket: "{key}") '
'|> range(start: {time}{end_date}) '
'|> filter(fn: (r) => r["_measurement"] == "rtt_min" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
'|> mean() '
'|> yield(name: "RTT_min")'
)
},
'wifi_clients': {
'influxdb2': (
'from(bucket: "{key}") '
'|> range(start: {time}{end_date}) '
'|> filter(fn: (r) => r["_measurement"] == "{field_name}" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}" and '
'r["ifname"] == "{ifname}") '
'|> distinct() '
'|> count()'
)
},
'general_wifi_clients': {
'influxdb2': (
'from(bucket: "{key}") '
'|> range(start: {time}{end_date}) '
'|> filter(fn: (r) => r["_measurement"] == "{field_name}"'
'{organization_id}{location_id}{floorplan_id}) '
'|> distinct() '
'|> count()'
)
},
'traffic': {
'influxdb2': (
'from(bucket: "{key}") '
'|> range(start: {time}{end_date}) '
'|> filter(fn: (r) => r["_measurement"] == "tx_bytes" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}" and '
'r["ifname"] == "{ifname}") '
'|> sum() '
'|> map(fn: (r) => ({ r with upload: r._value / 1000000000 })) '
'|> yield(name: "upload") '
'from(bucket: "{key}") '
'|> range(start: {time}{end_date}) '
'|> filter(fn: (r) => r["_measurement"] == "rx_bytes" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}" and '
'r["ifname"] == "{ifname}") '
'|> sum() '
'|> map(fn: (r) => ({ r with download: r._value / 1000000000 })) '
'|> yield(name: "download")'
)
},
'general_traffic': {
'influxdb2': (
'from(bucket: "{key}") '
'|> range(start: {time}{end_date}) '
'|> filter(fn: (r) => r["_measurement"] == "tx_bytes"{organization_id}'
'{location_id}{floorplan_id}{ifname}) '
'|> sum() '
'|> map(fn: (r) => ({ r with upload: r._value / 1000000000 })) '
'|> yield(name: "upload") '
'from(bucket: "{key}") '
'|> range(start: {time}{end_date}) '
'|> filter(fn: (r) => r["_measurement"] == "rx_bytes"{organization_id}'
'{location_id}{floorplan_id}{ifname}) '
'|> sum() '
'|> map(fn: (r) => ({ r with download: r._value / 1000000000 })) '
'|> yield(name: "download")'
)
},
'memory': {
'influxdb2': (
'from(bucket: "{key}") '
'|> range(start: {time}{end_date}) '
'|> filter(fn: (r) => r["_measurement"] == "percent_used" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
'|> mean() '
'|> map(fn: (r) => ({ r with memory_usage: r._value }))'
)
},
'cpu': {
'influxdb2': (
'from(bucket: "{key}") '
'|> range(start: {time}{end_date}) '
'|> filter(fn: (r) => r["_measurement"] == "cpu_usage" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
'|> mean() '
'|> map(fn: (r) => ({ r with CPU_load: r._value }))'
)
},
'disk': {
'influxdb2': (
'from(bucket: "{key}") '
'|> range(start: {time}{end_date}) '
'|> filter(fn: (r) => r["_measurement"] == "used_disk" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
'|> mean() '
'|> map(fn: (r) => ({ r with disk_usage: r._value }))'
)
},
'signal_strength': {
'influxdb2': (
'from(bucket: "{key}") '
'|> range(start: {time}{end_date}) '
'|> filter(fn: (r) => r["_measurement"] == "signal_strength" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
'|> mean() '
'|> map(fn: (r) => ({ r with signal_strength: round(r._value) })) '
'|> yield(name: "signal_strength") '
'from(bucket: "{key}") '
'|> range(start: {time}{end_date}) '
'|> filter(fn: (r) => r["_measurement"] == "signal_power" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
'|> mean() '
'|> map(fn: (r) => ({ r with signal_power: round(r._value) })) '
'|> yield(name: "signal_power")'
)
},
'signal_quality': {
'influxdb2': (
'from(bucket: "{key}") '
'|> range(start: {time}{end_date}) '
'|> filter(fn: (r) => r["_measurement"] == "signal_quality" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
'|> mean() '
'|> map(fn: (r) => ({ r with signal_quality: round(r._value) })) '
'|> yield(name: "signal_quality") '
'from(bucket: "{key}") '
'|> range(start: {time}{end_date}) '
'|> filter(fn: (r) => r["_measurement"] == "snr" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
'|> mean() '
'|> map(fn: (r) => ({ r with signal_to_noise_ratio: round(r._value) })) '
'|> yield(name: "signal_to_noise_ratio")'
)
},
'access_tech': {
'influxdb2': (
'from(bucket: "{key}") '
'|> range(start: {time}{end_date}) '
'|> filter(fn: (r) => r["_measurement"] == "access_tech" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
'|> mode() '
'|> map(fn: (r) => ({ r with access_tech: r._value }))'
)
},
'bandwidth': {
'influxdb2': (
'from(bucket: "{key}") '
'|> range(start: {time}) '
'|> filter(fn: (r) => r["_measurement"] == "sent_bps_tcp" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
'|> mean() '
'|> map(fn: (r) => ({ r with TCP: r._value / 1000000000 })) '
'|> yield(name: "TCP") '
'from(bucket: "{key}") '
'|> range(start: {time}) '
'|> filter(fn: (r) => r["_measurement"] == "sent_bps_udp" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
'|> mean() '
'|> map(fn: (r) => ({ r with UDP: r._value / 1000000000 })) '
'|> yield(name: "UDP")'
)
},
'transfer': {
'influxdb2': (
'from(bucket: "{key}") '
'|> range(start: {time}) '
'|> filter(fn: (r) => r["_measurement"] == "sent_bytes_tcp" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
'|> sum() '
'|> map(fn: (r) => ({ r with TCP: r._value / 1000000000 })) '
'|> yield(name: "TCP") '
'from(bucket: "{key}") '
'|> range(start: {time}) '
'|> filter(fn: (r) => r["_measurement"] == "sent_bytes_udp" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
'|> sum() '
'|> map(fn: (r) => ({ r with UDP: r._value / 1000000000 })) '
'|> yield(name: "UDP")'
)
},
'retransmits': {
'influxdb2': (
'from(bucket: "{key}") '
'|> range(start: {time}) '
'|> filter(fn: (r) => r["_measurement"] == "retransmits" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
'|> mean() '
'|> map(fn: (r) => ({ r with retransmits: r._value }))'
)
},
'jitter': {
'influxdb2': (
'from(bucket: "{key}") '
'|> range(start: {time}) '
'|> filter(fn: (r) => r["_measurement"] == "jitter" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
'|> mean() '
'|> map(fn: (r) => ({ r with jitter: r._value }))'
)
},
'datagram': {
'influxdb2': (
'from(bucket: "{key}") '
'|> range(start: {time}) '
'|> filter(fn: (r) => r["_measurement"] == "lost_packets" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
'|> mean() '
'|> map(fn: (r) => ({ r with lost_datagram: r._value })) '
'|> yield(name: "lost_datagram") '
'from(bucket: "{key}") '
'|> range(start: {time}) '
'|> filter(fn: (r) => r["_measurement"] == "total_packets" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
'|> mean() '
'|> map(fn: (r) => ({ r with total_datagram: r._value })) '
'|> yield(name: "total_datagram")'
)
},
'datagram_loss': {
'influxdb2': (
'from(bucket: "{key}") '
'|> range(start: {time}) '
'|> filter(fn: (r) => r["_measurement"] == "lost_percent" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
'|> mean() '
'|> map(fn: (r) => ({ r with datagram_loss: r._value }))'
)
},
}

default_chart_query = [
'from(bucket: "{key}") |> range(start: {time}{end_date}) ',
(
'|> filter(fn: (r) => r["_measurement"] == "{field_name}" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}")'
),
]

device_data_query = (
'from(bucket: "{0}") |> range(start: 0) '
'|> filter(fn: (r) => r["_measurement"] == "{1}" and r["pk"] == "{2}") '
'|> sort(columns: ["_time"], desc: true) '
'|> limit(n: 1)'
)
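
These templates look intended for str.format()-style substitution of {key}, {time}, and the other placeholders. A sketch of expanding one of the simpler templates (values are illustrative, not from the PR):

# Hypothetical expansion of the packet_loss template.
flux = chart_query['packet_loss']['influxdb2'].format(
    key='openwisp2',
    time='-24h',
    end_date='',
    content_type='config.device',
    object_id='<device-uuid>',
)

One caveat: the templates that use Flux's map() contain literal braces ('{ r with ... }'), which plain str.format() would treat as placeholders; those entries would need doubled braces ('{{ r with ... }}') or a different substitution mechanism to expand cleanly.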
433 changes: 433 additions & 0 deletions openwisp_monitoring/db/backends/influxdb2/tests.py

Large diffs are not rendered by default.

66 changes: 56 additions & 10 deletions openwisp_monitoring/monitoring/tests/__init__.py
@@ -91,7 +91,13 @@
"SELECT {fields|SUM|/ 1} FROM {key} "
"WHERE time >= '{time}' AND content_type = "
"'{content_type}' AND object_id = '{object_id}'"
-)
+),
'influxdb2': (
'from(bucket: "{key}") |> range(start: {time}) '
'|> filter(fn: (r) => r["_measurement"] == "{field_name}" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
'|> sum()'
),
},
},
'dummy': {
@@ -108,7 +114,7 @@
'description': 'Bugged chart for testing purposes.',
'unit': 'bugs',
'order': 999,
-'query': {'influxdb': "BAD"},
+'query': {'influxdb': "BAD", 'influxdb2': "BAD"},
},
'default': {
'type': 'line',
@@ -120,7 +126,12 @@
'influxdb': (
"SELECT {field_name} FROM {key} WHERE time >= '{time}' AND "
"content_type = '{content_type}' AND object_id = '{object_id}'"
-)
+),
'influxdb2': (
'from(bucket: "{key}") |> range(start: {time}) '
'|> filter(fn: (r) => r["_measurement"] == "{field_name}" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}")'
),
},
},
'multiple_test': {
@@ -133,26 +144,43 @@
'influxdb': (
"SELECT {field_name}, value2 FROM {key} WHERE time >= '{time}' AND "
"content_type = '{content_type}' AND object_id = '{object_id}'"
-)
+),
'influxdb2': (
'from(bucket: "{key}") |> range(start: {time}) '
'|> filter(fn: (r) => (r["_measurement"] == "{field_name}" or '
'r["_measurement"] == "value2") and '
'r["content_type"] == "{content_type}" and '
'r["object_id"] == "{object_id}")'
),
},
},
'group_by_tag': {
'type': 'stackedbars',
'title': 'Group by tag',
-'description': 'Query is groupped by tag along with time',
+'description': 'Query is grouped by tag along with time',
'unit': 'n.',
'order': 999,
'query': {
'influxdb': (
"SELECT CUMULATIVE_SUM(SUM({field_name})) FROM {key} WHERE time >= '{time}'"
" GROUP BY time(1d), metric_num"
-)
+),
'influxdb2': (
'from(bucket: "{key}") |> range(start: {time}) '
'|> filter(fn: (r) => r["_measurement"] == "{field_name}") '
'|> group(columns: ["metric_num"]) |> sum() |> cumulativeSum() |> window(every: 1d)'
),
},
'summary_query': {
'influxdb': (
"SELECT SUM({field_name}) FROM {key} WHERE time >= '{time}'"
" GROUP BY time(30d), metric_num"
-)
+),
'influxdb2': (
'from(bucket: "{key}") |> range(start: {time}) '
'|> filter(fn: (r) => r["_measurement"] == "{field_name}") '
'|> group(columns: ["metric_num"]) |> sum() |> window(every: 30d)'
),
},
},
'mean_test': {
@@ -165,7 +193,13 @@
'influxdb': (
"SELECT MEAN({field_name}) AS {field_name} FROM {key} WHERE time >= '{time}' AND "
"content_type = '{content_type}' AND object_id = '{object_id}'"
-)
+),
'influxdb2': (
'from(bucket: "{key}") |> range(start: {time}) '
'|> filter(fn: (r) => r["_measurement"] == "{field_name}" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
'|> mean()'
),
},
},
'sum_test': {
@@ -178,7 +212,13 @@
'influxdb': (
"SELECT SUM({field_name}) AS {field_name} FROM {key} WHERE time >= '{time}' AND "
"content_type = '{content_type}' AND object_id = '{object_id}'"
-)
+),
'influxdb2': (
'from(bucket: "{key}") |> range(start: {time}) '
'|> filter(fn: (r) => r["_measurement"] == "{field_name}" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
'|> sum()'
),
},
},
'top_fields_mean': {
@@ -192,7 +232,13 @@
"SELECT {fields|MEAN} FROM {key} "
"WHERE time >= '{time}' AND content_type = "
"'{content_type}' AND object_id = '{object_id}'"
-)
+),
'influxdb2': (
'from(bucket: "{key}") |> range(start: {time}) '
'|> filter(fn: (r) => r["_measurement"] == "{field_name}" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
'|> mean()'
),
},
},
}
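
Each chart's 'query' dict is keyed by backend name, so the charting code can presumably select the template matching the configured backend. A hypothetical lookup (the actual selection logic is not part of this diff, and the 'charts' name is a stand-in):

# Hypothetical sketch, illustrates the dict shape only.
backend = 'influxdb2'  # would be derived from TIMESERIES_DB['BACKEND']
flux_template = charts['mean_test']['query'][backend]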
7 changes: 6 additions & 1 deletion openwisp_monitoring/monitoring/tests/test_configuration.py
@@ -34,7 +34,12 @@ def _get_new_metric(self):
"SELECT {fields|SUM|/ 1} FROM {key} "
"WHERE time >= '{time}' AND content_type = "
"'{content_type}' AND object_id = '{object_id}'"
-)
+),
'influxdb2': (
'from(bucket: "{key}") |> range(start: {time}) '
'|> filter(fn: (r) => r["_measurement"] == "{field_name}" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}")'
),
},
}
},
1 change: 1 addition & 0 deletions requirements.txt
@@ -4,3 +4,4 @@ django-nested-admin~=4.0.2
netaddr~=0.8.0
python-dateutil>=2.7.0,<3.0.0
openwisp-utils[rest] @ https://github.com/openwisp/openwisp-utils/tarball/master
influxdb-client~=1.21.0
11 changes: 9 additions & 2 deletions setup.py
@@ -55,6 +55,10 @@ def get_install_requires():
include_package_data=True,
zip_safe=False,
install_requires=get_install_requires(),
extras_require={
'influxdb': ['influxdb>=5.2,<5.3'],
'influxdb2': ['influxdb-client>=1.17.0,<2.0.0'],
},
classifiers=[
'Development Status :: 3 - Alpha',
'Environment :: Web Environment',
@@ -64,7 +68,10 @@ def get_install_requires():
'License :: OSI Approved :: GNU General Public License v3 (GPLv3)',
'Operating System :: OS Independent',
'Framework :: Django',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
],
)
17 changes: 16 additions & 1 deletion tests/openwisp2/settings.py
@@ -21,7 +21,7 @@
}
}

-TIMESERIES_DATABASE = {
+INFLUXDB_1x_DATABASE = {
'BACKEND': 'openwisp_monitoring.db.backends.influxdb',
'USER': 'openwisp',
'PASSWORD': 'openwisp',
@@ -31,6 +31,21 @@
# UDP writes are disabled by default
'OPTIONS': {'udp_writes': False, 'udp_port': 8089},
}

INFLUXDB_2x_DATABASE = {
'BACKEND': 'openwisp_monitoring.db.backends.influxdb2',
'TOKEN': 'your-influxdb-2.0-token',
'ORG': 'your-org',
'BUCKET': 'your-bucket',
'HOST': os.getenv('INFLUXDB2_HOST', 'localhost'),
'PORT': '8087',
}

if os.environ.get('USE_INFLUXDB2', False):
TIMESERIES_DATABASE = INFLUXDB_2x_DATABASE
else:
TIMESERIES_DATABASE = INFLUXDB_1x_DATABASE

if TESTING:
if os.environ.get('TIMESERIES_UDP', False):
TIMESERIES_DATABASE['OPTIONS'] = {'udp_writes': True, 'udp_port': 8091}
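
One subtlety in the USE_INFLUXDB2 toggle above: os.environ.get() returns a string whenever the variable is set, so any non-empty value, including '0' or 'false', selects the 2.x settings:

# Illustration of the truthiness caveat, not part of the PR.
import os

os.environ['USE_INFLUXDB2'] = 'false'
print(bool(os.environ.get('USE_INFLUXDB2', False)))  # True: non-empty strings are truthy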