
Commit

Merge pull request #326 from JDASoftwareGroup/klee2-timestamp-code
Update timestamp-related code in ktk
marco-neumann-by authored Jul 30, 2020
2 parents 78450aa + b99cf6b commit 4d4d54e
Showing 16 changed files with 193 additions and 40 deletions.
10 changes: 10 additions & 0 deletions CHANGES.rst
@@ -2,6 +2,16 @@
Changelog
=========


Version 3.13.0 (2020-07-30)
===========================

Improvements
^^^^^^^^^^^^

* Update timestamp-related code in the Ktk Discover Cube functionality.
* Support backward compatibility with old cubes and fix the CLI entry point.

Version 3.12.0 (2020-07-23)
===========================

2 changes: 1 addition & 1 deletion conda-requirements.txt
@@ -3,7 +3,7 @@ decorator
msgpack-python>=0.5.2
# Currently dask and numpy==1.16.0 clash
numpy!=1.15.0,!=1.16.0
pandas>=0.23.0, !=1.0.0 # https://github.com/pandas-dev/pandas/issues/31441
pandas>=0.23.0, !=1.0.0 , <1.1 # https://github.com/pandas-dev/pandas/issues/31441
# pyarrow==0.14.0 breaks kartothek
pyarrow>=0.13.0, !=0.14.0, <0.18.0 # Keep upper bound pinned until we see non-breaking releases in the future
simplejson
29 changes: 22 additions & 7 deletions kartothek/api/discover.py
@@ -117,8 +117,7 @@ def discover_datasets_unchecked(uuid_prefix, store, filter_ktk_cube_dataset_ids=
}

result = {}
# sorted iteration for deterministic error messages in case
# DatasetMetadata.load_from_store fails
# sorted iteration for deterministic error messages in case DatasetMetadata.load_from_store fails
for name in sorted(names):
try:
result[name[len(prefix) :]] = DatasetMetadata.load_from_store(
@@ -201,7 +200,9 @@ def discover_cube(uuid_prefix, store, filter_ktk_cube_dataset_ids=None):
seed_candidates = {
ktk_cube_dataset_id
for ktk_cube_dataset_id, ds in datasets.items()
if ds.metadata.get(KTK_CUBE_METADATA_KEY_IS_SEED, False)
if ds.metadata.get(
KTK_CUBE_METADATA_KEY_IS_SEED, ds.metadata.get("klee_is_seed", False)
)
}
if len(seed_candidates) == 0:
raise ValueError(
@@ -219,7 +220,10 @@ def discover_cube(uuid_prefix, store, filter_ktk_cube_dataset_ids=None):
seed_dataset = list(seed_candidates)[0]

seed_ds = datasets[seed_dataset]
dimension_columns = seed_ds.metadata.get(KTK_CUBE_METADATA_DIMENSION_COLUMNS)
dimension_columns = seed_ds.metadata.get(
KTK_CUBE_METADATA_DIMENSION_COLUMNS,
seed_ds.metadata.get("klee_dimension_columns"),
)
if dimension_columns is None:
raise ValueError(
'Could not recover dimension columns from seed dataset ("{seed_dataset}") of cube "{uuid_prefix}".'.format(
@@ -228,7 +232,7 @@ def discover_cube(uuid_prefix, store, filter_ktk_cube_dataset_ids=None):
)

# datasets written with new kartothek versions (after merge of PR#7747)
# always set KTK_CUBE_METADATA_PARTITION_COLUMNS in the metadata.
# always set KTK_CUBE_METADATA_PARTITION_COLUMNS and "klee_timestamp_column" in the metadata.
# Older versions of ktk_cube do not write these; instead, these columns are inferred from
# the actual partitioning: partition_columns are all but the last partition key
#
@@ -239,7 +243,11 @@ def discover_cube(uuid_prefix, store, filter_ktk_cube_dataset_ids=None):
#
# TODO: once all cubes are re-created and don't use timestamp column anymore, remove the timestamp column handling
# entirely
partition_columns = seed_ds.metadata.get(KTK_CUBE_METADATA_PARTITION_COLUMNS)
partition_columns = seed_ds.metadata.get(
KTK_CUBE_METADATA_PARTITION_COLUMNS,
seed_ds.metadata.get("klee_partition_columns"),
)
timestamp_column = seed_ds.metadata.get("klee_timestamp_column")

if partition_columns is None:
# infer the partition columns and timestamp column from the actual partitioning:
@@ -258,11 +266,18 @@ def discover_cube(uuid_prefix, store, filter_ktk_cube_dataset_ids=None):
).format(seed_dataset=seed_dataset, partition_key=partition_keys[0])
)
partition_columns = partition_keys[:-1]
timestamp_column = partition_keys[-1]

index_columns = set()
for ds in datasets.values():
index_columns |= set(ds.indices.keys()) - (
set(dimension_columns) | set(partition_columns)
set(dimension_columns) | set(partition_columns) | {timestamp_column}
)

# we only support the default timestamp column in the compat code
if (timestamp_column is not None) and (timestamp_column != "KLEE_TS"):
raise NotImplementedError(
f"Can only read old cubes if the timestamp column is 'KLEE_TS', but '{timestamp_column}' was detected."
)

cube = Cube(
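The compat path added to `discover_cube` boils down to a metadata-lookup-with-fallback pattern: prefer the new kartothek metadata keys, fall back to the legacy `klee_*` keys, and as a last resort infer the partition and timestamp columns from the physical partition keys. The following standalone sketch illustrates that pattern; it is not the library's actual helper, the function name is made up, and it assumes the three metadata-key constants are importable from `kartothek.core.cube.constants` (as `KTK_CUBE_METADATA_KEY_IS_SEED` is).

```python
from kartothek.core.cube.constants import (
    KTK_CUBE_METADATA_DIMENSION_COLUMNS,
    KTK_CUBE_METADATA_KEY_IS_SEED,
    KTK_CUBE_METADATA_PARTITION_COLUMNS,
)


def resolve_legacy_cube_metadata(metadata, partition_keys):
    """Sketch: resolve cube metadata with fallback to the legacy "klee_*" keys."""
    is_seed = metadata.get(
        KTK_CUBE_METADATA_KEY_IS_SEED, metadata.get("klee_is_seed", False)
    )
    dimension_columns = metadata.get(
        KTK_CUBE_METADATA_DIMENSION_COLUMNS, metadata.get("klee_dimension_columns")
    )
    partition_columns = metadata.get(
        KTK_CUBE_METADATA_PARTITION_COLUMNS, metadata.get("klee_partition_columns")
    )
    timestamp_column = metadata.get("klee_timestamp_column")

    if partition_columns is None:
        # old physical layout: all but the last partition key are partition
        # columns, the last one is the (now removed) timestamp column
        partition_columns = list(partition_keys[:-1])
        timestamp_column = partition_keys[-1]

    # the compat code only supports the default legacy timestamp column
    if timestamp_column is not None and timestamp_column != "KLEE_TS":
        raise NotImplementedError(
            f"Can only read old cubes if the timestamp column is 'KLEE_TS', "
            f"but '{timestamp_column}' was detected."
        )
    return is_seed, dimension_columns, partition_columns, timestamp_column
```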
4 changes: 2 additions & 2 deletions kartothek/core/cube/constants.py
@@ -1,5 +1,5 @@
"""
Common constants for klee.
Common constants for Kartothek.
"""
from kartothek.serialization import ParquetSerializer

@@ -29,7 +29,7 @@
#: Storage format for kartothek metadata that is used by default.
KTK_CUBE_METADATA_STORAGE_FORMAT = "json"

#: Kartothek metadata version that klee is based on.
#: Kartothek metadata version that ktk_cube is based on.
KTK_CUBE_METADATA_VERSION = 4

#: Metadata key that is used to mark seed datasets
3 changes: 1 addition & 2 deletions kartothek/io_components/cube/query/_intention.py
@@ -127,8 +127,7 @@ def _test_condition_types(conditions, datasets):

if pa.types.is_null(pa_type):
# ignore all-NULL columns
# TODO: the query planner / regrouper could use that to emit 0
# partitions
# TODO: the query planner / regrouper could use that to emit 0 partitions
continue

for v in val:
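For context on the comment above: a column that contains only missing values ends up with Arrow's dedicated null type, which is what the `pa.types.is_null()` check detects. A small illustrative snippet (example data only):

```python
import pyarrow as pa

# An all-NULL column is typed as "null" by Arrow ...
all_null = pa.array([None, None, None])
print(all_null.type)                     # null
print(pa.types.is_null(all_null.type))   # True

# ... while a column with at least one concrete value gets a real type.
mixed = pa.array([1, None, 3])
print(mixed.type)                        # int64
print(pa.types.is_null(mixed.type))      # False
```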
3 changes: 1 addition & 2 deletions kartothek/io_components/cube/query/_regroup.py
@@ -204,8 +204,7 @@ def _create_aligned_partition_df(

# Stage 2: Alignment
#
# Partition DataFrames are aligned based on Cube.partition_columns and
# their category.
# Partition DataFrames are aligned based on Cube.partition_columns and their category.
assert df_seed is not None
df_aligned = df_seed
for df_join in dfs_restrict:
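The alignment stage referenced in the comment above keeps only partitions that are present in the seed dataset and in every restricting dataset. A simplified, standalone illustration of that idea follows; it is not the actual `_create_aligned_partition_df` implementation, and the plain inner merge on the partition columns is an assumption made for the sketch.

```python
import pandas as pd


def align_partition_dfs(df_seed, dfs_restrict, partition_columns):
    # Successive inner joins on the partition columns drop every partition
    # that is missing from the seed or from any restricting dataset.
    df_aligned = df_seed
    for df_join in dfs_restrict:
        df_aligned = df_aligned.merge(df_join, on=list(partition_columns), how="inner")
    return df_aligned


seed = pd.DataFrame({"p": [0, 1, 2], "seed_label": ["a", "b", "c"]})
restrict = pd.DataFrame({"p": [1, 2, 3], "other_label": ["x", "y", "z"]})
print(align_partition_dfs(seed, [restrict], ["p"]))
#    p seed_label other_label
# 0  1          b           x
# 1  2          c           y
```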
4 changes: 2 additions & 2 deletions kartothek/utils/ktk_adapters.py
@@ -62,7 +62,7 @@ def get_dataset_columns(dataset):
return {
converter_str(col)
for col in get_dataset_schema(dataset).names
if not col.startswith("__") and col != "KTK_CUBE_TS"
if not col.startswith("__") and col != "KLEE_TS"
}


@@ -202,7 +202,7 @@ def get_partition_dataframe(dataset, cube):
df: pandas.DataFrame
DataFrame with partition data.
"""
cols = sorted(set(dataset.partition_keys) - {"KTK_CUBE_TS"})
cols = sorted(set(dataset.partition_keys) - {"KLEE_TS"})

if not cols:
return pd.DataFrame(
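The two changes above make these helpers hide the legacy "KLEE_TS" timestamp column (rather than the "KTK_CUBE_TS" name they previously filtered) alongside internal "__"-prefixed columns. A tiny illustration of the filter, with made-up column names:

```python
# Example schema names (illustrative only): internal "__..." columns and the
# legacy "KLEE_TS" timestamp column are excluded from the user-facing columns.
schema_names = ["x", "y", "p", "q", "i1", "KLEE_TS", "__index__"]
user_columns = {
    col for col in schema_names if not col.startswith("__") and col != "KLEE_TS"
}
print(sorted(user_columns))  # ['i1', 'p', 'q', 'x', 'y']
```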
9 changes: 3 additions & 6 deletions kartothek/utils/pandas.py
@@ -70,8 +70,7 @@ def concat_dataframes(dfs, default=None):
else:
# pd.concat seems to hold the data in memory 3 times (not twice as you might expect it from naive copying the
# input blocks into the output DF). This is very unfortunate especially for larger queries. This column-based
# approach effectively reduces the maximum memory consumption and to our
# knowledge is not measurably slower.
# approach effectively reduces the maximum memory consumption and to our knowledge is not measurably slower.
colset = set(dfs[0].columns)
if not all(colset == set(df.columns) for df in dfs):
raise ValueError("Not all DataFrames have the same set of columns!")
@@ -286,8 +285,7 @@ def aggregate_to_lists(df, by, data_col):
if not by:
return pd.DataFrame({data_col: pd.Series([list(df[data_col].values)])})

# sort the DataFrame by `by`-values, so that rows of every group-by group
# are consecutive
# sort the DataFrame by `by`-values, so that rows of every group-by group are consecutive
df = sort_dataframe(df, by)

# collect the following data for every group:
@@ -312,8 +310,7 @@ def _store_group():
result_idx_part.append(idx_part)
result_labels.append(group_values)

# create iterator over row-tuples, where every tuple contains values of
# all by-columns
# create iterator over row-tuples, where every tuple contains values of all by-columns
iterator_idx = zip(*(df[col].values for col in by))

# iterate over all rows in DataFrame and collect groups
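The comment in `concat_dataframes` motivates why a plain `pd.concat` is avoided on this hot path. Below is a hedged, standalone sketch of the column-by-column idea only; it is not the library's implementation and assumes all inputs share the same columns and a default index.

```python
import numpy as np
import pandas as pd


def concat_columnwise(dfs):
    # Concatenate one column at a time so that, at any moment, only a single
    # column's worth of extra memory is needed instead of a full extra copy
    # of the concatenated frame.
    columns = dfs[0].columns
    data = {col: np.concatenate([df[col].values for df in dfs]) for col in columns}
    return pd.DataFrame(data, columns=columns)


parts = [
    pd.DataFrame({"a": [1, 2], "b": ["x", "y"]}),
    pd.DataFrame({"a": [3], "b": ["z"]}),
]
print(concat_columnwise(parts))
```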
3 changes: 1 addition & 2 deletions kartothek/utils/store.py
@@ -49,8 +49,7 @@ class _ResourceNotFoundError: # type: ignore
_logger = logging.getLogger(__name__)


# Specialized implementation for azure-storage-blob < 12, using
# BlockBlobService (`bbs`):
# Specialized implementation for azure-storage-blob < 12, using BlockBlobService (`bbs`):


def _has_azure_bbs(store):
2 changes: 1 addition & 1 deletion requirements.txt
@@ -3,7 +3,7 @@ decorator
msgpack>=0.5.2
# Currently dask and numpy==1.16.0 clash
numpy!=1.15.0,!=1.16.0
pandas>=0.23.0, !=1.0.0 # https://github.com/pandas-dev/pandas/issues/31441
pandas>=0.23.0, !=1.0.0 , <1.1 # https://github.com/pandas-dev/pandas/issues/31441
# pyarrow==0.14.0 breaks kartothek
pyarrow>=0.13.0, !=0.14.0, <0.18.0 # Keep upper bound pinned until we see non-breaking releases in the future
simplejson
2 changes: 1 addition & 1 deletion setup.py
@@ -30,7 +30,7 @@ def setup_package():
long_description=open("README.md", "r").read(),
long_description_content_type="text/markdown",
python_requires=">=3.6",
entry_points={"console_scripts": ["kartothek_cube=kartothek.cli:main"]},
entry_points={"console_scripts": ["kartothek_cube=kartothek.cli:cli"]},
)


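For context on the setup.py change: a console_scripts spec of the form `"kartothek_cube=kartothek.cli:cli"` makes the installed `kartothek_cube` executable import `kartothek.cli` and call its `cli` attribute, so that attribute has to exist (the changelog's "fix the CLI entry point"). A minimal, hypothetical sketch of such a click-based entry point object, not kartothek's actual CLI:

```python
import click


@click.group()
def cli():
    """Object that a console_scripts entry like "pkg.cli:cli" would call."""


@cli.command()
def info():
    # Placeholder command for the sketch.
    click.echo("hello from the example CLI")


if __name__ == "__main__":
    cli()
```

The new `test_entry_point` in tests/cli/test_base.py (below) exercises exactly this wiring: it runs the installed `kartothek_cube` script via `check_call` and fails if the referenced attribute is missing or the command exits non-zero.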
91 changes: 85 additions & 6 deletions tests/api/test_discover.py
@@ -925,7 +925,7 @@ def test_seed_only(self, cube, function_store):

def test_without_partition_timestamp_metadata(self, cube, function_store):
# test discovery of a cube without metadata keys
# KTK_CUBE_METADATA_PARTITION_COLUMNS still works
# "KLEE_TS" and KTK_CUBE_METADATA_PARTITION_COLUMNS still works
store_data(
cube=cube,
function_store=function_store,
@@ -935,11 +935,11 @@ def test_without_partition_timestamp_metadata(self, cube, function_store):
"y": [0],
"p": [0],
"q": [0],
"KTK_CUBE_TS": [pd.Timestamp("2000")],
"KLEE_TS": [pd.Timestamp("2000")],
"i1": [0],
}
),
partition_on=["p", "q", "KTK_CUBE_TS"],
partition_on=["p", "q", "KLEE_TS"],
name=cube.seed_dataset,
new_ktk_cube_metadata=False,
)
@@ -1100,20 +1100,99 @@ def test_raises_partition_keys_impossible_old_metadata(self, cube, function_store):
"y": [0],
"p": [0],
"q": [0],
"KTK_CUBE_TS": [pd.Timestamp("2000")],
"KLEE_TS": [pd.Timestamp("2000")],
}
),
partition_on=["KTK_CUBE_TS"],
partition_on=["KLEE_TS"],
name=cube.seed_dataset,
new_ktk_cube_metadata=False,
)
with pytest.raises(ValueError) as exc:
discover_cube(cube.uuid_prefix, function_store)
assert (
str(exc.value)
== 'Seed dataset ("myseed") has only a single partition key (KTK_CUBE_TS) but should have at least 2.'
== 'Seed dataset ("myseed") has only a single partition key (KLEE_TS) but should have at least 2.'
)

def test_timestamp_col_compat(self, cube, function_store):
"""
Tests that cubes are still readable after timestamp removal.
"""
metadata_dimension_columns_old = "klee_dimension_columns"
metadata_is_seed_old = "klee_is_seed"
metadata_partition_columns_old = "klee_partition_columns"
metadata_timestamp_column_old = "klee_timestamp_column"
timestamp_column_old = "KLEE_TS"

store_data(
cube=cube,
function_store=function_store,
df=pd.DataFrame(
{
"x": [0],
"y": [0],
"p": [0],
"q": [0],
timestamp_column_old: [pd.Timestamp("2000")],
"i1": [0],
"a": [0],
}
),
partition_on=["p", "q", timestamp_column_old],
name=cube.seed_dataset,
metadata={
metadata_dimension_columns_old: cube.dimension_columns,
metadata_is_seed_old: True,
metadata_partition_columns_old: cube.partition_columns,
metadata_timestamp_column_old: timestamp_column_old,
},
)
store_data(
cube=cube,
function_store=function_store,
df=pd.DataFrame(
{
"x": [0],
"y": [0],
"p": [0],
"q": [0],
timestamp_column_old: [pd.Timestamp("2000")],
"b": [0],
}
),
partition_on=["p", "q", timestamp_column_old],
name="enrich",
metadata={
metadata_dimension_columns_old: cube.dimension_columns,
metadata_is_seed_old: False,
metadata_partition_columns_old: cube.partition_columns,
metadata_timestamp_column_old: timestamp_column_old,
},
)

cube_discovered, datasets_discovered = discover_cube(
cube.uuid_prefix, function_store
)
assert cube == cube_discovered
assert set(datasets_discovered.keys()) == {cube.seed_dataset, "enrich"}

def test_raises_timestamp_col_is_not_ktk_cube_ts(self, cube, function_store):
store_data(
cube=cube,
function_store=function_store,
df=pd.DataFrame(
{"x": [0], "y": [0], "p": [0], "q": [0], "ts": [pd.Timestamp("2000")]}
),
partition_on=["p", "q", "ts"],
name=cube.seed_dataset,
new_ktk_cube_metadata=False,
)
with pytest.raises(
NotImplementedError,
match="Can only read old cubes if the timestamp column is 'KLEE_TS', but 'ts' was detected.",
):
discover_cube(cube.uuid_prefix, function_store)

def test_raises_partition_keys_impossible(self, cube, function_store):
store_data(
cube=cube,
6 changes: 6 additions & 0 deletions tests/cli/test_base.py
@@ -1,7 +1,13 @@
from subprocess import check_call

import pytest
from dask.callbacks import Callback


def test_entry_point(cli):
check_call("kartothek_cube")


def test_noop(cli):
result = cli()
assert result.exit_code == 0