From f0ff7963ac820c33ac998e0b4252f328ca7fea99 Mon Sep 17 00:00:00 2001 From: Uzma Tafsir <164934164+sfc-gh-utafsir@users.noreply.github.com> Date: Tue, 18 Jun 2024 16:55:29 -0700 Subject: [PATCH] Project import generated by Copybara. (#105) GitOrigin-RevId: 799da6cf0e5020abb73dcf1be9906fc318c1b2b9 Co-authored-by: Snowflake Authors --- .gitignore | 2 +- CHANGELOG.md | 22 +- CONTRIBUTING.md | 2 +- bazel/environments/conda-env-snowflake.yml | 4 +- bazel/environments/conda-env.yml | 4 +- bazel/environments/conda-gpu-env.yml | 4 +- ci/conda_recipe/meta.yaml | 4 +- codegen/sklearn_wrapper_template.py_template | 2 +- docs/source/cortex.rst | 23 ++ docs/source/dataset.rst | 27 ++ docs/source/feature_store.rst | 26 ++ docs/source/index.rst | 8 +- docs/source/model.rst | 7 +- docs/source/modeling.rst | 68 ++--- requirements.txt | 4 +- requirements.yml | 6 +- snowflake/cortex/BUILD.bazel | 16 + snowflake/cortex/_complete.py | 31 +- snowflake/cortex/_sse_client.py | 81 +++++ snowflake/cortex/_util.py | 113 ++++++- snowflake/cortex/complete_test.py | 135 +++++++++ snowflake/cortex/sse_test.py | 129 ++++++++ .../ml/_internal/lineage/lineage_utils.py | 59 ++-- .../_internal/lineage/lineage_utils_test.py | 52 +++- snowflake/ml/dataset/dataset.py | 27 +- snowflake/ml/dataset/dataset_factory.py | 7 +- .../_internal/synthetic_data_generator.py | 4 +- snowflake/ml/feature_store/feature_store.py | 4 +- .../notebooks/customer_demo/README.md | 27 +- .../ml/model/_client/sql/model_version.py | 4 +- .../model/_model_composer/model_composer.py | 4 +- .../model_method/fixtures/function_3.py | 4 +- .../infer_table_function.py_template | 4 +- .../ml/model/_signatures/builtins_handler.py | 3 +- snowflake/ml/model/_signatures/core.py | 14 +- .../ml/model/_signatures/pandas_handler.py | 2 + .../ml/model/_signatures/snowpark_handler.py | 6 +- .../ml/model/_signatures/snowpark_test.py | 16 + snowflake/ml/model/model_signature.py | 2 + snowflake/ml/model/model_signature_test.py | 22 ++ snowflake/ml/model/type_hints.py | 1 + snowflake/ml/modeling/_internal/BUILD.bazel | 2 + .../ml/modeling/_internal/estimator_utils.py | 59 +++- .../distributed_hpo_trainer.py | 277 ++++++++++-------- .../distributed_search_udf_file.py | 2 + .../snowpark_handlers.py | 56 ++-- .../snowpark_trainer.py | 216 ++++++-------- .../xgboost_external_memory_trainer.py | 38 +-- .../xgboost_external_memory_trainer_test.py | 15 +- snowflake/ml/modeling/framework/base.py | 11 +- .../ml/modeling/impute/simple_imputer.py | 12 +- .../model_selection/grid_search_cv.py | 6 +- .../model_selection/randomized_search_cv.py | 6 +- snowflake/ml/modeling/pipeline/pipeline.py | 5 + .../ml/modeling/preprocessing/binarizer.py | 10 +- .../preprocessing/k_bins_discretizer.py | 9 +- .../modeling/preprocessing/label_encoder.py | 15 +- .../modeling/preprocessing/max_abs_scaler.py | 10 +- .../modeling/preprocessing/min_max_scaler.py | 11 +- .../ml/modeling/preprocessing/normalizer.py | 10 +- .../modeling/preprocessing/one_hot_encoder.py | 12 +- .../modeling/preprocessing/ordinal_encoder.py | 13 +- .../modeling/preprocessing/robust_scaler.py | 11 +- .../modeling/preprocessing/standard_scaler.py | 10 +- snowflake/ml/version.bzl | 2 +- .../feature_store_large_scale_test.py | 2 +- tests/integ/snowflake/ml/fileset/BUILD.bazel | 2 +- .../model/input_validation_integ_test.py | 49 ++++ tests/integ/snowflake/ml/registry/BUILD.bazel | 1 + .../snowflake/ml/registry/model/BUILD.bazel | 1 + .../model/registry_custom_model_test.py | 25 ++ .../model/registry_modeling_model_test.py | 23 +- 72 
files changed, 1397 insertions(+), 504 deletions(-) create mode 100644 docs/source/cortex.rst create mode 100644 docs/source/dataset.rst create mode 100644 docs/source/feature_store.rst create mode 100644 snowflake/cortex/_sse_client.py create mode 100644 snowflake/cortex/sse_test.py diff --git a/.gitignore b/.gitignore index b7ecc509..8c44d455 100644 --- a/.gitignore +++ b/.gitignore @@ -394,4 +394,4 @@ user.bazelrc html_coverage_report/ # VSCode configuration -.vscode/ +.vscode diff --git a/CHANGELOG.md b/CHANGELOG.md index bb29f0f0..82a80f16 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,22 @@ # Release History -## 1.5.2 +## 1.5.3 + +### Bug Fixes + +- Modeling: Fix an issue causing lineage information to be missing for + `Pipeline`, `GridSearchCV`, `SimpleImputer`, and `RandomizedSearchCV`. +- Registry: Fix an issue that leads to an incorrect result when using a pandas DataFrame with over 100,000 rows as the input + of the `ModelVersion.run` method in a stored procedure. + +### Behavior Changes + +### New Features + +- Registry: Add support for the TIMESTAMP_NTZ model signature data type, allowing timestamp input and output. +- Dataset: Add `DatasetVersion.label_cols` and `DatasetVersion.exclude_cols` properties. + +## 1.5.2 (06-10-2024) ### Bug Fixes @@ -12,7 +28,7 @@ ### New Features -## 1.5.1 +## 1.5.1 (05-22-2024) ### Bug Fixes @@ -37,7 +53,7 @@ permissions to operate on schema. Please call `import snowflake.ml.modeling.parameters.enable_anonymous_sproc # noqa: F401` -## 1.5.0 +## 1.5.0 (05-01-2024) ### Bug Fixes diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 71a2d770..990f4705 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -139,7 +139,7 @@ bazel build --config=typecheck Or you could run ```sh -./ci/type_check.sh -b +./ci/type_check/type_check.sh -b ``` You only need to specify `-b` if your `bazel` is not in `$PATH` or is an alias.
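The TIMESTAMP_NTZ support announced in the 1.5.3 notes above is exercised later in this patch through `FeatureSpec` and `DataType.TIMESTAMP_NTZ`. The sketch below is a minimal illustration of what the feature enables; the column names, the output feature, and the use of `ModelSignature`/`to_dict` are illustrative assumptions on top of this change, not part of it.

```python
import datetime

import pandas as pd

from snowflake.ml.model import model_signature

# A pandas DataFrame whose second column is datetime64[ns]; with this release,
# signature inference maps such columns to DataType.TIMESTAMP_NTZ.
df = pd.DataFrame(
    {
        "col_0": [1, 2],
        "col_1": [datetime.datetime(2024, 6, 21, 1, 1, 1)] * 2,
    }
)

# The equivalent explicit declaration of the input schema (names are illustrative).
inputs = [
    model_signature.FeatureSpec(name="col_0", dtype=model_signature.DataType.INT64),
    model_signature.FeatureSpec(name="col_1", dtype=model_signature.DataType.TIMESTAMP_NTZ),
]
outputs = [model_signature.FeatureSpec(name="output", dtype=model_signature.DataType.DOUBLE)]
sig = model_signature.ModelSignature(inputs=inputs, outputs=outputs)
print(sig.to_dict())
```

In typical registry usage a signature like this would be supplied through the `signatures` argument of `log_model` so that timestamp columns survive the round trip; defer to the registry documentation for the exact call.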
diff --git a/bazel/environments/conda-env-snowflake.yml b/bazel/environments/conda-env-snowflake.yml index a3deb00d..bb54209e 100644 --- a/bazel/environments/conda-env-snowflake.yml +++ b/bazel/environments/conda-env-snowflake.yml @@ -50,8 +50,8 @@ dependencies: - sentence-transformers==2.2.2 - sentencepiece==0.1.99 - shap==0.42.1 - - snowflake-connector-python==3.5.0 - - snowflake-snowpark-python==1.11.1 + - snowflake-connector-python==3.10.0 + - snowflake-snowpark-python==1.15.0 - sphinx==5.0.2 - sqlparse==0.4.4 - tensorflow==2.12.0 diff --git a/bazel/environments/conda-env.yml b/bazel/environments/conda-env.yml index 05d1aba0..f9e69333 100644 --- a/bazel/environments/conda-env.yml +++ b/bazel/environments/conda-env.yml @@ -55,8 +55,8 @@ dependencies: - sentence-transformers==2.2.2 - sentencepiece==0.1.99 - shap==0.42.1 - - snowflake-connector-python==3.5.0 - - snowflake-snowpark-python==1.11.1 + - snowflake-connector-python==3.10.0 + - snowflake-snowpark-python==1.15.0 - sphinx==5.0.2 - sqlparse==0.4.4 - tensorflow==2.12.0 diff --git a/bazel/environments/conda-gpu-env.yml b/bazel/environments/conda-gpu-env.yml index 36745bb3..43273b6a 100755 --- a/bazel/environments/conda-gpu-env.yml +++ b/bazel/environments/conda-gpu-env.yml @@ -57,8 +57,8 @@ dependencies: - sentence-transformers==2.2.2 - sentencepiece==0.1.99 - shap==0.42.1 - - snowflake-connector-python==3.5.0 - - snowflake-snowpark-python==1.11.1 + - snowflake-connector-python==3.10.0 + - snowflake-snowpark-python==1.15.0 - sphinx==5.0.2 - sqlparse==0.4.4 - tensorflow==2.12.0 diff --git a/ci/conda_recipe/meta.yaml b/ci/conda_recipe/meta.yaml index bd2ce910..89d25442 100644 --- a/ci/conda_recipe/meta.yaml +++ b/ci/conda_recipe/meta.yaml @@ -17,7 +17,7 @@ build: noarch: python package: name: snowflake-ml-python - version: 1.5.2 + version: 1.5.3 requirements: build: - python @@ -42,7 +42,7 @@ requirements: - scikit-learn>=1.2.1,<1.4 - scipy>=1.9,<2 - snowflake-connector-python>=3.5.0,<4 - - snowflake-snowpark-python>=1.11.1,<2,!=1.12.0 + - snowflake-snowpark-python>=1.15.0,<2 - sqlparse>=0.4,<1 - typing-extensions>=4.1.0,<5 - xgboost>=1.7.3,<2 diff --git a/codegen/sklearn_wrapper_template.py_template b/codegen/sklearn_wrapper_template.py_template index f48ae884..acbd80eb 100644 --- a/codegen/sklearn_wrapper_template.py_template +++ b/codegen/sklearn_wrapper_template.py_template @@ -142,7 +142,7 @@ class {transform.original_class_name}(BaseTransformer): inspect.currentframe(), {transform.original_class_name}.__class__.__name__ ), api_calls=[Session.call], - custom_tags=dict([("autogen", True)]) if self._autogenerated else None, + custom_tags={{"autogen": True}} if self._autogenerated else None, ) pd_df: pd.DataFrame = dataset.to_pandas(statement_params=statement_params) pd_df.columns = dataset.columns diff --git a/docs/source/cortex.rst b/docs/source/cortex.rst new file mode 100644 index 00000000..47e2f91a --- /dev/null +++ b/docs/source/cortex.rst @@ -0,0 +1,23 @@ +=================== +snowflake.ml.cortex +=================== + +.. automodule:: snowflake.cortex + :noindex: + +.. currentmodule:: snowflake.cortex + +.. rubric:: Classes + +.. autosummary:: + :toctree: api/model + + Complete + ExtractAnswer + Sentiment + Summarize + Translate + +.. .. rubric:: Attributes + +.. 
None diff --git a/docs/source/dataset.rst b/docs/source/dataset.rst new file mode 100644 index 00000000..3697d91e --- /dev/null +++ b/docs/source/dataset.rst @@ -0,0 +1,27 @@ +:orphan: + +=========================== +snowflake.ml.dataset +=========================== + +.. automodule:: snowflake.ml.dataset + :noindex: + +.. currentmodule:: snowflake.ml.dataset + +.. rubric:: Classes + +.. autosummary:: + :toctree: api/dataset + + Dataset + DatasetReader + DatasetVersion + +.. rubric:: Functions + +.. autosummary:: + :toctree: api/dataset + + create_from_dataframe + load_dataset diff --git a/docs/source/feature_store.rst b/docs/source/feature_store.rst new file mode 100644 index 00000000..40f8220f --- /dev/null +++ b/docs/source/feature_store.rst @@ -0,0 +1,26 @@ +:orphan: + +=========================== +snowflake.ml.feature_store +=========================== + +.. automodule:: snowflake.ml.feature_store + :noindex: + +.. currentmodule:: snowflake.ml.feature_store + +.. rubric:: Classes + +.. autosummary:: + :toctree: api/feature_store + + Entity + FeatureStore + FeatureView + +.. rubric:: Functions + +.. autosummary:: + :toctree: api/feature_store + + setup_feature_store diff --git a/docs/source/index.rst b/docs/source/index.rst index 5477edb9..dccd7d44 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -15,6 +15,9 @@ Portions of the Snowpark ML API Reference are derived from | **scikit-learn** Copyright © 2007-2023 The scikit-learn developers. All rights reserved. | **xgboost** Copyright © 2019 by xgboost contributors. | **lightgbm** Copyright © Microsoft Corporation. +| + +---- Table of Contents ================= @@ -22,7 +25,10 @@ Table of Contents .. toctree:: :maxdepth: 3 - modeling + cortex + dataset + feature_store fileset model + modeling registry diff --git a/docs/source/model.rst b/docs/source/model.rst index 52daf110..bb0b5e8a 100644 --- a/docs/source/model.rst +++ b/docs/source/model.rst @@ -1,13 +1,10 @@ -=========================== +================== snowflake.ml.model -=========================== +================== .. automodule:: snowflake.ml.model :noindex: -snowflake.ml.model ---------------------------------- - .. currentmodule:: snowflake.ml.model .. rubric:: Classes diff --git a/docs/source/modeling.rst b/docs/source/modeling.rst index 3884591a..6e0cdf73 100644 --- a/docs/source/modeling.rst +++ b/docs/source/modeling.rst @@ -384,11 +384,11 @@ snowflake.ml.modeling.naive_bayes .. autosummary:: :toctree: api/modeling - BernoulliNB - CategoricalNB - ComplementNB - GaussianNB - MultinomialNB + BernoulliNB + CategoricalNB + ComplementNB + GaussianNB + MultinomialNB snowflake.ml.modeling.neighbors @@ -401,15 +401,15 @@ snowflake.ml.modeling.neighbors .. autosummary:: :toctree: api/modeling - KernelDensity - KNeighborsClassifier - KNeighborsRegressor - LocalOutlierFactor - NearestCentroid - NearestNeighbors - NeighborhoodComponentsAnalysis - RadiusNeighborsClassifier - RadiusNeighborsRegressor + KernelDensity + KNeighborsClassifier + KNeighborsRegressor + LocalOutlierFactor + NearestCentroid + NearestNeighbors + NeighborhoodComponentsAnalysis + RadiusNeighborsClassifier + RadiusNeighborsRegressor snowflake.ml.modeling.neural_network @@ -422,9 +422,9 @@ snowflake.ml.modeling.neural_network .. autosummary:: :toctree: api/modeling - BernoulliRBM - MLPClassifier - MLPRegressor + BernoulliRBM + MLPClassifier + MLPRegressor snowflake.ml.modeling.pipeline ------------------------------------------- @@ -436,7 +436,7 @@ snowflake.ml.modeling.pipeline .. 
autosummary:: :toctree: api/modeling - Pipeline + Pipeline snowflake.ml.modeling.preprocessing ----------------------------------------------- @@ -469,8 +469,8 @@ snowflake.ml.modeling.semi_supervised .. autosummary:: :toctree: api/modeling - LabelPropagation - LabelSpreading + LabelPropagation + LabelSpreading snowflake.ml.modeling.svm @@ -483,12 +483,12 @@ snowflake.ml.modeling.svm .. autosummary:: :toctree: api/modeling - LinearSVC - LinearSVR - NuSVC - NuSVR - SVC - SVR + LinearSVC + LinearSVR + NuSVC + NuSVR + SVC + SVR @@ -502,10 +502,10 @@ snowflake.ml.modeling.tree .. autosummary:: :toctree: api/modeling - DecisionTreeClassifier - DecisionTreeRegressor - ExtraTreeClassifier - ExtraTreeRegressor + DecisionTreeClassifier + DecisionTreeRegressor + ExtraTreeClassifier + ExtraTreeRegressor snowflake.ml.modeling.xgboost @@ -518,7 +518,7 @@ snowflake.ml.modeling.xgboost .. autosummary:: :toctree: api/modeling - XGBClassifier - XGBRegressor - XGBRFClassifier - XGBRFRegressor + XGBClassifier + XGBRegressor + XGBRFClassifier + XGBRFRegressor diff --git a/requirements.txt b/requirements.txt index 56c9743a..083b819b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -45,8 +45,8 @@ scipy==1.9.3 sentence-transformers==2.2.2 sentencepiece==0.1.99 shap==0.42.1 -snowflake-connector-python[pandas]==3.5.0 -snowflake-snowpark-python==1.11.1 +snowflake-connector-python[pandas]==3.10.0 +snowflake-snowpark-python==1.15.0 sphinx==5.0.2 sqlparse==0.4.4 starlette==0.27.0 diff --git a/requirements.yml b/requirements.yml index fbbd666d..a3e01649 100644 --- a/requirements.yml +++ b/requirements.yml @@ -252,11 +252,11 @@ - transformers - name_conda: snowflake-connector-python name_pypi: snowflake-connector-python[pandas] - dev_version: 3.5.0 + dev_version: 3.10.0 version_requirements: '>=3.5.0,<4' - name: snowflake-snowpark-python - dev_version: 1.11.1 - version_requirements: '>=1.11.1,<2,!=1.12.0' + dev_version: 1.15.0 + version_requirements: '>=1.15.0,<2' tags: - deployment_core - snowml_inference_alternative diff --git a/snowflake/cortex/BUILD.bazel b/snowflake/cortex/BUILD.bazel index 310c4bb6..a3484cb0 100644 --- a/snowflake/cortex/BUILD.bazel +++ b/snowflake/cortex/BUILD.bazel @@ -16,6 +16,7 @@ package(default_visibility = [ py_library( name = "util", srcs = ["_util.py"], + deps = [":sse_client"], ) py_library( @@ -23,6 +24,11 @@ py_library( srcs = ["_test_util.py"], ) +py_library( + name = "sse_client", + srcs = ["_sse_client.py"], +) + py_library( name = "complete", srcs = ["_complete.py"], @@ -99,6 +105,16 @@ py_test( ], ) +py_test( + name = "sse_test", + srcs = ["sse_test.py"], + deps = [ + ":complete_test", + ":sse_client", + ":util", + ], +) + py_library( name = "translate", srcs = ["_translate.py"], diff --git a/snowflake/cortex/_complete.py b/snowflake/cortex/_complete.py index 9cec5d36..d641c6dd 100644 --- a/snowflake/cortex/_complete.py +++ b/snowflake/cortex/_complete.py @@ -1,7 +1,12 @@ -from typing import Optional, Union +from typing import Iterator, Optional, Union from snowflake import snowpark -from snowflake.cortex._util import CORTEX_FUNCTIONS_TELEMETRY_PROJECT, call_sql_function +from snowflake.cortex._util import ( + CORTEX_FUNCTIONS_TELEMETRY_PROJECT, + call_rest_function, + call_sql_function, + process_rest_response, +) from snowflake.ml._internal import telemetry @@ -10,19 +15,35 @@ project=CORTEX_FUNCTIONS_TELEMETRY_PROJECT, ) def Complete( - model: Union[str, snowpark.Column], prompt: Union[str, snowpark.Column], session: Optional[snowpark.Session] = None -) -> Union[str, 
snowpark.Column]: + model: Union[str, snowpark.Column], + prompt: Union[str, snowpark.Column], + session: Optional[snowpark.Session] = None, + use_rest_api_experimental: bool = False, + stream: bool = False, +) -> Union[str, Iterator[str], snowpark.Column]: """Complete calls into the LLM inference service to perform completion. Args: model: A Column of strings representing model types. prompt: A Column of prompts to send to the LLM. session: The snowpark session to use. Will be inferred by context if not specified. + use_rest_api_experimental (bool): Toggles between the use of SQL and REST implementation. This feature is + experimental and can be removed at any time. + stream (bool): Enables streaming. When enabled, a generator function is returned that provides the streaming + output as it is received. Each update is a string containing the new text content since the previous update. + The use of streaming requires the experimental use_rest_api_experimental flag to be enabled. + + Raises: + ValueError: If `stream` is set to True and `use_rest_api_experimental` is set to False. Returns: A column of string responses. """ - + if stream is True and use_rest_api_experimental is False: + raise ValueError("If stream is set to True use_rest_api_experimental must also be set to True") + if use_rest_api_experimental: + response = call_rest_function("complete", model, prompt, session=session, stream=stream) + return process_rest_response(response) return _complete_impl("snowflake.cortex.complete", model, prompt, session=session) diff --git a/snowflake/cortex/_sse_client.py b/snowflake/cortex/_sse_client.py new file mode 100644 index 00000000..a6fcf202 --- /dev/null +++ b/snowflake/cortex/_sse_client.py @@ -0,0 +1,81 @@ +from typing import Iterator, cast + +import requests + + +class Event: + def __init__(self, event: str = "message", data: str = "") -> None: + self.event = event + self.data = data + + def __str__(self) -> str: + s = f"{self.event} event" + if self.data: + s += f", {len(self.data)} bytes" + else: + s += ", no data" + return s + + +class SSEClient: + def __init__(self, response: requests.Response) -> None: + + self.response = response + + def _read(self) -> Iterator[str]: + + lines = b"" + for chunk in self.response: + for line in chunk.splitlines(True): + lines += line + if lines.endswith((b"\r\r", b"\n\n", b"\r\n\r\n")): + yield cast(str, lines) + lines = b"" + if lines: + yield cast(str, lines) + + def events(self) -> Iterator[Event]: + for raw_event in self._read(): + event = Event() + # splitlines() only uses \r and \n + for line in raw_event.splitlines(): + + line = cast(bytes, line).decode("utf-8") + + data = line.split(":", 1) + field = data[0] + + if len(data) > 1: + # "If value starts with a single U+0020 SPACE character, + # remove it from value. .strip() would remove all white spaces" + if data[1].startswith(" "): + value = data[1][1:] + else: + value = data[1] + else: + value = "" + + # The data field may come over multiple lines and their values + # are concatenated with each other. + if field == "data": + event.data += value + "\n" + elif field == "event": + event.event = value + + if not event.data: + continue + + # If the data field ends with a newline, remove it. + if event.data.endswith("\n"): + event.data = event.data[0:-1] # Replace trailing newline - rstrip would remove multiple. 
+ + # Empty event names default to 'message' + event.event = event.event or "message" + + if event.event != "message": # ignore anything but “message” or default event + continue + + yield event + + def close(self) -> None: + self.response.close() diff --git a/snowflake/cortex/_util.py b/snowflake/cortex/_util.py index 7878e51e..f9d3810b 100644 --- a/snowflake/cortex/_util.py +++ b/snowflake/cortex/_util.py @@ -1,15 +1,34 @@ -from typing import Optional, Union, cast +import json +from typing import Iterator, Optional, Union, cast +from urllib.parse import urljoin, urlparse + +import requests from snowflake import snowpark +from snowflake.cortex._sse_client import SSEClient from snowflake.snowpark import context, functions CORTEX_FUNCTIONS_TELEMETRY_PROJECT = "CortexFunctions" +class SSEParseException(Exception): + """This exception is raised when an invalid server sent event is received from the server.""" + + pass + + +class SnowflakeAuthenticationException(Exception): + """This exception is raised when the session object does not have session.connection.rest.token attribute.""" + + pass + + # Calls a sql function, handling both immediate (e.g. python types) and batch # (e.g. snowpark column and literal type modes). def call_sql_function( - function: str, session: Optional[snowpark.Session], *args: Union[str, snowpark.Column] + function: str, + session: Optional[snowpark.Session], + *args: Union[str, snowpark.Column], ) -> Union[str, snowpark.Column]: handle_as_column = False for arg in args: @@ -17,21 +36,29 @@ def call_sql_function( handle_as_column = True if handle_as_column: - return cast(Union[str, snowpark.Column], call_sql_function_column(function, *args)) - return cast(Union[str, snowpark.Column], call_sql_function_immediate(function, session, *args)) + return cast(Union[str, snowpark.Column], _call_sql_function_column(function, *args)) + return cast( + Union[str, snowpark.Column], + _call_sql_function_immediate(function, session, *args), + ) -def call_sql_function_column(function: str, *args: Union[str, snowpark.Column]) -> snowpark.Column: +def _call_sql_function_column(function: str, *args: Union[str, snowpark.Column]) -> snowpark.Column: return cast(snowpark.Column, functions.builtin(function)(*args)) -def call_sql_function_immediate( - function: str, session: Optional[snowpark.Session], *args: Union[str, snowpark.Column] +def _call_sql_function_immediate( + function: str, + session: Optional[snowpark.Session], + *args: Union[str, snowpark.Column], ) -> str: if session is None: session = context.get_active_session() if session is None: - raise Exception("No session available in the current context nor specified as an argument.") + raise SnowflakeAuthenticationException( + """Session required. Provide the session through a session=... argument or ensure an active session is + available in your environment.""" + ) lit_args = [] for arg in args: @@ -40,3 +67,73 @@ def call_sql_function_immediate( empty_df = session.create_dataframe([snowpark.Row()]) df = empty_df.select(functions.builtin(function)(*lit_args)) return cast(str, df.collect()[0][0]) + + +def call_rest_function( + function: str, + model: Union[str, snowpark.Column], + prompt: Union[str, snowpark.Column], + session: Optional[snowpark.Session] = None, + stream: bool = False, +) -> requests.Response: + if session is None: + session = context.get_active_session() + if session is None: + raise SnowflakeAuthenticationException( + """Session required. Provide the session through a session=... 
argument or ensure an active session is + available in your environment.""" + ) + + if not hasattr(session.connection.rest, "token"): + raise SnowflakeAuthenticationException("Snowflake session error: REST token missing.") + + if session.connection.rest.token is None or session.connection.rest.token == "": # type: ignore[union-attr] + raise SnowflakeAuthenticationException("Snowflake session error: REST token is empty.") + + url = urljoin(session.connection.host, f"api/v2/cortex/inference/{function}") + if urlparse(url).scheme == "": + url = "https://" + url + headers = { + "Content-Type": "application/json", + "Authorization": f'Snowflake Token="{session.connection.rest.token}"', # type: ignore[union-attr] + "Accept": "application/json, text/event-stream", + } + + data = { + "model": model, + "messages": [{"content": prompt}], + "stream": stream, + } + + response = requests.post( + url, + json=data, + headers=headers, + stream=stream, + ) + response.raise_for_status() + return response + + +def process_rest_response(response: requests.Response, stream: bool = False) -> Union[str, Iterator[str]]: + if not stream: + try: + message = response.json()["choices"][0]["message"] + output = str(message.get("content", "")) + return output + except (KeyError, IndexError) as e: + raise SSEParseException("Failed to parse streamed response.") from e + else: + return _return_gen(response) + + +def _return_gen(response: requests.Response) -> Iterator[str]: + client = SSEClient(response) + for event in client.events(): + response_loaded = json.loads(event.data) + try: + delta = response_loaded["choices"][0]["delta"] + output = str(delta.get("content", "")) + yield output + except (KeyError, IndexError) as e: + raise SSEParseException("Failed to parse streamed response.") from e diff --git a/snowflake/cortex/complete_test.py b/snowflake/cortex/complete_test.py index 4bf21d7a..262501b1 100644 --- a/snowflake/cortex/complete_test.py +++ b/snowflake/cortex/complete_test.py @@ -1,11 +1,53 @@ +import http.server +import json +import threading +import unittest +from dataclasses import dataclass +from io import BytesIO +from types import GeneratorType +from typing import Iterator, Union, cast + import _test_util +import requests from absl.testing import absltest from snowflake import snowpark from snowflake.cortex import _complete +from snowflake.cortex._util import process_rest_response from snowflake.snowpark import functions, types +@dataclass +class FakeToken: + token: str = "abc" + + +@dataclass +class FakeConnParams: + rest: FakeToken + host: str + + +@dataclass +class FakeSession: + connection: FakeConnParams + + +class FakeResponse: # needed for testing, imitates some of requests.Response behaviors + def __init__(self, content: bytes) -> None: + self.content = BytesIO(content) + + def iter_content(self, chunk_size: int = 1) -> Iterator[bytes]: + while True: + chunk = self.content.read(chunk_size) + if not chunk: + break + yield chunk + + def __iter__(self) -> Iterator[bytes]: + return self.iter_content() + + class CompleteTest(absltest.TestCase): model = "|model|" prompt = "|prompt|" @@ -39,5 +81,98 @@ def test_complete_column(self) -> None: self.assertEqual(self.complete_for_test(self.model, self.prompt), res) +class MockIpifyHTTPRequestHandler(http.server.BaseHTTPRequestHandler): + """HTTPServer mock request handler""" + + def do_POST(self) -> None: + token: str = cast(str, self.headers.get("Authorization")) + if "Snowflake Token" not in token: + self.send_response(401) + self.end_headers() + return + 
assert self.path == "/api/v2/cortex/inference/complete" + content_length = int(cast(int, self.headers.get("Content-Length"))) + + post_data = self.rfile.read(content_length).decode("utf-8") + params = json.loads(post_data) + stream = params["stream"] + + if stream: + self.send_response(200) + self.send_header("Content-Type", "text/event-stream") + self.end_headers() + + # Simulate streaming by sending the response in chunks + data_out = "This is a streaming response" + chunk_size = 4 + for i in range(0, len(data_out), chunk_size): + response_line = ( + f"data: {json.dumps( {'choices': [{'delta': {'content': data_out[i:i + chunk_size]}}]})}\n\n" + ) + self.wfile.write(response_line.encode("utf-8")) + self.wfile.flush() + return + + response_json = {"choices": [{"message": {"content": "This is a non streaming response"}}]} + + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(json.dumps(response_json).encode("utf-8")) + + +class UnitTests(unittest.TestCase): + def setUp(self) -> None: + self.server = http.server.ThreadingHTTPServer(("127.0.0.1", 0), MockIpifyHTTPRequestHandler) + self.server_thread = threading.Thread(target=self.server.serve_forever) + self.server_thread.daemon = True + self.server_thread.start() + + def tearDown(self) -> None: + self.server.shutdown() + self.server_thread.join() + + def send_request(self, stream: bool = False) -> Union[str, Iterator[str]]: + faketoken = FakeToken() + fakeconnectionparameters = FakeConnParams( + host=f"http://127.0.0.1:{self.server.server_address[1]}/", rest=faketoken + ) + session = FakeSession(fakeconnectionparameters) + response = _complete.call_rest_function( # type: ignore[attr-defined] + function="complete", + model="my_models", + prompt="test_prompt", + session=cast(snowpark.Session, session), + stream=stream, + ) + return process_rest_response(response, stream=stream) + + def test_non_streaming(self) -> None: + result = self.send_request(stream=False) + self.assertEqual("This is a non streaming response", result) + + def test_wrong_token(self) -> None: + headers = {"Authorization": "Wrong Token=123"} + data = {"stream": "hh"} + + # Send the POST request + response = requests.post( + f"http://127.0.0.1:{self.server.server_address[1]}/api/v2/cortex/inference/complete", + headers=headers, + json=data, + ) + self.assertEqual(response.status_code, 401) + + def test_streaming(self) -> None: + result = self.send_request(stream=True) + + output = "".join(list(result)) + self.assertEqual("This is a streaming response", output) + + def test_streaming_type(self) -> None: + result = self.send_request(stream=True) + self.assertIsInstance(result, GeneratorType) + + if __name__ == "__main__": absltest.main() diff --git a/snowflake/cortex/sse_test.py b/snowflake/cortex/sse_test.py new file mode 100644 index 00000000..b0ecebf4 --- /dev/null +++ b/snowflake/cortex/sse_test.py @@ -0,0 +1,129 @@ +from typing import List, cast + +import requests +from absl.testing import absltest + +from snowflake.cortex._sse_client import SSEClient +from snowflake.cortex.complete_test import FakeResponse + + +def _streaming_messages(response_data: bytes) -> List[str]: + client = SSEClient(cast(requests.Response, FakeResponse(response_data))) + out = [] + for event in client.events(): + out.append(event.data) + return out + + +class SSETest(absltest.TestCase): + def test_empty_response(self) -> None: + # Set up the mock streaming response with no data + response_data = b"" + + result = 
_streaming_messages(response_data) + + assert result == [] + + def test_empty_response_many_newlines(self) -> None: + # Set up the mock streaming response with no data (in the form of newlines) + response_data = b"\n\n\n\n\n\n" + + result = _streaming_messages(response_data) + + assert result == [] + + def test_whitespace_handling(self) -> None: + # Set up the mock streaming response with leading and trailing whitespace + response_data = b" \n \ndata: Message 1\n\n \n \n \n \n" + + result = _streaming_messages(response_data) + + expected_message = ["Message 1"] + assert expected_message == result + + def test_ignore_anything_but_message_event(self) -> None: + # check that only "data" is considered + + response_data = ( + b"data: data\n\n" + b"event: event\n\n" + b"id: id\n\n" + b"retry: retry\n\n" + b"some_other_message: some_other_message\n\n" + ) + + result = _streaming_messages(response_data) + + expected_message = ["data"] + assert result == expected_message + + def test_colon_cases(self) -> None: + response_data_colon_middle = b"data: choices: middle_colon\n\n" + response_many_colons = b"data: choices: middle_colon: last_colon\n\n" + + result_data_colon_middle = _streaming_messages(response_data_colon_middle) + result_many_colons = _streaming_messages(response_many_colons) + + expected_colon_middle = ["choices: middle_colon"] + expected_many_colons = ["choices: middle_colon: last_colon"] + + assert result_data_colon_middle == expected_colon_middle + assert result_many_colons == expected_many_colons + + def test_line_separator(self) -> None: + # test if data is not combined if it has trailing \n\n + # fmt: off + response_not_combined = ( + b"data: one\n\n" + b"data: two\n" + b"data: three\n\n" + ) + # fmt: on + + result_parsed = _streaming_messages(response_not_combined) + + assert result_parsed == ["one", "two\nthree"] # not combined + + def test_combined_data(self) -> None: + # test if data is combined if it has trailing \n + # fmt: off + response_not_combined = ( + b"data: jeden\n" + b"data: dwa\n\n" + ) + # fmt: on + + result_parsed = _streaming_messages(response_not_combined) + assert result_parsed == ["jeden\ndwa"] # combined due to only one \n + + def test_commented_data(self) -> None: + # test if data is treated as comment if it starts with a : + response_not_combined = b": jeden\n\n" + + result_parsed = _streaming_messages(response_not_combined) + + assert result_parsed == [] # not combined + + def test_ignore_other_event_types(self) -> None: + # test if data is ignored if its event is not message + # fmt: off + response_sth_else = ( + b"data: jeden\n" + b"event: sth_else\n\n" + ) + # fmt: on + + result_parsed = _streaming_messages(response_sth_else) + + assert result_parsed == [] # ignore anything that is not message + + def test_empty_data_json(self) -> None: + response_sth_else = b"data: {}" + + result_parsed = _streaming_messages(response_sth_else) + + assert result_parsed == ["{}"] + + +if __name__ == "__main__": + absltest.main() diff --git a/snowflake/ml/_internal/lineage/lineage_utils.py b/snowflake/ml/_internal/lineage/lineage_utils.py index aa04e9e7..9ec0d331 100644 --- a/snowflake/ml/_internal/lineage/lineage_utils.py +++ b/snowflake/ml/_internal/lineage/lineage_utils.py @@ -1,21 +1,11 @@ import copy import functools -from typing import Any, Callable, List +from typing import Any, Callable, List, Optional from snowflake import snowpark from snowflake.ml._internal.lineage import data_source -DATA_SOURCES_ATTR = "_data_sources" - - -def _get_datasources(*args: Any) -> 
List[data_source.DataSource]: - """Helper method for extracting data sources attribute from DataFrames in an argument list""" - result = [] - for arg in args: - srcs = getattr(arg, DATA_SOURCES_ATTR, None) - if isinstance(srcs, list) and all(isinstance(s, data_source.DataSource) for s in srcs): - result += srcs - return result +_DATA_SOURCES_ATTR = "_data_sources" def _wrap_func( @@ -32,6 +22,37 @@ def wrapped(*args: Any, **kwargs: Any) -> snowpark.DataFrame: return wrapped +def _wrap_class_func(fn: Callable[..., snowpark.DataFrame]) -> Callable[..., snowpark.DataFrame]: + @functools.wraps(fn) + def wrapped(*args: Any, **kwargs: Any) -> snowpark.DataFrame: + df = fn(*args, **kwargs) + data_sources = get_data_sources(*args, *kwargs.values()) + if data_sources: + patch_dataframe(df, data_sources, inplace=True) + return df + + return wrapped + + +def get_data_sources(*args: Any) -> Optional[List[data_source.DataSource]]: + """Helper method for extracting data sources attribute from DataFrames in an argument list""" + result: Optional[List[data_source.DataSource]] = None + for arg in args: + srcs = getattr(arg, _DATA_SOURCES_ATTR, None) + if isinstance(srcs, list) and all(isinstance(s, data_source.DataSource) for s in srcs): + if result is None: + result = [] + result += srcs + return result + + +def set_data_sources(obj: Any, data_sources: Optional[List[data_source.DataSource]]) -> None: + """Helper method for attaching data sources to an object""" + if data_sources: + assert all(isinstance(ds, data_source.DataSource) for ds in data_sources) + setattr(obj, _DATA_SOURCES_ATTR, data_sources) + + def patch_dataframe( df: snowpark.DataFrame, data_sources: List[data_source.DataSource], inplace: bool = False ) -> snowpark.DataFrame: @@ -62,7 +83,7 @@ def patch_dataframe( ] if not inplace: df = copy.copy(df) - setattr(df, DATA_SOURCES_ATTR, data_sources) + set_data_sources(df, data_sources) for func in funcs: fn = getattr(df, func, None) if fn is not None: @@ -70,18 +91,6 @@ def patch_dataframe( return df -def _wrap_class_func(fn: Callable[..., snowpark.DataFrame]) -> Callable[..., snowpark.DataFrame]: - @functools.wraps(fn) - def wrapped(*args: Any, **kwargs: Any) -> snowpark.DataFrame: - df = fn(*args, **kwargs) - data_sources = _get_datasources(*args) + _get_datasources(*kwargs.values()) - if data_sources: - patch_dataframe(df, data_sources, inplace=True) - return df - - return wrapped - - # Class-level monkey-patches for klass, func_list in { snowpark.DataFrame: [ diff --git a/snowflake/ml/_internal/lineage/lineage_utils_test.py b/snowflake/ml/_internal/lineage/lineage_utils_test.py index 8acb5e80..e9e3c3a3 100644 --- a/snowflake/ml/_internal/lineage/lineage_utils_test.py +++ b/snowflake/ml/_internal/lineage/lineage_utils_test.py @@ -9,6 +9,10 @@ class DatasetDataFrameTest(parameterized.TestCase): + class TestSourcedObject: + def __init__(self, sources: Optional[List[data_source.DataSource]]) -> None: + setattr(self, lineage_utils._DATA_SOURCES_ATTR, sources) + def setUp(self) -> None: connection_parameters = connection_params.SnowflakeLoginOptions() self.session = snowpark.Session.builder.configs(connection_parameters).create() @@ -25,8 +29,52 @@ def setUp(self) -> None: inplace=True, ) + @parameterized.parameters( # type: ignore[misc] + ( + [], + None, + ), + ( + [TestSourcedObject(None)], + None, + ), + ( + [TestSourcedObject(None), TestSourcedObject(None)], + None, + ), + ( + [TestSourcedObject([])], + [], + ), + ( + [TestSourcedObject([data_source.DataSource("foo", "v1", "foo_url")])], + 
[data_source.DataSource("foo", "v1", "foo_url")], + ), + ( + [ + TestSourcedObject([data_source.DataSource("foo", "v1", "foo_url")]), + TestSourcedObject([data_source.DataSource("foo", "v2", "foo_url")]), + ], + [data_source.DataSource("foo", "v1", "foo_url"), data_source.DataSource("foo", "v2", "foo_url")], + ), + # FIXME: Enable this test case once dedupe support added + # ( + # [ + # TestSourcedObject([data_source.DataSource("foo", "v1", "foo_url")]), + # TestSourcedObject([data_source.DataSource("foo", "v1", "foo_url")]), + # TestSourcedObject([data_source.DataSource("foo", "v2", "foo_url")]), + # ], + # [data_source.DataSource("foo", "v1", "foo_url"), data_source.DataSource("foo", "v2", "foo_url")], + # ), + ) + def test_get_data_sources( + self, args: List[TestSourcedObject], expected: Optional[List[data_source.DataSource]] + ) -> None: + self.assertEqual(expected, lineage_utils.get_data_sources(*args)) + @parameterized.product( # type: ignore[misc] data_sources=[ + None, [], [data_source.DataSource("foo", "v1", "foo_url")], [data_source.DataSource("foo", "v1", "foo_url"), data_source.DataSource("foo", "v1", "foo_url")], @@ -65,6 +113,7 @@ def test_patch_dataframe(self, data_sources: List[data_source.DataSource], inpla ("cache_result",), ("random_split", [0.8, 0.2]), ("join_table_function", F.flatten(F.col("array"))), + ("__copy__"), ) def test_dataframe_func(self, func_name: str, *args: Any, **kwargs: Any) -> None: func = getattr(self.df, func_name) @@ -128,6 +177,7 @@ def test_dataframe_pipeline(self) -> None: ("cache_result",), ("random_split", [0.8, 0.2]), ("join_table_function", F.flatten(F.col("array"))), + ("__copy__"), ) def test_vanilla_dataframe_func(self, func_name: str, *args: Any, **kwargs: Any) -> None: df = self.session.sql( @@ -146,7 +196,7 @@ def test_vanilla_dataframe_func(self, func_name: str, *args: Any, **kwargs: Any) def validate_dataframe(self, df: Any, data_sources: Optional[List[data_source.DataSource]]) -> None: self.assertIsInstance(df, get_args(Union[snowpark.DataFrame, snowpark.RelationalGroupedDataFrame])) - self.assertEqual(data_sources, getattr(df, lineage_utils.DATA_SOURCES_ATTR, None)) + self.assertEqual(data_sources, lineage_utils.get_data_sources(df)) if __name__ == "__main__": diff --git a/snowflake/ml/dataset/dataset.py b/snowflake/ml/dataset/dataset.py index fddee0f6..e96820a2 100644 --- a/snowflake/ml/dataset/dataset.py +++ b/snowflake/ml/dataset/dataset.py @@ -65,6 +65,20 @@ def comment(self) -> Optional[str]: comment: Optional[str] = self._get_property("comment") return comment + @property + def label_cols(self) -> List[str]: + metadata = self._get_metadata() + if metadata is None or metadata.label_cols is None: + return [] + return metadata.label_cols + + @property + def exclude_cols(self) -> List[str]: + metadata = self._get_metadata() + if metadata is None or metadata.exclude_cols is None: + return [] + return metadata.exclude_cols + def _get_property(self, property_name: str, default: Any = None) -> Any: if self._properties is None: sql_result = ( @@ -91,17 +105,6 @@ def _get_metadata(self) -> Optional[dataset_metadata.DatasetMetadata]: warnings.warn(f"Metadata parsing failed with error: {e}", UserWarning, stacklevel=2) return self._metadata - def _get_exclude_cols(self) -> List[str]: - metadata = self._get_metadata() - if metadata is None: - return [] - cols = [] - if metadata.exclude_cols: - cols.extend(metadata.exclude_cols) - if metadata.label_cols: - cols.extend(metadata.label_cols) - return cols - def url(self) -> str: """Returns 
the URL of the DatasetVersion contents in Snowflake. @@ -168,7 +171,7 @@ def read(self) -> dataset_reader.DatasetReader: fully_qualified_name=self._fully_qualified_name, version=v.name, url=v.url(), - exclude_cols=v._get_exclude_cols(), + exclude_cols=(v.label_cols + v.exclude_cols), ) ], ) diff --git a/snowflake/ml/dataset/dataset_factory.py b/snowflake/ml/dataset/dataset_factory.py index d7b0ecc7..104b378c 100644 --- a/snowflake/ml/dataset/dataset_factory.py +++ b/snowflake/ml/dataset/dataset_factory.py @@ -16,8 +16,7 @@ def create_from_dataframe( **version_kwargs: Any, ) -> dataset.Dataset: """ - Create a new versioned Dataset from a DataFrame and returns - a DatasetReader for the newly created Dataset version. + Create a new versioned Dataset from a DataFrame. Args: session: The Snowpark Session instance to use. @@ -39,7 +38,7 @@ def create_from_dataframe( @telemetry.send_api_usage_telemetry(project=_PROJECT) def load_dataset(session: snowpark.Session, name: str, version: str) -> dataset.Dataset: """ - Load a versioned Dataset into a DatasetReader. + Load a versioned Dataset. Args: session: The Snowpark Session instance to use. @@ -47,7 +46,7 @@ def load_dataset(session: snowpark.Session, name: str, version: str) -> dataset. version: The dataset version name. Returns: - A DatasetReader object. + A Dataset object. """ ds: dataset.Dataset = dataset.Dataset.load(session, name).select_version(version) return ds diff --git a/snowflake/ml/feature_store/_internal/synthetic_data_generator.py b/snowflake/ml/feature_store/_internal/synthetic_data_generator.py index e2133cc0..a8127f2b 100644 --- a/snowflake/ml/feature_store/_internal/synthetic_data_generator.py +++ b/snowflake/ml/feature_store/_internal/synthetic_data_generator.py @@ -92,7 +92,5 @@ def _generate_new_data(self, num_rows: int) -> None: batch.append(row) df = self._session.create_dataframe(batch, self._table_schema) - df.write.mode("append").save_as_table( # type:ignore[call-overload] - [self._database, self._schema, self._source_table], block=True - ) + df.write.mode("append").save_as_table([self._database, self._schema, self._source_table], block=True) logger.info(f"Dumped {num_rows} rows to table {self._source_table}.") diff --git a/snowflake/ml/feature_store/feature_store.py b/snowflake/ml/feature_store/feature_store.py index 8096cc24..7ba15677 100644 --- a/snowflake/ml/feature_store/feature_store.py +++ b/snowflake/ml/feature_store/feature_store.py @@ -920,7 +920,7 @@ def generate_dataset( try: if output_type == "table": table_name = f"{name}_{version}" - result_df.write.mode("errorifexists").save_as_table(table_name) # type: ignore[call-overload] + result_df.write.mode("errorifexists").save_as_table(table_name) ds_df = self._session.table(table_name) return ds_df else: @@ -1762,7 +1762,7 @@ def _tag_ref_internal_enabled(self) -> bool: f""" SELECT * FROM TABLE( {self._config.database}.INFORMATION_SCHEMA.TAG_REFERENCES_INTERNAL( - TAG_NAME => '{_FEATURE_STORE_OBJECT_TAG}' + TAG_NAME => '{self._get_fully_qualified_name(_FEATURE_STORE_OBJECT_TAG)}' ) ) LIMIT 1; """ diff --git a/snowflake/ml/feature_store/notebooks/customer_demo/README.md b/snowflake/ml/feature_store/notebooks/customer_demo/README.md index 1fafd339..ea3d6194 100644 --- a/snowflake/ml/feature_store/notebooks/customer_demo/README.md +++ b/snowflake/ml/feature_store/notebooks/customer_demo/README.md @@ -1,7 +1,24 @@ -# Instructions to generate PDF +# Feature Store demo notebooks -Run below command to generate PDF from ipynb. 
+ +Here are three example notebooks that demonstrate Feature Store use cases in different scenarios. -```bash -jupyter nbconvert --to webpdf --allow-chromium-download -``` +## Basic Feature Demo + +This example demonstrates creating and managing **non time-series features** in Feature Store and using the features in +training and inference. It also demonstrates interoperation with Model Registry. You can find the example in +[Basic_Feature_Demo.ipynb](https://github.com/snowflakedb/snowflake-ml-python/blob/main/snowflake/ml/feature_store/notebooks/customer_demo/Basic_Feature_Demo.ipynb) +and [Basic_Feature_Demo.pdf](https://github.com/snowflakedb/snowflake-ml-python/blob/main/snowflake/ml/feature_store/notebooks/customer_demo/Basic_Feature_Demo.pdf). + +## Time Series Feature Demo + +This example demonstrates creating and managing **time-series features** in Feature Store and using the features in +training and inference. It also demonstrates interoperation with Model Registry. You can find the example in +[Time_Series_Feature_Demo.ipynb](https://github.com/snowflakedb/snowflake-ml-python/blob/main/snowflake/ml/feature_store/notebooks/customer_demo/Time_Series_Feature_Demo.ipynb) +and [Time_Series_Feature_Demo.pdf](https://github.com/snowflakedb/snowflake-ml-python/blob/main/snowflake/ml/feature_store/notebooks/customer_demo/Time_Series_Feature_Demo.pdf). + +## DBT External Feature Pipeline Demo + +This example demonstrates how to register [DBT models](https://docs.getdbt.com/docs/build/models) as external Feature +Views in Feature Store. You need a DBT account to run this demo. You can find the example in +[DBT_External_Feature_Pipeline_Demo.ipynb](https://github.com/snowflakedb/snowflake-ml-python/blob/main/snowflake/ml/feature_store/notebooks/customer_demo/DBT_External_Feature_Pipeline_Demo.ipynb) +and [DBT_External_Feature_Pipeline_Demo.pdf](https://github.com/snowflakedb/snowflake-ml-python/blob/main/snowflake/ml/feature_store/notebooks/customer_demo/DBT_External_Feature_Pipeline_Demo.pdf). 
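For a feel of the flow the Basic Feature Demo walks through before opening the notebook, here is a minimal sketch built around the `Entity`, `FeatureView`, and `FeatureStore` classes listed in docs/source/feature_store.rst. The database, warehouse, table, and column names are hypothetical, and the constructor parameters shown are assumptions based on the public API; defer to the notebook for the authoritative version.

```python
from snowflake.ml.feature_store import CreationMode, Entity, FeatureStore, FeatureView
from snowflake.snowpark import Session

# Connection parameters are environment-specific; fill in your own account settings.
connection_parameters = {}
session = Session.builder.configs(connection_parameters).create()

# Open (or create) a feature store in a hypothetical database and schema.
fs = FeatureStore(
    session=session,
    database="ML_DB",
    name="MY_FEATURE_STORE",
    default_warehouse="ML_WH",
    creation_mode=CreationMode.CREATE_IF_NOT_EXIST,
)

# An entity declares the join key(s) that features are organized around.
customer = Entity(name="CUSTOMER", join_keys=["CUSTOMER_ID"])
fs.register_entity(customer)

# A feature view is defined by a Snowpark DataFrame of feature values.
feature_df = session.table("ML_DB.PUBLIC.CUSTOMER_FEATURES")
fv = FeatureView(name="CUSTOMER_FEATURES", entities=[customer], feature_df=feature_df)
fs.register_feature_view(feature_view=fv, version="V1")
```

From there the demo retrieves the registered features for training and inference and logs the trained model to the Model Registry, which is the interoperation mentioned above.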
diff --git a/snowflake/ml/model/_client/sql/model_version.py b/snowflake/ml/model/_client/sql/model_version.py index 49a38558..26d0ffbc 100644 --- a/snowflake/ml/model/_client/sql/model_version.py +++ b/snowflake/ml/model/_client/sql/model_version.py @@ -272,7 +272,7 @@ def invoke_function_method( actual_schema_name.identifier(), tmp_table_name, ) - input_df.write.save_as_table( # type: ignore[call-overload] + input_df.write.save_as_table( table_name=INTERMEDIATE_TABLE_NAME, mode="errorifexists", table_type="temporary", @@ -348,7 +348,7 @@ def invoke_table_function_method( actual_schema_name.identifier(), tmp_table_name, ) - input_df.write.save_as_table( # type: ignore[call-overload] + input_df.write.save_as_table( table_name=INTERMEDIATE_TABLE_NAME, mode="errorifexists", table_type="temporary", diff --git a/snowflake/ml/model/_model_composer/model_composer.py b/snowflake/ml/model/_model_composer/model_composer.py index afd9ed7a..8101cd0f 100644 --- a/snowflake/ml/model/_model_composer/model_composer.py +++ b/snowflake/ml/model/_model_composer/model_composer.py @@ -182,9 +182,9 @@ def load( def _get_data_sources( self, model: model_types.SupportedModelType, sample_input_data: Optional[model_types.SupportedDataType] = None ) -> Optional[List[data_source.DataSource]]: - data_sources = getattr(model, lineage_utils.DATA_SOURCES_ATTR, None) + data_sources = lineage_utils.get_data_sources(model) if not data_sources and sample_input_data is not None: - data_sources = getattr(sample_input_data, lineage_utils.DATA_SOURCES_ATTR, None) + data_sources = lineage_utils.get_data_sources(sample_input_data) if isinstance(data_sources, list) and all(isinstance(item, data_source.DataSource) for item in data_sources): return data_sources return None diff --git a/snowflake/ml/model/_model_composer/model_method/fixtures/function_3.py b/snowflake/ml/model/_model_composer/model_method/fixtures/function_3.py index fc42abd4..e05a2dbd 100644 --- a/snowflake/ml/model/_model_composer/model_method/fixtures/function_3.py +++ b/snowflake/ml/model/_model_composer/model_method/fixtures/function_3.py @@ -74,4 +74,6 @@ def __exit__( class infer: @vectorized(input=pd.DataFrame) def end_partition(self, df: pd.DataFrame) -> pd.DataFrame: - return runner(df) + df.columns = input_cols + input_df = df.astype(dtype=dtype_map) + return runner(input_df[input_cols]) diff --git a/snowflake/ml/model/_model_composer/model_method/infer_table_function.py_template b/snowflake/ml/model/_model_composer/model_method/infer_table_function.py_template index 94a13fcd..181e94ff 100644 --- a/snowflake/ml/model/_model_composer/model_method/infer_table_function.py_template +++ b/snowflake/ml/model/_model_composer/model_method/infer_table_function.py_template @@ -74,4 +74,6 @@ dtype_map = {{feature.name: feature.as_dtype() for feature in features}} class {function_name}: @vectorized(input=pd.DataFrame) def end_partition(self, df: pd.DataFrame) -> pd.DataFrame: - return runner(df) + df.columns = input_cols + input_df = df.astype(dtype=dtype_map) + return runner(input_df[input_cols]) diff --git a/snowflake/ml/model/_signatures/builtins_handler.py b/snowflake/ml/model/_signatures/builtins_handler.py index d56cd503..75c7b655 100644 --- a/snowflake/ml/model/_signatures/builtins_handler.py +++ b/snowflake/ml/model/_signatures/builtins_handler.py @@ -1,3 +1,4 @@ +import datetime from collections import abc from typing import Literal, Sequence @@ -24,7 +25,7 @@ def can_handle(data: model_types.SupportedDataType) -> TypeGuard[model_types._Su # String is a 
Sequence but we take them as an whole if isinstance(element, abc.Sequence) and not isinstance(element, str): can_handle = ListOfBuiltinHandler.can_handle(element) - elif not isinstance(element, (int, float, bool, str)): + elif not isinstance(element, (int, float, bool, str, datetime.datetime)): can_handle = False break return can_handle diff --git a/snowflake/ml/model/_signatures/core.py b/snowflake/ml/model/_signatures/core.py index a245b181..5c4663b4 100644 --- a/snowflake/ml/model/_signatures/core.py +++ b/snowflake/ml/model/_signatures/core.py @@ -53,6 +53,8 @@ def __init__(self, value: str, snowpark_type: Type[spt.DataType], numpy_type: np STRING = ("string", spt.StringType, np.str_) BYTES = ("bytes", spt.BinaryType, np.bytes_) + TIMESTAMP_NTZ = ("datetime64[ns]", spt.TimestampType, "datetime64[ns]") + def as_snowpark_type(self) -> spt.DataType: """Convert to corresponding Snowpark Type. @@ -78,6 +80,13 @@ def from_numpy_type(cls, np_type: npt.DTypeLike) -> "DataType": Corresponding DataType. """ np_to_snowml_type_mapping = {i._numpy_type: i for i in DataType} + + # Add datetime types: + datetime_res = ["Y", "M", "W", "D", "h", "m", "s", "ms", "us", "ns"] + + for res in datetime_res: + np_to_snowml_type_mapping[f"datetime64[{res}]"] = DataType.TIMESTAMP_NTZ + for potential_type in np_to_snowml_type_mapping.keys(): if np.can_cast(np_type, potential_type, casting="no"): # This is used since the same dtype might represented in different ways. @@ -247,9 +256,12 @@ def as_snowpark_type(self) -> spt.DataType: result_type = spt.ArrayType(result_type) return result_type - def as_dtype(self) -> npt.DTypeLike: + def as_dtype(self) -> Union[npt.DTypeLike, str]: """Convert to corresponding local Type.""" if not self._shape: + # scalar dtype: use keys from `np.sctypeDict` to prevent unit-less dtype 'datetime64' + if "datetime64" in self._dtype._value: + return self._dtype._value return self._dtype._numpy_type return np.object_ diff --git a/snowflake/ml/model/_signatures/pandas_handler.py b/snowflake/ml/model/_signatures/pandas_handler.py index 9ce26d5b..6041bf12 100644 --- a/snowflake/ml/model/_signatures/pandas_handler.py +++ b/snowflake/ml/model/_signatures/pandas_handler.py @@ -147,6 +147,8 @@ def infer_signature(data: pd.DataFrame, role: Literal["input", "output"]) -> Seq specs.append(core.FeatureSpec(dtype=core.DataType.STRING, name=ft_name)) elif isinstance(data[df_col].iloc[0], bytes): specs.append(core.FeatureSpec(dtype=core.DataType.BYTES, name=ft_name)) + elif isinstance(data[df_col].iloc[0], np.datetime64): + specs.append(core.FeatureSpec(dtype=core.DataType.TIMESTAMP_NTZ, name=ft_name)) else: specs.append(core.FeatureSpec(dtype=core.DataType.from_numpy_type(df_col_dtype), name=ft_name)) return specs diff --git a/snowflake/ml/model/_signatures/snowpark_handler.py b/snowflake/ml/model/_signatures/snowpark_handler.py index fc05dd7b..500644ea 100644 --- a/snowflake/ml/model/_signatures/snowpark_handler.py +++ b/snowflake/ml/model/_signatures/snowpark_handler.py @@ -107,6 +107,9 @@ def convert_from_df( if not features: features = pandas_handler.PandasDataFrameHandler.infer_signature(df, role="input") # Role will be no effect on the column index. That is to say, the feature name is the actual column name. 
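+        # Attach the row-order column in pandas before uploading (rather than calling monotonically_increasing_id on the Snowpark side, as before) so the original row order can be restored deterministically after inference.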
+ if keep_order: + df = df.reset_index(drop=True) + df[infer_template._KEEP_ORDER_COL_NAME] = df.index sp_df = session.create_dataframe(df) column_names = [] columns = [] @@ -122,7 +125,4 @@ def convert_from_df( sp_df = sp_df.with_columns(column_names, columns) - if keep_order: - sp_df = sp_df.with_column(infer_template._KEEP_ORDER_COL_NAME, F.monotonically_increasing_id()) - return sp_df diff --git a/snowflake/ml/model/_signatures/snowpark_test.py b/snowflake/ml/model/_signatures/snowpark_test.py index 4d2779a7..28b29a99 100644 --- a/snowflake/ml/model/_signatures/snowpark_test.py +++ b/snowflake/ml/model/_signatures/snowpark_test.py @@ -1,3 +1,4 @@ +import datetime import decimal import numpy as np @@ -341,6 +342,21 @@ def test_convert_to_and_from_df(self) -> None: pd_df, snowpark_handler.SnowparkDataFrameHandler.convert_to_df(sp_df), check_dtype=False ) + d = datetime.datetime(year=2024, month=6, day=21, hour=1, minute=1, second=1) + pd_df = pd.DataFrame([[1, d], [2, d]], columns=["col_0", "col_1"]) + sp_df = snowpark_handler.SnowparkDataFrameHandler.convert_from_df( + self._session, + pd_df, + keep_order=False, + features=[ + model_signature.FeatureSpec(name="col_0", dtype=model_signature.DataType.INT64), + model_signature.FeatureSpec(name="col_1", dtype=model_signature.DataType.TIMESTAMP_NTZ), + ], + ) + pd.testing.assert_frame_equal( + pd_df, snowpark_handler.SnowparkDataFrameHandler.convert_to_df(sp_df), check_dtype=False + ) + if __name__ == "__main__": absltest.main() diff --git a/snowflake/ml/model/model_signature.py b/snowflake/ml/model/model_signature.py index c1bc9877..c0ed8732 100644 --- a/snowflake/ml/model/model_signature.py +++ b/snowflake/ml/model/model_signature.py @@ -168,6 +168,8 @@ def _validate_numpy_array( max_v <= np.finfo(feature_type._numpy_type).max # type: ignore[arg-type] and min_v >= np.finfo(feature_type._numpy_type).min # type: ignore[arg-type] ) + elif feature_type in [core.DataType.TIMESTAMP_NTZ]: + return np.issubdtype(arr.dtype, np.datetime64) else: return np.can_cast(arr.dtype, feature_type._numpy_type, casting="no") diff --git a/snowflake/ml/model/model_signature_test.py b/snowflake/ml/model/model_signature_test.py index 28377ead..7687fed8 100644 --- a/snowflake/ml/model/model_signature_test.py +++ b/snowflake/ml/model/model_signature_test.py @@ -1,3 +1,5 @@ +import datetime + import numpy as np import pandas as pd import tensorflow as tf @@ -22,6 +24,26 @@ def test_infer_signature(self) -> None: [model_signature.FeatureSpec("input_feature_0", model_signature.DataType.INT64)], ) + d1 = datetime.datetime(year=2024, month=6, day=21, hour=1, minute=1, second=1) + d2 = datetime.datetime(year=2024, month=7, day=11, hour=1, minute=1, second=1) + df_dates = pd.DataFrame([d1, d2]) + self.assertListEqual( + model_signature._infer_signature(df_dates, role="input"), + [model_signature.FeatureSpec("input_feature_0", model_signature.DataType.TIMESTAMP_NTZ)], + ) + + arr_dates = np.array([d1, d2], dtype=np.datetime64) + self.assertListEqual( + model_signature._infer_signature(arr_dates, role="input"), + [model_signature.FeatureSpec("input_feature_0", model_signature.DataType.TIMESTAMP_NTZ)], + ) + + lt_dates = [d1, d2] + self.assertListEqual( + model_signature._infer_signature(lt_dates, role="input"), + [model_signature.FeatureSpec("input_feature_0", model_signature.DataType.TIMESTAMP_NTZ)], + ) + lt1 = [1, 2, 3, 4] self.assertListEqual( model_signature._infer_signature(lt1, role="input"), diff --git a/snowflake/ml/model/type_hints.py 
b/snowflake/ml/model/type_hints.py index 1ec03579..5148b863 100644 --- a/snowflake/ml/model/type_hints.py +++ b/snowflake/ml/model/type_hints.py @@ -54,6 +54,7 @@ "np.bool_", "np.str_", "np.bytes_", + "np.datetime64", ] _SupportedNumpyArray = npt.NDArray[_SupportedNumpyDtype] _SupportedBuiltinsList = Sequence[_SupportedBuiltins] diff --git a/snowflake/ml/modeling/_internal/BUILD.bazel b/snowflake/ml/modeling/_internal/BUILD.bazel index 0b2a19b3..f68f4efe 100644 --- a/snowflake/ml/modeling/_internal/BUILD.bazel +++ b/snowflake/ml/modeling/_internal/BUILD.bazel @@ -48,6 +48,8 @@ py_library( srcs = ["estimator_utils.py"], deps = [ "//snowflake/ml/_internal/exceptions", + "//snowflake/ml/_internal/utils:query_result_checker", + "//snowflake/ml/_internal/utils:temp_file_utils", "//snowflake/ml/modeling/framework", ], ) diff --git a/snowflake/ml/modeling/_internal/estimator_utils.py b/snowflake/ml/modeling/_internal/estimator_utils.py index b4ae8f00..f8122c96 100644 --- a/snowflake/ml/modeling/_internal/estimator_utils.py +++ b/snowflake/ml/modeling/_internal/estimator_utils.py @@ -1,15 +1,19 @@ import inspect import numbers +import os from typing import Any, Callable, Dict, List, Set, Tuple +import cloudpickle as cp import numpy as np from numpy import typing as npt -from typing_extensions import TypeGuard from snowflake.ml._internal.exceptions import error_codes, exceptions +from snowflake.ml._internal.utils import temp_file_utils +from snowflake.ml._internal.utils.query_result_checker import SqlResultValidator from snowflake.ml.modeling.framework._utils import to_native_format from snowflake.ml.modeling.framework.base import BaseTransformer from snowflake.snowpark import Session +from snowflake.snowpark._internal import utils as snowpark_utils def validate_sklearn_args(args: Dict[str, Tuple[Any, Any, bool]], klass: type) -> Dict[str, Any]: @@ -97,6 +101,7 @@ def original_estimator_has_callable(attr: str) -> Callable[[Any], bool]: Returns: A function which checks for the existence of callable `attr` on the given object. """ + from typing_extensions import TypeGuard def check(self: BaseTransformer) -> TypeGuard[Callable[..., object]]: """Check for the existence of callable `attr` in self. @@ -218,3 +223,55 @@ def handle_inference_result( ) return transformed_numpy_array, output_cols + + +def create_temp_stage(session: Session) -> str: + """Creates temporary stage. + + Args: + session: Session + + Returns: + Temp stage name. + """ + # Create temp stage to upload pickled model file. + transform_stage_name = snowpark_utils.random_name_for_temp_object(snowpark_utils.TempObjectType.STAGE) + stage_creation_query = f"CREATE OR REPLACE TEMPORARY STAGE {transform_stage_name};" + SqlResultValidator(session=session, query=stage_creation_query).has_dimensions( + expected_rows=1, expected_cols=1 + ).validate() + return transform_stage_name + + +def upload_model_to_stage( + stage_name: str, estimator: object, session: Session, statement_params: Dict[str, str] +) -> str: + """Util method to pickle and upload the model to a temp Snowflake stage. + + + Args: + stage_name: Stage name to save model. + estimator: Estimator object to upload to stage (sklearn model object) + session: The snowpark session to use. + statement_params: Statement parameters for query telemetry. + + Returns: + a tuple containing stage file paths for pickled input model for training and location to store trained + models(response from training sproc). + """ + # Create a temp file and dump the transform to that file. 
+ local_transform_file_name = temp_file_utils.get_temp_file_path() + with open(local_transform_file_name, mode="w+b") as local_transform_file: + cp.dump(estimator, local_transform_file) + + # Put locally serialized transform on stage. + session.file.put( + local_file_name=local_transform_file_name, + stage_location=stage_name, + auto_compress=False, + overwrite=True, + statement_params=statement_params, + ) + + temp_file_utils.cleanup_temp_files([local_transform_file_name]) + return os.path.basename(local_transform_file_name) diff --git a/snowflake/ml/modeling/_internal/snowpark_implementations/distributed_hpo_trainer.py b/snowflake/ml/modeling/_internal/snowpark_implementations/distributed_hpo_trainer.py index 0e35fa4c..efdc1cca 100644 --- a/snowflake/ml/modeling/_internal/snowpark_implementations/distributed_hpo_trainer.py +++ b/snowflake/ml/modeling/_internal/snowpark_implementations/distributed_hpo_trainer.py @@ -4,6 +4,7 @@ import os import posixpath import sys +import uuid from typing import Any, Dict, List, Optional, Tuple, Union import cloudpickle as cp @@ -16,10 +17,7 @@ identifier, pkg_version_utils, snowpark_dataframe_utils, -) -from snowflake.ml._internal.utils.temp_file_utils import ( - cleanup_temp_files, - get_temp_file_path, + temp_file_utils, ) from snowflake.ml.modeling._internal.model_specifications import ( ModelSpecificationsBuilder, @@ -37,13 +35,14 @@ from snowflake.snowpark.types import IntegerType, StringType, StructField, StructType from snowflake.snowpark.udtf import UDTFRegistration -cp.register_pickle_by_value(inspect.getmodule(get_temp_file_path)) +cp.register_pickle_by_value(inspect.getmodule(temp_file_utils.get_temp_file_path)) cp.register_pickle_by_value(inspect.getmodule(identifier.get_inferred_name)) cp.register_pickle_by_value(inspect.getmodule(snowpark_dataframe_utils.cast_snowpark_dataframe)) _PROJECT = "ModelDevelopment" DEFAULT_UDTF_NJOBS = 3 ENABLE_EFFICIENT_MEMORY_USAGE = False +_UDTF_STAGE_NAME = f"MEMORY_EFFICIENT_UDTF_{str(uuid.uuid4()).replace('-', '_')}" def construct_cv_results( @@ -318,7 +317,7 @@ def fit_search_snowpark( original_refit = estimator.refit # Create a temp file and dump the estimator to that file. 
- estimator_file_name = get_temp_file_path() + estimator_file_name = temp_file_utils.get_temp_file_path() params_to_evaluate = [] for param_to_eval in list(param_grid): for k, v in param_to_eval.items(): @@ -357,6 +356,7 @@ def fit_search_snowpark( ) estimator_location = put_result[0].target imports.append(f"@{temp_stage_name}/{estimator_location}") + temp_file_utils.cleanup_temp_files([estimator_file_name]) search_sproc_name = random_name_for_temp_object(TempObjectType.PROCEDURE) random_udtf_name = random_name_for_temp_object(TempObjectType.TABLE_FUNCTION) @@ -413,7 +413,7 @@ def _distributed_search( X = df[input_cols] y = df[label_cols].squeeze() if label_cols else None - local_estimator_file_name = get_temp_file_path() + local_estimator_file_name = temp_file_utils.get_temp_file_path() session.file.get(stage_estimator_file_name, local_estimator_file_name) local_estimator_file_path = os.path.join( @@ -429,7 +429,7 @@ def _distributed_search( n_splits = build_cross_validator.get_n_splits(X, y, None) # store the cross_validator's test indices only to save space cross_validator_indices = [test for _, test in build_cross_validator.split(X, y, None)] - local_indices_file_name = get_temp_file_path() + local_indices_file_name = temp_file_utils.get_temp_file_path() with open(local_indices_file_name, mode="w+b") as local_indices_file_obj: cp.dump(cross_validator_indices, local_indices_file_obj) @@ -445,6 +445,8 @@ def _distributed_search( cross_validator_indices_length = int(len(cross_validator_indices)) parameter_grid_length = len(param_grid) + temp_file_utils.cleanup_temp_files([local_estimator_file_name, local_indices_file_name]) + assert estimator is not None @cachetools.cached(cache={}) @@ -647,7 +649,7 @@ def end_partition(self) -> None: if hasattr(estimator.best_estimator_, "feature_names_in_"): estimator.feature_names_in_ = estimator.best_estimator_.feature_names_in_ - local_result_file_name = get_temp_file_path() + local_result_file_name = temp_file_utils.get_temp_file_path() with open(local_result_file_name, mode="w+b") as local_result_file_obj: cp.dump(estimator, local_result_file_obj) @@ -658,6 +660,7 @@ def end_partition(self) -> None: auto_compress=False, overwrite=True, ) + temp_file_utils.cleanup_temp_files([local_result_file_name]) # Note: you can add something like + "|" + str(df) to the return string # to pass debug information to the caller. @@ -671,7 +674,7 @@ def end_partition(self) -> None: label_cols, ) - local_estimator_path = get_temp_file_path() + local_estimator_path = temp_file_utils.get_temp_file_path() session.file.get( posixpath.join(temp_stage_name, sproc_export_file_name), local_estimator_path, @@ -680,7 +683,7 @@ def end_partition(self) -> None: with open(os.path.join(local_estimator_path, sproc_export_file_name), mode="r+b") as result_file_obj: fit_estimator = cp.load(result_file_obj) - cleanup_temp_files([local_estimator_path]) + temp_file_utils.cleanup_temp_files([local_estimator_path]) return fit_estimator @@ -716,7 +719,7 @@ def fit_search_snowpark_enable_efficient_memory_usage( imports = [f"@{row.name}" for row in session.sql(f"LIST @{temp_stage_name}/{dataset_file_name}").collect()] # Create a temp file and dump the estimator to that file. 
- estimator_file_name = get_temp_file_path() + estimator_file_name = temp_file_utils.get_temp_file_path() params_to_evaluate = list(param_grid) CONSTANTS: Dict[str, Any] = dict() CONSTANTS["dataset_snowpark_cols"] = dataset.columns @@ -757,6 +760,7 @@ def fit_search_snowpark_enable_efficient_memory_usage( ) estimator_location = os.path.basename(estimator_file_name) imports.append(f"@{temp_stage_name}/{estimator_location}") + temp_file_utils.cleanup_temp_files([estimator_file_name]) CONSTANTS["estimator_location"] = estimator_location search_sproc_name = random_name_for_temp_object(TempObjectType.PROCEDURE) @@ -823,7 +827,7 @@ def _distributed_search( if sample_weight_col: fit_params["sample_weight"] = df[sample_weight_col].squeeze() - local_estimator_file_folder_name = get_temp_file_path() + local_estimator_file_folder_name = temp_file_utils.get_temp_file_path() session.file.get(stage_estimator_file_name, local_estimator_file_folder_name) local_estimator_file_path = os.path.join( @@ -869,7 +873,7 @@ def _distributed_search( # (1) store the cross_validator's test indices only to save space cross_validator_indices = [test for _, test in build_cross_validator.split(X, y, None)] - local_indices_file_name = get_temp_file_path() + local_indices_file_name = temp_file_utils.get_temp_file_path() with open(local_indices_file_name, mode="w+b") as local_indices_file_obj: cp.dump(cross_validator_indices, local_indices_file_obj) @@ -884,7 +888,7 @@ def _distributed_search( imports.append(f"@{temp_stage_name}/{indices_location}") # (2) store the base estimator - local_base_estimator_file_name = get_temp_file_path() + local_base_estimator_file_name = temp_file_utils.get_temp_file_path() with open(local_base_estimator_file_name, mode="w+b") as local_base_estimator_file_obj: cp.dump(base_estimator, local_base_estimator_file_obj) session.file.put( @@ -897,7 +901,7 @@ def _distributed_search( imports.append(f"@{temp_stage_name}/{base_estimator_location}") # (3) store the fit_and_score_kwargs - local_fit_and_score_kwargs_file_name = get_temp_file_path() + local_fit_and_score_kwargs_file_name = temp_file_utils.get_temp_file_path() with open(local_fit_and_score_kwargs_file_name, mode="w+b") as local_fit_and_score_kwargs_file_obj: cp.dump(fit_and_score_kwargs, local_fit_and_score_kwargs_file_obj) session.file.put( @@ -918,7 +922,7 @@ def _distributed_search( CONSTANTS["fit_and_score_kwargs_location"] = fit_and_score_kwargs_location # (6) store the constants - local_constant_file_name = get_temp_file_path(prefix="constant") + local_constant_file_name = temp_file_utils.get_temp_file_path(prefix="constant") with open(local_constant_file_name, mode="w+b") as local_indices_file_obj: cp.dump(CONSTANTS, local_indices_file_obj) @@ -932,6 +936,17 @@ def _distributed_search( constant_location = os.path.basename(local_constant_file_name) imports.append(f"@{temp_stage_name}/{constant_location}") + temp_file_utils.cleanup_temp_files( + [ + local_estimator_file_folder_name, + local_indices_file_name, + local_base_estimator_file_name, + local_base_estimator_file_name, + local_fit_and_score_kwargs_file_name, + local_constant_file_name, + ] + ) + cross_validator_indices_length = int(len(cross_validator_indices)) parameter_grid_length = len(param_grid) @@ -942,124 +957,144 @@ def _distributed_search( import tempfile + # delete is set to False to support Windows environment with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False, encoding="utf-8") as f: udf_code = execute_template f.file.write(udf_code) f.file.flush() 
- # Register the UDTF function from the file - udtf_registration.register_from_file( - file_path=f.name, - handler_name="SearchCV", - name=random_udtf_name, - output_schema=StructType( - [StructField("FIRST_IDX", IntegerType()), StructField("EACH_CV_RESULTS", StringType())] - ), - input_types=[IntegerType(), IntegerType(), IntegerType()], - replace=True, - imports=imports, # type: ignore[arg-type] - is_permanent=False, - packages=required_deps, # type: ignore[arg-type] - statement_params=udtf_statement_params, - ) - - HP_TUNING = F.table_function(random_udtf_name) - - # param_indices is for the index for each parameter grid; - # cv_indices is for the index for each cross_validator's fold; - # param_cv_indices is for the index for the product of (len(param_indices) * len(cv_indices)) - cv_indices, param_indices = zip( - *product(range(cross_validator_indices_length), range(parameter_grid_length)) - ) - - indices_info_pandas = pd.DataFrame( - { - "IDX": [i // _NUM_CPUs for i in range(parameter_grid_length * cross_validator_indices_length)], - "PARAM_IND": param_indices, - "CV_IND": cv_indices, - } - ) - - indices_info_sp = session.create_dataframe(indices_info_pandas) - # execute udtf by querying HP_TUNING table - HP_raw_results = indices_info_sp.select( - ( - HP_TUNING(indices_info_sp["IDX"], indices_info_sp["PARAM_IND"], indices_info_sp["CV_IND"]).over( - partition_by="IDX" + # Use catchall exception handling and a finally block to clean up the _UDTF_STAGE_NAME + try: + # Create one stage for data and for estimators. + # Because only permanent functions support _sf_node_singleton for now, therefore, + # UDTF creation would change to is_permanent=True, and manually drop the stage after UDTF is done + _stage_creation_query_udtf = f"CREATE OR REPLACE STAGE {_UDTF_STAGE_NAME};" + session.sql(_stage_creation_query_udtf).collect() + + # Register the UDTF function from the file + udtf_registration.register_from_file( + file_path=f.name, + handler_name="SearchCV", + name=random_udtf_name, + output_schema=StructType( + [StructField("FIRST_IDX", IntegerType()), StructField("EACH_CV_RESULTS", StringType())] + ), + input_types=[IntegerType(), IntegerType(), IntegerType()], + replace=True, + imports=imports, # type: ignore[arg-type] + stage_location=_UDTF_STAGE_NAME, + is_permanent=True, + packages=required_deps, # type: ignore[arg-type] + statement_params=udtf_statement_params, ) - ), - ) - - first_test_score, cv_results_ = construct_cv_results_memory_efficient_version( - estimator, - n_splits, - list(param_grid), - HP_raw_results.select("EACH_CV_RESULTS").sort(F.col("FIRST_IDX")).collect(), - cross_validator_indices_length, - parameter_grid_length, - ) - - estimator.cv_results_ = cv_results_ - estimator.multimetric_ = isinstance(first_test_score, dict) - # check refit_metric now for a callable scorer that is multimetric - if callable(estimator.scoring) and estimator.multimetric_: - estimator._check_refit_for_multimetric(first_test_score) - refit_metric = estimator.refit + HP_TUNING = F.table_function(random_udtf_name) - # For multi-metric evaluation, store the best_index_, best_params_ and - # best_score_ iff refit is one of the scorer names - # In single metric evaluation, refit_metric is "score" - if estimator.refit or not estimator.multimetric_: - estimator.best_index_ = estimator._select_best_index(estimator.refit, refit_metric, cv_results_) - if not callable(estimator.refit): - # With a non-custom callable, we can select the best score - # based on the best index - estimator.best_score_ = 
cv_results_[f"mean_test_{refit_metric}"][estimator.best_index_] - estimator.best_params_ = cv_results_["params"][estimator.best_index_] - - if estimator.refit: - estimator.best_estimator_ = clone(base_estimator).set_params( - **clone(estimator.best_params_, safe=False) - ) + # param_indices is for the index for each parameter grid; + # cv_indices is for the index for each cross_validator's fold; + # param_cv_indices is for the index for the product of (len(param_indices) * len(cv_indices)) + cv_indices, param_indices = zip( + *product(range(cross_validator_indices_length), range(parameter_grid_length)) + ) - # Let the sproc use all cores to refit. - estimator.n_jobs = estimator.n_jobs or -1 + indices_info_pandas = pd.DataFrame( + { + "IDX": [ + i // _NUM_CPUs for i in range(parameter_grid_length * cross_validator_indices_length) + ], + "PARAM_IND": param_indices, + "CV_IND": cv_indices, + } + ) - # process the input as args - argspec = inspect.getfullargspec(estimator.fit) - args = {"X": X} - if label_cols: - label_arg_name = "Y" if "Y" in argspec.args else "y" - args[label_arg_name] = y - if sample_weight_col is not None and "sample_weight" in argspec.args: - args["sample_weight"] = df[sample_weight_col].squeeze() - # estimator.refit = original_refit - refit_start_time = time.time() - estimator.best_estimator_.fit(**args) - refit_end_time = time.time() - estimator.refit_time_ = refit_end_time - refit_start_time + indices_info_sp = session.create_dataframe(indices_info_pandas) + # execute udtf by querying HP_TUNING table + HP_raw_results = indices_info_sp.select( + ( + HP_TUNING( + indices_info_sp["IDX"], indices_info_sp["PARAM_IND"], indices_info_sp["CV_IND"] + ).over(partition_by="IDX") + ), + ) - if hasattr(estimator.best_estimator_, "feature_names_in_"): - estimator.feature_names_in_ = estimator.best_estimator_.feature_names_in_ + first_test_score, cv_results_ = construct_cv_results_memory_efficient_version( + estimator, + n_splits, + list(param_grid), + HP_raw_results.select("EACH_CV_RESULTS").sort(F.col("FIRST_IDX")).collect(), + cross_validator_indices_length, + parameter_grid_length, + ) - # Store the only scorer not as a dict for single metric evaluation - estimator.scorer_ = scorers - estimator.n_splits_ = n_splits + estimator.cv_results_ = cv_results_ + estimator.multimetric_ = isinstance(first_test_score, dict) + + # check refit_metric now for a callable scorer that is multimetric + if callable(estimator.scoring) and estimator.multimetric_: + estimator._check_refit_for_multimetric(first_test_score) + refit_metric = estimator.refit + + # For multi-metric evaluation, store the best_index_, best_params_ and + # best_score_ iff refit is one of the scorer names + # In single metric evaluation, refit_metric is "score" + if estimator.refit or not estimator.multimetric_: + estimator.best_index_ = estimator._select_best_index(estimator.refit, refit_metric, cv_results_) + if not callable(estimator.refit): + # With a non-custom callable, we can select the best score + # based on the best index + estimator.best_score_ = cv_results_[f"mean_test_{refit_metric}"][estimator.best_index_] + estimator.best_params_ = cv_results_["params"][estimator.best_index_] + + if estimator.refit: + estimator.best_estimator_ = clone(base_estimator).set_params( + **clone(estimator.best_params_, safe=False) + ) - local_result_file_name = get_temp_file_path() + # Let the sproc use all cores to refit. 
+ estimator.n_jobs = estimator.n_jobs or -1 + + # process the input as args + argspec = inspect.getfullargspec(estimator.fit) + args = {"X": X} + if label_cols: + label_arg_name = "Y" if "Y" in argspec.args else "y" + args[label_arg_name] = y + if sample_weight_col is not None and "sample_weight" in argspec.args: + args["sample_weight"] = df[sample_weight_col].squeeze() + # estimator.refit = original_refit + refit_start_time = time.time() + estimator.best_estimator_.fit(**args) + refit_end_time = time.time() + estimator.refit_time_ = refit_end_time - refit_start_time + + if hasattr(estimator.best_estimator_, "feature_names_in_"): + estimator.feature_names_in_ = estimator.best_estimator_.feature_names_in_ + + # Store the only scorer not as a dict for single metric evaluation + estimator.scorer_ = scorers + estimator.n_splits_ = n_splits + + local_result_file_name = temp_file_utils.get_temp_file_path() + + with open(local_result_file_name, mode="w+b") as local_result_file_obj: + cp.dump(estimator, local_result_file_obj) + + session.file.put( + local_result_file_name, + temp_stage_name, + auto_compress=False, + overwrite=True, + ) - with open(local_result_file_name, mode="w+b") as local_result_file_obj: - cp.dump(estimator, local_result_file_obj) + # Clean up the stages and files + session.sql(f"DROP STAGE IF EXISTS {_UDTF_STAGE_NAME}") - session.file.put( - local_result_file_name, - temp_stage_name, - auto_compress=False, - overwrite=True, - ) + temp_file_utils.cleanup_temp_files([local_result_file_name]) - return str(os.path.basename(local_result_file_name)) + return str(os.path.basename(local_result_file_name)) + finally: + # Clean up the stages + session.sql(f"DROP STAGE IF EXISTS {_UDTF_STAGE_NAME}") sproc_export_file_name = _distributed_search( session, @@ -1069,7 +1104,7 @@ def _distributed_search( label_cols, ) - local_estimator_path = get_temp_file_path() + local_estimator_path = temp_file_utils.get_temp_file_path() session.file.get( posixpath.join(temp_stage_name, sproc_export_file_name), local_estimator_path, @@ -1078,7 +1113,7 @@ def _distributed_search( with open(os.path.join(local_estimator_path, sproc_export_file_name), mode="r+b") as result_file_obj: fit_estimator = cp.load(result_file_obj) - cleanup_temp_files(local_estimator_path) + temp_file_utils.cleanup_temp_files(local_estimator_path) return fit_estimator diff --git a/snowflake/ml/modeling/_internal/snowpark_implementations/distributed_search_udf_file.py b/snowflake/ml/modeling/_internal/snowpark_implementations/distributed_search_udf_file.py index 573493ed..a6fcdbc3 100644 --- a/snowflake/ml/modeling/_internal/snowpark_implementations/distributed_search_udf_file.py +++ b/snowflake/ml/modeling/_internal/snowpark_implementations/distributed_search_udf_file.py @@ -156,4 +156,6 @@ def end_partition(self) -> Iterator[Tuple[int, str]]: self.fit_score_params[0][0], binary_cv_results, ) + +SearchCV._sf_node_singleton = True """ diff --git a/snowflake/ml/modeling/_internal/snowpark_implementations/snowpark_handlers.py b/snowflake/ml/modeling/_internal/snowpark_implementations/snowpark_handlers.py index a6528fb7..55833cc6 100644 --- a/snowflake/ml/modeling/_internal/snowpark_implementations/snowpark_handlers.py +++ b/snowflake/ml/modeling/_internal/snowpark_implementations/snowpark_handlers.py @@ -2,6 +2,7 @@ import inspect import os import posixpath +import sys from typing import Any, Dict, List, Optional from uuid import uuid4 @@ -13,12 +14,10 @@ identifier, pkg_version_utils, snowpark_dataframe_utils, + temp_file_utils, ) from 
snowflake.ml._internal.utils.query_result_checker import SqlResultValidator -from snowflake.ml._internal.utils.temp_file_utils import ( - cleanup_temp_files, - get_temp_file_path, -) +from snowflake.ml.modeling._internal import estimator_utils from snowflake.ml.modeling._internal.estimator_utils import handle_inference_result from snowflake.snowpark import DataFrame, Session, functions as F, types as T from snowflake.snowpark._internal.utils import ( @@ -26,7 +25,7 @@ random_name_for_temp_object, ) -cp.register_pickle_by_value(inspect.getmodule(get_temp_file_path)) +cp.register_pickle_by_value(inspect.getmodule(temp_file_utils.get_temp_file_path)) cp.register_pickle_by_value(inspect.getmodule(identifier.get_inferred_name)) cp.register_pickle_by_value(inspect.getmodule(handle_inference_result)) @@ -97,7 +96,25 @@ def batch_inference( dependencies = self._get_validated_snowpark_dependencies(session, dependencies) dataset = self.dataset - estimator = self.estimator + + statement_params = telemetry.get_function_usage_statement_params( + project=_PROJECT, + subproject=self._subproject, + function_name=telemetry.get_statement_params_full_func_name(inspect.currentframe(), self._class_name), + api_calls=[F.pandas_udf], + custom_tags={"autogen": True} if self._autogenerated else None, + ) + + temp_stage_name = estimator_utils.create_temp_stage(session) + + estimator_file_name = estimator_utils.upload_model_to_stage( + stage_name=temp_stage_name, + estimator=self.estimator, + session=session, + statement_params=statement_params, + ) + imports = [f"@{temp_stage_name}/{estimator_file_name}"] + # Register vectorized UDF for batch inference batch_inference_udf_name = random_name_for_temp_object(TempObjectType.FUNCTION) @@ -113,13 +130,13 @@ def batch_inference( for field in fields: input_datatypes.append(field.datatype) - statement_params = telemetry.get_function_usage_statement_params( - project=_PROJECT, - subproject=self._subproject, - function_name=telemetry.get_statement_params_full_func_name(inspect.currentframe(), self._class_name), - api_calls=[F.pandas_udf], - custom_tags=dict([("autogen", True)]) if self._autogenerated else None, - ) + # TODO(xjiang): for optimization, use register_from_file to reduce duplicate loading estimator object + # or use cachetools here + def load_estimator() -> object: + estimator_file_path = os.path.join(sys._xoptions["snowflake_import_directory"], f"{estimator_file_name}") + with open(estimator_file_path, mode="rb") as local_estimator_file_obj: + estimator_object = cp.load(local_estimator_file_obj) + return estimator_object @F.pandas_udf( # type: ignore[arg-type, misc] is_permanent=False, @@ -129,6 +146,7 @@ def batch_inference( session=session, statement_params=statement_params, input_types=[T.PandasDataFrameType(input_datatypes)], + imports=imports, # type: ignore[arg-type] ) def vec_batch_infer(input_df: pd.DataFrame) -> T.PandasSeries[dict]: # type: ignore[type-arg] import numpy as np # noqa: F401 @@ -136,6 +154,8 @@ def vec_batch_infer(input_df: pd.DataFrame) -> T.PandasSeries[dict]: # type: ig input_df.columns = snowpark_cols + estimator = load_estimator() + if hasattr(estimator, "n_jobs"): # Vectorized UDF cannot handle joblib multiprocessing right now, deactivate the n_jobs estimator.n_jobs = 1 @@ -225,7 +245,7 @@ def score( queries = dataset.queries["queries"] # Create a temp file and dump the score to that file. 
- local_score_file_name = get_temp_file_path() + local_score_file_name = temp_file_utils.get_temp_file_path() with open(local_score_file_name, mode="w+b") as local_score_file: cp.dump(estimator, local_score_file) @@ -247,7 +267,7 @@ def score( inspect.currentframe(), self.__class__.__name__ ), api_calls=[F.sproc], - custom_tags=dict([("autogen", True)]) if self._autogenerated else None, + custom_tags={"autogen": True} if self._autogenerated else None, ) # Put locally serialized score on stage. session.file.put( @@ -290,7 +310,7 @@ def score_wrapper_sproc( df: pd.DataFrame = sp_df.to_pandas(statement_params=score_statement_params) df.columns = sp_df.columns - local_score_file_name = get_temp_file_path() + local_score_file_name = temp_file_utils.get_temp_file_path() session.file.get(stage_score_file_name, local_score_file_name, statement_params=score_statement_params) local_score_file_name_path = os.path.join(local_score_file_name, os.listdir(local_score_file_name)[0]) @@ -323,7 +343,7 @@ def score_wrapper_sproc( inspect.currentframe(), self.__class__.__name__ ), api_calls=[Session.call], - custom_tags=dict([("autogen", True)]) if self._autogenerated else None, + custom_tags={"autogen": True} if self._autogenerated else None, ) kwargs = telemetry.get_sproc_statement_params_kwargs(score_wrapper_sproc, score_statement_params) @@ -338,7 +358,7 @@ def score_wrapper_sproc( **kwargs, ) - cleanup_temp_files([local_score_file_name]) + temp_file_utils.cleanup_temp_files([local_score_file_name]) return score diff --git a/snowflake/ml/modeling/_internal/snowpark_implementations/snowpark_trainer.py b/snowflake/ml/modeling/_internal/snowpark_implementations/snowpark_trainer.py index 78597d66..20e1028a 100644 --- a/snowflake/ml/modeling/_internal/snowpark_implementations/snowpark_trainer.py +++ b/snowflake/ml/modeling/_internal/snowpark_implementations/snowpark_trainer.py @@ -17,30 +17,19 @@ identifier, pkg_version_utils, snowpark_dataframe_utils, + temp_file_utils, ) -from snowflake.ml._internal.utils.query_result_checker import SqlResultValidator -from snowflake.ml._internal.utils.temp_file_utils import ( - cleanup_temp_files, - get_temp_file_path, -) +from snowflake.ml.modeling._internal import estimator_utils from snowflake.ml.modeling._internal.estimator_utils import handle_inference_result from snowflake.ml.modeling._internal.model_specifications import ( ModelSpecifications, ModelSpecificationsBuilder, ) -from snowflake.snowpark import ( - DataFrame, - Session, - exceptions as snowpark_exceptions, - functions as F, -) -from snowflake.snowpark._internal.utils import ( - TempObjectType, - random_name_for_temp_object, -) +from snowflake.snowpark import DataFrame, Session, exceptions as snowpark_exceptions +from snowflake.snowpark._internal import utils as snowpark_utils from snowflake.snowpark.stored_procedure import StoredProcedure -cp.register_pickle_by_value(inspect.getmodule(get_temp_file_path)) +cp.register_pickle_by_value(inspect.getmodule(temp_file_utils.get_temp_file_path)) cp.register_pickle_by_value(inspect.getmodule(identifier.get_inferred_name)) cp.register_pickle_by_value(inspect.getmodule(handle_inference_result)) @@ -90,60 +79,6 @@ def __init__( self._subproject = subproject self._class_name = estimator.__class__.__name__ - def _create_temp_stage(self) -> str: - """ - Creates temporary stage. - - Returns: - Temp stage name. - """ - # Create temp stage to upload pickled model file. 
- transform_stage_name = random_name_for_temp_object(TempObjectType.STAGE) - stage_creation_query = f"CREATE OR REPLACE TEMPORARY STAGE {transform_stage_name};" - SqlResultValidator(session=self.session, query=stage_creation_query).has_dimensions( - expected_rows=1, expected_cols=1 - ).validate() - return transform_stage_name - - def _upload_model_to_stage(self, stage_name: str) -> Tuple[str, str]: - """ - Util method to pickle and upload the model to a temp Snowflake stage. - - Args: - stage_name: Stage name to save model. - - Returns: - a tuple containing stage file paths for pickled input model for training and location to store trained - models(response from training sproc). - """ - # Create a temp file and dump the transform to that file. - local_transform_file_name = get_temp_file_path() - with open(local_transform_file_name, mode="w+b") as local_transform_file: - cp.dump(self.estimator, local_transform_file) - - # Use posixpath to construct stage paths - stage_transform_file_name = posixpath.join(stage_name, os.path.basename(local_transform_file_name)) - stage_result_file_name = posixpath.join(stage_name, os.path.basename(local_transform_file_name)) - - statement_params = telemetry.get_function_usage_statement_params( - project=_PROJECT, - subproject=self._subproject, - function_name=telemetry.get_statement_params_full_func_name(inspect.currentframe(), self._class_name), - api_calls=[F.sproc], - custom_tags=dict([("autogen", True)]) if self._autogenerated else None, - ) - # Put locally serialized transform on stage. - self.session.file.put( - local_transform_file_name, - stage_transform_file_name, - auto_compress=False, - overwrite=True, - statement_params=statement_params, - ) - - cleanup_temp_files([local_transform_file_name]) - return (stage_transform_file_name, stage_result_file_name) - def _fetch_model_from_stage(self, dir_path: str, file_name: str, statement_params: Dict[str, str]) -> object: """ Downloads the serialized model from a stage location and unpickles it. @@ -156,7 +91,7 @@ def _fetch_model_from_stage(self, dir_path: str, file_name: str, statement_param Returns: Deserialized model object. """ - local_result_file_name = get_temp_file_path() + local_result_file_name = temp_file_utils.get_temp_file_path() self.session.file.get( posixpath.join(dir_path, file_name), local_result_file_name, @@ -166,13 +101,13 @@ def _fetch_model_from_stage(self, dir_path: str, file_name: str, statement_param with open(os.path.join(local_result_file_name, file_name), mode="r+b") as result_file_obj: fit_estimator = cp.load(result_file_obj) - cleanup_temp_files([local_result_file_name]) + temp_file_utils.cleanup_temp_files([local_result_file_name]) return fit_estimator def _build_fit_wrapper_sproc( self, model_spec: ModelSpecifications, - ) -> Callable[[Any, List[str], str, str, List[str], List[str], Optional[str], Dict[str, str]], str]: + ) -> Callable[[Any, List[str], str, List[str], List[str], Optional[str], Dict[str, str]], str]: """ Constructs and returns a python stored procedure function to be used for training model. 
@@ -188,8 +123,7 @@ def _build_fit_wrapper_sproc( def fit_wrapper_function( session: Session, sql_queries: List[str], - stage_transform_file_name: str, - stage_result_file_name: str, + temp_stage_name: str, input_cols: List[str], label_cols: List[str], sample_weight_col: Optional[str], @@ -212,9 +146,13 @@ def fit_wrapper_function( df: pd.DataFrame = sp_df.to_pandas(statement_params=statement_params) df.columns = sp_df.columns - local_transform_file_name = get_temp_file_path() + local_transform_file_name = temp_file_utils.get_temp_file_path() - session.file.get(stage_transform_file_name, local_transform_file_name, statement_params=statement_params) + session.file.get( + stage_location=temp_stage_name, + target_directory=local_transform_file_name, + statement_params=statement_params, + ) local_transform_file_path = os.path.join( local_transform_file_name, os.listdir(local_transform_file_name)[0] @@ -233,14 +171,14 @@ def fit_wrapper_function( estimator.fit(**args) - local_result_file_name = get_temp_file_path() + local_result_file_name = temp_file_utils.get_temp_file_path() with open(local_result_file_name, mode="w+b") as local_result_file_obj: cp.dump(estimator, local_result_file_obj) session.file.put( - local_result_file_name, - stage_result_file_name, + local_file_name=local_result_file_name, + stage_location=temp_stage_name, auto_compress=False, overwrite=True, statement_params=statement_params, @@ -254,7 +192,7 @@ def fit_wrapper_function( def _get_fit_wrapper_sproc_anonymous(self, statement_params: Dict[str, str]) -> StoredProcedure: model_spec = ModelSpecificationsBuilder.build(model=self.estimator) - fit_sproc_name = random_name_for_temp_object(TempObjectType.PROCEDURE) + fit_sproc_name = snowpark_utils.random_name_for_temp_object(snowpark_utils.TempObjectType.PROCEDURE) relaxed_dependencies = pkg_version_utils.get_valid_pkg_versions_supported_in_snowflake_conda_channel( pkg_versions=model_spec.pkgDependencies, session=self.session @@ -284,7 +222,7 @@ def _get_fit_wrapper_sproc(self, statement_params: Dict[str, str]) -> StoredProc fit_sproc: StoredProcedure = self.session._FIT_WRAPPER_SPROCS[fit_sproc_key] # type: ignore[attr-defined] return fit_sproc - fit_sproc_name = random_name_for_temp_object(TempObjectType.PROCEDURE) + fit_sproc_name = snowpark_utils.random_name_for_temp_object(snowpark_utils.TempObjectType.PROCEDURE) relaxed_dependencies = pkg_version_utils.get_valid_pkg_versions_supported_in_snowflake_conda_channel( pkg_versions=model_spec.pkgDependencies, session=self.session @@ -307,7 +245,7 @@ def _get_fit_wrapper_sproc(self, statement_params: Dict[str, str]) -> StoredProc def _build_fit_predict_wrapper_sproc( self, model_spec: ModelSpecifications, - ) -> Callable[[Session, List[str], str, str, List[str], Dict[str, str], bool, List[str], str], str]: + ) -> Callable[[Session, List[str], str, List[str], Dict[str, str], bool, List[str], str], str]: """ Constructs and returns a python stored procedure function to be used for training model. 
@@ -323,8 +261,7 @@ def _build_fit_predict_wrapper_sproc( def fit_predict_wrapper_function( session: Session, sql_queries: List[str], - stage_transform_file_name: str, - stage_result_file_name: str, + temp_stage_name: str, input_cols: List[str], statement_params: Dict[str, str], drop_input_cols: bool, @@ -347,9 +284,13 @@ def fit_predict_wrapper_function( df: pd.DataFrame = sp_df.to_pandas(statement_params=statement_params) df.columns = sp_df.columns - local_transform_file_name = get_temp_file_path() + local_transform_file_name = temp_file_utils.get_temp_file_path() - session.file.get(stage_transform_file_name, local_transform_file_name, statement_params=statement_params) + session.file.get( + stage_location=temp_stage_name, + target_directory=local_transform_file_name, + statement_params=statement_params, + ) local_transform_file_path = os.path.join( local_transform_file_name, os.listdir(local_transform_file_name)[0] @@ -359,14 +300,14 @@ def fit_predict_wrapper_function( fit_predict_result = estimator.fit_predict(X=df[input_cols]) - local_result_file_name = get_temp_file_path() + local_result_file_name = temp_file_utils.get_temp_file_path() with open(local_result_file_name, mode="w+b") as local_result_file_obj: cp.dump(estimator, local_result_file_obj) session.file.put( - local_result_file_name, - stage_result_file_name, + local_file_name=local_result_file_name, + stage_location=temp_stage_name, auto_compress=False, overwrite=True, statement_params=statement_params, @@ -407,7 +348,6 @@ def _build_fit_transform_wrapper_sproc( Session, List[str], str, - str, List[str], Optional[List[str]], Optional[str], @@ -433,8 +373,7 @@ def _build_fit_transform_wrapper_sproc( def fit_transform_wrapper_function( session: Session, sql_queries: List[str], - stage_transform_file_name: str, - stage_result_file_name: str, + temp_stage_name: str, input_cols: List[str], label_cols: Optional[List[str]], sample_weight_col: Optional[str], @@ -459,9 +398,13 @@ def fit_transform_wrapper_function( df: pd.DataFrame = sp_df.to_pandas(statement_params=statement_params) df.columns = sp_df.columns - local_transform_file_name = get_temp_file_path() + local_transform_file_name = temp_file_utils.get_temp_file_path() - session.file.get(stage_transform_file_name, local_transform_file_name, statement_params=statement_params) + session.file.get( + stage_location=temp_stage_name, + target_directory=local_transform_file_name, + statement_params=statement_params, + ) local_transform_file_path = os.path.join( local_transform_file_name, os.listdir(local_transform_file_name)[0] @@ -480,14 +423,14 @@ def fit_transform_wrapper_function( fit_transform_result = estimator.fit_transform(**args) - local_result_file_name = get_temp_file_path() + local_result_file_name = temp_file_utils.get_temp_file_path() with open(local_result_file_name, mode="w+b") as local_result_file_obj: cp.dump(estimator, local_result_file_obj) session.file.put( - local_result_file_name, - stage_result_file_name, + local_file_name=local_result_file_name, + stage_location=temp_stage_name, auto_compress=False, overwrite=True, statement_params=statement_params, @@ -535,7 +478,7 @@ def fit_transform_wrapper_function( def _get_fit_predict_wrapper_sproc_anonymous(self, statement_params: Dict[str, str]) -> StoredProcedure: model_spec = ModelSpecificationsBuilder.build(model=self.estimator) - fit_predict_sproc_name = random_name_for_temp_object(TempObjectType.PROCEDURE) + fit_predict_sproc_name = 
snowpark_utils.random_name_for_temp_object(snowpark_utils.TempObjectType.PROCEDURE) relaxed_dependencies = pkg_version_utils.get_valid_pkg_versions_supported_in_snowflake_conda_channel( pkg_versions=model_spec.pkgDependencies, session=self.session @@ -567,7 +510,7 @@ def _get_fit_predict_wrapper_sproc(self, statement_params: Dict[str, str]) -> St ] return fit_sproc - fit_predict_sproc_name = random_name_for_temp_object(TempObjectType.PROCEDURE) + fit_predict_sproc_name = snowpark_utils.random_name_for_temp_object(snowpark_utils.TempObjectType.PROCEDURE) relaxed_dependencies = pkg_version_utils.get_valid_pkg_versions_supported_in_snowflake_conda_channel( pkg_versions=model_spec.pkgDependencies, session=self.session @@ -592,7 +535,7 @@ def _get_fit_predict_wrapper_sproc(self, statement_params: Dict[str, str]) -> St def _get_fit_transform_wrapper_sproc_anonymous(self, statement_params: Dict[str, str]) -> StoredProcedure: model_spec = ModelSpecificationsBuilder.build(model=self.estimator) - fit_transform_sproc_name = random_name_for_temp_object(TempObjectType.PROCEDURE) + fit_transform_sproc_name = snowpark_utils.random_name_for_temp_object(snowpark_utils.TempObjectType.PROCEDURE) relaxed_dependencies = pkg_version_utils.get_valid_pkg_versions_supported_in_snowflake_conda_channel( pkg_versions=model_spec.pkgDependencies, session=self.session @@ -623,7 +566,7 @@ def _get_fit_transform_wrapper_sproc(self, statement_params: Dict[str, str]) -> ] return fit_sproc - fit_transform_sproc_name = random_name_for_temp_object(TempObjectType.PROCEDURE) + fit_transform_sproc_name = snowpark_utils.random_name_for_temp_object(snowpark_utils.TempObjectType.PROCEDURE) relaxed_dependencies = pkg_version_utils.get_valid_pkg_versions_supported_in_snowflake_conda_channel( pkg_versions=model_spec.pkgDependencies, session=self.session @@ -663,19 +606,21 @@ def train(self) -> object: # Extract query that generated the dataframe. We will need to pass it to the fit procedure. 
queries = dataset.queries["queries"] - transform_stage_name = self._create_temp_stage() - (stage_transform_file_name, stage_result_file_name) = self._upload_model_to_stage( - stage_name=transform_stage_name - ) - - # Call fit sproc + temp_stage_name = estimator_utils.create_temp_stage(self.session) statement_params = telemetry.get_function_usage_statement_params( project=_PROJECT, subproject=self._subproject, function_name=telemetry.get_statement_params_full_func_name(inspect.currentframe(), self._class_name), api_calls=[Session.call], - custom_tags=dict([("autogen", True)]) if self._autogenerated else None, + custom_tags={"autogen": True} if self._autogenerated else None, ) + estimator_utils.upload_model_to_stage( + stage_name=temp_stage_name, + estimator=self.estimator, + session=self.session, + statement_params=statement_params, + ) + # Call fit sproc if _ENABLE_ANONYMOUS_SPROC: fit_wrapper_sproc = self._get_fit_wrapper_sproc_anonymous(statement_params=statement_params) @@ -686,8 +631,7 @@ def train(self) -> object: sproc_export_file_name: str = fit_wrapper_sproc( self.session, queries, - stage_transform_file_name, - stage_result_file_name, + temp_stage_name, self.input_cols, self.label_cols, self.sample_weight_col, @@ -706,7 +650,7 @@ def train(self) -> object: sproc_export_file_name = fields[0] return self._fetch_model_from_stage( - dir_path=stage_result_file_name, + dir_path=temp_stage_name, file_name=sproc_export_file_name, statement_params=statement_params, ) @@ -734,32 +678,34 @@ def train_fit_predict( # Extract query that generated the dataframe. We will need to pass it to the fit procedure. queries = dataset.queries["queries"] - transform_stage_name = self._create_temp_stage() - (stage_transform_file_name, stage_result_file_name) = self._upload_model_to_stage( - stage_name=transform_stage_name - ) - - # Call fit sproc statement_params = telemetry.get_function_usage_statement_params( project=_PROJECT, subproject=self._subproject, function_name=telemetry.get_statement_params_full_func_name(inspect.currentframe(), self._class_name), api_calls=[Session.call], - custom_tags=dict([("autogen", True)]) if self._autogenerated else None, + custom_tags={"autogen": True} if self._autogenerated else None, ) + temp_stage_name = estimator_utils.create_temp_stage(self.session) + estimator_utils.upload_model_to_stage( + stage_name=temp_stage_name, + estimator=self.estimator, + session=self.session, + statement_params=statement_params, + ) + + # Call fit sproc if _ENABLE_ANONYMOUS_SPROC: fit_predict_wrapper_sproc = self._get_fit_predict_wrapper_sproc_anonymous(statement_params=statement_params) else: fit_predict_wrapper_sproc = self._get_fit_predict_wrapper_sproc(statement_params=statement_params) - fit_predict_result_name = random_name_for_temp_object(TempObjectType.TABLE) + fit_predict_result_name = snowpark_utils.random_name_for_temp_object(snowpark_utils.TempObjectType.TABLE) sproc_export_file_name: str = fit_predict_wrapper_sproc( self.session, queries, - stage_transform_file_name, - stage_result_file_name, + temp_stage_name, self.input_cols, statement_params, drop_input_cols, @@ -769,7 +715,7 @@ def train_fit_predict( output_result_sp = self.session.table(fit_predict_result_name) fitted_estimator = self._fetch_model_from_stage( - dir_path=stage_result_file_name, + dir_path=temp_stage_name, file_name=sproc_export_file_name, statement_params=statement_params, ) @@ -799,20 +745,23 @@ def train_fit_transform( # Extract query that generated the dataframe. 
We will need to pass it to the fit procedure. queries = dataset.queries["queries"] - transform_stage_name = self._create_temp_stage() - (stage_transform_file_name, stage_result_file_name) = self._upload_model_to_stage( - stage_name=transform_stage_name - ) - - # Call fit sproc statement_params = telemetry.get_function_usage_statement_params( project=_PROJECT, subproject=self._subproject, function_name=telemetry.get_statement_params_full_func_name(inspect.currentframe(), self._class_name), api_calls=[Session.call], - custom_tags=dict([("autogen", True)]) if self._autogenerated else None, + custom_tags={"autogen": True} if self._autogenerated else None, + ) + + temp_stage_name = estimator_utils.create_temp_stage(self.session) + estimator_utils.upload_model_to_stage( + stage_name=temp_stage_name, + estimator=self.estimator, + session=self.session, + statement_params=statement_params, ) + # Call fit sproc if _ENABLE_ANONYMOUS_SPROC: fit_transform_wrapper_sproc = self._get_fit_transform_wrapper_sproc_anonymous( statement_params=statement_params @@ -820,13 +769,12 @@ def train_fit_transform( else: fit_transform_wrapper_sproc = self._get_fit_transform_wrapper_sproc(statement_params=statement_params) - fit_transform_result_name = random_name_for_temp_object(TempObjectType.TABLE) + fit_transform_result_name = snowpark_utils.random_name_for_temp_object(snowpark_utils.TempObjectType.TABLE) sproc_export_file_name: str = fit_transform_wrapper_sproc( self.session, queries, - stage_transform_file_name, - stage_result_file_name, + temp_stage_name, self.input_cols, self.label_cols, self.sample_weight_col, @@ -838,7 +786,7 @@ def train_fit_transform( output_result_sp = self.session.table(fit_transform_result_name) fitted_estimator = self._fetch_model_from_stage( - dir_path=stage_result_file_name, + dir_path=temp_stage_name, file_name=sproc_export_file_name, statement_params=statement_params, ) diff --git a/snowflake/ml/modeling/_internal/snowpark_implementations/xgboost_external_memory_trainer.py b/snowflake/ml/modeling/_internal/snowpark_implementations/xgboost_external_memory_trainer.py index 61525aaf..df3d9241 100644 --- a/snowflake/ml/modeling/_internal/snowpark_implementations/xgboost_external_memory_trainer.py +++ b/snowflake/ml/modeling/_internal/snowpark_implementations/xgboost_external_memory_trainer.py @@ -13,12 +13,12 @@ exceptions, modeling_error_messages, ) -from snowflake.ml._internal.utils import pkg_version_utils +from snowflake.ml._internal.utils import pkg_version_utils, temp_file_utils from snowflake.ml._internal.utils.query_result_checker import ResultValidator from snowflake.ml._internal.utils.snowpark_dataframe_utils import ( cast_snowpark_dataframe, ) -from snowflake.ml._internal.utils.temp_file_utils import get_temp_file_path +from snowflake.ml.modeling._internal import estimator_utils from snowflake.ml.modeling._internal.model_specifications import ( ModelSpecifications, ModelSpecificationsBuilder, @@ -306,8 +306,6 @@ def _get_xgb_external_memory_fit_wrapper_sproc( ) # type: ignore[misc] def fit_wrapper_sproc( session: Session, - stage_transform_file_name: str, - stage_result_file_name: str, dataset_stage_name: str, batch_size: int, input_cols: List[str], @@ -320,9 +318,13 @@ def fit_wrapper_sproc( import cloudpickle as cp - local_transform_file_name = get_temp_file_path() + local_transform_file_name = temp_file_utils.get_temp_file_path() - session.file.get(stage_transform_file_name, local_transform_file_name, statement_params=statement_params) + session.file.get( + 
stage_location=dataset_stage_name, + target_directory=local_transform_file_name, + statement_params=statement_params, + ) local_transform_file_path = os.path.join( local_transform_file_name, os.listdir(local_transform_file_name)[0] @@ -345,13 +347,13 @@ def fit_wrapper_sproc( sample_weight_col=sample_weight_col, ) - local_result_file_name = get_temp_file_path() + local_result_file_name = temp_file_utils.get_temp_file_path() with open(local_result_file_name, mode="w+b") as local_result_file_obj: cp.dump(estimator, local_result_file_obj) session.file.put( - local_result_file_name, - stage_result_file_name, + local_file_name=local_result_file_name, + stage_location=dataset_stage_name, auto_compress=False, overwrite=True, statement_params=statement_params, @@ -394,11 +396,6 @@ def train(self) -> object: SnowflakeMLException: For known types of user and system errors. e: For every unexpected exception from SnowflakeClient. """ - temp_stage_name = self._create_temp_stage() - (stage_transform_file_name, stage_result_file_name) = self._upload_model_to_stage(stage_name=temp_stage_name) - data_file_paths = self._write_training_data_to_stage(dataset_stage_name=temp_stage_name) - - # Call fit sproc statement_params = telemetry.get_function_usage_statement_params( project=_PROJECT, subproject=self._subproject, @@ -406,7 +403,16 @@ def train(self) -> object: api_calls=[Session.call], custom_tags=None, ) + temp_stage_name = estimator_utils.create_temp_stage(self.session) + estimator_utils.upload_model_to_stage( + stage_name=temp_stage_name, + estimator=self.estimator, + session=self.session, + statement_params=statement_params, + ) + data_file_paths = self._write_training_data_to_stage(dataset_stage_name=temp_stage_name) + # Call fit sproc model_spec = ModelSpecificationsBuilder.build(model=self.estimator) fit_wrapper = self._get_xgb_external_memory_fit_wrapper_sproc( model_spec=model_spec, @@ -418,8 +424,6 @@ def train(self) -> object: try: sproc_export_file_name = fit_wrapper( self.session, - stage_transform_file_name, - stage_result_file_name, temp_stage_name, self._batch_size, self.input_cols, @@ -440,7 +444,7 @@ def train(self) -> object: sproc_export_file_name = fields[0] return self._fetch_model_from_stage( - dir_path=stage_result_file_name, + dir_path=temp_stage_name, file_name=sproc_export_file_name, statement_params=statement_params, ) diff --git a/snowflake/ml/modeling/_internal/snowpark_implementations/xgboost_external_memory_trainer_test.py b/snowflake/ml/modeling/_internal/snowpark_implementations/xgboost_external_memory_trainer_test.py index f16cc0b0..c8b56d56 100644 --- a/snowflake/ml/modeling/_internal/snowpark_implementations/xgboost_external_memory_trainer_test.py +++ b/snowflake/ml/modeling/_internal/snowpark_implementations/xgboost_external_memory_trainer_test.py @@ -5,10 +5,7 @@ from absl.testing import absltest from sklearn.datasets import load_iris -from snowflake.ml._internal.utils.temp_file_utils import ( - cleanup_temp_files, - get_temp_file_path, -) +from snowflake.ml._internal.utils import temp_file_utils from snowflake.ml.modeling._internal.snowpark_implementations.xgboost_external_memory_trainer import ( get_data_iterator, ) @@ -34,7 +31,7 @@ def test_data_iterator_single_file(self) -> None: num_rows_in_original_dataset = df.shape[0] batch_size = 20 - temp_file = get_temp_file_path() + temp_file = temp_file_utils.get_temp_file_path() df.to_parquet(temp_file) it = get_data_iterator( @@ -58,7 +55,7 @@ def consumer_func(data: pd.DataFrame, label: pd.DataFrame) -> None: 
self.assertEqual(num_rows, num_rows_in_original_dataset) self.assertEqual(num_batches, math.ceil(float(num_rows_in_original_dataset) / float(batch_size))) - cleanup_temp_files(temp_file) + temp_file_utils.cleanup_temp_files(temp_file) def test_data_iterator_multiple_file(self) -> None: df, input_cols, label_col = self.get_dataset() @@ -66,8 +63,8 @@ def test_data_iterator_multiple_file(self) -> None: num_rows_in_original_dataset = df.shape[0] batch_size = 20 - temp_file1 = get_temp_file_path() - temp_file2 = get_temp_file_path() + temp_file1 = temp_file_utils.get_temp_file_path() + temp_file2 = temp_file_utils.get_temp_file_path() df1, df2 = df.iloc[:70], df.iloc[70:] df1.to_parquet(temp_file1) df2.to_parquet(temp_file2) @@ -93,7 +90,7 @@ def consumer_func(data: pd.DataFrame, label: pd.DataFrame) -> None: self.assertEqual(num_rows, num_rows_in_original_dataset) self.assertEqual(num_batches, math.ceil(float(num_rows_in_original_dataset) / float(batch_size))) - cleanup_temp_files([temp_file1, temp_file2]) + temp_file_utils.cleanup_temp_files([temp_file1, temp_file2]) if __name__ == "__main__": diff --git a/snowflake/ml/modeling/framework/base.py b/snowflake/ml/modeling/framework/base.py index de24809b..96382c0a 100644 --- a/snowflake/ml/modeling/framework/base.py +++ b/snowflake/ml/modeling/framework/base.py @@ -16,7 +16,7 @@ exceptions, modeling_error_messages, ) -from snowflake.ml._internal.lineage import data_source, lineage_utils +from snowflake.ml._internal.lineage import lineage_utils from snowflake.ml._internal.utils import identifier, parallelize from snowflake.ml.modeling.framework import _utils from snowflake.snowpark import functions as F @@ -386,7 +386,6 @@ def __init__( self.file_names = file_names self.custom_states = custom_states self.sample_weight_col = sample_weight_col - self._data_sources: Optional[List[data_source.DataSource]] = None self.start_time = datetime.now().strftime(_utils.DATETIME_FORMAT)[:-3] @@ -421,18 +420,14 @@ def _get_dependencies(self) -> List[str]: """ return [] - def _get_data_sources(self) -> Optional[List[data_source.DataSource]]: - return self._data_sources - @telemetry.send_api_usage_telemetry( project=PROJECT, subproject=SUBPROJECT, ) def fit(self, dataset: Union[snowpark.DataFrame, pd.DataFrame]) -> "BaseEstimator": """Runs universal logics for all fit implementations.""" - self._data_sources = getattr(dataset, lineage_utils.DATA_SOURCES_ATTR, None) - if self._data_sources: - assert all(isinstance(ds, data_source.DataSource) for ds in self._data_sources) + data_sources = lineage_utils.get_data_sources(dataset) + lineage_utils.set_data_sources(self, data_sources) return self._fit(dataset) @abstractmethod diff --git a/snowflake/ml/modeling/impute/simple_imputer.py b/snowflake/ml/modeling/impute/simple_imputer.py index bb3d7858..73140a85 100644 --- a/snowflake/ml/modeling/impute/simple_imputer.py +++ b/snowflake/ml/modeling/impute/simple_imputer.py @@ -102,10 +102,14 @@ class SimpleImputer(base.BaseTransformer): For string or object data types, `fill_value` must be a string. If `None`, `fill_value` will be 0 when imputing numerical data and `missing_value` for strings and object data types. input_cols: Optional[Union[str, List[str]]] - Columns to use as inputs during fit and transform. + The name(s) of one or more columns in the input DataFrame containing feature(s) to be imputed. Input + columns must be specified before fit with this argument or after initialization with the + `set_input_cols` method. This argument is optional for API consistency. 
output_cols: Optional[Union[str, List[str]]] - A string or list of strings representing column names that will store the output of transform operation. - The length of `output_cols` must equal the length of `input_cols`. + The name(s) to assign output columns in the output DataFrame. The number of + output columns specified must equal the number of input columns. Output columns must be specified before + transform with this argument or after initialization with the `set_output_cols` method. This argument is + optional for API consistency. passthrough_cols: A string or a list of strings indicating column names to be excluded from any operations (such as train, transform, or inference). These specified column(s) will remain untouched throughout the process. This option is helpful in scenarios @@ -230,7 +234,7 @@ def check_type_consistency(col_types: Dict[str, T.DataType]) -> None: return input_col_datatypes - def fit(self, dataset: Union[snowpark.DataFrame, pd.DataFrame]) -> "SimpleImputer": + def _fit(self, dataset: Union[snowpark.DataFrame, pd.DataFrame]) -> "SimpleImputer": if isinstance(dataset, snowpark.DataFrame): return self._fit_snowpark(dataset) else: diff --git a/snowflake/ml/modeling/model_selection/grid_search_cv.py b/snowflake/ml/modeling/model_selection/grid_search_cv.py index 27b1e7ba..608f8f8f 100644 --- a/snowflake/ml/modeling/model_selection/grid_search_cv.py +++ b/snowflake/ml/modeling/model_selection/grid_search_cv.py @@ -285,11 +285,7 @@ def _get_active_columns(self) -> List[str]: ) return selected_cols - @telemetry.send_api_usage_telemetry( - project=_PROJECT, - subproject=_SUBPROJECT, - ) - def fit(self, dataset: Union[DataFrame, pd.DataFrame]) -> "GridSearchCV": + def _fit(self, dataset: Union[DataFrame, pd.DataFrame]) -> "GridSearchCV": """Run fit with all sets of parameters For more details on this function, see [sklearn.model_selection.GridSearchCV.fit] (https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.GridSearchCV.html#sklearn.model_selection.GridSearchCV.fit) diff --git a/snowflake/ml/modeling/model_selection/randomized_search_cv.py b/snowflake/ml/modeling/model_selection/randomized_search_cv.py index 77d3ba8f..cfa5d591 100644 --- a/snowflake/ml/modeling/model_selection/randomized_search_cv.py +++ b/snowflake/ml/modeling/model_selection/randomized_search_cv.py @@ -298,11 +298,7 @@ def _get_active_columns(self) -> List[str]: ) return selected_cols - @telemetry.send_api_usage_telemetry( - project=_PROJECT, - subproject=_SUBPROJECT, - ) - def fit(self, dataset: Union[DataFrame, pd.DataFrame]) -> "RandomizedSearchCV": + def _fit(self, dataset: Union[DataFrame, pd.DataFrame]) -> "RandomizedSearchCV": """Run fit with all sets of parameters For more details on this function, see [sklearn.model_selection.RandomizedSearchCV.fit] (https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.RandomizedSearchCV.html#sklearn.model_selection.RandomizedSearchCV.fit) diff --git a/snowflake/ml/modeling/pipeline/pipeline.py b/snowflake/ml/modeling/pipeline/pipeline.py index b02e14f0..2a06dd91 100644 --- a/snowflake/ml/modeling/pipeline/pipeline.py +++ b/snowflake/ml/modeling/pipeline/pipeline.py @@ -17,6 +17,7 @@ from snowflake import snowpark from snowflake.ml._internal import file_utils, telemetry from snowflake.ml._internal.exceptions import error_codes, exceptions +from snowflake.ml._internal.lineage import lineage_utils from snowflake.ml._internal.utils import snowpark_dataframe_utils, temp_file_utils from 
snowflake.ml.model.model_signature import ModelSignature, _infer_signature from snowflake.ml.modeling._internal.model_transformer_builder import ( @@ -427,6 +428,10 @@ def fit(self, dataset: Union[snowpark.DataFrame, pd.DataFrame], squash: Optional else dataset ) + # Extract lineage information here since we're overriding fit() directly + data_sources = lineage_utils.get_data_sources(dataset) + lineage_utils.set_data_sources(self, data_sources) + if self._can_be_trained_in_ml_runtime(dataset): if not self._is_convertible_to_sklearn: raise ValueError("This pipeline cannot be converted to an sklearn pipeline.") diff --git a/snowflake/ml/modeling/preprocessing/binarizer.py b/snowflake/ml/modeling/preprocessing/binarizer.py index 34d2202c..c3b8af16 100644 --- a/snowflake/ml/modeling/preprocessing/binarizer.py +++ b/snowflake/ml/modeling/preprocessing/binarizer.py @@ -25,11 +25,15 @@ class Binarizer(base.BaseTransformer): Feature values below or equal to this are replaced by 0, above it by 1. Default values is 0.0. input_cols: Optional[Union[str, Iterable[str]]], default=None - The name(s) of one or more columns in a DataFrame containing a feature to be binarized. + The name(s) of one or more columns in the input DataFrame containing feature(s) to be binarized. Input + columns must be specified before transform with this argument or after initialization with the + `set_input_cols` method. This argument is optional for API consistency. output_cols: Optional[Union[str, Iterable[str]]], default=None - The name(s) of one or more columns in a DataFrame in which results will be stored. The number of - columns specified must match the number of input columns. + The name(s) to assign output columns in the output DataFrame. The number of + columns specified must equal the number of input columns. Output columns must be specified before transform + with this argument or after initialization with the `set_output_cols` method. This argument is optional for + API consistency. passthrough_cols: Optional[Union[str, Iterable[str]]], default=None A string or a list of strings indicating column names to be excluded from any diff --git a/snowflake/ml/modeling/preprocessing/k_bins_discretizer.py b/snowflake/ml/modeling/preprocessing/k_bins_discretizer.py index 29903843..1d0d30e8 100644 --- a/snowflake/ml/modeling/preprocessing/k_bins_discretizer.py +++ b/snowflake/ml/modeling/preprocessing/k_bins_discretizer.py @@ -74,10 +74,15 @@ class KBinsDiscretizer(base.BaseTransformer): - 'quantile': All bins in each feature have the same number of points. input_cols: str or Iterable [column_name], default=None - Single or multiple input columns. + The name(s) of one or more columns in the input DataFrame containing feature(s) to be discretized. + Input columns must be specified before fit with this argument or after initialization with the + `set_input_cols` method. This argument is optional for API consistency. output_cols: str or Iterable [column_name], default=None - Single or multiple output columns. + The name(s) to assign output columns in the output DataFrame. The number of + columns specified must equal the number of input columns. Output columns must be specified before transform + with this argument or after initialization with the `set_output_cols` method. This argument is optional for + API consistency. passthrough_cols: A string or a list of strings indicating column names to be excluded from any operations (such as train, transform, or inference). 
These specified column(s) diff --git a/snowflake/ml/modeling/preprocessing/label_encoder.py b/snowflake/ml/modeling/preprocessing/label_encoder.py index 7a14c58c..36a999bc 100644 --- a/snowflake/ml/modeling/preprocessing/label_encoder.py +++ b/snowflake/ml/modeling/preprocessing/label_encoder.py @@ -25,11 +25,12 @@ class LabelEncoder(base.BaseTransformer): Args: input_cols: Optional[Union[str, List[str]]] - The name of a column in a DataFrame to be encoded. May be a string or a list containing one string. + The name of a column or a list containing one column name to be encoded in the input DataFrame. There must + be exactly one input column specified before fit. This argument is optional for API consistency. output_cols: Optional[Union[str, List[str]]] - The name of a column in a DataFrame where the results will be stored. May be a string or a list - containing one string. + The name of a column or a list containing one column name where the results will be stored. There must be + exactly one output column specified before transform. This argument is optional for API consistency. passthrough_cols: Optional[Union[str, List[str]]] A string or a list of strings indicating column names to be excluded from any @@ -54,11 +55,11 @@ def __init__( Args: input_cols: Optional[Union[str, List[str]]] - The name of a column in a DataFrame to be encoded. May be a string or a list containing one - string. + The name of a column or a list containing one column name to be encoded in the input DataFrame. There + must be exactly one input column specified before fit. This argument is optional for API consistency. output_cols: Optional[Union[str, List[str]]] - The name of a column in a DataFrame where the results will be stored. May be a string or a list - containing one string. + The name of a column or a list containing one column name where the results will be stored. There must + be exactly one output column specified before transform. This argument is optional for API consistency. passthrough_cols: Optional[Union[str, List[str]]] A string or a list of strings indicating column names to be excluded from any operations (such as train, transform, or inference). These specified column(s) diff --git a/snowflake/ml/modeling/preprocessing/max_abs_scaler.py b/snowflake/ml/modeling/preprocessing/max_abs_scaler.py index 80da91c3..cd8dc323 100644 --- a/snowflake/ml/modeling/preprocessing/max_abs_scaler.py +++ b/snowflake/ml/modeling/preprocessing/max_abs_scaler.py @@ -28,11 +28,15 @@ class MaxAbsScaler(base.BaseTransformer): Args: input_cols: Optional[Union[str, List[str]]], default=None - The name(s) of one or more columns in a DataFrame containing a feature to be scaled. + The name(s) of one or more columns in the input DataFrame containing feature(s) to be scaled. Input + columns must be specified before fit with this argument or after initialization with the + `set_input_cols` method. This argument is optional for API consistency. output_cols: Optional[Union[str, List[str]]], default=None - The name(s) of one or more columns in a DataFrame in which results will be stored. The number of - columns specified must match the number of input columns. + The name(s) to assign output columns in the output DataFrame. The number of + columns specified must equal the number of input columns. Output columns must be specified before transform + with this argument or after initialization with the `set_output_cols` method. This argument is optional for + API consistency.
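[Editor's note: illustration only, not part of this patch.] A minimal sketch of the column-specification pattern the revised preprocessing docstrings describe: columns may be supplied at construction or afterwards via the `set_input_cols` / `set_output_cols` setters named above, and the number of output columns must equal the number of input columns. The column names and data below are hypothetical.

    import pandas as pd
    from snowflake.ml.modeling.preprocessing import MaxAbsScaler

    # Hypothetical local data; a Snowpark DataFrame would be handled the same way.
    df = pd.DataFrame({"AGE": [25, 40, 61], "INCOME": [30000.0, 72000.0, 54000.0]})

    # Option 1: specify the columns at construction time.
    scaler = MaxAbsScaler(input_cols=["AGE", "INCOME"], output_cols=["AGE_SCALED", "INCOME_SCALED"])

    # Option 2: construct first, then set the columns before fit/transform.
    scaler = MaxAbsScaler()
    scaler.set_input_cols(["AGE", "INCOME"])
    scaler.set_output_cols(["AGE_SCALED", "INCOME_SCALED"])  # must match the number of input columns

    scaler.fit(df)                     # input columns must be set by this point
    scaled_df = scaler.transform(df)   # output columns must be set by this point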
passthrough_cols: Optional[Union[str, List[str]]], default=None A string or a list of strings indicating column names to be excluded from any diff --git a/snowflake/ml/modeling/preprocessing/min_max_scaler.py b/snowflake/ml/modeling/preprocessing/min_max_scaler.py index b5d8fa21..3d7c5ee4 100644 --- a/snowflake/ml/modeling/preprocessing/min_max_scaler.py +++ b/snowflake/ml/modeling/preprocessing/min_max_scaler.py @@ -29,12 +29,15 @@ class MinMaxScaler(base.BaseTransformer): Whether to clip transformed values of held-out data to the specified feature range (default is True). input_cols: Optional[Union[str, List[str]]], default=None - The name(s) of one or more columns in a DataFrame containing a feature to be scaled. Each specified - input column is scaled independently and stored in the corresponding output column. + The name(s) of one or more columns in the input DataFrame containing feature(s) to be scaled. Input + columns must be specified before fit with this argument or after initialization with the + `set_input_cols` method. This argument is optional for API consistency. output_cols: Optional[Union[str, List[str]]], default=None - The name(s) of one or more columns in a DataFrame in which results will be stored. The number of - columns specified must match the number of input columns. + The name(s) to assign output columns in the output DataFrame. The number of + columns specified must equal the number of input columns. Output columns must be specified before transform + with this argument or after initialization with the `set_output_cols` method. This argument is optional for + API consistency. passthrough_cols: Optional[Union[str, List[str]]], default=None A string or a list of strings indicating column names to be excluded from any diff --git a/snowflake/ml/modeling/preprocessing/normalizer.py b/snowflake/ml/modeling/preprocessing/normalizer.py index a3ebfa1c..da8756d3 100644 --- a/snowflake/ml/modeling/preprocessing/normalizer.py +++ b/snowflake/ml/modeling/preprocessing/normalizer.py @@ -28,11 +28,15 @@ class Normalizer(base.BaseTransformer): values. It must be one of 'l1', 'l2', or 'max'. input_cols: Optional[Union[str, List[str]]] - Columns to use as inputs during transform. + The name(s) of one or more columns in the input DataFrame containing feature(s) to be normalized. Input + columns must be specified before transform with this argument or after initialization with the + `set_input_cols` method. This argument is optional for API consistency. output_cols: Optional[Union[str, List[str]]] - A string or list of strings representing column names that will store the output of transform operation. - The length of `output_cols` must equal the length of `input_cols`. + The name(s) to assign output columns in the output DataFrame. The number of + columns specified must equal the number of input columns. Output columns must be specified before transform + with this argument or after initialization with the `set_output_cols` method. This argument is optional for + API consistency. 
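[Editor's note: illustration only, not part of this patch.] A short sketch of the MinMaxScaler behaviour referenced above: the scaler is fit on training data and, because `clip` defaults to True, transformed values from held-out data are clipped to the configured feature range. The `feature_range` argument name is an assumption (mirroring scikit-learn); the data and column names are hypothetical.

    import pandas as pd
    from snowflake.ml.modeling.preprocessing import MinMaxScaler

    train_df = pd.DataFrame({"INCOME": [30000.0, 60000.0, 90000.0]})
    holdout_df = pd.DataFrame({"INCOME": [120000.0]})  # falls outside the training range

    scaler = MinMaxScaler(
        feature_range=(0, 1),   # assumed parameter name
        clip=True,              # documented default per the docstring above
        input_cols=["INCOME"],
        output_cols=["INCOME_SCALED"],
    )
    scaler.fit(train_df)
    out = scaler.transform(holdout_df)  # INCOME_SCALED is clipped to 1.0 instead of exceeding the range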
passthrough_cols: Optional[Union[str, List[str]]] A string or a list of strings indicating column names to be excluded from any diff --git a/snowflake/ml/modeling/preprocessing/one_hot_encoder.py b/snowflake/ml/modeling/preprocessing/one_hot_encoder.py index c82354a6..d9ef2b48 100644 --- a/snowflake/ml/modeling/preprocessing/one_hot_encoder.py +++ b/snowflake/ml/modeling/preprocessing/one_hot_encoder.py @@ -157,10 +157,18 @@ class OneHotEncoder(base.BaseTransformer): there is no limit to the number of output features. input_cols: Optional[Union[str, List[str]]], default=None - Single or multiple input columns. + The name(s) of one or more columns in the input DataFrame containing feature(s) to be encoded. Input + columns must be specified before fit with this argument or after initialization with the + `set_input_cols` method. This argument is optional for API consistency. output_cols: Optional[Union[str, List[str]]], default=None - Single or multiple output columns. + The prefix to be used for encoded output for each input column. The number of + output column prefixes specified must match the number of input columns. Output column prefixes must be + specified before transform with this argument or after initialization with the `set_output_cols` method. + + Note: Dense output column names are case-sensitive and resolve identifiers following Snowflake rules, e.g. + `"PREFIX_a"`, `PREFIX_A`, `"prefix_A"`. Therefore, there is no need to provide double-quoted column names + as that would result in invalid identifiers. passthrough_cols: Optional[Union[str, List[str]]] A string or a list of strings indicating column names to be excluded from any diff --git a/snowflake/ml/modeling/preprocessing/ordinal_encoder.py b/snowflake/ml/modeling/preprocessing/ordinal_encoder.py index c48b8b85..37e4a589 100644 --- a/snowflake/ml/modeling/preprocessing/ordinal_encoder.py +++ b/snowflake/ml/modeling/preprocessing/ordinal_encoder.py @@ -67,11 +67,14 @@ class OrdinalEncoder(base.BaseTransformer): The value to be used to encode unknown categories. input_cols: Optional[Union[str, List[str]]], default=None - The name(s) of one or more columns in a DataFrame containing a feature to be encoded. + The name(s) of one or more columns in the input DataFrame containing feature(s) to be encoded. Input + columns must be specified before fit with this argument or after initialization with the + `set_input_cols` method. This argument is optional for API consistency. output_cols: Optional[Union[str, List[str]]], default=None - The name(s) of one or more columns in a DataFrame in which results will be stored. The number of - columns specified must match the number of input columns. + The prefix to be used for encoded output for each input column. The number of + output column prefixes specified must equal the number of input columns. Output column prefixes must be + specified before transform with this argument or after initialization with the `set_output_cols` method. 
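[Editor's note: illustration only, not part of this patch.] A hedged sketch of the output-prefix behaviour the OneHotEncoder note above describes: each entry in `output_cols` acts as a prefix, and dense output columns are derived from it per category, with identifier quoting and case resolved by Snowflake rules as noted. The data and column names are hypothetical, and the default dense output is assumed.

    import pandas as pd
    from snowflake.ml.modeling.preprocessing import OneHotEncoder

    df = pd.DataFrame({"COLOR": ["red", "green", "red"]})

    ohe = OneHotEncoder(input_cols=["COLOR"], output_cols=["COLOR_OHE"])  # one prefix per input column
    ohe.fit(df)
    encoded = ohe.transform(df)
    # Dense output columns are built from the prefix plus each category,
    # e.g. roughly COLOR_OHE_green / COLOR_OHE_red (exact quoting/case follows the note above).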
passthrough_cols: Optional[Union[str, List[str]]], default=None A string or a list of strings indicating column names to be excluded from any @@ -247,7 +250,7 @@ def _fit_category_state(self, dataset: snowpark.DataFrame) -> None: # columns: COLUMN_NAME, CATEGORY, INDEX state_df = self._get_category_index_state_df(dataset) # save the dataframe on server side so that transform doesn't need to upload - state_df.write.save_as_table( # type: ignore[call-overload] + state_df.write.save_as_table( self._vocab_table_name, mode="overwrite", table_type="temporary", @@ -520,7 +523,7 @@ def _transform_snowpark(self, dataset: snowpark.DataFrame) -> snowpark.DataFrame ) batch_table_name = snowpark_utils.random_name_for_temp_object(snowpark_utils.TempObjectType.TABLE) - transformed_dataset.write.save_as_table( # type: ignore[call-overload] + transformed_dataset.write.save_as_table( batch_table_name, mode="overwrite", table_type="temporary", diff --git a/snowflake/ml/modeling/preprocessing/robust_scaler.py b/snowflake/ml/modeling/preprocessing/robust_scaler.py index 7f8dd968..f88aa2a4 100644 --- a/snowflake/ml/modeling/preprocessing/robust_scaler.py +++ b/snowflake/ml/modeling/preprocessing/robust_scaler.py @@ -37,12 +37,15 @@ class RobustScaler(base.BaseTransformer): the dataset is scaled down. If less than 1, the dataset is scaled up. input_cols: Optional[Union[str, List[str]]], default=None - The name(s) of one or more columns in a DataFrame containing a feature to be scaled. + The name(s) of one or more columns in the input DataFrame containing feature(s) to be scaled. Input + columns must be specified before fit with this argument or after initialization with the + `set_input_cols` method. This argument is optional for API consistency. output_cols: Optional[Union[str, List[str]]], default=None - The name(s) of one or more columns in a DataFrame in which results will be stored. The number of - columns specified must match the number of input columns. For dense output, the column names specified are - used as base names for the columns created for each category. + The name(s) to assign output columns in the output DataFrame. The number of + columns specified must equal the number of input columns. Output columns must be specified before transform + with this argument or after initialization with the `set_output_cols` method. This argument is optional for + API consistency. passthrough_cols: Optional[Union[str, List[str]]], default=None A string or a list of strings indicating column names to be excluded from any diff --git a/snowflake/ml/modeling/preprocessing/standard_scaler.py b/snowflake/ml/modeling/preprocessing/standard_scaler.py index 8ae50993..091dbd35 100644 --- a/snowflake/ml/modeling/preprocessing/standard_scaler.py +++ b/snowflake/ml/modeling/preprocessing/standard_scaler.py @@ -26,11 +26,15 @@ class StandardScaler(base.BaseTransformer): If True, scale the data unit variance (i.e. unit standard deviation). input_cols: Optional[Union[str, List[str]]], default=None - The name(s) of one or more columns in a DataFrame containing a feature to be scaled. + The name(s) of one or more columns in the input DataFrame containing feature(s) to be scaled. Input + columns must be specified before fit with this argument or after initialization with the + `set_input_cols` method. This argument is optional for API consistency. output_cols: Optional[Union[str, List[str]]], default=None - The name(s) of one or more columns in a DataFrame in which results will be stored. 
The number of - columns specified must match the number of input columns. + The name(s) to assign output columns in the output DataFrame. The number of + columns specified must equal the number of input columns. Output columns must be specified before transform + with this argument or after initialization with the `set_output_cols` method. This argument is optional for + API consistency. passthrough_cols: Optional[Union[str, List[str]]], default=None A string or a list of strings indicating column names to be excluded from any diff --git a/snowflake/ml/version.bzl b/snowflake/ml/version.bzl index c78f51ef..6d247f40 100644 --- a/snowflake/ml/version.bzl +++ b/snowflake/ml/version.bzl @@ -1,2 +1,2 @@ # This is parsed by regex in conda reciper meta file. Make sure not to break it. -VERSION = "1.5.2" +VERSION = "1.5.3" diff --git a/tests/integ/snowflake/ml/feature_store/feature_store_large_scale_test.py b/tests/integ/snowflake/ml/feature_store/feature_store_large_scale_test.py index 3d7ec6ad..cd10c8cc 100644 --- a/tests/integ/snowflake/ml/feature_store/feature_store_large_scale_test.py +++ b/tests/integ/snowflake/ml/feature_store/feature_store_large_scale_test.py @@ -176,7 +176,7 @@ def create_select_query(start: str, end: str) -> str: .sort_values(by="PULOCATIONID") .reset_index(drop=True) ) - assert_frame_equal(expected_pdf, actual_pdf, check_dtype=True) + assert_frame_equal(expected_pdf, actual_pdf, check_dtype=False) if __name__ == "__main__": diff --git a/tests/integ/snowflake/ml/fileset/BUILD.bazel b/tests/integ/snowflake/ml/fileset/BUILD.bazel index 83c26388..57ce0b32 100644 --- a/tests/integ/snowflake/ml/fileset/BUILD.bazel +++ b/tests/integ/snowflake/ml/fileset/BUILD.bazel @@ -57,7 +57,7 @@ py_test( name = "fileset_tensorflow_integ_test", timeout = "long", srcs = ["fileset_tensorflow_integ_test.py"], - shard_count = 4, + shard_count = 8, deps = [ ":fileset_integ_test_base", "//snowflake/ml/fileset", diff --git a/tests/integ/snowflake/ml/model/_client/model/input_validation_integ_test.py b/tests/integ/snowflake/ml/model/_client/model/input_validation_integ_test.py index 75b0d226..8bd08070 100644 --- a/tests/integ/snowflake/ml/model/_client/model/input_validation_integ_test.py +++ b/tests/integ/snowflake/ml/model/_client/model/input_validation_integ_test.py @@ -1,3 +1,4 @@ +import datetime import uuid import pandas as pd @@ -10,6 +11,7 @@ from tests.integ.snowflake.ml.test_utils import dataframe_utils, db_manager MODEL_NAME = "TEST_MODEL" +TIMESTAMP_MODEL_NAME = "TEST_TIMESTAMP_MODEL" VERSION_NAME = "V1" @@ -67,6 +69,25 @@ def setUpClass(self) -> None: }, ) + self._mv_timestamp = self.registry.log_model( + model=lm, + model_name=TIMESTAMP_MODEL_NAME, + version_name=VERSION_NAME, + signatures={ + "predict": model_signature.ModelSignature( + inputs=[ + model_signature.FeatureSpec(name="c1", dtype=model_signature.DataType.INT8), + model_signature.FeatureSpec(name="c2", dtype=model_signature.DataType.INT8), + model_signature.FeatureSpec(name="c3", dtype=model_signature.DataType.INT8), + model_signature.FeatureSpec(name="c4", dtype=model_signature.DataType.TIMESTAMP_NTZ), + ], + outputs=[ + model_signature.FeatureSpec(name="output", dtype=model_signature.DataType.INT8), + ], + ) + }, + ) + @classmethod def tearDownClass(self) -> None: self._db_manager.drop_database(self._test_db) @@ -114,6 +135,34 @@ def test_strict(self) -> None: with self.assertRaisesRegex(ValueError, "Data Validation Error"): self._mv.run(sp_df, strict_input_validation=True) + def test_timestamps(self) -> None: + d1 = 
datetime.datetime(year=2024, month=6, day=21, minute=1, second=1) + d2 = datetime.datetime(year=2024, month=7, day=11, minute=1, second=1) + + pd.testing.assert_frame_equal( + self._mv_timestamp.run(pd.DataFrame([[1, 2, 3, d1], [4, 2, 5, d2]]), strict_input_validation=True), + pd.DataFrame([1, 4], columns=["output"]), + check_dtype=False, + ) + + with self.assertRaisesRegex(ValueError, "Data Validation Error"): + self._mv_timestamp.run(pd.DataFrame([[1, 2, 4, d1], [257, 2, 5, d2]]), strict_input_validation=True) + + sp_df = self._session.create_dataframe([[1, 2, 3, d1], [4, 2, 5, d2]], schema=['"c1"', '"c2"', '"c3"', '"c4"']) + y_df_expected = pd.DataFrame([[1, 2, 3, d1, 1], [4, 2, 5, d2, 4]], columns=["c1", "c2", "c3", "c4", "output"]) + dataframe_utils.check_sp_df_res( + self._mv_timestamp.run(sp_df, strict_input_validation=True), y_df_expected, check_dtype=False + ) + + sp_df = self._session.create_dataframe( + [[1, 2, 3, d1], [257, 2, 5, d2]], schema=['"c1"', '"c2"', '"c3"', '"c4"'] + ) + y_df_expected = pd.DataFrame( + [[1, 2, 3, d1, 1], [257, 2, 5, d2, 257]], columns=["c1", "c2", "c3", "c4", "output"] + ) + with self.assertRaisesRegex(ValueError, "Data Validation Error"): + self._mv_timestamp.run(sp_df, strict_input_validation=True) + if __name__ == "__main__": absltest.main() diff --git a/tests/integ/snowflake/ml/registry/BUILD.bazel b/tests/integ/snowflake/ml/registry/BUILD.bazel index cb76af20..93451bd4 100644 --- a/tests/integ/snowflake/ml/registry/BUILD.bazel +++ b/tests/integ/snowflake/ml/registry/BUILD.bazel @@ -2,6 +2,7 @@ load("//bazel:py_rules.bzl", "py_library", "py_test") py_test( name = "model_registry_basic_integ_test", + timeout = "long", srcs = ["model_registry_basic_integ_test.py"], deps = [ "//snowflake/ml/registry:model_registry", diff --git a/tests/integ/snowflake/ml/registry/model/BUILD.bazel b/tests/integ/snowflake/ml/registry/model/BUILD.bazel index 1d23d45f..635e6b9e 100644 --- a/tests/integ/snowflake/ml/registry/model/BUILD.bazel +++ b/tests/integ/snowflake/ml/registry/model/BUILD.bazel @@ -141,6 +141,7 @@ py_test( ":registry_model_test_base", "//snowflake/ml/modeling/lightgbm:lgbm_regressor", "//snowflake/ml/modeling/linear_model:logistic_regression", + "//snowflake/ml/modeling/pipeline", "//snowflake/ml/modeling/xgboost:xgb_regressor", ], ) diff --git a/tests/integ/snowflake/ml/registry/model/registry_custom_model_test.py b/tests/integ/snowflake/ml/registry/model/registry_custom_model_test.py index 0f64503a..10ddb603 100644 --- a/tests/integ/snowflake/ml/registry/model/registry_custom_model_test.py +++ b/tests/integ/snowflake/ml/registry/model/registry_custom_model_test.py @@ -97,6 +97,31 @@ async def _test(self: "TestRegistryCustomModelInteg") -> None: asyncio.get_event_loop().run_until_complete(_test(self)) + @registry_model_test_base.RegistryModelTestBase.sproc_test(test_owners_rights=False) + @parameterized.product( # type: ignore[misc] + registry_test_fn=registry_model_test_base.RegistryModelTestBase.REGISTRY_TEST_FN_LIST, + ) + def test_large_input( + self, + registry_test_fn: str, + ) -> None: + arr = np.random.randint(100, size=(1_000_000, 3)) + pd_df = pd.DataFrame(arr, columns=["c1", "c2", "c3"]) + clf = DemoModel(custom_model.ModelContext()) + getattr(self, registry_test_fn)( + model=clf, + sample_input_data=pd_df, + prediction_assert_fns={ + "predict": ( + pd_df, + lambda res: pd.testing.assert_frame_equal( + res, + pd.DataFrame(arr[:, 0], columns=["output"]), + ), + ), + }, + ) + @parameterized.product( # type: ignore[misc] 
registry_test_fn=registry_model_test_base.RegistryModelTestBase.REGISTRY_TEST_FN_LIST, ) diff --git a/tests/integ/snowflake/ml/registry/model/registry_modeling_model_test.py b/tests/integ/snowflake/ml/registry/model/registry_modeling_model_test.py index 6f80627c..1114bd24 100644 --- a/tests/integ/snowflake/ml/registry/model/registry_modeling_model_test.py +++ b/tests/integ/snowflake/ml/registry/model/registry_modeling_model_test.py @@ -10,6 +10,7 @@ from snowflake.ml.model._model_composer import model_composer from snowflake.ml.modeling.lightgbm import LGBMRegressor from snowflake.ml.modeling.linear_model import LogisticRegression +from snowflake.ml.modeling.pipeline import Pipeline from snowflake.ml.modeling.xgboost import XGBRegressor from snowflake.snowpark import types as T from tests.integ.snowflake.ml.registry.model import registry_model_test_base @@ -106,10 +107,12 @@ def test_snowml_model_deploy_lightgbm( @parameterized.product( # type: ignore[misc] registry_test_fn=registry_model_test_base.RegistryModelTestBase.REGISTRY_TEST_FN_LIST, + use_pipeline=[False, True], ) def test_dataset_to_model_lineage( self, registry_test_fn: str, + use_pipeline: bool = False, ) -> None: iris_X = datasets.load_iris(as_frame=True).frame iris_X.columns = [s.replace(" (CM)", "").replace(" ", "") for s in iris_X.columns.str.upper()] @@ -118,14 +121,18 @@ def test_dataset_to_model_lineage( LABEL_COLUMNS = "TARGET" OUTPUT_COLUMNS = "PREDICTED_TARGET" regr = LogisticRegression(input_cols=INPUT_COLUMNS, output_cols=OUTPUT_COLUMNS, label_cols=LABEL_COLUMNS) - schema = [ - T.StructField("SEPALLENGTH", T.DoubleType()), - T.StructField("SEPALWIDTH", T.DoubleType()), - T.StructField("PETALLENGTH", T.DoubleType()), - T.StructField("PETALWIDTH", T.DoubleType()), - T.StructField("TARGET", T.StringType()), - T.StructField("PREDICTED_TARGET", T.StringType()), - ] + if use_pipeline: + regr = Pipeline([("regr", regr)]) + schema = T.StructType( + [ + T.StructField("SEPALLENGTH", T.DoubleType()), + T.StructField("SEPALWIDTH", T.DoubleType()), + T.StructField("PETALLENGTH", T.DoubleType()), + T.StructField("PETALWIDTH", T.DoubleType()), + T.StructField("TARGET", T.StringType()), + T.StructField("PREDICTED_TARGET", T.StringType()), + ] + ) test_features_df = self.session.create_dataframe(iris_X, schema=schema) test_features_dataset = dataset.create_from_dataframe(