diff --git a/ci/build_and_run_tests.sh b/ci/build_and_run_tests.sh index 1db5b70d..c0cf53c0 100755 --- a/ci/build_and_run_tests.sh +++ b/ci/build_and_run_tests.sh @@ -117,7 +117,7 @@ fi # Compare test required dependencies with wheel pkg dependencies and exclude tests if necessary EXCLUDE_TESTS=$(mktemp "${TEMP_TEST_DIR}/exclude_tests_XXXXX") -if [[ ${MODE} = "continuous_run" ]]; then +if [[ ${MODE} = "continuous_run" || ${MODE} = "release" ]]; then ./ci/get_excluded_tests.sh -f "${EXCLUDE_TESTS}" -m unused -b "${BAZEL}" elif [[ ${MODE} = "merge_gate" ]]; then ./ci/get_excluded_tests.sh -f "${EXCLUDE_TESTS}" -m all -b "${BAZEL}" diff --git a/ci/get_excluded_tests.sh b/ci/get_excluded_tests.sh index 9d852c72..c324c20a 100755 --- a/ci/get_excluded_tests.sh +++ b/ci/get_excluded_tests.sh @@ -8,7 +8,7 @@ # -f: specify output file path # -m: specify the mode from the following options # unused: exclude integration tests whose dependency is not part of the wheel package. -# The missing dependency cuold happen when a new operator is being developed, +# The missing dependency could happen when a new operator is being developed, # but not yet released. # unaffected: exclude integration tests whose dependency is not part of the affected targets # compare to the the merge base to main of current revision. diff --git a/snowflake/ml/_internal/utils/result.py b/snowflake/ml/_internal/utils/result.py index 9542848d..d965840f 100644 --- a/snowflake/ml/_internal/utils/result.py +++ b/snowflake/ml/_internal/utils/result.py @@ -13,49 +13,54 @@ _RESULT_SIZE_THRESHOLD = 5 * (1024**2) # 5MB -class SnowflakeResult: +# This module handles serialization, uploading, downloading, and deserialization of stored +# procedure results. If the results are too large to be returned from a stored procedure, +# the result will be uploaded. The client can then retrieve and deserialize the result if +# it was uploaded. + + +def serialize(session: snowpark.Session, result: Any) -> bytes: + """ + Serialize a tuple containing the result (or None) and the result object filepath + if the result was uploaded to a stage (or None). + + Args: + session: Snowpark session. + result: Object to be serialized. + + Returns: + Cloudpickled string of bytes of the result tuple. """ - Handles serialization, uploading, downloading, and deserialization of stored procedure results. If the results - are too large to be returned from a stored procedure, the result will be uploaded. The client can then retrieve - and deserialize the result if it was uploaded. + result_object_filepath = None + result_bytes = cloudpickle.dumps(result) + if sys.getsizeof(result_bytes) > _RESULT_SIZE_THRESHOLD: + stage_name = snowpark_utils.random_name_for_temp_object(snowpark_utils.TempObjectType.STAGE) + session.sql(f"CREATE TEMPORARY STAGE {stage_name}").collect() + result_object_filepath = f"@{stage_name}/{snowpark_utils.generate_random_alphanumeric()}" + session.file.put_stream(BytesIO(result_bytes), result_object_filepath) + result_object_filepath = f"{result_object_filepath}.gz" + + if result_object_filepath is not None: + return cloudpickle.dumps((None, result_object_filepath)) # type: ignore[no-any-return] + + return cloudpickle.dumps((result, None)) # type: ignore[no-any-return] + + +def deserialize(session: snowpark.Session, result_bytes: bytes) -> Any: """ + Loads and/or deserializes the (maybe uploaded) result. - def __init__(self, session: snowpark.Session, result: Any) -> None: - self.result = result - self.session = session - self.result_object_filepath = None - result_bytes = cloudpickle.dumps(self.result) - if sys.getsizeof(result_bytes) > _RESULT_SIZE_THRESHOLD: - stage_name = snowpark_utils.random_name_for_temp_object(snowpark_utils.TempObjectType.STAGE) - session.sql(f"CREATE TEMPORARY STAGE {stage_name}").collect() - result_object_filepath = f"@{stage_name}/{snowpark_utils.generate_random_alphanumeric()}" - session.file.put_stream(BytesIO(result_bytes), result_object_filepath) - self.result_object_filepath = f"{result_object_filepath}.gz" - - def serialize(self) -> bytes: - """ - Serialize a tuple containing the result (or None) and the result object filepath - if the result was uploaded to a stage (or None). - - Returns: - Cloudpickled string of bytes of the result tuple. - """ - if self.result_object_filepath is not None: - return cloudpickle.dumps((None, self.result_object_filepath)) # type: ignore[no-any-return] - return cloudpickle.dumps((self.result, None)) # type: ignore[no-any-return] - - @staticmethod - def load_result_from_filepath(session: snowpark.Session, result_object_filepath: str) -> Any: - """ - Loads and deserializes the uploaded result. - - Args: - session: Snowpark session. - result_object_filepath: Stage filepath of the result object returned by serialize method. - - Returns: - The original serialized result (any type). - """ + Args: + session: Snowpark session. + result_bytes: String of bytes returned by serialize method. + + Returns: + The deserialized result (any type). + """ + result_object, result_object_filepath = cloudpickle.loads(result_bytes) + if result_object_filepath is not None: result_object_bytes_io = session.file.get_stream(result_object_filepath, decompress=True) result_bytes = result_object_bytes_io.read() - return cloudpickle.loads(result_bytes) + result_object = cloudpickle.loads(result_bytes) + + return result_object diff --git a/snowflake/ml/modeling/metrics/ranking.py b/snowflake/ml/modeling/metrics/ranking.py index c401e899..1b5f6b77 100644 --- a/snowflake/ml/modeling/metrics/ranking.py +++ b/snowflake/ml/modeling/metrics/ranking.py @@ -81,7 +81,7 @@ def precision_recall_curve( statement_params = telemetry.get_statement_params(_PROJECT, _SUBPROJECT) cols = metrics_utils.flatten_cols([y_true_col_name, probas_pred_col_name, sample_weight_col_name]) queries = df[cols].queries["queries"] - pickled_snowflake_result = cloudpickle.dumps(result) + pickled_result_module = cloudpickle.dumps(result) @F.sproc( # type: ignore[misc] is_permanent=False, @@ -109,16 +109,10 @@ def precision_recall_curve_anon_sproc(session: snowpark.Session) -> bytes: pos_label=pos_label, sample_weight=sample_weight, ) - result_module = cloudpickle.loads(pickled_snowflake_result) - result_object = result_module.SnowflakeResult(session, (precision, recall, thresholds)) - - return result_object.serialize() # type: ignore[no-any-return] - - sproc_result = precision_recall_curve_anon_sproc(session) - result_object, result_object_filepath = cloudpickle.loads(sproc_result) - if result_object_filepath is not None: - result_object = result.SnowflakeResult.load_result_from_filepath(session, result_object_filepath) + result_module = cloudpickle.loads(pickled_result_module) + return result_module.serialize(session, (precision, recall, thresholds)) # type: ignore[no-any-return] + result_object = result.deserialize(session, precision_recall_curve_anon_sproc(session)) res: Tuple[npt.NDArray[np.float_], npt.NDArray[np.float_], npt.NDArray[np.float_]] = result_object return res @@ -223,7 +217,7 @@ class scores must correspond to the order of ``labels``, statement_params = telemetry.get_statement_params(_PROJECT, _SUBPROJECT) cols = metrics_utils.flatten_cols([y_true_col_names, y_score_col_names, sample_weight_col_name]) queries = df[cols].queries["queries"] - pickled_snowflake_result = cloudpickle.dumps(result) + pickled_result_module = cloudpickle.dumps(result) @F.sproc( # type: ignore[misc] is_permanent=False, @@ -254,16 +248,10 @@ def roc_auc_score_anon_sproc(session: snowpark.Session) -> bytes: multi_class=multi_class, labels=labels, ) - result_module = cloudpickle.loads(pickled_snowflake_result) - result_object = result_module.SnowflakeResult(session, auc) - - return result_object.serialize() # type: ignore[no-any-return] - - sproc_result = roc_auc_score_anon_sproc(session) - result_object, result_object_filepath = cloudpickle.loads(sproc_result) - if result_object_filepath is not None: - result_object = result.SnowflakeResult.load_result_from_filepath(session, result_object_filepath) + result_module = cloudpickle.loads(pickled_result_module) + return result_module.serialize(session, auc) # type: ignore[no-any-return] + result_object = result.deserialize(session, roc_auc_score_anon_sproc(session)) auc: Union[float, npt.NDArray[np.float_]] = result_object return auc @@ -320,7 +308,7 @@ def roc_curve( statement_params = telemetry.get_statement_params(_PROJECT, _SUBPROJECT) cols = metrics_utils.flatten_cols([y_true_col_name, y_score_col_name, sample_weight_col_name]) queries = df[cols].queries["queries"] - pickled_snowflake_result = cloudpickle.dumps(result) + pickled_result_module = cloudpickle.dumps(result) @F.sproc( # type: ignore[misc] is_permanent=False, @@ -350,16 +338,10 @@ def roc_curve_anon_sproc(session: snowpark.Session) -> bytes: drop_intermediate=drop_intermediate, ) - result_module = cloudpickle.loads(pickled_snowflake_result) - result_object = result_module.SnowflakeResult(session, (fpr, tpr, thresholds)) - - return result_object.serialize() # type: ignore[no-any-return] - - sproc_result = roc_curve_anon_sproc(session) - result_object, result_object_filepath = cloudpickle.loads(sproc_result) - if result_object_filepath is not None: - result_object = result.SnowflakeResult.load_result_from_filepath(session, result_object_filepath) + result_module = cloudpickle.loads(pickled_result_module) + return result_module.serialize(session, (fpr, tpr, thresholds)) # type: ignore[no-any-return] + result_object = result.deserialize(session, roc_curve_anon_sproc(session)) res: Tuple[npt.NDArray[np.float_], npt.NDArray[np.float_], npt.NDArray[np.float_]] = result_object return res diff --git a/snowflake/ml/modeling/metrics/regression.py b/snowflake/ml/modeling/metrics/regression.py index 472ab884..7aab1a09 100644 --- a/snowflake/ml/modeling/metrics/regression.py +++ b/snowflake/ml/modeling/metrics/regression.py @@ -99,15 +99,9 @@ def d2_absolute_error_score_anon_sproc(session: snowpark.Session) -> bytes: multioutput=multioutput, ) result_module = cloudpickle.loads(pickled_snowflake_result) - result_object = result_module.SnowflakeResult(session, score) - - return result_object.serialize() # type: ignore[no-any-return] - - sproc_result = d2_absolute_error_score_anon_sproc(session) - result_object, result_object_filepath = cloudpickle.loads(sproc_result) - if result_object_filepath is not None: - result_object = result.SnowflakeResult.load_result_from_filepath(session, result_object_filepath) + return result_module.serialize(session, score) # type: ignore[no-any-return] + result_object = result.deserialize(session, d2_absolute_error_score_anon_sproc(session)) score: Union[float, npt.NDArray[np.float_]] = result_object return score @@ -192,14 +186,9 @@ def d2_pinball_score_anon_sproc(session: snowpark.Session) -> bytes: multioutput=multioutput, ) result_module = cloudpickle.loads(pickled_result_module) - result_object = result_module.SnowflakeResult(session, score) - - return result_object.serialize() # type: ignore[no-any-return] + return result_module.serialize(session, score) # type: ignore[no-any-return] - sproc_result = d2_pinball_score_anon_sproc(session) - result_object, result_object_filepath = cloudpickle.loads(sproc_result) - if result_object_filepath is not None: - result_object = result.SnowflakeResult.load_result_from_filepath(session, result_object_filepath) + result_object = result.deserialize(session, d2_pinball_score_anon_sproc(session)) score: Union[float, npt.NDArray[np.float_]] = result_object return score @@ -301,15 +290,9 @@ def explained_variance_score_anon_sproc(session: snowpark.Session) -> bytes: force_finite=force_finite, ) result_module = cloudpickle.loads(pickled_result_module) - result_object = result_module.SnowflakeResult(session, score) - - return result_object.serialize() # type: ignore[no-any-return] - - sproc_result = explained_variance_score_anon_sproc(session) - result_object, result_object_filepath = cloudpickle.loads(sproc_result) - if result_object_filepath is not None: - result_object = result.SnowflakeResult.load_result_from_filepath(session, result_object_filepath) + return result_module.serialize(session, score) # type: ignore[no-any-return] + result_object = result.deserialize(session, explained_variance_score_anon_sproc(session)) score: Union[float, npt.NDArray[np.float_]] = result_object return score @@ -389,15 +372,9 @@ def mean_absolute_error_anon_sproc(session: snowpark.Session) -> bytes: ) result_module = cloudpickle.loads(pickled_result_module) - result_object = result_module.SnowflakeResult(session, loss) - - return result_object.serialize() # type: ignore[no-any-return] - - sproc_result = mean_absolute_error_anon_sproc(session) - result_object, result_object_filepath = cloudpickle.loads(sproc_result) - if result_object_filepath is not None: - result_object = result.SnowflakeResult.load_result_from_filepath(session, result_object_filepath) + return result_module.serialize(session, loss) # type: ignore[no-any-return] + result_object = result.deserialize(session, mean_absolute_error_anon_sproc(session)) loss: Union[float, npt.NDArray[np.float_]] = result_object return loss @@ -485,15 +462,9 @@ def mean_absolute_percentage_error_anon_sproc(session: snowpark.Session) -> byte multioutput=multioutput, ) result_module = cloudpickle.loads(pickled_result_module) - result_object = result_module.SnowflakeResult(session, loss) - - return result_object.serialize() # type: ignore[no-any-return] - - sproc_result = mean_absolute_percentage_error_anon_sproc(session) - result_object, result_object_filepath = cloudpickle.loads(sproc_result) - if result_object_filepath is not None: - result_object = result.SnowflakeResult.load_result_from_filepath(session, result_object_filepath) + return result_module.serialize(session, loss) # type: ignore[no-any-return] + result_object = result.deserialize(session, mean_absolute_percentage_error_anon_sproc(session)) loss: Union[float, npt.NDArray[np.float_]] = result_object return loss @@ -571,15 +542,9 @@ def mean_squared_error_anon_sproc(session: snowpark.Session) -> bytes: squared=squared, ) result_module = cloudpickle.loads(pickled_result_module) - result_object = result_module.SnowflakeResult(session, loss) - - return result_object.serialize() # type: ignore[no-any-return] - - sproc_result = mean_squared_error_anon_sproc(session) - result_object, result_object_filepath = cloudpickle.loads(sproc_result) - if result_object_filepath is not None: - result_object = result.SnowflakeResult.load_result_from_filepath(session, result_object_filepath) + return result_module.serialize(session, loss) # type: ignore[no-any-return] + result_object = result.deserialize(session, mean_squared_error_anon_sproc(session)) loss: Union[float, npt.NDArray[np.float_]] = result_object return loss