Commit
Code for large-scale study of temporal shift in health insurance claims
cxji committed May 6, 2023
0 parents commit 70d1f9f
Showing 103 changed files with 16,154 additions and 0 deletions.
8 changes: 8 additions & 0 deletions README.md
@@ -0,0 +1,8 @@
# Large-Scale Study of Temporal Shift in Health Insurance Claims

To reproduce the experiments in this paper:
1. Modify `config.py` to set the database name, schema, and output directories.
2. Run the pipeline in the `data_extraction` directory to extract data for the large-scale study.
3. Run the code in the `temporal_shift_scan` directory, which implements our algorithms to test and scan for temporal shift.

The `utils` directory contains supporting functions. To reproduce our conda environment, run `conda create --prefix NEWENV --file conda_env_pkgs.txt`.
237 changes: 237 additions & 0 deletions conda_env_pkgs.txt

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions config.py
@@ -0,0 +1,13 @@
# database parameters
db_name = 'placeholder'
nonstationarity_schema_name = 'placeholder'

# directory parameters
cache_dir = 'placeholder'
omop_data_dir = 'placeholder'
outcome_data_dir = 'placeholder'
experiment_dir = 'placeholder'
logging_dir = 'placeholder'
interaction_dir = 'placeholder'

domain_shift_dir = 'placeholder'
60 changes: 60 additions & 0 deletions data_extraction/README.md
@@ -0,0 +1,60 @@
## Data extraction for large-scale study

We build a pipeline to extract data for a large-scale study following the guidelines in Appendix B.

Prerequisites:
- PostgreSQL database containing a `cdm` schema in OMOP CDM v6 format
- pip-installed version of the `omop-learn` package in the same parent directory as this repo

To clean the lab measurements:
- To identify measurements with a value of 0 that should be null and replace them with null, first build a preliminary version of the reference ranges: run `python3 replace_zeros_with_null_in_measurement.py --create_table=range_{direction} --version=[int]` with `{direction}` set to `low` and then `high`. This produces `cdm_measurement_aux.measurement_{direction}_references`, a table with the most likely reference range for each measurement concept. (The replacement rule is sketched after this list.)
- Run `python3 replace_zeros_with_null_in_measurement.py --create_table=nulls_replacing_zero --version=[int]` to produce `cdm_measurement_aux.measurement_with_nulls_replacing_zero`, a replicate of the measurement table with nulls replacing zeros that likely should be null.
- Index the newly created tables by running `\i measurement_with_nulls_replacing_zero_indexes.sql` in postgres.
- To load references from clinical sources, run `python3 load_lab_references.py`
- To create age- and gender-specific reference ranges and standardize references among the top 100 most frequent lab measurements, run `python3 create_lab_reference_tables.py --direction=low` or `--direction=high`. If the age- and gender-specific reference range tables have already been created, add `--standardize_only` to only standardize the measurements.
- To drop non-standard measurements, run `python3 create_standardized_measurement_table.py`
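
The zero-replacement rule above can be illustrated in isolation. Below is a minimal pandas sketch, assuming OMOP's `measurement_concept_id` and `value_as_number` columns and a per-concept table of low references; the criterion shown (a recorded 0 is implausible when the concept's low reference is above 0) is one plausible reading of the step, not the pipeline's exact SQL.

```python
import numpy as np
import pandas as pd

def replace_implausible_zeros(measurements: pd.DataFrame,
                              low_references: pd.DataFrame) -> pd.DataFrame:
    '''
    Replace recorded values of exactly 0 with null for lab concepts whose
    most likely low reference is above 0, i.e. where a true 0 is implausible
    and the 0 most likely denotes a missing value.
    '''
    # attach the most likely low reference for each measurement concept
    merged = measurements.merge(
        low_references[['measurement_concept_id', 'range_low']],
        on='measurement_concept_id', how='left')
    # a zero is implausible when the concept's low reference is above 0
    implausible = (merged['value_as_number'] == 0) & (merged['range_low'] > 0)
    merged.loc[implausible, 'value_as_number'] = np.nan
    return merged
```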

Initial steps to set up automated data extraction:
- To create the prediction date tables, run `python3 create_prediction_date_tables.py`
- To create the race and ethnicity concept tables, run `python3 create_race_and_ethnicity_tables.py`
- To create the general cohort tables, run `python3 extract_data.py --produce_cohort_only=1_year` and again with `--produce_cohort_only=3_years`
- To identify the 100 most frequent outcomes in the cohort for the 3 categories, run `python3 extract_top_condition_outcomes.py`, `python3 extract_top_procedure_outcomes.py`, and `python3 extract_top_lab_outcomes.py`. Copy the condition and lab output files to new files ending in `_abbreviated.txt` instead of `.txt`, and in these new files abbreviate the concept names for use in plot titles.
- Condition and lab outcomes can be defined by individual concept IDs. To generate automated extraction scripts for condition or lab outcomes, run `python3 write_automated_data_extraction_scripts.py --outcome_name=condition` or `--outcome_name=lab`
- To extract procedure groups, run `python3 create_procedure_cohort_count_table.py` and then `python3 extract_procedure_groupings.py` with the following arguments (the selection logic is sketched after this list):
- `--include=`: a comma-separated list of phrases where a concept with any of these phrases is included
- `--exclude=`: a comma-separated list of phrases where a concept with any of these phrases is excluded
- `--group_name=`: name of group for files
- `--group_name_readable=`: name of group for plot titles
- To replicate our procedure outcomes, run `./run_procedure_group_extraction.sh` and `python3 write_automated_procedure_data_extraction_scripts.py`
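
The `--include`/`--exclude` matching above amounts to a phrase filter over concept names. Below is a short sketch of that selection logic, assuming case-insensitive substring matching (the exact matching rules live in `extract_procedure_groupings.py`):

```python
def select_procedure_group(concept_names, include_phrases, exclude_phrases):
    '''
    Keep concepts whose name contains at least one include phrase and
    no exclude phrase (case-insensitive substring matching assumed).
    '''
    selected = []
    for name in concept_names:
        lowered = name.lower()
        included = any(p.lower() in lowered for p in include_phrases)
        excluded = any(p.lower() in lowered for p in exclude_phrases)
        if included and not excluded:
            selected.append(name)
    return selected

# illustrative usage with made-up concept names
concepts = ['Screening colonoscopy', 'Diagnostic colonoscopy',
            'Colonoscopy with biopsy']
print(select_procedure_group(concepts,
                             include_phrases=['colonoscopy'],
                             exclude_phrases=['biopsy']))
# ['Screening colonoscopy', 'Diagnostic colonoscopy']
```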

To run automated data extraction:
- To extract data for a single outcome, run `python3 extract_data.py` with the following arguments:
- `--outcome=`: `eol`, `condition`, `procedure`, `lab`, `lab_group`
- `--outcome_id=`: concept ID for condition, procedure, or measurement, comma-separated list allowed for procedures or lab group
- `--direction=`: `low` or `high` for labs or lab groups, whether we are predicting outcomes above or below range
    - `--omit_features`: use if only extracting outcomes
    - `--finalize`: use if creating the final dataset when features have already been extracted (use in conjunction with `--omit_features`)
    - `--outcome_name=`: name for procedure group; no spaces allowed since it is used in file names
- `--outcome_name_readable=`: name of condition, procedure, or lab outcome for plot title
- `--debug_size`: use to specify limited cohort size for debugging
- `--cohort_plot_only`: use flag to plot the cohort size and outcome frequency after data has been extracted
- `--feature_windows`: specify comma-separated list of window lengths in days for features, default: 30
- `--fold`: specify which data fold to extract, default: 0
- Note: `extract_omop_data.py` contains the methods called in `extract_data.py`
- After a single condition outcome and a single procedure/lab outcome have been run to generate the shared files, run each `./data_extraction_scripts/run_nonstationarity_check_for_{outcome_name}_outcomes_batch{idx}.sh` script to extract data for each outcome.
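- For example, the lab outcome referenced in `compute_cohort_statistics.py` corresponds to an invocation like `python3 extract_data.py --outcome=lab --outcome_id=3009744 --direction=low` (illustrative; add the naming and window arguments above as needed).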

To compute statistics referenced in the paper:
- To compute cohort statistics (number of features, patients, and samples per patient), run `python3 compute_cohort_statistics.py`
- To plot frequencies of different eGFR labs over time (reproduce Figure 6 in our paper), run `python3 plot_egfr_label_shift.py`
- To examine some non-stationary lab outcomes, run `python3 examine_nonstationary_lab_outcomes.py` with the optional flag `--order_rates` to examine lab order rates instead of outcome frequencies.

The following tables will be created:
- `prediction_dates_1_year` and `prediction_dates_3_years`: prediction dates from 2014 to 2020 for the 1-year table and 2016 to 2020 for the 3-year table
- `race_concepts` and `ethnicity_concepts`: concept IDs and names for race and ethnicity
- `omop_cohort_1_year` and `omop_cohort_3_years`: patients in each cohort, where 1 and 3 years specify the number of years of prior observation required
- `monthly_eligibility_1_year` and `monthly_eligibility_3_years`: each month in which a patient is eligible to be included
- `cohort_procedure_outcome_counts`: counts of procedures in the cohort
- `measurement_references_from_sources`, `measurement_unit_specific_references_from_sources`, `measurement_units_to_drop`, and `measurements_out_of_range`: information for standardizing measurements from clinical sources
- `measurement_age_gender_specific_high_references` and `measurement_age_gender_specific_low_references`: references derived from measurements table
- `measurement_age_gender_specific_standardized_high_references` and `measurement_age_gender_specific_standardized_low_references`: references derived and standardized
- outcome tables: outcomes for each patient and month
102 changes: 102 additions & 0 deletions data_extraction/compute_cohort_statistics.py
@@ -0,0 +1,102 @@
import json
import sys
from datetime import datetime
from os.path import dirname, abspath, join

sys.path.append(dirname(dirname(abspath(__file__))))
import config

sys.path.append(join(dirname(dirname(abspath(__file__))), 'utils'))
from logging_utils import set_up_logger
from h5py_utils import load_data_from_h5py

def compute_number_unique_labs(outcome_specific_data_dir,
logger):
'''
Compute number of lab concepts in covariate set
@param outcome_specific_data_dir: str, path to outcome data directory
@param logger: logger, for INFO messages
@return: None
'''
# load lab feature names
outcome_full_dir = config.outcome_data_dir + outcome_specific_data_dir
if outcome_specific_data_dir.startswith('dataset_condition'):
freq_str = 'freq100'
else:
freq_str = 'freq300'
with open(outcome_full_dir + 'fold0_' + freq_str + '_labs_feature_names.json', 'r') as f:
lab_feature_names = json.load(f)

# compute number of lab types
lab_concept_ids = set()
for lab_name in lab_feature_names:
lab_concept_ids.add(lab_name.split(' - lab -')[0])
logger.info(str(len(lab_concept_ids)) + ' lab types in ' + outcome_specific_data_dir + ' outcome set-up')

def compute_number_features(outcome_specific_data_dir,
logger):
'''
Compute number of covariates
@param outcome_specific_data_dir: str, path to outcome data directory
@param logger: logger, for INFO messages
@return: None
'''
# compute number of features from name lists
outcome_full_dir = config.outcome_data_dir + outcome_specific_data_dir
feature_types = ['conditions', 'procedures', 'labs', 'drugs', 'specialties', 'general', 'age']
total_num_features = 0
if outcome_specific_data_dir.startswith('dataset_condition'):
freq_str = 'freq100'
else:
freq_str = 'freq300'
for feature_type in feature_types:
with open(outcome_full_dir + 'fold0_' + freq_str + '_' + feature_type + '_feature_names.json', 'r') as f:
total_num_features += len(json.load(f))
logger.info(str(total_num_features) + ' features in ' + outcome_specific_data_dir + ' outcome set-up')

def compute_patient_statistics(eligibility_matrix_filename,
logger):
'''
Compute number of patients and number of samples per patient in set-up
@param eligibility_matrix_filename: str, hf5 file containing eligibility matrix
@param logger: logger, for INFO messages
@return: None
'''
# load eligibility matrix
eligibility_matrix = load_data_from_h5py(eligibility_matrix_filename)['monthly_eligibility']

# number of patients
logger.info(str(eligibility_matrix.shape[0]) + ' patients in ' + eligibility_matrix_filename)

# number of samples per patient
logger.info(str(float(eligibility_matrix.sum())/eligibility_matrix.shape[0]) + ' samples per patient in '
+ eligibility_matrix_filename)

if __name__ == '__main__':

logging_filename = config.logging_dir + 'cohort_statistics_' \
+ datetime.now().strftime("%m_%d_%Y_%H_%M_%S") + '.log'
logger = set_up_logger('logger_main',
logging_filename)

top_lab_outcome_name = 'dataset_lab_3009744_low_outcomes/'
top_condition_outcome_name = 'dataset_condition_254761_outcomes/'

compute_number_unique_labs(top_lab_outcome_name,
logger)
compute_number_features(top_lab_outcome_name,
logger)
compute_number_features(top_condition_outcome_name,
logger)

lab_eligibility_matrix_name = config.omop_data_dir + 'eligibility_1_year.hf5'
condition_eligibility_matrix_name = config.omop_data_dir + 'eligibility_3_years.hf5'
condition_reindexed_eligibility_matrix_name = config.outcome_data_dir + top_condition_outcome_name \
+ 'condition_254761_eligibility.hf5'

compute_patient_statistics(lab_eligibility_matrix_name,
logger)
compute_patient_statistics(condition_eligibility_matrix_name,
logger)
compute_patient_statistics(condition_reindexed_eligibility_matrix_name,
logger)
108 changes: 108 additions & 0 deletions data_extraction/create_lab_reference_tables.py
@@ -0,0 +1,108 @@
import os
import sqlalchemy
import sys
import argparse
from os.path import dirname, abspath, join

sys.path.append(dirname(dirname(abspath(__file__))))
import config

sys.path.append(join(dirname(dirname(abspath(__file__))), 'utils'))
from db_utils import session_scope

def create_lab_reference_table(direction,
min_ref_count = 5,
num_characters = 15,
avg_diff_threshold = 5):
'''
Create table {nonstationarity_schema_name}.measurement_age_gender_specific_{direction}_references
Tables will have the following columns:
1. concept_id
2. concept_name
    3. age_range: 1 for ages <= 30, 2 for ages 31-50, 3 for ages 51-70, 4 for ages > 70
4. gender_source_value: M or F
5. similar_concept_id: if reference was taken from a similar concept
6. similar_concept_name
7. range_{direction}: reference
8. from_opposite_gender: 1 if reference was taken from opposite gender
9. from_different_age_range: 1 if reference was taken from another age range - not present in eol_version
10. from_general_reference: 1 if reference was taken from general population
@param direction: str, low or high, direction of reference
@param min_ref_count: int, minimum number of times reference has to occur to be used
@param num_characters: int, number of initial characters required to match between similar concepts
@param avg_diff_threshold: int, distance between average non-zero values allowed for similar concepts
@return: None
'''
assert direction in {'low', 'high'}

if direction == 'low':
reference_order = 'ASC'
else:
reference_order = 'DESC'

reference_table_sql_filename = 'create_age_gender_specific_reference_table.sql'

with open(join(dirname(abspath(__file__)),
'sql',
reference_table_sql_filename), 'r') as f:
create_table_sql = f.read()
create_table_sql = create_table_sql.format(direction = direction,
reference_order = reference_order,
min_ref_count = min_ref_count,
num_characters = num_characters,
avg_diff_threshold = avg_diff_threshold,
schema_name = config.nonstationarity_schema_name)

engine = sqlalchemy.create_engine('postgresql://' + config.db_name,
echo=False,
connect_args = {"host": '/var/run/postgresql/'})
with session_scope(engine) as session:
session.execute(create_table_sql)
session.commit()

def standardize_lab_reference_table(direction):
'''
Create table {nonstationarity_schema_name}.measurement_age_gender_specific_standardized_{direction}_references
with the same structure as {nonstationarity_schema_name}.measurement_age_gender_specific_{direction}_references
and some references replaced by values from clinical sources.
@param direction: str, low or high, direction of reference
@return: None
'''
assert direction in {'low', 'high'}

reference_table_sql_filename = 'create_standardized_lab_reference_tables.sql'
with open(join(dirname(abspath(__file__)),
'sql',
reference_table_sql_filename), 'r') as f:
create_table_sql = f.read()
create_table_sql = create_table_sql.format(direction = direction,
schema_name = config.nonstationarity_schema_name)

engine = sqlalchemy.create_engine('postgresql://' + config.db_name,
echo=False,
connect_args = {"host": '/var/run/postgresql/'})
with session_scope(engine) as session:
session.execute(create_table_sql)
session.commit()

if __name__ == '__main__':

parser = argparse.ArgumentParser(description='Extract reference tables.')
parser.add_argument('--direction',
action='store',
type=str,
help='Specify whether creating table for low or high reference ranges.')
parser.add_argument('--standardize_only',
action='store_true',
default=False,
help='Only standardize references table. Assumes references table was already created.')

args = parser.parse_args()
assert args.direction in {'low', 'high'}

if not args.standardize_only:
# larger threshold than general reference table since smaller cohort has larger variance
create_lab_reference_table(args.direction,
avg_diff_threshold = 10)

standardize_lab_reference_table(args.direction)
43 changes: 43 additions & 0 deletions data_extraction/create_prediction_date_tables.py
@@ -0,0 +1,43 @@
import sqlalchemy
import sys
from os.path import dirname, abspath, join

sys.path.append(dirname(dirname(abspath(__file__))))
import config

sys.path.append(join(dirname(dirname(abspath(__file__))), 'utils'))
from db_utils import session_scope

def write_prediction_dates():
'''
Write prediction dates from 2014 to 2020 to text file
@return: None
'''
prediction_dates = []
months = ['01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12']
for year in range(2014, 2021):
for month_idx in range(len(months)):
prediction_dates.append(str(year) + '-' + months[month_idx] + '-01')

with open('prediction_dates.txt', 'w') as f:
f.write('\n'.join(prediction_dates))

def create_prediction_date_tables():
'''
Create tables prediction_dates_1_year and prediction_dates_3_years
@return: None
'''
with open('sql/load_prediction_date_tables.sql', 'r') as f:
sql = f.read()

engine = sqlalchemy.create_engine('postgresql://' + config.db_name,
echo = False,
connect_args = {"host": '/var/run/postgresql/'})
with session_scope(engine) as session:
session.execute(sql.format(schema_name = config.nonstationarity_schema_name))
session.commit()

if __name__ == '__main__':

write_prediction_dates()
create_prediction_date_tables()
21 changes: 21 additions & 0 deletions data_extraction/create_procedure_cohort_count_table.py
@@ -0,0 +1,21 @@
import sqlalchemy
import sys
from os.path import dirname, abspath, join

sys.path.append(dirname(dirname(abspath(__file__))))
import config

sys.path.append(join(dirname(dirname(abspath(__file__))), 'utils'))
from db_utils import session_scope

if __name__ == '__main__':

with open('sql/create_procedure_cohort_count_table.sql', 'r') as f:
procedure_count_sql = f.read()
procedure_count_sql = procedure_count_sql.format(schema_name = config.nonstationarity_schema_name)

engine = sqlalchemy.create_engine('postgresql://' + config.db_name,
echo=False,
connect_args = {"host": '/var/run/postgresql/'})
with session_scope(engine) as session:
proc_results = session.execute(sqlalchemy.text(procedure_count_sql))
session.commit()