Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions bases/renku_data_services/k8s_cache/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from dataclasses import dataclass
from typing import Self

from renku_data_services.app_config.config import SentryConfig
from renku_data_services.db_config.config import DBConfig


Expand Down Expand Up @@ -73,6 +74,7 @@ class Config:
metrics: _MetricsConfig
image_builders: _ImageBuilderConfig
v1_services: _V1ServicesConfig
sentry: SentryConfig

@classmethod
def from_env(cls) -> Config:
Expand All @@ -82,10 +84,12 @@ def from_env(cls) -> Config:
metrics = _MetricsConfig.from_env()
image_builders = _ImageBuilderConfig.from_env()
v1_services = _V1ServicesConfig.from_env()
sentry = SentryConfig.from_env()
return cls(
db=db,
k8s=k8s,
metrics=metrics,
image_builders=image_builders,
v1_services=v1_services,
sentry=sentry,
)
32 changes: 15 additions & 17 deletions bases/renku_data_services/k8s_cache/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,30 @@ class DependencyManager:

config: Config

quota_repo: QuotaRepository
_k8s_cache: K8sDbCache | None = None
_quota_repo: QuotaRepository | None = field(default=None, repr=False, init=False)
_k8s_cache: K8sDbCache | None = field(default=None, repr=False, init=False)
_metrics_repo: MetricsRepository | None = field(default=None, repr=False, init=False)
_metrics: StagingMetricsService | None = field(default=None, repr=False, init=False)
_rp_repo: ResourcePoolRepository | None = field(default=None, repr=False, init=False)
_cluster_repo: ClusterRepository | None = field(default=None, repr=False, init=False)

@property
def metrics_repo(self) -> MetricsRepository:
"""The DB adapter for metrics."""
if not self._metrics_repo:
self._metrics_repo = MetricsRepository(session_maker=self.config.db.async_session_maker)
return self._metrics_repo

@property
def metrics(self) -> StagingMetricsService:
"""The metrics service interface."""
if not self._metrics:
self._metrics = StagingMetricsService(enabled=self.config.metrics.enabled, metrics_repo=self.metrics_repo)
self._metrics = StagingMetricsService(enabled=self.config.metrics.enabled, metrics_repo=self.metrics_repo())
return self._metrics

@property
def rp_repo(self) -> ResourcePoolRepository:
"""The resource pool repository."""
if not self._rp_repo:
self._rp_repo = ResourcePoolRepository(
session_maker=self.config.db.async_session_maker, quotas_repo=self.quota_repo
session_maker=self.config.db.async_session_maker, quotas_repo=self.quota_repo()
)
return self._rp_repo

Expand All @@ -52,7 +49,6 @@ def cluster_repo(self) -> ClusterRepository:
self._cluster_repo = ClusterRepository(session_maker=self.config.db.async_session_maker)
return self._cluster_repo

@property
def k8s_cache(self) -> K8sDbCache:
"""The DB adapter for the k8s cache."""
if not self._k8s_cache:
Expand All @@ -61,19 +57,21 @@ def k8s_cache(self) -> K8sDbCache:
)
return self._k8s_cache

def quota_repo(self) -> QuotaRepository:
"""The resource quota repository."""
if not self._quota_repo:
# NOTE: We only need the QuotaRepository to instantiate the ResourcePoolRepository which is used to get
# the resource class and pool information for metrics. We don't need quota information for metrics at all
# so we use the dummy client for quotas here as we don't actually access k8s, just the db.
self._quota_repo = QuotaRepository(
DummyCoreClient({}, {}), DummySchedulingClient({}), namespace=self.config.k8s.renku_namespace
)
return self._quota_repo

@classmethod
def from_env(cls) -> "DependencyManager":
"""Create a config from environment variables."""
config = Config.from_env()

# NOTE: We only need the QuotaRepository to instantiate the ResourcePoolRepository which is used to get
# the resource class and pool information for metrics. We don't need quota information for metrics at all
# so we use the dummy client for quotas here as we don't actually access k8s, just the db.
quota_repo = QuotaRepository(
DummyCoreClient({}, {}), DummySchedulingClient({}), namespace=config.k8s.renku_namespace
)

return cls(
config=config,
quota_repo=quota_repo,
)
23 changes: 21 additions & 2 deletions bases/renku_data_services/k8s_cache/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

import asyncio

import sentry_sdk
from sentry_sdk.integrations.asyncio import AsyncioIntegration
from sentry_sdk.integrations.grpc import GRPCIntegration

from renku_data_services.app_config import logging
from renku_data_services.k8s.clients import K8sClusterClient
from renku_data_services.k8s.config import KubeConfigEnv, get_clusters
Expand All @@ -20,6 +24,21 @@ async def main() -> None:
dm = DependencyManager.from_env()
default_kubeconfig = KubeConfigEnv()

if dm.config.sentry.enabled:
logger.info("enabling sentry")
sentry_sdk.init(
dsn=dm.config.sentry.dsn,
environment=dm.config.sentry.environment,
release=dm.config.sentry.release or None,
integrations=[
AsyncioIntegration(),
GRPCIntegration(),
],
enable_tracing=dm.config.sentry.sample_rate > 0,
traces_sample_rate=dm.config.sentry.sample_rate,
in_app_include=["renku_data_services"],
)

clusters: dict[ClusterId, K8sClusterClient] = {}
async for client in get_clusters(
kube_conf_root_dir=dm.config.k8s.kube_config_root,
Expand All @@ -35,10 +54,10 @@ async def main() -> None:
kinds.extend([BUILD_RUN_GVK, TASK_RUN_GVK])
logger.info(f"Resources: {kinds}")
watcher = K8sWatcher(
handler=k8s_object_handler(dm.k8s_cache, dm.metrics, rp_repo=dm.rp_repo),
handler=k8s_object_handler(dm.k8s_cache(), dm.metrics(), rp_repo=dm.rp_repo()),
clusters=clusters,
kinds=kinds,
db_cache=dm.k8s_cache,
db_cache=dm.k8s_cache(),
)
await watcher.start()
logger.info("started watching resources")
Expand Down