Port ErrorMonitor class from ruby connectors #2671

Open · wants to merge 19 commits into base: main
32 changes: 28 additions & 4 deletions config.yml.example
@@ -151,6 +151,34 @@
## This will be logged on 'DEBUG' log level. Note: this depends on the service.log_level, not elasticsearch.log_level
#elasticsearch.bulk.enable_operations_logging: false
#
## ------------------------------- Elasticsearch: Bulk - Error Monitor ----------------------------------
#
## Configuration for the Error Monitor functionality used by syncs when documents are ingested into Elasticsearch.
## Errors can happen during ingestion of content into Elasticsearch - for example, a document could violate the index mapping.
## If such errors happen rarely, it is safe to ignore the offending documents.
## The error monitor takes care of this by tolerating transient errors while ingesting documents into Elasticsearch.
## If any of the thresholds defined in this section is exceeded, the whole sync fails.
#
## Switch for enabling/disabling error monitor
## When disabled, errors are only counted - they never cause failures (legacy behavior)
#elasticsearch.bulk.error_monitor.enabled: true
#
## Total number of errors that will be tolerated per sync.
## Once the number of errors exceeds this value, the sync will terminate.
#elasticsearch.bulk.error_monitor.max_total_errors: 1000
#
## Maximum number of consecutive errors that will be tolerated per sync.
## Once the number of consecutive errors exceeds this value, the sync will terminate.
#elasticsearch.bulk.error_monitor.max_consecutive_errors: 10
#
## Maximum error rate within the error window that will be tolerated per sync.
## Once the error rate exceeds this value, the sync will terminate.
#elasticsearch.bulk.error_monitor.max_error_rate: 0.15
#
## Number of last operations used for tracking the error rate.
#elasticsearch.bulk.error_monitor.error_window_size: 100
#
## ------------------------------- Elasticsearch: Experimental ------------------------
#
## Experimental configuration options for Elasticsearch interactions.
@@ -203,10 +231,6 @@
#service.job_cleanup_interval: 300
#
#
## Connector service log level.
#service.log_level: INFO
#
#
## ------------------------------- Extraction Service ----------------------------------
#
## Local extraction service-related configurations.
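For orientation (not part of this change), the sketch below restates how these thresholds are meant to be enforced. It condenses the `_raise_if_necessary` logic that this PR adds to `connectors/utils.py` further down; `should_fail_sync` and its arguments are illustrative names, not part of the codebase.

```python
# Condensed, standalone restatement of the threshold checks introduced in this PR.
def should_fail_sync(
    consecutive_errors,
    total_errors,
    error_window,
    max_consecutive_errors=10,
    max_total_errors=1000,
    max_error_rate=0.15,
):
    """Return True once any configured threshold is exceeded."""
    if consecutive_errors > max_consecutive_errors:
        return True
    if total_errors > max_total_errors:
        return True
    if error_window:
        # error_window holds booleans for the last N operations (True means the operation errored)
        error_rate = sum(error_window) / len(error_window)
        if error_rate > max_error_rate:
            return True
    return False


# With the defaults above: 16 errors among the last 100 operations gives a rate
# of 0.16 > 0.15, so the sync fails even though the other limits are not reached.
window = [True] * 16 + [False] * 84
assert should_fail_sync(consecutive_errors=1, total_errors=16, error_window=window)
```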
18 changes: 18 additions & 0 deletions connectors/config.py
@@ -69,6 +69,14 @@ def _default_config():
"retry_interval": DEFAULT_ELASTICSEARCH_RETRY_INTERVAL,
"concurrent_downloads": 10,
"enable_operations_logging": False,
"error_monitor": {
"enabled": True,
"max_total_errors": 1000,
"max_consecutive_errors": 10,
"max_error_rate": 0.15,
"error_window_size": 100,
"error_queue_size": 10,
},
},
"max_retries": DEFAULT_ELASTICSEARCH_MAX_RETRIES,
"retry_interval": DEFAULT_ELASTICSEARCH_RETRY_INTERVAL,
@@ -92,6 +100,16 @@ def _default_config():
"max_file_download_size": DEFAULT_MAX_FILE_SIZE,
"job_cleanup_interval": 300,
"log_level": "INFO",
"extraction": {
"error_monitor": {
"enabled": True,
"max_total_errors": 1000,
"max_consecutive_errors": 10,
"max_error_rate": 0.15,
"error_window_size": 100,
"error_queue_size": 10,
},
},
},
"sources": {
"azure_blob_storage": "connectors.sources.azure_blob_storage:AzureBlobStorageDataSource",
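The nested defaults above line up one-to-one with the keyword arguments of the `ErrorMonitor` constructor added to `connectors/utils.py` in this PR, so a config section can be unpacked directly into it. A minimal sketch (illustrative, not part of the diff):

```python
# Illustrative only: construct an ErrorMonitor from a config section shaped like the
# defaults above (this mirrors how connectors/sync_job_runner.py unpacks the
# service.extraction.error_monitor section later in this diff).
from connectors.utils import ErrorMonitor

error_monitor_config = {
    "enabled": True,
    "max_total_errors": 1000,
    "max_consecutive_errors": 10,
    "max_error_rate": 0.15,
    "error_window_size": 100,
    "error_queue_size": 10,
}

monitor = ErrorMonitor(**error_monitor_config)
```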
1 change: 0 additions & 1 deletion connectors/es/client.py
@@ -233,7 +233,6 @@ async def execute_with_retry(self, func):
retry += 1
try:
result = await func()

return result
except ConnectionTimeout:
self._logger.warning(
75 changes: 50 additions & 25 deletions connectors/es/sink.py
@@ -47,6 +47,7 @@
DEFAULT_QUEUE_SIZE,
ConcurrentTasks,
Counters,
ErrorMonitor,
MemQueue,
aenumerate,
get_size,
@@ -110,6 +111,10 @@ def __init__(self, cause=None):
self.__cause__ = cause


class DocumentIngestionError(Exception):
pass


class Sink:
"""Send bulk operations in batches by consuming a queue.

@@ -136,6 +141,7 @@ def __init__(
max_concurrency,
max_retries,
retry_interval,
error_monitor,
logger_=None,
enable_bulk_operations_logging=False,
):
@@ -145,6 +151,7 @@ def __init__(
self.pipeline = pipeline
self.chunk_mem_size = chunk_mem_size * 1024 * 1024
self.bulk_tasks = ConcurrentTasks(max_concurrency=max_concurrency)
self.error_monitor = error_monitor
self.max_retires = max_retries
self.retry_interval = retry_interval
self.error = None
@@ -272,18 +279,19 @@ async def _process_bulk_response(self, res, ids_to_ops, do_log=False):
successful_result = result in SUCCESSFUL_RESULTS
if not successful_result:
if "error" in item[action_item]:
message = f"Failed to execute '{action_item}' on document with id '{doc_id}'. Error: {item[action_item].get('error')}"
self.error_monitor.track_error(DocumentIngestionError(message))
if do_log:
self._logger.debug(
f"Failed to execute '{action_item}' on document with id '{doc_id}'. Error: {item[action_item].get('error')}"
)
self._logger.debug(message)
self.counters.increment(RESULT_ERROR, namespace=BULK_RESPONSES)
else:
message = f"Executed '{action_item}' on document with id '{doc_id}', but got non-successful result: {result}"
self.error_monitor.track_error(DocumentIngestionError(message))
if do_log:
self._logger.debug(
f"Executed '{action_item}' on document with id '{doc_id}', but got non-successful result: {result}"
)
self._logger.debug(message)
self.counters.increment(RESULT_UNDEFINED, namespace=BULK_RESPONSES)
else:
self.error_monitor.track_success()
if do_log:
self._logger.debug(
f"Successfully executed '{action_item}' on document with id '{doc_id}'. Result: {result}"
@@ -452,24 +460,31 @@ def __init__(
self.skip_unchanged_documents = skip_unchanged_documents

async def _deferred_index(self, lazy_download, doc_id, doc, operation):
data = await lazy_download(doit=True, timestamp=doc[TIMESTAMP_FIELD])

if data is not None:
self.counters.increment(BIN_DOCS_DOWNLOADED)
data.pop("_id", None)
data.pop(TIMESTAMP_FIELD, None)
doc.update(data)

doc.pop("_original_filename", None)

await self.put_doc(
{
"_op_type": operation,
"_index": self.index,
"_id": doc_id,
"doc": doc,
}
)
try:
data = await lazy_download(doit=True, timestamp=doc[TIMESTAMP_FIELD])

if data is not None:
self.counters.increment(BIN_DOCS_DOWNLOADED)
data.pop("_id", None)
data.pop(TIMESTAMP_FIELD, None)
doc.update(data)

doc.pop("_original_filename", None)

await self.put_doc(
{
"_op_type": operation,
"_index": self.index,
"_id": doc_id,
"doc": doc,
}
)
except ForceCanceledError:
raise
Comment on lines +482 to +483

Member: Should we also re-raise aio CanceledError?

Member Author: I'm not 100% sure it's needed. I'll double check and add if it is
except Exception as ex:
self._logger.error(
f"Failed to do deferred index operation for doc {doc_id}: {ex}"
)

def force_cancel(self):
self._canceled = True
@@ -602,6 +617,10 @@ async def get_docs(self, generator, skip_unchanged_documents=False):
# too many errors happened when downloading
lazy_downloads.raise_any_exception()

# We try raising every loop to not miss a moment when
# too many errors happened when downloading
lazy_downloads.raise_any_exception()

await asyncio.sleep(0)

# Sit and wait until an error happens
@@ -612,6 +631,7 @@
raise
finally:
# wait for all downloads to be finished
# even if we errored out
await lazy_downloads.join()

await self.enqueue_docs_to_delete(existing_ids)
@@ -824,6 +844,8 @@ def __init__(self, elastic_config, logger_=None):
self._sink_task = None
self.error = None
self.canceled = False
error_monitor_config = elastic_config.get("bulk", {}).get("error_monitor", {})
self.error_monitor = ErrorMonitor(**error_monitor_config)

async def close(self):
await self.es_management_client.close()
@@ -893,7 +915,9 @@ def _extractor_task_running(self):

async def cancel(self):
if self._sink_task_running():
self._logger.info(f"Canceling the Sink task: {self._sink_task.get_name()}")
self._logger.info(
f"Canceling the Sink task: {self._sink_task.get_name()}" # pyright: ignore
)
self._sink_task.cancel()
else:
self._logger.debug(
@@ -1039,6 +1063,7 @@ async def async_bulk(
max_concurrency=max_concurrency,
max_retries=max_bulk_retries,
retry_interval=retry_interval,
error_monitor=self.error_monitor,
logger_=self._logger,
enable_bulk_operations_logging=enable_bulk_operations_logging,
)
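Regarding the open review question above about also re-raising asyncio's cancellation: below is a hypothetical, excerpt-style sketch of what that could look like in `_deferred_index`. It is not part of this PR, and it may turn out to be unnecessary, as the author notes.

```python
# Hypothetical sketch, not part of this PR: propagate asyncio.CancelledError from
# _deferred_index in addition to ForceCanceledError (defined in connectors/es/sink.py),
# so cancellation is never swallowed and logged as an ordinary ingestion failure.
import asyncio


async def _deferred_index(self, lazy_download, doc_id, doc, operation):
    try:
        ...  # download, enrich and enqueue the document, as in the diff above
    except (ForceCanceledError, asyncio.CancelledError):
        # Re-raise both the framework's forced cancellation and asyncio's own
        # cancellation instead of treating them as document-level errors.
        raise
    except Exception as ex:
        self._logger.error(
            f"Failed to do deferred index operation for doc {doc_id}: {ex}"
        )
```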
2 changes: 1 addition & 1 deletion connectors/source.py
@@ -776,7 +776,7 @@ async def download_and_extract_file(
if return_doc_if_failed:
return doc
else:
return
return None

@asynccontextmanager
async def create_temp_file(self, file_extension):
7 changes: 6 additions & 1 deletion connectors/sync_job_runner.py
@@ -35,7 +35,7 @@
INDEXED_DOCUMENT_VOLUME,
)
from connectors.source import BaseDataSource
from connectors.utils import truncate_id
from connectors.utils import ErrorMonitor, truncate_id

UTF_8 = "utf-8"

@@ -113,6 +113,10 @@ def __init__(
self.es_config = es_config
self.service_config = service_config
self.sync_orchestrator = None
error_monitor_config = service_config.get("extraction", {}).get(
"error_monitor", {}
)
self.error_monitor = ErrorMonitor(**error_monitor_config)
self.job_reporting_task = None
self.bulk_options = self.es_config.get("bulk", {})
self._start_time = None
Expand Down Expand Up @@ -149,6 +153,7 @@ async def execute(self):
configuration=self.sync_job.configuration
)
self.data_provider.set_logger(self.sync_job.logger)
self.data_provider.set_error_monitor(self.error_monitor)
self.data_provider.set_framework_config(
self._data_source_framework_config()
)
105 changes: 105 additions & 0 deletions connectors/utils.py
@@ -994,3 +994,108 @@ def get(self, key) -> int:

def to_dict(self):
return deepcopy(self._storage)


class TooManyErrors(Exception):
pass


class ErrorMonitor:
def __init__(
self,
enabled=True,
max_total_errors=1000,
max_consecutive_errors=10,
max_error_rate=0.15,
error_window_size=100,
error_queue_size=10,
):
# When disabled, only track errors
self.enabled = enabled

self.max_error_rate = max_error_rate
self.error_window_size = error_window_size
self.error_window = [False] * error_window_size
self.error_window_index = 0

self.error_queue = []
self.error_queue_size = error_queue_size

self.consecutive_error_count = 0
self.max_consecutive_errors = max_consecutive_errors

self.max_total_errors = max_total_errors
self.total_success_count = 0
self.total_error_count = 0

def track_success(self):
self.consecutive_error_count = 0
self.total_success_count += 1
self._update_error_window(False)

def track_error(self, error):
self.total_error_count += 1
self.consecutive_error_count += 1

self.error_queue.append(error)

if len(self.error_queue) > self.error_queue_size:
self.error_queue.pop(0)

self._update_error_window(True)
self.last_error = error

self._raise_if_necessary()

def _update_error_window(self, value):
# We keep the error window at a fixed size of self.error_window_size by writing into it circularly - for example, imagine self.error_window_size = 5
# The error array starts as all False:
# [ false, false, false, false, false ]
# Third document raises an error:
# [ false, false, true, false, false ]
# ^^^^
# 2 % 5 == 2
# Fifth document raises an error:
# [ false, false, true, false, true ]
# ^^^^
# 4 % 5 == 4
# Sixth document raises an error:
# [ true, false, true, false, true ]
# ^^^^
# 5 % 5 == 0
#
# Eighth document is successful:
# [ true, false, false, false, true ]
# ^^^^^
# 7 % 5 == 2
# And so on.
self.error_window[self.error_window_index] = value
self.error_window_index = (self.error_window_index + 1) % self.error_window_size

def _error_window_error_rate(self):
if self.error_window_size == 0:
return 0

errors = list(filter(lambda x: x is True, self.error_window))

error_rate = len(errors) / self.error_window_size

return error_rate

def _raise_if_necessary(self):
if not self.enabled:
return

if self.consecutive_error_count > self.max_consecutive_errors:
msg = f"Exceeded maximum consecutive errors - saw {self.consecutive_error_count} errors in a row. Last error: {self.last_error}"
raise TooManyErrors(msg) from self.last_error
elif self.total_error_count > self.max_total_errors:
msg = f"Exceeded maximum total error count - saw {self.total_error_count} errors. Last error: {self.last_error}"
raise TooManyErrors(msg) from self.last_error
elif self.error_window_size > 0:
error_rate = self._error_window_error_rate()
if error_rate > self.max_error_rate:
msg = f"Exceeded maximum error ratio of {self.max_error_rate} for last {self.error_window_size} operations. Last error: {self.last_error}"
raise TooManyErrors(msg) from self.last_error
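A minimal usage sketch of the class above (illustrative, not part of the diff), with deliberately tiny thresholds so the failure path is easy to see: the third consecutive error exceeds max_consecutive_errors=2 and track_error raises TooManyErrors, which is what ultimately fails a sync.

```python
# Illustrative usage of the ErrorMonitor added above; thresholds are deliberately tiny.
from connectors.utils import ErrorMonitor, TooManyErrors

monitor = ErrorMonitor(
    max_total_errors=5,
    max_consecutive_errors=2,
    max_error_rate=0.5,
    error_window_size=4,
)

monitor.track_success()
monitor.track_error(ValueError("doc violates index mapping"))  # 1 consecutive error
monitor.track_error(ValueError("doc violates index mapping"))  # 2 consecutive errors, still tolerated

try:
    monitor.track_error(ValueError("doc violates index mapping"))  # 3 > max_consecutive_errors
except TooManyErrors as e:
    print(f"Sync would be failed here: {e}")
```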