diff --git a/data_analysis.py b/data_analysis.py
index a2a72ee..115fe6d 100644
--- a/data_analysis.py
+++ b/data_analysis.py
@@ -2,7 +2,7 @@
 import json
 import re
 import time
-from datetime import date, datetime
+from datetime import datetime
 
 import pandas as pd
 
@@ -56,16 +56,8 @@ def analyze(data, source):
         "missing": int(dob_col.isna().sum()),
     }
 
-    expected_min_dob = "1997-01-01"
-    # roughly, 19 years old at the start of CODI observation period (2016)
-    # expected_max_dob is trickier because we don't know when the data was populated
-    # TODO: add another command line arg?
-
     if source == "csv":
-        if "-" in notnull_dobs[0]:
-            out_of_range = notnull_dobs[notnull_dobs < expected_min_dob]
-            stats["dob"]["count_earlier_dob_than_expected"] = len(out_of_range)
-        else:
+        if "-" not in notnull_dobs.iloc[0]:
             # the date format will either be YYYY-MM-DD or YYMMDD
             # we'll assume it's consistent across a single file
@@ -81,21 +73,6 @@ def analyze(data, source):
             )
             # str-ify the Timestamps again
             stats["dob"]["max_parsed"] = str(parsed_dobs.max())
-            expected_min_dob = pd.to_datetime(expected_min_dob, format="%Y-%m-%d")
-
-            out_of_range = parsed_dobs[parsed_dobs < expected_min_dob]
-            stats["dob"]["count_earlier_dob_than_expected"] = len(out_of_range)
-
-    else:
-        # different DBs may return different data types for the DOB column
-        if type(notnull_dobs[0]) is date:
-            expected_min_dob = date.fromisoformat(expected_min_dob)
-        # else
-        #   assume it's a string in ISO format, no change should be needed
-
-        out_of_range = notnull_dobs[notnull_dobs < expected_min_dob]
-        stats["dob"]["count_earlier_dob_than_expected"] = len(out_of_range)
-
     sex_col = case_insensitive_lookup(data, "sex", source)
     stats["sex"] = top_N(sex_col)
diff --git a/extract.py b/extract.py
index 5c2b480..0d1947e 100644
--- a/extract.py
+++ b/extract.py
@@ -97,6 +97,8 @@ def clean_zip(household_zip):
 
 
 def clean_dob_fromstr(dob_str, date_format):
+    if not dob_str:
+        return ""
     return strftime("%Y-%m-%d", strptime(dob_str, date_format))
 
 
@@ -230,11 +232,17 @@ def handle_row(row, report, version):
     output_row.append(clean_string(family_name))
 
     dob = case_insensitive_lookup(row, "DOB", version)
-    output_row.append(dob.strftime("%Y-%m-%d"))
+    if dob:
+        output_row.append(dob.strftime("%Y-%m-%d"))
+    else:
+        output_row.append("")
 
     sex = case_insensitive_lookup(row, "sex", version)
     validate(report, "sex", sex)
-    output_row.append(sex.strip())
+    if sex:
+        output_row.append(sex.strip())
+    else:
+        output_row.append("")
 
     phone_number = case_insensitive_lookup(row, "phone", version)
     validate(report, "phone_number", phone_number)
diff --git a/utils/data_reader.py b/utils/data_reader.py
index 029a490..c523983 100644
--- a/utils/data_reader.py
+++ b/utils/data_reader.py
@@ -52,7 +52,7 @@ def add_parser_db_args(parser):
         default=V2,
         choices=[V1, V2],
         help="Version of the CODI Data Model schema to use. "
-        f'Valid options are "{V1}" or "{V2}"',
+        f'Valid options are "{V1}" or "{V2}". Default is "{V2}"',
     )
 
     parser.add_argument(
@@ -79,8 +79,29 @@ def add_parser_db_args(parser):
         "--schema_name",
         dest="v2_schema",
         default="cdm",
-        help="Name of the database schema containing the CODI DEMOGRAPHIC"
-        " and PRIVATE_DEMOGRAPHIC tables",
+        help="Name of the database schema containing the PRIVATE_DEMOGRAPHIC"
+        " and PRIVATE_ADDRESS_HISTORY tables in a v2 database. "
+        "Default is 'cdm'",
+    )
+
+    parser.add_argument(
+        "--address_selection",
+        dest="v2_address_selection",
+        choices=["full", "preferred", "single"],
+        default="full",
+        help="Determines the approach for selecting a single address per PATID"
+        ' from PRIVATE_ADDRESS_HISTORY. Options: Use "single" if '
+        "the data is already guaranteed to only contain one address per PATID."
+        ' Use "preferred" if the database is guaranteed to only contain one '
+        "address with address_preferred='Y' per PATID. "
+        'Use "full" if the database may contain multiple preferred addresses'
+        " for different dates/types/uses. This option will select "
+        "the most recent preferred address by start date. "
+        "Default if not specified is 'full'",
+    )
+
+    parser.add_argument(
+        "--debug_query",
+        action="store_true",
+        help="Aids in debugging by printing out the actual DB query being run",
     )
 
@@ -171,6 +192,10 @@ def get_query(engine, version, args):
         )
 
         query = select([identifier])
+
+        if args.debug_query:
+            print(query)
+
         return query
     else:
         # note there is also the `demographic` table, but
@@ -193,36 +218,71 @@ def get_query(engine, version, args):
             schema=args.v2_schema,
         )
 
-        addr_period_order = prv_address.columns.address_period_start.desc()
-
-        # Different SQL engines have different semantics for sorting DESC:
-        # Postgres and Oracle put nulls first, so we want NULLS LAST
-        # MSSQL puts nulls last, but doesn't support NULLS LAST
-        # so we use this hack to get NULLS LAST for all main dialects.
-        # For safety, in case other engines also don't support NULLS LAST,
-        # only apply it to the ones that we know it works on
-        # (vs not applying it to the ones we know it doesn't)
-
-        # TODO: test on MySQL - deferring since none of our partners use it now
-
-        # known engine dialect names are "mssql", "postgresql", and "oracle"
-        if engine.dialect.name in ["postgresql", "oracle"]:
-            addr_period_order = addr_period_order.nulls_last()
-
-        subquery = (
-            select(prv_address.columns.addressid)
-            .filter(prv_address.columns.patid == prv_demo.columns.patid)
-            .order_by(prv_address.columns.address_preferred.desc())
-            .order_by(addr_period_order)
-            .limit(1)
-            .correlate(prv_demo)
-            .scalar_subquery()
-        )
+        if args.v2_address_selection == "single":
+            # The user said their data is guaranteed to only have a single
+            # address per PATID. This simplifies the query to just
+            # join the tables together with no additional filters
+            query = select([prv_demo, prv_address]).filter(
+                prv_demo.columns.patid == prv_address.columns.patid
+            )
+        elif args.v2_address_selection == "preferred":
+            # The user said their data may have multiple addresses,
+            # but is guaranteed that only one per PATID will be preferred.
+            # This simplifies the query to just select ADDRESS_PREFERRED=Y
+            query = select([prv_demo, prv_address]).filter(
+                prv_demo.columns.patid == prv_address.columns.patid,
+                prv_address.columns.address_preferred == "Y",
+            )
+        else:
+            # The user indicated the data may have multiple preferreds
+            # (or at least did not select one of the above options),
+            # so we select the most recent by date.
+            # The PCOR schema includes "type" (physical/postal/both/unknown)
+            # and "use" (home/work/temp/old/unknown) fields, and the hierarchy
+            # of the possible combinations of those options is not well-defined.
+            # (e.g., should we pick physical/work over a both-type/unknown-use?)
+            # For simplicity and performance we will just pick
+            # the first preferred address we find, sorting by date.
+            # Going forward, a better solution is likely to include all of
+            # an individual's addresses in PPRL, rather than more complex ways
+            # of picking a single one.
+
+            addr_period_order = prv_address.columns.address_period_start.desc()
+
+            # Different SQL engines have different semantics for sorting DESC:
+            # Postgres and Oracle put nulls first, so we want NULLS LAST
+            # MSSQL puts nulls last, but doesn't support NULLS LAST
+            # so we use this hack to get NULLS LAST for all main dialects.
+            # For safety, in case other engines also don't support NULLS LAST,
+            # only apply it to the ones that we know it works on
+            # (vs not applying it to the ones we know it doesn't)
+
+            # TODO: test on MySQL - deferring since none of our partners use it now
+
+            # known engine dialect names are "mssql", "postgresql", and "oracle"
+            if engine.dialect.name in ["postgresql", "oracle"]:
+                addr_period_order = addr_period_order.nulls_last()
+
+            subquery = (
+                select(prv_address.columns.addressid)
+                .filter(
+                    prv_address.columns.patid == prv_demo.columns.patid,
+                    prv_address.columns.address_preferred == "Y",
+                )
+                .order_by(prv_address.columns.address_preferred.desc())
+                .order_by(addr_period_order)
+                .limit(1)
+                .correlate(prv_demo)
+                .scalar_subquery()
+            )
 
-        query = select([prv_demo, prv_address]).filter(
-            prv_demo.columns.patid == prv_address.columns.patid,
-            prv_address.columns.addressid == subquery,
-        )
+            query = select([prv_demo, prv_address]).filter(
+                prv_demo.columns.patid == prv_address.columns.patid,
+                prv_address.columns.addressid == subquery,
+            )
+
+    if args.debug_query:
+        print(query)
 
     return query
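
A note on the switch from notnull_dobs[0] to notnull_dobs.iloc[0] in data_analysis.py: Series[0] is a label-based lookup, so it raises a KeyError whenever the row carrying label 0 has been dropped, while iloc[0] is positional and always returns the first surviving value. A minimal sketch, assuming notnull_dobs is produced by something like dob_col.dropna() (which the name suggests); the three-row Series is illustrative, not data from the repo:

import pandas as pd

dob_col = pd.Series([None, "1997-01-01", "1998-06-30"])
notnull_dobs = dob_col.dropna()  # index is now [1, 2]; label 0 is gone

print(notnull_dobs.iloc[0])  # "1997-01-01" - positional, always present
# print(notnull_dobs[0])     # KeyError: 0 - label-based lookup fails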
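
The clean_dob_fromstr() guard in extract.py can be exercised on its own: previously an empty DOB field reached strptime, which raises ValueError on an empty string. A quick sketch; the function body is copied from the patch, while the sample values and the %y%m%d format are illustrative:

from time import strftime, strptime

def clean_dob_fromstr(dob_str, date_format):
    # empty input short-circuits to an empty output field
    if not dob_str:
        return ""
    return strftime("%Y-%m-%d", strptime(dob_str, date_format))

assert clean_dob_fromstr("970115", "%y%m%d") == "1997-01-15"
assert clean_dob_fromstr("", "%y%m%d") == ""  # previously: ValueError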
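
For the "full" address-selection path in utils/data_reader.py, the correlated scalar subquery and the dialect-dependent NULLS LAST hack can be tried outside the repo. A self-contained sketch, assuming SQLAlchemy 1.4+ calling style (select() takes tables directly rather than the legacy select([...]) list used in the patch), with table definitions trimmed to the columns the query touches:

from sqlalchemy import (
    Column, Date, MetaData, String, Table, create_engine, select
)

engine = create_engine("sqlite://")  # illustrative; swap in the real DB URL
metadata = MetaData()

prv_demo = Table(
    "private_demographic",
    metadata,
    Column("patid", String, primary_key=True),
)
prv_address = Table(
    "private_address_history",
    metadata,
    Column("addressid", String, primary_key=True),
    Column("patid", String),
    Column("address_preferred", String),
    Column("address_period_start", Date),
)
metadata.create_all(engine)

addr_period_order = prv_address.columns.address_period_start.desc()
# same dialect guard as get_query(): only add NULLS LAST where we know
# the engine supports it
if engine.dialect.name in ["postgresql", "oracle"]:
    addr_period_order = addr_period_order.nulls_last()

# one addressid per patient: preferred rows only, newest start date first
subquery = (
    select(prv_address.columns.addressid)
    .filter(
        prv_address.columns.patid == prv_demo.columns.patid,
        prv_address.columns.address_preferred == "Y",
    )
    .order_by(addr_period_order)
    .limit(1)
    .correlate(prv_demo)
    .scalar_subquery()
)

query = select(prv_demo, prv_address).filter(
    prv_demo.columns.patid == prv_address.columns.patid,
    prv_address.columns.addressid == subquery,
)
print(query)  # roughly what the new --debug_query flag prints

The order_by on address_preferred from the patch is omitted here because the filter already restricts the subquery to preferred rows, so that sort key is a constant within it.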