71 changes: 71 additions & 0 deletions properties/test_parallelcompat.py
@@ -0,0 +1,71 @@
import numpy as np
import pytest

pytest.importorskip("hypothesis")
# isort: split

from hypothesis import given

import xarray.testing.strategies as xrst
from xarray.namedarray.parallelcompat import ChunkManagerEntrypoint


class TestPreserveChunks:
@given(xrst.shape_and_chunks())
def test_preserve_all_chunks(
self, shape_and_chunks: tuple[tuple[int, ...], tuple[int, ...]]
) -> None:
shape, previous_chunks = shape_and_chunks
typesize = 8
target = 1024 * 1024

actual = ChunkManagerEntrypoint.preserve_chunks(
chunks=("preserve",) * len(shape),
shape=shape,
target=target,
typesize=typesize,
previous_chunks=previous_chunks,
)
for i, chunk in enumerate(actual):
if chunk != shape[i]:
assert chunk >= previous_chunks[i]
assert chunk % previous_chunks[i] == 0
assert chunk <= shape[i]

if actual != shape:
assert np.prod(actual) * typesize >= 0.5 * target

@pytest.mark.parametrize("first_chunk", [-1, (), 1])
@given(xrst.shape_and_chunks(min_dims=2))
def test_preserve_some_chunks(
self,
first_chunk: int | tuple[int, ...],
shape_and_chunks: tuple[tuple[int, ...], tuple[int, ...]],
) -> None:
shape, previous_chunks = shape_and_chunks
typesize = 4
target = 2 * 1024 * 1024

actual = ChunkManagerEntrypoint.preserve_chunks(
chunks=(first_chunk, *["preserve" for _ in range(len(shape) - 1)]),
shape=shape,
target=target,
typesize=typesize,
previous_chunks=previous_chunks,
)
for i, chunk in enumerate(actual):
if i == 0:
if first_chunk == 1:
assert chunk == 1
elif first_chunk == -1:
assert chunk == shape[i]
elif first_chunk == ():
assert chunk == previous_chunks[i]
elif chunk != shape[i]:
assert chunk >= previous_chunks[i]
assert chunk % previous_chunks[i] == 0
assert chunk <= shape[i]

# if we have more than one chunk, make sure the chunks are big enough
if actual[1:] != shape[1:]:
assert np.prod(actual) * typesize >= 0.5 * target
4 changes: 2 additions & 2 deletions xarray/backends/api.py
@@ -291,9 +291,9 @@ def _dataset_from_backend_dataset(
create_default_indexes,
**extra_tokens,
):
if not isinstance(chunks, int | dict) and chunks not in {None, "auto"}:
if not isinstance(chunks, int | dict) and chunks not in {None, "auto", "preserve"}:
raise ValueError(
f"chunks must be an int, dict, 'auto', or None. Instead found {chunks}."
f"chunks must be an int, dict, 'auto', 'preserve', or None. Instead found {chunks}."
)

_protect_dataset_variables_inplace(backend_ds, cache)
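A hedged usage sketch of the new user-facing option (the store path "store.zarr" is hypothetical; the per-dimension form mirrors the backend test at the bottom of this diff):

import xarray as xr

# Whole-dataset: scale the on-disk (encoded) chunks up toward the engine's
# target chunk size without ever splitting them.
ds = xr.open_dataset("store.zarr", engine="zarr", chunks="preserve")

# Per-dimension: "preserve" can be mixed with other chunk specs.
ds = xr.open_dataset(
    "store.zarr", engine="zarr", chunks={"x": "preserve", "y": -1}
)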
2 changes: 1 addition & 1 deletion xarray/namedarray/_typing.py
@@ -74,7 +74,7 @@ def dtype(self) -> _DType_co: ...
_NormalizedChunks = tuple[tuple[int, ...], ...]
# FYI in some cases we don't allow `None`, which this doesn't take account of.
# # FYI the `str` is for a size string, e.g. "16MB", supported by dask.
T_ChunkDim: TypeAlias = str | int | Literal["auto"] | tuple[int, ...] | None # noqa: PYI051
T_ChunkDim: TypeAlias = str | int | Literal["auto", "preserve"] | tuple[int, ...] | None # noqa: PYI051
# We allow the tuple form of this (though arguably we could transition to named dims only)
T_Chunks: TypeAlias = T_ChunkDim | Mapping[Any, T_ChunkDim]

12 changes: 12 additions & 0 deletions xarray/namedarray/daskmanager.py
@@ -54,6 +54,18 @@ def normalize_chunks(
"""Called by open_dataset"""
from dask.array.core import normalize_chunks

if any(c == "preserve" for c in chunks) and any(c == "auto" for c in chunks):
raise ValueError('chunks cannot use a combination of "auto" and "preserve"')

if previous_chunks and any(c == "preserve" for c in chunks):
chunks = self.preserve_chunks(
chunks,
shape=shape,
target=self.get_auto_chunk_size(),
typesize=dtype.itemsize,
previous_chunks=previous_chunks,
)

return normalize_chunks(
chunks,
shape=shape,
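A sketch of the resolution order this hook implements, assuming previous_chunks is known (e.g. from Zarr encoding): "preserve" entries are first made concrete by preserve_chunks (added in parallelcompat.py below), and only then does dask's normalize_chunks expand them into per-block tuples. The numbers mirror the ({"x": "preserve", "y": -1}, (160, 500)) case in the backend test:

import numpy as np
from dask.array.core import normalize_chunks

from xarray.namedarray.parallelcompat import ChunkManagerEntrypoint

# Step 1: resolve "preserve" against the on-disk chunks.
resolved = ChunkManagerEntrypoint.preserve_chunks(
    chunks=("preserve", -1),
    shape=(500, 500),
    target=2**20,  # 1 MiB
    typesize=8,  # float64 itemsize
    previous_chunks=(10, 10),
)
assert resolved == (160, 500)

# Step 2: dask expands the concrete sizes into per-block tuples.
assert normalize_chunks(resolved, shape=(500, 500), dtype=np.dtype("f8")) == (
    (160, 160, 160, 20),
    (500,),
)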
69 changes: 69 additions & 0 deletions xarray/namedarray/parallelcompat.py
@@ -777,3 +777,72 @@ def get_auto_chunk_size(
raise NotImplementedError(
"For 'auto' rechunking of cftime arrays, get_auto_chunk_size must be implemented by the chunk manager"
)

@staticmethod
def preserve_chunks(
chunks: T_Chunks,
shape: tuple[int, ...],
target: int,
typesize: int,
previous_chunks: tuple[int, ...],
) -> tuple[int, ...]:
"""Determine meta chunks

This takes in a chunks value that contains ``"preserve"`` values in certain
dimensions and replaces those values with concrete dimension sizes that try
to get chunks to be of a certain size in bytes, provided by the ``limit=``
keyword. Any dimensions marked as ``"preserve"`` will potentially be multiplied
to get close to the byte target, while never splitting ``previous_chunks``.

Parameters
----------
chunks: tuple[int | str | tuple, ...]
A tuple of either dimension sizes or tuples of explicit chunk sizes.
Some entries should be ``"preserve"``. Any explicit sizes must match or
be a multiple of ``previous_chunks``.
shape: tuple[int, ...]
The shape of the array
target: int
The target size of the chunk in bytes.
typesize: int
The size, in bytes, of each element of the chunk.
previous_chunks: tuple[int, ...]
Size of chunks being preserved. Expressed as a tuple of ints which matches
the way chunks are encoded in Zarr.
"""
shape = np.array(shape)
previous_chunks = np.array(previous_chunks)

# "preserve" stays as "preserve"
# empty tuple means match previous chunks
# -1 means whole dim is in one chunk
desired_chunks = np.array(
[
c or previous_chunks[i] if c != -1 else shape[i]
for i, c in enumerate(chunks)
]
)

preserve_chunks = desired_chunks == "preserve"
chunks = np.where(preserve_chunks, previous_chunks, desired_chunks).astype(int)

while True:
# Repeatedly loop over the ``previous_chunks``, multiplying them by 2.
# Stop when:
# 1a. we are larger than the target chunk size OR
# 1b. we are within 50% of the target chunk size OR
# 2. the chunk covers the entire array

num_chunks = shape / chunks * preserve_chunks
idx = np.argmax(num_chunks)
chunk_bytes = np.prod(chunks) * typesize

if chunk_bytes > target or abs(chunk_bytes - target) / target < 0.5:
break

if (num_chunks <= 1).all():
break

chunks[idx] = min(chunks[idx] * 2, shape[idx])

return tuple(int(x) for x in chunks)
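A worked example of the doubling loop above, matching the ("preserve", (320, 320)) case in the backend test below (the property tests at the top of this diff call the static method the same way):

from xarray.namedarray.parallelcompat import ChunkManagerEntrypoint

resolved = ChunkManagerEntrypoint.preserve_chunks(
    chunks=("preserve", "preserve"),
    shape=(500, 500),
    target=1024 * 1024,  # 1 MiB
    typesize=8,  # float64 itemsize
    previous_chunks=(10, 10),
)
# Each dim doubles 10 -> 20 -> ... -> 320; 320 * 320 * 8 bytes is about
# 0.78 MiB, within 50% of the 1 MiB target, so the loop stops there.
assert resolved == (320, 320)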
2 changes: 1 addition & 1 deletion xarray/namedarray/utils.py
@@ -222,7 +222,7 @@ def _get_chunk( # type: ignore[no-untyped-def]
preferred_chunk_shape = tuple(
itertools.starmap(preferred_chunks.get, zip(dims, shape, strict=True))
)
if isinstance(chunks, Number) or (chunks == "auto"):
if isinstance(chunks, Number | str):
chunks = dict.fromkeys(dims, chunks)
chunk_shape = tuple(
chunks.get(dim, None) or preferred_chunk_sizes
68 changes: 68 additions & 0 deletions xarray/testing/strategies.py
@@ -31,6 +31,7 @@
"names",
"outer_array_indexers",
"pandas_index_dtypes",
"shape_and_chunks",
"supported_dtypes",
"unique_subset_of",
"variables",
@@ -210,6 +211,73 @@ def dimension_sizes(
)


@st.composite
def shape_and_chunks(
draw: st.DrawFn,
*,
min_dims: int = 1,
max_dims: int = 4,
min_size: int = 1,
max_size: int = 900,
) -> tuple[tuple[int, ...], tuple[int, ...]]:
"""
Generate a shape tuple and corresponding chunks tuple.

Each element in the chunks tuple is smaller than or equal to the
corresponding element in the shape tuple.

Requires the hypothesis package to be installed.

Parameters
----------
min_dims : int, optional
Minimum number of dimensions. Default is 1.
max_dims : int, optional
Maximum number of dimensions. Default is 4.
min_size : int, optional
Minimum size for each dimension. Default is 1.
max_size : int, optional
Maximum size for each dimension. Default is 900.

Returns
-------
tuple[tuple[int, ...], tuple[int, ...]]
A tuple containing (shape, chunks) where:
- shape is a tuple of positive integers
- chunks is a tuple where each element is an integer <= corresponding shape element

Examples
--------
>>> shape_and_chunks().example() # doctest: +SKIP
((5, 3, 8), (2, 3, 4))
>>> shape_and_chunks().example() # doctest: +SKIP
((10, 7), (10, 3))
>>> shape_and_chunks(min_dims=2, max_dims=3).example() # doctest: +SKIP
((4, 6, 2), (2, 3, 1))

See Also
--------
:ref:`testing.hypothesis`_
"""
# Generate the shape tuple
ndim = draw(st.integers(min_value=min_dims, max_value=max_dims))
shape = draw(
st.tuples(
*[st.integers(min_value=min_size, max_value=max_size) for _ in range(ndim)]
)
)

# Generate chunks tuple with each element <= corresponding shape element
chunks_elements = []
for size in shape:
# Each chunk is an integer between 1 and the size of that dimension
chunk_element = draw(st.integers(min_value=1, max_value=size))
chunks_elements.append(chunk_element)

chunks = tuple(chunks_elements)
return shape, chunks


_readable_strings = st.text(
_readable_characters,
max_size=5,
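A minimal usage sketch for the new strategy, following the pattern of the property tests at the top of this diff (the test name here is hypothetical):

import pytest

pytest.importorskip("hypothesis")
from hypothesis import given

import xarray.testing.strategies as xrst


@given(xrst.shape_and_chunks(min_dims=2, max_dims=3))
def test_chunks_never_exceed_shape(shape_and_chunks) -> None:
    shape, chunks = shape_and_chunks
    assert len(chunks) == len(shape)
    assert all(1 <= c <= s for s, c in zip(shape, chunks, strict=True))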
38 changes: 38 additions & 0 deletions xarray/tests/test_backends.py
Original file line number Diff line number Diff line change
@@ -7284,6 +7284,44 @@ def test_chunking_consintency(chunks, tmp_path: Path) -> None:
xr.testing.assert_chunks_equal(actual, expected)


@requires_zarr
@requires_dask
@pytest.mark.parametrize(
"chunks,expected",
[
("preserve", (320, 320)),
(-1, (500, 500)),
({}, (10, 10)),
({"x": "preserve"}, (500, 10)),
({"x": -1}, (500, 10)),
({"x": "preserve", "y": -1}, (160, 500)),
],
)
def test_open_dataset_chunking_zarr_with_preserve(
[Review comment, Member Author] These tests are kind of slow.

chunks, expected, tmp_path: Path
) -> None:
encoded_chunks = 10
dask_arr = da.from_array(
np.ones((500, 500), dtype="float64"), chunks=encoded_chunks
)
ds = xr.Dataset(
{
"test": xr.DataArray(
dask_arr,
dims=("x", "y"),
)
}
)
ds["test"].encoding["chunks"] = encoded_chunks
ds.to_zarr(tmp_path / "test.zarr")

with dask.config.set({"array.chunk-size": "1MiB"}):
with open_dataset(
tmp_path / "test.zarr", engine="zarr", chunks=chunks
) as actual:
assert (actual.chunks["x"][0], actual.chunks["y"][0]) == expected


def _check_guess_can_open_and_open(entrypoint, obj, engine, expected):
assert entrypoint.guess_can_open(obj)
with open_dataset(obj, engine=engine) as actual:
Expand Down