diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 29480fcc..17574c3d 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -6,7 +6,21 @@ on: workflow_dispatch: jobs: - build: + test: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.11" + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r app/requirements.txt + pip install pytest + + - name: Run tests + run: pytest -v diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 00000000..4ecb1ad2 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,5 @@ +[pytest] +testpaths = tests +python_files = test_*.py +python_classes = Test* +python_functions = test_* diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 00000000..f0f6077b --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,80 @@ +import pytest +import csv +import pandas as pd + +from app.main import app as flask_app + + +@pytest.fixture +def app(): + flask_app.config["TESTING"] = True + return flask_app + + +@pytest.fixture +def client(app): + return app.test_client() + + +@pytest.fixture +def sample_iris_points(): + points = [] + calibration_targets = [ + (100, 100, 0.30, 0.40, 0.60, 0.40), + (500, 300, 0.50, 0.50, 0.80, 0.50), + (900, 500, 0.70, 0.60, 0.90, 0.60), + (100, 500, 0.20, 0.70, 0.50, 0.70), + (900, 100, 0.60, 0.30, 0.85, 0.30), + ] + for px, py, lx, ly, rx, ry in calibration_targets: + for jitter in [-0.01, 0.0, 0.01]: + points.append({ + "left_iris_x": lx + jitter, "left_iris_y": ly + jitter, + "right_iris_x": rx + jitter, "right_iris_y": ry + jitter, + "point_x": px, "point_y": py, + }) + return points + + +@pytest.fixture +def sample_calib_iris_points(): + return [ + {"left_iris_x": 0.35, "left_iris_y": 0.45, "right_iris_x": 0.65, "right_iris_y": 0.45}, + {"left_iris_x": 0.55, "left_iris_y": 0.55, "right_iris_x": 0.85, "right_iris_y": 0.55}, + {"left_iris_x": 0.45, "left_iris_y": 0.50, "right_iris_x": 0.75, "right_iris_y": 0.50}, + ] + + +@pytest.fixture +def calib_csv_path(tmp_path, sample_iris_points): + csv_path = tmp_path / "test_fixed_train_data.csv" + columns = ["left_iris_x", "left_iris_y", "right_iris_x", "right_iris_y", + "point_x", "point_y", "screen_height", "screen_width"] + with open(csv_path, "w", newline="") as f: + writer = csv.DictWriter(f, fieldnames=columns) + writer.writeheader() + for row in sample_iris_points: + writer.writerow({**row, "screen_height": 600, "screen_width": 1000}) + return str(csv_path) + + +@pytest.fixture +def predict_csv_path(tmp_path, sample_calib_iris_points): + csv_path = tmp_path / "test_predict_data.csv" + columns = ["left_iris_x", "left_iris_y", "right_iris_x", "right_iris_y"] + with open(csv_path, "w", newline="") as f: + writer = csv.DictWriter(f, fieldnames=columns) + writer.writeheader() + for row in sample_calib_iris_points: + writer.writerow(row) + return str(csv_path) + + +@pytest.fixture +def sample_metrics_df(): + return pd.DataFrame({ + "True X": [100, 100, 100, 500, 500], + "Predicted X": [110, 95, 105, 490, 510], + "True Y": [200, 200, 200, 400, 400], + "Predicted Y": [210, 195, 205, 390, 410], + }) diff --git a/tests/test_config.py b/tests/test_config.py new file mode 100644 index 00000000..bcbfdb26 --- /dev/null +++ b/tests/test_config.py @@ -0,0 +1,32 @@ +from app.services.config import hyperparameters + + +TUNABLE_MODELS = [ + "Lasso Regression", "Ridge Regression", "Elastic Net", + "Bayesian Ridge", "SGD Regressor", + "Support Vector Regressor", "Random Forest Regressor", +] + + +def test_all_tunable_models_present(): + for model in TUNABLE_MODELS: + assert model in hyperparameters + + +def test_param_grids_are_valid(): + for name, config in hyperparameters.items(): + assert "param_grid" in config + grid = config["param_grid"] + assert len(grid) > 0 + for param, values in grid.items(): + assert isinstance(values, list), f"{name}.{param} should be a list" + + +def test_ridge_alpha_values(): + alphas = hyperparameters["Ridge Regression"]["param_grid"]["ridge__alpha"] + assert min(alphas) > 0 + assert len(alphas) >= 5 + + +def test_linear_regression_not_included(): + assert "Linear Regression" not in hyperparameters diff --git a/tests/test_gaze_tracker.py b/tests/test_gaze_tracker.py new file mode 100644 index 00000000..08eee387 --- /dev/null +++ b/tests/test_gaze_tracker.py @@ -0,0 +1,125 @@ +import numpy as np +from sklearn.pipeline import Pipeline + +from app.services.gaze_tracker import ( + squash, normalizeData, models, + trian_and_predict, predict, predict_new_data_simple, +) + + +def test_squash_zero_in_zero_out(): + assert squash(0) == 0.0 + + +def test_squash_bounded_output(): + assert 0 < squash(1.0) < 1.0 + assert -1.0 < squash(-1.0) < 0 + + +def test_squash_symmetry(): + assert abs(squash(0.5) + squash(-0.5)) < 1e-10 + + +def test_squash_saturates(): + assert abs(squash(100.0) - 1.0) < 0.01 + + +def test_squash_respects_limit(): + result = squash(2.0, limit=2.0) + assert abs(result - np.tanh(1.0)) < 1e-10 + + +def test_squash_handles_arrays(): + result = squash(np.array([-1.0, 0.0, 1.0])) + assert len(result) == 3 + assert result[1] == 0.0 + + +def test_normalize_maps_to_0_1(): + result = normalizeData(np.array([0.0, 5.0, 10.0])) + np.testing.assert_array_almost_equal(result, [0.0, 0.5, 1.0]) + + +def test_normalize_handles_negatives(): + result = normalizeData(np.array([-10.0, 0.0, 10.0])) + np.testing.assert_array_almost_equal(result, [0.0, 0.5, 1.0]) + + +def test_normalize_endpoints(): + result = normalizeData(np.array([1.0, 2.0, 3.0])) + assert result[0] == 0.0 + assert result[-1] == 1.0 + + +def test_all_models_registered(): + expected = [ + "Linear Regression", "Ridge Regression", "Lasso Regression", + "Elastic Net", "Bayesian Ridge", "SGD Regressor", + "Support Vector Regressor", "Random Forest Regressor", + ] + for name in expected: + assert name in models + for name, m in models.items(): + assert isinstance(m, Pipeline), f"{name} should be a Pipeline" + + +def test_train_predict_linear(): + np.random.seed(42) + X_train = np.random.rand(20, 2) + y_train = X_train[:, 0] * 100 + X_train[:, 1] * 200 + X_test = np.random.rand(5, 2) + y_test = X_test[:, 0] * 100 + X_test[:, 1] * 200 + + preds = trian_and_predict("Linear Regression", X_train, y_train, X_test, y_test, "X") + assert len(preds) == 5 + + +def test_train_predict_ridge_with_gridsearch(): + np.random.seed(42) + X_train = np.random.rand(30, 2) + y_train = X_train[:, 0] * 100 + X_train[:, 1] * 200 + X_test = np.random.rand(5, 2) + y_test = X_test[:, 0] * 100 + X_test[:, 1] * 200 + + preds = trian_and_predict("Ridge Regression", X_train, y_train, X_test, y_test, "X") + assert len(preds) == 5 + + +def test_predict_full_pipeline(calib_csv_path): + result = predict(calib_csv_path, k=2, model_X="Linear Regression", model_Y="Linear Regression") + assert isinstance(result, dict) + assert "centroids" in result + assert len(result["centroids"]) == 2 + for c in result["centroids"]: + assert len(c) == 2 + + +def test_predict_new_data(calib_csv_path, predict_csv_path, sample_calib_iris_points): + iris_data = [{**pt, "timestamp": i * 100} for i, pt in enumerate(sample_calib_iris_points)] + + result = predict_new_data_simple( + calib_csv_path=calib_csv_path, + predict_csv_path=predict_csv_path, + iris_data=iris_data, + screen_width=1000, screen_height=600, + ) + assert len(result) == len(iris_data) + + for p in result: + assert isinstance(p["predicted_x"], float) + assert isinstance(p["predicted_y"], float) + assert "timestamp" in p + + +def test_predict_new_data_preserves_screen_size(calib_csv_path, predict_csv_path, sample_calib_iris_points): + iris_data = [{**pt, "timestamp": i * 100} for i, pt in enumerate(sample_calib_iris_points)] + + result = predict_new_data_simple( + calib_csv_path=calib_csv_path, + predict_csv_path=predict_csv_path, + iris_data=iris_data, + screen_width=1920, screen_height=1080, + ) + for p in result: + assert p["screen_width"] == 1920 + assert p["screen_height"] == 1080 diff --git a/tests/test_metrics.py b/tests/test_metrics.py new file mode 100644 index 00000000..e6036bf7 --- /dev/null +++ b/tests/test_metrics.py @@ -0,0 +1,65 @@ +import numpy as np +import pandas as pd + +from app.services.metrics import ( + func_precision_x, + func_presicion_y, + func_accuracy_x, + func_accuracy_y, + func_total_accuracy, +) + + +def test_precision_is_zero_when_predictions_are_identical(): + df = pd.DataFrame({"Predicted X": [100.0, 100.0, 100.0]}) + assert func_precision_x(df) == 0.0 + + df_y = pd.DataFrame({"Predicted Y": [200.0, 200.0, 200.0]}) + assert func_presicion_y(df_y) == 0.0 + + +def test_precision_grows_with_spread(): + tight = pd.DataFrame({"Predicted X": [99.0, 100.0, 101.0]}) + wide = pd.DataFrame({"Predicted X": [80.0, 100.0, 120.0]}) + assert func_precision_x(tight) < func_precision_x(wide) + + +def test_precision_known_rms(): + df = pd.DataFrame({"Predicted X": [90.0, 100.0, 110.0]}) + expected = np.sqrt(np.mean([100.0, 0.0, 100.0])) + assert abs(func_precision_x(df) - expected) < 1e-10 + + +def test_accuracy_zero_when_perfect(): + df = pd.DataFrame({"True X": [100.0, 200.0], "Predicted X": [100.0, 200.0]}) + assert func_accuracy_x(df) == 0.0 + + +def test_accuracy_rmse_value(): + df = pd.DataFrame({"True X": [100.0, 200.0], "Predicted X": [110.0, 190.0]}) + assert func_accuracy_x(df) == 10.0 + + +def test_accuracy_y(): + df = pd.DataFrame({"True Y": [100.0, 200.0], "Predicted Y": [105.0, 195.0]}) + assert func_accuracy_y(df) == 5.0 + + +def test_total_accuracy_uses_euclidean_distance(): + df = pd.DataFrame({ + "True X": [100.0], "Predicted X": [103.0], + "True Y": [200.0], "Predicted Y": [204.0], + }) + assert abs(func_total_accuracy(df) - 5.0) < 1e-10 + + +def test_total_accuracy_averages_multiple_points(): + df = pd.DataFrame({ + "True X": [0.0, 0.0], "Predicted X": [3.0, 0.0], + "True Y": [0.0, 0.0], "Predicted Y": [4.0, 5.0], + }) + assert abs(func_total_accuracy(df) - 5.0) < 1e-10 + + +def test_total_accuracy_on_sample_data(sample_metrics_df): + assert func_total_accuracy(sample_metrics_df) > 0 diff --git a/tests/test_nan_conversion.py b/tests/test_nan_conversion.py new file mode 100644 index 00000000..4e74f776 --- /dev/null +++ b/tests/test_nan_conversion.py @@ -0,0 +1,56 @@ +import numpy as np + +from app.routes.session import convert_nan_to_none + + +def test_nan_and_inf_become_none(): + assert convert_nan_to_none(float("nan")) is None + assert convert_nan_to_none(float("inf")) is None + assert convert_nan_to_none(float("-inf")) is None + + +def test_regular_values_pass_through(): + assert convert_nan_to_none(3.14) == 3.14 + assert convert_nan_to_none(0.0) == 0.0 + assert convert_nan_to_none(42) == 42 + assert convert_nan_to_none("hello") == "hello" + assert convert_nan_to_none(None) is None + + +def test_numpy_nan_and_inf(): + assert convert_nan_to_none(np.float64("nan")) is None + assert convert_nan_to_none(np.float64("inf")) is None + + +def test_numpy_types_converted_to_python(): + result_f = convert_nan_to_none(np.float64(2.5)) + assert result_f == 2.5 + assert isinstance(result_f, float) + + result_i = convert_nan_to_none(np.int64(10)) + assert result_i == 10 + assert isinstance(result_i, int) + + +def test_handles_dicts_and_lists(): + assert convert_nan_to_none({"a": 1.0, "b": float("nan")}) == {"a": 1.0, "b": None} + assert convert_nan_to_none([1.0, float("nan"), float("inf")]) == [1.0, None, None] + + +def test_handles_nested_structures(): + obj = { + "outer": { + "values": [1.0, float("nan")], + "score": float("inf"), + }, + "name": "test", + } + result = convert_nan_to_none(obj) + assert result["outer"]["values"] == [1.0, None] + assert result["outer"]["score"] is None + assert result["name"] == "test" + + +def test_empty_containers_unchanged(): + assert convert_nan_to_none({}) == {} + assert convert_nan_to_none([]) == [] diff --git a/tests/test_routes.py b/tests/test_routes.py new file mode 100644 index 00000000..544f70ef --- /dev/null +++ b/tests/test_routes.py @@ -0,0 +1,113 @@ +import json +import csv +import os +from pathlib import Path +from unittest.mock import patch + + +def test_health_endpoint(client): + resp = client.get("/api/session/health") + assert resp.status_code == 200 + assert resp.get_json() == {"status": "ok"} + assert resp.content_type == "application/json" + + +def test_calib_validation(client, sample_iris_points, sample_calib_iris_points, tmp_path): + with patch("app.routes.session.Path") as mock_path: + mock_path.return_value.absolute.return_value = tmp_path + + form_data = { + "from_ruxailab": json.dumps(False), + "file_name": json.dumps("test_session_123"), + "fixed_circle_iris_points": json.dumps(sample_iris_points), + "calib_circle_iris_points": json.dumps(sample_calib_iris_points), + "screen_height": json.dumps(600), + "screen_width": json.dumps(1000), + "model": json.dumps("Linear Regression"), + "k": json.dumps(2), + } + resp = client.post("/api/session/calib_validation", data=form_data) + assert resp.status_code == 200 + + data = json.loads(resp.data) + assert "centroids" in data + + +def test_calib_validation_fails_without_data(client): + resp = client.post("/api/session/calib_validation", data={}) + assert resp.status_code in (400, 500) + + +def test_calib_validation_rejects_get(client): + assert client.get("/api/session/calib_validation").status_code == 405 + + +def test_batch_predict_needs_calib_id(client): + payload = { + "iris_tracking_data": [ + {"left_iris_x": 0.3, "left_iris_y": 0.4, + "right_iris_x": 0.6, "right_iris_y": 0.4, "timestamp": 100} + ], + "screen_width": 1920, + "screen_height": 1080, + } + resp = client.post("/api/session/batch_predict", + data=json.dumps(payload), + content_type="application/json") + assert resp.status_code == 400 + + +def test_batch_predict_full_flow(client, sample_iris_points): + base_path = Path().absolute() / "app/services/calib_validation/csv/data" + os.makedirs(base_path, exist_ok=True) + + calib_id = "integration_test_batch" + calib_csv = base_path / f"{calib_id}_fixed_train_data.csv" + columns = ["left_iris_x", "left_iris_y", "right_iris_x", "right_iris_y", + "point_x", "point_y", "screen_height", "screen_width"] + with open(calib_csv, "w", newline="") as f: + writer = csv.DictWriter(f, fieldnames=columns) + writer.writeheader() + for row in sample_iris_points: + writer.writerow({**row, "screen_height": 600, "screen_width": 1000}) + + payload = { + "iris_tracking_data": [ + {"left_iris_x": 0.35, "left_iris_y": 0.45, + "right_iris_x": 0.65, "right_iris_y": 0.45, "timestamp": 100}, + {"left_iris_x": 0.55, "left_iris_y": 0.55, + "right_iris_x": 0.85, "right_iris_y": 0.55, "timestamp": 200}, + ], + "screen_width": 1000, + "screen_height": 600, + "calib_id": calib_id, + } + + resp = client.post("/api/session/batch_predict", + data=json.dumps(payload), + content_type="application/json") + assert resp.status_code == 200 + + data = resp.get_json() + assert len(data) == 2 + for p in data: + assert "predicted_x" in p + assert "predicted_y" in p + assert "timestamp" in p + + if calib_csv.exists(): + os.remove(calib_csv) + temp_file = base_path / "temp_batch_predict.csv" + if temp_file.exists(): + os.remove(temp_file) + + +def test_batch_predict_rejects_get(client): + assert client.get("/api/session/batch_predict").status_code == 405 + + +def test_batch_predict_handles_bad_json(client): + resp = client.post("/api/session/batch_predict", + data="not json", + content_type="application/json") + assert resp.status_code == 500 diff --git a/tests/test_session_model.py b/tests/test_session_model.py new file mode 100644 index 00000000..11a67061 --- /dev/null +++ b/tests/test_session_model.py @@ -0,0 +1,55 @@ +from app.models.session import Session + + +def _make_session(**overrides): + defaults = dict( + id=1, title="Test Session", description="A test session", + user_id=42, created_date="2025-01-01", + website_url="https://example.com", + screen_record_url="https://example.com/screen.webm", + webcam_record_url="https://example.com/webcam.webm", + heatmap_url="https://example.com/heatmap.png", + calib_points=[[100, 200], [300, 400]], + iris_points=[[0.3, 0.4], [0.5, 0.6]], + ) + defaults.update(overrides) + return Session(**defaults) + + +def test_attributes_are_stored(): + s = _make_session() + assert s.id == 1 + assert s.title == "Test Session" + assert s.user_id == 42 + assert s.website_url == "https://example.com" + + +def test_to_dict(): + s = _make_session(id=5, title="X", calib_points=[[1, 2]], iris_points=[[0.1, 0.2]]) + d = s.to_dict() + + assert d["id"] == 5 + assert d["title"] == "X" + assert d["callib_points"] == [[1, 2]] + assert d["iris_points"] == [[0.1, 0.2]] + + +def test_to_dict_has_all_expected_keys(): + d = _make_session().to_dict() + assert set(d.keys()) == { + "id", "title", "description", "user_id", "created_date", + "website_url", "screen_record_url", "webcam_record_url", + "heatmap_url", "callib_points", "iris_points", + } + + +def test_none_values_are_preserved(): + s = Session( + id=None, title=None, description=None, user_id=None, + created_date=None, website_url=None, screen_record_url=None, + webcam_record_url=None, heatmap_url=None, calib_points=None, + iris_points=None, + ) + d = s.to_dict() + assert d["id"] is None + assert d["callib_points"] is None