Skip to content

Commit

Permalink
feat: Add load_only_active to images resolver
Browse files Browse the repository at this point in the history
  • Loading branch information
jopemachine committed Feb 11, 2025
1 parent 6edb239 commit 2dedaa9
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 19 deletions.
3 changes: 3 additions & 0 deletions docs/manager/graphql-reference/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ type Queries {
is_installed: Boolean
is_operation: Boolean @deprecated(reason: "Deprecated since 24.03.4. This field is ignored if `load_filters` is specified and is not null.")

"""Added in 25.3.0."""
load_only_active: Boolean = true

Check warning on line 87 in docs/manager/graphql-reference/schema.graphql

View workflow job for this annotation

GitHub Actions / GraphQL Inspector

Argument 'load_only_active: Boolean' (with default value) added to field 'Queries.images'

Adding a new argument to an existing field may involve a change in resolve function logic that potentially may cause some side effects.

"""
Added in 24.03.8. Allowed values are: [general, operational, customized]. When superuser queries with `customized` option set the resolver will return every customized images (including those not owned by callee). To resolve images owned by user only call `customized_images`.
"""
Expand Down
3 changes: 0 additions & 3 deletions src/ai/backend/manager/cli/image_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ async def list_images(cli_ctx, short, installed_only):
):
displayed_items = []
try:
# TODO QUESTION: Should we display deleted image here?
# Idea: Add `deleted` option to include deleted images.
items = await ImageRow.list(session)
# NOTE: installed/installed_agents fields are no longer provided in CLI,
Expand Down Expand Up @@ -231,7 +230,6 @@ async def validate_image_canonical(
if current:
architecture = architecture or CURRENT_ARCH

# TODO QUESTION: Should we use deleted image here?
assert architecture is not None
image_row = await ImageRow.resolve(
session, [ImageIdentifier(canonical, architecture)]
Expand All @@ -243,7 +241,6 @@ async def validate_image_canonical(
value = f"{', '.join(value)}"
print(value)
else:
# TODO QUESTION: Should we use deleted image here?
rows = await session.scalars(
sa.select(ImageRow)
.where(ImageRow.name == canonical)
Expand Down
1 change: 0 additions & 1 deletion src/ai/backend/manager/container_registry/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ async def commit_rescan_result(self) -> None:
else:
image_identifiers = [(k.canonical, k.architecture) for k in _all_updates.keys()]
async with self.db.begin_session() as session:
# TODO QUESTION: Should we filter out deleted image here?
existing_images = await session.scalars(
sa.select(ImageRow)
.where(
Expand Down
7 changes: 4 additions & 3 deletions src/ai/backend/manager/container_registry/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from ai.backend.common.docker import arch_name_aliases, get_docker_connector
from ai.backend.logging import BraceStyleAdapter

from ..models.image import ImageRow
from ..models.image import ImageRow, ImageStatus
from .base import (
BaseContainerRegistry,
concurrency_sema,
Expand Down Expand Up @@ -81,12 +81,13 @@ async def _read_image_info(
already_exists = 0
config_digest = data["Id"]
async with self.db.begin_readonly_session() as db_session:
# TODO QUESTION: Should we use deleted image here?
already_exists = await db_session.scalar(
sa.select([sa.func.count(ImageRow.id)]).where(
sa.select([sa.func.count(ImageRow.id)])
.where(
ImageRow.config_digest == config_digest,
ImageRow.is_local == sa.false(),
)
.where(ImageRow.status == ImageStatus.ALIVE),
)
if already_exists > 0:
return {}, "already synchronized from a remote registry"
Expand Down
13 changes: 10 additions & 3 deletions src/ai/backend/manager/models/gql.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,10 @@ class Queries(graphene.ObjectType):
is_operation=graphene.Boolean(
deprecation_reason="Deprecated since 24.03.4. This field is ignored if `load_filters` is specified and is not null."
),
load_only_active=graphene.Boolean(
default_value=True,
description="Added in 25.3.0.",
),
load_filters=graphene.List(
graphene.String,
default_value=None,
Expand Down Expand Up @@ -1373,13 +1377,15 @@ async def resolve_image(
client_role = ctx.user["role"]
client_domain = ctx.user["domain_name"]
if id:
item = await Image.load_item_by_id(info.context, uuid.UUID(id))
item = await Image.load_item_by_id(info.context, uuid.UUID(id), load_only_active=False)
else:
if not (reference and architecture):
raise InvalidAPIParameters(
"reference/architecture and id can't be omitted at the same time!"
)
item = await Image.load_item(info.context, reference, architecture)
item = await Image.load_item(
info.context, reference, architecture, load_only_active=False
)
if client_role == UserRole.SUPERADMIN:
pass
elif client_role in (UserRole.ADMIN, UserRole.USER):
Expand Down Expand Up @@ -1428,6 +1434,7 @@ async def resolve_images(
*,
is_installed: bool | None = None,
is_operation=False,
load_only_active: bool = True,
load_filters: list[str] | None = None,
image_filters: list[str] | None = None,
) -> Sequence[Image]:
Expand Down Expand Up @@ -1459,7 +1466,7 @@ async def resolve_images(
# but to conform with previous implementation...
image_load_types.add(ImageLoadFilter.OPERATIONAL)

items = await Image.load_all(ctx, types=image_load_types)
items = await Image.load_all(ctx, types=image_load_types, load_only_active=load_only_active)
if client_role == UserRole.SUPERADMIN:
pass
elif client_role in (UserRole.ADMIN, UserRole.USER):
Expand Down
30 changes: 23 additions & 7 deletions src/ai/backend/manager/models/gql_models/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,15 @@ async def batch_load_by_canonical(
cls,
graph_ctx: GraphQueryContext,
image_names: Sequence[str],
load_only_active: bool = True,
) -> Sequence[Optional[Image]]:
query = (
sa.select(ImageRow)
.where(ImageRow.name.in_(image_names))
.options(selectinload(ImageRow.aliases))
)
if load_only_active:
query = query.where(ImageRow.status == ImageStatus.ALIVE)
async with graph_ctx.db.begin_readonly_session() as session:
result = await session.execute(query)
return [await Image.from_row(graph_ctx, row) for row in result.scalars().all()]
Expand All @@ -210,18 +213,22 @@ async def batch_load_by_image_ref(
cls,
graph_ctx: GraphQueryContext,
image_refs: Sequence[ImageRef],
load_only_active: bool = True,
) -> Sequence[Optional[Image]]:
image_names = [x.canonical for x in image_refs]
return await cls.batch_load_by_canonical(graph_ctx, image_names)
return await cls.batch_load_by_canonical(graph_ctx, image_names, load_only_active)

@classmethod
async def load_item_by_id(
cls,
ctx: GraphQueryContext,
id: UUID,
load_only_active: bool = True,
) -> Image:
async with ctx.db.begin_readonly_session() as session:
row = await ImageRow.get(session, id, load_aliases=True)
row = await ImageRow.get(
session, id, load_aliases=True, load_only_active=load_only_active
)
if not row:
raise ImageNotFound

Expand All @@ -233,6 +240,7 @@ async def load_item(
ctx: GraphQueryContext,
reference: str,
architecture: str,
load_only_active: bool = True,
) -> Image:
try:
async with ctx.db.begin_readonly_session() as session:
Expand All @@ -242,6 +250,7 @@ async def load_item(
ImageIdentifier(reference, architecture),
ImageAlias(reference),
],
load_only_active=load_only_active,
)
except UnknownImageReference:
raise ImageNotFound
Expand All @@ -253,9 +262,12 @@ async def load_all(
ctx: GraphQueryContext,
*,
types: set[ImageLoadFilter] = set(),
load_only_active: bool = True,
) -> Sequence[Image]:
async with ctx.db.begin_readonly_session() as session:
rows = await ImageRow.list(session, load_aliases=True)
rows = await ImageRow.list(
session, load_aliases=True, load_only_active=load_only_active
)
items: list[Image] = [
item async for item in cls.bulk_load(ctx, rows) if item.matches_filter(ctx, types)
]
Expand Down Expand Up @@ -355,13 +367,16 @@ async def batch_load_by_name_and_arch(
cls,
graph_ctx: GraphQueryContext,
name_and_arch: Sequence[tuple[str, str]],
load_only_active: bool = True,
) -> Sequence[Sequence[ImageNode]]:
# TODO QUESTION: Should we filter out deleted image here?
query = (
sa.select(ImageRow)
.where(sa.tuple_(ImageRow.name, ImageRow.architecture).in_(name_and_arch))
.options(selectinload(ImageRow.aliases))
)
if load_only_active:
query = query.where(ImageRow.status == ImageStatus.ALIVE)

async with graph_ctx.db.begin_readonly_session() as db_session:
return await batch_multiresult_in_scalar_stream(
graph_ctx,
Expand All @@ -377,10 +392,12 @@ async def batch_load_by_image_identifier(
cls,
graph_ctx: GraphQueryContext,
image_ids: Sequence[ImageIdentifier],
load_only_active: bool = True,
) -> Sequence[Sequence[ImageNode]]:
# TODO QUESTION: Should we filter out deleted image here?
name_and_arch_tuples = [(img.canonical, img.architecture) for img in image_ids]
return await cls.batch_load_by_name_and_arch(graph_ctx, name_and_arch_tuples)
return await cls.batch_load_by_name_and_arch(
graph_ctx, name_and_arch_tuples, load_only_active
)

@overload
@classmethod
Expand Down Expand Up @@ -456,7 +473,6 @@ async def get_node(cls, info: graphene.ResolveInfo, id: str) -> ImageNode:
graph_ctx: GraphQueryContext = info.context

_, image_id = AsyncNode.resolve_global_id(info, id)
# TODO QUESTION: Should we filter out deleted image here?
query = (
sa.select(ImageRow)
.where(ImageRow.id == image_id)
Expand Down
2 changes: 0 additions & 2 deletions src/ai/backend/manager/models/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,6 @@ async def build_ctx_in_system_scope(
permissions = await self.calculate_permission(ctx, SystemScope())
image_id_permission_map: dict[UUID, frozenset[ImagePermission]] = {}

# TODO QUESTION: Should we filter out deleted image here?
for image_row in await self.db_session.scalars(
sa.select(ImageRow).where(ImageRow.status == ImageStatus.ALIVE)
):
Expand Down Expand Up @@ -988,7 +987,6 @@ async def _in_user_scope(
permissions = await self.calculate_permission(ctx, scope)
image_id_permission_map: dict[UUID, frozenset[ImagePermission]] = {}
allowed_registries: set[str] = set(user_row.domain.allowed_docker_registries)
# TODO QUESTION: Should we filter out deleted image here?
_img_query_stmt = (
sa.select(ImageRow)
.where(ImageRow.status == ImageStatus.ALIVE)
Expand Down

0 comments on commit 2dedaa9

Please sign in to comment.