Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix bug for fq_name #2208

Closed
wants to merge 8 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 38 additions & 3 deletions metaflow/plugins/events_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,9 @@ def flow_init(
)
if "project_branch" in self.attributes["flow"]:
if is_stringish(self.attributes["flow"]["project_branch"]):
result["branch"] = self.attributes["flow"]["project_branch"]
result["project_branch"] = self.attributes["flow"][
"project_branch"
]
else:
raise MetaflowException(
"The *project_branch* attribute of the *flow* is not a string"
Expand Down Expand Up @@ -467,6 +469,7 @@ def flow_init(
"The *name* attribute '%s' is not a valid string"
% str(flow_name)
)

result = {"fq_name": flow_name}
if "project" in flow:
if is_stringish(flow["project"]):
Expand All @@ -478,7 +481,7 @@ def flow_init(
)
if "project_branch" in flow:
if is_stringish(flow["project_branch"]):
result["branch"] = flow["project_branch"]
result["project_branch"] = flow["project_branch"]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this change seems to actually fix a bug with the argo events implementation that has gone unnoticed. Currently deploying a flow with

@trigger_on_finish(flow={"name": "ProjectEventsTestFlow", "project": "sa_test_project", "project_branch": "user.saikonen"})

does not apply a filter on the project_branch though it should.

with the introduced changes this is correctly being applied. The relevant code is in https://github.com/Netflix/metaflow/blob/master/metaflow/plugins/argo/argo_workflows.py#L641-L646
cc @savingoyal

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this is a regression or not. have triggers filtered by project&branch been working in the past with argo?

else:
raise MetaflowException(
"The *project_branch* attribute of the *flow* %s "
Expand Down Expand Up @@ -597,9 +600,41 @@ def format_deploytime_value(self):
# Entire trigger is a function (returns either string or dict)
old_trig = trigger
if isinstance(trigger, DeployTimeField):
# convert the trigger to string or dict
trigger = deploy_time_eval(trigger)
if is_stringish(trigger):
pass
elif isinstance(trigger, dict):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic needs further refactoring in a follow up PR.

if "name" not in trigger:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is an interesting fix that this PR introduces. Previously this incorrect usage was also working for whatever reason:

def flow_name_dict_func(ctx):
    # Is 'flow' correct??? docstring says 'name'
    return {"flow": "DeployTimeTriggerParams"}


@trigger_on_finish(flow=flow_name_dict_func)
class DeployTimeTriggerOnFinishFlow4(FlowSpec):

With the changes in this PR, Metaflow correctly raises an exception on the missing name in the dict

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is an issue with the current PR though when trying out with the correct syntax.

def flow_name_dict_func(ctx):
    # Is 'flow' correct??? docstring says 'name'
    return {"name": "DeployTimeTriggerParams"}
    # return {"name": "DeployTimeTriggerParams", "project": "TEST", "project_branch": "test_branch"}


@trigger_on_finish(flow=flow_name_dict_func)
class DeployTimeTriggerOnFinishFlow4(FlowSpec):

fails with KeyError on argo workflows due to event["flow"] not being set.

raise MetaflowException(
"The *flow* attribute for *@trigger_on_finish* is missing the "
"*name* key."
)
flow_name = trigger["name"]

if not is_stringish(flow_name) or "." in flow_name:
raise MetaflowException(
"The *name* attribute of the *flow* is not a valid string"
)
result = {"fq_name": flow_name}
if "project" in trigger:
if is_stringish(trigger["project"]):
result["project"] = trigger["project"]
else:
raise MetaflowException(
"The *project* attribute of the *flow* is not a string"
)
if "project_branch" in trigger:
if is_stringish(trigger["project_branch"]):
result["project_branch"] = trigger["project_branch"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you extract out lines 602-630 into a helper function? The code is identical to lines 397-429. Also on line 425 it's setting result to branch instead of project_branch. Which one is correct?

else:
raise MetaflowException(
"The *project_branch* attribute of the *flow* is not a string"
)
trigger = result
# effect is to set all fields to None if they don't exist.
if isinstance(trigger, dict):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add a comment on this code: "effect is to set all fields to None if they don't exist.

trigger["fq_name"] = trigger.get("name")
trigger["fq_name"] = trigger.get("fq_name")
trigger["project"] = trigger.get("project")
trigger["branch"] = trigger.get("project_branch")
# We also added this bc it won't be formatted yet
Expand Down
Loading