make it possible to manually set chunks when loading dask arrays #1477

Merged (3 commits), Aug 17, 2023

Changes from all commits
docs/source/analysis/dask.rst (3 additions, 0 deletions)
@@ -41,6 +41,9 @@ The central Python API calls to convert to DASK datatypes are the ``ParticleSpec

     # note: no series.flush() needed
 
+The ``to_dask_array`` method will automatically set Dask array chunking based on the available chunks in the read data set.
+The default behavior can be overridden by passing an additional keyword argument ``chunks``; see the `dask.array.from_array documentation <https://docs.dask.org/en/stable/generated/dask.array.from_array.html>`__ for more details.
+For example, to chunk only along the outermost axis in a 3D dataset using the default Dask array chunk size, call ``to_dask_array(chunks={0: 'auto', 1: -1, 2: -1})``.
 
 Example
 -------
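For context, the override documented above would be used roughly as follows. This is a minimal sketch; the file name, iteration number, and record are hypothetical:

    import openpmd_api as io

    # hypothetical series: adapt the file pattern and iteration number
    series = io.Series("simData_%T.bp", io.Access.read_only)
    E_x = series.iterations[500].meshes["E"]["x"]

    # default: chunking is derived from E_x.available_chunks()
    arr_auto = E_x.to_dask_array()

    # override: chunk only along the outermost axis, keep the others whole
    arr_outer = E_x.to_dask_array(chunks={0: 'auto', 1: -1, 2: -1})

    series.close()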

src/binding/python/openpmd_api/DaskArray.py (54 additions, 48 deletions)
@@ -1,8 +1,8 @@
"""
This file is part of the openPMD-api.

Copyright 2021 openPMD contributors
Authors: Axel Huebl
Copyright 2021-2023 openPMD contributors
Authors: Axel Huebl, Pawel Ordyna
License: LGPLv3+
"""
 import math
@@ -49,14 +49,18 @@ def __getitem__(self, slices):
         return data
 
 
-def record_component_to_daskarray(record_component):
+def record_component_to_daskarray(record_component, chunks=None):
     """
     Load a RecordComponent into a Dask.array.
 
     Parameters
     ----------
     record_component : openpmd_api.Record_Component
         A record component class in openPMD-api.
+    chunks : chunks parameter to pass to dask.array.from_array.
+        See dask documentation for more details.
+        When set to None (default) the chunking will be automatically
+        determined based on record_component.available_chunks().
 
     Returns
     -------
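
As an aside, the new ``chunks`` argument is passed through to ``dask.array.from_array``, so any chunk specification Dask accepts should be valid here. A few common forms, demonstrated on a plain NumPy array (illustrative shapes only):

    import dask.array as da
    import numpy as np

    x = np.zeros((8, 64, 64))

    da.from_array(x, chunks='auto')                     # let Dask pick a size
    da.from_array(x, chunks=(4, 64, 64))                # one chunk width per axis
    da.from_array(x, chunks={0: 'auto', 1: -1, 2: -1})  # dict form; -1 keeps an axis whole
    da.from_array(x, chunks=((4, 4), (64,), (64,)))     # explicit widths per axis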
@@ -84,54 +84,88 @@ def record_component_to_daskarray(record_component):
     if not found_dask:
         raise ImportError("dask NOT found. Install dask for Dask DataFrame "
                           "support.")

-    # get optimal chunks
-    chunks = record_component.available_chunks()
-
-    # sort and prepare the chunks for Dask's array API
-    # https://docs.dask.org/en/latest/array-chunks.html
-    # https://docs.dask.org/en/latest/array-api.html?highlight=from_array#other-functions
-    # sorted and unique
-    offsets_per_dim = list(map(list, zip(*[chunk.offset for chunk in chunks])))
-    offsets_sorted_unique_per_dim = [sorted(set(o)) for o in offsets_per_dim]
-
-    # print("offsets_sorted_unique_per_dim=",
-    #       list(offsets_sorted_unique_per_dim))
-
-    # case 1: PIConGPU static load balancing (works with Dask assumptions,
-    #         chunk option no. 3)
-    # all chunks in the same column have the same column width although
-    # individual columns have different widths
-    # case 2: AMReX boxes
-    # all chunks are multiple of a common block size, offsets are a multiple
-    # of a common blocksize
-    # problem: too limited description in Dask
-    # https://github.com/dask/dask/issues/7475
-    # work-around: create smaller chunks (this incurs a read cost) by forcing
-    # into case 1
-    # (this can lead to larger blocks than using the gcd of the
-    # extents aka AMReX block size)
-    common_chunk_widths_per_dim = list()
-    for d, offsets_in_dim in enumerate(offsets_sorted_unique_per_dim):
-        # print("d=", d, offsets_in_dim, record_component.shape[d])
-        offsets_in_dim_arr = np.array(offsets_in_dim)
-        # note: this is in the right order of rows/columns, contrary to a
-        #       sorted extent list from chunks
-        extents_in_dim = np.zeros_like(offsets_in_dim_arr)
-        extents_in_dim[:-1] = offsets_in_dim_arr[1:]
-        extents_in_dim[-1] = record_component.shape[d]
-        if len(extents_in_dim) > 1:
-            extents_in_dim[:-1] -= offsets_in_dim_arr[:-1]
-        extents_in_dim[-1] -= offsets_in_dim_arr[-1]
-        # print("extents_in_dim=", extents_in_dim)
-        common_chunk_widths_per_dim.append(tuple(extents_in_dim))
-
-    common_chunk_widths_per_dim = tuple(common_chunk_widths_per_dim)
-    # print("common_chunk_widths_per_dim=", common_chunk_widths_per_dim)
+    if chunks is None:
+        # get optimal chunks
+        chunks = record_component.available_chunks()
+
+        # sort and prepare the chunks for Dask's array API
+        # https://docs.dask.org/en/latest/array-chunks.html
+        # https://docs.dask.org/en/latest/array-api.html?highlight=from_array#other-functions
+        # sorted and unique
+        offsets_per_dim = list(
+            map(list, zip(*[chunk.offset for chunk in chunks])))
+        offsets_sorted_unique_per_dim = [
+            sorted(set(o)) for o in offsets_per_dim]
+
+        # print("offsets_sorted_unique_per_dim=",
+        #       list(offsets_sorted_unique_per_dim))
+
+        # case 1: PIConGPU static load balancing (works with Dask assumptions,
+        #         chunk option no. 3)
+        # all chunks in the same column have the same column width although
+        # individual columns have different widths
+        # case 2: AMReX boxes
+        # all chunks are multiple of a common block size, offsets
+        # are a multiple of a common blocksize
+        # problem: too limited description in Dask
+        # https://github.com/dask/dask/issues/7475
+        # work-around: create smaller chunks (this incurs a read cost)
+        # by forcing into case 1
+        # (this can lead to larger blocks than using
+        #  the gcd of the extents aka AMReX block size)
+        common_chunk_widths_per_dim = list()
+        for d, offsets_in_dim in enumerate(offsets_sorted_unique_per_dim):
+            # print("d=", d, offsets_in_dim, record_component.shape[d])
+            offsets_in_dim_arr = np.array(offsets_in_dim)
+            # note: this is in the right order of rows/columns, contrary to a
+            #       sorted extent list from chunks
+            extents_in_dim = np.zeros_like(offsets_in_dim_arr)
+            extents_in_dim[:-1] = offsets_in_dim_arr[1:]
+            extents_in_dim[-1] = record_component.shape[d]
+            if len(extents_in_dim) > 1:
+                extents_in_dim[:-1] -= offsets_in_dim_arr[:-1]
+            extents_in_dim[-1] -= offsets_in_dim_arr[-1]
+            # print("extents_in_dim=", extents_in_dim)
+            common_chunk_widths_per_dim.append(tuple(extents_in_dim))
+
+        chunks = tuple(common_chunk_widths_per_dim)
+        # print("chunks=", chunks)

     da = from_array(
         DaskRecordComponent(record_component),
-        chunks=common_chunk_widths_per_dim,
+        chunks=chunks,
         # name=None,
         asarray=True,
         fancy=False,
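The case 1 / case 2 comments above refer to a Dask constraint: chunks must form a regular grid, one tuple of widths per axis applied across the whole array. A small sketch of what is and is not expressible (values are illustrative):

    import dask.array as da
    import numpy as np

    x = np.zeros((4, 4))

    # case 1 works: per-axis widths define a grid of blocks
    grid = da.from_array(x, chunks=((2, 2), (1, 3)))

    # case 2 (e.g. AMReX boxes whose extents do not align into rows/columns)
    # has no direct representation in Dask, hence the work-around of
    # splitting into a finer common grid; see
    # https://github.com/dask/dask/issues/7475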
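The per-axis width computation in the ``if chunks is None:`` branch can be traced with a standalone example; the offsets below are hypothetical values for one axis of a dataset of length 100:

    import numpy as np

    shape_d = 100                               # record_component.shape[d]
    offsets_in_dim_arr = np.array([0, 30, 60])  # sorted, unique chunk offsets

    extents_in_dim = np.zeros_like(offsets_in_dim_arr)
    extents_in_dim[:-1] = offsets_in_dim_arr[1:]  # each width ends at the next offset
    extents_in_dim[-1] = shape_d                  # the last width ends at the array edge
    if len(extents_in_dim) > 1:
        extents_in_dim[:-1] -= offsets_in_dim_arr[:-1]
    extents_in_dim[-1] -= offsets_in_dim_arr[-1]

    print(tuple(extents_in_dim))  # (30, 30, 40): Dask chunk widths for this axis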