
Commit f0326eb
Project import generated by Copybara. (#39)
snowflake-provisioner authored Sep 1, 2023
1 parent 192f794 commit f0326eb
Showing 5 changed files with 72 additions and 120 deletions.
ci/build_and_run_tests.sh (2 changes: 1 addition & 1 deletion)
@@ -117,7 +117,7 @@ fi
 
 # Compare test required dependencies with wheel pkg dependencies and exclude tests if necessary
 EXCLUDE_TESTS=$(mktemp "${TEMP_TEST_DIR}/exclude_tests_XXXXX")
-if [[ ${MODE} = "continuous_run" ]]; then
+if [[ ${MODE} = "continuous_run" || ${MODE} = "release" ]]; then
     ./ci/get_excluded_tests.sh -f "${EXCLUDE_TESTS}" -m unused -b "${BAZEL}"
 elif [[ ${MODE} = "merge_gate" ]]; then
     ./ci/get_excluded_tests.sh -f "${EXCLUDE_TESTS}" -m all -b "${BAZEL}"
ci/get_excluded_tests.sh (2 changes: 1 addition & 1 deletion)
@@ -8,7 +8,7 @@
 # -f: specify output file path
 # -m: specify the mode from the following options
 #     unused: exclude integration tests whose dependency is not part of the wheel package.
-#             The missing dependency cuold happen when a new operator is being developed,
+#             The missing dependency could happen when a new operator is being developed,
 #             but not yet released.
 #     unaffected: exclude integration tests whose dependency is not part of the affected targets
 #                 compare to the the merge base to main of current revision.
snowflake/ml/_internal/utils/result.py (87 changes: 46 additions & 41 deletions)
@@ -13,49 +13,54 @@
 _RESULT_SIZE_THRESHOLD = 5 * (1024**2)  # 5MB
 
 
-class SnowflakeResult:
-    """
-    Handles serialization, uploading, downloading, and deserialization of stored procedure results. If the results
-    are too large to be returned from a stored procedure, the result will be uploaded. The client can then retrieve
-    and deserialize the result if it was uploaded.
-    """
-
-    def __init__(self, session: snowpark.Session, result: Any) -> None:
-        self.result = result
-        self.session = session
-        self.result_object_filepath = None
-        result_bytes = cloudpickle.dumps(self.result)
-        if sys.getsizeof(result_bytes) > _RESULT_SIZE_THRESHOLD:
-            stage_name = snowpark_utils.random_name_for_temp_object(snowpark_utils.TempObjectType.STAGE)
-            session.sql(f"CREATE TEMPORARY STAGE {stage_name}").collect()
-            result_object_filepath = f"@{stage_name}/{snowpark_utils.generate_random_alphanumeric()}"
-            session.file.put_stream(BytesIO(result_bytes), result_object_filepath)
-            self.result_object_filepath = f"{result_object_filepath}.gz"
-
-    def serialize(self) -> bytes:
-        """
-        Serialize a tuple containing the result (or None) and the result object filepath
-        if the result was uploaded to a stage (or None).
-        Returns:
-            Cloudpickled string of bytes of the result tuple.
-        """
-        if self.result_object_filepath is not None:
-            return cloudpickle.dumps((None, self.result_object_filepath))  # type: ignore[no-any-return]
-        return cloudpickle.dumps((self.result, None))  # type: ignore[no-any-return]
-
-    @staticmethod
-    def load_result_from_filepath(session: snowpark.Session, result_object_filepath: str) -> Any:
-        """
-        Loads and deserializes the uploaded result.
-        Args:
-            session: Snowpark session.
-            result_object_filepath: Stage filepath of the result object returned by serialize method.
-        Returns:
-            The original serialized result (any type).
-        """
-        result_object_bytes_io = session.file.get_stream(result_object_filepath, decompress=True)
-        result_bytes = result_object_bytes_io.read()
-        return cloudpickle.loads(result_bytes)
+# This module handles serialization, uploading, downloading, and deserialization of stored
+# procedure results. If the results are too large to be returned from a stored procedure,
+# the result will be uploaded. The client can then retrieve and deserialize the result if
+# it was uploaded.
+
+
+def serialize(session: snowpark.Session, result: Any) -> bytes:
+    """
+    Serialize a tuple containing the result (or None) and the result object filepath
+    if the result was uploaded to a stage (or None).
+    Args:
+        session: Snowpark session.
+        result: Object to be serialized.
+    Returns:
+        Cloudpickled string of bytes of the result tuple.
+    """
+    result_object_filepath = None
+    result_bytes = cloudpickle.dumps(result)
+    if sys.getsizeof(result_bytes) > _RESULT_SIZE_THRESHOLD:
+        stage_name = snowpark_utils.random_name_for_temp_object(snowpark_utils.TempObjectType.STAGE)
+        session.sql(f"CREATE TEMPORARY STAGE {stage_name}").collect()
+        result_object_filepath = f"@{stage_name}/{snowpark_utils.generate_random_alphanumeric()}"
+        session.file.put_stream(BytesIO(result_bytes), result_object_filepath)
+        result_object_filepath = f"{result_object_filepath}.gz"
+
+    if result_object_filepath is not None:
+        return cloudpickle.dumps((None, result_object_filepath))  # type: ignore[no-any-return]
+
+    return cloudpickle.dumps((result, None))  # type: ignore[no-any-return]
+
+
+def deserialize(session: snowpark.Session, result_bytes: bytes) -> Any:
+    """
+    Loads and/or deserializes the (maybe uploaded) result.
+    Args:
+        session: Snowpark session.
+        result_bytes: String of bytes returned by serialize method.
+    Returns:
+        The deserialized result (any type).
+    """
+    result_object, result_object_filepath = cloudpickle.loads(result_bytes)
+    if result_object_filepath is not None:
+        result_object_bytes_io = session.file.get_stream(result_object_filepath, decompress=True)
+        result_bytes = result_object_bytes_io.read()
+        result_object = cloudpickle.loads(result_bytes)
+
+    return result_object
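
Note: for orientation, here is a minimal sketch of the new module-level round trip, assuming a live Snowpark session; the two wrapper functions and the payload value are illustrative only, not part of the commit.

from typing import Any

from snowflake import snowpark
from snowflake.ml._internal.utils import result


def sproc_side(session: snowpark.Session) -> bytes:
    # Inside an anonymous sproc: serialize() inlines the value when its pickle
    # stays under the 5MB threshold; otherwise it uploads the bytes to a
    # temporary stage and records only the stage filepath.
    payload = {"metric": 0.93}  # stand-in for a computed result
    return result.serialize(session, payload)


def client_side(session: snowpark.Session, sproc_bytes: bytes) -> Any:
    # On the client: deserialize() unpickles the (value, filepath) tuple and,
    # if a filepath was returned instead of an inline value, downloads and
    # decompresses the staged result.
    return result.deserialize(session, sproc_bytes)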
snowflake/ml/modeling/metrics/ranking.py (42 changes: 12 additions & 30 deletions)
@@ -81,7 +81,7 @@ def precision_recall_curve(
     statement_params = telemetry.get_statement_params(_PROJECT, _SUBPROJECT)
     cols = metrics_utils.flatten_cols([y_true_col_name, probas_pred_col_name, sample_weight_col_name])
     queries = df[cols].queries["queries"]
-    pickled_snowflake_result = cloudpickle.dumps(result)
+    pickled_result_module = cloudpickle.dumps(result)
 
     @F.sproc(  # type: ignore[misc]
         is_permanent=False,
@@ -109,16 +109,10 @@ def precision_recall_curve_anon_sproc(session: snowpark.Session) -> bytes:
             pos_label=pos_label,
             sample_weight=sample_weight,
         )
-        result_module = cloudpickle.loads(pickled_snowflake_result)
-        result_object = result_module.SnowflakeResult(session, (precision, recall, thresholds))
-
-        return result_object.serialize()  # type: ignore[no-any-return]
-
-    sproc_result = precision_recall_curve_anon_sproc(session)
-    result_object, result_object_filepath = cloudpickle.loads(sproc_result)
-    if result_object_filepath is not None:
-        result_object = result.SnowflakeResult.load_result_from_filepath(session, result_object_filepath)
+        result_module = cloudpickle.loads(pickled_result_module)
+        return result_module.serialize(session, (precision, recall, thresholds))  # type: ignore[no-any-return]
 
+    result_object = result.deserialize(session, precision_recall_curve_anon_sproc(session))
     res: Tuple[npt.NDArray[np.float_], npt.NDArray[np.float_], npt.NDArray[np.float_]] = result_object
     return res
@@ -223,7 +217,7 @@ class scores must correspond to the order of ``labels``,
     statement_params = telemetry.get_statement_params(_PROJECT, _SUBPROJECT)
     cols = metrics_utils.flatten_cols([y_true_col_names, y_score_col_names, sample_weight_col_name])
     queries = df[cols].queries["queries"]
-    pickled_snowflake_result = cloudpickle.dumps(result)
+    pickled_result_module = cloudpickle.dumps(result)
 
     @F.sproc(  # type: ignore[misc]
         is_permanent=False,
@@ -254,16 +248,10 @@ def roc_auc_score_anon_sproc(session: snowpark.Session) -> bytes:
             multi_class=multi_class,
             labels=labels,
         )
-        result_module = cloudpickle.loads(pickled_snowflake_result)
-        result_object = result_module.SnowflakeResult(session, auc)
-
-        return result_object.serialize()  # type: ignore[no-any-return]
-
-    sproc_result = roc_auc_score_anon_sproc(session)
-    result_object, result_object_filepath = cloudpickle.loads(sproc_result)
-    if result_object_filepath is not None:
-        result_object = result.SnowflakeResult.load_result_from_filepath(session, result_object_filepath)
+        result_module = cloudpickle.loads(pickled_result_module)
+        return result_module.serialize(session, auc)  # type: ignore[no-any-return]
 
+    result_object = result.deserialize(session, roc_auc_score_anon_sproc(session))
     auc: Union[float, npt.NDArray[np.float_]] = result_object
     return auc
@@ -320,7 +308,7 @@ def roc_curve(
     statement_params = telemetry.get_statement_params(_PROJECT, _SUBPROJECT)
     cols = metrics_utils.flatten_cols([y_true_col_name, y_score_col_name, sample_weight_col_name])
     queries = df[cols].queries["queries"]
-    pickled_snowflake_result = cloudpickle.dumps(result)
+    pickled_result_module = cloudpickle.dumps(result)
 
     @F.sproc(  # type: ignore[misc]
         is_permanent=False,
@@ -350,16 +338,10 @@ def roc_curve_anon_sproc(session: snowpark.Session) -> bytes:
             drop_intermediate=drop_intermediate,
         )
 
-        result_module = cloudpickle.loads(pickled_snowflake_result)
-        result_object = result_module.SnowflakeResult(session, (fpr, tpr, thresholds))
-
-        return result_object.serialize()  # type: ignore[no-any-return]
-
-    sproc_result = roc_curve_anon_sproc(session)
-    result_object, result_object_filepath = cloudpickle.loads(sproc_result)
-    if result_object_filepath is not None:
-        result_object = result.SnowflakeResult.load_result_from_filepath(session, result_object_filepath)
+        result_module = cloudpickle.loads(pickled_result_module)
+        return result_module.serialize(session, (fpr, tpr, thresholds))  # type: ignore[no-any-return]
 
+    result_object = result.deserialize(session, roc_curve_anon_sproc(session))
     res: Tuple[npt.NDArray[np.float_], npt.NDArray[np.float_], npt.NDArray[np.float_]] = result_object
 
     return res
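
Note: all three ranking metrics now share the same shape. Below is a condensed sketch of that shape, assuming a live session; the metric computation is replaced by a placeholder constant, and the decorator is shown with only a subset of the arguments the real call sites pass.

import cloudpickle

from snowflake import snowpark
from snowflake.snowpark import functions as F
from snowflake.ml._internal.utils import result


def some_metric(session: snowpark.Session) -> float:
    # The result module object itself is cloudpickled so the sproc body can
    # use it without importing snowflake.ml on the server side.
    pickled_result_module = cloudpickle.dumps(result)

    @F.sproc(is_permanent=False, session=session)  # real call sites pass more arguments
    def some_metric_anon_sproc(session: snowpark.Session) -> bytes:
        value = 0.5  # placeholder for the metric computed in the warehouse
        result_module = cloudpickle.loads(pickled_result_module)
        return result_module.serialize(session, value)

    # One deserialize() call replaces the old unpickle, branch, and download steps.
    res: float = result.deserialize(session, some_metric_anon_sproc(session))
    return res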
snowflake/ml/modeling/metrics/regression.py (59 changes: 12 additions & 47 deletions)
@@ -99,15 +99,9 @@ def d2_absolute_error_score_anon_sproc(session: snowpark.Session) -> bytes:
             multioutput=multioutput,
         )
         result_module = cloudpickle.loads(pickled_snowflake_result)
-        result_object = result_module.SnowflakeResult(session, score)
-
-        return result_object.serialize()  # type: ignore[no-any-return]
-
-    sproc_result = d2_absolute_error_score_anon_sproc(session)
-    result_object, result_object_filepath = cloudpickle.loads(sproc_result)
-    if result_object_filepath is not None:
-        result_object = result.SnowflakeResult.load_result_from_filepath(session, result_object_filepath)
+        return result_module.serialize(session, score)  # type: ignore[no-any-return]
 
+    result_object = result.deserialize(session, d2_absolute_error_score_anon_sproc(session))
     score: Union[float, npt.NDArray[np.float_]] = result_object
     return score
@@ -192,14 +186,9 @@ def d2_pinball_score_anon_sproc(session: snowpark.Session) -> bytes:
             multioutput=multioutput,
         )
         result_module = cloudpickle.loads(pickled_result_module)
-        result_object = result_module.SnowflakeResult(session, score)
-
-        return result_object.serialize()  # type: ignore[no-any-return]
+        return result_module.serialize(session, score)  # type: ignore[no-any-return]
 
-    sproc_result = d2_pinball_score_anon_sproc(session)
-    result_object, result_object_filepath = cloudpickle.loads(sproc_result)
-    if result_object_filepath is not None:
-        result_object = result.SnowflakeResult.load_result_from_filepath(session, result_object_filepath)
+    result_object = result.deserialize(session, d2_pinball_score_anon_sproc(session))
 
     score: Union[float, npt.NDArray[np.float_]] = result_object
     return score
@@ -301,15 +290,9 @@ def explained_variance_score_anon_sproc(session: snowpark.Session) -> bytes:
             force_finite=force_finite,
         )
         result_module = cloudpickle.loads(pickled_result_module)
-        result_object = result_module.SnowflakeResult(session, score)
-
-        return result_object.serialize()  # type: ignore[no-any-return]
-
-    sproc_result = explained_variance_score_anon_sproc(session)
-    result_object, result_object_filepath = cloudpickle.loads(sproc_result)
-    if result_object_filepath is not None:
-        result_object = result.SnowflakeResult.load_result_from_filepath(session, result_object_filepath)
+        return result_module.serialize(session, score)  # type: ignore[no-any-return]
 
+    result_object = result.deserialize(session, explained_variance_score_anon_sproc(session))
     score: Union[float, npt.NDArray[np.float_]] = result_object
     return score
@@ -389,15 +372,9 @@ def mean_absolute_error_anon_sproc(session: snowpark.Session) -> bytes:
         )
 
         result_module = cloudpickle.loads(pickled_result_module)
-        result_object = result_module.SnowflakeResult(session, loss)
-
-        return result_object.serialize()  # type: ignore[no-any-return]
-
-    sproc_result = mean_absolute_error_anon_sproc(session)
-    result_object, result_object_filepath = cloudpickle.loads(sproc_result)
-    if result_object_filepath is not None:
-        result_object = result.SnowflakeResult.load_result_from_filepath(session, result_object_filepath)
+        return result_module.serialize(session, loss)  # type: ignore[no-any-return]
 
+    result_object = result.deserialize(session, mean_absolute_error_anon_sproc(session))
     loss: Union[float, npt.NDArray[np.float_]] = result_object
     return loss
@@ -485,15 +462,9 @@ def mean_absolute_percentage_error_anon_sproc(session: snowpark.Session) -> bytes:
             multioutput=multioutput,
         )
         result_module = cloudpickle.loads(pickled_result_module)
-        result_object = result_module.SnowflakeResult(session, loss)
-
-        return result_object.serialize()  # type: ignore[no-any-return]
-
-    sproc_result = mean_absolute_percentage_error_anon_sproc(session)
-    result_object, result_object_filepath = cloudpickle.loads(sproc_result)
-    if result_object_filepath is not None:
-        result_object = result.SnowflakeResult.load_result_from_filepath(session, result_object_filepath)
+        return result_module.serialize(session, loss)  # type: ignore[no-any-return]
 
+    result_object = result.deserialize(session, mean_absolute_percentage_error_anon_sproc(session))
     loss: Union[float, npt.NDArray[np.float_]] = result_object
     return loss
@@ -571,15 +542,9 @@ def mean_squared_error_anon_sproc(session: snowpark.Session) -> bytes:
             squared=squared,
        )
         result_module = cloudpickle.loads(pickled_result_module)
-        result_object = result_module.SnowflakeResult(session, loss)
-
-        return result_object.serialize()  # type: ignore[no-any-return]
-
-    sproc_result = mean_squared_error_anon_sproc(session)
-    result_object, result_object_filepath = cloudpickle.loads(sproc_result)
-    if result_object_filepath is not None:
-        result_object = result.SnowflakeResult.load_result_from_filepath(session, result_object_filepath)
+        return result_module.serialize(session, loss)  # type: ignore[no-any-return]
 
+    result_object = result.deserialize(session, mean_squared_error_anon_sproc(session))
     loss: Union[float, npt.NDArray[np.float_]] = result_object
     return loss
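
Note: each of the six regression hunks ends in the same two client-side lines. A hypothetical helper (not in the commit) makes the shared shape explicit:

from typing import Any, Callable

from snowflake import snowpark
from snowflake.ml._internal.utils import result


def fetch_sproc_result(session: snowpark.Session, anon_sproc: Callable[[snowpark.Session], bytes]) -> Any:
    # Run the anonymous sproc, then let deserialize() handle both the
    # inline-result and the staged-result cases.
    return result.deserialize(session, anon_sproc(session))

# Usage: loss = fetch_sproc_result(session, mean_squared_error_anon_sproc)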
