Skip to content

Commit 221aeba

Browse files
vigsterkrPanaetius
andauthored
feat(core): add graph workflow group and remove subcommand (#2177)
* add graph workflow group and remove subcommand * raise exception when provided workflow is not found * Apply suggestions from code review * check user provided workflow name uniqueness before executing a run Co-authored-by: Ralf Grubenmann <[email protected]>
1 parent a7cdd1e commit 221aeba

File tree

5 files changed

+123
-1
lines changed

5 files changed

+123
-1
lines changed

renku/cli/graph.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,14 @@
2222
from renku.cli.utils.callback import ClickCallback
2323
from renku.cli.utils.click import CaseInsensitiveChoice
2424
from renku.core.incubation.command import Command
25-
from renku.core.incubation.graph import FORMATS, add_to_dataset, create_dataset, export_graph, generate_graph
25+
from renku.core.incubation.graph import (
26+
FORMATS,
27+
add_to_dataset,
28+
create_dataset,
29+
export_graph,
30+
generate_graph,
31+
remove_workflow,
32+
)
2633
from renku.core.incubation.graph import status as get_status
2734
from renku.core.incubation.graph import update as perform_update
2835
from renku.core.models.workflow.dependency_graph import DependencyGraph
@@ -189,3 +196,17 @@ def add(name, urls, external, force, overwrite, create, sources, destination, re
189196
ref=ref,
190197
)
191198
click.secho("OK", fg="green")
199+
200+
201+
@graph.group()
202+
def workflow():
203+
"""Proof-of-Concept command for workflow operations using new metadata."""
204+
205+
206+
@workflow.command()
207+
@click.argument("name", metavar="<name or uuid>")
208+
@click.option("-f", "--force", is_flag=True, help="Force remove (don't prompt user to confirm).")
209+
def remove(name, force):
210+
"""Remove the workflow named <name>."""
211+
communicator = ClickCallback()
212+
remove_workflow().with_communicator(communicator).build().execute(name=name, force=force)

renku/core/commands/run.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@
2525

2626
from renku.core import errors
2727
from renku.core.incubation.command import Command
28+
from renku.core.incubation.graph import unique_workflow
2829
from renku.core.management.git import get_mapped_std_streams
2930
from renku.core.models.cwl.command_line_tool import CommandLineToolFactory
31+
from renku.core.models.provenance.provenance_graph import ProvenanceGraph
3032
from renku.core.utils import communication
3133
from renku.core.utils.urls import get_slug
3234

@@ -55,6 +57,12 @@ def _run_command(
5557
if name != valid_name:
5658
raise errors.ParameterError(f"Invalid name: '{name}' (Hint: '{valid_name}' is valid).")
5759

60+
# TODO: refactor this once we switch to Database
61+
if client.provenance_graph_path.exists():
62+
workflows = unique_workflow(ProvenanceGraph.from_json(client.provenance_graph_path))
63+
if name in workflows:
64+
raise errors.ParameterError(f"Duplicate workflow name: workflow '{name}' already exists.")
65+
5866
paths = explicit_outputs if no_output_detection else client.candidate_paths
5967
mapped_std = get_mapped_std_streams(paths, streams=("stdout", "stderr"))
6068

renku/core/incubation/graph.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@
2222
import sys
2323
import traceback
2424
from collections import defaultdict
25+
from datetime import datetime
2526
from pathlib import Path
2627
from typing import Dict
28+
from urllib.parse import urlparse
2729

2830
import git
2931
from git import NULL_TREE, Commit, GitCommandError
@@ -42,6 +44,8 @@
4244
from renku.core.models.provenance.activities import Activity
4345
from renku.core.models.provenance.activity import ActivityCollection
4446
from renku.core.models.provenance.provenance_graph import ProvenanceGraph
47+
from renku.core.models.workflow.dependency_graph import DependencyGraph
48+
from renku.core.models.workflow.plan import Plan
4549
from renku.core.models.workflow.run import Run
4650
from renku.core.utils import communication
4751
from renku.core.utils.contexts import measure
@@ -453,3 +457,49 @@ def _add_to_dataset(
453457
except (FileNotFoundError, git.exc.NoSuchPathError) as e:
454458
message = "\n\t".join(urls)
455459
raise errors.ParameterError(f"Could not find paths/URLs: \n\t{message}") from e
460+
461+
462+
def remove_workflow():
463+
"""Return a command for removing workflow."""
464+
command = Command().command(_remove_workflow).lock_project()
465+
return command.require_migration().with_commit(commit_only=GRAPH_METADATA_PATHS)
466+
467+
468+
def _remove_workflow(client, name: str, force: bool):
469+
"""Remove the given workflow."""
470+
now = datetime.utcnow()
471+
# TODO: refactor this once we switch to Database
472+
provenance_graph = ProvenanceGraph.from_json(client.provenance_graph_path)
473+
pg_workflows = unique_workflow(provenance_graph)
474+
475+
not_found_text = f'The specified workflow is "{name}" is not an active workflow.'
476+
plan = None
477+
parse_result = urlparse(name)
478+
if parse_result.scheme:
479+
plan = next(filter(lambda x: x.id == name, pg_workflows.values()), None)
480+
if not plan and name not in pg_workflows:
481+
raise errors.ParameterError(not_found_text)
482+
483+
if not force:
484+
prompt_text = f'You are about to remove the following workflow "{name}".' + "\n" + "\nDo you wish to continue?"
485+
communication.confirm(prompt_text, abort=True, warning=True)
486+
487+
plan = plan or pg_workflows[name]
488+
plan.invalidated_at = now
489+
dependency_graph = DependencyGraph.from_json(client.dependency_graph_path)
490+
for p in dependency_graph.plans:
491+
if p.id == plan.id:
492+
p.invalidated_at = now
493+
494+
dependency_graph.to_json()
495+
provenance_graph.to_json()
496+
497+
498+
def unique_workflow(provenance_graph: ProvenanceGraph) -> Dict[str, Plan]:
499+
"""Map of unique plans in the provenance graph indexed by name."""
500+
workflows = dict()
501+
for activity in provenance_graph.activities:
502+
plan = activity.association.plan
503+
if plan.invalidated_at is None and plan.name not in workflows:
504+
workflows[plan.name] = plan
505+
return workflows

renku/core/models/workflow/plan.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import copy
2121
import itertools
22+
from datetime import datetime
2223
from pathlib import PurePosixPath
2324
from typing import Any, Dict, List
2425
from uuid import uuid4
@@ -54,6 +55,7 @@ def __init__(
5455
description: str = None,
5556
id: str,
5657
inputs: List[CommandInput] = None,
58+
invalidated_at: datetime = None,
5759
keywords: List[str] = None,
5860
name: str = None,
5961
outputs: List[CommandOutput] = None,
@@ -63,6 +65,7 @@ def __init__(
6365
self.description: str = description
6466
self.id: str = id
6567
self.inputs: List[CommandInput] = inputs or []
68+
self.invalidated_at: datetime = invalidated_at
6669
self.keywords: List[str] = keywords or []
6770
self.name: str = name
6871
self.outputs: List[CommandOutput] = outputs or []
@@ -281,6 +284,7 @@ class Meta:
281284
description = fields.String(schema.description, missing=None)
282285
id = fields.Id()
283286
inputs = Nested(renku.hasInputs, CommandInputSchema, many=True, missing=None)
287+
invalidated_at = fields.DateTime(prov.invalidatedAtTime, add_value_types=True)
284288
keywords = fields.List(schema.keywords, fields.String(), missing=None)
285289
name = fields.String(schema.name, missing=None)
286290
outputs = Nested(renku.hasOutputs, CommandOutputSchema, many=True, missing=None)
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Copyright 2017-2021- Swiss Data Science Center (SDSC)
4+
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
5+
# Eidgenössische Technische Hochschule Zürich (ETHZ).
6+
#
7+
# Licensed under the Apache License, Version 2.0 (the "License");
8+
# you may not use this file except in compliance with the License.
9+
# You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing, software
14+
# distributed under the License is distributed on an "AS IS" BASIS,
15+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
# See the License for the specific language governing permissions and
17+
# limitations under the License.
18+
"""Test workflow commands."""
19+
20+
# from renku.core.incubation.command import Command, CommandResult
21+
from renku.cli import cli
22+
23+
24+
def test_workflow_remove_command(runner, project):
25+
"""test workflow remove with builder."""
26+
# TODO: add fixture project with graph generated by default
27+
result = runner.invoke(cli, ["graph", "generate"])
28+
assert 0 == result.exit_code
29+
30+
workflow_name = "test_workflow"
31+
32+
result = runner.invoke(cli, ["graph", "workflow", "remove", workflow_name])
33+
assert 2 == result.exit_code
34+
35+
result = runner.invoke(cli, ["run", "--success-code", "0", "--no-output", "--name", workflow_name, "echo", "foo"])
36+
assert 0 == result.exit_code
37+
38+
result = runner.invoke(cli, ["graph", "workflow", "remove", "--force", workflow_name])
39+
assert 0 == result.exit_code

0 commit comments

Comments
 (0)