diff --git a/cylc/flow/id_match.py b/cylc/flow/id_match.py index de3fa21cb1..e128e9c5e3 100644 --- a/cylc/flow/id_match.py +++ b/cylc/flow/id_match.py @@ -98,7 +98,8 @@ def filter_ids( * If IDTokens.Cycle all CyclePoints with any matching tasks will be returned. warn: - Whether to log a warning if no matching tasks are found. + Whether to log a warning if no matching tasks are found in the + pool. TODO: Consider using wcmatch which would add support for diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 3b1990169f..577a096493 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1943,7 +1943,7 @@ def set_prereqs_and_outputs( """ # Get matching pool tasks and inactive task definitions. - itasks, inactive_tasks, unmatched = self.filter_task_proxies( + itasks, inactive_tasks, _unmatched = self.filter_task_proxies( items, inactive=True, warn_no_active=False, @@ -1951,6 +1951,7 @@ def set_prereqs_and_outputs( flow_nums = self._get_flow_nums(flow, flow_descr) + nothing_set: Set[str] = set() # Set existing task proxies. for itask in itasks: if flow == ['none'] and itask.flow_nums != set(): @@ -1966,7 +1967,8 @@ def set_prereqs_and_outputs( # Spawn as if seq xtrig of parentless task was satisfied, # with associated task producing these outputs. self.check_spawn_psx_task(itask) - self._set_outputs_itask(itask, outputs) + if not self._set_outputs_itask(itask, outputs): + nothing_set.add(itask.identity) # Spawn and set inactive tasks. if not flow: @@ -1982,8 +1984,18 @@ def set_prereqs_and_outputs( point, tdef, flow_nums, flow_wait=flow_wait, transient=True ) - if trans is not None: - self._set_outputs_itask(trans, outputs) + if trans is not None and self._set_outputs_itask( + trans, outputs + ): + nothing_set.add(trans.identity) + + if nothing_set: + msg = "Task has" if len(nothing_set) == 1 else "Tasks have" + msg += ( + " no required outputs to set: " + f"{', '.join(sorted(nothing_set))}" + ) + LOG.warning(msg) if self.compute_runahead(): self.release_runahead_tasks() @@ -1992,10 +2004,16 @@ def _set_outputs_itask( self, itask: 'TaskProxy', outputs: List[str], - ) -> None: - """Set requested outputs on a task proxy and spawn children.""" + ) -> bool: + """Set requested outputs on a task proxy and spawn children. + + Return False if no outputs were specified and the task has no required + outputs to set + """ if not outputs: outputs = list(itask.state.outputs.iter_required_messages()) + if not outputs: + return False else: outputs = self._standardise_outputs( itask.point, itask.tdef, outputs @@ -2018,6 +2036,7 @@ def _set_outputs_itask( self.workflow_db_mgr.put_update_task_state(itask) self.workflow_db_mgr.put_update_task_outputs(itask) self.workflow_db_mgr.process_queued_ops() + return True def _set_prereqs_itask( self, @@ -2316,7 +2335,8 @@ def filter_task_proxies( ids: ID strings. warn_no_active: - Whether to log a warning if no matching active tasks are found. + Whether to log a warning if no matching tasks are found in the + pool. inactive: If True, unmatched IDs will be checked against taskdefs and cycle, and any matches will be returned in the second diff --git a/tests/integration/scripts/test_set.py b/tests/integration/scripts/test_set.py index 41fcb7e3bf..1dfe8a5cd8 100644 --- a/tests/integration/scripts/test_set.py +++ b/tests/integration/scripts/test_set.py @@ -19,11 +19,21 @@ Note: see also functional tests """ +import logging + +from cylc.flow.commands import ( + run_cmd, + set_prereqs_and_outputs, +) from cylc.flow.cycling.integer import IntegerPoint from cylc.flow.data_messages_pb2 import PbTaskProxy from cylc.flow.data_store_mgr import TASK_PROXIES +from cylc.flow.flow_mgr import FLOW_ALL from cylc.flow.scheduler import Scheduler -from cylc.flow.task_state import TASK_STATUS_SUCCEEDED, TASK_STATUS_WAITING +from cylc.flow.task_state import ( + TASK_STATUS_SUCCEEDED, + TASK_STATUS_WAITING, +) async def test_set_parentless_spawning( @@ -164,3 +174,57 @@ async def test_pre_all(flow, scheduler, run): schd.pool.set_prereqs_and_outputs(['1/z'], [], ['all'], ['all']) warn_or_higher = [i for i in log.records if i.levelno > 30] assert warn_or_higher == [] + + +async def test_logging(flow, scheduler, start, log_filter): + """Test logging of a mixture of valid and invalid tasks, tasks with + some required and no required outputs.""" + schd: Scheduler = scheduler( + flow({ + 'scheduler': { + 'cycle point format': 'CCYY', + }, + 'scheduling': { + 'initial cycle point': '2000', + 'graph': { + 'R3//P1Y': 'a? & a:x & b? => c?', + }, + }, + 'runtime': { + 'a': { + 'outputs': {'x': 'whatever'} + } + } + }) + ) + tasks_to_set = [ + # Tasks with required outputs: + '2000/a', + # Tasks without required outputs: + '2000/b', '2000/c', + # Glob that matches future tasks: + '2002/*', + # Invalid tasks: + '2005/a', '2000/doh', + ] + async with start(schd): + await run_cmd(set_prereqs_and_outputs(schd, tasks_to_set, [FLOW_ALL])) + + assert log_filter( + logging.WARNING, + "Tasks have no required outputs to set: 2000/a, 2000/b, 2002/a, 2002/b", + ) + assert log_filter( + logging.WARNING, "Invalid cycle point for task: a, 2005" + ) + assert log_filter(logging.WARNING, "No matching tasks found: doh") + assert len(log_filter(logging.WARNING)) == 3 + + # Check singular form of the above message + await run_cmd(set_prereqs_and_outputs(schd, ['2000/b'], [FLOW_ALL])) + + assert log_filter( + logging.WARNING, + "Task has no required outputs to set: 2000/b", + ) + assert len(log_filter(logging.WARNING)) == 4