6 changes: 6 additions & 0 deletions reana_workflow_controller/config.py
@@ -407,3 +407,9 @@ def _parse_interactive_sessions_environments(env_var)

MAX_WORKFLOW_SHARING_MESSAGE_LENGTH = 5000
"""Maximum length of the user-provided message when sharing a workflow."""


REANA_JOB_MONITORING_PROMETHEUS_URL = os.getenv(
    "REANA_JOB_MONITORING_PROMETHEUS_URL",
    "http://reana-prometheus-server/api/v1/query_range",
)
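"""URL of the Prometheus ``query_range`` endpoint used to collect job monitoring data."""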
73 changes: 66 additions & 7 deletions reana_workflow_controller/consumer.py
@@ -12,6 +12,7 @@

import json
import logging
import time
import uuid
from datetime import datetime

@@ -40,6 +41,7 @@
    ALIVE_STATUSES,
    PROGRESS_STATUSES,
    REANA_GITLAB_URL,
    REANA_JOB_MONITORING_PROMETHEUS_URL,
    REANA_URL,
    REANA_JOB_STATUS_CONSUMER_PREFETCH_COUNT,
)
@@ -142,9 +144,10 @@ def on_message(self, body, message):
f"Unexpected error while processing workflow: {e}", exc_info=True
)


def _update_workflow_status(workflow, status, logs):
    """Update workflow status in DB."""
    if workflow.status != status:
        Workflow.update_workflow_status(Session, workflow.id_, status, logs, None)
        if workflow.git_ref:
@@ -153,6 +156,8 @@ def _update_workflow_status(workflow, status, logs):
    if status not in ALIVE_STATUSES:
        workflow.run_finished_at = datetime.now()
        workflow.logs = workflow.logs or ""

        _get_job_monitoring_data(workflow)

        try:
            workflow_engine_logs = _get_workflow_engine_pod_logs(workflow)
@@ -296,18 +301,72 @@ def _delete_workflow_job(workflow: Workflow) -> None:
)


def _get_workflow_pod(workflow: Workflow):
    """Return the Kubernetes pod of the workflow's batch run, or ``None`` if not found."""
    pods = current_k8s_corev1_api_client.list_namespaced_pod(
        namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
        label_selector=f"reana-run-batch-workflow-uuid={str(workflow.id_)}",
    )
    for pod in pods.items:
        if str(workflow.id_) in pod.metadata.name:
            return pod

    return None


def _get_workflow_engine_pod_logs(workflow: Workflow) -> str:
    """Return the logs of the workflow engine container of the workflow's batch pod."""
    pod = _get_workflow_pod(workflow)

    if pod is not None:
        return current_k8s_corev1_api_client.read_namespaced_pod_log(
            namespace=pod.metadata.namespace,
            name=pod.metadata.name,
            container="workflow-engine",
        )

    # There might not be any pod returned by `list_namespaced_pod`, for example
    # when a workflow fails to be scheduled
    return ""


def _get_job_monitoring_data(workflow: Workflow) -> dict:
    """Collect job monitoring data."""
    logging.info("Collecting monitoring data for workflow %s", workflow.id_)

    pod = _get_workflow_pod(workflow)
    if pod is None:
        # No pod to monitor, e.g. the workflow failed to be scheduled
        return {}

    monitoring_dict = {}

    for metric in [
        # TODO: to define which metrics we want to collect really
        "container_memory_usage_bytes",
        "container_cpu_usage_seconds_total",
    ]:
        query = f'{metric}{{pod="{pod.metadata.name}",image!=""}}'

        logging.info("Querying Prometheus for workflow pod: %s", query)

        r = requests.get(
            REANA_JOB_MONITORING_PROMETHEUS_URL,
            params=dict(
                query=query,
                step=1,
                # TODO: add start and end time of the workflow
                start=time.time() - 600,
                end=time.time(),
            ),
        )
        r.raise_for_status()

        monitoring_dict[metric] = r.json()

    logging.info(
        "Monitoring data for workflow %s: %s", workflow.id_, monitoring_dict
    )

    return monitoring_dict
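Note: the function above relies on Prometheus's query_range HTTP API. As a rough standalone sketch of what each entry stored in monitoring_dict would contain (the endpoint is the default from config.py, and the pod name and time window are placeholders, not values from this PR):

import time
import requests

# Hypothetical illustration of the query_range call made by _get_job_monitoring_data;
# "reana-run-batch-1234" is a placeholder pod name.
r = requests.get(
    "http://reana-prometheus-server/api/v1/query_range",
    params={
        "query": 'container_memory_usage_bytes{pod="reana-run-batch-1234",image!=""}',
        "start": time.time() - 600,
        "end": time.time(),
        "step": 1,
    },
)
r.raise_for_status()
# A successful response follows the Prometheus HTTP API "matrix" format, e.g.:
# {"status": "success",
#  "data": {"resultType": "matrix",
#           "result": [{"metric": {"pod": "reana-run-batch-1234", ...},
#                       "values": [[1700000000, "123456789"], ...]}]}}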
2 changes: 2 additions & 0 deletions reana_workflow_controller/workflow_run_manager.py
@@ -454,6 +454,8 @@ def start_interactive_session(self, interactive_session_type, image=None, **kwar
f"Interactive type {interactive_session_type} does not exist."
)

print("test")

validated_image = _validate_interactive_session_image(
interactive_session_type, image
)