Skip to content

Commit 6a1ffd0

Browse files
authored
Merge pull request #170 from zillow/yunw/system-tag-in-kfp-plugin
Add service / k8s tags as system tags
2 parents 6dc0175 + 234e49b commit 6a1ffd0

File tree

5 files changed

+19
-13
lines changed

5 files changed

+19
-13
lines changed

metaflow/metadata/metadata.py

+9
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,15 @@ def _tags(self):
511511
tags.append("metaflow_r_version:" + env["metaflow_r_version"])
512512
if "r_version_code" in env:
513513
tags.append("r_version:" + env["r_version_code"])
514+
# KFP plugin tags
515+
kfp_tags = {
516+
"k8s_namespace": os.environ.get("MF_POD_NAMESPACE"),
517+
"zodiac_service": os.environ.get("ZODIAC_SERVICE"),
518+
"zodiac_team": os.environ.get("ZODIAC_TEAM"),
519+
}
520+
for key in ["k8s_namespace", "zodiac_service", "zodiac_team"]:
521+
if kfp_tags[key]:
522+
tags.append(f"{key}:{kfp_tags[key]}")
514523
return tags
515524

516525
def _register_code_package_metadata(self, run_id, step_name, task_id, attempt):

metaflow/plugins/kfp/kfp_decorator.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,12 @@ def task_pre_step(
126126
self.task_datastore = task_datastore
127127

128128
# TODO: any other KFP environment variables to get and register to Metadata service?
129-
meta = {"kfp-execution": run_id}
129+
meta = {
130+
"kfp-execution": run_id,
131+
"pod_name": os.environ.get("MF_POD_NAME"),
132+
"argo_workflow": os.environ.get("MF_ARGO_WORKFLOW_NAME"),
133+
}
134+
130135
entries = [
131136
MetaDatum(field=k, value=v, type=k, tags=[]) for k, v in meta.items()
132137
]

metaflow/plugins/kfp/kfp_exit_handler.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,14 @@ def email_notify(send_to):
5858
kfp_run_id,
5959
)
6060

61-
pod_namespace = get_env("POD_NAMESPACE", "")
61+
k8s_namespace = get_env("POD_NAMESPACE", "")
6262
argo_workflow_name = get_env("MF_ARGO_WORKFLOW_NAME", "")
6363
email_body = get_env("METAFLOW_NOTIFY_EMAIL_BODY", "")
6464
body = (
6565
f"status = {status} <br/>\n"
6666
f"{kfp_run_url} <br/>\n"
6767
f"Metaflow RunId = kfp-{kfp_run_id} <br/>\n"
68-
f"argo -n {pod_namespace} get {argo_workflow_name} <br/>"
68+
f"argo -n {k8s_namespace} get {argo_workflow_name} <br/>"
6969
"<br/>"
7070
f"{email_body}"
7171
)

metaflow/plugins/kfp/kfp_metaflow_step.py

+1-8
Original file line numberDiff line numberDiff line change
@@ -56,14 +56,7 @@ def _step_cli(
5656

5757
input_paths = None
5858

59-
tags_extended: List[str] = [
60-
f"--tag argo_workflow:{workflow_name}",
61-
"--tag pod_name:$MF_POD_NAME",
62-
"--tag pod_namespace:$MF_POD_NAMESPACE",
63-
# TODO(talebz): A Metaflow plugin framework to customize tags, labels, etc.
64-
"--tag zodiac_service:$ZODIAC_SERVICE",
65-
"--tag zodiac_team:$ZODIAC_TEAM",
66-
]
59+
tags_extended: List[str] = []
6760
if tags:
6861
tags_extended.extend("--tag %s" % tag for tag in tags)
6962

metaflow/plugins/kfp/tests/flows/metadata_flow.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,7 @@ def end(self):
3737
assert "metaflow_test" in start_step_tags
3838
assert f"zodiac_service:{os.environ['ZODIAC_SERVICE']}" in start_step_tags
3939
assert f"zodiac_team:{os.environ['ZODIAC_TEAM']}" in start_step_tags
40-
assert f"argo_workflow:{os.environ['MF_ARGO_WORKFLOW_NAME']}" in start_step_tags
41-
assert f"pod_namespace:{os.environ['MF_POD_NAMESPACE']}" in start_step_tags
40+
assert f"k8s_namespace:{os.environ['MF_POD_NAMESPACE']}" in start_step_tags
4241

4342
print("MetadataFlow is all done.")
4443

0 commit comments

Comments
 (0)