diff --git a/src/infrahub_mcp/.markdownlint.yaml b/src/infrahub_mcp/.markdownlint.yaml deleted file mode 100644 index 542272d..0000000 --- a/src/infrahub_mcp/.markdownlint.yaml +++ /dev/null @@ -1,15 +0,0 @@ ---- -default: true -MD013: false # disables max line-length -MD014: false # dollar signs used before commands -MD024: false # disables 'no duplicate headings', which we use in tabs for instructions -MD025: - front_matter_title: "" # prevent collisions with h1s and frontmatter titles -MD029: false # allows manually creating ordered lists -MD033: false # allows inline html to override markdown styles -MD034: false # no-bare-urls -MD041: false # allow 1st line to not be a top-level heading (required for Towncrier) -MD045: false # no alt text around images -MD047: false # single trailing newline -MD059: # Link descriptions that are provibited - prohibited_texts: [] diff --git a/src/infrahub_mcp/branch.py b/src/infrahub_mcp/branch.py deleted file mode 100644 index e289d8e..0000000 --- a/src/infrahub_mcp/branch.py +++ /dev/null @@ -1,63 +0,0 @@ -from typing import TYPE_CHECKING, Annotated - -from fastmcp import Context, FastMCP -from infrahub_sdk.branch import BranchData -from infrahub_sdk.exceptions import GraphQLError -from mcp.types import ToolAnnotations -from pydantic import Field - -from infrahub_mcp.utils import MCPResponse, MCPToolStatus, _log_and_return_error - -if TYPE_CHECKING: - from infrahub_sdk import InfrahubClient - -mcp: FastMCP = FastMCP(name="Infrahub Branches") - - -@mcp.tool( - tags=["branches", "create"], - annotations=ToolAnnotations(readOnlyHint=False, idempotentHint=True, destructiveHint=False), -) -async def branch_create( - ctx: Context, - name: Annotated[str, Field(description="Name of the branch to create.")], - sync_with_git: Annotated[bool, Field(default=False, description="Whether to sync the branch with git.")], -) -> MCPResponse[dict[str, str]]: - """Create a new branch in infrahub. - - Parameters: - name: Name of the branch to create. - sync_with_git: Whether to sync the branch with git. Defaults to False. - - Returns: - Dictionary with success status and branch details. - """ - - client: InfrahubClient = ctx.request_context.lifespan_context.client - ctx.info(f"Creating branch {name} in Infrahub...") - - try: - branch = await client.branch.create(branch_name=name, sync_with_git=sync_with_git, background_execution=False) - - except GraphQLError as exc: - return _log_and_return_error(ctx=ctx, error=exc, remediation="Check the branch name or your permissions.") - - return MCPResponse( - status=MCPToolStatus.SUCCESS, - data={ - "name": branch.name, - "id": branch.id, - }, - ) - - -@mcp.tool(tags=["branches", "retrieve"], annotations=ToolAnnotations(readOnlyHint=True)) -async def get_branches(ctx: Context) -> MCPResponse[dict[str, BranchData]]: - """Retrieve all branches from infrahub.""" - - client: InfrahubClient = ctx.request_context.lifespan_context.client - ctx.info("Fetching all branches from Infrahub...") - - branches: dict[str, BranchData] = await client.branch.all() - - return MCPResponse(status=MCPToolStatus.SUCCESS, data=branches) diff --git a/src/infrahub_mcp/gql.py b/src/infrahub_mcp/gql.py deleted file mode 100644 index f890032..0000000 --- a/src/infrahub_mcp/gql.py +++ /dev/null @@ -1,44 +0,0 @@ -from typing import TYPE_CHECKING, Annotated, Any - -from fastmcp import Context, FastMCP -from mcp.types import ToolAnnotations -from pydantic import Field - -from infrahub_mcp.utils import MCPResponse, MCPToolStatus - -if TYPE_CHECKING: - from infrahub_sdk import InfrahubClient - -mcp: FastMCP = FastMCP(name="Infrahub GraphQL") - - -@mcp.tool(tags=["schemas", "retrieve"], annotations=ToolAnnotations(readOnlyHint=True)) -async def get_graphql_schema(ctx: Context) -> MCPResponse[str]: - """Retrieve the GraphQL schema from Infrahub - - Parameters: - None - - Returns: - MCPResponse with the GraphQL schema as a string. - """ - client: InfrahubClient = ctx.request_context.lifespan_context.client - resp = await client._get(url=f"{client.address}/schema.graphql") # noqa: SLF001 - return MCPResponse(status=MCPToolStatus.SUCCESS, data=resp.text) - - -@mcp.tool(tags=["schemas", "retrieve"], annotations=ToolAnnotations(readOnlyHint=False)) -async def query_graphql( - ctx: Context, query: Annotated[str, Field(description="GraphQL query to execute.")] -) -> MCPResponse[dict[str, Any]]: - """Execute a GraphQL query against Infrahub. - - Parameters: - query: GraphQL query to execute. - - Returns: - MCPResponse with the result of the query. - - """ - client: InfrahubClient = ctx.request_context.lifespan_context.client - return await client.execute_graphql(query=query) diff --git a/src/infrahub_mcp/nodes.py b/src/infrahub_mcp/nodes.py deleted file mode 100644 index b6d0c07..0000000 --- a/src/infrahub_mcp/nodes.py +++ /dev/null @@ -1,230 +0,0 @@ -from typing import TYPE_CHECKING, Annotated, Any - -from fastmcp import Context, FastMCP -from infrahub_sdk.exceptions import GraphQLError, SchemaNotFoundError -from infrahub_sdk.types import Order -from mcp.types import ToolAnnotations -from pydantic import Field - -from infrahub_mcp.constants import schema_attribute_type_mapping -from infrahub_mcp.utils import MCPResponse, MCPToolStatus, _log_and_return_error, convert_node_to_dict - -if TYPE_CHECKING: - from infrahub_sdk.client import InfrahubClient - -mcp: FastMCP = FastMCP(name="Infrahub Nodes") - - -@mcp.tool(tags=["nodes", "retrieve"], annotations=ToolAnnotations(readOnlyHint=True)) -async def get_nodes( - ctx: Context, - kind: Annotated[str, Field(description="Kind of the objects to retrieve.")], - branch: Annotated[ - str | None, - Field(default=None, description="Branch to retrieve the objects from. Defaults to None (uses default branch)."), - ], - filters: Annotated[dict[str, Any] | None, Field(default=None, description="Dictionary of filters to apply.")], - partial_match: Annotated[bool, Field(default=False, description="Whether to use partial matching for filters.")], -) -> MCPResponse[list[str]]: - """Get all objects of a specific kind from Infrahub. - - To retrieve the list of available kinds, use the `get_schema_mapping` tool. - To retrieve the list of available filters for a specific kind, use the `get_node_filters` tool. - - Parameters: - kind: Kind of the objects to retrieve. - branch: Branch to retrieve the objects from. Defaults to None (uses default branch). - filters: Dictionary of filters to apply. - partial_match: Whether to use partial matching for filters. - - Returns: - MCPResponse with success status and objects. - - """ - client: InfrahubClient = ctx.request_context.lifespan_context.client - ctx.info(f"Fetching nodes of kind: {kind} with filters: {filters} from Infrahub...") - - # Verify if the kind exists in the schema and guide Tool if not - try: - schema = await client.schema.get(kind=kind, branch=branch) - except SchemaNotFoundError: - error_msg = f"Schema not found for kind: {kind}." - remediation_msg = "Use the `get_schema_mapping` tool to list available kinds." - return _log_and_return_error(ctx=ctx, error=error_msg, remediation=remediation_msg) - - # TODO: Verify if the filters are valid for the kind and guide Tool if not - - try: - if filters: - nodes = await client.filters( - kind=schema.kind, - branch=branch, - partial_match=partial_match, - parallel=True, - order=Order(disable=True), - populate_store=True, - prefetch_relationships=True, - **filters, - ) - else: - nodes = await client.all( - kind=schema.kind, - branch=branch, - parallel=True, - order=Order(disable=True), - populate_store=True, - prefetch_relationships=True, - ) - except GraphQLError as exc: - return _log_and_return_error(ctx=ctx, error=exc, remediation="Check the provided filters or the kind name.") - - # Format the response with serializable data - # serialized_nodes = [] - # for node in nodes: - # node_data = await convert_node_to_dict(obj=node, branch=branch) - # serialized_nodes.append(node_data) - serialized_nodes = [obj.display_label for obj in nodes] - - # Return the serialized response - ctx.debug(f"Retrieved {len(serialized_nodes)} nodes of kind {kind}") - - return MCPResponse( - status=MCPToolStatus.SUCCESS, - data=serialized_nodes, - ) - - -@mcp.tool(tags=["nodes", "filters", "retrieve"], annotations=ToolAnnotations(readOnlyHint=True)) -async def get_node_filters( - ctx: Context, - kind: Annotated[str, Field(description="Kind of the objects to retrieve.")], - branch: Annotated[ - str | None, - Field(default=None, description="Branch to retrieve the objects from. Defaults to None (uses default branch)."), - ], -) -> MCPResponse[dict[str, str]]: - """Retrieve all the available filters for a specific schema node kind. - - There's multiple types of filters - attribute filters are in the form attribute__value - - relationship filters are in the form relationship__attribute__value - you can find more information on the peer node of the relationship using the `get_schema` tool - - Filters that start with parent refer to a related generic schema node. - You can find the type of that related node by inspected the output of the `get_schema` tool. - - Parameters: - kind: Kind of the objects to retrieve. - branch: Branch to retrieve the objects from. Defaults to None (uses default branch). - - Returns: - MCPResponse with success status and filters. - """ - client: InfrahubClient = ctx.request_context.lifespan_context.client - ctx.info(f"Fetching available filters for kind: {kind} from Infrahub...") - - # Verify if the kind exists in the schema and guide Tool if not - try: - schema = await client.schema.get(kind=kind, branch=branch) - except SchemaNotFoundError: - error_msg = f"Schema not found for kind: {kind}." - remediation_msg = "Use the `get_schema_mapping` tool to list available kinds." - return _log_and_return_error(ctx=ctx, error=error_msg, remediation=remediation_msg) - - filters = { - f"{attribute.name}__value": schema_attribute_type_mapping.get(attribute.kind, "String") - for attribute in schema.attributes - } - - for relationship in schema.relationships: - relationship_schema = await client.schema.get(kind=relationship.peer) - relationship_filters = { - f"{relationship.name}__{attribute.name}__value": schema_attribute_type_mapping.get(attribute.kind, "String") - for attribute in relationship_schema.attributes - } - filters.update(relationship_filters) - - return MCPResponse( - status=MCPToolStatus.SUCCESS, - data=filters, - ) - - -@mcp.tool(tags=["nodes", "retrieve"], annotations=ToolAnnotations(readOnlyHint=True)) -async def get_related_nodes( - ctx: Context, - kind: Annotated[str, Field(description="Kind of the objects to retrieve.")], - relation: Annotated[str, Field(description="Name of the relation to fetch.")], - filters: Annotated[dict[str, Any] | None, Field(default=None, description="Dictionary of filters to apply.")], - branch: Annotated[ - str | None, - Field(default=None, description="Branch to retrieve the objects from. Defaults to None (uses default branch)."), - ], -) -> MCPResponse[list[dict[str, Any]]]: - """Retrieve related nodes by relation name and a kind. - - Args: - kind: Kind of the node to fetch. - filters: Filters to apply on the node to fetch. - relation: Name of the relation to fetch. - branch: Branch to fetch the node from. Defaults to None (uses default branch). - - Returns: - MCPResponse with success status and objects. - - """ - client: InfrahubClient = ctx.request_context.lifespan_context.client - filters = filters or {} - if branch: - ctx.info(f"Fetching nodes related to {kind} with filters {filters} in branch {branch} from Infrahub...") - else: - ctx.info(f"Fetching nodes related to {kind} with filters {filters} from Infrahub...") - - try: - node_id = node_hfid = None - if filters.get("ids"): - node_id = filters["ids"][0] - elif filters.get("hfid"): - node_hfid = filters["hfid"] - if node_id: - node = await client.get( - kind=kind, - id=node_id, - branch=branch, - include=[relation], - prefetch_relationships=True, - populate_store=True, - ) - elif node_hfid: - node = await client.get( - kind=kind, - hfid=node_hfid, - branch=branch, - include=[relation], - prefetch_relationships=True, - populate_store=True, - ) - except Exception as exc: # noqa: BLE001 - return _log_and_return_error(exc) - - rel = getattr(node, relation, None) - if not rel: - _log_and_return_error( - ctx=ctx, - error=f"Relation '{relation}' not found in kind '{kind}'.", - remediation="Check the schema for the kind to confirm if the relation exists.", - ) - peers = [ - await convert_node_to_dict( - branch=branch, - obj=peer.peer, - include_id=True, - ) - for peer in rel.peers - ] - - return MCPResponse( - status=MCPToolStatus.SUCCESS, - data=peers, - ) diff --git a/src/infrahub_mcp/schema.py b/src/infrahub_mcp/schema.py deleted file mode 100644 index a911fe0..0000000 --- a/src/infrahub_mcp/schema.py +++ /dev/null @@ -1,140 +0,0 @@ -from typing import TYPE_CHECKING, Annotated, Any - -from fastmcp import Context, FastMCP -from infrahub_sdk.exceptions import BranchNotFoundError, SchemaNotFoundError -from mcp.types import ToolAnnotations -from pydantic import Field - -from infrahub_mcp.constants import NAMESPACES_INTERNAL -from infrahub_mcp.utils import MCPResponse, MCPToolStatus, _log_and_return_error - -if TYPE_CHECKING: - from infrahub_sdk import InfrahubClient - -mcp: FastMCP = FastMCP(name="Infrahub Schemas") - - -@mcp.tool(tags=["schemas", "retrieve"], annotations=ToolAnnotations(readOnlyHint=True)) -async def get_schema_mapping( - ctx: Context, - branch: Annotated[ - str | None, - Field(default=None, description="Branch to retrieve the mapping from. Defaults to None (uses default branch)."), - ], -) -> MCPResponse[dict[str, str]]: - """List all schema nodes and generics available in Infrahub - - Parameters: - branch: Branch to retrieve the mapping from. Defaults to None (uses default branch). - - Returns: - Dictionary with success status and schema mapping. - """ - client: InfrahubClient = ctx.request_context.lifespan_context.client - if branch: - ctx.info(f"Fetching schema mapping for {branch} from Infrahub...") - else: - ctx.info("Fetching schema mapping from Infrahub...") - - try: - all_schemas = await client.schema.all(branch=branch) - except BranchNotFoundError as exc: - return _log_and_return_error(ctx=ctx, error=exc, remediation="Check the branch name or your permissions.") - - # TODO: Should we add the description ? - schema_mapping = { - kind: node.label or "" for kind, node in all_schemas.items() if node.namespace not in NAMESPACES_INTERNAL - } - - return MCPResponse( - status=MCPToolStatus.SUCCESS, - data=schema_mapping, - ) - - -@mcp.tool(tags=["schemas", "retrieve"], annotations=ToolAnnotations(readOnlyHint=True)) -async def get_schema( - ctx: Context, - kind: Annotated[str, Field(description="Schema Kind to retrieve.")], - branch: Annotated[ - str | None, - Field(default=None, description="Branch to retrieve the schema from. Defaults to None (uses default branch)."), - ], -) -> MCPResponse[dict[str, Any]]: - """Retrieve the full schema for a specific kind. - This includes attributes, relationships, and their types. - - Parameters: - kind: Schema Kind to retrieve. - branch: Branch to retrieve the schema from. Defaults to None (uses default branch). - - Returns: - Dictionary with success status and schema. - """ - client: InfrahubClient = ctx.request_context.lifespan_context.client - ctx.info(f"Fetching schema of {kind} from Infrahub...") - - try: - schema = await client.schema.get(kind=kind, branch=branch) - except SchemaNotFoundError: - error_msg = f"Schema not found for kind: {kind}." - remediation_msg = "Use the `get_schema_mapping` tool to list available kinds." - return _log_and_return_error(ctx=ctx, error=error_msg, remediation=remediation_msg) - except BranchNotFoundError as exc: - return _log_and_return_error(ctx=ctx, error=exc, remediation="Check the branch name or your permissions.") - - schema = await client.schema.get(kind=kind, branch=branch) - - return MCPResponse( - status=MCPToolStatus.SUCCESS, - data=schema.model_dump(), - ) - - -@mcp.tool(tags=["schemas", "retrieve"], annotations=ToolAnnotations(readOnlyHint=True)) -async def get_schemas( - ctx: Context, - branch: Annotated[ - str | None, - Field(default=None, description="Branch to retrieve schemas from. Defaults to None (uses default branch)."), - ], - exclude_profiles: Annotated[ - bool, Field(default=True, description="Whether to exclude Profile schemas. Defaults to True.") - ], - exclude_templates: Annotated[ - bool, Field(default=True, description="Whether to exclude Template schemas. Defaults to True.") - ], -) -> MCPResponse[dict[str, dict[str, Any]]]: - """Retrieve all schemas from Infrahub, optionally excluding Profiles and Templates. - - Parameters: - infrahub_client: Infrahub client to use - branch: Branch to retrieve schemas from - exclude_profiles: Whether to exclude Profile schemas. Defaults to True. - exclude_templates: Whether to exclude Template schemas. Defaults to True. - - Returns: - Dictionary with success status and schemas. - - """ - client: InfrahubClient = ctx.request_context.lifespan_context.client - ctx.info(f"Fetching all schemas in branch {branch or 'main'} from Infrahub...") - - try: - all_schemas = await client.schema.all(branch=branch) - except BranchNotFoundError as exc: - return _log_and_return_error(ctx=ctx, error=exc, remediation="Check the branch name or your permissions.") - - # Filter out Profile and Template if requested - filtered_schemas = {} - for kind, schema in all_schemas.items(): - if (exclude_templates and schema.namespace == "Template") or ( - exclude_profiles and schema.namespace == "Profile" - ): - continue - filtered_schemas[kind] = schema.model_dump() - - return MCPResponse( - status=MCPToolStatus.SUCCESS, - data=filtered_schemas, - ) diff --git a/src/infrahub_mcp/tools/branch.py b/src/infrahub_mcp/tools/branch.py index e289d8e..5688686 100644 --- a/src/infrahub_mcp/tools/branch.py +++ b/src/infrahub_mcp/tools/branch.py @@ -15,14 +15,14 @@ @mcp.tool( - tags=["branches", "create"], + tags={"branches", "create"}, annotations=ToolAnnotations(readOnlyHint=False, idempotentHint=True, destructiveHint=False), ) async def branch_create( ctx: Context, name: Annotated[str, Field(description="Name of the branch to create.")], sync_with_git: Annotated[bool, Field(default=False, description="Whether to sync the branch with git.")], -) -> MCPResponse[dict[str, str]]: +) -> MCPResponse: """Create a new branch in infrahub. Parameters: @@ -34,13 +34,13 @@ async def branch_create( """ client: InfrahubClient = ctx.request_context.lifespan_context.client - ctx.info(f"Creating branch {name} in Infrahub...") + await ctx.info(f"Creating branch {name} in Infrahub...") try: branch = await client.branch.create(branch_name=name, sync_with_git=sync_with_git, background_execution=False) except GraphQLError as exc: - return _log_and_return_error(ctx=ctx, error=exc, remediation="Check the branch name or your permissions.") + return await _log_and_return_error(ctx=ctx, error=exc, remediation="Check the branch name or your permissions.") return MCPResponse( status=MCPToolStatus.SUCCESS, @@ -51,12 +51,12 @@ async def branch_create( ) -@mcp.tool(tags=["branches", "retrieve"], annotations=ToolAnnotations(readOnlyHint=True)) -async def get_branches(ctx: Context) -> MCPResponse[dict[str, BranchData]]: +@mcp.tool(tags={"branches", "retrieve"}, annotations=ToolAnnotations(readOnlyHint=True)) +async def get_branches(ctx: Context) -> MCPResponse: """Retrieve all branches from infrahub.""" client: InfrahubClient = ctx.request_context.lifespan_context.client - ctx.info("Fetching all branches from Infrahub...") + await ctx.info("Fetching all branches from Infrahub...") branches: dict[str, BranchData] = await client.branch.all() diff --git a/src/infrahub_mcp/tools/gql.py b/src/infrahub_mcp/tools/gql.py index f890032..33b5f07 100644 --- a/src/infrahub_mcp/tools/gql.py +++ b/src/infrahub_mcp/tools/gql.py @@ -12,8 +12,8 @@ mcp: FastMCP = FastMCP(name="Infrahub GraphQL") -@mcp.tool(tags=["schemas", "retrieve"], annotations=ToolAnnotations(readOnlyHint=True)) -async def get_graphql_schema(ctx: Context) -> MCPResponse[str]: +@mcp.tool(tags={"schemas", "retrieve"}, annotations=ToolAnnotations(readOnlyHint=True)) +async def get_graphql_schema(ctx: Context) -> MCPResponse: """Retrieve the GraphQL schema from Infrahub Parameters: @@ -27,7 +27,7 @@ async def get_graphql_schema(ctx: Context) -> MCPResponse[str]: return MCPResponse(status=MCPToolStatus.SUCCESS, data=resp.text) -@mcp.tool(tags=["schemas", "retrieve"], annotations=ToolAnnotations(readOnlyHint=False)) +@mcp.tool(tags={"schemas", "retrieve"}, annotations=ToolAnnotations(readOnlyHint=False)) async def query_graphql( ctx: Context, query: Annotated[str, Field(description="GraphQL query to execute.")] ) -> MCPResponse[dict[str, Any]]: @@ -41,4 +41,6 @@ async def query_graphql( """ client: InfrahubClient = ctx.request_context.lifespan_context.client - return await client.execute_graphql(query=query) + data = await client.execute_graphql(query=query) + + return MCPResponse(status=MCPToolStatus.SUCCESS, data=data) diff --git a/src/infrahub_mcp/tools/nodes.py b/src/infrahub_mcp/tools/nodes.py index b6d0c07..a7104e6 100644 --- a/src/infrahub_mcp/tools/nodes.py +++ b/src/infrahub_mcp/tools/nodes.py @@ -15,7 +15,7 @@ mcp: FastMCP = FastMCP(name="Infrahub Nodes") -@mcp.tool(tags=["nodes", "retrieve"], annotations=ToolAnnotations(readOnlyHint=True)) +@mcp.tool(tags={"nodes", "retrieve"}, annotations=ToolAnnotations(readOnlyHint=True)) async def get_nodes( ctx: Context, kind: Annotated[str, Field(description="Kind of the objects to retrieve.")], @@ -25,7 +25,7 @@ async def get_nodes( ], filters: Annotated[dict[str, Any] | None, Field(default=None, description="Dictionary of filters to apply.")], partial_match: Annotated[bool, Field(default=False, description="Whether to use partial matching for filters.")], -) -> MCPResponse[list[str]]: +) -> MCPResponse: """Get all objects of a specific kind from Infrahub. To retrieve the list of available kinds, use the `get_schema_mapping` tool. @@ -42,7 +42,7 @@ async def get_nodes( """ client: InfrahubClient = ctx.request_context.lifespan_context.client - ctx.info(f"Fetching nodes of kind: {kind} with filters: {filters} from Infrahub...") + await ctx.info(f"Fetching nodes of kind: {kind} with filters: {filters} from Infrahub...") # Verify if the kind exists in the schema and guide Tool if not try: @@ -50,12 +50,13 @@ async def get_nodes( except SchemaNotFoundError: error_msg = f"Schema not found for kind: {kind}." remediation_msg = "Use the `get_schema_mapping` tool to list available kinds." - return _log_and_return_error(ctx=ctx, error=error_msg, remediation=remediation_msg) + return await _log_and_return_error(ctx=ctx, error=error_msg, remediation=remediation_msg) # TODO: Verify if the filters are valid for the kind and guide Tool if not try: if filters: + await ctx.debug(f"Applying filters: {filters} with partial_match={partial_match}") nodes = await client.filters( kind=schema.kind, branch=branch, @@ -76,7 +77,7 @@ async def get_nodes( prefetch_relationships=True, ) except GraphQLError as exc: - return _log_and_return_error(ctx=ctx, error=exc, remediation="Check the provided filters or the kind name.") + return await _log_and_return_error(ctx=ctx, error=exc, remediation="Check the provided filters or the kind name.") # Format the response with serializable data # serialized_nodes = [] @@ -86,7 +87,7 @@ async def get_nodes( serialized_nodes = [obj.display_label for obj in nodes] # Return the serialized response - ctx.debug(f"Retrieved {len(serialized_nodes)} nodes of kind {kind}") + await ctx.debug(f"Retrieved {len(serialized_nodes)} nodes of kind {kind}") return MCPResponse( status=MCPToolStatus.SUCCESS, @@ -94,7 +95,7 @@ async def get_nodes( ) -@mcp.tool(tags=["nodes", "filters", "retrieve"], annotations=ToolAnnotations(readOnlyHint=True)) +@mcp.tool(tags={"nodes", "filters", "retrieve"}, annotations=ToolAnnotations(readOnlyHint=True)) async def get_node_filters( ctx: Context, kind: Annotated[str, Field(description="Kind of the objects to retrieve.")], @@ -102,7 +103,7 @@ async def get_node_filters( str | None, Field(default=None, description="Branch to retrieve the objects from. Defaults to None (uses default branch)."), ], -) -> MCPResponse[dict[str, str]]: +) -> MCPResponse: """Retrieve all the available filters for a specific schema node kind. There's multiple types of filters @@ -122,7 +123,7 @@ async def get_node_filters( MCPResponse with success status and filters. """ client: InfrahubClient = ctx.request_context.lifespan_context.client - ctx.info(f"Fetching available filters for kind: {kind} from Infrahub...") + await ctx.info(f"Fetching available filters for kind: {kind} from Infrahub...") # Verify if the kind exists in the schema and guide Tool if not try: @@ -130,7 +131,7 @@ async def get_node_filters( except SchemaNotFoundError: error_msg = f"Schema not found for kind: {kind}." remediation_msg = "Use the `get_schema_mapping` tool to list available kinds." - return _log_and_return_error(ctx=ctx, error=error_msg, remediation=remediation_msg) + return await _log_and_return_error(ctx=ctx, error=error_msg, remediation=remediation_msg) filters = { f"{attribute.name}__value": schema_attribute_type_mapping.get(attribute.kind, "String") @@ -151,7 +152,7 @@ async def get_node_filters( ) -@mcp.tool(tags=["nodes", "retrieve"], annotations=ToolAnnotations(readOnlyHint=True)) +@mcp.tool(tags={"nodes", "retrieve"}, annotations=ToolAnnotations(readOnlyHint=True)) async def get_related_nodes( ctx: Context, kind: Annotated[str, Field(description="Kind of the objects to retrieve.")], @@ -161,7 +162,7 @@ async def get_related_nodes( str | None, Field(default=None, description="Branch to retrieve the objects from. Defaults to None (uses default branch)."), ], -) -> MCPResponse[list[dict[str, Any]]]: +) -> MCPResponse: """Retrieve related nodes by relation name and a kind. Args: @@ -177,9 +178,9 @@ async def get_related_nodes( client: InfrahubClient = ctx.request_context.lifespan_context.client filters = filters or {} if branch: - ctx.info(f"Fetching nodes related to {kind} with filters {filters} in branch {branch} from Infrahub...") + await ctx.info(f"Fetching nodes related to {kind} with filters {filters} in branch {branch} from Infrahub...") else: - ctx.info(f"Fetching nodes related to {kind} with filters {filters} from Infrahub...") + await ctx.info(f"Fetching nodes related to {kind} with filters {filters} from Infrahub...") try: node_id = node_hfid = None @@ -206,11 +207,11 @@ async def get_related_nodes( populate_store=True, ) except Exception as exc: # noqa: BLE001 - return _log_and_return_error(exc) + return await _log_and_return_error(ctx=ctx, error=exc) rel = getattr(node, relation, None) if not rel: - _log_and_return_error( + return await _log_and_return_error( ctx=ctx, error=f"Relation '{relation}' not found in kind '{kind}'.", remediation="Check the schema for the kind to confirm if the relation exists.", diff --git a/src/infrahub_mcp/tools/schema.py b/src/infrahub_mcp/tools/schema.py index a911fe0..a9ff4c2 100644 --- a/src/infrahub_mcp/tools/schema.py +++ b/src/infrahub_mcp/tools/schema.py @@ -14,14 +14,14 @@ mcp: FastMCP = FastMCP(name="Infrahub Schemas") -@mcp.tool(tags=["schemas", "retrieve"], annotations=ToolAnnotations(readOnlyHint=True)) +@mcp.tool(tags={"schemas", "retrieve"}, annotations=ToolAnnotations(readOnlyHint=True)) async def get_schema_mapping( ctx: Context, branch: Annotated[ str | None, Field(default=None, description="Branch to retrieve the mapping from. Defaults to None (uses default branch)."), ], -) -> MCPResponse[dict[str, str]]: +) -> MCPResponse: """List all schema nodes and generics available in Infrahub Parameters: @@ -32,14 +32,14 @@ async def get_schema_mapping( """ client: InfrahubClient = ctx.request_context.lifespan_context.client if branch: - ctx.info(f"Fetching schema mapping for {branch} from Infrahub...") + await ctx.info(f"Fetching schema mapping for {branch} from Infrahub...") else: - ctx.info("Fetching schema mapping from Infrahub...") + await ctx.info("Fetching schema mapping from Infrahub...") try: all_schemas = await client.schema.all(branch=branch) except BranchNotFoundError as exc: - return _log_and_return_error(ctx=ctx, error=exc, remediation="Check the branch name or your permissions.") + return await _log_and_return_error(ctx=ctx, error=exc, remediation="Check the branch name or your permissions.") # TODO: Should we add the description ? schema_mapping = { @@ -52,7 +52,7 @@ async def get_schema_mapping( ) -@mcp.tool(tags=["schemas", "retrieve"], annotations=ToolAnnotations(readOnlyHint=True)) +@mcp.tool(tags={"schemas", "retrieve"}, annotations=ToolAnnotations(readOnlyHint=True)) async def get_schema( ctx: Context, kind: Annotated[str, Field(description="Schema Kind to retrieve.")], @@ -60,7 +60,7 @@ async def get_schema( str | None, Field(default=None, description="Branch to retrieve the schema from. Defaults to None (uses default branch)."), ], -) -> MCPResponse[dict[str, Any]]: +) -> MCPResponse: """Retrieve the full schema for a specific kind. This includes attributes, relationships, and their types. @@ -72,16 +72,16 @@ async def get_schema( Dictionary with success status and schema. """ client: InfrahubClient = ctx.request_context.lifespan_context.client - ctx.info(f"Fetching schema of {kind} from Infrahub...") + await ctx.info(f"Fetching schema of {kind} from Infrahub...") try: schema = await client.schema.get(kind=kind, branch=branch) except SchemaNotFoundError: error_msg = f"Schema not found for kind: {kind}." remediation_msg = "Use the `get_schema_mapping` tool to list available kinds." - return _log_and_return_error(ctx=ctx, error=error_msg, remediation=remediation_msg) + return await _log_and_return_error(ctx=ctx, error=error_msg, remediation=remediation_msg) except BranchNotFoundError as exc: - return _log_and_return_error(ctx=ctx, error=exc, remediation="Check the branch name or your permissions.") + return await _log_and_return_error(ctx=ctx, error=exc, remediation="Check the branch name or your permissions.") schema = await client.schema.get(kind=kind, branch=branch) @@ -91,7 +91,7 @@ async def get_schema( ) -@mcp.tool(tags=["schemas", "retrieve"], annotations=ToolAnnotations(readOnlyHint=True)) +@mcp.tool(tags={"schemas", "retrieve"}, annotations=ToolAnnotations(readOnlyHint=True)) async def get_schemas( ctx: Context, branch: Annotated[ @@ -104,7 +104,7 @@ async def get_schemas( exclude_templates: Annotated[ bool, Field(default=True, description="Whether to exclude Template schemas. Defaults to True.") ], -) -> MCPResponse[dict[str, dict[str, Any]]]: +) -> MCPResponse: """Retrieve all schemas from Infrahub, optionally excluding Profiles and Templates. Parameters: @@ -118,12 +118,12 @@ async def get_schemas( """ client: InfrahubClient = ctx.request_context.lifespan_context.client - ctx.info(f"Fetching all schemas in branch {branch or 'main'} from Infrahub...") + await ctx.info(f"Fetching all schemas in branch {branch or 'main'} from Infrahub...") try: all_schemas = await client.schema.all(branch=branch) except BranchNotFoundError as exc: - return _log_and_return_error(ctx=ctx, error=exc, remediation="Check the branch name or your permissions.") + return await _log_and_return_error(ctx=ctx, error=exc, remediation="Check the branch name or your permissions.") # Filter out Profile and Template if requested filtered_schemas = {} diff --git a/src/infrahub_mcp/utils.py b/src/infrahub_mcp/utils.py index 35b27e6..3c72f81 100644 --- a/src/infrahub_mcp/utils.py +++ b/src/infrahub_mcp/utils.py @@ -32,11 +32,11 @@ def get_prompt(name: str) -> str: return (PROMPTS_DIRECTORY / f"{name}.md").read_text() -def _log_and_return_error(ctx: Context, error: str | Exception, remediation: str | None = None) -> MCPResponse: +async def _log_and_return_error(ctx: Context, error: str | Exception, remediation: str | None = None) -> MCPResponse: """Log an error and return a standardized error response.""" if isinstance(error, Exception): error = str(error) - ctx.error(message=error) + await ctx.error(message=error) return MCPResponse( status=MCPToolStatus.ERROR, error=error,