|
| 1 | +import asyncio |
| 2 | +import contextlib |
1 | 3 | import json |
2 | 4 | from collections.abc import AsyncGenerator, Callable |
3 | 5 | from copy import deepcopy |
| 6 | +from datetime import timedelta |
| 7 | +from pathlib import Path |
4 | 8 | from typing import Any, Protocol |
5 | 9 |
|
6 | 10 | import pytest |
|
19 | 23 | from renku_data_services.data_api.app import register_all_handlers |
20 | 24 | from renku_data_services.data_api.dependencies import DependencyManager |
21 | 25 | from renku_data_services.data_connectors.apispec import DataConnector as ApiDataConnector |
| 26 | +from renku_data_services.k8s.clients import K8sClusterClient |
| 27 | +from renku_data_services.k8s.config import from_kubeconfig_file, get_clusters |
| 28 | +from renku_data_services.k8s.constants import ClusterId |
| 29 | +from renku_data_services.k8s.watcher import K8sWatcher, k8s_object_handler |
22 | 30 | from renku_data_services.migrations.core import run_migrations_for_app |
23 | 31 | from renku_data_services.namespace.apispec import GroupResponse as ApiGroup |
24 | 32 | from renku_data_services.namespace.models import UserNamespace |
| 33 | +from renku_data_services.notebooks.constants import JUPYTER_SESSION_GVK |
25 | 34 | from renku_data_services.project.apispec import Project as ApiProject |
26 | 35 | from renku_data_services.search.apispec import SearchResult |
27 | 36 | from renku_data_services.secrets_storage_api.app import register_all_handlers as register_secrets_handlers |
|
33 | 42 | from renku_data_services.users.dummy_kc_api import DummyKeycloakAPI |
34 | 43 | from renku_data_services.users.models import UserInfo |
35 | 44 | from renku_data_services.utils.middleware import validate_null_byte |
| 45 | +from test.bases.renku_data_services.data_api.utils import KindCluster, setup_amalthea |
36 | 46 | from test.bases.renku_data_services.data_tasks.test_sync import get_kc_users |
37 | 47 | from test.utils import SanicReusableASGITestClient, TestDependencyManager |
38 | 48 |
|
@@ -727,3 +737,52 @@ def __make_headers(user: UserInfo, admin: bool = False) -> dict[str, str]: |
727 | 737 | } |
728 | 738 | ) |
729 | 739 | return {"Authorization": f"Bearer {access_token}"} |
| 740 | + |
| 741 | + |
| 742 | +@pytest.fixture(scope="session") |
| 743 | +def cluster_name(): |
| 744 | + return f"k8s-cluster-{str(ULID()).lower()}" |
| 745 | + |
| 746 | + |
| 747 | +@pytest.fixture(scope="session") |
| 748 | +def kubeconfig_path(monkeysession): |
| 749 | + kconf = ".kind-kubeconfig.yaml" |
| 750 | + monkeysession.setenv("KUBECONFIG", kconf) |
| 751 | + return Path(kconf) |
| 752 | + |
| 753 | + |
| 754 | +@pytest.fixture(scope="session") |
| 755 | +def cluster(cluster_name, kubeconfig_path): |
| 756 | + with KindCluster(cluster_name, kubeconfig=str(kubeconfig_path)) as cluster: |
| 757 | + yield cluster |
| 758 | + |
| 759 | + |
| 760 | +@pytest.fixture(scope="session") |
| 761 | +def amalthea_installation(cluster): |
| 762 | + setup_amalthea("amalthea", "amalthea", "0.22.0", cluster) |
| 763 | + |
| 764 | + |
| 765 | +@pytest_asyncio.fixture |
| 766 | +async def jupyter_server_k8s_watcher(cluster, amalthea_installation, app_manager_instance): |
| 767 | + app_manager = app_manager_instance |
| 768 | + default_kubeconfig = await from_kubeconfig_file(cluster.kubeconfig) |
| 769 | + clusters: dict[ClusterId, K8sClusterClient] = {} |
| 770 | + async for client in get_clusters( |
| 771 | + kube_conf_root_dir=app_manager.config.k8s_config_root, |
| 772 | + default_kubeconfig=default_kubeconfig, |
| 773 | + cluster_repo=app_manager.cluster_repo, |
| 774 | + ): |
| 775 | + clusters[client.get_cluster().id] = client |
| 776 | + |
| 777 | + # sleep to give amalthea a chance to create the CRDs, otherwise the watcher can error out |
| 778 | + await asyncio.sleep(1) |
| 779 | + watcher = K8sWatcher( |
| 780 | + handler=k8s_object_handler(app_manager.config.nb_config.k8s_db_cache, app_manager.metrics, app_manager.rp_repo), |
| 781 | + clusters=clusters, |
| 782 | + kinds=[JUPYTER_SESSION_GVK], |
| 783 | + db_cache=app_manager.config.nb_config.k8s_db_cache, |
| 784 | + ) |
| 785 | + asyncio.create_task(watcher.start()) |
| 786 | + yield |
| 787 | + with contextlib.suppress(TimeoutError): |
| 788 | + await watcher.stop(timeout=timedelta(seconds=1)) |
0 commit comments