From a8bc113a98115e5077d312ce6df2e047c1c0b6dc Mon Sep 17 00:00:00 2001 From: praneethratna Date: Tue, 3 Oct 2023 02:35:53 +0530 Subject: [PATCH] refactored ek80 xml parser --- echopype/convert/parse_base.py | 2 +- echopype/convert/set_groups_ek80.py | 21 +- echopype/convert/utils/ek_raw_parsers.py | 383 +++++++------------- echopype/tests/convert/test_convert_ek80.py | 6 +- 4 files changed, 140 insertions(+), 272 deletions(-) diff --git a/echopype/convert/parse_base.py b/echopype/convert/parse_base.py index 3f4d960e4e..61a28b4318 100644 --- a/echopype/convert/parse_base.py +++ b/echopype/convert/parse_base.py @@ -233,7 +233,7 @@ def _read_datagrams(self, fid): 'water_level_draft': 0.0, 'water_level_draft_is_manual': 0, 'transducer_name': 'Unknown', - 'transducer_sound_speed': 1490.0}, + 'sound_speed': 1490.0}, 'xml': '\r\n\r\n'} """ # noqa num_datagrams_parsed = 0 diff --git a/echopype/convert/set_groups_ek80.py b/echopype/convert/set_groups_ek80.py index 77b06cdf32..22771b921a 100644 --- a/echopype/convert/set_groups_ek80.py +++ b/echopype/convert/set_groups_ek80.py @@ -150,7 +150,7 @@ def set_env(self) -> xr.Dataset: }, ) - varnames = ["sound_velocity_source", "transducer_name", "transducer_sound_speed"] + varnames = ["sound_velocity_source", "transducer_name", "sound_speed"] for vn in varnames: if vn in self.parser_obj.environment: dict_env[vn] = ( @@ -193,12 +193,12 @@ def set_env(self) -> xr.Dataset: def set_sonar(self, beam_group_type: list = ["power", None]) -> xr.Dataset: # Collect unique variables params = [ - "transducer_frequency", + "frequency", "serial_number", "transducer_name", "transducer_serial_number", "application_name", - "application_version", + "version", "channel_id_short", ] var = defaultdict(list) @@ -231,7 +231,7 @@ def set_sonar(self, beam_group_type: list = ["power", None]) -> xr.Dataset: sonar_vars = { "frequency_nominal": ( ["channel"], - var["transducer_frequency"], + var["frequency"], { "units": "Hz", "long_name": "Transducer frequency", @@ -282,7 +282,7 @@ def set_sonar(self, beam_group_type: list = ["power", None]) -> xr.Dataset: # will not try to populate sonar_serial_number from the raw datagrams "sonar_serial_number": "", "sonar_software_name": var["application_name"][0], - "sonar_software_version": var["application_version"][0], + "sonar_software_version": var["version"][0], "sonar_type": "echosounder", } ds = ds.assign_attrs(sonar_attr_dict) @@ -294,7 +294,7 @@ def set_platform(self) -> xr.Dataset: freq = np.array( [ - self.parser_obj.config_datagram["configuration"][ch]["transducer_frequency"] + self.parser_obj.config_datagram["configuration"][ch]["frequency"] for ch in self.sorted_channel["power_complex"] ] ) @@ -455,7 +455,7 @@ def _assemble_ds_ping_invariant(self, params, data_type): freq = np.array( [ - self.parser_obj.config_datagram["configuration"][ch]["transducer_frequency"] + self.parser_obj.config_datagram["configuration"][ch]["frequency"] for ch in self.sorted_channel[data_type] ] ) @@ -678,7 +678,7 @@ def _add_freq_start_end_ds(self, ds_tmp: xr.Dataset, ch: str) -> xr.Dataset: freq_start = np.array(self.parser_obj.ping_data_dict["frequency_start"][ch]) freq_stop = np.array(self.parser_obj.ping_data_dict["frequency_end"][ch]) elif not self.sorted_channel["power"]: - freq = self.parser_obj.config_datagram["configuration"][ch]["transducer_frequency"] + freq = self.parser_obj.config_datagram["configuration"][ch]["frequency"] freq_start = np.full(len(self.parser_obj.ping_time[ch]), freq) freq_stop = freq_start else: @@ -1239,7 +1239,7 @@ def set_vendor(self) -> xr.Dataset: # - receiver sampling frequency # - transceiver type table_params = [ - "transducer_frequency", + "frequency", "impedance", # transceiver impedance (z_er), different from transducer impedance (z_et) "rx_sample_frequency", # receiver sampling frequency "transceiver_type", @@ -1272,7 +1272,7 @@ def set_vendor(self) -> xr.Dataset: { "frequency_nominal": ( ["channel"], - param_dict["transducer_frequency"], + param_dict["frequency"], { "units": "Hz", "long_name": "Transducer frequency", @@ -1361,6 +1361,7 @@ def set_vendor(self) -> xr.Dataset: for p in cal_params: if p in config[ch_id]["calibration"]: # only for parameters that exist in dict param_dict[p] = (["cal_frequency"], config[ch_id]["calibration"][p]) + print(config[ch_id]["calibration"]["frequency"]) ds_ch = xr.Dataset( data_vars=param_dict, coords={ diff --git a/echopype/convert/utils/ek_raw_parsers.py b/echopype/convert/utils/ek_raw_parsers.py index 8c349a0f2a..0a1d2247b9 100644 --- a/echopype/convert/utils/ek_raw_parsers.py +++ b/echopype/convert/utils/ek_raw_parsers.py @@ -619,80 +619,6 @@ class SimradXMLParser(_SimradDatagramParser): # of float values parsed from a string that uses ';' to separate values. Empty strings # for fieldname and/or parse char result in the default action for those parsing steps. - channel_parsing_options = { - "MaxTxPowerTransceiver": [int, "", ""], - "PulseDuration": [float, "", ";"], - "PulseDurationFM": [float, "pulse_duration_fm", ";"], - "SampleInterval": [float, "", ";"], - "ChannelID": [str, "channel_id", ""], - "HWChannelConfiguration": [str, "hw_channel_configuration", ""], - } - - transceiver_parsing_options = { - "TransceiverNumber": [int, "", ""], - "Version": [str, "transceiver_version", ""], - "IPAddress": [str, "ip_address", ""], - "Impedance": [int, "", ""], - } - - transducer_parsing_options = { - "SerialNumber": [str, "transducer_serial_number", ""], - "Frequency": [float, "transducer_frequency", ""], - "FrequencyMinimum": [float, "transducer_frequency_minimum", ""], - "FrequencyMaximum": [float, "transducer_frequency_maximum", ""], - "BeamType": [int, "transducer_beam_type", ""], - "Gain": [float, "", ";"], - "SaCorrection": [float, "", ";"], - "MaxTxPowerTransducer": [float, "", ""], - "EquivalentBeamAngle": [float, "", ""], - "BeamWidthAlongship": [float, "", ""], - "BeamWidthAthwartship": [float, "", ""], - "AngleSensitivityAlongship": [float, "", ""], - "AngleSensitivityAthwartship": [float, "", ""], - "AngleOffsetAlongship": [float, "", ""], - "AngleOffsetAthwartship": [float, "", ""], - "DirectivityDropAt2XBeamWidth": [ - float, - "directivity_drop_at_2x_beam_width", - "", - ], - "TransducerOffsetX": [float, "", ""], - "TransducerOffsetY": [float, "", ""], - "TransducerOffsetZ": [float, "", ""], - "TransducerAlphaX": [float, "", ""], - "TransducerAlphaY": [float, "", ""], - "TransducerAlphaZ": [float, "", ""], - } - - header_parsing_options = {"Version": [str, "application_version", ""]} - - envxdcr_parsing_options = {"SoundSpeed": [float, "transducer_sound_speed", ""]} - - environment_parsing_options = { - "Depth": [float, "", ""], - "Acidity": [float, "", ""], - "Salinity": [float, "", ""], - "SoundSpeed": [float, "", ""], - "Temperature": [float, "", ""], - "Latitude": [float, "", ""], - "SoundVelocityProfile": [float, "", ";"], - "DropKeelOffset": [float, "", ""], - "DropKeelOffsetIsManual": [int, "", ""], - "WaterLevelDraft": [float, "", ""], - "WaterLevelDraftIsManual": [int, "", ""], - } - - parameter_parsing_options = { - "ChannelID": [str, "channel_id", ""], - "ChannelMode": [int, "", ""], - "PulseForm": [int, "", ""], - "Frequency": [float, "", ""], - "PulseDuration": [float, "", ""], - "SampleInterval": [float, "", ""], - "TransmitPower": [float, "", ""], - "Slope": [float, "", ""], - } - def __init__(self): headers = {0: [("type", "4s"), ("low_date", "L"), ("high_date", "L")]} _SimradDatagramParser.__init__(self, "XML", headers) @@ -707,52 +633,6 @@ def _unpack_contents(self, raw_string, bytes_read, version): :returns: None """ - def dict_to_dict(xml_dict, data_dict, parse_opts): - """ - dict_to_dict appends the ETree xml value dicts to a provided dictionary - and along the way converts the key name to conform to the project's - naming convention and optionally parses and or converts values as - specified in the parse_opts dictionary. - """ - - for k in xml_dict: - # check if we're parsing this key/value - if k in parse_opts: - # try to parse the string - if parse_opts[k][2]: - try: - data = xml_dict[k].split(parse_opts[k][2]) - except: - # bad or empty parse character(s) provided - data = xml_dict[k] - else: - # no parse char provided - nothing to parse - data = xml_dict[k] - - # try to convert to specified type - if isinstance(data, list): - for i in range(len(data)): - try: - data[i] = parse_opts[k][0](data[i]) - except: - pass - else: - data = parse_opts[k][0](data) - - # and add the value to the provided dict - if parse_opts[k][1]: - # add using the specified key name - data_dict[parse_opts[k][1]] = data - else: - # add using the default key name wrangling - data_dict[camelcase2snakecase(k)] = data - else: - # nothing to do with the value string - data = xml_dict[k] - - # add the parameter to the provided dictionary - data_dict[camelcase2snakecase(k)] = data - header_values = struct.unpack( self.header_fmt(version), raw_string[: self.header_size(version)] ) @@ -792,161 +672,148 @@ def dict_to_dict(xml_dict, data_dict, parse_opts): # parse it if data["subtype"] == "configuration": # parse the Transceiver section - for tcvr in root.iter("Transceiver"): - # parse the Transceiver section - tcvr_xml = tcvr.attrib - - # parse the Channel section -- this works with multiple channels - # under 1 transceiver - for tcvr_ch in tcvr.iter("Channel"): - tcvr_ch_xml = tcvr_ch.attrib - channel_id = tcvr_ch_xml["ChannelID"] - - # create the configuration dict for this channel - data["configuration"][channel_id] = {} - - # add the transceiver data to the config dict (this is - # replicated for all channels) - dict_to_dict( - tcvr_xml, - data["configuration"][channel_id], - self.transceiver_parsing_options, - ) - - # add the general channel data to the config dict - dict_to_dict( - tcvr_ch_xml, - data["configuration"][channel_id], - self.channel_parsing_options, - ) - - # check if there are >1 transducer under a single transceiver channel - if len(list(tcvr_ch)) > 1: - ValueError("Found >1 transducer under a single transceiver channel!") - else: # should only have 1 transducer - tcvr_ch_xducer = tcvr_ch.find( - "Transducer" - ) # get Element of this xducer - f_par = tcvr_ch_xducer.findall("FrequencyPar") - # Save calibration parameters - if f_par: - cal_par = { - "frequency": np.array( - [int(f.attrib["Frequency"]) for f in f_par] - ), - "gain": np.array([float(f.attrib["Gain"]) for f in f_par]), - "impedance": np.array( - [float(f.attrib["Impedance"]) for f in f_par] - ), - "phase": np.array([float(f.attrib["Phase"]) for f in f_par]), - "beamwidth_alongship": np.array( - [float(f.attrib["BeamWidthAlongship"]) for f in f_par] - ), - "beamwidth_athwartship": np.array( - [float(f.attrib["BeamWidthAthwartship"]) for f in f_par] - ), - "angle_offset_alongship": np.array( - [float(f.attrib["AngleOffsetAlongship"]) for f in f_par] - ), - "angle_offset_athwartship": np.array( - [float(f.attrib["AngleOffsetAthwartship"]) for f in f_par] - ), - } - data["configuration"][channel_id]["calibration"] = cal_par - # add the transducer data to the config dict - dict_to_dict( - tcvr_ch_xducer.attrib, - data["configuration"][channel_id], - self.transducer_parsing_options, - ) - - # get unique transceiver channel number stored in channel_id - tcvr_ch_num = TCVR_CH_NUM_MATCHER.search(channel_id)[0] - - # parse the Transducers section from the root - # TODO Remove Transducers if doesn't exist - xducer = root.find("Transducers") - if xducer is not None: - # built occurrence lookup table for transducer name - xducer_name_list = [] - for xducer_ch in xducer.iter("Transducer"): - xducer_name_list.append(xducer_ch.attrib["TransducerName"]) - - # find matching transducer for this channel_id - match_found = False - for xducer_ch in xducer.iter("Transducer"): - if not match_found: - xducer_ch_xml = xducer_ch.attrib - match_name = ( - xducer_ch.attrib["TransducerName"] - == tcvr_ch_xducer.attrib["TransducerName"] - ) - if xducer_ch.attrib["TransducerSerialNumber"] == "": - match_sn = False - else: - match_sn = ( - xducer_ch.attrib["TransducerSerialNumber"] - == tcvr_ch_xducer.attrib["SerialNumber"] - ) - match_tcvr = ( - tcvr_ch_num in xducer_ch.attrib["TransducerCustomName"] - ) - - # if find match add the transducer mounting details - if ( - Counter(xducer_name_list)[ + for child in root.iter(): + if child.tag == "Header": + header_dict = child + if child.tag == "Transceiver": + for channel in child.iter("Channel"): + channel_id = channel.attrib["ChannelID"] + data["configuration"][channel_id] = {} + self.append_data(header_dict, data["configuration"][channel_id]) + self.append_data(child, data["configuration"][channel_id]) + self.append_data(channel, data["configuration"][channel_id]) + for key in data["configuration"][channel_id].keys(): + if ( + key == "pulse_duration" + or key == "pulse_duration_f_m" + or key == "sample_interval" + ): + str_data = data["configuration"][channel_id][key].split(";") + for i in range(len(str_data)): + try: + str_data[i] = float(str_data[i]) + except: + pass + data["configuration"][channel_id][key] = str_data + + if len(list(channel)) > 1: + ValueError( + "Found >1 transducer under a single transceiver channel!" + ) + else: + transducer = channel.find("Transducer") + self.append_data(transducer, data["configuration"][channel_id]) + for key in data["configuration"][channel_id].keys(): + if key == "gain" or key == "sa_correction": + str_data = data["configuration"][channel_id][key].split(";") + for i in range(len(str_data)): + try: + str_data[i] = float(str_data[i]) + except: + pass + data["configuration"][channel_id][key] = str_data + cal_pars = {} + freqpar = transducer.findall("FrequencyPar") + if freqpar: + for freq in freqpar: + self.append_data(freq, cal_pars) + data["configuration"][channel_id]["calibration"] = cal_pars + # get unique transceiver channel number stored in channel_id + tcvr_ch_num = TCVR_CH_NUM_MATCHER.search(channel_id)[0] + + # parse the Transducers section from the root + # TODO Remove Transducers if doesn't exist + xducer = root.find("Transducers") + if xducer is not None: + # built occurrence lookup table for transducer name + xducer_name_list = [] + for xducer_ch in xducer.iter("Transducer"): + xducer_name_list.append(xducer_ch.attrib["TransducerName"]) + + # find matching transducer for this channel_id + match_found = False + for xducer_ch in xducer.iter("Transducer"): + if not match_found: + match_name = ( xducer_ch.attrib["TransducerName"] - ] - > 1 - ): - # if more than one transducer has the same name - # only check sn and transceiver unique number - match_found = match_sn or match_tcvr - else: - match_found = match_name or match_sn or match_tcvr - - # add transducer mounting details - if match_found: - dict_to_dict( - xducer_ch_xml, - data["configuration"][channel_id], - self.transducer_parsing_options, + == transducer.attrib["TransducerName"] + ) + if xducer_ch.attrib["TransducerSerialNumber"] == "": + match_sn = False + else: + match_sn = ( + xducer_ch.attrib["TransducerSerialNumber"] + == transducer.attrib["SerialNumber"] + ) + match_tcvr = ( + tcvr_ch_num in xducer_ch.attrib["TransducerCustomName"] ) - # add the header data to the config dict - h = root.find("Header") - dict_to_dict( - h.attrib, - data["configuration"][channel_id], - self.header_parsing_options, - ) + # if find match add the transducer mounting details + if ( + Counter(xducer_name_list)[ + xducer_ch.attrib["TransducerName"] + ] + > 1 + ): + # if more than one transducer has the same name + # only check sn and transceiver unique number + match_found = match_sn or match_tcvr + else: + match_found = match_name or match_sn or match_tcvr + + # add transducer mounting details + if match_found: + self.append_data( + xducer_ch, data["configuration"][channel_id] + ) elif data["subtype"] == "parameter": # parse the parameter XML datagram for h in root.iter("Channel"): - parm_xml = h.attrib - # add the data to the environment dict - dict_to_dict(parm_xml, data["parameter"], self.parameter_parsing_options) + self.append_data(h, data["parameter"]) elif data["subtype"] == "environment": # parse the environment XML datagram for h in root.iter("Environment"): - env_xml = h.attrib - # add the data to the environment dict - dict_to_dict(env_xml, data["environment"], self.environment_parsing_options) + self.append_data(h, data["environment"]) + for key in data["environment"].keys(): + if key == "sound_velocity_profile": + str_data = data["environment"][key].split(";") + for i in range(len(str_data)): + try: + str_data[i] = float(str_data[i]) + except: + pass + data["environment"][key] = str_data for h in root.iter("Transducer"): - transducer_xml = h.attrib - # add the data to the environment dict - dict_to_dict( - transducer_xml, - data["environment"], - self.envxdcr_parsing_options, - ) + self.append_data(h, data["environment"]) data["xml"] = xml_string return data + def append_data(self, child, data): + if len(child.attrib) > 0: + for key, text in child.items(): + if all(char == "\n" for char in text): + continue + else: + try: + val = int(text) + except ValueError: + try: + val = float(text) + except: + val = str(text) + if key == "ChannelID": + data["channel_id"] = val + if child.tag == "FrequencyPar": + data[camelcase2snakecase(key)] = [] + data[camelcase2snakecase(key)].append(val) + else: + data[camelcase2snakecase(key)] = val + def _pack_contents(self, data, version): def to_CamelCase(xml_param): """ diff --git a/echopype/tests/convert/test_convert_ek80.py b/echopype/tests/convert/test_convert_ek80.py index 14b57b5fbe..6b5a8eb51c 100644 --- a/echopype/tests/convert/test_convert_ek80.py +++ b/echopype/tests/convert/test_convert_ek80.py @@ -46,9 +46,9 @@ def check_env_xml(echodata): assert env_var in echodata["Environment"] assert echodata["Environment"][env_var].dims == ("time1",) assert all([env_var_value in expected_env_var_values for env_var_value in echodata["Environment"][env_var]]) - assert "transducer_sound_speed" in echodata["Environment"] - assert echodata["Environment"]["transducer_sound_speed"].dims == ("time1",) - assert (1480 <= echodata["Environment"]["transducer_sound_speed"]).all() and (echodata["Environment"]["transducer_sound_speed"] <= 1500).all() + assert "sound_speed" in echodata["Environment"] + assert echodata["Environment"]["sound_speed"].dims == ("time1",) + assert (1480 <= echodata["Environment"]["sound_speed"]).all() and (echodata["Environment"]["sound_speed"] <= 1500).all() assert "sound_velocity_profile" in echodata["Environment"] assert echodata["Environment"]["sound_velocity_profile"].dims == ("time1", "sound_velocity_profile_depth") assert (1470 <= echodata["Environment"]["sound_velocity_profile"]).all() and (echodata["Environment"]["sound_velocity_profile"] <= 1500).all()