Skip to content

Commit

Permalink
Updated fraction() to simplify it (only need to run on present sample…
Browse files Browse the repository at this point in the history
… fractions), implemented getting it from tada, default to running it on nitrogen that way.
  • Loading branch information
jbousquin committed Feb 5, 2024
1 parent 1882878 commit 48fa750
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 78 deletions.
15 changes: 10 additions & 5 deletions harmonize_wq/harmonize.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,12 +648,17 @@ def harmonize(df_in, char_val, units_out=None, errors='raise',
# Note: just phosphorus right now
# Total is TP (digested) from the whole water sample (vs total dissolved)
# Dissolved is TDP (total) filtered water digested (vs undigested DIP)
if out_col == 'Phosphorus':
frac_dict = {'TP_Phosphorus': ['Total'],
'TDP_Phosphorus': ['Dissolved'],
'Other_Phosphorus': [''],}
# Make columns for Sample Fractions, loudly if unexpected (not in dict)
if out_col in ['Phosphorus', 'Nitrogen']:
# NOTE: only top level fractions, while TADA has lower for:
#'Chlorophyll a', 'Turbidity', 'Fecal Coliform', 'Escherichia coli'
if out_col=='Phosphorus':
frac_dict = {'TP_Phosphorus': ['Total'],
'TDP_Phosphorus': ['Dissolved'],
'Other_Phosphorus': [''],}
else:
frac_dict = 'TADA'
frac_dict = wqp.fraction(frac_dict) # Run sample fraction on WQP


df_out = wqp.df

Expand Down
12 changes: 8 additions & 4 deletions harmonize_wq/tests/test_harmonize_WQP.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ def test_harmonize_nitrogen():
actual = harmonize.harmonize(NARROW_RESULTS3, 'Nitrogen')
# Test that the dataframe has expected type, size, cols, and rows
assert isinstance(actual, pandas.core.frame.DataFrame) # Test type
assert actual.size == 16482 # Test size
assert actual.size == 16728 # Test size
assert 'Nitrogen' in actual.columns # Check for column
assert len(actual['Nitrogen'].dropna()) == 182 # Number of results
# Confirm orginal data was not altered
Expand Down Expand Up @@ -708,7 +708,11 @@ def test_harmonize_nitrogen():
# TODO: add test case where 'g/kg'
# TODO: add test case where 'cm3/g @STP'
# TODO: add test case where 'cm3/g STP'


# check sample fraction, everything went to total mixed forms
assert len(actual['Nitrogen'].dropna()) == 182, "Fraction issue"
fract_col = 'TOTAL NITROGEN_ MIXED FORMS'
assert len(actual[fract_col].dropna()) == 182, "Fraction issue"

#@pytest.mark.skip(reason="no change")
def test_harmonize_conductivity():
Expand Down Expand Up @@ -1220,8 +1224,8 @@ def test_split_table(harmonized_tables):
'DetectionQuantitationLimitMeasure/MeasureValue',
'DetectionQuantitationLimitMeasure/MeasureUnitCode',
'ProviderName', 'QA_flag', 'Nitrogen', 'Speciation',
'Conductivity', 'Activity_datetime',
'Depth']
'TOTAL NITROGEN_ MIXED FORMS', 'Conductivity',
'Activity_datetime', 'Depth']
assert list(actual_main.columns) == expected
expected = ['ActivityStartDate', 'ActivityStartTime/Time',
'ActivityStartTime/TimeZoneCode',
Expand Down
142 changes: 73 additions & 69 deletions harmonize_wq/wq_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,7 @@ def replace_unit_by_dict(self, val_dict, mask=None):
for item in val_dict.items():
self._replace_in_col(col, item[0], item[1], mask)

def fraction(self, frac_dict=None, suffix=None,
def fraction(self, frac_dict=None, catch_all=None, suffix=None,
fract_col='ResultSampleFractionText'):
"""Create columns for sample fractions using frac_dict to set names.
Expand All @@ -801,6 +801,8 @@ def fraction(self, frac_dict=None, suffix=None,
frac_dict : dict, optional
Dictionary where {fraction_name : new_col}.
The default None starts with an empty dictionary.
catch_all : str, optional
Name for new field to map sample fractions not mapped by frac_dict
suffix : str, optional
String to add to the end of any new column name.
The default None, uses out_col attribute.
Expand All @@ -817,83 +819,85 @@ def fraction(self, frac_dict=None, suffix=None,
--------
Not fully implemented with TADA table yet.
"""
# Check for sample fraction column
harmonize.df_checks(self.df, [fract_col])

c_mask = self.c_mask

fracs = list(set(self.df[c_mask][fract_col])) # List of fracs in data

if ' ' in fracs:
#TODO: new col instead of overwrite
# Replace bad sample fraction w/ nan
self.df = self._replace_in_col(fract_col, ' ', nan, c_mask)
fracs.remove(' ')

df_out = self.df # Set var for easier referencing
char = list(set(df_out[self.c_mask]['CharacteristicName']))[0]


# Deal with lack of args
if suffix is None:
suffix = self.out_col
if catch_all is None:
catch_all = f'Other_{suffix}'

catch_all = f'Other_{suffix}'
# Set up dict for what sample fraction to what col
if frac_dict is None:
frac_dict = {catch_all: ''}
else:
if catch_all not in frac_dict.keys():
frac_dict[catch_all] = ['']
frac_dict = {}
elif frac_dict=='TADA':
# Get dictionary for updates from TADA (note keys are all caps)
tada = domains.harmonize_TADA_dict()[char.upper()]
frac_dict = {}
for key in tada:
# Add keys another level down
frac_dict[key] = list(tada[key])
# Add their values
frac_dict[key] += [x for v in tada[key].values() for x in v]
#else: dict was already provided
if catch_all not in frac_dict.keys():
frac_dict[catch_all] = ['', nan]
# Make sure catch_all exists
if not isinstance(frac_dict[catch_all], list):
frac_dict[catch_all] = [frac_dict[catch_all]]

# Get all domain values
#accepted_fracs = list(domains.get_domain_dict('ResultSampleFraction').keys())
for key in domains.get_domain_dict('ResultSampleFraction').keys():
# Check against expected fractions and add others to catch_all
if key not in [x for v in frac_dict.values() for x in v]:
frac_dict[catch_all] += [key]
# Flatten for some uses
samp_fract_set = sorted({x for v in frac_dict.values() for x in v})

# Check for sample fraction column
harmonize.df_checks(self.df, [fract_col])
# Replace bad sample fraction w/ nan
self.df = self._replace_in_col(fract_col, ' ', nan, c_mask)

df_out = self.df # Set var for easier referencing

# Clean up sample fraction column based on charName
# Get char
c_dict = domains.out_col_lookup()
char = list(c_dict.keys())[list(c_dict.values()).index(self.out_col)]
# Get dictionary for updates from TADA
harmonize_dict = domains.harmonize_TADA_dict()
# TADA keys are all caps
harmonize_fract = harmonize_dict[char.upper()]
# Loop through dictionary making updates to sample fraction
for fract_set in harmonize_fract.values():
for row in fract_set.items():
fract_mask = df_out[c_mask][fract_col].isin(row[1]) # Mask by values
df_out[c_mask][fract_mask][fract_col] = row[0] # Update to key
# Compare df_out againt self.df to add QA flag if changed
cond_change = ~(df_out[fract_col] == self.df[fract_col])
cond_na = df_out[fract_col].notna()
df_out[cond_change & cond_na]
# TODO: LEFT OFF ABOVE IS STILL EMPTY

self.df = df_out

# Make column for any unexpected Sample Fraction values, loudly
for s_f in set(df_out[c_mask][fract_col].dropna()):
if s_f not in samp_fract_set:
char = f"{s_f.replace(' ', '_')}_{suffix}"
frac_dict[char] = s_f
warn(f'Warning: "{char}" column for {s_f}, may be error')
# TODO: add QA_flag
# Test we didn't skip any SampleFraction
samp_fract_set = sorted({x for v in frac_dict.values() for x in v})
for s_f in set(df_out[c_mask][fract_col].dropna()):
assert s_f in samp_fract_set, f'{s_f} check in {fract_col}'
# Create out columns for each sample fraction
# First cut to make the keys work as column names
for key in frac_dict:
frac_dict[key.replace(',', '_')] = frac_dict.pop(key)
for key in frac_dict:
if key == self.out_col:
#TODO: prevent it from over-writing any col
# If it is the same col name as the out_col add '_1'
frac_dict[key+'_1'] = frac_dict.pop(key)

# Compare sample fractions against expected
init_fracs = [x for v in frac_dict.values() for x in v]
not_init = [frac for frac in fracs if frac not in init_fracs]
if len(not_init)>0:
# TODO: when to add QA_flag?
smp = f'{char} sample fractions not in frac_dict'
solution = f'expected domains, mapped to "{catch_all}"'
print(f'{len(not_init)} {smp}')
# Compare against domains
all_fracs = list(domains.get_domain_dict('ResultSampleFraction'))
add_fracs = [frac for frac in not_init if frac in all_fracs]
# Add new fractions to frac_dict mapped to catch_all
if len(add_fracs)>0:
print(f'{len(add_fracs)} {smp} found in {solution}')
frac_dict[catch_all] += add_fracs
bad_fracs = [frac for frac in not_init if frac not in all_fracs]
if len(bad_fracs)>0:
warn(f'{len(bad_fracs)} {smp} or {solution}')
frac_dict[catch_all] += bad_fracs

# Loop through dictionary making updates based on sample fraction
for frac in frac_dict.items():
col = frac[0] # New column name
for smp_frac in frac[1]:
if smp_frac in set(df_out.loc[c_mask, fract_col].dropna()):
# New subset mask for sample frac
f_mask = c_mask & (df_out[fract_col]==smp_frac)
# Copy measure to new col (new col name from char_list)
df_out.loc[f_mask, col] = df_out.loc[f_mask, suffix]
elif smp_frac == '':
# Values where sample fraction missing go to catch all
if df_out.loc[c_mask, fract_col].isnull().values.any():
# New subset mask
f_mask = c_mask & (df_out[fract_col].isnull())
# Copy measure to new col
df_out.loc[f_mask, col] = df_out.loc[f_mask, suffix]
frac_mask = df_out[fract_col].isin(frac[1]) & c_mask
# Make sure they exist in the data
if any(frac_mask):
# add col and copy results over
df_out.loc[frac_mask, frac[0]] = df_out.loc[frac_mask, self.out_col]

self.df = df_out

return frac_dict
Expand Down

0 comments on commit 48fa750

Please sign in to comment.