Commit cd38c89

Project import generated by Copybara. (#16)
GitOrigin-RevId: 1c09d7ecb92720c6367448f920684dabf40d2813

Co-authored-by: Snowflake Authors <[email protected]>
snowflake-provisioner and Snowflake Authors authored May 23, 2023
1 parent 2a14eaf commit cd38c89
Showing 37 changed files with 2,596 additions and 542 deletions.
7 changes: 6 additions & 1 deletion ci/conda_recipe/meta.yaml
@@ -22,20 +22,25 @@ requirements:
- python
- absl-py>=0.15,<2
- anyio>=3.5.0,<4
- cloudpickle
- fsspec>=2022.11,<=2023.1
- numpy>=1.23,<1.24
- packaging>=23.0,<24
- pyyaml>=6.0,<7
- scipy>=1.9,<2
- snowflake-connector-python
- snowflake-snowpark-python>=1.4.0,<=2
- sqlparse>=0.4,<1
- typing-extensions>=4.1.0,<5

# conda-libmamba-solver is a conda-specific requirement and should not appear in the wheel's dependencies.
- conda-libmamba-solver>=23.1.0,<24

# TODO(snandamuri): Versions of these packages must be exactly the same between the user's workspace and
# the snowpark sandbox. Generic definitions like scikit-learn>=1.1.0,<2 won't work because the snowflake conda
# channel only has a few allowlisted versions of scikit-learn available, so we must force users to use
# scikit-learn versions that are available in the snowflake conda channel. Since there is no way to specify an
# allowlist of versions in the requirements file, we are pinning the versions here.
- joblib>=1.0.0,<=1.1.1
- scikit-learn>=1.2.1,<2
- xgboost==1.7.3
about:
9 changes: 6 additions & 3 deletions codegen/sklearn_wrapper_generator.py
@@ -802,9 +802,10 @@ def generate(self) -> "SklearnWrapperGenerator":
if self._is_hist_gradient_boosting_regressor:
self.test_estimator_input_args_list.extend(["min_samples_leaf=1", "max_leaf_nodes=100"])

# TODO(snandamuri): Replace cloudpickle with joblib after latest version of joblib is added to snowflake conda.
self.fit_sproc_deps = self.predict_udf_deps = (
"f'numpy=={np.__version__}', f'pandas=={pd.__version__}', f'scikit-learn=={sklearn.__version__}', "
"f'xgboost=={xgboost.__version__}', f'joblib=={joblib.__version__}'"
"f'xgboost=={xgboost.__version__}', f'cloudpickle=={cp.__version__}'"
)
self._construct_string_from_lists()
return self
@@ -819,9 +820,10 @@ def generate(self) -> "XGBoostWrapperGenerator":
self.estimator_imports_list.append("import xgboost")
self.test_estimator_input_args_list.extend(["random_state=0", "subsample=1.0", "colsample_bynode=1.0"])
self.fit_sproc_imports = "import xgboost"
# TODO(snandamuri): Replace cloudpickle with joblib after latest version of joblib is added to snowflake conda.
self.fit_sproc_deps = self.predict_udf_deps = (
"f'numpy=={np.__version__}', f'pandas=={pd.__version__}', f'xgboost=={xgboost.__version__}', "
"f'joblib=={joblib.__version__}'"
"f'cloudpickle=={cp.__version__}'"
)
self._construct_string_from_lists()
return self
@@ -836,9 +838,10 @@ def generate(self) -> "LightGBMWrapperGenerator":
self.estimator_imports_list.append("import lightgbm")
self.test_estimator_input_args_list.extend(["random_state=0"])
self.fit_sproc_imports = "import lightgbm"
# TODO(snandamuri): Replace cloudpickle with joblib after latest version of joblib is added to snowflake conda.
self.fit_sproc_deps = self.predict_udf_deps = (
"f'numpy=={np.__version__}', f'pandas=={pd.__version__}', f'lightgbm=={lightgbm.__version__}', "
"f'joblib=={joblib.__version__}'"
"f'cloudpickle=={cp.__version__}'"
)
self._construct_string_from_lists()
return self
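Note: the strings assigned to fit_sproc_deps / predict_udf_deps above are emitted into the generated wrapper and evaluated there as f-strings against the locally imported modules (np, pd, xgboost, cp). A minimal sketch of the resulting dependency pins; the version numbers shown in the final comment are illustrative only:

import cloudpickle as cp
import numpy as np
import pandas as pd
import xgboost

# Each entry pins a package to the version installed in the client environment,
# so the sproc/UDF sandbox resolves exactly the same versions.
deps = [
    f"numpy=={np.__version__}",
    f"pandas=={pd.__version__}",
    f"xgboost=={xgboost.__version__}",
    f"cloudpickle=={cp.__version__}",
]
# e.g. ["numpy==1.23.4", "pandas==1.4.4", "xgboost==1.7.3", "cloudpickle==2.0.0"]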
64 changes: 50 additions & 14 deletions codegen/sklearn_wrapper_template.py_template
@@ -6,7 +6,7 @@ import os
from typing import Iterable, Optional, Union, List, Any, Dict, Callable
from uuid import uuid4

import joblib
import cloudpickle as cp
import pandas as pd
import numpy as np
{transform.estimator_imports}
@@ -183,7 +183,8 @@ class {transform.original_class_name}(BaseTransformer):

# Create a temp file and dump the transform to that file.
local_transform_file_name = get_temp_file_path()
joblib.dump(self._sklearn_object, local_transform_file_name)
with open(local_transform_file_name, mode="w+b") as local_transform_file:
cp.dump(self._sklearn_object, local_transform_file)

# Create temp stage to run fit.
transform_stage_name = "SNOWML_TRANSFORM_{{safe_id}}".format(safe_id=self.id)
@@ -214,7 +215,13 @@ class {transform.original_class_name}(BaseTransformer):
custom_tags=dict([("autogen", True)]),
)
# Put locally serialized transform on stage.
session.file.put(local_transform_file_name, stage_transform_file_name, auto_compress=False, overwrite=True, statement_params=statement_params)
session.file.put(
local_transform_file_name,
stage_transform_file_name,
auto_compress=False,
overwrite=True,
statement_params=statement_params
)

@sproc(
is_permanent=False,
@@ -233,7 +240,7 @@ class {transform.original_class_name}(BaseTransformer):
label_cols: List[str],
sample_weight_col: Optional[str]
) -> str:
import joblib
import cloudpickle as cp
import numpy as np
import os
import pandas
@@ -251,7 +258,12 @@

session.file.get(stage_transform_file_name, local_transform_file_name, statement_params=statement_params)

estimator = joblib.load(os.path.join(local_transform_file_name, os.listdir(local_transform_file_name)[0]))
local_transform_file_path = os.path.join(
local_transform_file_name,
os.listdir(local_transform_file_name)[0]
)
with open(local_transform_file_path, mode="r+b") as local_transform_file_obj:
estimator = cp.load(local_transform_file_obj)

argspec = inspect.getfullargspec(estimator.fit)
args = {{'X': df[input_cols]}}
@@ -268,12 +280,20 @@
local_result_file_name = local_result_file.name
local_result_file.close()

joblib_dump_files = joblib.dump(estimator, local_result_file_name)
session.file.put(local_result_file_name, stage_result_file_name, auto_compress = False, overwrite = True, statement_params=statement_params)
with open(local_result_file_name, mode="w+b") as local_result_file_obj:
cp.dump(estimator, local_result_file_obj)

session.file.put(
local_result_file_name,
stage_result_file_name,
auto_compress = False,
overwrite = True,
statement_params=statement_params
)

# Note: you can add something like + "|" + str(df) to the return string
# to pass debug information to the caller.
return str(os.path.basename(joblib_dump_files[0]))
return str(os.path.basename(local_result_file_name))

# Call fit sproc
statement_params = telemetry.get_function_usage_statement_params(
@@ -302,8 +322,13 @@ class {transform.original_class_name}(BaseTransformer):
if len(fields) > 1:
print("\n".join(fields[1:]))

session.file.get(os.path.join(stage_result_file_name, sproc_export_file_name), local_result_file_name, statement_params=statement_params)
self._sklearn_object = joblib.load(os.path.join(local_result_file_name, sproc_export_file_name))
session.file.get(
os.path.join(stage_result_file_name, sproc_export_file_name),
local_result_file_name,
statement_params=statement_params
)
with open(os.path.join(local_result_file_name, sproc_export_file_name),mode="r+b") as result_file_obj:
self._sklearn_object = cp.load(result_file_obj)

cleanup_temp_files([local_transform_file_name, local_result_file_name])

@@ -843,7 +868,8 @@ class {transform.original_class_name}(BaseTransformer):

# Create a temp file and dump the score to that file.
local_score_file_name = get_temp_file_path()
joblib.dump(self._sklearn_object, local_score_file_name)
with open(local_score_file_name, mode="w+b") as local_score_file:
cp.dump(self._sklearn_object, local_score_file)

# Create temp stage to run score.
score_stage_name = "SNOWML_SCORE_{{safe_id}}".format(safe_id=self.id)
@@ -872,7 +898,13 @@ class {transform.original_class_name}(BaseTransformer):
custom_tags=dict([("autogen", True)]),
)
# Put locally serialized score on stage.
session.file.put(local_score_file_name, stage_score_file_name, auto_compress=False, overwrite=True, statement_params=statement_params)
session.file.put(
local_score_file_name,
stage_score_file_name,
auto_compress=False,
overwrite=True,
statement_params=statement_params
)

@sproc(
is_permanent=False,
@@ -890,7 +922,7 @@
label_cols: List[str],
sample_weight_col: Optional[str]
) -> float:
import joblib
import cloudpickle as cp
import numpy as np
import os
import pandas
@@ -905,7 +937,11 @@
local_score_file.close()

session.file.get(stage_score_file_name, local_score_file_name, statement_params=statement_params)
estimator = joblib.load(os.path.join(local_score_file_name, os.listdir(local_score_file_name)[0]))

local_score_file_name_path = os.path.join(local_score_file_name, os.listdir(local_score_file_name)[0])
with open(local_score_file_name_path, mode="r+b") as local_score_file_obj:
estimator = cp.load(local_score_file_obj)

argspec = inspect.getfullargspec(estimator.score)
if "X" in argspec.args:
args = {{'X': df[input_cols]}}
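The template changes above all follow the same serialization pattern: joblib.dump / joblib.load are replaced by an explicit binary file handle plus cloudpickle. A self-contained sketch of that round trip, assuming only a local temp file (the session.file.put / session.file.get stage transfer from the template is elided, and the estimator is a stand-in for self._sklearn_object):

import os
import tempfile

import cloudpickle as cp
from sklearn.linear_model import LinearRegression

estimator = LinearRegression()  # stand-in for self._sklearn_object

# Serialize: open a temp file in binary write mode and let cloudpickle dump into it.
local_file = os.path.join(tempfile.mkdtemp(), "transform.pkl")
with open(local_file, mode="w+b") as f:
    cp.dump(estimator, f)

# ... session.file.put(local_file, stage, ...) and session.file.get(...) would happen here ...

# Deserialize: open in binary read mode and load the object back.
with open(local_file, mode="r+b") as f:
    restored = cp.load(f)

assert isinstance(restored, LinearRegression)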
3 changes: 2 additions & 1 deletion conda-env-snowflake.yml
@@ -25,6 +25,7 @@ dependencies:
- lightgbm==3.3.5
- networkx==2.8.4
- numpy==1.23.4
- packaging==23.0
- pandas==1.4.4
- pytest==7.1.2
- python==3.8.13
@@ -35,6 +36,6 @@ dependencies:
- scikit-learn==1.2.2
- snowflake-snowpark-python==1.4.0
- sqlparse==0.4.3
- typing-extensions==4.3.0
- typing-extensions==4.5.0
- xgboost==1.7.3
- mypy==0.981 # not a package dependency.
3 changes: 2 additions & 1 deletion conda-env.yml
@@ -22,6 +22,7 @@ dependencies:
- mypy==0.981
- networkx==2.8.4
- numpy==1.23.4
- packaging==23.0
- pandas==1.4.4
- pytest==7.1.2
- python==3.8.13
@@ -36,5 +37,5 @@ dependencies:
- torchdata==0.4.1
- transformers==4.27.1
- types-PyYAML==6.0.12
- typing-extensions==4.3.0
- typing-extensions==4.5.0
- xgboost==1.7.3
4 changes: 3 additions & 1 deletion snowflake/ml/BUILD.bazel
@@ -38,13 +38,16 @@ snowml_wheel(
requires = [
"absl-py>=0.15,<2",
"anyio>=3.5.0,<4",
"cloudpickle", # Version range is specified by snowpark. We are implicitly depending on it.
"fsspec[http]>=2022.11,<=2023.1",
"numpy>=1.23,<1.24",
"packaging>=23.0,<24",
"pyyaml>=6.0,<7",
"scipy>=1.9,<2",
"snowflake-connector-python[pandas]",
"snowflake-snowpark-python>=1.4.0,<2",
"sqlparse>=0.4,<1",
"typing-extensions>=4.1.0,<5",

# TODO(snandamuri): Versions of these packages must be exactly the same between the user's workspace and
# the snowpark sandbox. Generic definitions like scikit-learn>=1.1.0,<2 won't work because the snowflake conda channel
@@ -53,7 +56,6 @@ snowml_wheel(
# versions in the requirements file, we are pinning the versions here.
"scikit-learn>=1.2.1,<2",
"xgboost==1.7.3",
"joblib>=1.0.0,<=1.1.1", # All the release versions between 1.0.0 and 1.1.1 are available in SF Conda channel.
],
version = VERSION,
deps = [
1 change: 1 addition & 0 deletions snowflake/ml/_internal/BUILD.bazel
@@ -46,6 +46,7 @@ py_test(
srcs = ["env_utils_test.py"],
deps = [
":env_utils",
":env",
"//snowflake/ml/test_utils:mock_data_frame",
"//snowflake/ml/test_utils:mock_session",
],
65 changes: 51 additions & 14 deletions snowflake/ml/_internal/env_utils.py
@@ -11,6 +11,7 @@
from snowflake.ml._internal.utils import query_result_checker
from snowflake.snowpark import session

_INFO_SCHEMA_PACKAGES_HAS_RUNTIME_VERSION: Optional[bool] = None
_SNOWFLAKE_CONDA_PACKAGE_CACHE: Dict[str, List[version.Version]] = {}


@@ -219,13 +220,16 @@ def relax_requirement_version(req: requirements.Requirement) -> requirements.Req
return new_req


def resolve_conda_environment(packages: List[requirements.Requirement], channels: List[str]) -> Optional[List[str]]:
def resolve_conda_environment(
packages: List[requirements.Requirement], channels: List[str], python_version: str
) -> Optional[List[str]]:
"""Use conda api to check if given packages are resolvable in given channels. Only work when conda is
locally installed.
Args:
packages: Packages to be installed.
channels: Anaconda channels (name or URL) that conda should search.
python_version: The Python version string of the environment where the model will run.
Returns:
List of frozen dependencies represented in PEP 508 form if resolvable, None otherwise.
@@ -234,7 +238,7 @@ def resolve_conda_environment(packages: List[requirements.Requirement], channels
from conda_libmamba_solver import solver

package_names = list(map(lambda x: x.name, packages))
specs = list(map(str, packages))
specs = list(map(str, packages)) + [f"python=={python_version}"]

conda_solver = solver.LibMambaSolver("snow-env", channels=channels, specs_to_add=specs)
try:
@@ -252,18 +256,38 @@
)


def _check_runtime_version_column_existence(session: session.Session) -> bool:
sql = textwrap.dedent(
"""
SHOW COLUMNS
LIKE 'runtime_version'
IN TABLE information_schema.packages;
"""
)
result = session.sql(sql).count()
return result == 1


def validate_requirements_in_snowflake_conda_channel(
session: session.Session, reqs: List[requirements.Requirement]
session: session.Session, reqs: List[requirements.Requirement], python_version: str
) -> Optional[List[str]]:
"""Search the snowflake anaconda channel for packages with version meet the specifier.
Args:
session: Snowflake connection session.
reqs: List of requirement specifiers.
python_version: The Python version string of the environment where the model will run.
Raises:
ValueError: Raised when the specifier cannot be supported when creating UDF.
Returns:
A list of the latest pinned versions that are available in the Snowflake Anaconda channel and meet the version specifiers.
"""
global _INFO_SCHEMA_PACKAGES_HAS_RUNTIME_VERSION

if _INFO_SCHEMA_PACKAGES_HAS_RUNTIME_VERSION is None:
_INFO_SCHEMA_PACKAGES_HAS_RUNTIME_VERSION = _check_runtime_version_column_existence(session)
ret_list = []
reqs_to_request = []
for req in reqs:
@@ -273,14 +297,26 @@ def validate_requirements_in_snowflake_conda_channel(
pkg_names_str = " OR ".join(
f"package_name = '{req_name}'" for req_name in sorted(req.name for req in reqs_to_request)
)
sql = textwrap.dedent(
f"""
SELECT PACKAGE_NAME, VERSION
FROM information_schema.packages
WHERE ({pkg_names_str})
AND language = 'python';
"""
)
if _INFO_SCHEMA_PACKAGES_HAS_RUNTIME_VERSION:
parsed_python_version = version.Version(python_version)
sql = textwrap.dedent(
f"""
SELECT PACKAGE_NAME, VERSION
FROM information_schema.packages
WHERE ({pkg_names_str})
AND language = 'python'
AND runtime_version = '{parsed_python_version.major}.{parsed_python_version.minor}';
"""
)
else:
sql = textwrap.dedent(
f"""
SELECT PACKAGE_NAME, VERSION
FROM information_schema.packages
WHERE ({pkg_names_str})
AND language = 'python';
"""
)

try:
result = (
@@ -301,10 +337,11 @@
except snowflake.connector.DataError:
return None
for req in reqs:
available_versions = list(req.specifier.filter(_SNOWFLAKE_CONDA_PACKAGE_CACHE.get(req.name, [])))
if len(req.specifier) > 1 or any(spec.operator != "==" for spec in req.specifier):
raise ValueError("At most 1 version specifier using == operator is supported without local conda resolver.")
available_versions = list(req.specifier.filter(set(_SNOWFLAKE_CONDA_PACKAGE_CACHE.get(req.name, []))))
if not available_versions:
return None
else:
latest_version = max(available_versions)
ret_list.append(f"{req.name}=={latest_version}")
ret_list.append(str(req))
return sorted(ret_list)
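A hedged usage sketch of the two updated helpers, assuming an active Snowpark session; the import path follows the file location above, and the channel URL and package versions are illustrative rather than prescribed by this commit:

from packaging import requirements

from snowflake.ml._internal import env_utils

# Local resolution via conda-libmamba-solver (only works where conda is installed locally).
resolved = env_utils.resolve_conda_environment(
    packages=[requirements.Requirement("scikit-learn==1.2.2")],
    channels=["https://repo.anaconda.com/pkgs/snowflake"],  # illustrative channel
    python_version="3.8",
)

# Server-side validation against information_schema.packages; returns pinned
# "name==version" strings, or None if a requirement cannot be satisfied.
# Without the local conda resolver, only a single "==" specifier per package is supported.
validated = env_utils.validate_requirements_in_snowflake_conda_channel(
    session=sp_session,  # an existing snowflake.snowpark.Session
    reqs=[requirements.Requirement("xgboost==1.7.3")],
    python_version="3.8",
)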