Revert "Test code and version support update (#378)"
This reverts commit 480299b.
tnixon authored Feb 28, 2024
1 parent 480299b commit b79dbdd
Showing 17 changed files with 110 additions and 114 deletions.
9 changes: 6 additions & 3 deletions CONTRIBUTING.md
@@ -43,9 +43,12 @@ Run the following command in your terminal to create a virtual environment in th
tox --devenv .venv -e {environment-name}
```
The `--devenv` flag tells `tox` to create a development environment, and `.venv` is the folder where the virtual environment will be created.

## Environments we test
The environments we test against are defined within the `tox.ini` file, and the requirements for those environments are stored in `python/tests/requirements`. The makeup of these environments is inspired by the [Databricks Runtime](https://docs.databricks.com/en/release-notes/runtime/index.html#) (hence the naming convention), but it's important to note that developing on Databricks is **not** a requirement. We're simply mimicking some of the different runtime versions because (a) we recognize that much of the user base uses `tempo` on Databricks and (b) it saves the development time that would otherwise be spent building test environments with different versions of Python and PySpark from scratch.
Pre-defined environments can be found within the `tox.ini` file for different Python versions and their corresponding PySpark version. They include:
- py37-pyspark300
- py38-pyspark312
- py38-pyspark321
- py39-pyspark330
- py39-pyspark332
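
For illustration only (not part of this commit), here is how one of the environments listed above might be plugged into the `--devenv` command shown earlier; the folder name `.venv` and the activation step are the usual virtualenv defaults rather than anything mandated by the project:
```
# hypothetical example: build a dev environment for Python 3.9 + PySpark 3.3.2
tox --devenv .venv -e py39-pyspark332

# activate it (bash/zsh on macOS or Linux; Windows uses .venv\Scripts\activate)
source .venv/bin/activate
```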

## Run tests locally for one or more environments
You can run tests locally for one or more of the defined environments without setting up a development environment first.
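
As a sketch (not part of this commit), the environment names above can be passed directly to `tox`, either one at a time or comma-separated:
```
# run the test suite against one pre-defined environment
tox -e py38-pyspark321

# or run several environments in a single invocation
tox -e py38-pyspark321,py39-pyspark332
```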
6 changes: 0 additions & 6 deletions docs/requirements.txt

This file was deleted.

19 changes: 19 additions & 0 deletions python/requirements.txt
@@ -0,0 +1,19 @@
ipython==8.10.0
numpy==1.24.3
chispa==0.9.2
pandas==1.5.2
pyarrow==12.0.0
python-dateutil==2.8.2
pytz==2022.7.1
scipy==1.10.1
six==1.16.0
wheel==0.38.4
semver==2.13.0
sphinx-autobuild==2021.3.14
furo==2022.9.29
sphinx-copybutton==0.5.1
Sphinx==4.5.0
sphinx-design==0.2.0
sphinx-panels==0.6.0
jsonref==1.1.0
python-dateutil==2.8.2
20 changes: 16 additions & 4 deletions python/tempo/io.py
@@ -1,14 +1,16 @@
from __future__ import annotations

import logging
import os
from collections import deque
from typing import Optional

import pyspark.sql.functions as sfn
import tempo.tsdf as t_tsdf
from pyspark.sql import SparkSession
from pyspark.sql.utils import ParseException

import tempo.tsdf as t_tsdf

logger = logging.getLogger(__name__)


@@ -29,6 +31,12 @@ def write(
df = tsdf.df
ts_col = tsdf.ts_col
partitionCols = tsdf.partitionCols
if optimizationCols:
optimizationCols = optimizationCols + ["event_time"]
else:
optimizationCols = ["event_time"]

useDeltaOpt = os.getenv("DATABRICKS_RUNTIME_VERSION") is not None

view_df = df.withColumn("event_dt", sfn.to_date(sfn.col(ts_col))).withColumn(
"event_time",
@@ -44,12 +52,11 @@ def write(
tabName
)

if optimizationCols:
if useDeltaOpt:
try:
spark.sql(
"optimize {} zorder by {}".format(
tabName,
"(" + ",".join(partitionCols + optimizationCols + [ts_col]) + ")",
tabName, "(" + ",".join(partitionCols + optimizationCols) + ")"
)
)
except ParseException as e:
@@ -58,3 +65,8 @@ def write(
e
)
)
else:
logger.warning(
"Delta optimizations attempted on a non-Databricks platform. "
"Switch to use Databricks Runtime to get optimization advantages."
)
3 changes: 1 addition & 2 deletions python/tempo/tsdf.py
@@ -13,7 +13,6 @@
from pyspark.sql import SparkSession
from pyspark.sql.column import Column
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import TimestampType
from pyspark.sql.window import Window, WindowSpec
from scipy.fft import fft, fftfreq # type: ignore

@@ -1103,7 +1102,7 @@ def withRangeStats(
]

# build window
if isinstance(self.df.schema[self.ts_col].dataType, TimestampType):
if str(self.df.schema[self.ts_col].dataType) == "TimestampType":
self.df = self.__add_double_ts()
prohibited_cols.extend(["double_ts"])
w = self.__rangeBetweenWindow(
8 changes: 5 additions & 3 deletions python/tempo/utils.py
@@ -5,15 +5,17 @@
import warnings
from typing import List, Optional, Union, overload

import pyspark.sql.functions as sfn
import tempo.resample as t_resample
import tempo.tsdf as t_tsdf
from IPython import get_ipython
from IPython.core.display import HTML
from IPython.display import display as ipydisplay
from pandas.core.frame import DataFrame as pandasDataFrame

import pyspark.sql.functions as sfn
from pyspark.sql.dataframe import DataFrame

import tempo.resample as t_resample
import tempo.tsdf as t_tsdf

logger = logging.getLogger(__name__)
IS_DATABRICKS = "DB_HOME" in os.environ.keys()

16 changes: 7 additions & 9 deletions python/tests/base.py
@@ -5,11 +5,12 @@
from typing import Union

import jsonref
import pyspark.sql.functions as sfn
from chispa import assert_df_equality
from delta.pip_utils import configure_spark_with_delta_pip

import pyspark.sql.functions as sfn
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame

from tempo.intervals import IntervalsDF
from tempo.tsdf import TSDF

@@ -27,11 +28,9 @@ class SparkTest(unittest.TestCase):
def setUpClass(cls) -> None:
# create and configure PySpark Session
cls.spark = (
configure_spark_with_delta_pip(SparkSession.builder.appName("unit-tests"))
.config(
"spark.sql.extensions",
"io.delta.sql.DeltaSparkSessionExtension",
)
SparkSession.builder.appName("unit-tests")
.config("spark.jars.packages", "io.delta:delta-core_2.12:1.1.0")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config(
"spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog",
@@ -125,7 +124,7 @@ def __loadTestData(self, test_case_path: str) -> dict:
:param test_case_path: string representation of the data path e.g. : "tsdf_tests.BasicTests.test_describe"
:type test_case_path: str
"""
file_name, class_name, func_name = test_case_path.split(".")[-3:]
file_name, class_name, func_name = test_case_path.split(".")

# find our test data file
test_data_file = self.__getTestDataFilePath(file_name)
@@ -226,5 +225,4 @@ def assertDataFrameEquality(
ignore_row_order=ignore_row_order,
ignore_column_order=ignore_column_order,
ignore_nullable=ignore_nullable,
ignore_metadata=True,
)
71 changes: 44 additions & 27 deletions python/tests/io_tests.py
@@ -1,12 +1,10 @@
import logging
import os
import unittest
from importlib.metadata import version
from unittest import mock

from packaging import version as pkg_version
from tests.base import SparkTest

DELTA_VERSION = version("delta-spark")


class DeltaWriteTest(SparkTest):
def test_write_to_delta_without_optimization_cols(self):
@@ -39,6 +37,29 @@ def test_write_to_delta_with_optimization_cols(self):
# should be equal to the expected dataframe
self.assertEqual(self.spark.table(table_name).count(), 7)

def test_write_to_delta_non_dbr_environment_logging(self):
"""Test logging when writing"""

table_name = "my_table_optimization_col"

# load test data
input_tsdf = self.get_data_as_tsdf("input_data")

with self.assertLogs(level="WARNING") as warning_captured:
# test write to delta
input_tsdf.write(self.spark, table_name, ["date"])

self.assertEqual(len(warning_captured.records), 1)
self.assertEqual(
warning_captured.output,
[
"WARNING:tempo.io:"
"Delta optimizations attempted on a non-Databricks platform. "
"Switch to use Databricks Runtime to get optimization advantages."
],
)

@mock.patch.dict(os.environ, {"DATABRICKS_RUNTIME_VERSION": "10.4"})
def test_write_to_delta_bad_dbr_environment_logging(self):
"""Test useDeltaOpt Exception"""

@@ -47,29 +68,25 @@ def test_write_to_delta_bad_dbr_environment_logging(self):
# load test data
input_tsdf = self.get_data_as_tsdf("input_data")

if pkg_version.parse(DELTA_VERSION) < pkg_version.parse("2.0.0"):

with self.assertLogs(level="ERROR") as error_captured:
# should fail to run optimize
input_tsdf.write(self.spark, table_name, ["date"])

self.assertEqual(len(error_captured.records), 1)
print(error_captured.output)
self.assertEqual(
error_captured.output,
[
"ERROR:tempo.io:"
"Delta optimizations attempted, but was not successful.\nError: \nmismatched input "
"'optimize' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', "
"'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', "
"'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', "
"'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', "
"'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0)\n\n== SQL ==\noptimize "
"my_table_optimization_col_fails zorder by (symbol,date,event_time)\n^^^\n"
],
)
else:
pass
with self.assertLogs(level="ERROR") as error_captured:
# test write to delta
input_tsdf.write(self.spark, table_name, ["date"])

self.assertEqual(len(error_captured.records), 1)
print(error_captured.output)
self.assertEqual(
error_captured.output,
[
"ERROR:tempo.io:"
"Delta optimizations attempted, but was not successful.\nError: \nmismatched input "
"'optimize' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', "
"'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', "
"'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', "
"'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', "
"'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0)\n\n== SQL ==\noptimize "
"my_table_optimization_col_fails zorder by (symbol,date,event_time)\n^^^\n"
],
)


# MAIN
7 changes: 0 additions & 7 deletions python/tests/requirements/dbr104.txt

This file was deleted.

7 changes: 0 additions & 7 deletions python/tests/requirements/dbr113.txt

This file was deleted.

7 changes: 0 additions & 7 deletions python/tests/requirements/dbr122.txt

This file was deleted.

7 changes: 0 additions & 7 deletions python/tests/requirements/dbr133.txt

This file was deleted.

7 changes: 0 additions & 7 deletions python/tests/requirements/dbr142.txt

This file was deleted.

7 changes: 0 additions & 7 deletions python/tests/requirements/dbr91.txt

This file was deleted.

4 changes: 0 additions & 4 deletions python/tests/requirements/dev.txt

This file was deleted.

3 changes: 2 additions & 1 deletion python/tests/tsdf_tests.py
@@ -876,7 +876,8 @@ def test_withPartitionCols(self):
self.assertEqual(init_tsdf.partitionCols, [])
self.assertEqual(actual_tsdf.partitionCols, ["symbol"])

def test_tsdf_interpolate(self): ...
def test_tsdf_interpolate(self):
...


class FourierTransformTest(SparkTest):
23 changes: 10 additions & 13 deletions python/tox.ini
@@ -11,7 +11,9 @@ envlist =
build-dist
; Mirror Supported LTS DBR versions here: https://docs.databricks.com/release-notes/runtime/
; Use correct PySpark version based on Python version present in env name
dbr{91,104,113,122,133,142}
py37-pyspark300,
py38-pyspark{312,321},
py39-pyspark{330,332}
skip_missing_interpreters = true


@@ -21,19 +23,14 @@ package = wheel
wheel_build_env = .pkg
setenv =
COVERAGE_FILE = .coverage.{envname}
basepython =
dbr142: py310
dbr133: py310
dbr122: py39
dbr113: py39
dbr104: py38
dbr91: py38
dbr73: py37
deps =
-rtests/requirements/{envname}.txt
-rtests/requirements/dev.txt
pyspark300: pyspark==3.0.0
pyspark312: pyspark==3.1.2
pyspark321: pyspark==3.2.1
pyspark330: pyspark==3.3.0
pyspark332: pyspark==3.3.2
coverage>=7,<8

-rrequirements.txt
commands =
coverage --version
coverage run -m unittest discover -s tests -p '*_tests.py'
@@ -66,7 +63,7 @@ deps =
mypy>=1,<2
pandas-stubs>=2,<3
types-pytz>=2023,<2024
-rtests/requirements/dbr133.txt
-rrequirements.txt
commands =
mypy {toxinidir}/tempo

