Merge pull request #304 from JDASoftwareGroup/dont_dispatch_metadata
Remove dispatch of metadata and indices
fjetter authored Jul 2, 2020
2 parents 7bb50dd + 842fbd3 commit 39a5f31
Showing 10 changed files with 59 additions and 30 deletions.
20 changes: 18 additions & 2 deletions CHANGES.rst
@@ -2,15 +2,31 @@
Changelog
=========

Version 3.9.1 (2020-06-XX)
==========================
Version 3.10.0 (2020-06-XX)
===========================

Improvements
^^^^^^^^^^^^
* Improved dispatch performance for large datasets that include metadata
* Introduced the ``dispatch_metadata`` keyword argument for the metapartition read pipelines
  to ease the transition to a future breaking release (see the sketch below).
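
  A minimal sketch of the new keyword on the eager pipeline; the dataset UUID and
  store URL are placeholders, and the store setup via ``storefact`` is only an
  assumption about how the store factory is built::

      from functools import partial

      from storefact import get_store_from_url

      from kartothek.io.eager import read_dataset_as_metapartitions

      store_factory = partial(get_store_from_url, "hfs:///tmp/example_store")

      # Opt into the future behaviour: the returned MetaPartitions carry no
      # dataset metadata or index information and no DeprecationWarning is raised.
      mps = read_dataset_as_metapartitions(
          dataset_uuid="my_dataset",
          store=store_factory,
          dispatch_metadata=False,
      )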


Bug fixes
^^^^^^^^^

* Ensure that the empty (sentinel) DataFrame used in :func:`~kartothek.io.eager.read_table`
also has the correct behaviour when using the ``categoricals`` argument.


Breaking changes in ``io_components.read``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* ``dispatch_metapartitions`` and ``dispatch_metapartitions_from_factory`` no longer
  attach index and metadata information to the created ``MetaPartition`` instances
  unless explicitly requested (see the sketch below).
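
  A minimal sketch of explicitly keeping the old behaviour during the transition;
  the dataset UUID and ``store_factory`` are placeholders (any callable returning a
  simplekv-compatible store)::

      from kartothek.io_components.read import dispatch_metapartitions

      # Explicitly request the deprecated metadata dispatch; this emits a
      # DeprecationWarning once the generator is iterated.
      for mp in dispatch_metapartitions(
          "my_dataset",     # placeholder dataset UUID
          store_factory,    # placeholder store factory
          dispatch_metadata=True,
      ):
          ...  # mp.indices and mp.dataset_metadata are populated as before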


Version 3.9.0 (2020-06-03)
==========================

7 changes: 6 additions & 1 deletion kartothek/core/docs.py
@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""
This is a helper module to simplify code documentation
"""
@@ -175,6 +174,12 @@
"load_dataset_metadata": """
load_dataset_metadata: bool
Optional argument on whether to load the metadata or not""",
"dispatch_metadata": """
dispatch_metadata:
If True, attach dataset user metadata and dataset index information to
the MetaPartition instances generated.
Note: Attaching this metadata is deprecated; the toggle is only
introduced to ease the transition.""",
}


3 changes: 3 additions & 0 deletions kartothek/io/dask/bag.py
@@ -56,6 +56,7 @@ def read_dataset_as_metapartitions_bag(
factory=None,
dispatch_by=None,
partition_size=None,
dispatch_metadata=True,
):
"""
Retrieve dataset as `dask.bag` of `MetaPartition` objects.
@@ -80,6 +81,7 @@ def read_dataset_as_metapartitions_bag(
label_filter=label_filter,
predicates=predicates,
dispatch_by=dispatch_by,
dispatch_metadata=dispatch_metadata,
)
mps = db.from_sequence(mps, partition_size=partition_size)

@@ -164,6 +166,7 @@ def read_dataset_as_dataframe_bag(
predicates=predicates,
dispatch_by=dispatch_by,
partition_size=partition_size,
dispatch_metadata=False,
)
return mps.map(_get_data)

3 changes: 3 additions & 0 deletions kartothek/io/dask/delayed.py
@@ -257,6 +257,7 @@ def read_dataset_as_delayed_metapartitions(
predicates=None,
factory=None,
dispatch_by=None,
dispatch_metadata=True,
):
"""
A collection of dask.delayed objects to retrieve a dataset from store where each
@@ -283,6 +284,7 @@ def read_dataset_as_delayed_metapartitions(
label_filter=label_filter,
predicates=predicates,
dispatch_by=dispatch_by,
dispatch_metadata=dispatch_metadata,
)

if concat_partitions_on_primary_index or dispatch_by:
@@ -418,6 +420,7 @@ def read_table_as_delayed(
predicates=predicates,
factory=factory,
dispatch_by=dispatch_by,
dispatch_metadata=False,
)
return list(map_delayed(partial(_get_data, table=table), mps))

3 changes: 3 additions & 0 deletions kartothek/io/eager.py
@@ -142,6 +142,7 @@ def read_dataset_as_dataframes(
predicates=predicates,
factory=ds_factory,
dispatch_by=dispatch_by,
dispatch_metadata=False,
)
return [mp.data for mp in mps]

@@ -161,6 +162,7 @@ def read_dataset_as_metapartitions(
predicates=None,
factory=None,
dispatch_by=None,
dispatch_metadata=True,
):
"""
Read a dataset as a list of :class:`kartothek.io_components.metapartition.MetaPartition`.
@@ -209,6 +211,7 @@
predicates=predicates,
factory=ds_factory,
dispatch_by=dispatch_by,
dispatch_metadata=dispatch_metadata,
)
return list(ds_iter)

3 changes: 3 additions & 0 deletions kartothek/io/iter.py
@@ -45,6 +45,7 @@ def read_dataset_as_metapartitions__iterator(
predicates=None,
factory=None,
dispatch_by=None,
dispatch_metadata=True,
):
"""
@@ -73,6 +74,7 @@ def read_dataset_as_metapartitions__iterator(
label_filter=label_filter,
predicates=predicates,
dispatch_by=dispatch_by,
dispatch_metadata=dispatch_metadata,
)

for mp in mps:
@@ -171,6 +173,7 @@ def read_dataset_as_dataframes__iterator(
predicates=predicates,
factory=factory,
dispatch_by=dispatch_by,
dispatch_metadata=False,
)
for mp in mp_iter:
yield mp.data
9 changes: 1 addition & 8 deletions kartothek/io/testing/index.py
@@ -6,10 +6,7 @@

from kartothek.core.factory import DatasetFactory
from kartothek.core.index import ExplicitSecondaryIndex
from kartothek.io.eager import (
read_dataset_as_metapartitions,
store_dataframes_as_dataset,
)
from kartothek.io.eager import store_dataframes_as_dataset


def assert_index_dct_equal(dict1, dict2):
@@ -100,10 +97,6 @@ def test_add_column_to_existing_index(
bound_build_dataset_indices(store_factory, dataset_uuid, columns=["x"])

# Assert indices are properly created
mps = read_dataset_as_metapartitions(store=store_factory, dataset_uuid=dataset_uuid)
for column_name in ["p", "x"]:
assert all([mp.indices[column_name] for mp in mps])

dataset_factory = DatasetFactory(dataset_uuid, store_factory, load_all_indices=True)
assert dataset_factory.indices.keys() == {"p", "x"}

12 changes: 0 additions & 12 deletions kartothek/io/testing/read.py
@@ -537,18 +537,6 @@ def test_read_dataset_as_dataframes(
)


def test_load_dataset_metadata(
dataset, store_session_factory, bound_load_metapartitions
):
result = bound_load_metapartitions(
dataset_uuid=dataset.uuid,
store=store_session_factory,
load_dataset_metadata=True,
)
for mp in result:
assert set(mp.dataset_metadata.keys()) == {"creation_time", "dataset"}


def test_read_dataset_as_dataframes_columns_projection(
store_factory, bound_load_dataframes, metadata_version
):
26 changes: 22 additions & 4 deletions kartothek/io_components/read.py
@@ -23,6 +23,7 @@ def dispatch_metapartitions_from_factory(
predicates: PredicatesType = None,
store: Optional[Callable[[], KeyValueStore]] = None,
dispatch_by: None = None,
dispatch_metadata: bool = False,
) -> Iterator[MetaPartition]:
...

@@ -35,6 +36,7 @@ def dispatch_metapartitions_from_factory(
predicates: PredicatesType,
store: Optional[Callable[[], KeyValueStore]],
dispatch_by: List[str],
dispatch_metadata: bool,
) -> Iterator[List[MetaPartition]]:
...

@@ -47,8 +49,18 @@ def dispatch_metapartitions_from_factory(
predicates: PredicatesType = None,
store: Optional[Callable[[], KeyValueStore]] = None,
dispatch_by: Optional[List[str]] = None,
dispatch_metadata: bool = False,
) -> Union[Iterator[MetaPartition], Iterator[List[MetaPartition]]]:

if dispatch_metadata:

warnings.warn(
"The dispatch of metadata and index information as part of the MetaPartition instance is deprecated. "
"The future behaviour will be that this metadata is not dispatched. To set the future behaviour, "
"specifiy ``dispatch_metadata=False``",
DeprecationWarning,
)

if dispatch_by and concat_partitions_on_primary_index:
raise ValueError(
"Both `dispatch_by` and `concat_partitions_on_primary_index` are provided, "
@@ -114,8 +126,10 @@ def dispatch_metapartitions_from_factory(
mps.append(
MetaPartition.from_partition(
partition=dataset_factory.partitions[label],
dataset_metadata=dataset_factory.metadata,
indices=indices_to_dispatch,
dataset_metadata=dataset_factory.metadata
if dispatch_metadata
else None,
indices=indices_to_dispatch if dispatch_metadata else None,
metadata_version=dataset_factory.metadata_version,
table_meta=dataset_factory.table_meta,
partition_keys=dataset_factory.partition_keys,
@@ -129,8 +143,10 @@

yield MetaPartition.from_partition(
partition=part,
dataset_metadata=dataset_factory.metadata,
indices=indices_to_dispatch,
dataset_metadata=dataset_factory.metadata
if dispatch_metadata
else None,
indices=indices_to_dispatch if dispatch_metadata else None,
metadata_version=dataset_factory.metadata_version,
table_meta=dataset_factory.table_meta,
partition_keys=dataset_factory.partition_keys,
@@ -147,6 +163,7 @@ def dispatch_metapartitions(
concat_partitions_on_primary_index: bool = False,
predicates: PredicatesType = None,
dispatch_by: Optional[List[str]] = None,
dispatch_metadata: bool = False,
) -> Union[Iterator[MetaPartition], Iterator[List[MetaPartition]]]:
dataset_factory = DatasetFactory(
dataset_uuid=dataset_uuid,
@@ -163,4 +180,5 @@
predicates=predicates,
dispatch_by=dispatch_by,
concat_partitions_on_primary_index=concat_partitions_on_primary_index,
dispatch_metadata=dispatch_metadata,
)
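
One way downstream code or test suites might surface the new DeprecationWarning during the
transition; this is a sketch, not part of this change set, and ``store_factory`` and the
dataset UUID below are placeholders:

    import warnings

    from kartothek.io_components.read import dispatch_metapartitions

    # Escalate the deprecation into an error so code paths that still rely on
    # dispatched metadata/indices fail loudly instead of silently warning.
    warnings.simplefilter("error", DeprecationWarning)

    # The warning is emitted lazily, on first iteration of the generator, so the
    # list() call below is what actually triggers it.
    list(
        dispatch_metapartitions(
            "my_dataset",     # placeholder dataset UUID
            store_factory,    # placeholder store factory
            dispatch_metadata=True,
        )
    )
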
3 changes: 0 additions & 3 deletions tests/io_components/test_read.py
@@ -20,11 +20,9 @@ def test_dispatch_metapartitions(dataset, store_session):
assert len(partitions) == 2
mp = partitions["cluster_1"]
assert isinstance(mp, MetaPartition)
assert dict(mp.dataset_metadata) == dict(dataset.metadata)

mp = partitions["cluster_2"]
assert isinstance(mp, MetaPartition)
assert dict(mp.dataset_metadata) == dict(dataset.metadata)

assert set(mp.table_meta.keys()) == {SINGLE_TABLE, "helper"}

@@ -43,7 +41,6 @@ def label_filter(part_label):
assert len(partitions) == 1
mp = partitions["cluster_1"]
assert isinstance(mp, MetaPartition)
assert dict(mp.dataset_metadata) == dict(dataset.metadata)


def test_dispatch_metapartitions_without_dataset_metadata(dataset, store_session):
