From f3dbd0645bfaa045f4ae10f63bc6c04d49f7c359 Mon Sep 17 00:00:00 2001 From: buanzo Date: Sun, 17 May 2020 10:35:58 -0300 Subject: [PATCH] configuration validation thru Confuse templates. NOTE: confuse.OneOf() is buggy - see https://github.com/beetbox/confuse/issues/88 --- humed.py | 136 +++++++++++++++++++++++++++++++-------- sample_humed_config.yaml | 8 ++- 2 files changed, 115 insertions(+), 29 deletions(-) diff --git a/humed.py b/humed.py index 7858a28..14b3e87 100644 --- a/humed.py +++ b/humed.py @@ -44,7 +44,7 @@ # See: # https://github.com/beetbox/confuse/blob/master/example/__init__.py config_template = { # TODO: add debug. check confuse.Bool() - 'listen_url': confuse.String(), + 'endpoint': confuse.String(), 'transfer_method': confuse.OneOf(TRANSFER_METHODS), 'remote_syslog': { 'server': confuse.String(), @@ -63,17 +63,18 @@ def __init__(self, config): # We will only expose config if needed # self.config = config self.debug = config['debug'].get() - self.listen_url = config['listen_url'].get() + self.endpoint = config['endpoint'].get() self.transfer_method = config['transfer_method'].get() - self.transfer_method_args = config[self.transfer_method].get() # TODO: improve self.logger = logging.getLogger('humed-{}'.format(self.transfer_method)) self.logger.setLevel(logging.INFO) if self.transfer_method is 'logstash': + self.transfer_method_args = config[self.transfer_method].get() host = self.transfer_method_args['host'].get() port = self.transfer_method_args['host'].get() self.logger.addHandler(AsyncLSH(host, port, database_path='logstash.db')) elif self.transfer_method is 'remote_syslog': + self.transfer_method_args = config[self.transfer_method].get() server = self.config['remote_syslog']['server'].get() port = self.config['remote_syslog']['port'].get() proto = self.config['remote_syslog']['proto'].get() @@ -159,9 +160,11 @@ def process_transfers(self): for item in pendientes: # TODO: send to master-hume if self.transfer_method == 'logstash': - self.logstash(item=item) + ret = self.logstash(item=item) elif self.transfer_method == 'syslog': - self.syslog(item=item) + ret = self.syslog(item=item) # using std SysLogHandler + elif self.transfer_method == 'remote_syslog': + ret = self.syslog(item=item) # using std SysLogHandler # if sent ok then: # self.transfer_ok(archivo=archivo) # if error return(False) @@ -169,13 +172,13 @@ def process_transfers(self): def logstash(self,item=None): if item is None: - return() # FIX: should not happen + return(False) # FIX: should not happen rowid = item[0] ts = item[1] try: humepkt = json.loads(item[3]) except Exception as ex: - return() # FIX: malformed json at this stage? mmm + return(False) # FIX: malformed json at this stage? mmm hume = humepkt['hume'] if 'process' in humepkt.keys(): # This data is optional in hume (-a) process = humepkt['process'] @@ -211,23 +214,101 @@ def logstash(self,item=None): # error -> error # critical -> critical # debug -> debug - if level == 'ok' or level == 'info': - # https://python-logstash-async.readthedocs.io/en/stable/usage.html# - self.logger.info('hume({}): {}'.format(hostname, msg), extra=extra) - elif level == 'warn': - self.logger.warning('hume({}) {}'.format(hostname, msg), extra=extra) - elif level == 'error': - self.logger.error('hume({}): {}'.format(hostname, msg), extra=extra) - elif level == 'critical': - self.logger.critical('hume({}): {}'.format(hostname, msg), extra=extra) - elif level == 'debug': - self.logger.debug('hume({}): {}'.format(hostname, msg), extra=extra) + try: + if level == 'ok' or level == 'info': + # https://python-logstash-async.readthedocs.io/en/stable/usage.html# + self.logger.info('hume({}): {}'.format(hostname, msg), extra=extra) + elif level == 'warn': + self.logger.warning('hume({}) {}'.format(hostname, msg), extra=extra) + elif level == 'error': + self.logger.error('hume({}): {}'.format(hostname, msg), extra=extra) + elif level == 'critical': + self.logger.critical('hume({}): {}'.format(hostname, msg), extra=extra) + elif level == 'debug': + self.logger.debug('hume({}): {}'.format(hostname, msg), extra=extra) + except Exception: # TODO: improve exception handling + return(False) + else: + return(True) + + def syslog(self,item=None): + # This function handles both local and remote syslog + # according to logging.handlers.SysLogHandler() + if item is None: + return(False) # FIX: should not happen + + # Required data: + rowid = item[0] + ts = item[1] + try: + humepkt = json.loads(item[3]) + except Exception as ex: + return(False) # FIX: malformed json at this stage? mmm + hume = humepkt['hume'] + + # Optional data + if 'process' in humepkt.keys(): # This data is optional in hume (-a) + process = humepkt['process'] + else: + process = None + + # Extract info from hume to prepare syslog message + # TODO: decide if we should split these in the parent caller + # pros: tidier + # cons: makes development of other transfer methods more cumbersome? + level = hume['level'] + msg = hume['msg'] + task = hume['task'] + tags = hume['tags'] + humecmd = hume['humecmd'] + timestamp = hume['timestamp'] + # hostname + hostname = socket.getfqdn() # FIX: add a hostname configuration keyword + + # We dont have the 'extra' field for syslog, in contrast to logstash + msg = '[{}-{}-{}] TAGS=[{}] HUMECMD={} MSG={}'.format(timestamp, + task, + humelevel, + tags, + humecmd, + msg) + if process is not None: + msg = '{} PROC={}'.format(msg, + json.dumps(extra['process'])) + else: + msg = '{} PROC=None'.format(msg) + # Hume level does not relate completely, because 'ok' is not + # a syslog severity, closest is info but... TODO: think about this + # hume level -> syslog severity + # ---------------------------- + # ok -> info + # info -> info + # warn -> warning + # error -> error + # critical -> critical + # debug -> debug + try: + if level == 'ok' or level == 'info': + # https://python-logstash-async.readthedocs.io/en/stable/usage.html# + self.logger.info('hume({}): {}'.format(hostname, msg)) + elif level == 'warn': + self.logger.warning('hume({}) {}'.format(hostname, msg)) + elif level == 'error': + self.logger.error('hume({}): {}'.format(hostname, msg)) + elif level == 'critical': + self.logger.critical('hume({}): {}'.format(hostname, msg)) + elif level == 'debug': + self.logger.debug('hume({}): {}'.format(hostname, msg)) + except Exception: # TODO: improve exception handling + return(False) + else: + return(True) def run(self): # Humed main loop sock = zmq.Context().socket(zmq.PULL) - # print("Binding to '{}'".format(self.listen_url)) - sock.bind(self.listen_url) + # print("Binding to '{}'".format(self.endpoint)) + sock.bind(self.endpoint) # 2a - Await hume message over zmp while True: hume = {} @@ -250,20 +331,21 @@ def main(): # First, parse configuration config = confuse.Configuration('humed') # Config defaults - config['listen_url'] = 'tcp://localhost:198' - config['remote_syslog']['address'] = 'localhost' + config['endpoint'] = 'tcp://127.0.0.1:198' + config['remote_syslog']['server'] = 'localhost' config['remote_syslog']['proto'] = 'udp' config['remote_syslog']['port'] = 514 parser = argparse.ArgumentParser() - parser.add_argument('--listen_url', - help='Listening url for humed zeromq') config['debug'] = DEBUG parser.add_argument('--debug', - help='Enable debug') + help='Enables debug messages') args = parser.parse_args() config.set_args(args) - print('Reading configuration from {}/{}'.format(config.config_dir(), - confuse.CONFIG_FILENAME)) + try: + valid = config.get(template=config_template) + except Exception as ex: + print('Humed: Config file validation error: {}'.format(ex)) + sys.exit(2) print('-----[ CONFIG DUMP ]-----') print(config.dump()) print('Available Transfer Methods: {}'.format(TRANSFER_METHODS)) diff --git a/sample_humed_config.yaml b/sample_humed_config.yaml index 423fc98..b8461c1 100644 --- a/sample_humed_config.yaml +++ b/sample_humed_config.yaml @@ -1,6 +1,10 @@ -listen_url: tcp://127.0.0.1:198 +# NOT an url. Plesee see http://api.zeromq.org/2-1:zmq-tcp +endpoint: tcp://127.0.0.1:198 transfer_method: logstash logstash: host: 127.0.0.1 port: 24224 - \ No newline at end of file +remote_syslog: + server: syslog.example.net + proto: udp + port: 514