|
17 | 17 | # limitations under the License. |
18 | 18 | """Renku activity database gateway implementation.""" |
19 | 19 |
|
| 20 | +from itertools import chain |
20 | 21 | from pathlib import Path |
21 | 22 | from typing import List, Optional, Set, Tuple, Union |
22 | 23 |
|
|
26 | 27 | from renku.core.management.interface.activity_gateway import IActivityGateway |
27 | 28 | from renku.core.management.interface.database_dispatcher import IDatabaseDispatcher |
28 | 29 | from renku.core.management.interface.plan_gateway import IPlanGateway |
| 30 | +from renku.core.management.workflow.activity import create_activity_graph |
29 | 31 | from renku.core.metadata.gateway.database_gateway import ActivityDownstreamRelation |
30 | 32 | from renku.core.models.provenance.activity import Activity, ActivityCollection |
31 | 33 | from renku.core.models.workflow.plan import Plan |
@@ -148,6 +150,19 @@ def add(self, activity: Activity): |
148 | 150 | plan_gateway = inject.instance(IPlanGateway) |
149 | 151 | plan_gateway.add(activity.association.plan) |
150 | 152 |
|
| 153 | + # NOTE: Check for a cycle if this activity |
| 154 | + upstream_chains = self.get_upstream_activity_chains(activity) |
| 155 | + downstream_chains = self.get_downstream_activity_chains(activity) |
| 156 | + |
| 157 | + all_activities = set() |
| 158 | + |
| 159 | + for activity_chain in chain(upstream_chains, downstream_chains): |
| 160 | + for current_activity in activity_chain: |
| 161 | + all_activities.add(current_activity) |
| 162 | + |
| 163 | + # NOTE: This call raises an exception if there is a cycle |
| 164 | + create_activity_graph(list(all_activities), with_inputs_outputs=True) |
| 165 | + |
151 | 166 | def add_activity_collection(self, activity_collection: ActivityCollection): |
152 | 167 | """Add an ``ActivityCollection`` to storage.""" |
153 | 168 | database = self.database_dispatcher.current_database |
|
0 commit comments