Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cylc set: log warning when no outputs specified and task has no required outputs #6505

Draft
wants to merge 1 commit into
base: 8.4.x
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cylc/flow/id_match.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ def filter_ids(
* If IDTokens.Cycle all CyclePoints with any matching tasks will
be returned.
warn:
Whether to log a warning if no matching tasks are found.
Whether to log a warning if no matching tasks are found in the
pool.

TODO:
Consider using wcmatch which would add support for
Expand Down
34 changes: 27 additions & 7 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1943,14 +1943,15 @@ def set_prereqs_and_outputs(

"""
# Get matching pool tasks and inactive task definitions.
itasks, inactive_tasks, unmatched = self.filter_task_proxies(
itasks, inactive_tasks, _unmatched = self.filter_task_proxies(
items,
inactive=True,
warn_no_active=False,
)

flow_nums = self._get_flow_nums(flow, flow_descr)

nothing_set: Set[str] = set()
# Set existing task proxies.
for itask in itasks:
if flow == ['none'] and itask.flow_nums != set():
Expand All @@ -1966,7 +1967,8 @@ def set_prereqs_and_outputs(
# Spawn as if seq xtrig of parentless task was satisfied,
# with associated task producing these outputs.
self.check_spawn_psx_task(itask)
self._set_outputs_itask(itask, outputs)
if not self._set_outputs_itask(itask, outputs):
nothing_set.add(itask.identity)

# Spawn and set inactive tasks.
if not flow:
Expand All @@ -1982,8 +1984,18 @@ def set_prereqs_and_outputs(
point, tdef, flow_nums,
flow_wait=flow_wait, transient=True
)
if trans is not None:
self._set_outputs_itask(trans, outputs)
if trans is not None and self._set_outputs_itask(
trans, outputs
):
nothing_set.add(trans.identity)

if nothing_set:
msg = "Task has" if len(nothing_set) == 1 else "Tasks have"
msg += (
" no required outputs to set: "
f"{', '.join(sorted(nothing_set))}"
)
LOG.warning(msg)

if self.compute_runahead():
self.release_runahead_tasks()
Expand All @@ -1992,10 +2004,16 @@ def _set_outputs_itask(
self,
itask: 'TaskProxy',
outputs: List[str],
) -> None:
"""Set requested outputs on a task proxy and spawn children."""
) -> bool:
"""Set requested outputs on a task proxy and spawn children.

Return False if no outputs were specified and the task has no required
outputs to set
"""
if not outputs:
outputs = list(itask.state.outputs.iter_required_messages())
if not outputs:
return False
else:
outputs = self._standardise_outputs(
itask.point, itask.tdef, outputs
Expand All @@ -2018,6 +2036,7 @@ def _set_outputs_itask(
self.workflow_db_mgr.put_update_task_state(itask)
self.workflow_db_mgr.put_update_task_outputs(itask)
self.workflow_db_mgr.process_queued_ops()
return True

def _set_prereqs_itask(
self,
Expand Down Expand Up @@ -2316,7 +2335,8 @@ def filter_task_proxies(
ids:
ID strings.
warn_no_active:
Whether to log a warning if no matching active tasks are found.
Whether to log a warning if no matching tasks are found in the
pool.
inactive:
If True, unmatched IDs will be checked against taskdefs
and cycle, and any matches will be returned in the second
Expand Down
66 changes: 65 additions & 1 deletion tests/integration/scripts/test_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,21 @@
Note: see also functional tests
"""

import logging

from cylc.flow.commands import (
run_cmd,
set_prereqs_and_outputs,
)
from cylc.flow.cycling.integer import IntegerPoint
from cylc.flow.data_messages_pb2 import PbTaskProxy
from cylc.flow.data_store_mgr import TASK_PROXIES
from cylc.flow.flow_mgr import FLOW_ALL
from cylc.flow.scheduler import Scheduler
from cylc.flow.task_state import TASK_STATUS_SUCCEEDED, TASK_STATUS_WAITING
from cylc.flow.task_state import (
TASK_STATUS_SUCCEEDED,
TASK_STATUS_WAITING,
)


async def test_set_parentless_spawning(
Expand Down Expand Up @@ -164,3 +174,57 @@ async def test_pre_all(flow, scheduler, run):
schd.pool.set_prereqs_and_outputs(['1/z'], [], ['all'], ['all'])
warn_or_higher = [i for i in log.records if i.levelno > 30]
assert warn_or_higher == []


async def test_logging(flow, scheduler, start, log_filter):
"""Test logging of a mixture of valid and invalid tasks, tasks with
some required and no required outputs."""
schd: Scheduler = scheduler(
flow({
'scheduler': {
'cycle point format': 'CCYY',
},
'scheduling': {
'initial cycle point': '2000',
'graph': {
'R3//P1Y': 'a? & a:x & b? => c?',
},
},
'runtime': {
'a': {
'outputs': {'x': 'whatever'}
}
}
})
)
tasks_to_set = [
# Tasks with required outputs:
'2000/a',
# Tasks without required outputs:
'2000/b', '2000/c',
# Glob that matches future tasks:
'2002/*',
# Invalid tasks:
'2005/a', '2000/doh',
]
async with start(schd):
await run_cmd(set_prereqs_and_outputs(schd, tasks_to_set, [FLOW_ALL]))

assert log_filter(
logging.WARNING,
"Tasks have no required outputs to set: 2000/a, 2000/b, 2002/a, 2002/b",
)
assert log_filter(
logging.WARNING, "Invalid cycle point for task: a, 2005"
)
assert log_filter(logging.WARNING, "No matching tasks found: doh")
assert len(log_filter(logging.WARNING)) == 3

# Check singular form of the above message
await run_cmd(set_prereqs_and_outputs(schd, ['2000/b'], [FLOW_ALL]))

assert log_filter(
logging.WARNING,
"Task has no required outputs to set: 2000/b",
)
assert len(log_filter(logging.WARNING)) == 4
Loading