Skip to content

Commit

Permalink
make it possible to manually set chunks when loading dask arrays (#1477)
Browse files Browse the repository at this point in the history
* make it possible to manually set chunks when loading dask arrays
* Apply suggestions from code review
Copyright and typos in setting chunks for dask arrays.

Co-authored-by: Axel Huebl <[email protected]>
  • Loading branch information
pordyna and ax3l committed Aug 17, 2023
1 parent 64c6b63 commit 99daca7
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 48 deletions.
3 changes: 3 additions & 0 deletions docs/source/analysis/dask.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ The central Python API calls to convert to DASK datatypes are the ``ParticleSpec
# note: no series.flush() needed
The ``to_dask_array`` method will automatically set Dask array chunking based on the available chunks in the read data set.
The default behavior can be overridden by passing an additional keyword argument ``chunks``, see the `dask.array.from_array documentation <https://docs.dask.org/en/stable/generated/dask.array.from_array.html>`__ for more details.
For example, to chunk only along the outermost axis in a 3D dataset using the default Dask array chunk size, call ``to_dask_array(chunks={0: 'auto', 1: -1, 2: -1})``.

Example
-------
Expand Down
102 changes: 54 additions & 48 deletions src/binding/python/openpmd_api/DaskArray.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"""
This file is part of the openPMD-api.
Copyright 2021 openPMD contributors
Authors: Axel Huebl
Copyright 2021-2023 openPMD contributors
Authors: Axel Huebl, Pawel Ordyna
License: LGPLv3+
"""
import math
Expand Down Expand Up @@ -49,14 +49,18 @@ def __getitem__(self, slices):
return data


def record_component_to_daskarray(record_component):
def record_component_to_daskarray(record_component, chunks=None):
"""
Load a RecordComponent into a Dask.array.
Parameters
----------
record_component : openpmd_api.Record_Component
A record component class in openPMD-api.
chunks : chunks parameter to pass to dask.array.from_array.
See dask documentation for more details.
When set to None (default) the chunking will be automaticaly
determined based on record_component.available_chunks().
Returns
-------
Expand Down Expand Up @@ -84,54 +88,56 @@ def record_component_to_daskarray(record_component):
if not found_dask:
raise ImportError("dask NOT found. Install dask for Dask DataFrame "
"support.")

# get optimal chunks
chunks = record_component.available_chunks()

# sort and prepare the chunks for Dask's array API
# https://docs.dask.org/en/latest/array-chunks.html
# https://docs.dask.org/en/latest/array-api.html?highlight=from_array#other-functions
# sorted and unique
offsets_per_dim = list(map(list, zip(*[chunk.offset for chunk in chunks])))
offsets_sorted_unique_per_dim = [sorted(set(o)) for o in offsets_per_dim]

# print("offsets_sorted_unique_per_dim=",
# list(offsets_sorted_unique_per_dim))

# case 1: PIConGPU static load balancing (works with Dask assumptions,
# chunk option no. 3)
# all chunks in the same column have the same column width although
# individual columns have different widths
# case 2: AMReX boxes
# all chunks are multiple of a common block size, offsets are a multiple
# of a common blocksize
# problem: too limited description in Dask
# https://github.com/dask/dask/issues/7475
# work-around: create smaller chunks (this incurs a read cost) by forcing
# into case 1
# (this can lead to larger blocks than using the gcd of the
# extents aka AMReX block size)
common_chunk_widths_per_dim = list()
for d, offsets_in_dim in enumerate(offsets_sorted_unique_per_dim):
# print("d=", d, offsets_in_dim, record_component.shape[d])
offsets_in_dim_arr = np.array(offsets_in_dim)
# note: this is in the right order of rows/columns, contrary to a
# sorted extent list from chunks
extents_in_dim = np.zeros_like(offsets_in_dim_arr)
extents_in_dim[:-1] = offsets_in_dim_arr[1:]
extents_in_dim[-1] = record_component.shape[d]
if len(extents_in_dim) > 1:
extents_in_dim[:-1] -= offsets_in_dim_arr[:-1]
extents_in_dim[-1] -= offsets_in_dim_arr[-1]
# print("extents_in_dim=", extents_in_dim)
common_chunk_widths_per_dim.append(tuple(extents_in_dim))

common_chunk_widths_per_dim = tuple(common_chunk_widths_per_dim)
# print("common_chunk_widths_per_dim=", common_chunk_widths_per_dim)
if chunks is None:
# get optimal chunks
chunks = record_component.available_chunks()

# sort and prepare the chunks for Dask's array API
# https://docs.dask.org/en/latest/array-chunks.html
# https://docs.dask.org/en/latest/array-api.html?highlight=from_array#other-functions
# sorted and unique
offsets_per_dim = list(
map(list, zip(*[chunk.offset for chunk in chunks])))
offsets_sorted_unique_per_dim = [
sorted(set(o)) for o in offsets_per_dim]

# print("offsets_sorted_unique_per_dim=",
# list(offsets_sorted_unique_per_dim))

# case 1: PIConGPU static load balancing (works with Dask assumptions,
# chunk option no. 3)
# all chunks in the same column have the same column width although
# individual columns have different widths
# case 2: AMReX boxes
# all chunks are multiple of a common block size, offsets
# are a multiple of a common blocksize
# problem: too limited description in Dask
# https://github.com/dask/dask/issues/7475
# work-around: create smaller chunks (this incurs a read cost)
# by forcing into case 1
# (this can lead to larger blocks than using
# the gcd of the extents aka AMReX block size)
common_chunk_widths_per_dim = list()
for d, offsets_in_dim in enumerate(offsets_sorted_unique_per_dim):
# print("d=", d, offsets_in_dim, record_component.shape[d])
offsets_in_dim_arr = np.array(offsets_in_dim)
# note: this is in the right order of rows/columns, contrary to a
# sorted extent list from chunks
extents_in_dim = np.zeros_like(offsets_in_dim_arr)
extents_in_dim[:-1] = offsets_in_dim_arr[1:]
extents_in_dim[-1] = record_component.shape[d]
if len(extents_in_dim) > 1:
extents_in_dim[:-1] -= offsets_in_dim_arr[:-1]
extents_in_dim[-1] -= offsets_in_dim_arr[-1]
# print("extents_in_dim=", extents_in_dim)
common_chunk_widths_per_dim.append(tuple(extents_in_dim))

chunks = tuple(common_chunk_widths_per_dim)
# print("chunks=", chunks)

da = from_array(
DaskRecordComponent(record_component),
chunks=common_chunk_widths_per_dim,
chunks=chunks,
# name=None,
asarray=True,
fancy=False,
Expand Down

0 comments on commit 99daca7

Please sign in to comment.