Skip to content

Commit

Permalink
added retry logic for insert batch
Browse files Browse the repository at this point in the history
  • Loading branch information
vkrishna1084 committed Feb 27, 2025
1 parent 0af5014 commit bc9329a
Showing 1 changed file with 19 additions and 10 deletions.
29 changes: 19 additions & 10 deletions vsb/databases/opensearch/opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,24 @@ def __init__(
self.namespace = namespace

def insert_batch(self, batch: RecordList):
actions = []
action = {"index": {"_index": self.index_name}}
for rec in batch:
vector_document = {"vsb_vec_id": rec.id, "v_content": np.array(rec.values)}
actions.append(action)
actions.append(vector_document)

# Bulk ingest documents
upload_response = self.client.bulk(body=actions)
@retry(
wait=wait_exponential_jitter(initial=0.1, jitter=0.1),
stop=stop_after_attempt(5),
after=after_log(logger, logging.DEBUG),
)
def do_insert_with_retry():
actions = []
action = {"index": {"_index": self.index_name}}
for rec in batch:
vector_document = {"vsb_vec_id": rec.id, "v_content": np.array(rec.values)}
actions.append(action)
actions.append(vector_document)
# Bulk ingest documents
return self.client.bulk(body=actions,request_timeout=600)

upload_response = do_insert_with_retry()
if upload_response["errors"]:
logger.debug(f"OpenSearchDB: Error inserting batch: {upload_response['errors']}")

def update_batch(self, batch: list[Record]):
self.insert_batch(batch)
Expand Down Expand Up @@ -102,7 +111,7 @@ def __init__(
self.client = OpenSearch(
hosts=[{"host": self.host, "port": 443}],
http_auth=awsauth,
timeout=300,
timeout=600,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection,
Expand Down

0 comments on commit bc9329a

Please sign in to comment.