Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds in the load_all_lab_tests mgmt command #1430

Open
wants to merge 1 commit into
base: cerner-merge-branch
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
383 changes: 383 additions & 0 deletions intrahospital_api/management/commands/load_all_lab_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,383 @@
import gzip
import shutil
from django.core.management.base import BaseCommand
from django.db import connection
from django.db import transaction
from elcid import models as elcid_models
from plugins.labtests import models as lab_models
from elcid.utils import timing
from django.utils import timezone
from django.conf import settings
import pytds
import csv
import os


LAB_TEST_COLUMNS = [
"Relevant_Clinical_Info", # Becomes clinical info
# Datetime ordered is observation date
# if there is no observation date, we
# use request date
"Observation_date",
"Request_Date",
"Result_ID", # Lab number
"Specimen_Site", # Site
# Status, this comes through as
# F for complete
# There is also empty stryings, Nones
# and 'I' that we store as Pending
"OBX_Status", # TODO is there more than one status per test number, test_ocde
"Result_ID", # lab number
"OBR_exam_code_ID", # test_code
"OBR_exam_code_Text", # test_name
"Encounter_Consultant_Name", # encounter_consultant_name
"Encounter_Location_Code", # encounter_location_code
"Encounter_Location_Name", # encounter_location_name
"Accession_number", # accession_number
"Department", # department_int
]

OBSERVATION_COLUMNS = [
"last_updated", # last_updated
# observation_datetime is Observation_date
# if this isn't populated its Request_Date
"Observation_date",
"Request_Date",
"Reported_date", # reported_datetime
"Result_Range", # reference_range
"OBX_exam_code_Text", # observation_name
"OBX_id", # observation_number
"Result_Value", # observation_value
"Result_Units", # units
]

UPSTREAM_DB_COLUMNS = "Patient_Number," + ",".join(LAB_TEST_COLUMNS) + "," + ",".join(OBSERVATION_COLUMNS)

# Get all results from upstream, order the results
# by the three fields that define a unique lab test
# so that in future when we iterate over to create
# observations we can cache the call to lab tests
GET_ALL_RESULTS = f"""
SELECT
{UPSTREAM_DB_COLUMNS}
FROM tQuest.Pathology_Result_View
WHERE Patient_Number IS NOT null
AND Patient_Number <> ''
AND Result_ID IS NOT null
AND Result_ID <> ''
"""

DELETE = """
-- create a temp table delete_or_keep which we will load patient number, lab number and test name
CREATE TEMP TABLE patient_id_lab_number_name (
patient_id INT NOT NULL,
lab_number VARCHAR (255) NOT NULL,
test_name VARCHAR (255) NOT NULL
) ON COMMIT DROP;

COPY patient_id_lab_number_name (patient_id,lab_number,test_name) FROM '{csv_file}' DELIMITER ',' CSV HEADER;

CREATE INDEX lab_index ON labtests_labtest (patient_id, test_name, lab_number);

DELETE FROM ipc_infectionalert WHERE lab_test_id IN (
SELECT labtests_labtest.id FROM labtests_labtest, patient_id_lab_number_name
WHERE labtests_labtest.patient_id = patient_id_lab_number_name.patient_id
AND labtests_labtest.test_name = patient_id_lab_number_name.test_name
AND labtests_labtest.lab_number = patient_id_lab_number_name.lab_number
);

DELETE FROM labtests_labtest USING patient_id_lab_number_name
WHERE labtests_labtest.patient_id = patient_id_lab_number_name.patient_id
AND labtests_labtest.test_name = patient_id_lab_number_name.test_name
AND labtests_labtest.lab_number = patient_id_lab_number_name.lab_number;

DELETE FROM labtests_observation
WHERE NOT EXISTS (
SELECT null
FROM labtests_labtest
WHERE labtests_observation.test_id = labtests_labtest.id
);

DROP INDEX lab_index;
"""


RESULTS_CSV = "results.csv"
OBSERVATIONS_CSV = "observations.csv"
LABTEST_CSV = "lab_tests.csv"
DELETE_CSV = "patient_id_lab_number_test_name.csv"



def get_mrn_to_patient_id():
"""
Returns a map of all MRNs from demographics and Merged MRN
to the corresponding patient id.
"""
mrn_to_patient_id = {}
demographics_mrn_and_patient_id = list(
elcid_models.Demographics.objects.exclude(
hospital_number=None,
)
.exclude(hospital_number="")
.values_list("hospital_number", "patient_id")
)

for mrn, patient_id in demographics_mrn_and_patient_id:
mrn_to_patient_id[mrn] = patient_id

merged_mrn_and_patient_id = list(
elcid_models.MergedMRN.objects.values_list("mrn", "patient_id")
)

for mrn, patient_id in merged_mrn_and_patient_id:
mrn_to_patient_id[mrn] = patient_id

return mrn_to_patient_id



@timing
def write_results():
"""
Get all lab test data for MRNs that are within elcid.
Create a row as we will save to our tables but without id, test id or
our
Lookup the patient id

Strip the MRN of leading zeros before writing it to the file
"""
mrn_to_patient_id = get_mrn_to_patient_id()
with open(RESULTS_CSV, "w") as m:
writer = None
with pytds.connect(
settings.TRUST_DB["ip_address"],
settings.TRUST_DB["database"],
settings.TRUST_DB["username"],
settings.TRUST_DB["password"],
as_dict=True,
) as conn:
with conn.cursor() as cur:
cur.execute(GET_ALL_RESULTS)
while True:
rows = cur.fetchmany()
if not rows:
break
for upstream_row in rows:
if not upstream_row["Patient_Number"]:
continue
mrn = upstream_row["Patient_Number"].lstrip("0")
patient_id = mrn_to_patient_id.get(mrn)
if not patient_id:
continue
if upstream_row["Result_ID"] is None:
continue
if len(upstream_row["Result_ID"].strip()) == 0:
continue
if upstream_row["OBR_exam_code_Text"] is None:
continue
if len(upstream_row["OBR_exam_code_Text"].strip()) == 0:
continue
columns = ["patient_id"] + LAB_TEST_COLUMNS + OBSERVATION_COLUMNS
if writer is None:
writer = csv.DictWriter(m, fieldnames=columns)
writer.writeheader()
row = {"patient_id": patient_id}
for k, v in upstream_row.items():
if not k == 'Patient_Number':
row[k] = v
writer.writerow(row)


def write_lab_test_csv():
"""
Reads the results csv where the data exists as it exists
in the upstream table.

Writes the lab test csv where the headers match our lab test fields
and the data is formatted into what we would save to our lab test.
It also adds the patient_id column with the elcid patient id in it.
"""
seen = set()
writer = None
with open(RESULTS_CSV) as m:
reader = csv.DictReader(m)
with open(LABTEST_CSV, "w") as a:
for idx, row in enumerate(reader):
key = (row["patient_id"], row["Result_ID"], row["OBR_exam_code_Text"],)
if key in seen:
continue
seen.add(key)
our_row = cast_to_lab_test_dict(row)
if idx == 0:
headers = our_row.keys()
writer = csv.DictWriter(a, fieldnames=headers)
writer.writeheader()
writer.writerow(our_row)



@timing
def write_delete_csv():
with open(LABTEST_CSV) as l:
reader = csv.DictReader(l)
with open(DELETE_CSV, "w") as d:
field_names = ["patient_id", "lab_number", "test_name"]
writer = csv.DictWriter(d, fieldnames=field_names)
writer.writeheader()
for row in reader:
writer.writerow({f: row[f] for f in field_names})


@timing
def write_observation_csv():
"""
Reads the results csv where the data exists as it exists
in the upstream table.

Writes the observation csv where the headers match our observation fields
and the data is formatted into what we would save to our observation.
It also adds the test_id column with the elcid lab test id in it.
"""
patient_id_lab_number_test_name_to_test_id = get_patient_id_lab_number_test_name_to_test_id()
with open(RESULTS_CSV) as m:
reader = csv.DictReader(m)
with open(OBSERVATIONS_CSV, "w") as a:
writer = None
for row in reader:
key = (row["patient_id"], row["Result_ID"], row["OBR_exam_code_Text"],)
lt_id = patient_id_lab_number_test_name_to_test_id[key]
obs_dict = cast_to_observation_dict(row, lt_id)
if writer is None:
writer = csv.DictWriter(a, fieldnames=obs_dict.keys())
writer.writeheader()
writer.writerow(obs_dict)

def get_patient_id_lab_number_test_name_to_test_id():
values = lab_models.LabTest.objects.values_list(
'patient_id',
'lab_number',
'test_name',
'id'
)
result = {}
for patient_id, lab_number, test_name, test_id in values:
result[(patient_id, lab_number, test_name,)] = test_id
return result

def cast_to_lab_test_dict(row):
"""
Creates a dictionary from an upstream row with keys, values
of what we want to save in our lab test model
"""
result = {"patient_id": row["patient_id"]}
result["clinical_info"] = row["Relevant_Clinical_Info"]
result["datetime_ordered"] = row.get("Observation_date")
result["lab_number"] = row["Result_ID"]

site = row["Specimen_Site"]
if site and "^" in site and "-" in site:
site = site.split("^")[1].strip().split("-")[0].strip()
result["site"] = site

status_abbr = row["OBX_Status"]

if status_abbr == "F":
status = "complete"
else:
status = "pending"
result["status"] = status

result["test_code"] = row["OBR_exam_code_ID"]
result["test_name"] = row["OBR_exam_code_Text"]
result["encounter_consultant_name"] = row["Encounter_Consultant_Name"]
result["encounter_location_code"] = row["Encounter_Location_Code"]
result["encounter_location_name"] = row["Encounter_Location_Name"]
result["accession_number"] = row["Accession_number"]
dep = row.get("Department")
result["department_int"] = None
result["created_at"] = timezone.now()
result["updated_at"] = timezone.now()
if dep:
result["department_int"] = int(dep)
return result


def cast_to_observation_dict(row):
"""
Creates a dictionary from an upstream row with keys, values
of what we want to save in our observation model
"""
result = {}
result["last_updated"] = row["last_updated"]
result["observation_datetime"] = row["Observation_date"]
if not result["observation_datetime"]:
result["observation_datetime"] = row["Request_Date"]
result["reported_datetime"] = row["Reported_date"]
result["reference_range"] = row["Result_Range"]
result["observation_number"] = row["OBX_id"]
result["observation_name"] = row["OBX_exam_code_Text"]
result["observation_value"] = row["Result_Value"]
result["units"] = row["Result_Units"]
result["created_at"] = timezone.now()
return result


def get_db_columns(model):
all_fields = model._meta.get_fields()
result = []
fields_to_ignore = set(
["observation", "test", "created_at", "updated_at", "id", "infectionalert"]
)
for field in all_fields:
field_name = field.name
if field_name == "patient":
field_name = "patient_id"
if field_name not in fields_to_ignore:
result.append(field_name)
return result


def call_db_command(sql):
with connection.cursor() as cursor:
cursor.execute(
sql
)


def get_csv_fields(file_name):
"""
Gets the column names from a csv file.
"""
with open(file_name) as m:
reader = csv.DictReader(m)
headers = next(reader).keys()
return list(headers)


class Command(BaseCommand):
@timing
@transaction.atomic
def handle(self, *args, **options):
now = timezone.now()
cwd = os.getcwd()
print(f'starting {now}')
# Write all the columns we need out of the upstream table
# into out table
write_results()
write_lab_test_csv()
write_delete_csv()
call_db_command(DELETE.format(csv_file=os.path.join(cwd, DELETE_CSV)))
lab_columns = ",".join(get_csv_fields(LABTEST_CSV))
lab_test_csv = os.path.join(cwd, LABTEST_CSV)
copy_in_lab_tests = f"""
COPY labtests_labtest({lab_columns}) FROM '{lab_test_csv}' WITH (FORMAT csv, header);
"""
call_db_command(copy_in_lab_tests)
write_observation_csv()
obs_columns = ",".join(get_csv_fields(OBSERVATIONS_CSV))
obs_csv = os.path.join(cwd, OBSERVATIONS_CSV)
copy_in_lab_tests = f"""
COPY labtests_observation({obs_columns}) FROM '{obs_csv}' WITH (FORMAT csv, header);
"""
print(f'finished {now}')