
Comparing changes

Choose two branches to see what's changed or to start a new pull request.

base repository: cmanaha/python-elasticsearch-logger (base: master)
head repository: SHolzhauer/python-elasticsearch-logger (compare: main)
Able to merge. These branches can be automatically merged.
  • 8 commits
  • 4 files changed
  • 1 contributor

Commits on Oct 30, 2020

  1. Commit d51351e
  2. Commit 937258a — Adding check for es version

     If ES version is 6.0.0 or above, do not add the `_type` field as it's deprecated.
     SHolzhauer committed Oct 30, 2020
  3. Commit 75223a3 — updating README's

     SHolzhauer committed Oct 30, 2020
  4. Commit 478db39 — readme

     SHolzhauer committed Oct 30, 2020

Commits on Nov 1, 2020

  1. Commit c4b82d3
  2. Commit 26f767d

Commits on Nov 26, 2020

  1. Commit 30c8095
  2. Commit 9d4a793
Showing with 232 additions and 21 deletions.
  1. +95 −0 README.md
  2. 0 README.rst → arch.README
  3. +136 −21 cmreslogging/handlers.py
  4. +1 −0 requirements/requirements_py36.txt
95 changes: 95 additions & 0 deletions README.md
@@ -0,0 +1,95 @@

# CMRESHandler.py
This library provides an Elasticsearch logging appender compatible with the
Python standard [logging](https://docs.python.org/2/library/logging.html) library.

The source code is on GitHub at [https://github.com/SHolzhauer/python-elasticsearch-logger](https://github.com/SHolzhauer/python-elasticsearch-logger).

This is a fork of the [original work](https://github.com/cmanaha/python-elasticsearch-logger) by cmanaha.

**Tested against**
* `Elasticsearch 7.3.1` with `Python3.6`


## Installation
WIP

## Requirements
This library requires the following dependencies
#### Python 2
- elasticsearch
- requests
- enum
#### Python 3
- elasticsearch
- requests
- packaging

## Using the handler
To initialise and create the handler, just add it to your logger as follows
```python
import logging

from cmreslogging.handlers import CMRESHandler

handler = CMRESHandler(hosts=[{'host': 'localhost', 'port': 9200}],
                       auth_type=CMRESHandler.AuthType.NO_AUTH,
                       es_index_name="my_python_index")
log = logging.getLogger("PythonTest")
log.setLevel(logging.INFO)
log.addHandler(handler)
```

You can add fields upon initialisation, providing more data about the execution context
```python
import logging

from cmreslogging.handlers import CMRESHandler

handler = CMRESHandler(hosts=[{'host': 'localhost', 'port': 9200}],
                       auth_type=CMRESHandler.AuthType.NO_AUTH,
                       es_index_name="my_python_index",
                       es_additional_fields={'App': 'MyAppName', 'Environment': 'Dev'})
log = logging.getLogger("PythonTest")
log.setLevel(logging.INFO)
log.addHandler(handler)
```

These additional fields will be attached to every log record and stored in Elasticsearch.

To log, use the regular commands from the logging library

```python
log.info("This is an info statement that will be logged into elasticsearch")
```

Your code can also add extra fields on a per-log basis, which can be used to instrument
operations. For example, when reading information from a database you could do something like
```python
import time

start_time = time.time()
database_operation()
db_delta = time.time() - start_time
log.debug("DB operation took %.3f seconds" % db_delta, extra={'db_execution_time': db_delta})
```
The code above executes the DB operation, measures how long it took, and logs an entry whose
message includes the elapsed time as a string. For convenience it also adds a `db_execution_time`
field containing the raw float, which can be used in Kibana on top of Elasticsearch to plot how
long these operations take.

## Initialisation parameters
The constructor takes the following parameters:
- hosts: The list of hosts the Elasticsearch client will connect to; multiple hosts are allowed, for example
```python
[{'host': 'host1', 'port': 9200}, {'host': 'host2', 'port': 9200}]
```
- auth_type: The authentication type; currently supported values are `CMRESHandler.AuthType.NO_AUTH`, `BASIC_AUTH` and `KERBEROS_AUTH`
- auth_details: When `CMRESHandler.AuthType.BASIC_AUTH` is used, this argument must contain a tuple of strings with the user and password that will be used to authenticate against the Elasticsearch servers, for example `('User', 'Password')`
- aws_access_key: When `CMRESHandler.AuthType.AWS_SIGNED_AUTH` is used, this argument must contain the AWS key id of the AWS IAM user
- aws_secret_key: When `CMRESHandler.AuthType.AWS_SIGNED_AUTH` is used, this argument must contain the AWS secret key of the AWS IAM user
- aws_region: When `CMRESHandler.AuthType.AWS_SIGNED_AUTH` is used, this argument must contain the AWS region of the AWS Elasticsearch servers, for example `'us-east'`
- use_ssl: A boolean that defines whether communication should use SSL encryption
- verify_ssl: A boolean that defines whether SSL certificates are validated
- buffer_size: An int; once the internal buffer reaches this size, the records are flushed to Elasticsearch
- flush_frequency_in_sec: A float defining how often, in seconds, the buffer is flushed
- es_index_name: A string with the prefix of the Elasticsearch index that will be created. A date suffix (e.g. `YYYY.MM.dd`) is appended depending on `index_name_frequency`; `python_logger` is used by default
- index_name_frequency: The rotation frequency used as part of the index name. Currently supports `CMRESHandler.IndexNameFrequency.DAILY`, `WEEKLY`, `MONTHLY`, `YEARLY` and `NONE`; daily rotation is used by default
- es_doc_type: A string with the name of the document type that will be used; `python_log` is used by default
- es_additional_fields: A dictionary with additional fields to add to every log record
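
To illustrate how `es_index_name` and `index_name_frequency` combine, here is a rough sketch of the naming scheme (string labels stand in for the `IndexNameFrequency` enum values; this is not the library's exact code):

```python
import datetime

def index_name(prefix, frequency):
    """Sketch of the index naming scheme: the prefix plus an optional date suffix."""
    suffix_fmt = {
        "DAILY": "%Y.%m.%d",
        "WEEKLY": "%Y.%W",
        "MONTHLY": "%Y.%m",
        "YEARLY": "%Y",
    }
    if frequency == "NONE":
        # NONE means no rotation: the index name is just the prefix.
        return prefix
    return "{0}-{1}".format(prefix, datetime.datetime.now().strftime(suffix_fmt[frequency]))
```

With `DAILY` rotation a prefix of `python_logger` yields an index such as `python_logger-2020.11.26`, so Kibana index patterns can match `python_logger-*`.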
File renamed without changes.
157 changes: 136 additions & 21 deletions cmreslogging/handlers.py
@@ -8,6 +8,7 @@
from enum import Enum
from elasticsearch import helpers as eshelpers
from elasticsearch import Elasticsearch, RequestsHttpConnection
from packaging import version

try:
from requests_kerberos import HTTPKerberosAuth, DISABLED
@@ -56,6 +57,7 @@ class IndexNameFrequency(Enum):
WEEKLY = 1
MONTHLY = 2
YEARLY = 3
NONE = 4

# Defaults for the class
__DEFAULT_ELASTICSEARCH_HOST = [{'host': 'localhost', 'port': 9200}]
@@ -74,7 +76,7 @@ class IndexNameFrequency(Enum):
__DEFAULT_ES_INDEX_NAME = 'python_logger'
__DEFAULT_ES_DOC_TYPE = 'python_log'
__DEFAULT_RAISE_ON_EXCEPTION = False
__DEFAULT_TIMESTAMP_FIELD_NAME = "timestamp"
__DEFAULT_TIMESTAMP_FIELD_NAME = "@timestamp"

__LOGGING_FILTER_FIELDS = ['msecs',
'relativeCreated',
@@ -115,11 +117,20 @@ def _get_yearly_index_name(es_index_name):
"""
return "{0!s}-{1!s}".format(es_index_name, datetime.datetime.now().strftime('%Y'))

@staticmethod
def _get_none_index_name(es_index_name):
""" Return elasticsearch index name
:param: index_name the prefix to be used in the index
:return: A string containing the Elasticsearch index name; no date suffix is appended
"""
return "{0!s}".format(es_index_name)

_INDEX_FREQUENCY_FUNCION_DICT = {
IndexNameFrequency.DAILY: _get_daily_index_name,
IndexNameFrequency.WEEKLY: _get_weekly_index_name,
IndexNameFrequency.MONTHLY: _get_monthly_index_name,
IndexNameFrequency.YEARLY: _get_yearly_index_name
IndexNameFrequency.YEARLY: _get_yearly_index_name,
IndexNameFrequency.NONE: _get_none_index_name
}

def __init__(self,
@@ -190,8 +201,11 @@ def __init__(self,
self.index_name_frequency = index_name_frequency
self.es_doc_type = es_doc_type
self.es_additional_fields = es_additional_fields.copy()
self.es_additional_fields.update({'host': socket.gethostname(),
'host_ip': socket.gethostbyname(socket.gethostname())})
self.es_additional_fields.update({
'host': {
"host.name": socket.gethostname(),
'ip': socket.gethostbyname(socket.gethostname())
}})
self.raise_on_indexing_exceptions = raise_on_indexing_exceptions
self.default_timestamp_field_name = default_timestamp_field_name

@@ -276,6 +290,47 @@ def __get_es_datetime_str(timestamp):
current_date = datetime.datetime.utcfromtimestamp(timestamp)
return "{0!s}.{1:03d}Z".format(current_date.strftime('%Y-%m-%dT%H:%M:%S'), int(current_date.microsecond / 1000))

def ecs_format(self, key):
"""
Returns an ECS formatted key
:param key: original record
:return: record with renamed fields
"""

if key == "args":
ecs_key = "process.args"
elif key == "levelname":
ecs_key = "log.level"
elif key == "pathname":
ecs_key = "process.executable"
elif key == "filename":
ecs_key = "log.origin.file.name"
elif key == "module":
ecs_key = "log.logger"
elif key == "exc_info":
ecs_key = "error.type"
elif key == "exc_text":
ecs_key = "error.message"
elif key == "stack_info":
ecs_key = "error.stack_trace"
elif key == "lineno":
ecs_key = "log.origin.file.line"
elif key == "funcName":
ecs_key = "log.origin.function"
elif key == "thread":
ecs_key = "process.thread.id"
elif key == "threadName":
ecs_key = "process.thread.name"
elif key == "processName":
ecs_key = "process.name"
elif key == "process":
ecs_key = "process.pid"
else:
ecs_key = key

return ecs_key.split(".")

def flush(self):
""" Flushes the buffer into ES
:return: None
@@ -285,26 +340,56 @@ def flush(self):
self._timer = None

if self._buffer:
# Determine ES version
try:
with self._buffer_lock:
logs_buffer = self._buffer
self._buffer = []
actions = (
{
'_index': self._index_name_func.__func__(self.es_index_name),
'_type': self.es_doc_type,
'_source': log_record
}
for log_record in logs_buffer
)
eshelpers.bulk(
client=self.__get_es_client(),
actions=actions,
stats_only=True
)
es_client = self.__get_es_client()
except Exception as exception:
if self.raise_on_indexing_exceptions:
raise exception
else:
cluster_info = es_client.info()

if version.parse(cluster_info["version"]["number"]) >= version.parse("6.0.0"):
try:
with self._buffer_lock:
logs_buffer = self._buffer
self._buffer = []
actions = (
{
'_index': self._index_name_func.__func__(self.es_index_name),
'_source': log_record
}
for log_record in logs_buffer
)
eshelpers.bulk(
client=es_client,
actions=actions,
stats_only=True
)
except Exception as exception:
if self.raise_on_indexing_exceptions:
raise exception
else:
try:
with self._buffer_lock:
logs_buffer = self._buffer
self._buffer = []
actions = (
{
'_index': self._index_name_func.__func__(self.es_index_name),
'_type': self.es_doc_type,
'_source': log_record
}
for log_record in logs_buffer
)
eshelpers.bulk(
client=es_client,
actions=actions,
stats_only=True
)
except Exception as exception:
if self.raise_on_indexing_exceptions:
raise exception
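
The version gate in the new `flush` above can be sketched on its own. The handler uses `packaging.version` for the comparison; the dependency-free sketch below just compares the major version number, and the `build_action` helper name is hypothetical:

```python
def build_action(es_client, index_name, doc_type, log_record):
    """Build one bulk action, omitting the deprecated `_type` field on ES >= 6.0.0.

    `es_client` is assumed to be an elasticsearch-py client whose info()
    call returns the cluster metadata, including the version number.
    """
    action = {"_index": index_name, "_source": log_record}
    major = int(es_client.info()["version"]["number"].split(".")[0])
    if major < 6:
        # Older clusters still expect an explicit mapping type.
        action["_type"] = doc_type
    return action
```

This keeps the bulk-indexing call identical for both branches; only the action dictionaries differ.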

def close(self):
""" Flushes the buffer and release any outstanding resource
@@ -330,7 +415,37 @@ def emit(self, record):
if key not in CMRESHandler.__LOGGING_FILTER_FIELDS:
if key == "args":
value = tuple(str(arg) for arg in value)
rec[key] = "" if value is None else value
if key not in ["msg", "name"]:
ecs_key = self.ecs_format(key)
if len(ecs_key) == 1:
rec[ecs_key[0]] = "" if value is None else value
elif len(ecs_key) == 2:
if ecs_key[0] in rec:
rec[ecs_key[0]][ecs_key[1]] = "" if value is None else value
else:
rec[ecs_key[0]] = {}
rec[ecs_key[0]][ecs_key[1]] = "" if value is None else value
elif len(ecs_key) == 3:
if ecs_key[0] in rec and ecs_key[1] in rec[ecs_key[0]]:
rec[ecs_key[0]][ecs_key[1]][ecs_key[2]] = "" if value is None else value
elif ecs_key[0] in rec:
rec[ecs_key[0]] = {}
rec[ecs_key[0]][ecs_key[1]] = {}
rec[ecs_key[0]][ecs_key[1]][ecs_key[2]] = "" if value is None else value
elif len(ecs_key) == 4:
if ecs_key[0] in rec and ecs_key[1] in rec[ecs_key[0]] and ecs_key[2] in rec[ecs_key[0]][ecs_key[1]]:
rec[ecs_key[0]][ecs_key[1]][ecs_key[2]][ecs_key[3]] = "" if value is None else value
elif ecs_key[0] in rec and ecs_key[1] in rec[ecs_key[0]]:
rec[ecs_key[0]][ecs_key[1]] = {}
rec[ecs_key[0]][ecs_key[1]][ecs_key[2]] = {}
rec[ecs_key[0]][ecs_key[1]][ecs_key[2]][ecs_key[3]] = "" if value is None else value
elif ecs_key[0] in rec:
rec[ecs_key[0]] = {}
rec[ecs_key[0]][ecs_key[1]] = {}
rec[ecs_key[0]][ecs_key[1]][ecs_key[2]] = {}
rec[ecs_key[0]][ecs_key[1]][ecs_key[2]][ecs_key[3]] = "" if value is None else value
else:
rec[key] = "" if value is None else value
rec[self.default_timestamp_field_name] = self.__get_es_datetime_str(record.created)
with self._buffer_lock:
self._buffer.append(rec)
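
The nested ECS key handling in `emit` above grows one branch per nesting depth. A generic helper (a sketch, not the library's code; `set_nested` is a hypothetical name) can set a dotted key at any depth while preserving sibling fields:

```python
def set_nested(record, dotted_key, value):
    """Set record["a"]["b"]["c"] = value for dotted_key "a.b.c",
    creating intermediate dicts and keeping existing siblings intact."""
    parts = dotted_key.split(".")
    node = record
    for part in parts[:-1]:
        # setdefault reuses an existing sub-dict instead of overwriting it.
        node = node.setdefault(part, {})
    node[parts[-1]] = "" if value is None else value
    return record

rec = {"log": {"level": "INFO"}}
set_nested(rec, "log.origin.file.name", "app.py")
# rec now holds both log.level and log.origin.file.name
```

Using `dict.setdefault` avoids re-creating intermediate dictionaries, which is where the depth-specific branches above can clobber previously written fields.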
1 change: 1 addition & 0 deletions requirements/requirements_py36.txt
@@ -1,2 +1,3 @@
elasticsearch==5.4.0
requests==2.18.1
packaging==20.4