AIP-5333 - @s3_sensor resilient to failures (#146)
* AIP-5333 (sub branch) Creating main PR main branch (#145)

* Initial test to see how the test works.

* Ensure integ tests work.

* Ensuring pylint passes.

* Adding more fixes to the integration tests.

* Using correct env variable.

* Adding timeout and removing kfp- from kfp_run_id.

* Adding namespace to polling mechanism.

* Print USER_ID

* Print USER_ID

* Moving wait_for_completion in upload_s3_flow

* Typo.

* Refactoring to ensure both s3_sensor flows pass.

* Refactoring to ensure both s3_sensor flows pass.

* Correctly raising Exception.

* Rename Upload S3 flow to be more appropriate.

* Cleanup.

* Adding back all tests.

* Identify cause of OpsGenie and s3_sensor flow failures.

* Fixing OpsGenie issue.

* Adding some more wait time for pod to spin up.

* Adding back all tests.

* Wait for workflows to be created.

* Ensuring the s3_sensor workflow is obtained.

* Removing the 15 second wait for obtaining workflows.

* Adding back all tests.

* Getting more info on the workflow.

* Removing comments.

* Adding validate_s3_sensor_flow.py to non_standard flows list.

* Typo

Co-authored-by: Hari Sezhiyan <[email protected]>

* Removing unused imports; running black.

* AIP-5333 (sub branch) - Addressing first round PR comments (#147)

* Resolving first round of PR reviews.

* Switching to get_aws_client, correct type annotations, black formatting.

* Remove unused import.

* Correct upload_file method.

Co-authored-by: Hari Sezhiyan <[email protected]>

* Black formatting issue.

* AIP-5333 (sub branch) PR 2nd Round Reviews (#148)

* Addressing PR comments.

* Adding type annotations and fixing s3_sensor issues.

* Correctly using get_s3_client.

* Finish resolving final PR comments.

Co-authored-by: Hari Sezhiyan <[email protected]>

* Addressing Taleb's final comment on s3_sensor_key_files directory naming.

* Changes on instantiating s3_client only once.

Co-authored-by: Hari Sezhiyan <[email protected]>
hsezhiyan and Hari Sezhiyan authored Dec 4, 2021
1 parent 1b20238 commit a42dc9e
Showing 8 changed files with 341 additions and 135 deletions.
14 changes: 11 additions & 3 deletions metaflow/plugins/kfp/kfp.py
@@ -41,6 +41,7 @@
 )
 from metaflow.plugins import KfpInternalDecorator, EnvironmentDecorator
 from metaflow.plugins.kfp.kfp_decorator import KfpException
+from metaflow.plugins.kfp.kfp_constants import S3_SENSOR_RETRY_COUNT
 from .accelerator_decorator import AcceleratorDecorator
 from .kfp_foreach_splits import graph_to_task_ids, KfpForEachSplits
 from ..aws.batch.batch_decorator import BatchDecorator
@@ -816,7 +817,8 @@ def call_build_kfp_dag(workflow_uid_op: ContainerOp):
                 self._create_exit_handler_op(flow_variables.package_commands)
             ):
                 s3_sensor_op: Optional[ContainerOp] = self.create_s3_sensor_op(
-                    flow_parameters_json, flow_variables
+                    flow_parameters_json,
+                    flow_variables,
                 )
                 workflow_uid_op: Optional[
                     ContainerOp
@@ -829,7 +831,8 @@ def call_build_kfp_dag(workflow_uid_op: ContainerOp):
         else:
             # TODO: can this and above duplicated code be in a function?
             s3_sensor_op: Optional[ContainerOp] = self.create_s3_sensor_op(
-                flow_parameters_json, flow_variables
+                flow_parameters_json,
+                flow_variables,
             )
             workflow_uid_op: Optional[ContainerOp] = self._create_workflow_uid_op(
                 s3_sensor_op.output if s3_sensor_op else "",
@@ -1020,7 +1023,9 @@ def _create_workflow_uid_op(
         return None
 
     def create_s3_sensor_op(
-        self, flow_parameters_json: str, flow_variables: FlowVariables
+        self,
+        flow_parameters_json: str,
+        flow_variables: FlowVariables,
     ):
         s3_sensor_deco: Optional[FlowDecorator] = self.flow._flow_decorators.get(
             "s3_sensor"
@@ -1066,6 +1071,8 @@ def _create_s3_sensor_op(
            (
                f"{package_commands}"
                " && python -m metaflow.plugins.kfp.kfp_s3_sensor"
+               f" --kfp_run_id {dsl.RUN_ID_PLACEHOLDER}"
+               f" --flow_name {self.name}"
                f" --flow_parameters_json '{flow_parameters_json}'"
                f" --path {path}"
                f" --path_formatter_code_encoded '{path_formatter_code_encoded}'"
@@ -1084,6 +1091,7 @@
         ).set_display_name("s3_sensor")
 
         KubeflowPipelines._set_minimal_container_resources(s3_sensor_op)
+        s3_sensor_op.set_retry(S3_SENSOR_RETRY_COUNT, policy="OnError")
         return s3_sensor_op
 
     def _create_exit_handler_op(self, package_commands: str) -> ContainerOp:
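Taken together, the kfp.py changes thread run identity into the sensor container (--flow_name, --kfp_run_id) and make the op retryable. Below is a minimal sketch of that retry wiring, assuming the kfp 1.x SDK; the pipeline name, image, and command are illustrative stand-ins, not the plugin's actual values.

import kfp.dsl as dsl

S3_SENSOR_RETRY_COUNT = 3  # mirrors kfp_constants.py


@dsl.pipeline(name="s3-sensor-retry-demo")
def demo_pipeline():
    # RUN_ID_PLACEHOLDER resolves to the KFP run id at runtime; passing it to the
    # sensor lets a retried pod locate the checkpoint written by a prior attempt.
    sensor_op = dsl.ContainerOp(
        name="s3_sensor",
        image="python:3.9",  # illustrative
        command=["sh", "-c", f"echo waiting, run id {dsl.RUN_ID_PLACEHOLDER}"],
    )
    # "OnError" retries pod-level failures up to S3_SENSOR_RETRY_COUNT times;
    # combined with the elapsed-time checkpoint, the timeout budget spans attempts.
    sensor_op.set_retry(S3_SENSOR_RETRY_COUNT, policy="OnError")

Design note: retrying with policy="OnError" alone would reset the sensor's timeout clock on every attempt; persisting elapsed time to S3 (below) is what makes retries resume rather than restart.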
1 change: 1 addition & 0 deletions metaflow/plugins/kfp/kfp_constants.py
@@ -14,6 +14,7 @@
 SPLIT_INDEX_ENV_NAME = "SPLIT_INDEX_ENV_NAME"
 INPUT_PATHS_ENV_NAME = "INPUT_PATHS_ENV_NAME"
 RETRY_COUNT = "MF_ATTEMPT"
+S3_SENSOR_RETRY_COUNT = 3
 
 STEP_ENVIRONMENT_VARIABLES = "/tmp/step-environment-variables.sh"
86 changes: 71 additions & 15 deletions metaflow/plugins/kfp/kfp_s3_sensor.py
@@ -7,28 +7,72 @@
 from email.policy import default
 import click
 
+import os
+import pathlib
 from typing import Dict
+import botocore
+import base64
+import json
+import marshal
+import time
+from urllib.parse import urlparse, ParseResult
+
+from typing import Tuple
+
+from metaflow.datastore.util.s3util import get_s3_client
+
+
+def construct_elapsed_time_s3_bucket_and_key(
+    flow_name: str, kfp_run_id: str
+) -> Tuple[str, str]:
+    s3_path = os.path.join(
+        os.getenv("METAFLOW_DATASTORE_SYSROOT_S3"),
+        flow_name,
+        kfp_run_id,
+        "_s3_sensor",
+    )
+    s3_path_parsed: ParseResult = urlparse(s3_path)
+    bucket: str = s3_path_parsed.netloc
+    key: str = s3_path_parsed.path.lstrip("/")
+    return bucket, key
+
+
+def read_elapsed_time_s3_path(
+    s3: botocore.client.BaseClient, flow_name: str, kfp_run_id: str
+) -> float:
+    bucket: str
+    key: str
+    bucket, key = construct_elapsed_time_s3_bucket_and_key(flow_name, kfp_run_id)
+
+    try:
+        s3.head_object(Bucket=bucket, Key=key)
+    except botocore.exceptions.ClientError as e:
+        elapsed_time: float = 0.0
+    else:
+        s3_object: dict = s3.get_object(Bucket=bucket, Key=key)
+        elapsed_time: float = float(s3_object["Body"].read().decode("utf-8"))
+    return elapsed_time
+
+
+def write_elapsed_time_s3_path(
+    s3: botocore.client.BaseClient, flow_name: str, kfp_run_id: str, elapsed_time: float
+) -> None:
+    bucket, key = construct_elapsed_time_s3_bucket_and_key(flow_name, kfp_run_id)
+    elapsed_time_binary_data = str(elapsed_time).encode("ascii")
+    s3.put_object(Body=elapsed_time_binary_data, Bucket=bucket, Key=key)
+
+
 # We separate out this function to ensure it can be unit-tested.
 def wait_for_s3_path(
     path: str,
+    flow_name: str,
+    kfp_run_id: str,
     timeout_seconds: int,
     polling_interval_seconds: int,
     path_formatter_code_encoded: str,
     flow_parameters_json: str,
     os_expandvars: bool,
 ) -> str:
-    import boto3
-    import botocore
-    import base64
-    import json
-    import marshal
-    import time
-    from urllib.parse import urlparse
-    import os
 
     flow_parameters: Dict[str, str] = json.loads(flow_parameters_json)
 
 if path_formatter_code_encoded:
@@ -46,22 +90,27 @@ def path_formatter_template(key: str, flow_parameters: dict) -> str:
     else:
         if os_expandvars:
             # expand OS env variables
-            path = os.path.expandvars(path)
+            path: str = os.path.expandvars(path)
         # default variable substitution
         path: str = path.format(**flow_parameters)
 
     # debugging print statement for customers so they know the final path
     # we're looking for
     print(f"Waiting for path: {path}...")
 
-    parsed_path = urlparse(path)
+    parsed_path: ParseResult = urlparse(path)
+    bucket: str
+    key: str
     bucket, key = parsed_path.netloc, parsed_path.path.lstrip("/")
+    s3: botocore.client.BaseClient
+    s3, _ = get_s3_client()
+
+    previous_elapsed_time: float = read_elapsed_time_s3_path(s3, flow_name, kfp_run_id)
 
-    s3 = boto3.client("s3")
-    start_time = time.time()
+    start_time: float = time.time()
     while True:
-        current_time = time.time()
-        elapsed_time = current_time - start_time
+        current_time: float = time.time()
+        elapsed_time: float = current_time - start_time + previous_elapsed_time
         if elapsed_time > timeout_seconds:
             raise TimeoutError("Timed out while waiting for S3 key..")
@@ -73,6 +122,7 @@ def path_formatter_template(key: str, flow_parameters: dict) -> str:
             print(f"Object found at path {path}! Elapsed time: {elapsed_time}.")
             break
 
+        write_elapsed_time_s3_path(s3, flow_name, kfp_run_id, elapsed_time)
         time.sleep(polling_interval_seconds)
 
     output_path = "/tmp/outputs/Output"
@@ -85,13 +135,17 @@ def path_formatter_template(key: str, flow_parameters: dict) -> str:
 
 @click.command()
 @click.option("--path")
+@click.option("--flow_name")
+@click.option("--kfp_run_id")
 @click.option("--timeout_seconds", type=int)
 @click.option("--polling_interval_seconds", type=int)
 @click.option("--path_formatter_code_encoded")
 @click.option("--flow_parameters_json")
 @click.option("--os_expandvars/--no_os_expandvars", default=False)
 def wait_for_s3_path_cli(
     path: str,
+    flow_name: str,
+    kfp_run_id: str,
     timeout_seconds: int,
     polling_interval_seconds: int,
     path_formatter_code_encoded: str,
@@ -100,6 +154,8 @@ def wait_for_s3_path_cli(
 ) -> str:
     return wait_for_s3_path(
         path,
+        flow_name,
+        kfp_run_id,
         timeout_seconds,
         polling_interval_seconds,
         path_formatter_code_encoded,
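Net effect of the sensor changes: every polling iteration checkpoints its total elapsed wait to <datastore root>/<flow_name>/<kfp_run_id>/_s3_sensor, and a retried pod seeds its clock from that object, so timeout_seconds bounds the whole wait across attempts rather than per attempt. A hedged round-trip sketch of the helpers above; it assumes the AIP metaflow fork is installed, METAFLOW_DATASTORE_SYSROOT_S3 points at a writable bucket, and AWS credentials are configured; the ids are illustrative.

import os

import boto3

from metaflow.plugins.kfp.kfp_s3_sensor import (
    read_elapsed_time_s3_path,
    write_elapsed_time_s3_path,
)

# Illustrative datastore root; the helpers read this at call time.
os.environ["METAFLOW_DATASTORE_SYSROOT_S3"] = "s3://my-bucket/metaflow"

s3 = boto3.client("s3")
flow_name, kfp_run_id = "S3SensorFlow", "abc-123"  # illustrative ids

# First attempt: head_object raises ClientError, so the helper reports 0.0.
assert read_elapsed_time_s3_path(s3, flow_name, kfp_run_id) == 0.0

# The polling loop persists the running total before each sleep.
write_elapsed_time_s3_path(s3, flow_name, kfp_run_id, 42.5)

# A retried pod resumes from 42.5 seconds instead of restarting its timeout.
assert read_elapsed_time_s3_path(s3, flow_name, kfp_run_id) == 42.5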
4 changes: 3 additions & 1 deletion metaflow/plugins/kfp/tests/flows/s3_sensor_flow.py
@@ -10,7 +10,9 @@
 
 
 @s3_sensor(
-    path=join("$METAFLOW_DATASTORE_SYSROOT_S3", "{file_name}"),
+    path=join(
+        "$METAFLOW_DATASTORE_SYSROOT_S3", "s3_sensor_test_key_files", "{file_name}"
+    ),
     timeout_seconds=600,
     polling_interval_seconds=5,
     os_expandvars=True,
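For reference, a small sketch of how the sensor resolves a path like this one, mirroring the expandvars-then-format logic in wait_for_s3_path above (the environment value and file name are illustrative):

import os
from os.path import join

path = join("$METAFLOW_DATASTORE_SYSROOT_S3", "s3_sensor_test_key_files", "{file_name}")

# os_expandvars=True expands OS environment variables first...
os.environ["METAFLOW_DATASTORE_SYSROOT_S3"] = "s3://my-bucket/metaflow"  # illustrative
path = os.path.expandvars(path)

# ...then flow parameters fill the remaining {placeholders}.
flow_parameters = {"file_name": "data.csv"}  # illustrative
print(path.format(**flow_parameters))
# s3://my-bucket/metaflow/s3_sensor_test_key_files/data.csv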
(s3_sensor test flow using a custom path_formatter; file path not captured in this view)
@@ -23,7 +23,9 @@ def formatter(path: str, flow_parameters: Dict[str, str]) -> str:
 
 
 @s3_sensor(
-    path=join("{datastore}", "{file_name_for_formatter_test}"),
+    path=join(
+        "{datastore}", "s3_sensor_test_key_files", "{file_name_for_formatter_test}"
+    ),
     timeout_seconds=600,
     polling_interval_seconds=5,
     path_formatter=formatter,
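A custom path_formatter travels to the sensor pod as --path_formatter_code_encoded; the sensor's base64/marshal imports imply a code-object round-trip along these lines. A hedged sketch (names are illustrative; the plugin's own encoding may differ in detail):

import base64
import marshal
import types

def formatter(path: str, flow_parameters: dict) -> str:
    # Illustrative formatter: substitute flow parameters into the path template.
    return path.format(**flow_parameters)

# Producer side: serialize the function's code object for transport on the CLI.
encoded = base64.b64encode(marshal.dumps(formatter.__code__)).decode("ascii")

# Sensor side: rebuild a callable from the encoded code object.
code = marshal.loads(base64.b64decode(encoded))
rebuilt = types.FunctionType(code, globals(), "path_formatter")

print(rebuilt("{datastore}/s3_sensor_test_key_files/{file_name_for_formatter_test}",
              {"datastore": "s3://my-bucket/metaflow",
               "file_name_for_formatter_test": "data.csv"}))
# s3://my-bucket/metaflow/s3_sensor_test_key_files/data.csv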
56 changes: 0 additions & 56 deletions metaflow/plugins/kfp/tests/flows/upload_to_s3_flow.py

This file was deleted.
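With upload_to_s3_flow.py deleted, the integration test itself presumably uploads the key that wakes the sensor (the commit notes moving wait_for_completion out of the upload flow). A hedged sketch of that trigger; bucket and key are illustrative:

import boto3

# Illustrative location; the tests derive it from METAFLOW_DATASTORE_SYSROOT_S3,
# the s3_sensor_test_key_files prefix, and a generated file name.
bucket = "my-bucket"
key = "metaflow/s3_sensor_test_key_files/data.csv"

s3 = boto3.client("s3")
# Once this object exists, the sensor's head_object call succeeds and the op completes.
s3.put_object(Bucket=bucket, Key=key, Body=b"trigger")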

(Diffs for the two remaining changed files did not render in this view.)