Skip to content

Commit

Permalink
[change] Changed code/docs from "iperf" to "iperf3" #443
Browse files Browse the repository at this point in the history
Closes #443

Co-authored-by: Federico Capoano <[email protected]>
  • Loading branch information
Aryamanz29 and nemesifier authored Oct 20, 2022
1 parent 45d5e75 commit f69e854
Show file tree
Hide file tree
Showing 13 changed files with 359 additions and 353 deletions.
182 changes: 91 additions & 91 deletions README.rst

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions openwisp_monitoring/check/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ def _connect_signals(self):
sender=load_model('config', 'Device'),
dispatch_uid='auto_config_check',
)
if app_settings.AUTO_IPERF:
from .base.models import auto_iperf_check_receiver
if app_settings.AUTO_IPERF3:
from .base.models import auto_iperf3_check_receiver

post_save.connect(
auto_iperf_check_receiver,
auto_iperf3_check_receiver,
sender=load_model('config', 'Device'),
dispatch_uid='auto_iperf_check',
dispatch_uid='auto_iperf3_check',
)
8 changes: 4 additions & 4 deletions openwisp_monitoring/check/base/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from openwisp_monitoring.check import settings as app_settings
from openwisp_monitoring.check.tasks import (
auto_create_config_check,
auto_create_iperf_check,
auto_create_iperf3_check,
auto_create_ping,
)
from openwisp_utils.base import TimeStampedEditableModel
Expand Down Expand Up @@ -142,17 +142,17 @@ def auto_config_check_receiver(sender, instance, created, **kwargs):
)


def auto_iperf_check_receiver(sender, instance, created, **kwargs):
def auto_iperf3_check_receiver(sender, instance, created, **kwargs):
"""
Implements OPENWISP_MONITORING_AUTO_IPERF
Implements OPENWISP_MONITORING_AUTO_IPERF3
The creation step is executed in the background
"""
# we need to skip this otherwise this task will be executed
# every time the configuration is requested via checksum
if not created:
return
transaction_on_commit(
lambda: auto_create_iperf_check.delay(
lambda: auto_create_iperf3_check.delay(
model=sender.__name__.lower(),
app_label=sender._meta.app_label,
object_id=str(instance.pk),
Expand Down
2 changes: 1 addition & 1 deletion openwisp_monitoring/check/classes/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .config_applied import ConfigApplied # noqa
from .iperf import Iperf # noqa
from .iperf3 import Iperf3 # noqa
from .ping import Ping # noqa
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@
AlertSettings = load_model('monitoring', 'AlertSettings')
DeviceConnection = load_model('connection', 'DeviceConnection')

DEFAULT_IPERF_CHECK_CONFIG = {
DEFAULT_IPERF3_CHECK_CONFIG = {
'host': {
'type': 'array',
'items': {
'type': 'string',
},
'default': [],
},
# username, password max_length chosen from iperf3 docs to avoid iperf param errors
# username, password max_length chosen from iperf3 docs to avoid iperf3 param errors
'username': {'type': 'string', 'default': '', 'minLength': 1, 'maxLength': 20},
'password': {'type': 'string', 'default': '', 'minLength': 1, 'maxLength': 20},
'rsa_public_key': {
Expand Down Expand Up @@ -149,7 +149,7 @@
}


def get_iperf_schema():
def get_iperf3_schema():
schema = {
'$schema': 'http://json-schema.org/draft-07/schema#',
'type': 'object',
Expand All @@ -160,13 +160,13 @@ def get_iperf_schema():
'rsa_public_key': ['username', 'password'],
},
}
schema['properties'] = DEFAULT_IPERF_CHECK_CONFIG
schema['properties'] = DEFAULT_IPERF3_CHECK_CONFIG
return schema


class Iperf(BaseCheck):
class Iperf3(BaseCheck):

schema = get_iperf_schema()
schema = get_iperf3_schema()

def validate_params(self, params=None):
try:
Expand All @@ -181,70 +181,70 @@ def validate_params(self, params=None):
message = '{0}: {1}'.format(message, e.message)
raise ValidationError({'params': message}) from e

def _validate_iperf_config(self, org):
# if iperf config is present and validate it's params
if app_settings.IPERF_CHECK_CONFIG:
def _validate_iperf3_config(self, org):
# if iperf3 config is present and validate it's params
if app_settings.IPERF3_CHECK_CONFIG:
self.validate_params(
params=app_settings.IPERF_CHECK_CONFIG.get(str(org.id))
params=app_settings.IPERF3_CHECK_CONFIG.get(str(org.id))
)

def check(self, store=True):
lock_acquired = False
org = self.related_object.organization
self._validate_iperf_config(org)
available_iperf_servers = self._get_param('host', 'host.default')
if not available_iperf_servers:
self._validate_iperf3_config(org)
available_iperf3_servers = self._get_param('host', 'host.default')
if not available_iperf3_servers:
logger.warning(
(
f'Iperf servers for organization "{org}" '
f'is not configured properly, iperf check skipped!'
f'Iperf3 servers for organization "{org}" '
f'is not configured properly, iperf3 check skipped!'
)
)
return
time = self._get_param(
'client_options.time', 'client_options.properties.time.default'
)
# Try to acquire a lock, or put task back on queue
for server in available_iperf_servers:
server_lock_key = f'ow_monitoring_{org}_iperf_check_{server}'
# Set available_iperf_server to the org device
for server in available_iperf3_servers:
server_lock_key = f'ow_monitoring_{org}_iperf3_check_{server}'
# Set available_iperf3_server to the org device
lock_acquired = cache.add(
server_lock_key,
str(self.related_object),
timeout=app_settings.IPERF_CHECK_LOCK_EXPIRE,
timeout=app_settings.IPERF3_CHECK_LOCK_EXPIRE,
)
if lock_acquired:
break
else:
logger.info(
(
f'At the moment, all available iperf servers of organization "{org}" '
f'At the moment, all available iperf3 servers of organization "{org}" '
f'are busy running checks, putting "{self.check_instance}" back in the queue..'
)
)
# Return the iperf_check task to the queue,
# it will executed after 2 * iperf_check_time (TCP+UDP)
# Return the iperf3_check task to the queue,
# it will executed after 2 * iperf3_check_time (TCP+UDP)
self.check_instance.perform_check_delayed(duration=2 * time)
return
try:
# Execute the iperf check with current available server
result = self._run_iperf_check(store, server, time)
# Execute the iperf3 check with current available server
result = self._run_iperf3_check(store, server, time)
finally:
# Release the lock after completion of the check
cache.delete(server_lock_key)
return result

def _run_iperf_check(self, store, server, time):
def _run_iperf3_check(self, store, server, time):
device_connection = self._get_device_connection()
if not device_connection:
logger.warning(
f'Failed to get a working DeviceConnection for "{self.related_object}", iperf check skipped!'
f'Failed to get a working DeviceConnection for "{self.related_object}", iperf3 check skipped!'
)
return
# The DeviceConnection could fail if the management tunnel is down.
if not device_connection.connect():
logger.warning(
f'DeviceConnection for "{self.related_object}" is not working, iperf check skipped!'
f'DeviceConnection for "{self.related_object}" is not working, iperf3 check skipped!'
)
return
command_tcp, command_udp = self._get_check_commands(server)
Expand All @@ -260,24 +260,24 @@ def _run_iperf_check(self, store, server, time):
)
return

result_tcp = self._get_iperf_result(result, exit_code, mode='TCP')
result_tcp = self._get_iperf3_result(result, exit_code, mode='TCP')
# UDP mode
result, exit_code = device_connection.connector_instance.exec_command(
command_udp, raise_unexpected_exit=False
)
result_udp = self._get_iperf_result(result, exit_code, mode='UDP')
result_udp = self._get_iperf3_result(result, exit_code, mode='UDP')
result = {}
if store and result_tcp and result_udp:
# Store iperf_result field 1 if any mode passes, store 0 when both fails
iperf_result = result_tcp['iperf_result'] | result_udp['iperf_result']
result.update({**result_tcp, **result_udp, 'iperf_result': iperf_result})
# Store iperf3_result field 1 if any mode passes, store 0 when both fails
iperf3_result = result_tcp['iperf3_result'] | result_udp['iperf3_result']
result.update({**result_tcp, **result_udp, 'iperf3_result': iperf3_result})
self.store_result(result)
device_connection.disconnect()
return result

def _get_check_commands(self, server):
"""
Returns tcp & udp commands for iperf check
Returns tcp & udp commands for iperf3 check
"""
username = self._get_param('username', 'username.default')
port = self._get_param(
Expand Down Expand Up @@ -310,8 +310,8 @@ def _get_check_commands(self, server):
'client_options.properties.udp.properties.length.default',
)

rev_or_bidir, test_end_condition = self._get_iperf_test_conditions()
logger.info(f'«« Iperf server : {server}, Device : {self.related_object} »»')
rev_or_bidir, test_end_condition = self._get_iperf3_test_conditions()
logger.info(f'«« Iperf3 server : {server}, Device : {self.related_object} »»')
command_tcp = (
f'iperf3 -c {server} -p {port} {test_end_condition} --connect-timeout {ct} '
f'-b {tcp_bitrate} -l {tcp_length} -w {window} -P {parallel} {rev_or_bidir} -J'
Expand All @@ -327,7 +327,7 @@ def _get_check_commands(self, server):
password = self._get_param('password', 'password.default')
key = self._get_param('rsa_public_key', 'rsa_public_key.default')
rsa_public_key = self._get_compelete_rsa_key(key)
rsa_public_key_path = '/tmp/iperf-public-key.pem'
rsa_public_key_path = '/tmp/iperf3-public-key.pem'

command_tcp = (
f'echo "{rsa_public_key}" > {rsa_public_key_path} && '
Expand All @@ -342,14 +342,14 @@ def _get_check_commands(self, server):
f'-b {udp_bitrate} -l {udp_length} -w {window} -P {parallel} {rev_or_bidir} -u -J'
)

# If IPERF_CHECK_DELETE_RSA_KEY, remove rsa_public_key from the device
if app_settings.IPERF_CHECK_DELETE_RSA_KEY:
# If IPERF3_CHECK_DELETE_RSA_KEY, remove rsa_public_key from the device
if app_settings.IPERF3_CHECK_DELETE_RSA_KEY:
command_udp = f'{command_udp} && rm -f {rsa_public_key_path}'
return command_tcp, command_udp

def _get_iperf_test_conditions(self):
def _get_iperf3_test_conditions(self):
"""
Returns iperf check test conditions (rev_or_bidir, end_condition)
Returns iperf3 check test conditions (rev_or_bidir, end_condition)
"""
time = self._get_param(
'client_options.time', 'client_options.properties.time.default'
Expand All @@ -368,7 +368,7 @@ def _get_iperf_test_conditions(self):
'client_options.properties.bidirectional.default',
)
# by default we use 'time' param
# for the iperf test end condition
# for the iperf3 test end condition
test_end_condition = f'-t {time}'
# if 'bytes' present in config
# use it instead of 'time'
Expand Down Expand Up @@ -425,41 +425,41 @@ def _get_param(self, conf_key, default_conf_key):
Returns specified param or its default value according to the schema
"""
org_id = str(self.related_object.organization.id)
iperf_config = app_settings.IPERF_CHECK_CONFIG
iperf3_config = app_settings.IPERF3_CHECK_CONFIG

if self.params:
check_params = self._deep_get(self.params, conf_key)
if check_params:
return check_params

if iperf_config:
iperf_config = iperf_config.get(org_id)
iperf_config_param = self._deep_get(iperf_config, conf_key)
if iperf_config_param:
return iperf_config_param
if iperf3_config:
iperf3_config = iperf3_config.get(org_id)
iperf3_config_param = self._deep_get(iperf3_config, conf_key)
if iperf3_config_param:
return iperf3_config_param

return self._deep_get(DEFAULT_IPERF_CHECK_CONFIG, default_conf_key)
return self._deep_get(DEFAULT_IPERF3_CHECK_CONFIG, default_conf_key)

def _get_iperf_result(self, result, exit_code, mode):
def _get_iperf3_result(self, result, exit_code, mode):
"""
Returns iperf test result
Returns iperf3 test result
"""
try:
result = loads(result)
except JSONDecodeError:
# Errors other than iperf3 test errors
logger.warning(
f'Iperf check failed for "{self.related_object}", error - {result.strip()}'
f'Iperf3 check failed for "{self.related_object}", error - {result.strip()}'
)
return

if mode == 'TCP':
if exit_code != 0:
logger.warning(
f'Iperf check failed for "{self.related_object}", {result["error"]}'
f'Iperf3 check failed for "{self.related_object}", {result["error"]}'
)
return {
'iperf_result': 0,
'iperf3_result': 0,
'sent_bps_tcp': 0.0,
'received_bps_tcp': 0.0,
'sent_bytes_tcp': 0,
Expand All @@ -470,7 +470,7 @@ def _get_iperf_result(self, result, exit_code, mode):
sent = result['end']['sum_sent']
received = result['end']['sum_received']
return {
'iperf_result': 1,
'iperf3_result': 1,
'sent_bps_tcp': float(sent['bits_per_second']),
'received_bps_tcp': float(received['bits_per_second']),
'sent_bytes_tcp': sent['bytes'],
Expand All @@ -481,10 +481,10 @@ def _get_iperf_result(self, result, exit_code, mode):
elif mode == 'UDP':
if exit_code != 0:
logger.warning(
f'Iperf check failed for "{self.related_object}", {result["error"]}'
f'Iperf3 check failed for "{self.related_object}", {result["error"]}'
)
return {
'iperf_result': 0,
'iperf3_result': 0,
'sent_bps_udp': 0.0,
'sent_bytes_udp': 0,
'jitter': 0.0,
Expand All @@ -494,7 +494,7 @@ def _get_iperf_result(self, result, exit_code, mode):
}
else:
return {
'iperf_result': 1,
'iperf3_result': 1,
'sent_bps_udp': float(result['end']['sum']['bits_per_second']),
'sent_bytes_udp': result['end']['sum']['bytes'],
'jitter': float(result['end']['sum']['jitter_ms']),
Expand All @@ -509,8 +509,8 @@ def store_result(self, result):
"""
metric = self._get_metric()
copied = result.copy()
iperf_result = copied.pop('iperf_result')
metric.write(iperf_result, extra_values=copied)
iperf3_result = copied.pop('iperf3_result')
metric.write(iperf3_result, extra_values=copied)

def _get_metric(self):
"""
Expand All @@ -524,15 +524,15 @@ def _get_metric(self):

def _create_alert_settings(self, metric):
"""
Creates default iperf alert settings with is_active=False
Creates default iperf3 alert settings with is_active=False
"""
alert_settings = AlertSettings(metric=metric, is_active=False)
alert_settings.full_clean()
alert_settings.save()

def _create_charts(self, metric):
"""
Creates iperf related charts
Creates iperf3 related charts
"""
charts = [
'bandwidth',
Expand Down
Loading

0 comments on commit f69e854

Please sign in to comment.