Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: add task filtering based on metadata #449

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
prefer task_names in the filtered task ids return when possible.
  • Loading branch information
saikonen committed Jan 27, 2025
commit 71b92d9264d1dc553f775f3dab758e2977ec1d1d
7 changes: 5 additions & 2 deletions services/data/postgres_async_db.py
Original file line number Diff line number Diff line change
@@ -755,10 +755,13 @@ async def get_filtered_task_ids(self, flow_id: str, run_id: str, step_name: str,
values=[v for k, v in filter_dict.items() if v is not None],
order=["task_id"],
enable_joins=True,
select_columns=["task_id"]
select_columns=["task_name, task_id"]
)
# flatten the ids in the response
flattened_response = DBResponse(body=[id for row in db_response.body for id in row], response_code=db_response.response_code)
def _format_id(row):
# pick the task_name over task_id
return row[0] or row[1]
flattened_response = DBResponse(body=[_format_id(row) for row in db_response.body], response_code=db_response.response_code)
return flattened_response, pagination

async def get_task(self, flow_id: str, run_id: str, step_name: str,
44 changes: 44 additions & 0 deletions services/metadata_service/tests/integration_tests/task_test.py
Original file line number Diff line number Diff line change
@@ -227,6 +227,50 @@ async def test_filtered_tasks_get(cli, db):
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks".format(**_first_task), status=400)


async def test_filtered_tasks_mixed_ids_get(cli, db):
# create a flow, run and step for the test
_flow = (await add_flow(db, "TestFlow", "test_user-1", ["a_tag", "b_tag"], ["runtime:test"])).body
_run = (await add_run(db, flow_id=_flow["flow_id"])).body
_step = (await add_step(db, flow_id=_run["flow_id"], run_number=_run["run_number"], step_name="first_step")).body

# add tasks to the step
first_task_name = "first-task-1"
_first_task = (await add_task(db, flow_id=_step["flow_id"], run_number=_step["run_number"], step_name=_step["step_name"], task_name=first_task_name)).body
# we need to refetch the task as the return does not contain the internal task ID we need for further record creation.
_first_task = (await db.task_table_postgres.get_task(flow_id=_step["flow_id"], run_id=_step["run_number"], step_name=_step["step_name"], task_id=first_task_name, expanded=True)).body
_second_task = (await add_task(db, flow_id=_step["flow_id"], run_number=_step["run_number"], step_name=_step["step_name"])).body

# add metadata to filter on
(await add_metadata(db, flow_id=_first_task["flow_id"], run_number=_first_task["run_number"], step_name=_first_task["step_name"], task_id=_first_task["task_id"], metadata={"field_name":"field_a", "value": "value_a"}))
(await add_metadata(db, flow_id=_first_task["flow_id"], run_number=_first_task["run_number"], step_name=_first_task["step_name"], task_id=_first_task["task_id"], metadata={"field_name":"field_b", "value": "value_b"}))

(await add_metadata(db, flow_id=_second_task["flow_id"], run_number=_second_task["run_number"], step_name=_second_task["step_name"], task_id=_second_task["task_id"], metadata={"field_name": "field_a", "value": "not_value_a"}))
(await add_metadata(db, flow_id=_second_task["flow_id"], run_number=_second_task["run_number"], step_name=_second_task["step_name"], task_id=_second_task["task_id"], metadata={"field_name": "field_b", "value": "value_b"}))

# filtering with a shared key should return all relevant tasks
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_field_name=field_a".format(**_first_task),
data=[first_task_name, _second_task["task_id"]])

# filtering with a shared value should return all relevant tasks
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_value=value_b".format(**_first_task),
data=[first_task_name, _second_task["task_id"]])

# filtering with a shared key&value should return all relevant tasks
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_field_name=field_b&metadata_value=value_b".format(**_first_task),
data=[first_task_name, _second_task["task_id"]])

# filtering with a shared value should return all relevant tasks
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_field_name=field_a&metadata_value=not_value_a".format(**_first_task),
data=[_second_task["task_id"]])

# filtering with a mixed key&value should not return results
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_field_name=field_a&metadata_value=value_b".format(**_first_task),
data=[])

# not providing filters should result in error
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks".format(**_first_task), status=400)



async def test_task_get(cli, db):
# create flow, run and step for test