Skip to content
This repository was archived by the owner on Oct 9, 2019. It is now read-only.

Allows running of QARTOD tests on NetCDF files via CLI #66

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions ioos_qartod/handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
from netCDF4 import Dataset
import quantities as pq
from cf_units import Unit
import multiprocessing
from ioos_qartod.qc_tests import qc
from copy import deepcopy
from functools import partial
import pprint
from datetime import datetime
import pytz
import numpy as np


# don't apply unit conversions to these variables
convert_blacklist = {'low_reps', 'high_reps'}

# variables that represent a change in some quantity. Additive differences
# will be removed
delta_vars = {'eps'}

def load_ds_and_process(ds_loc, config_params, output_loc=None):
"""
Takes a dataset location and outputs it to another place
params: ds_loc: string or netCDF Dataset
"""
#ds = xarray.open_dataset(ds_loc)
ds = Dataset(ds_loc, 'a')
dat_vars = ds.variables
standard_name_dat_vars = []
std_names = [dat_vars[v] for v in dat_vars if 'standard_name'
in dat_vars[v].ncattrs() and not
('flag_values' in dat_vars[v].ncattrs()
or
'flag_meanings' in dat_vars[v].ncattrs())
and dat_vars[v].standard_name
in config_params]

for var in std_names:
print(var)
test_params = convert_params(var.units, config_params[var.standard_name])
do_tests(ds, var, test_params)
# show history
applied_ts = datetime.now(tz=pytz.utc)
# store qartod parameter invocations
# TODO
hist_str = "\n{}: {}".format(applied_ts, pprint.pformat(config_params))
if hasattr(ds, 'history'):
ds.history += hist_str
else:
ds.history = hist_str
ds.close()

def convert_params(var_units, cfg_section):
"""Converts tests over with proper unit conversions"""
test_units = Unit(cfg_section['units'])
var_units = Unit(var_units)
# units that test parameters were specified in
# if the units are the same between the tests and the actual variable,
# do nothing
if test_units == var_units:
tests_dict = cfg_section['tests'];
# otherwise convert the units contained within the tests
else:
# py2/3 compat warning: iteritems will fail
tests_dict = deepcopy(cfg_section['tests'])
for test, params in tests_dict.iteritems():
for arg, val in params.iteritems():
if arg not in convert_blacklist:
# Unit.convert won't work on lists or stock iterables,
# but will work on scalar numeric types
rep_val = np.array(val) if hasattr(val, '__iter__') else val
if arg not in delta_vars:
tests_dict[test][arg] = test_units.convert(rep_val,
var_units)
# need to remove additive portion for deltas
else:
tests_dict[test][arg] = (test_units.convert(rep_val,
var_units) -
test_units.convert(0, var_units)
)
return tests_dict

# TODO: convert to use multiprocessing
def do_tests(ds, var, cfg_section):
"""Runs tests on the specified variable"""
for test, params in cfg_section.iteritems():
print(test, params)
# FIXME: improve safety for user supplied input. Maybe whitelist
# functions?
p_apply = partial(getattr(qc, test), **params)
# TODO: vectorize functions to operate faster
# loop invariant
if var.ndim > 1:
test_res = np.apply_along_axis(p_apply, 1, var)
else:
test_res = p_apply(var)
new_var_name = "{}_{}_qc".format(var.name, test)
try:
ds.createVariable(new_var_name, 'i1', var.dimensions)
except RuntimeError:
print("Var {} already exists, not creating new variable")
new_var = ds.variables[new_var_name]
ds.variables[new_var_name][:] = test_res
new_var.flag_values = np.array([1, 2, 3, 4, 9], dtype='i1')
new_var.flag_meanings = "GOOD NOT_EVALUATED SUSPECT BAD MISSING"
21 changes: 13 additions & 8 deletions ioos_qartod/qc_tests/qc.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import multiprocessing


"""All tests operate on single time series"""
class QCFlags:
"""Primary flags for QARTOD."""
# Don't subclass Enum since values don't fit nicely into a numpy array.
Expand Down Expand Up @@ -140,16 +141,20 @@ def spike_check(arr, low_thresh, high_thresh, prev_qc=None):
The flag is set at point n-1.
"""
# Subtract the average from point at index n-1 and get the absolute value.
if low_thresh >= high_thresh:
if arr.size < 3:
flag_arr = np.full(arr.shape, QCFlags.UNKNOWN)
return
elif low_thresh >= high_thresh:
raise ValueError("Low theshold value must be less than high threshold "
"value.")
val = np.abs(np.convolve(arr, [-0.5, 1, -0.5], mode='same'))
# First and last elements can't contain three points,
# so set difference to zero so these will avoid getting spike flagged.
val[[0, -1]] = 0
flag_arr = ((val < low_thresh) +
((val >= low_thresh) & (val < high_thresh)) * QCFlags.SUSPECT +
(val >= high_thresh) * QCFlags.BAD_DATA)
else:
val = np.abs(np.convolve(arr, [-0.5, 1, -0.5], mode='same'))
# First and last elements can't contain three points,
# so set difference to zero so these will avoid getting spike flagged.
val[[0, -1]] = 0
flag_arr = ((val < low_thresh) +
((val >= low_thresh) & (val < high_thresh)) * QCFlags.SUSPECT +
(val >= high_thresh) * QCFlags.BAD_DATA)
if prev_qc is not None:
set_prev_qc(flag_arr, prev_qc)
return flag_arr
Expand Down
60 changes: 60 additions & 0 deletions ioos_qartod/run_config.example.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
flat_line: &flat_line
low_reps: 5
high_reps: 9

sea_water_speed:
units: 'm/s'
tests:
range_check:
user_span: [0, 2.5]
sensor_span: [0, 2.8]

sea_water_current_direction:
units: '1'
tests:
range_check:
sensor_span: [0, 360]

flat_line_check:
<<: *flat_line

sea_water_electrical_conductivity:
units: 'S m-1'

tests:
range_check:
sensor_span: [4000, 65000]
user_span: [10000, 60000]

flat_line_check:
<<: *flat_line
eps: 0.02

sea_water_temperature:
# temperature highly dependent on current location, depth, season
# may be better to use climatological test for gliders.
units: 'degC'

tests:
range_check:
user_span: [0, 25]
sensor_span: [-2, 38]
spike_check:
low_thresh: 5
high_thresh: 10
flat_line_check:
<<: *flat_line
eps: 0.02


sea_water_practical_salinity:
units: '1'
tests:
range_check:
user_span: [0, 38]
sensor_span: [0, 70]
spike_check:
low_thresh: 1
high_thresh: 4
#rate_of_change_test:
# thresh_val: 30
6 changes: 5 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
numpy
pandas
quantities
pyproj
nose
cf_units>=1.0
pytz==2016.3
PyYAML==3.11
numpy==1.11.0
netCDF4==1.2.4
24 changes: 24 additions & 0 deletions run_qartod.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#!/usr/bin/python
import argparse
import yaml
from ioos_qartod import handlers

def main():
parser = argparse.ArgumentParser(description='Run QARTOD tests on a local NetCDF file or remote OPeNDAP dataset')
parser.add_argument('--config-file', '-c', required=True,
help=("YAML config file containing variables and QARTOD "
"test configurations"))
parser.add_argument('dataset_location',
help=("Defines the location of a NetCDF file or remote "
"OPeNDAP dataset to be checked."))

args = parser.parse_args()


with open(args.config_file, 'r') as f:
config = yaml.load(f)

handlers.load_ds_and_process(args.dataset_location, config)

if __name__ == '__main__':
main()