From 73ae2bcf2f4ea0f6ba071e691eed28bd124acfd8 Mon Sep 17 00:00:00 2001 From: Malcolm White Date: Thu, 26 Apr 2018 14:30:28 -0700 Subject: [PATCH 01/13] Add a /References data group for fast access to referenced regions of continuous data. --- pyasdf/asdf_data_set.py | 74 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/pyasdf/asdf_data_set.py b/pyasdf/asdf_data_set.py index 9f9e716..128b420 100644 --- a/pyasdf/asdf_data_set.py +++ b/pyasdf/asdf_data_set.py @@ -246,6 +246,8 @@ def __init__(self, filename, compression="gzip-3", shuffle=True, self.__file.create_group("Provenance") if "AuxiliaryData" not in self.__file and mode != "r": self.__file.create_group("AuxiliaryData") + if "References" not in self.__file and mode != "r": + self.__file.create_group("References") # Easy access to the waveforms. self.waveforms = StationAccessor(self) @@ -362,6 +364,10 @@ def _provenance_group(self): def _auxiliary_data_group(self): return self.__file["AuxiliaryData"] + @property + def _reference_group(self): + return self.__file["References"] + @property def asdf_format_version_in_file(self): """ @@ -1225,6 +1231,74 @@ def __parse_waveform_input_and_validate(self, waveform): raise NotImplementedError return waveform + def create_reference(self, ref, net, sta, loc, chan, starttime, endtime, + tag=None, overwrite=False): + """ + Create a region reference for fast lookup of segements of + continuous data. + """ + + # Build the station name + _station_name = "%s.%s" % (net, sta) + # Iterate over datasets in waveform group to find matching + # net:sta:loc:chan combination. 
+ for _key in self._waveform_group[_station_name]: + _net, _sta, _loc, _remainder = _key.split(".") + + if _net != net or _sta != sta or _loc != loc: + continue + + _chan, _ts, _te, _tag = _remainder.split("__") + if _chan != chan or (tag is not None and _tag != tag): + continue + + _ds = self._waveform_group["%s/%s" % (_station_name, _key)] + + _ts = obspy.UTCDateTime(_ds.attrs["starttime"]*1e-9) + _samprate = _ds.attrs["sampling_rate"] + _te = _ts + len(_ds)/_samprate + if _te < starttime or _ts > endtime: + continue + + _offset = int((starttime-_ts)*_samprate) + _nsamp = int((endtime-starttime)*_samprate) + _ref = _ds.regionref[_offset:_offset+_nsamp+1] + + if ref not in self._reference_group: + _ref_grp = self._reference_group.create_group(ref) + else: + _ref_grp = self._reference_group[ref] + + _handle = ".".join((_net, _sta, _loc, _chan)) + + if overwrite is True and _handle in _ref_grp: + del(_ref_grp[_handle]) + + if _handle not in _ref_grp: + _ref_ds = _ref_grp.create_dataset(_handle, + (1,), + dtype=h5py.special_dtype(ref=h5py.RegionReference)) + _ref_ds.attrs["sampling_rate"] = _ds.attrs["sampling_rate"] + _ref_ds.attrs["starttime"] = _ds.attrs["starttime"] + int(_offset/_samprate*1.e9) + _ref_ds[0] = _ref + else: + print("Will not overwrite existing reference") + continue + + def get_data_for_reference(self, ref, net, sta, loc, chan): + _handle = "%s/%s.%s.%s.%s" % (ref, net, sta, loc, chan) + if _handle not in self._reference_group: + raise(IOError("Data not found")) + _ref = self._reference_group[_handle][0] + tr = obspy.Trace(data=self.__file[_ref][_ref]) + tr.stats.network = net + tr.stats.station = sta + tr.stats.location = loc + tr.stats.channel = chan + tr.stats.starttime = obspy.UTCDateTime(self._reference_group[_handle].attrs["starttime"]*1e-9) + tr.stats.delta = 1/self._reference_group[_handle].attrs["sampling_rate"] + return(tr) + def get_provenance_document(self, document_name): """ Retrieve a provenance document with a certain name. 
From c5b6f318839621ad18c38d78e2ef32e9a891cede Mon Sep 17 00:00:00 2001 From: Malcolm White Date: Thu, 26 Apr 2018 15:37:10 -0700 Subject: [PATCH 02/13] Make Reference creation and retrieval more flexible. --- pyasdf/asdf_data_set.py | 217 ++++++++++++++++++++++++++++++---------- 1 file changed, 165 insertions(+), 52 deletions(-) diff --git a/pyasdf/asdf_data_set.py b/pyasdf/asdf_data_set.py index 128b420..b663bc4 100644 --- a/pyasdf/asdf_data_set.py +++ b/pyasdf/asdf_data_set.py @@ -1231,73 +1231,186 @@ def __parse_waveform_input_and_validate(self, waveform): raise NotImplementedError return waveform - def create_reference(self, ref, net, sta, loc, chan, starttime, endtime, - tag=None, overwrite=False): + def create_reference(self, ref, starttime, endtime, net=None, sta=None, + loc=None, chan=None, tag=None, overwrite=False): """ Create a region reference for fast lookup of segements of continuous data. """ - # Build the station name - _station_name = "%s.%s" % (net, sta) - # Iterate over datasets in waveform group to find matching - # net:sta:loc:chan combination. 
- for _key in self._waveform_group[_station_name]: - _net, _sta, _loc, _remainder = _key.split(".") + if isinstance(net, str): + net = (net,) + elif isinstance(net, tuple) or isinstance(net, list) or net is None: + pass + else: + raise(TypeError(net)) - if _net != net or _sta != sta or _loc != loc: - continue + if isinstance(sta, str): + sta = (sta,) + elif isinstance(sta, tuple) or isinstance(sta, list) or sta is None: + pass + else: + raise(TypeError(sta)) - _chan, _ts, _te, _tag = _remainder.split("__") - if _chan != chan or (tag is not None and _tag != tag): - continue + if isinstance(loc, str): + loc = (loc,) + elif isinstance(loc, tuple) or isinstance(loc, list) or loc is None: + pass + else: + raise(TypeError(loc)) - _ds = self._waveform_group["%s/%s" % (_station_name, _key)] + if isinstance(chan, str): + chan = (chan,) + elif isinstance(chan, tuple) or isinstance(chan, list) or chan is None: + pass + else: + raise(TypeError(chan)) - _ts = obspy.UTCDateTime(_ds.attrs["starttime"]*1e-9) - _samprate = _ds.attrs["sampling_rate"] - _te = _ts + len(_ds)/_samprate - if _te < starttime or _ts > endtime: - continue + if isinstance(tag, str): + tag = (tag,) + elif isinstance(tag, tuple) or isinstance(tag, list) or tag is None: + pass + else: + raise(TypeError(tag)) - _offset = int((starttime-_ts)*_samprate) - _nsamp = int((endtime-starttime)*_samprate) - _ref = _ds.regionref[_offset:_offset+_nsamp+1] + _predicate_net = lambda _key: net == None\ + or _key.split(".")[0] in net - if ref not in self._reference_group: - _ref_grp = self._reference_group.create_group(ref) - else: - _ref_grp = self._reference_group[ref] + _predicate_sta = lambda _key: sta == None\ + or _key.split(".")[1] in sta - _handle = ".".join((_net, _sta, _loc, _chan)) + _predicate_loc = lambda _key: loc == None\ + or _key.split(".")[2] in loc - if overwrite is True and _handle in _ref_grp: - del(_ref_grp[_handle]) + _predicate_chan = lambda _key: chan == None\ + or _key.split(".")[-1].split("__")[0] 
in chan - if _handle not in _ref_grp: - _ref_ds = _ref_grp.create_dataset(_handle, - (1,), - dtype=h5py.special_dtype(ref=h5py.RegionReference)) - _ref_ds.attrs["sampling_rate"] = _ds.attrs["sampling_rate"] - _ref_ds.attrs["starttime"] = _ds.attrs["starttime"] + int(_offset/_samprate*1.e9) - _ref_ds[0] = _ref - else: - print("Will not overwrite existing reference") - continue + _predicate_tag = lambda _key: tag == None\ + or _key.split(".")[-1].split("__")[-1] in tag + + _predicate_netsta = lambda _key: _predicate_net(_key)\ + and _predicate_sta(_key) + + _predicate_locchantag = lambda _key: _predicate_loc(_key)\ + and _predicate_chan(_key)\ + and _predicate_tag(_key) + + for _station_name in itertools.ifilter(_predicate_netsta, + self._waveform_group.keys()): + for _key in itertools.ifilter(_predicate_locchantag, + self._waveform_group[_station_name].keys()): + + _net, _sta, _loc, _remainder = _key.split(".") + _chan = _remainder.split("__")[0] + + _ds = self._waveform_group["%s/%s" % (_station_name, _key)] + + _ts = obspy.UTCDateTime(_ds.attrs["starttime"]*1e-9) + _samprate = _ds.attrs["sampling_rate"] + _te = _ts + len(_ds)/_samprate + if _te < starttime or _ts > endtime: + continue + + _offset = int((starttime-_ts)*_samprate) + _nsamp = int((endtime-starttime)*_samprate) + _ref = _ds.regionref[_offset:_offset+_nsamp+1] + + if ref not in self._reference_group: + _ref_grp = self._reference_group.create_group(ref) + else: + _ref_grp = self._reference_group[ref] + + _net = "__" if _net == "" else _net + _sta = "__" if _sta == "" else _sta + _loc = "__" if _loc == "" else _loc + _chan = "__" if _chan == "" else _chan + _handle = "/".join((_net, _sta, _loc, _chan)) + + if overwrite is True and _handle in _ref_grp: + del(_ref_grp[_handle]) + + if _handle not in _ref_grp: + _ref_ds = _ref_grp.create_dataset(_handle, + (1,), + dtype=h5py.special_dtype(ref=h5py.RegionReference)) + _ref_ds.attrs["sampling_rate"] = _ds.attrs["sampling_rate"] + _ref_ds.attrs["starttime"] = 
_ds.attrs["starttime"] + int(_offset/_samprate*1.e9) + _ref_ds[0] = _ref + else: + print("Will not overwrite existing reference") + continue + + def get_data_for_reference(self, ref, net=None, sta=None, loc=None, + chan=None): + """ + Create a region reference for fast lookup of segements of + continuous data. + """ + if not isinstance(ref, str): + raise(TypeError("reference must be type ::str::")) + if ref not in self._reference_group: + raise(IOError("reference does not exist: %s" % ref)) - def get_data_for_reference(self, ref, net, sta, loc, chan): - _handle = "%s/%s.%s.%s.%s" % (ref, net, sta, loc, chan) - if _handle not in self._reference_group: - raise(IOError("Data not found")) - _ref = self._reference_group[_handle][0] - tr = obspy.Trace(data=self.__file[_ref][_ref]) - tr.stats.network = net - tr.stats.station = sta - tr.stats.location = loc - tr.stats.channel = chan - tr.stats.starttime = obspy.UTCDateTime(self._reference_group[_handle].attrs["starttime"]*1e-9) - tr.stats.delta = 1/self._reference_group[_handle].attrs["sampling_rate"] - return(tr) + if isinstance(net, str): + net = (net,) + elif isinstance(net, tuple) or isinstance(net, list) or net is None: + pass + else: + raise(TypeError(net)) + + if isinstance(sta, str): + sta = (sta,) + elif isinstance(sta, tuple) or isinstance(sta, list) or sta is None: + pass + else: + raise(TypeError(sta)) + + if isinstance(loc, str): + loc = (loc,) + elif isinstance(loc, tuple) or isinstance(loc, list) or loc is None: + pass + else: + raise(TypeError(loc)) + + if isinstance(chan, str): + chan = (chan,) + elif isinstance(chan, tuple) or isinstance(chan, list) or chan is None: + pass + else: + raise(TypeError(chan)) + + _predicate_net = lambda _key: net == None or _key in net + _predicate_sta = lambda _key: sta == None or _key in sta + _predicate_loc = lambda _key: loc == None or _key in loc + _predicate_chan = lambda _key: chan == None or _key in chan + + _st = obspy.Stream() + _ref_grp = self._reference_group[ref] 
+ for _net in itertools.ifilter(_predicate_net, + _ref_grp.keys()): + _net_grp = _ref_grp[_net] + + for _sta in itertools.ifilter(_predicate_sta, + _net_grp.keys()): + _sta_grp = _net_grp[_sta] + + for _loc in itertools.ifilter(_predicate_loc, + _sta_grp.keys()): + _loc_grp = _sta_grp[_loc] + + for _chan in itertools.ifilter(_predicate_chan, + _loc_grp.keys()): + _ds = _loc_grp[_chan] + _ref = _ds[0] + _tr = obspy.Trace(data=self.__file[_ref][_ref]) + _tr.stats.network = _net if _net != "__" else "" + _tr.stats.station = _sta if _sta != "__" else "" + _tr.stats.location = _loc if _loc != "__" else "" + _tr.stats.channel = _chan if _chan != "__" else "" + _tr.stats.starttime = obspy.UTCDateTime(_ds.attrs["starttime"]*1e-9) + _tr.stats.delta = 1/_ds.attrs["sampling_rate"] + _st.append(_tr) + return(_st) def get_provenance_document(self, document_name): """ From 8d806c5176316f038944a02349995712949eaf55 Mon Sep 17 00:00:00 2001 From: Malcolm White Date: Thu, 26 Apr 2018 21:46:48 -0700 Subject: [PATCH 03/13] Add documentation and tests. --- .gitignore | 1 + pyasdf/asdf_data_set.py | 255 +++++++++++++++++++++++------ pyasdf/tests/test_asdf_data_set.py | 30 ++++ 3 files changed, 237 insertions(+), 49 deletions(-) diff --git a/.gitignore b/.gitignore index bfc34a2..48f4fa9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ *.egg-info doc/_build/ .cache +*.pyc diff --git a/pyasdf/asdf_data_set.py b/pyasdf/asdf_data_set.py index b663bc4..6598631 100644 --- a/pyasdf/asdf_data_set.py +++ b/pyasdf/asdf_data_set.py @@ -1234,95 +1234,186 @@ def __parse_waveform_input_and_validate(self, waveform): def create_reference(self, ref, starttime, endtime, net=None, sta=None, loc=None, chan=None, tag=None, overwrite=False): """ - Create a region reference for fast lookup of segements of - continuous data. + Creates a region reference for fast lookup of data segments. + + :param ref: The reference label to apply. + :type ref: str + :param starttime: Start time of reference. 
+ :type starttime: :class:`obspy.core.utcdatetime.UTCDateTime` + :param endtime: End time of reference. + :type endtime: :class:`obspy.core.utcdatetime.UTCDateTime` + :param net: Networks to create references for. + :type net: str, tuple + :param sta: Stations to create references for. + :type sta: str, tuple + :param loc: Location codes to create references for. + :type loc: str, tuple + :param chan: Channels to create references for. + :type chan: str, tuple + :param tag: Tag to create references for. + :type tag: str, tuple + :param overwrite: Overwrite existing references for this label. + :type overwrite: bool + + This methodology is useful for creating subsets of a dataset + without duplicating waveforms. + + .. rubric:: Example + + Consider an ASDFDataSet populated with continuous waveforms for + stations from two networks (AA and BB): + + - AA.XXX + - AA.YYY + - AA.ZZZ + - BB.UUU + - BB.VVV + - BB.WWW + + It may be useful to process event-segmented waveforms, where + a one-minute window of data is needed. We can create references + to these windowed data segments for fast extraction: + + .. code-block:: python + + >>> ds.create_reference("event000001", + ... obspy.UTCDateTime("2016001T01:00:00"), + ... obspy.UTCDateTime("2016001T01:01:00")) + >>> ds.get_data_for_reference("event000001") + 18 Trace(s) in Stream: + AA.XXX..HHZ | 2016-01-01T01:00:00.00Z ... | 100.0 Hz, 6001 samples + ... + (16 other traces) + ... + BB.WWW..HHE | 2016-01-01T01:00:00.00Z ... | 100.0 Hz, 6001 samples + + Or perhaps we only want to include data from network AA in the + referenced data set: + + .. code-block:: python + + >>> ds.create_reference("event000001", + ... obspy.UTCDateTime("2016001T01:00:00"), + ... obspy.UTCDateTime("2016001T01:01:00"), + ... net="AA", + ... overwrite=True) + >>> ds.get_data_for_reference("event000001") + 9 Trace(s) in Stream: + AA.XXX..HHZ | 2016-01-01T01:00:00.00Z ... | 100.0 Hz, 6001 samples + ... + (7 other traces) + ... 
+ AA.ZZZ..HHE | 2016-01-01T01:00:00.00Z ... | 100.0 Hz, 6001 samples + + Or only horizontal component data: + + .. code-block:: python + + >>> ds.create_reference("event000001", + ... obspy.UTCDateTime("2016001T01:00:00"), + ... obspy.UTCDateTime("2016001T01:01:00"), + ... chan=("HHN", "HHE"), + ... overwrite=True) + >>> ds.get_data_for_reference("event000001") + 12 Trace(s) in Stream: + AA.XXX..HHN | 2016-01-01T01:00:00.00Z ... | 100.0 Hz, 6001 samples + ... + (10 other traces) + ... + BB.WWW..HHE | 2016-01-01T01:00:00.00Z ... | 100.0 Hz, 6001 samples + + etc... """ - if isinstance(net, str): + if isinstance(net, str) or isinstance(net, unicode): net = (net,) elif isinstance(net, tuple) or isinstance(net, list) or net is None: pass else: raise(TypeError(net)) - if isinstance(sta, str): + if isinstance(sta, str) or isinstance(sta, unicode): sta = (sta,) elif isinstance(sta, tuple) or isinstance(sta, list) or sta is None: pass else: raise(TypeError(sta)) - if isinstance(loc, str): + if isinstance(loc, str) or isinstance(loc, unicode): loc = (loc,) elif isinstance(loc, tuple) or isinstance(loc, list) or loc is None: pass else: raise(TypeError(loc)) - if isinstance(chan, str): + if isinstance(chan, str) or isinstance(chan, unicode): chan = (chan,) elif isinstance(chan, tuple) or isinstance(chan, list) or chan is None: pass else: raise(TypeError(chan)) - if isinstance(tag, str): + if isinstance(tag, str) or isinstance(tag, unicode): tag = (tag,) elif isinstance(tag, tuple) or isinstance(tag, list) or tag is None: pass else: raise(TypeError(tag)) - _predicate_net = lambda _key: net == None\ - or _key.split(".")[0] in net + _ref_dtype = h5py.special_dtype(ref=h5py.RegionReference) - _predicate_sta = lambda _key: sta == None\ - or _key.split(".")[1] in sta + def _predicate_net(_key): + return(net is None or _key.split(".")[0] in net) - _predicate_loc = lambda _key: loc == None\ - or _key.split(".")[2] in loc + def _predicate_sta(_key): + return(sta is None or 
_key.split(".")[1] in sta) - _predicate_chan = lambda _key: chan == None\ - or _key.split(".")[-1].split("__")[0] in chan + def _predicate_loc(_key): + return(loc is None or _key.split(".")[2] in loc) - _predicate_tag = lambda _key: tag == None\ - or _key.split(".")[-1].split("__")[-1] in tag + def _predicate_chan(_key): + return(chan is None or _key.split(".")[-1].split("__")[0] in chan) - _predicate_netsta = lambda _key: _predicate_net(_key)\ - and _predicate_sta(_key) + def _predicate_tag(_key): + return(tag is None or _key.split(".")[-1].split("__")[-1] in tag) - _predicate_locchantag = lambda _key: _predicate_loc(_key)\ - and _predicate_chan(_key)\ - and _predicate_tag(_key) + def _predicate_netsta(_key): + return(_predicate_net(_key) and _predicate_sta(_key)) + def _predicate_locchantag(_key): + return(_predicate_loc(_key) + and _predicate_chan(_key) + and _predicate_tag(_key)) + + _wf_grp = self._waveform_group for _station_name in itertools.ifilter(_predicate_netsta, self._waveform_group.keys()): for _key in itertools.ifilter(_predicate_locchantag, - self._waveform_group[_station_name].keys()): + _wf_grp[_station_name].keys()): _net, _sta, _loc, _remainder = _key.split(".") _chan = _remainder.split("__")[0] _ds = self._waveform_group["%s/%s" % (_station_name, _key)] - _ts = obspy.UTCDateTime(_ds.attrs["starttime"]*1e-9) + _ts = obspy.UTCDateTime(_ds.attrs["starttime"]*1e-9) _samprate = _ds.attrs["sampling_rate"] - _te = _ts + len(_ds)/_samprate + _te = _ts + len(_ds)/_samprate if _te < starttime or _ts > endtime: continue _offset = int((starttime-_ts)*_samprate) - _nsamp = int((endtime-starttime)*_samprate) - _ref = _ds.regionref[_offset:_offset+_nsamp+1] + _nsamp = int((endtime-starttime)*_samprate) + _ref = _ds.regionref[_offset:_offset+_nsamp+1] if ref not in self._reference_group: _ref_grp = self._reference_group.create_group(ref) else: _ref_grp = self._reference_group[ref] - _net = "__" if _net == "" else _net - _sta = "__" if _sta == "" else _sta - 
_loc = "__" if _loc == "" else _loc + _net = "__" if _net == "" else _net + _sta = "__" if _sta == "" else _sta + _loc = "__" if _loc == "" else _loc _chan = "__" if _chan == "" else _chan _handle = "/".join((_net, _sta, _loc, _chan)) @@ -1332,9 +1423,10 @@ def create_reference(self, ref, starttime, endtime, net=None, sta=None, if _handle not in _ref_grp: _ref_ds = _ref_grp.create_dataset(_handle, (1,), - dtype=h5py.special_dtype(ref=h5py.RegionReference)) + dtype=_ref_dtype) _ref_ds.attrs["sampling_rate"] = _ds.attrs["sampling_rate"] - _ref_ds.attrs["starttime"] = _ds.attrs["starttime"] + int(_offset/_samprate*1.e9) + _ts = _ds.attrs["starttime"] + int(_offset/_samprate*1.e9) + _ref_ds.attrs["starttime"] = _ts _ref_ds[0] = _ref else: print("Will not overwrite existing reference") @@ -1343,46 +1435,109 @@ def create_reference(self, ref, starttime, endtime, net=None, sta=None, def get_data_for_reference(self, ref, net=None, sta=None, loc=None, chan=None): """ - Create a region reference for fast lookup of segements of - continuous data. + Retrieve referenced data. + + :param ref: Reference label. + :type ref: str + :param net: Networks to retrieve referenced data for. + :type net: str, tuple + :param sta: Stations to retrieve referenced data for. + :type sta: str, tuple + :param loc: Location codes to retrieve referenced data for. + :type loc: str, tuple + :param chan: Channels to retrieve referenced data for. + :type chan: str, tuple + :returns: Referenced data. + :rtype: :class:`~obspy.core.stream.Stream` + + .. rubric:: Example + + Consider an ASDFDataSet with references pointing to event-segmented + waveforms (see :func:`create_reference`). We can retrieve data + for a particular reference label: + + .. code-block:: python + + >>> ds.get_data_for_reference("event000001") + 18 Trace(s) in Stream: + AA.XXX..HHZ | 2016-01-01T01:00:00.00Z ... | 100.0 Hz, 6001 samples + ... + (16 other traces) + ... + BB.WWW..HHE | 2016-01-01T01:00:00.00Z ... 
| 100.0 Hz, 6001 samples + + Or for only the BB network: + + .. code-block:: python + + >>> ds.get_data_for_reference("event000001", + ... net="BB") + 9 Trace(s) in Stream: + BB.UUU..HHZ | 2016-01-01T01:00:00.00Z ... | 100.0 Hz, 6001 samples + ... + (7 other traces) + ... + BB.WWW..HHE | 2016-01-01T01:00:00.00Z ... | 100.0 Hz, 6001 samples + + Or for only horizontal components: + + .. code-block:: python + + >>> ds.get_data_for_reference("event000001", + ... chan=("HHN","HHE")) + 12 Trace(s) in Stream: + AA.XXX..HHN | 2016-01-01T01:00:00.00Z ... | 100.0 Hz, 6001 samples + ... + (10 other traces) + ... + BB.WWW..HHE | 2016-01-01T01:00:00.00Z ... | 100.0 Hz, 6001 samples + + etc... """ - if not isinstance(ref, str): + if not isinstance(ref, str) and not isinstance(ref, unicode): raise(TypeError("reference must be type ::str::")) if ref not in self._reference_group: raise(IOError("reference does not exist: %s" % ref)) - if isinstance(net, str): + if isinstance(net, str) or isinstance(net, unicode): net = (net,) elif isinstance(net, tuple) or isinstance(net, list) or net is None: pass else: raise(TypeError(net)) - if isinstance(sta, str): + if isinstance(sta, str) or isinstance(sta, unicode): sta = (sta,) elif isinstance(sta, tuple) or isinstance(sta, list) or sta is None: pass else: raise(TypeError(sta)) - if isinstance(loc, str): + if isinstance(loc, str) or isinstance(loc, unicode): loc = (loc,) elif isinstance(loc, tuple) or isinstance(loc, list) or loc is None: pass else: raise(TypeError(loc)) - if isinstance(chan, str): + if isinstance(chan, str) or isinstance(chan, unicode): chan = (chan,) elif isinstance(chan, tuple) or isinstance(chan, list) or chan is None: pass else: raise(TypeError(chan)) - _predicate_net = lambda _key: net == None or _key in net - _predicate_sta = lambda _key: sta == None or _key in sta - _predicate_loc = lambda _key: loc == None or _key in loc - _predicate_chan = lambda _key: chan == None or _key in chan + def _predicate_net(_key): + 
return(net is None or _key in net) + + def _predicate_sta(_key): + return(sta is None or _key in sta) + + def _predicate_loc(_key): + return(loc is None or _key in loc) + + def _predicate_chan(_key): + return(chan is None or _key in chan) _st = obspy.Stream() _ref_grp = self._reference_group[ref] @@ -1400,14 +1555,16 @@ def get_data_for_reference(self, ref, net=None, sta=None, loc=None, for _chan in itertools.ifilter(_predicate_chan, _loc_grp.keys()): - _ds = _loc_grp[_chan] + _ds = _loc_grp[_chan] _ref = _ds[0] _tr = obspy.Trace(data=self.__file[_ref][_ref]) - _tr.stats.network = _net if _net != "__" else "" - _tr.stats.station = _sta if _sta != "__" else "" - _tr.stats.location = _loc if _loc != "__" else "" - _tr.stats.channel = _chan if _chan != "__" else "" - _tr.stats.starttime = obspy.UTCDateTime(_ds.attrs["starttime"]*1e-9) + _tr.stats.network = _net if _net != "__" else "" + _tr.stats.station = _sta if _sta != "__" else "" + _tr.stats.location = _loc if _loc != "__" else "" + _tr.stats.channel = _chan if _chan != "__" else "" + _tr.stats.starttime = obspy.UTCDateTime( + _ds.attrs["starttime"]*1e-9 + ) _tr.stats.delta = 1/_ds.attrs["sampling_rate"] _st.append(_tr) return(_st) diff --git a/pyasdf/tests/test_asdf_data_set.py b/pyasdf/tests/test_asdf_data_set.py index a9a59cb..1e62395 100644 --- a/pyasdf/tests/test_asdf_data_set.py +++ b/pyasdf/tests/test_asdf_data_set.py @@ -87,6 +87,36 @@ def test_waveform_tags_attribute(tmpdir): assert data_set.waveform_tags == expected +def test_reference_creation(tmpdir): + asdf_filename = os.path.join(tmpdir.strpath, "test.h5") + data_path = os.path.join(data_dir, "small_sample_data_set") + + data_set = ASDFDataSet(asdf_filename) + + for filename in glob.glob(os.path.join(data_path, "*.mseed")): + data_set.add_waveforms(filename, tag="raw") + + data_set.create_reference("ref1", + obspy.UTCDateTime("2013-05-24T05:50:00"), + obspy.UTCDateTime("2013-05-24T05:55:00"), + net="AE") + st = 
data_set.get_data_for_reference("ref1") + + assert len(st) == 3 + for tr in st: + assert tr.stats.network == "AE" + + data_set.create_reference("ref2", + obspy.UTCDateTime("2013-05-24T05:50:00"), + obspy.UTCDateTime("2013-05-24T05:55:00"), + chan="BHZ") + st = data_set.get_data_for_reference("ref2") + + assert len(st) == 2 + for tr in st: + assert tr.stats.channel == "BHZ" + + def test_data_set_creation(tmpdir): """ Test data set creation with a small test dataset. From 7f64a33c684aa4ed317bb0782af0af34d8d01c55 Mon Sep 17 00:00:00 2001 From: Malcolm White Date: Fri, 27 Apr 2018 09:15:15 -0700 Subject: [PATCH 04/13] Resolve Python 2/3 inconsitencies causing tests to fail. Resolve Python 2/3 inconsitencies causing tests to fail. Merge remote-tracking branch 'origin/dev/waveform_references' into dev/waveform_references --- pyasdf/asdf_data_set.py | 121 ++++++++++++++++------------------------ 1 file changed, 48 insertions(+), 73 deletions(-) diff --git a/pyasdf/asdf_data_set.py b/pyasdf/asdf_data_set.py index 6598631..fe30611 100644 --- a/pyasdf/asdf_data_set.py +++ b/pyasdf/asdf_data_set.py @@ -1325,40 +1325,28 @@ def create_reference(self, ref, starttime, endtime, net=None, sta=None, etc... 
""" - if isinstance(net, str) or isinstance(net, unicode): - net = (net,) - elif isinstance(net, tuple) or isinstance(net, list) or net is None: - pass - else: - raise(TypeError(net)) - - if isinstance(sta, str) or isinstance(sta, unicode): - sta = (sta,) - elif isinstance(sta, tuple) or isinstance(sta, list) or sta is None: - pass - else: - raise(TypeError(sta)) - - if isinstance(loc, str) or isinstance(loc, unicode): - loc = (loc,) - elif isinstance(loc, tuple) or isinstance(loc, list) or loc is None: - pass - else: - raise(TypeError(loc)) + def _coerce(obj): + if isinstance(obj, str): + obj = (obj,) + elif isinstance(obj, tuple)\ + or isinstance(obj, list)\ + or obj is None: + pass + else: + try: + if isinstance(obj, unicode): + obj = (obj,) + else: + raise(TypeError(obj)) + except NameError: + pass + return(obj) - if isinstance(chan, str) or isinstance(chan, unicode): - chan = (chan,) - elif isinstance(chan, tuple) or isinstance(chan, list) or chan is None: - pass - else: - raise(TypeError(chan)) - - if isinstance(tag, str) or isinstance(tag, unicode): - tag = (tag,) - elif isinstance(tag, tuple) or isinstance(tag, list) or tag is None: - pass - else: - raise(TypeError(tag)) + net = _coerce(net) + sta = _coerce(sta) + loc = _coerce(loc) + chan = _coerce(chan) + tag = _coerce(tag) _ref_dtype = h5py.special_dtype(ref=h5py.RegionReference) @@ -1386,10 +1374,10 @@ def _predicate_locchantag(_key): and _predicate_tag(_key)) _wf_grp = self._waveform_group - for _station_name in itertools.ifilter(_predicate_netsta, - self._waveform_group.keys()): - for _key in itertools.ifilter(_predicate_locchantag, - _wf_grp[_station_name].keys()): + for _station_name in filter(_predicate_netsta, + self._waveform_group.keys()): + for _key in filter(_predicate_locchantag, + _wf_grp[_station_name].keys()): _net, _sta, _loc, _remainder = _key.split(".") _chan = _remainder.split("__")[0] @@ -1494,38 +1482,29 @@ def get_data_for_reference(self, ref, net=None, sta=None, loc=None, etc... 
""" - if not isinstance(ref, str) and not isinstance(ref, unicode): - raise(TypeError("reference must be type ::str::")) if ref not in self._reference_group: raise(IOError("reference does not exist: %s" % ref)) - if isinstance(net, str) or isinstance(net, unicode): - net = (net,) - elif isinstance(net, tuple) or isinstance(net, list) or net is None: - pass - else: - raise(TypeError(net)) - - if isinstance(sta, str) or isinstance(sta, unicode): - sta = (sta,) - elif isinstance(sta, tuple) or isinstance(sta, list) or sta is None: - pass - else: - raise(TypeError(sta)) - - if isinstance(loc, str) or isinstance(loc, unicode): - loc = (loc,) - elif isinstance(loc, tuple) or isinstance(loc, list) or loc is None: - pass - else: - raise(TypeError(loc)) + def _coerce(obj): + if isinstance(obj, str): + obj = (obj,) + elif isinstance(obj, tuple)\ + or isinstance(obj, list)\ + or obj is None: + pass + else: + try: + if isinstance(obj, unicode): + obj = (obj,) + else: + raise(TypeError(obj)) + except NameError: + pass - if isinstance(chan, str) or isinstance(chan, unicode): - chan = (chan,) - elif isinstance(chan, tuple) or isinstance(chan, list) or chan is None: - pass - else: - raise(TypeError(chan)) + net = _coerce(net) + sta = _coerce(sta) + loc = _coerce(loc) + chan = _coerce(chan) def _predicate_net(_key): return(net is None or _key in net) @@ -1541,20 +1520,16 @@ def _predicate_chan(_key): _st = obspy.Stream() _ref_grp = self._reference_group[ref] - for _net in itertools.ifilter(_predicate_net, - _ref_grp.keys()): + for _net in filter(_predicate_net, _ref_grp.keys()): _net_grp = _ref_grp[_net] - for _sta in itertools.ifilter(_predicate_sta, - _net_grp.keys()): + for _sta in filter(_predicate_sta, _net_grp.keys()): _sta_grp = _net_grp[_sta] - for _loc in itertools.ifilter(_predicate_loc, - _sta_grp.keys()): + for _loc in filter(_predicate_loc, _sta_grp.keys()): _loc_grp = _sta_grp[_loc] - for _chan in itertools.ifilter(_predicate_chan, - _loc_grp.keys()): + for _chan in 
filter(_predicate_chan, _loc_grp.keys()): _ds = _loc_grp[_chan] _ref = _ds[0] _tr = obspy.Trace(data=self.__file[_ref][_ref]) From c7048e6441219d5656f04b9f4a069f9ac6514fd5 Mon Sep 17 00:00:00 2001 From: Malcolm White Date: Fri, 27 Apr 2018 15:49:24 -0700 Subject: [PATCH 05/13] Add some more tests. --- pyasdf/tests/test_asdf_data_set.py | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/pyasdf/tests/test_asdf_data_set.py b/pyasdf/tests/test_asdf_data_set.py index 1e62395..39ca50b 100644 --- a/pyasdf/tests/test_asdf_data_set.py +++ b/pyasdf/tests/test_asdf_data_set.py @@ -101,7 +101,6 @@ def test_reference_creation(tmpdir): obspy.UTCDateTime("2013-05-24T05:55:00"), net="AE") st = data_set.get_data_for_reference("ref1") - assert len(st) == 3 for tr in st: assert tr.stats.network == "AE" @@ -111,11 +110,32 @@ def test_reference_creation(tmpdir): obspy.UTCDateTime("2013-05-24T05:55:00"), chan="BHZ") st = data_set.get_data_for_reference("ref2") - assert len(st) == 2 for tr in st: assert tr.stats.channel == "BHZ" + data_set.create_reference("ref3", + obspy.UTCDateTime("2013-05-24T05:50:00"), + obspy.UTCDateTime("2013-05-24T05:55:00"), + chan=("BHN", "BHE")) + st = data_set.get_data_for_reference("ref3") + assert len(st) == 4 + for tr in st: + assert tr.stats.channel in ("BHN", "BHE") + + data_set.create_reference("ref4", + obspy.UTCDateTime("2013-05-24T05:50:00"), + obspy.UTCDateTime("2013-05-24T05:55:00"), + net="TA", + sta=("POKR",), + chan="BHZ") + st = data_set.get_data_for_reference("ref4") + assert len(st) == 1 + for tr in st: + assert tr.stats.channel == "BHZ"\ + and tr.stats.station == "POKR"\ + and tr.stats.network == "TA" + def test_data_set_creation(tmpdir): """ From 50b5a6640d63a28a5725a4ebc83a1fef2b5b8d27 Mon Sep 17 00:00:00 2001 From: Malcolm White Date: Fri, 27 Apr 2018 17:51:31 -0700 Subject: [PATCH 06/13] Add missing return statement. 
--- pyasdf/asdf_data_set.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyasdf/asdf_data_set.py b/pyasdf/asdf_data_set.py index fe30611..a23ce42 100644 --- a/pyasdf/asdf_data_set.py +++ b/pyasdf/asdf_data_set.py @@ -1376,9 +1376,9 @@ def _predicate_locchantag(_key): _wf_grp = self._waveform_group for _station_name in filter(_predicate_netsta, self._waveform_group.keys()): + print(_station_name) for _key in filter(_predicate_locchantag, _wf_grp[_station_name].keys()): - _net, _sta, _loc, _remainder = _key.split(".") _chan = _remainder.split("__")[0] @@ -1500,6 +1500,7 @@ def _coerce(obj): raise(TypeError(obj)) except NameError: pass + return(obj) net = _coerce(net) sta = _coerce(sta) From e592b6fdc54aac418e3ae5835274fcea8cd4f4df Mon Sep 17 00:00:00 2001 From: Malcolm White Date: Sun, 6 May 2018 17:46:04 -0700 Subject: [PATCH 07/13] Use path/offset methodology instead of RegionReferences. --- pyasdf/asdf_data_set.py | 36 +++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/pyasdf/asdf_data_set.py b/pyasdf/asdf_data_set.py index a23ce42..61a6356 100644 --- a/pyasdf/asdf_data_set.py +++ b/pyasdf/asdf_data_set.py @@ -1348,7 +1348,7 @@ def _coerce(obj): chan = _coerce(chan) tag = _coerce(tag) - _ref_dtype = h5py.special_dtype(ref=h5py.RegionReference) + #_ref_dtype = h5py.special_dtype(ref=h5py.RegionReference) def _predicate_net(_key): return(net is None or _key.split(".")[0] in net) @@ -1376,11 +1376,9 @@ def _predicate_locchantag(_key): _wf_grp = self._waveform_group for _station_name in filter(_predicate_netsta, self._waveform_group.keys()): - print(_station_name) for _key in filter(_predicate_locchantag, _wf_grp[_station_name].keys()): - _net, _sta, _loc, _remainder = _key.split(".") - _chan = _remainder.split("__")[0] + _net, _sta, _loc, _chan = _key.split("__")[0].split(".") _ds = self._waveform_group["%s/%s" % (_station_name, _key)] @@ -1391,8 +1389,8 @@ def _predicate_locchantag(_key): 
continue _offset = int((starttime-_ts)*_samprate) - _nsamp = int((endtime-starttime)*_samprate) - _ref = _ds.regionref[_offset:_offset+_nsamp+1] + _nsamp = int(round((endtime-starttime)*_samprate, 0)) + #_ref = _ds.regionref[_offset:_offset+_nsamp+1] if ref not in self._reference_group: _ref_grp = self._reference_group.create_group(ref) @@ -1409,13 +1407,14 @@ def _predicate_locchantag(_key): del(_ref_grp[_handle]) if _handle not in _ref_grp: - _ref_ds = _ref_grp.create_dataset(_handle, - (1,), - dtype=_ref_dtype) - _ref_ds.attrs["sampling_rate"] = _ds.attrs["sampling_rate"] + _ref = _ref_grp.create_dataset(_handle, + (2,), + dtype=np.int64) + _ref.attrs["reference_path"] = _ds.name + _ref.attrs["sampling_rate"] = _ds.attrs["sampling_rate"] _ts = _ds.attrs["starttime"] + int(_offset/_samprate*1.e9) - _ref_ds.attrs["starttime"] = _ts - _ref_ds[0] = _ref + _ref.attrs["starttime"] = _ts + _ref[:] = [_offset, _offset+_nsamp] else: print("Will not overwrite existing reference") continue @@ -1531,17 +1530,20 @@ def _predicate_chan(_key): _loc_grp = _sta_grp[_loc] for _chan in filter(_predicate_chan, _loc_grp.keys()): - _ds = _loc_grp[_chan] - _ref = _ds[0] - _tr = obspy.Trace(data=self.__file[_ref][_ref]) + _ref = _loc_grp[_chan] + _tr = obspy.Trace( + data=self.__file[ + _ref.attrs["reference_path"] + ][_ref[0]: _ref[1]] + ) _tr.stats.network = _net if _net != "__" else "" _tr.stats.station = _sta if _sta != "__" else "" _tr.stats.location = _loc if _loc != "__" else "" _tr.stats.channel = _chan if _chan != "__" else "" _tr.stats.starttime = obspy.UTCDateTime( - _ds.attrs["starttime"]*1e-9 + _ref.attrs["starttime"]*1e-9 ) - _tr.stats.delta = 1/_ds.attrs["sampling_rate"] + _tr.stats.delta = 1/_ref.attrs["sampling_rate"] _st.append(_tr) return(_st) From bed89cecc521520966691a3b71fd8417634fc52f Mon Sep 17 00:00:00 2001 From: Malcolm White Date: Sat, 19 May 2018 13:26:14 -0700 Subject: [PATCH 08/13] Add I/O for masked Traces Make backwards compatible with older ASDF 
versions I/O of masked arrays of float/integer data. Ignore some test files. --- .gitignore | 3 +++ pyasdf/asdf_data_set.py | 32 ++++++++++++++++++++++++++++---- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index bfc34a2..8cb055b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ *.egg-info doc/_build/ .cache +*.pyc +.coverage +.pytest_cache diff --git a/pyasdf/asdf_data_set.py b/pyasdf/asdf_data_set.py index 9f9e716..5768c5a 100644 --- a/pyasdf/asdf_data_set.py +++ b/pyasdf/asdf_data_set.py @@ -37,7 +37,6 @@ import prov import prov.model - # Minimum compatibility wrapper between Python 2 and 3. try: filter = itertools.ifilter @@ -811,7 +810,13 @@ def _get_waveform(self, waveform_name, starttime=None, endtime=None): data = self.__file["Waveforms"]["%s.%s" % (network, station)][ waveform_name] - tr = obspy.Trace(data=data[idx_start: idx_end]) + if "mask" in data.attrs and data.attrs["mask"] is not False: + _data = np.ma.masked_values(data[idx_start: idx_end], + data.attrs["mask"]) + else: + _data = data[idx_start: idx_end] + + tr = obspy.Trace(data=_data) tr.stats.starttime = data_starttime tr.stats.sampling_rate = data.attrs["sampling_rate"] tr.stats.network = network @@ -1167,6 +1172,8 @@ def add_waveforms(self, waveform, tag, event_id=None, origin_id=None, # Actually add the data. for trace in waveform: + if isinstance(trace.data, np.ma.masked_array): + self.__set_masked_array_fill_value(trace) # Complicated multi-step process but it enables one to use # parallel I/O with the same functions. 
info = self._add_trace_get_collective_information( @@ -1179,6 +1186,16 @@ def add_waveforms(self, waveform, tag, event_id=None, origin_id=None, self._add_trace_write_collective_information(info) self._add_trace_write_independent_information(info, trace) + def __set_masked_array_fill_value(self, trace): + if trace.data.dtype.kind in ("i", "u"): + _info = np.iinfo + elif trace.data.dtype.kind == "f" : + _info = np.finfo + else: + raise(NotImplementedError("fill value for dtype %s not defined"\ + % trace.data.dtype)) + trace.data.set_fill_value(_info(trace.data.dtype).min) + def __parse_and_validate_tag(self, tag): tag = tag.strip() if tag.lower() == "stationxml": @@ -1295,7 +1312,7 @@ def _add_trace_write_independent_information(self, info, trace): :param trace: :return: """ - self._waveform_group[info["data_name"]][:] = trace.data + self._waveform_group[info["data_name"]][:] = np.ma.filled(trace.data) def _add_trace_write_collective_information(self, info): """ @@ -1367,6 +1384,12 @@ def _add_trace_get_collective_information( else: fletcher32 = True + # Determine appropriate mask value. + if not isinstance(trace.data, np.ma.masked_array): + _mask = np.bool(False) + else: + _mask = trace.data.fill_value + info = { "station_name": station_name, "data_name": group_name, @@ -1384,7 +1407,8 @@ def _add_trace_get_collective_information( # Starttime is the epoch time in nanoseconds. "starttime": int(round(trace.stats.starttime.timestamp * 1.0E9)), - "sampling_rate": trace.stats.sampling_rate + "sampling_rate": trace.stats.sampling_rate, + "mask": _mask } } From 6d590f15809be1f0856b02ab6dd389b73b65c01f Mon Sep 17 00:00:00 2001 From: Malcolm White Date: Sat, 19 May 2018 15:52:16 -0700 Subject: [PATCH 09/13] Add I/O for masked Traces Make backwards compatible with older ASDF versions I/O of masked arrays of float/integer data. Ignore some test files. 
Fix broken identity test --- .gitignore | 3 +++ pyasdf/asdf_data_set.py | 32 ++++++++++++++++++++++++++++---- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index bfc34a2..8cb055b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ *.egg-info doc/_build/ .cache +*.pyc +.coverage +.pytest_cache diff --git a/pyasdf/asdf_data_set.py b/pyasdf/asdf_data_set.py index 9f9e716..79cfde4 100644 --- a/pyasdf/asdf_data_set.py +++ b/pyasdf/asdf_data_set.py @@ -37,7 +37,6 @@ import prov import prov.model - # Minimum compatibility wrapper between Python 2 and 3. try: filter = itertools.ifilter @@ -811,7 +810,13 @@ def _get_waveform(self, waveform_name, starttime=None, endtime=None): data = self.__file["Waveforms"]["%s.%s" % (network, station)][ waveform_name] - tr = obspy.Trace(data=data[idx_start: idx_end]) + if "mask" in data.attrs and data.attrs["mask"] != np.bool(False): + _data = np.ma.masked_values(data[idx_start: idx_end], + data.attrs["mask"]) + else: + _data = data[idx_start: idx_end] + + tr = obspy.Trace(data=_data) tr.stats.starttime = data_starttime tr.stats.sampling_rate = data.attrs["sampling_rate"] tr.stats.network = network @@ -1167,6 +1172,8 @@ def add_waveforms(self, waveform, tag, event_id=None, origin_id=None, # Actually add the data. for trace in waveform: + if isinstance(trace.data, np.ma.masked_array): + self.__set_masked_array_fill_value(trace) # Complicated multi-step process but it enables one to use # parallel I/O with the same functions. 
info = self._add_trace_get_collective_information( @@ -1179,6 +1186,16 @@ def add_waveforms(self, waveform, tag, event_id=None, origin_id=None, self._add_trace_write_collective_information(info) self._add_trace_write_independent_information(info, trace) + def __set_masked_array_fill_value(self, trace): + if trace.data.dtype.kind in ("i", "u"): + _info = np.iinfo + elif trace.data.dtype.kind == "f": + _info = np.finfo + else: + raise(NotImplementedError("fill value for dtype %s not defined" + % trace.data.dtype)) + trace.data.set_fill_value(_info(trace.data.dtype).min) + def __parse_and_validate_tag(self, tag): tag = tag.strip() if tag.lower() == "stationxml": @@ -1295,7 +1312,7 @@ def _add_trace_write_independent_information(self, info, trace): :param trace: :return: """ - self._waveform_group[info["data_name"]][:] = trace.data + self._waveform_group[info["data_name"]][:] = np.ma.filled(trace.data) def _add_trace_write_collective_information(self, info): """ @@ -1367,6 +1384,12 @@ def _add_trace_get_collective_information( else: fletcher32 = True + # Determine appropriate mask value. + if not isinstance(trace.data, np.ma.masked_array): + _mask = np.bool(False) + else: + _mask = trace.data.fill_value + info = { "station_name": station_name, "data_name": group_name, @@ -1384,7 +1407,8 @@ def _add_trace_get_collective_information( # Starttime is the epoch time in nanoseconds. 
"starttime": int(round(trace.stats.starttime.timestamp * 1.0E9)), - "sampling_rate": trace.stats.sampling_rate + "sampling_rate": trace.stats.sampling_rate, + "mask": _mask } } From c6db4d23f098fc6324d0d4fc675508525c989721 Mon Sep 17 00:00:00 2001 From: Malcolm White Date: Sat, 19 May 2018 21:54:10 -0700 Subject: [PATCH 10/13] Add test for masked Trace I/O --- pyasdf/tests/test_asdf_data_set.py | 54 ++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/pyasdf/tests/test_asdf_data_set.py b/pyasdf/tests/test_asdf_data_set.py index 3967793..fd96587 100644 --- a/pyasdf/tests/test_asdf_data_set.py +++ b/pyasdf/tests/test_asdf_data_set.py @@ -144,6 +144,60 @@ def test_data_set_creation(tmpdir): assert cat_file == cat_asdf +def test_masked_data_creation(tmpdir): + asdf_filename = os.path.join(tmpdir.strpath, "test.h5") + data_path = os.path.join(data_dir, "small_sample_data_set") + + data_set = ASDFDataSet(asdf_filename) + + filename = os.path.join(data_path, "AE.113A..BHZ.mseed") + + ts1 = obspy.UTCDateTime("2013-05-24T05:40:00") + te1 = obspy.UTCDateTime("2013-05-24T06:00:00") + ts2 = obspy.UTCDateTime("2013-05-24T06:10:00") + te2 = obspy.UTCDateTime("2013-05-24T06:50:00") + + st_file_raw = obspy.read(filename) + + st_file_masked = st_file_raw.copy().trim(starttime=ts1, endtime=te1)\ + + st_file_raw.copy().trim(starttime=ts2, endtime=te2) + st_file_masked.merge() + + # This will cast dtype from int to float + st_file_masked_filtered = st_file_masked.copy() + st_file_masked_filtered = st_file_masked_filtered.split() + st_file_masked_filtered.filter("bandpass", freqmin=0.1, freqmax=10) + st_file_masked_filtered.merge() + + data_set.add_waveforms(st_file_masked, tag="masked") + data_set.add_waveforms(st_file_masked_filtered, tag="masked_filtered") + + st_asdf_masked = data_set.waveforms["AE.113A"]["masked"] + st_asdf_masked_filtered = data_set.waveforms["AE.113A"]["masked_filtered"] + + trfm = st_file_masked[0] + trfmf = st_file_masked_filtered[0] + 
tram = st_asdf_masked[0] + tramf = st_asdf_masked_filtered[0] + + for tr in (trfm, trfmf): + del(tr.stats.mseed) + del(tr.stats._format) + del(tr.stats.processing) + + for tr in (tram, tramf): + del(tr.stats.asdf) + del(tr.stats._format) + + assert trfm.stats == tram.stats + assert all(trfm.data.mask == tram.data.mask) + assert all(trfm.data[~trfm.data.mask] == tram.data[~tram.data.mask]) + + assert trfmf.stats == tramf.stats + assert all(trfmf.data.mask == tramf.data.mask) + assert all(trfmf.data[~trfmf.data.mask] == tramf.data[~tramf.data.mask]) + + def test_equality_checks(example_data_set): """ Tests the equality operations. From 901017c8ef7aec57e472e143507a64a226615058 Mon Sep 17 00:00:00 2001 From: Malcolm White Date: Mon, 21 May 2018 17:40:01 -0700 Subject: [PATCH 11/13] Add argument to configure dataset chunking --- pyasdf/asdf_data_set.py | 32 +++++++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/pyasdf/asdf_data_set.py b/pyasdf/asdf_data_set.py index 79cfde4..c77f031 100644 --- a/pyasdf/asdf_data_set.py +++ b/pyasdf/asdf_data_set.py @@ -86,6 +86,7 @@ class ASDFDataSet(object): def __init__(self, filename, compression="gzip-3", shuffle=True, debug=False, mpi=None, mode="a", single_item_read_limit_in_mb=1024.0, + chunk_size=None, format_version=None): """ :type filename: str @@ -133,6 +134,8 @@ def __init__(self, filename, compression="gzip-3", shuffle=True, the interactive command line when just exploring an ASDF data set. There are other ways to still access data and even this setting can be overwritten. + :param chunk_size: Dataset chunk size in seconds. + :type chunk_size: tuple, None :type format_version: str :type format_version: The version of ASDF to use. If not given, it will use the most recent version (currently 1.0.1) if the @@ -143,6 +146,10 @@ def __init__(self, filename, compression="gzip-3", shuffle=True, "ASDF version '%s' is not supported. 
Supported versions: %s" % (format_version, ", ".join(SUPPORTED_FORMAT_VERSIONS))) + # Dataset chunk size in seconds. Set to True for auto-chunking; + # None for no chunking. + self.chunk_size = chunk_size + self.__force_mpi = mpi self.debug = debug @@ -1059,7 +1066,8 @@ def _get_dataset_within_tolerance(station_group, trace): # If this did not work - append. self.add_waveforms(waveform=trace, tag=tag) - def add_waveforms(self, waveform, tag, event_id=None, origin_id=None, + def add_waveforms(self, waveform, tag, chunk_size=None, + event_id=None, origin_id=None, magnitude_id=None, focal_mechanism_id=None, provenance_id=None, labels=None): """ @@ -1073,6 +1081,10 @@ def add_waveforms(self, waveform, tag, event_id=None, origin_id=None, mandatory for all traces and facilitates identification of the data within one ASDF volume. The ``"raw_record"`` path is, by convention, reserved to raw, recorded, unprocessed data. + :param chunk_size: Dataset chunk size in seconds. This + overrides the default class value specified at object + instantiation. + :type chunk_size: tuple, bool :type tag: str :param event_id: The event or id which the waveform is associated with. This is useful for recorded data if a clear association is @@ -1170,6 +1182,10 @@ def add_waveforms(self, waveform, tag, event_id=None, origin_id=None, tag = self.__parse_and_validate_tag(tag) waveform = self.__parse_waveform_input_and_validate(waveform) + chunk_size = chunk_size if chunk_size is not None\ + else self.chunk_size if self.chunk_size is not None \ + else None + # Actually add the data. for trace in waveform: if isinstance(trace.data, np.ma.masked_array): self.__set_masked_array_fill_value(trace) # Complicated multi-step process but it enables one to use # parallel I/O with the same functions. 
info = self._add_trace_get_collective_information( - trace, tag, event_id=event_id, origin_id=origin_id, - magnitude_id=magnitude_id, + trace, tag, chunk_size=chunk_size, event_id=event_id, + origin_id=origin_id, magnitude_id=magnitude_id, focal_mechanism_id=focal_mechanism_id, provenance_id=provenance_id, labels=labels) if info is None: @@ -1347,7 +1363,7 @@ def __get_waveform_ds_name(self, net, sta, loc, cha, start, end, tag): tag=tag) def _add_trace_get_collective_information( - self, trace, tag, event_id=None, origin_id=None, + self, trace, tag, chunk_size=None, event_id=None, origin_id=None, magnitude_id=None, focal_mechanism_id=None, provenance_id=None, labels=None): """ @@ -1371,6 +1387,11 @@ def _add_trace_get_collective_information( loc=trace.stats.location, cha=trace.stats.channel, start=trace.stats.starttime, end=trace.stats.endtime, tag=tag) + if chunk_size is None or chunk_size is True: + chunks = chunk_size + else: + chunks = (int(round(chunk_size * trace.stats.sampling_rate, 0)),) + group_name = "%s/%s" % (station_name, data_name) if group_name in self._waveform_group: msg = "Data '%s' already exists in file. Will not be added!" % \ @@ -1401,7 +1422,8 @@ def _add_trace_get_collective_information( "compression_opts": self.__compression[1], "shuffle": self.__shuffle, "fletcher32": fletcher32, - "maxshape": (None,) + "maxshape": (None,), + "chunks": chunks }, "dataset_attrs": { # Starttime is the epoch time in nanoseconds. 
From 9ffde25c697c7a817d4a66f2cb838baeda3cfe8a Mon Sep 17 00:00:00 2001 From: Malcolm White Date: Mon, 21 May 2018 17:51:16 -0700 Subject: [PATCH 12/13] Nest References group under AuxiliaryData --- pyasdf/asdf_data_set.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/pyasdf/asdf_data_set.py b/pyasdf/asdf_data_set.py index 61a6356..ff9e2ab 100644 --- a/pyasdf/asdf_data_set.py +++ b/pyasdf/asdf_data_set.py @@ -246,8 +246,8 @@ def __init__(self, filename, compression="gzip-3", shuffle=True, self.__file.create_group("Provenance") if "AuxiliaryData" not in self.__file and mode != "r": self.__file.create_group("AuxiliaryData") - if "References" not in self.__file and mode != "r": - self.__file.create_group("References") + if "References" not in self.__file["AuxiliaryData"] and mode != "r": + self.__file.create_group("AuxiliaryData/References") # Easy access to the waveforms. self.waveforms = StationAccessor(self) @@ -366,7 +366,7 @@ def _auxiliary_data_group(self): @property def _reference_group(self): - return self.__file["References"] + return self.__file["AuxiliaryData/References"] @property def asdf_format_version_in_file(self): @@ -1234,7 +1234,7 @@ def __parse_waveform_input_and_validate(self, waveform): def create_reference(self, ref, starttime, endtime, net=None, sta=None, loc=None, chan=None, tag=None, overwrite=False): """ - Creates a region reference for fast lookup of data segments. + Creates a reference for fast lookup of data segments. :param ref: The reference label to apply. 
:type ref: str @@ -1348,8 +1348,6 @@ def _coerce(obj): chan = _coerce(chan) tag = _coerce(tag) - #_ref_dtype = h5py.special_dtype(ref=h5py.RegionReference) - def _predicate_net(_key): return(net is None or _key.split(".")[0] in net) @@ -1390,7 +1388,6 @@ def _predicate_locchantag(_key): _offset = int((starttime-_ts)*_samprate) _nsamp = int(round((endtime-starttime)*_samprate, 0)) - #_ref = _ds.regionref[_offset:_offset+_nsamp+1] if ref not in self._reference_group: _ref_grp = self._reference_group.create_group(ref) From 945c4d02063c46627d59110092e9e517f50fc086 Mon Sep 17 00:00:00 2001 From: Malcolm White Date: Mon, 21 May 2018 22:02:00 -0700 Subject: [PATCH 13/13] Refactor waveform extraction --- pyasdf/asdf_data_set.py | 41 +++++++++++++++++++++-------------------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/pyasdf/asdf_data_set.py b/pyasdf/asdf_data_set.py index a4fae13..394f823 100644 --- a/pyasdf/asdf_data_set.py +++ b/pyasdf/asdf_data_set.py @@ -818,6 +818,12 @@ def _get_waveform(self, waveform_name, starttime=None, endtime=None): self.single_item_read_limit_in_mb)) raise ASDFValueError(msg) + tr = self.__extract_waveform(waveform_name, idx_start, idx_end) + + return(tr) + + def __extract_waveform(self, waveform_name, idx_start, idx_end): + network, station, location, channel = waveform_name.split(".")[:4] channel = channel[:channel.find("__")] data = self.__file["Waveforms"]["%s.%s" % (network, station)][ @@ -830,7 +836,7 @@ def _get_waveform(self, waveform_name, starttime=None, endtime=None): _data = data[idx_start: idx_end] tr = obspy.Trace(data=_data) - tr.stats.starttime = data_starttime + tr.stats.starttime = obspy.UTCDateTime(data.attrs["starttime"]*1.E-9) tr.stats.sampling_rate = data.attrs["sampling_rate"] tr.stats.network = network tr.stats.station = station @@ -860,7 +866,7 @@ def _get_waveform(self, waveform_name, starttime=None, endtime=None): # Add the tag to the stats dictionary. 
details.tag = wf_name2tag(waveform_name) - return tr + return(tr) def _get_auxiliary_data(self, data_type, tag): group = self._auxiliary_data_group[data_type][tag] @@ -1407,11 +1413,11 @@ def _predicate_locchantag(_key): _wf_grp = self._waveform_group for _station_name in filter(_predicate_netsta, self._waveform_group.keys()): - for _key in filter(_predicate_locchantag, + for waveform_name in filter(_predicate_locchantag, _wf_grp[_station_name].keys()): - _net, _sta, _loc, _chan = _key.split("__")[0].split(".") + _net, _sta, _loc, _chan = waveform_name.split("__")[0].split(".") - _ds = self._waveform_group["%s/%s" % (_station_name, _key)] + _ds = self._waveform_group["%s/%s" % (_station_name, waveform_name)] _ts = obspy.UTCDateTime(_ds.attrs["starttime"]*1e-9) _samprate = _ds.attrs["sampling_rate"] @@ -1421,6 +1427,8 @@ def _predicate_locchantag(_key): _offset = int((starttime-_ts)*_samprate) _nsamp = int(round((endtime-starttime)*_samprate, 0)) + idx_start = _offset if _offset >= 0 else 0 + idx_end = _offset + _nsamp if ref not in self._reference_group: _ref_grp = self._reference_group.create_group(ref) @@ -1440,11 +1448,11 @@ def _predicate_locchantag(_key): _ref = _ref_grp.create_dataset(_handle, (2,), dtype=np.int64) - _ref.attrs["reference_path"] = _ds.name + _ref.attrs["waveform_name"] = waveform_name _ref.attrs["sampling_rate"] = _ds.attrs["sampling_rate"] _ts = _ds.attrs["starttime"] + int(_offset/_samprate*1.e9) _ref.attrs["starttime"] = _ts - _ref[:] = [_offset, _offset+_nsamp] + _ref[:] = [idx_start, idx_end] else: print("Will not overwrite existing reference") continue @@ -1561,19 +1569,12 @@ def _predicate_chan(_key): for _chan in filter(_predicate_chan, _loc_grp.keys()): _ref = _loc_grp[_chan] - _tr = obspy.Trace( - data=self.__file[ - _ref.attrs["reference_path"] - ][_ref[0]: _ref[1]] - ) - _tr.stats.network = _net if _net != "__" else "" - _tr.stats.station = _sta if _sta != "__" else "" - _tr.stats.location = _loc if _loc != "__" else "" - 
_tr.stats.channel = _chan if _chan != "__" else "" - _tr.stats.starttime = obspy.UTCDateTime( - _ref.attrs["starttime"]*1e-9 - ) - _tr.stats.delta = 1/_ref.attrs["sampling_rate"] + + waveform_name = _ref.attrs["waveform_name"] + idx_start, idx_end = _ref[:] + _tr = self.__extract_waveform(waveform_name, + idx_start, + idx_end) _st.append(_tr) return(_st)