Skip to content

Commit

Permalink
add CLI package operation + support CWL package retrieval for provide…
Browse files Browse the repository at this point in the history
…r process
  • Loading branch information
fmigneault committed Sep 15, 2023
1 parent 34debdc commit a16b20b
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 10 deletions.
4 changes: 3 additions & 1 deletion CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ Changes

Changes:
--------
- No change.
- Add ``GET /providers/{provider_id}/processes/{process_id}/package`` endpoint that allows retrieval of the `CWL`
`Application Package` definition generated for the specific `Provider`'s `Process` definition.
- Add `CLI` ``package`` operation to request the remote `Provider` or local `Process` `CWL` `Application Package`.

Fixes:
------
Expand Down
87 changes: 83 additions & 4 deletions weaver/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
import os
import re
import sys
import textwrap
import time
from typing import TYPE_CHECKING
from urllib.parse import urlparse

import yaml
from pyramid.httpexceptions import HTTPNotImplemented
from requests.auth import AuthBase, HTTPBasicAuth
from requests.sessions import Session
from requests.structures import CaseInsensitiveDict
from webob.headers import ResponseHeaders
from yaml.scanner import ScannerError
Expand All @@ -22,7 +24,7 @@
from weaver.datatype import AutoBase
from weaver.exceptions import PackageRegistrationError
from weaver.execute import ExecuteMode, ExecuteResponse, ExecuteTransmissionMode
from weaver.formats import ContentType, OutputFormat, get_content_type, get_format
from weaver.formats import ContentType, OutputFormat, get_content_type, get_format, repr_json
from weaver.processes.constants import ProcessSchema
from weaver.processes.convert import (
convert_input_values_schema,
Expand Down Expand Up @@ -348,7 +350,6 @@ def __init__(self, url=None, auth=None):
def _request(self,
method, # type: AnyRequestMethod
url, # type: str
*args, # type: Any
headers=None, # type: Optional[AnyHeadersContainer]
x_headers=None, # type: Optional[AnyHeadersContainer]
request_timeout=None, # type: Optional[int]
Expand All @@ -370,7 +371,20 @@ def _request(self,
if isinstance(request_retries, int) and request_retries > 0:
kwargs.setdefault("retries", request_retries)

return request_extra(method, url, *args, headers=headers, **kwargs)
if LOGGER.isEnabledFor(logging.DEBUG):
fields = set(inspect.signature(Session.request).parameters) - {"params", "url", "method", "json", "body"}
options = {opt: val for opt, val in kwargs.items() if opt in fields}
tab = " "
LOGGER.debug(
f"Request:\n{tab}%s %s\n{tab}Queries:\n%s\n{tab}Headers:\n%s\n{tab}Content:\n%s\n{tab}Options:\n%s",
method,
url,
textwrap.indent(repr_json(kwargs.get("params") or {}, indent=len(tab)), tab * 2),
textwrap.indent(repr_json(headers or {}, indent=len(tab)), tab * 2),
textwrap.indent(repr_json(kwargs.get("json") or kwargs.get("body") or {}, indent=len(tab)), tab * 2),
textwrap.indent(repr_json(options, indent=len(tab)), tab * 2),
)
return request_extra(method, url, headers=headers, **kwargs)

def _get_url(self, url):
# type: (Optional[str]) -> str
Expand Down Expand Up @@ -538,6 +552,9 @@ def register(self,
"""
Registers a remote :term:`Provider` using specified references.
.. note::
This operation is specific to `Weaver`. It is not supported by standard :term:`OGC API - Processes`.
:param provider_id: Identifier to employ for registering the new :term:`Provider`.
:param provider_url: Endpoint location to register the new remote :term:`Provider`.
:param url: Instance URL if not already provided during client creation.
Expand Down Expand Up @@ -576,6 +593,9 @@ def unregister(self,
"""
Unregisters a remote :term:`Provider` using the specified identifier.
.. note::
This operation is specific to `Weaver`. It is not supported by standard :term:`OGC API - Processes`.
:param provider_id: Identifier to employ for unregistering the :term:`Provider`.
:param url: Instance URL if not already provided during client creation.
:param auth:
Expand Down Expand Up @@ -629,8 +649,13 @@ def deploy(self,
If the reference is resolved to be a :term:`Workflow`, all its underlying :term:`Process` steps must be
available under the same URL that this client was initialized with.
.. note::
This is only supported by :term:`OGC API - Processes` instances that support
the `Deploy, Replace, Undeploy` (DRU) extension.
.. seealso::
:ref:`proc_op_deploy`
- :ref:`proc_op_deploy`
- |ogc-api-proc-part2|_
:param process_id:
Desired process identifier.
Expand Down Expand Up @@ -856,6 +881,48 @@ def _get_process_url(self, url, process_id, provider_id=None):
path = f"{base}/processes/{process_id}"
return path

def package(self,
process_id, # type: str
provider_id=None, # type: Optional[str]
url=None, # type: Optional[str]
auth=None, # type: Optional[AuthHandler]
headers=None, # type: Optional[AnyHeadersContainer]
with_links=True, # type: bool
with_headers=False, # type: bool
request_timeout=None, # type: Optional[int]
request_retries=None, # type: Optional[int]
output_format=None, # type: Optional[AnyOutputFormat]
): # type: (...) -> OperationResult
"""
Retrieve the :term:`Application Package` definition of the specified :term:`Process`.
.. note::
This operation is specific to `Weaver`. It is not supported by standard :term:`OGC API - Processes`.
:param process_id: Identifier of the local or remote process to describe.
:param provider_id: Identifier of the provider from which to locate a remote process to describe.
:param url: Instance URL if not already provided during client creation.
:param auth:
Instance authentication handler if not already created during client creation.
Should perform required adjustments to request to allow access control of protected contents.
:param headers:
Additional headers to employ when sending request.
Note that this can break functionalities if expected headers are overridden. Use with care.
:param with_links: Indicate if ``links`` section should be preserved in returned result body.
:param with_headers: Indicate if response headers should be returned in result output.
:param request_timeout: Maximum timout duration (seconds) to wait for a response when performing HTTP requests.
:param request_retries: Amount of attempt to retry HTTP requests in case of failure.
:param output_format: Select an alternate output representation of the result body contents.
:returns: Results of the operation.
"""
path = self._get_process_url(url, process_id, provider_id)
path = f"{path}/package"
resp = self._request("GET", path,
headers=self._headers, x_headers=headers, settings=self._settings, auth=auth,
request_timeout=request_timeout, request_retries=request_retries)
return self._parse_result(resp, message="Retrieving process Application Package.",
output_format=output_format, with_links=with_links, with_headers=with_headers)

@staticmethod
def _parse_inputs(inputs):
# type: (Optional[Union[str, JSON]]) -> Union[OperationResult, ExecutionInputsMap]
Expand Down Expand Up @@ -2365,6 +2432,17 @@ def make_parser():
help="Representation schema of the returned process description (default: %(default)s, case-insensitive)."
)

op_package = WeaverArgumentParser(
"package",
description="Obtain the Application Package definition of an existing process.",
formatter_class=ParagraphFormatter,
)
set_parser_sections(op_package)
add_url_param(op_package)
add_shared_options(op_package)
add_process_param(op_package)
add_provider_param(op_package, required=False)

op_execute = WeaverArgumentParser(
"execute",
description="Submit a job execution for an existing process.",
Expand Down Expand Up @@ -2606,6 +2684,7 @@ def make_parser():
op_unregister,
op_capabilities,
op_describe,
op_package,
op_execute,
op_jobs,
op_monitor,
Expand Down
3 changes: 3 additions & 0 deletions weaver/wps_restapi/providers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def includeme(config):
config.add_route(**sd.service_api_route_info(sd.provider_service, settings))
config.add_route(**sd.service_api_route_info(sd.provider_processes_service, settings))
config.add_route(**sd.service_api_route_info(sd.provider_process_service, settings))
config.add_route(**sd.service_api_route_info(sd.provider_process_package_service, settings))
config.add_route(**sd.service_api_route_info(sd.provider_execution_service, settings))
config.add_view(p.get_providers, route_name=sd.providers_service.name,
request_method="GET", renderer=OutputFormat.JSON)
Expand All @@ -32,6 +33,8 @@ def includeme(config):
request_method="GET", renderer=OutputFormat.JSON)
config.add_view(p.get_provider_process, route_name=sd.provider_process_service.name,
request_method="GET", renderer=OutputFormat.JSON)
config.add_view(p.get_provider_process_package, route_name=sd.provider_process_package_service.name,
request_method="GET", renderer=OutputFormat.JSON)
config.add_view(p.submit_provider_job, route_name=sd.provider_jobs_service.name,
request_method="POST", renderer=OutputFormat.JSON)
config.add_view(p.submit_provider_job, route_name=sd.provider_execution_service.name,
Expand Down
24 changes: 19 additions & 5 deletions weaver/wps_restapi/providers/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def get_provider(request):
return HTTPOk(json=data)


@sd.provider_processes_service.get(tags=[sd.TAG_PROVIDERS, sd.TAG_PROCESSES, sd.TAG_PROVIDERS, sd.TAG_GETCAPABILITIES],
@sd.provider_processes_service.get(tags=[sd.TAG_PROVIDERS, sd.TAG_PROCESSES, sd.TAG_GETCAPABILITIES],
renderer=OutputFormat.JSON, schema=sd.ProviderProcessesEndpoint(),
response_schemas=sd.get_provider_processes_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
Expand Down Expand Up @@ -190,7 +190,7 @@ def describe_provider_process(request):
return Process.convert(process, service, get_settings(request))


@sd.provider_process_service.get(tags=[sd.TAG_PROVIDERS, sd.TAG_PROCESSES, sd.TAG_PROVIDERS, sd.TAG_DESCRIBEPROCESS],
@sd.provider_process_service.get(tags=[sd.TAG_PROVIDERS, sd.TAG_PROCESSES, sd.TAG_DESCRIBEPROCESS],
renderer=OutputFormat.JSON, schema=sd.ProviderProcessEndpoint(),
response_schemas=sd.get_provider_process_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
Expand All @@ -199,18 +199,32 @@ def describe_provider_process(request):
def get_provider_process(request):
# type: (PyramidRequest) -> AnyViewResponse
"""
Retrieve a process description (DescribeProcess).
Retrieve a remote provider's process description (DescribeProcess).
"""
process = describe_provider_process(request)
schema = request.params.get("schema")
offering = process.offering(schema)
return HTTPOk(json=offering)


@sd.provider_execution_service.post(tags=[sd.TAG_PROVIDERS, sd.TAG_PROVIDERS, sd.TAG_EXECUTE, sd.TAG_JOBS],
@sd.provider_process_package_service.get(tags=[sd.TAG_PROVIDERS, sd.TAG_PROCESSES, sd.TAG_DESCRIBEPROCESS],
renderer=OutputFormat.JSON, schema=sd.ProviderProcessPackageEndpoint(),
response_schemas=sd.get_provider_process_package_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
@check_provider_requirements
def get_provider_process_package(request):
# type: (PyramidRequest) -> AnyViewResponse
"""
Retrieve a remote provider's process Application Package definition.
"""
process = describe_provider_process(request)
return HTTPOk(json=process.package or {})


@sd.provider_execution_service.post(tags=[sd.TAG_PROVIDERS, sd.TAG_PROCESSES, sd.TAG_EXECUTE, sd.TAG_JOBS],
renderer=OutputFormat.JSON, schema=sd.PostProviderProcessJobRequest(),
response_schemas=sd.post_provider_process_job_responses)
@sd.provider_jobs_service.post(tags=[sd.TAG_PROVIDERS, sd.TAG_PROVIDERS, sd.TAG_EXECUTE, sd.TAG_JOBS],
@sd.provider_jobs_service.post(tags=[sd.TAG_PROVIDERS, sd.TAG_PROCESSES, sd.TAG_EXECUTE, sd.TAG_JOBS],
renderer=OutputFormat.JSON, schema=sd.PostProviderProcessJobRequest(),
response_schemas=sd.post_provider_process_job_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
Expand Down
9 changes: 9 additions & 0 deletions weaver/wps_restapi/swagger_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@
provider_service = Service(name="provider", path=f"{providers_service.path}/{{provider_id}}")
provider_processes_service = Service(name="provider_processes", path=provider_service.path + processes_service.path)
provider_process_service = Service(name="provider_process", path=provider_service.path + process_service.path)
provider_process_package_service = Service(name="provider_process_pkg", path=f"{provider_process_service.path}/package")
provider_jobs_service = Service(name="provider_jobs", path=provider_service.path + process_jobs_service.path)
provider_job_service = Service(name="provider_job", path=provider_service.path + process_job_service.path)
provider_results_service = Service(name="provider_results", path=provider_service.path + process_results_service.path)
Expand Down Expand Up @@ -2796,6 +2797,10 @@ class ProcessPackageEndpoint(LocalProcessPath):
querystring = LocalProcessQuery()


class ProviderProcessPackageEndpoint(ProviderProcessPath, ProcessPackageEndpoint):
pass


class ProcessPayloadEndpoint(LocalProcessPath):
header = RequestHeaders()
querystring = LocalProcessQuery()
Expand Down Expand Up @@ -6633,6 +6638,10 @@ class GoneVaultFileDownloadResponse(ExtendedMappingSchema):
"405": MethodNotAllowedErrorResponseSchema(),
"500": InternalServerErrorResponseSchema(),
}
get_provider_process_package_responses = copy(get_process_package_responses)
get_provider_process_package_responses.update({
"403": ForbiddenProviderAccessResponseSchema(),
})
post_provider_responses = {
"201": CreatedPostProvider(description="success"),
"400": ExtendedMappingSchema(description=OWSMissingParameterValue.description),
Expand Down

0 comments on commit a16b20b

Please sign in to comment.