diff --git a/ioos_qartod/handlers.py b/ioos_qartod/handlers.py new file mode 100644 index 0000000..81f76d9 --- /dev/null +++ b/ioos_qartod/handlers.py @@ -0,0 +1,105 @@ +from netCDF4 import Dataset +import quantities as pq +from cf_units import Unit +import multiprocessing +from ioos_qartod.qc_tests import qc +from copy import deepcopy +from functools import partial +import pprint +from datetime import datetime +import pytz +import numpy as np + + +# don't apply unit conversions to these variables +convert_blacklist = {'low_reps', 'high_reps'} + +# variables that represent a change in some quantity. Additive differences +# will be removed +delta_vars = {'eps'} + +def load_ds_and_process(ds_loc, config_params, output_loc=None): + """ + Takes a dataset location and outputs it to another place + params: ds_loc: string or netCDF Dataset + """ + #ds = xarray.open_dataset(ds_loc) + ds = Dataset(ds_loc, 'a') + dat_vars = ds.variables + standard_name_dat_vars = [] + std_names = [dat_vars[v] for v in dat_vars if 'standard_name' + in dat_vars[v].ncattrs() and not + ('flag_values' in dat_vars[v].ncattrs() + or + 'flag_meanings' in dat_vars[v].ncattrs()) + and dat_vars[v].standard_name + in config_params] + + for var in std_names: + print(var) + test_params = convert_params(var.units, config_params[var.standard_name]) + do_tests(ds, var, test_params) + # show history + applied_ts = datetime.now(tz=pytz.utc) + # store qartod parameter invocations + # TODO + hist_str = "\n{}: {}".format(applied_ts, pprint.pformat(config_params)) + if hasattr(ds, 'history'): + ds.history += hist_str + else: + ds.history = hist_str + ds.close() + +def convert_params(var_units, cfg_section): + """Converts tests over with proper unit conversions""" + test_units = Unit(cfg_section['units']) + var_units = Unit(var_units) + # units that test parameters were specified in + # if the units are the same between the tests and the actual variable, + # do nothing + if test_units == var_units: + tests_dict = cfg_section['tests']; + # otherwise convert the units contained within the tests + else: + # py2/3 compat warning: iteritems will fail + tests_dict = deepcopy(cfg_section['tests']) + for test, params in tests_dict.iteritems(): + for arg, val in params.iteritems(): + if arg not in convert_blacklist: + # Unit.convert won't work on lists or stock iterables, + # but will work on scalar numeric types + rep_val = np.array(val) if hasattr(val, '__iter__') else val + if arg not in delta_vars: + tests_dict[test][arg] = test_units.convert(rep_val, + var_units) + # need to remove additive portion for deltas + else: + tests_dict[test][arg] = (test_units.convert(rep_val, + var_units) - + test_units.convert(0, var_units) + ) + return tests_dict + +# TODO: convert to use multiprocessing +def do_tests(ds, var, cfg_section): + """Runs tests on the specified variable""" + for test, params in cfg_section.iteritems(): + print(test, params) + # FIXME: improve safety for user supplied input. Maybe whitelist + # functions? + p_apply = partial(getattr(qc, test), **params) + # TODO: vectorize functions to operate faster + # loop invariant + if var.ndim > 1: + test_res = np.apply_along_axis(p_apply, 1, var) + else: + test_res = p_apply(var) + new_var_name = "{}_{}_qc".format(var.name, test) + try: + ds.createVariable(new_var_name, 'i1', var.dimensions) + except RuntimeError: + print("Var {} already exists, not creating new variable") + new_var = ds.variables[new_var_name] + ds.variables[new_var_name][:] = test_res + new_var.flag_values = np.array([1, 2, 3, 4, 9], dtype='i1') + new_var.flag_meanings = "GOOD NOT_EVALUATED SUSPECT BAD MISSING" diff --git a/ioos_qartod/qc_tests/qc.py b/ioos_qartod/qc_tests/qc.py index 3d65aa0..f712a17 100644 --- a/ioos_qartod/qc_tests/qc.py +++ b/ioos_qartod/qc_tests/qc.py @@ -5,6 +5,7 @@ import multiprocessing +"""All tests operate on single time series""" class QCFlags: """Primary flags for QARTOD.""" # Don't subclass Enum since values don't fit nicely into a numpy array. @@ -140,16 +141,20 @@ def spike_check(arr, low_thresh, high_thresh, prev_qc=None): The flag is set at point n-1. """ # Subtract the average from point at index n-1 and get the absolute value. - if low_thresh >= high_thresh: + if arr.size < 3: + flag_arr = np.full(arr.shape, QCFlags.UNKNOWN) + return + elif low_thresh >= high_thresh: raise ValueError("Low theshold value must be less than high threshold " "value.") - val = np.abs(np.convolve(arr, [-0.5, 1, -0.5], mode='same')) - # First and last elements can't contain three points, - # so set difference to zero so these will avoid getting spike flagged. - val[[0, -1]] = 0 - flag_arr = ((val < low_thresh) + - ((val >= low_thresh) & (val < high_thresh)) * QCFlags.SUSPECT + - (val >= high_thresh) * QCFlags.BAD_DATA) + else: + val = np.abs(np.convolve(arr, [-0.5, 1, -0.5], mode='same')) + # First and last elements can't contain three points, + # so set difference to zero so these will avoid getting spike flagged. + val[[0, -1]] = 0 + flag_arr = ((val < low_thresh) + + ((val >= low_thresh) & (val < high_thresh)) * QCFlags.SUSPECT + + (val >= high_thresh) * QCFlags.BAD_DATA) if prev_qc is not None: set_prev_qc(flag_arr, prev_qc) return flag_arr diff --git a/ioos_qartod/run_config.example.yml b/ioos_qartod/run_config.example.yml new file mode 100644 index 0000000..03f3423 --- /dev/null +++ b/ioos_qartod/run_config.example.yml @@ -0,0 +1,60 @@ +flat_line: &flat_line + low_reps: 5 + high_reps: 9 + +sea_water_speed: + units: 'm/s' + tests: + range_check: + user_span: [0, 2.5] + sensor_span: [0, 2.8] + +sea_water_current_direction: + units: '1' + tests: + range_check: + sensor_span: [0, 360] + + flat_line_check: + <<: *flat_line + +sea_water_electrical_conductivity: + units: 'S m-1' + + tests: + range_check: + sensor_span: [4000, 65000] + user_span: [10000, 60000] + + flat_line_check: + <<: *flat_line + eps: 0.02 + +sea_water_temperature: + # temperature highly dependent on current location, depth, season + # may be better to use climatological test for gliders. + units: 'degC' + + tests: + range_check: + user_span: [0, 25] + sensor_span: [-2, 38] + spike_check: + low_thresh: 5 + high_thresh: 10 + flat_line_check: + <<: *flat_line + eps: 0.02 + + +sea_water_practical_salinity: + units: '1' + tests: + range_check: + user_span: [0, 38] + sensor_span: [0, 70] + spike_check: + low_thresh: 1 + high_thresh: 4 + #rate_of_change_test: + # thresh_val: 30 diff --git a/requirements.txt b/requirements.txt index 9e65689..6da27d2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,9 @@ -numpy pandas quantities pyproj nose +cf_units>=1.0 +pytz==2016.3 +PyYAML==3.11 +numpy==1.11.0 +netCDF4==1.2.4 diff --git a/run_qartod.py b/run_qartod.py new file mode 100755 index 0000000..4a27b8e --- /dev/null +++ b/run_qartod.py @@ -0,0 +1,24 @@ +#!/usr/bin/python +import argparse +import yaml +from ioos_qartod import handlers + +def main(): + parser = argparse.ArgumentParser(description='Run QARTOD tests on a local NetCDF file or remote OPeNDAP dataset') + parser.add_argument('--config-file', '-c', required=True, + help=("YAML config file containing variables and QARTOD " + "test configurations")) + parser.add_argument('dataset_location', + help=("Defines the location of a NetCDF file or remote " + "OPeNDAP dataset to be checked.")) + + args = parser.parse_args() + + + with open(args.config_file, 'r') as f: + config = yaml.load(f) + + handlers.load_ds_and_process(args.dataset_location, config) + +if __name__ == '__main__': + main()