27 changes: 26 additions & 1 deletion .github/workflows/timeseries-build-pull-request.yml
@@ -114,6 +114,31 @@ jobs:
with:
name: unit-test-report
path: /tmp/report.txt

timeseries-function-tests:
name: Timeseries Function Tests
permissions:
contents: read # needed for actions/checkout
packages: read
runs-on: ubuntu-24.04
steps:
- uses: actions/checkout@v4
with:
path: timeseries
persist-credentials: false
- name: Run time-series-analytics-microservices function tests
run: |
cd "${{ github.workspace }}"
cd ./microservices/time-series-analytics/tests-function
echo "Running function tests"
pip3 install -r requirements.txt
rm -rf /tmp/test_report/report.html
pytest -v --html=/tmp/test_report/report.html test_docker.py::test_input_endpoint test_docker.py::test_health_check
- name: Upload HTML test report to GitHub
uses: actions/upload-artifact@v4
with:
name: function-test-report
path: /tmp/test_report

timeseries-scans:
uses: ./.github/workflows/timeseries-scans-virus-bandit-pylint.yaml
uses: ./.github/workflows/timeseries-scans.yaml
39 changes: 39 additions & 0 deletions .github/workflows/timeseries-weekly-functional-tests.yaml
@@ -0,0 +1,39 @@
name: "[Time Series Analytics] Run weekly functional tests"
run-name: "[Time Series Analytics] Run weekly functional tests"
on:
schedule:
- cron: '0 14 * * 5' # 14:00 UTC
workflow_dispatch:
permissions: {}

jobs:
timeseries-weekly-functional-tests:
name: Weekly run of Time Series Analytics Microservice tests
runs-on: ubuntu-24.04
permissions:
contents: read
packages: write
strategy:
fail-fast: false
steps:
- name: Check out edge-ai-libraries repository
uses: actions/checkout@v4
with:
persist-credentials: false
path: edge-ai-libraries-repo

- name: Run time-series-analytics-microservices function tests
run: |
cd "${{ github.workspace }}"
cd ./microservices/time-series-analytics/tests-function
echo "Running function tests"
pip3 install -r tests-function/requirements.txt
rm -rf /tmp/test_report/report.html
pytest -v --html=/tmp/test_report/report.html test_docker.py
- name: Upload HTML test report to GitHub
uses: actions/upload-artifact@v4
with:
name: function-test-report
path: /tmp/test_report
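
In addition to the Friday 14:00 UTC schedule, the `workflow_dispatch` trigger above allows the weekly run to be started on demand. A minimal sketch, assuming the GitHub CLI (`gh`) is installed and authenticated against this repository:

```bash
# Trigger the weekly functional tests manually via workflow_dispatch
# (runs against the default branch unless --ref is given).
gh workflow run timeseries-weekly-functional-tests.yaml

# Check the status of the most recent run of this workflow
gh run list --workflow=timeseries-weekly-functional-tests.yaml --limit 1
```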


11 changes: 11 additions & 0 deletions microservices/time-series-analytics/README.md
@@ -29,3 +29,14 @@ cd edge-ai-libraries/microservices/time-series-analytics
echo "Running unit tests"
./tests/run_tests.sh
```

## Running Functional Tests

Follow the steps below to run the functional tests.
```bash
git clone https://github.com/open-edge-platform/edge-ai-libraries
cd edge-ai-libraries/microservices/time-series-analytics/tests-functional
echo "Running automation tests"
pip3 install -r requirements.txt
pytest -v --html=./test_report/report.html .
```
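
A subset of the functional tests can also be selected by pytest node ID, which is what the pull-request workflow above does for the input and health-check tests. A minimal sketch, assuming the commands are run from the functional-tests directory prepared as above:

```bash
# Run only the input and health-check tests and write the HTML report
# to /tmp/test_report, mirroring the CI job added in this pull request.
pytest -v --html=/tmp/test_report/report.html \
  test_docker.py::test_input_endpoint test_docker.py::test_health_check
```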
@@ -0,0 +1,3 @@
pytest==8.4.1
pytest-html==4.1.1
requests==2.32.4
@@ -0,0 +1,247 @@
#
# Apache v2 license
# Copyright (C) 2025 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
#

import pytest
import requests
import time
import subprocess
import json
import os
import copy

# Read the config.json file of the Time Series Analytics microservice
TS_DIR = os.path.join(os.getcwd(), "..")
with open(os.path.join(TS_DIR, "config.json")) as config_fp:
    config_file = json.load(config_fp)
print(config_file)

def run_command(command):
"""Run a shell command and return the output."""
result = subprocess.run(command, shell=True, capture_output=True, text=True)
if result.returncode != 0:
raise RuntimeError(f"Command failed: {command}\n{result.stderr}")
return result.stdout.strip()

## REST API Tests

# Get health check /health endpoint
def health_check(port):
"""
Test the health check endpoint of the Time Series Analytics service.
"""
url = f"http://localhost:{port}/health"
try:
response = requests.get(url)
assert response.status_code == 200
assert response.json() == {"status": "kapacitor daemon is running"}
except Exception as e:
pytest.fail(f"Health check failed: {e}")

# Post the OPC UA alerts /opcua_alerts endpoint
def opcua_alerts(port):
"""
Test the OPC UA alerts endpoint of the Time Series Analytics service.
"""
alert_message = {"message": "Test alert"}
try:
url = f"http://localhost:{port}/opcua_alerts"
response = requests.post(url, json=alert_message)
assert response.status_code == 500
assert response.json() == {'detail': '500: OPC UA alerts are not configured in the service'}
except Exception as e:
pytest.fail(f"Failed to post OPC UA alerts: {e}")

# Post valid input data to the /input endpoint
def input_endpoint(port):
"""
Test the input endpoint of the Time Series Analytics service.
"""
input_data = {
"topic": "point_data",
"tags": {
},
"fields": {
"temperature": 30
},
"timestamp": 0
}
try:
url = f"http://localhost:{port}/input"
response = requests.post(url, json=input_data)
assert response.status_code == 200
assert response.json() == {"status": "success", "message": "Data sent to Time Series Analytics microservice"}
except Exception as e:
pytest.fail(f"Failed to post valid input data: {e}")

# Post invalid input data to the /input endpoint
def input_endpoint_invalid_data(port):
"""
Test the input endpoint of the Time Series Analytics service.
"""
input_data = {
"topic": "point_data",
"tags": {
},
"fields": {
"temperature": "invalid_value" # Invalid temperature value
},
"timestamp": 0
}
try:
url = f"http://localhost:{port}/input"
response = requests.post(url, json=input_data)
assert response.status_code == 500
assert "400: unable to parse 'point_data temperature=invalid_value" in response.json().get("detail", "")
except Exception as e:
pytest.fail(f"Failed to post invalid input data: {e}")
input_data["fields"]["temperature"] = ""
try:
url = f"http://localhost:{port}/input"
response = requests.post(url, json=input_data)
assert response.status_code == 500
assert "400: unable to parse 'point_data temperature=" in response.json().get("detail", "")
except Exception as e:
pytest.fail(f"Failed to post no input data: {e}")

# Post no input data to the /input endpoint
def input_endpoint_no_data(port):
"""
Test the input endpoint of the Time Series Analytics service.
"""
input_data = {
"topic": "point_data",
"tags": {
},
"fields": {
"temperature": "" # Invalid temperature value
},
"timestamp": 0
}
try:
url = f"http://localhost:{port}/input"
response = requests.post(url, json=input_data)
assert response.status_code == 500
assert "400: unable to parse 'point_data temperature=" in response.json().get("detail", "")
except Exception as e:
pytest.fail(f"Failed to post no input data: {e}")

# Get config data from the /config endpoint
def get_config_endpoint(port):
"""
Test the config endpoint of the Time Series Analytics service.
"""
url = f"http://localhost:{port}/config"
try:
response = requests.get(url)
assert response.status_code == 200
assert response.json() == config_file
except Exception as e:
pytest.fail(f"Failed to get config data: {e}")

# Post config data to the /config endpoint
def post_config_endpoint(port, cmd):
"""
Test the config endpoint of the Time Series Analytics service.
"""
url = f"http://localhost:{port}/config"
try:
response = requests.post(url, json=config_file)
assert response.status_code == 200
assert response.json() == {"status": "success", "message": "Configuration updated successfully"}
time.sleep(10) # Wait for the configuration to be applied
command = f"{cmd} 2>&1 | grep -i 'Kapacitor daemon process has exited and was reaped.'"
output = run_command(command)
assert "Kapacitor daemon process has exited and was reaped." in output
except Exception as e:
pytest.fail(f"Failed to post config data: {e}")

# Test concurrent API requests
def concurrent_api_requests(port):
"""
Test concurrent API requests to the Time Series Analytics service.
"""
url = f"http://localhost:{port}"
input_data = {
"topic": "point_data",
"tags": {},
"fields": {"temperature": 30},
"timestamp": 0
}
config_file_alerts = config_file.copy()
config_file_alerts["alerts"] = {}
opcua_alert = {"message": "Test alert"}
endpoints = ['/health', '/config', '/opcua_alerts', '/input' ]
print("config file alert", config_file_alerts)
print("config file", config_file)
def get_request(endpoint):
try:
response = requests.get(url + endpoint)
return response.status_code, response.text
except Exception as e:
return None, str(e)

def post_request(endpoint, data):
try:
response = requests.post(url + endpoint, json=data)
return response.status_code, response.json()
except Exception as e:
return None, str(e)

from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=5) as executor:
try:
future_get_health = executor.submit(get_request, endpoints[0])
future_get_config = executor.submit(get_request, endpoints[1])

# Schedule the POST request
future_post_alert = executor.submit(post_request, endpoints[2], opcua_alert)
future_post_input = executor.submit(post_request, endpoints[3], input_data)
future_post_config = executor.submit(post_request, endpoints[1], config_file)

# Retrieve results
get_health_result = future_get_health.result()
get_config_result = future_get_config.result()
post_alert_result = future_post_alert.result()

print(f"GET /health: {get_health_result}")
print(f"GET /config: {get_config_result}")
print(f"POST /opcua_alerts: {post_alert_result}")
print(f"POST /input: {future_post_input.result()}")
print(f"POST /config: {future_post_config.result()}")

health_status_code = [200, 500, 503]
health_status_json = [{"status": "kapacitor daemon is running"}, {"detail": "500: Kapacitor daemon is not running"}, {"status":"Port not accessible and kapacitor daemon not running"}]
assert get_health_result[0] in health_status_code
assert json.loads(get_health_result[1]) in health_status_json
assert get_config_result[0] == 200
assert json.loads(get_config_result[1]) == config_file or json.loads(get_config_result[1]) == config_file_alerts
assert post_alert_result[0] == 500
assert post_alert_result[1] == {'detail': '500: OPC UA alerts are not configured in the service'}
assert future_post_input.result()[0] == 200 or future_post_input.result()[0] == 500
assert future_post_input.result()[1] == {"status": "success", "message": "Data sent to Time Series Analytics microservice"} or \
future_post_input.result()[1] == {'detail': '500: Kapacitor daemon is not running'}
assert future_post_config.result()[0] == 200
assert future_post_config.result()[1] == {"status": "success", "message": "Configuration updated successfully"}
except Exception as e:
pytest.fail(f"Concurrent API requests failed: {e}")

# Post invalid config data to the /config endpoint
def post_invalid_config_endpoint(port, cmd):
"""
Test the config endpoint of the Time Series Analytics service.
"""
url = f"http://localhost:{port}/config"
invalid_config_data = copy.deepcopy(config_file)  # deep copy so the shared config_file is not mutated
invalid_config_data["udfs"]["name"] = "udf_classifier"
try:
response = requests.post(url, json=invalid_config_data)
assert response.status_code == 200
assert response.json() == {"status": "success", "message": "Configuration updated successfully"}
time.sleep(15) # Wait for the configuration to be applied
command = f"{cmd} 2>&1 | grep -i 'UDF deployment package directory udf_classifier does not exist. Please check and upload/copy the UDF deployment package.'"
output = run_command(command)
print(output)
assert "UDF deployment package directory udf_classifier does not exist. Please check and upload/copy the UDF deployment package." in output
except Exception as e:
pytest.fail(f"Failed to post config data: {e}")