Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

♻️ Adding RPC interface to dynamic-sidecar [part1] #7251

Draft
wants to merge 19 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,5 @@ class ActivityInfo(BaseModel):


ActivityInfoOrNone: TypeAlias = ActivityInfo | None

DcokerComposeYamlStr: TypeAlias = str
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import logging

from models_library.api_schemas_dynamic_sidecar.containers import DcokerComposeYamlStr
from models_library.projects_nodes_io import NodeID
from models_library.rabbitmq_basic_types import RPCMethodName, RPCNamespace
from pydantic import TypeAdapter

from ....logging_utils import log_decorator
from ... import RabbitMQRPCClient

_logger = logging.getLogger(__name__)


@log_decorator(_logger, level=logging.DEBUG)
async def store_compose_spec(
rabbitmq_rpc_client: RabbitMQRPCClient,
*,
node_id: NodeID,
docker_compose_yaml: DcokerComposeYamlStr,
) -> None:
rpc_namespace = RPCNamespace.from_entries(
{"service": "dy-sidecar", "node_id": f"{node_id}"}
)
result = await rabbitmq_rpc_client.request(
rpc_namespace,
TypeAdapter(RPCMethodName).validate_python("store_compose_spec"),
docker_compose_yaml=docker_compose_yaml,
)
assert result is None # nosec
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import logging

from models_library.projects_nodes_io import NodeID
from models_library.rabbitmq_basic_types import RPCMethodName, RPCNamespace
from pydantic import TypeAdapter

from ....logging_utils import log_decorator
from ... import RabbitMQRPCClient

_logger = logging.getLogger(__name__)


@log_decorator(_logger, level=logging.DEBUG)
async def free_reserved_disk_space(
rabbitmq_rpc_client: RabbitMQRPCClient,
*,
node_id: NodeID,
) -> None:
rpc_namespace = RPCNamespace.from_entries(
{"service": "dy-sidecar", "node_id": f"{node_id}"}
)
result = await rabbitmq_rpc_client.request(
rpc_namespace,
TypeAdapter(RPCMethodName).validate_python("free_reserved_disk_space"),
)
assert result is None # nosec
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import logging
from typing import Final

from models_library.api_schemas_dynamic_sidecar.telemetry import DiskUsage
from models_library.projects_nodes_io import NodeID
Expand All @@ -11,10 +10,6 @@

_logger = logging.getLogger(__name__)

_UPDATE_DISK_USAGE: Final[RPCMethodName] = TypeAdapter(RPCMethodName).validate_python(
"update_disk_usage"
)


@log_decorator(_logger, level=logging.DEBUG)
async def update_disk_usage(
Expand All @@ -28,7 +23,7 @@ async def update_disk_usage(
)
result = await rabbitmq_rpc_client.request(
rpc_namespace,
_UPDATE_DISK_USAGE,
TypeAdapter(RPCMethodName).validate_python("update_disk_usage"),
usage=usage,
)
assert result is None # nosec
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import logging

from models_library.projects_nodes_io import NodeID
from models_library.rabbitmq_basic_types import RPCMethodName, RPCNamespace
from models_library.sidecar_volumes import VolumeCategory, VolumeStatus
from pydantic import TypeAdapter

from ....logging_utils import log_decorator
from ... import RabbitMQRPCClient

_logger = logging.getLogger(__name__)


@log_decorator(_logger, level=logging.DEBUG)
async def save_volume_state(
rabbitmq_rpc_client: RabbitMQRPCClient,
*,
node_id: NodeID,
status: VolumeStatus,
category: VolumeCategory,
) -> None:
rpc_namespace = RPCNamespace.from_entries(
{"service": "dy-sidecar", "node_id": f"{node_id}"}
)
result = await rabbitmq_rpc_client.request(
rpc_namespace,
TypeAdapter(RPCMethodName).validate_python("save_volume_state"),
status=status,
category=category,
)
assert result is None # nosec
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from asyncio import Lock
from typing import Annotated, Any, Final

from fastapi import APIRouter, Depends, HTTPException
from fastapi import APIRouter, Depends, FastAPI, HTTPException
from fastapi import Path as PathParam
from fastapi import Query, Request, status
from models_library.api_schemas_dynamic_sidecar.containers import (
Expand All @@ -22,18 +22,14 @@
ContainerExecTimeoutError,
)
from ...core.settings import ApplicationSettings
from ...core.validation import (
ComposeSpecValidation,
parse_compose_spec,
validate_compose_spec,
)
from ...core.validation import parse_compose_spec
from ...models.schemas.containers import ContainersComposeSpec
from ...models.shared_store import SharedStore
from ...modules.container_utils import run_command_in_container
from ...modules.mounted_fs import MountedVolumes
from ...services import containers
from ._dependencies import (
get_application,
get_container_restart_lock,
get_mounted_volumes,
get_settings,
get_shared_store,
)
Expand Down Expand Up @@ -65,31 +61,16 @@ def _raise_if_container_is_missing(
@cancel_on_disconnect
async def store_compose_spec(
request: Request,
settings: Annotated[ApplicationSettings, Depends(get_settings)],
containers_compose_spec: ContainersComposeSpec,
shared_store: Annotated[SharedStore, Depends(get_shared_store)],
mounted_volumes: Annotated[MountedVolumes, Depends(get_mounted_volumes)],
app: Annotated[FastAPI, Depends(get_application)],
):
"""
Validates and stores the docker compose spec for the user services.
"""
_ = request
"""Validates and stores the docker compose spec for the user services."""

async with shared_store:
compose_spec_validation: ComposeSpecValidation = await validate_compose_spec(
settings=settings,
compose_file_content=containers_compose_spec.docker_compose_yaml,
mounted_volumes=mounted_volumes,
)
shared_store.compose_spec = compose_spec_validation.compose_spec
shared_store.container_names = compose_spec_validation.current_container_names
shared_store.original_to_container_names = (
compose_spec_validation.original_to_current_container_names
)

_logger.info("Validated compose-spec:\n%s", f"{shared_store.compose_spec}")

assert shared_store.compose_spec # nosec
_ = request
await containers.store_conpose_spec(
app,
docker_compose_yaml=containers_compose_spec.docker_compose_yaml,
)


@router.get(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ async def attach_container_to_network(
)
return

# NOTE: A docker network is only visible on a docker node when it is
# NOTE: A docker overlay network is only visible on a docker node when it is
# used by a container
network = DockerNetwork(docker=docker, id_=item.network_id)
await network.connect(
Expand Down Expand Up @@ -172,7 +172,7 @@ async def detach_container_from_network(
)
return

# NOTE: A docker network is only visible on a docker node when it is
# NOTE: A docker overlay network is only visible on a docker node when it is
# used by a container
network = DockerNetwork(docker=docker, id_=item.network_id)
await network.disconnect({"Container": container_id, "Force": True})
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from fastapi import APIRouter, status

from ...core.reserved_space import remove_reserved_disk_space
from ...services import disk

router = APIRouter()

Expand All @@ -11,4 +11,4 @@
status_code=status.HTTP_204_NO_CONTENT,
)
async def free_reserved_disk_space() -> None:
remove_reserved_disk_space()
disk.remove_reserved_disk_space()
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from fastapi import APIRouter, Depends
from typing import Annotated

from fastapi import APIRouter, Depends, FastAPI
from fastapi import Path as PathParam
from fastapi import status
from models_library.sidecar_volumes import VolumeCategory, VolumeState, VolumeStatus
from models_library.sidecar_volumes import VolumeCategory, VolumeStatus
from pydantic import BaseModel

from ...models.shared_store import SharedStore
from ._dependencies import get_shared_store
from ...services import volumes
from ._dependencies import get_application

router = APIRouter()

Expand All @@ -21,8 +23,7 @@ class PutVolumeItem(BaseModel):
)
async def put_volume_state(
item: PutVolumeItem,
volume_category: VolumeCategory = PathParam(..., alias="id"),
shared_store: SharedStore = Depends(get_shared_store),
app: Annotated[FastAPI, Depends(get_application)],
volume_category: Annotated[VolumeCategory, PathParam(..., alias="id")],
) -> None:
async with shared_store:
shared_store.volume_states[volume_category] = VolumeState(status=item.status)
await volumes.save_volume_state(app, status=item.status, category=volume_category)
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from fastapi import FastAPI
from models_library.api_schemas_dynamic_sidecar.containers import DcokerComposeYamlStr
from pydantic import validate_call
from servicelib.rabbitmq import RPCRouter

from ...services import containers

router = RPCRouter()


@router.expose()
@validate_call(config={"arbitrary_types_allowed": True})
async def store_compose_spec(
app: FastAPI, *, docker_compose_yaml: DcokerComposeYamlStr
) -> None:
await containers.store_conpose_spec(app, docker_compose_yaml=docker_compose_yaml)
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from fastapi import FastAPI
from servicelib.rabbitmq import RPCRouter

from ...services import disk

router = RPCRouter()


@router.expose()
async def free_reserved_disk_space(_: FastAPI) -> None:
disk.remove_reserved_disk_space()
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from fastapi import FastAPI
from models_library.api_schemas_dynamic_sidecar.telemetry import DiskUsage
from pydantic import validate_call
from servicelib.rabbitmq import RPCRouter

from ...modules.system_monitor import get_disk_usage_monitor
Expand All @@ -8,5 +9,6 @@


@router.expose()
@validate_call(config={"arbitrary_types_allowed": True})
async def update_disk_usage(app: FastAPI, *, usage: dict[str, DiskUsage]) -> None:
get_disk_usage_monitor(app).set_disk_usage_for_path(usage)
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from fastapi import FastAPI
from models_library.sidecar_volumes import VolumeCategory, VolumeStatus
from pydantic import validate_call
from servicelib.rabbitmq import RPCRouter

from ...services import volumes

router = RPCRouter()


@router.expose()
@validate_call(config={"arbitrary_types_allowed": True})
async def save_volume_state(
app: FastAPI, *, status: VolumeStatus, category: VolumeCategory
) -> None:
await volumes.save_volume_state(app, status=status, category=category)
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@

from ...core.rabbitmq import get_rabbitmq_rpc_server
from ...core.settings import ApplicationSettings
from . import _disk_usage
from . import _containers, _disk, _disk_usage, _volumes

ROUTERS: list[RPCRouter] = [
_containers.router,
_disk_usage.router,
_disk.router,
_volumes.router,
]


Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from models_library.api_schemas_dynamic_sidecar.containers import DcokerComposeYamlStr
from models_library.services_creation import CreateServiceMetricsAdditionalParams
from pydantic import BaseModel


class ContainersComposeSpec(BaseModel):
docker_compose_yaml: str
docker_compose_yaml: DcokerComposeYamlStr


class ContainersCreate(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import aiofiles
from fastapi import FastAPI
from models_library.api_schemas_dynamic_sidecar.containers import DcokerComposeYamlStr
from models_library.sidecar_volumes import VolumeCategory, VolumeState, VolumeStatus
from pydantic import BaseModel, Field, PrivateAttr

Expand Down Expand Up @@ -50,7 +51,7 @@ class SharedStore(_StoreMixin):
shared_store.container_names = copied_list
"""

compose_spec: str | None = Field(
compose_spec: DcokerComposeYamlStr | None = Field(
default=None, description="stores the stringified compose spec"
)
container_names: list[ContainerNameStr] = Field(
Expand Down Expand Up @@ -119,3 +120,8 @@ async def on_startup() -> None:
)

app.add_event_handler("startup", on_startup)


def get_shared_store(app: FastAPI) -> SharedStore:
shared_store: SharedStore = app.state.shared_store
return shared_store
Loading
Loading