Skip to content

Conversation

@VeckoTheGecko
Copy link
Contributor

No description provided.

Comment on lines +292 to +369
if len(indices_to_write) == 0:
return

pids = pset.particledata.getvardata("id", indices_to_write)
to_add = sorted(set(pids) - set(self._pids_written.keys()))
for i, pid in enumerate(to_add):
self._pids_written[pid] = self._maxids + i
ids = np.array([self._pids_written[p] for p in pids], dtype=int)
self._maxids = len(self._pids_written)

once_ids = np.where(pset.particledata.getvardata("obs_written", indices_to_write) == 0)[0]
if len(once_ids) > 0:
ids_once = ids[once_ids]
indices_to_write_once = indices_to_write[once_ids]

if self.create_new_zarrfile:
if self.chunks is None:
self._chunks = (len(pset), 1)
if pset._repeatpclass is not None and self.chunks[0] < 1e4: # type: ignore[index]
warnings.warn(
f"ParticleFile chunks are set to {self.chunks}, but this may lead to "
f"a significant slowdown in Parcels when many calls to repeatdt. "
f"Consider setting a larger chunk size for your ParticleFile (e.g. chunks=(int(1e4), 1)).",
FileWarning,
stacklevel=2,
)
attrs = self._create_variables_attribute_dict()
obs = np.zeros((self._maxids), dtype=np.int32)
for var in self.vars_to_write:
varout = self._convert_varout_name(var)
if varout not in ["trajectory"]: # because 'trajectory' is written as coordinate
if self._write_once(var):
data = np.full(
(arrsize[0],),
self._fill_value_map[self.vars_to_write[var]],
dtype=self.vars_to_write[var],
)
data[ids_once] = pset.particledata.getvardata(var, indices_to_write_once)
dims = ["trajectory"]
else:
data = np.full(
arrsize, self._fill_value_map[self.vars_to_write[var]], dtype=self.vars_to_write[var]
)
data[ids, 0] = pset.particledata.getvardata(var, indices_to_write)
dims = ["trajectory", "obs"]
ds[varout] = xr.DataArray(data=data, dims=dims, attrs=attrs[varout])
ds[varout].encoding["chunks"] = self.chunks[0] if self._write_once(var) else self.chunks # type: ignore[index]
ds.to_zarr(self.fname, mode="w")
self._create_new_zarrfile = False
if (self._maxids > len(ids)) or (self._maxids > self.chunks[0]): # type: ignore[index]
arrsize = (self._maxids, self.chunks[1]) # type: ignore[index]
else:
# Either use the store that was provided directly or create a DirectoryStore:
if issubclass(type(self.fname), zarr.storage.Store):
store = self.fname
else:
store = zarr.DirectoryStore(self.fname)
Z = zarr.group(store=store, overwrite=False)
obs = pset.particledata.getvardata("obs_written", indices_to_write)
for var in self.vars_to_write:
varout = self._convert_varout_name(var)
if self._maxids > Z[varout].shape[0]:
self._extend_zarr_dims(Z[varout], store, dtype=self.vars_to_write[var], axis=0)
arrsize = (len(ids), self.chunks[1]) # type: ignore[index]
ds = xr.Dataset(
attrs=self.metadata,
coords={"trajectory": ("trajectory", pids), "obs": ("obs", np.arange(arrsize[1], dtype=np.int32))},
)
attrs = self._create_variables_attribute_dict()
obs = np.zeros((self._maxids), dtype=np.int32)
for var in self.vars_to_write:
varout = self._convert_varout_name(var)
if varout not in ["trajectory"]: # because 'trajectory' is written as coordinate
if self._write_once(var):
if len(once_ids) > 0:
Z[varout].vindex[ids_once] = pset.particledata.getvardata(var, indices_to_write_once)
data = np.full(
(arrsize[0],),
self._fill_value_map[self.vars_to_write[var]],
dtype=self.vars_to_write[var],
)
data[ids_once] = pset.particledata.getvardata(var, indices_to_write_once)
dims = ["trajectory"]
else:
if max(obs) >= Z[varout].shape[1]: # type: ignore[type-var]
self._extend_zarr_dims(Z[varout], store, dtype=self.vars_to_write[var], axis=1)
Z[varout].vindex[ids, obs] = pset.particledata.getvardata(var, indices_to_write)
data = np.full(
arrsize, self._fill_value_map[self.vars_to_write[var]], dtype=self.vars_to_write[var]
)
data[ids, 0] = pset.particledata.getvardata(var, indices_to_write)
dims = ["trajectory", "obs"]
ds[varout] = xr.DataArray(data=data, dims=dims, attrs=attrs[varout])
ds[varout].encoding["chunks"] = self.chunks[0] if self._write_once(var) else self.chunks # type: ignore[index]
ds.to_zarr(self.fname, mode="w")
self._create_new_zarrfile = False
else:
# Either use the store that was provided directly or create a DirectoryStore:
if isinstance(self.fname, zarr.storage.Store):
store = self.fname
else:
store = zarr.DirectoryStore(self.fname)
Z = zarr.group(store=store, overwrite=False)
obs = pset.particledata.getvardata("obs_written", indices_to_write)
for var in self.vars_to_write:
varout = self._convert_varout_name(var)
if self._maxids > Z[varout].shape[0]:
self._extend_zarr_dims(Z[varout], store, dtype=self.vars_to_write[var], axis=0)
if self._write_once(var):
if len(once_ids) > 0:
Z[varout].vindex[ids_once] = pset.particledata.getvardata(var, indices_to_write_once)
else:
if max(obs) >= Z[varout].shape[1]: # type: ignore[type-var]
self._extend_zarr_dims(Z[varout], store, dtype=self.vars_to_write[var], axis=1)
Z[varout].vindex[ids, obs] = pset.particledata.getvardata(var, indices_to_write)

pset.particledata.setvardata("obs_written", indices_to_write, obs + 1)
pset.particledata.setvardata("obs_written", indices_to_write, obs + 1)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a reversal of the clause and a de-indentation.

self._create_new_zarrfile = False
else:
# Either use the store that was provided directly or create a DirectoryStore:
if isinstance(self.fname, zarr.storage.Store):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously this was `issubclass(type(self.fname), zarr.storage.Store)`; the `isinstance` form is equivalent.

@VeckoTheGecko VeckoTheGecko merged commit 8cfc912 into main Feb 10, 2025
16 checks passed
@VeckoTheGecko VeckoTheGecko deleted the v/small-ref branch February 10, 2025 15:14
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

Status: Done

Development

Successfully merging this pull request may close these issues.

3 participants