Skip to content
This repository was archived by the owner on Apr 30, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions blueprints-provider/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# 1.0.0
- Add Create blueprint worker

16 changes: 16 additions & 0 deletions blueprints-provider/python/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Conductor worker for Frinx Uniconfig


Conductor workers for Frinx blueprints-provider service

## Getting started

### Prerequisites

- Python 3.10+ is required to use this package.

### Install the package

```bash
pip install frinx-blueprints-provider-worker
```
6 changes: 6 additions & 0 deletions blueprints-provider/python/RELEASE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
## Release matrix

| Docker Image Tag | Python Library Version |
|------------------|------------------------|


26 changes: 26 additions & 0 deletions blueprints-provider/python/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
version: '3.8'

services:
postgres:
image: postgres:16.4-alpine3.20
user: postgres
volumes:
- /var/lib/postgresql/data
command: postgres -N 300
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=blueprints
ports:
- "5432:5432"
healthcheck:
test: [ "CMD-SHELL", "pg_isready -U postgres" ]
interval: 10s
timeout: 5s
retries: 5
start_period: 10s
networks:
- backend

networks:
backend:
Empty file.
121 changes: 121 additions & 0 deletions blueprints-provider/python/frinx_worker/blueprint_provider/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import os
from enum import Enum
from types import MappingProxyType

import requests
from frinx.common.conductor_enums import TaskResultStatus
from frinx.common.type_aliases import DictAny
from frinx.common.type_aliases import ListAny
from frinx.common.worker.task_def import TaskOutput
from frinx.common.worker.task_result import TaskResult
from graphql_pydantic_converter.graphql_types import QueryForm
from pydantic import BaseModel
from pydantic import Field

# TODO: add to frinx-python-sdk
BLUEPRINTS_PROVIDER_URL_BASE = os.getenv('BLUEPRINTS_PROVIDER_URL_BASE', 'http://localhost:8080/graphql')
BLUEPRINTS_PROVIDER_HEADERS = MappingProxyType({'Content-Type': 'application/json'})


class BlueprintsProviderWorkerOutput(TaskOutput):
"""Blueprints-provider worker output."""

query: str = Field(
description="Constructed GraphQL query.",
)
variable: DictAny | None = Field(
description="Constructed input GraphQL variables.",
default=None
)
response_body: ListAny | DictAny | None | None = Field(
description="Response body.",
)
response_code: int = Field(
description="Response code.",
)

class ResponseStatus(str, Enum):
"""Response status."""

DATA = "data"
"""Response contains valid data."""
ERRORS = "errors"
"""Response contains some errors."""
FAILED = "failed"
"""HTTP request failed without providing list of errors in response."""


class BlueprintsProviderOutput(BaseModel):
"""Parsed response from Blueprints-provider service."""

status: ResponseStatus = Field(
description="Response status."
)
code: int = Field(
description="Parsed response code."
)
data: ListAny | DictAny | None = Field(
default=None,
description="Structured response data."
)


def execute_graphql_operation(
query: str,
variables: DictAny | None = None,
blueprint_provider_url_base: str = BLUEPRINTS_PROVIDER_URL_BASE
) -> BlueprintsProviderOutput:
"""
Execute GraphQL query.

:param query: GraphQL query
:param variables: GraphQL variables in dictionary format
:param blueprint_provider_url_base: Blueprints-provider service URL base
:return: BlueprintsProviderOutput object
""" ""
response = requests.post(
blueprint_provider_url_base,
json={
"query": query,
"variables": variables
},
headers=BLUEPRINTS_PROVIDER_HEADERS
)
data = response.json()
status_code = response.status_code

if data.get("errors") is not None:
return BlueprintsProviderOutput(data=data["errors"], status=ResponseStatus.ERRORS, code=status_code)

if data.get("data") is not None:
return BlueprintsProviderOutput(data=data["data"], status=ResponseStatus.DATA, code=status_code)

return BlueprintsProviderOutput(status=ResponseStatus.FAILED, code=status_code)


def response_handler(query: QueryForm, response: BlueprintsProviderOutput) -> TaskResult:
"""
Handle response from Blueprints-service service.

:param query: GraphQL query information
:param response: parsed topology-discovery response
:return: built TaskResult object
"""
output = BlueprintsProviderWorkerOutput(
response_code=response.code,
response_body=response.data,
query=query.query,
variable=query.variable
)
match response.status:
case ResponseStatus.DATA:
task_result = TaskResult(status=TaskResultStatus.COMPLETED)
task_result.status = TaskResultStatus.COMPLETED
task_result.output = output
return task_result
case _:
task_result = TaskResult(status=TaskResultStatus.FAILED)
task_result.status = TaskResultStatus.FAILED
task_result.logs = str(response)
task_result.output = output
return task_result
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from frinx.common.type_aliases import ListStr
from frinx.common.worker.service import ServiceWorkersImpl
from frinx.common.worker.task_def import TaskDefinition
from frinx.common.worker.task_def import TaskExecutionProperties
from frinx.common.worker.task_def import TaskInput
from frinx.common.worker.task_result import TaskResult
from frinx.common.worker.worker import WorkerImpl
from frinx_api.blueprints_provider import BlueprintNode
from frinx_api.blueprints_provider import BlueprintNodeInput
from frinx_api.blueprints_provider import BlueprintOutput
from frinx_api.blueprints_provider import BlueprintType
from frinx_api.blueprints_provider import ConnectionType
from frinx_api.blueprints_provider import CreateBlueprintMutation
from pydantic import Field

from frinx_worker.blueprint_provider.utils import BlueprintsProviderOutput
from frinx_worker.blueprint_provider.utils import BlueprintsProviderWorkerOutput
from frinx_worker.blueprint_provider.utils import execute_graphql_operation
from frinx_worker.blueprint_provider.utils import response_handler


class BlueprintsProviderWorkers(ServiceWorkersImpl):
class CreateBlueprint(WorkerImpl):

class ExecutionProperties(TaskExecutionProperties):
exclude_empty_inputs: bool = True

class WorkerDefinition(TaskDefinition):
name: str = "BLUEPRINTS_PROVIDER_create_blueprint"
description: str = "Create blueprint"
labels: ListStr = ["BLUEPRINTS-PROVIDER"]
timeout_seconds: int = 3600
response_timeout_seconds: int = 3600

class WorkerInput(TaskInput):
name: str = Field(
description="Blueprint name",
examples=["install_cli_cisco", "install_gnmi_nokia_7750_22_10"],
)
blueprint_type: BlueprintType = Field(
description="Specifies the type of blueprint",
examples=[BlueprintType.TOPOLOGY_LLDP],
)
connection_type: ConnectionType = Field(
description="Management protocol used for creation of connection to the device",
examples=[ConnectionType.NETCONF],
)
model_pattern: str | None = Field(
description="Regular expression pattern for matching the model of the device",
examples=["vsr .+"],
)
vendor_pattern: str | None = Field(
description="Regular expression pattern for matching the vendor of the device",
examples=[".*wrt"],
)
version_pattern: str | None = Field(
description="Regular expression pattern for matching the version of the devices",
examples=["v[0-9]+"],
)
template: str = Field(description="JSON string")


class WorkerOutput(BlueprintsProviderWorkerOutput):
...

def execute(self, worker_input: WorkerInput) -> TaskResult[WorkerOutput]:
create_blueprint_mutation = CreateBlueprintMutation(
node=BlueprintNodeInput(
name=worker_input.name,
blueprint_type=worker_input.blueprint_type,
connection_type=worker_input.connection_type,
model_pattern=worker_input.model_pattern,
vendor_pattern=worker_input.vendor_pattern,
version_pattern=worker_input.version_pattern,
template=worker_input.template,
),
payload=BlueprintOutput(
id=True,
node=BlueprintNode(
name=True,
blueprint_type=True,
connection_type=True,
model_pattern=True,
vendor_pattern=True,
version_pattern=True,
template=True,
)
)
)

query = create_blueprint_mutation.render()
response: BlueprintsProviderOutput = execute_graphql_operation(query=query.query, variables=query.variable)
return response_handler(query, response)
Loading
Loading