Commit 37c30fe
Add bulk-update command and support for TIMDEX parquet dataset (#359)

Why these changes are being introduced:

* The timdex-index-manager (TIM) app needs to support the v2 parquet dataset, which now contains records for both indexing and deleting. The new CLI command performs a "bulk update" given a subset of the dataset (filtered by 'run_date' and 'run_id') and uses the timdex-dataset-api library to read records from the TIMDEXDataset. Because this is a new CLI command, no feature-flagging approach is required, and the existing CLI commands and helper functions remain untouched for v1 purposes.

How this addresses that need:

* Implement 'bulk-update' CLI command
* Add unit tests for 'bulk-update'
* Update README

Side effects of this change:

* TIM remains backwards compatible with v1 but will now support v2 runs.

Relevant ticket(s):

* https://mitlibraries.atlassian.net/browse/TIMX-428
1 parent b6e8893 commit 37c30fe
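
The heart of the change, condensed from the `tim/cli.py` diff below (a sketch only; the dataset location and run identifiers are illustrative values taken from the new tests):

```python
from timdex_dataset_api import TIMDEXDataset

# Load only the slice of the v2 parquet dataset belonging to a single run
td = TIMDEXDataset(location="s3://test-timdex-bucket/dataset")
td.load(run_date="2024-12-01", run_id="abc123")

# A single run can carry both records to index and records to delete
records_to_index = td.read_transformed_records_iter(action="index")
records_to_delete = td.read_dicts_iter(columns=["timdex_record_id"], action="delete")
```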

File tree: 8 files changed, +1140 −766 lines

Makefile (+10, −1)

```diff
@@ -90,4 +90,13 @@ dist-stage:
 publish-stage:
 	docker login -u AWS -p $$(aws ecr get-login-password --region us-east-1) $(ECR_URL_STAGE)
 	docker push $(ECR_URL_STAGE):latest
-	docker push $(ECR_URL_STAGE):`git describe --always`
+	docker push $(ECR_URL_STAGE):`git describe --always`
+
+##############################
+# Local Opensearch commands
+##############################
+
+local-opensearch: # Run a local instance of Opensearch via Docker Compose
+	docker pull opensearchproject/opensearch:latest
+	docker pull opensearchproject/opensearch-dashboards:latest
+	docker compose --env-file .env up
```

Pipfile (+1)

```diff
@@ -11,6 +11,7 @@ opensearch-py = "*"
 rich-click = "*"
 sentry-sdk = "*"
 smart-open = {extras = ["s3"], version = "*"}
+timdex-dataset-api = { git = "git+https://github.com/MITLibraries/timdex-dataset-api.git"}
 
 [dev-packages]
 black = "*"
```

Pipfile.lock (+947, −737)

Generated lockfile; diff not rendered.

README.md (+13, −24)

````diff
@@ -9,6 +9,7 @@ TIMDEX! Index Manager (TIM) is a Python CLI application for managing TIMDEX indi
 - To update dependencies: `make update`
 - To run unit tests: `make test`
 - To lint the repo: `make lint`
+- To run local OpenSearch with Docker: `make local-opensearch`
 - To run the app: `pipenv run tim --help`
 
 **Important note:** The sections that follow provide instructions for running OpenSearch **locally with Docker**. These instructions are useful for testing. Please make sure the environment variable `TIMDEX_OPENSEARCH_ENDPOINT` is **not** set before proceeding.
@@ -92,34 +93,21 @@ For a more detailed example with test data, please refer to the Confluence docum
 
 ### Required ENV
 
-```
-# Set to `dev` for local development, this will be set to `stage` and `prod` in those environments by Terraform.
-WORKSPACE=dev
+```shell
+WORKSPACE=### Set to `dev` for local development, this will be set to `stage` and `prod` in those environments by Terraform.
 ```
 
 ## Optional ENV
 
-```
-# Only needed if AWS region changes from the default of us-east-1.
-AWS_REGION=
-
-# Chunk size limit for sending requests to the bulk indexing endpoint, in bytes. Defaults to 104857600 (100 * 1024 * 1024) if not set.
-OPENSEARCH_BULK_MAX_CHUNK_BYTES=
-
-# Maximum number of retries when sending requests to the bulk indexing endpoint. Defaults to 50 if not set.
-OPENSEARCH_BULK_MAX_RETRIES=
-
-# Only used for OpenSearch requests that tend to take longer than the default timeout of 10 seconds, such as bulk or index refresh requests. Defaults to 120 seconds if not set.
-OPENSEARCH_REQUEST_TIMEOUT=
-
-# The ingest process logs the # of records indexed every nth record. Set this env variable to any integer to change the frequency of logging status updates. Can be useful for development/debugging. Defaults to 1000 if not set.
-STATUS_UPDATE_INTERVAL=
-
-# If using a local Docker OpenSearch instance, this isn't needed. Otherwise set to OpenSearch instance endpoint without the http scheme (e.g., "search-timdex-env-1234567890.us-east-1.es.amazonaws.com"). Can also be passed directly to the CLI via the `--url` option.
-TIMDEX_OPENSEARCH_ENDPOINT=
-
-# If set to a valid Sentry DSN, enables Sentry exception monitoring This is not needed for local development.
-SENTRY_DSN=
+```shell
+AWS_REGION=### Only needed if AWS region changes from the default of us-east-1.
+OPENSEARCH_BULK_MAX_CHUNK_BYTES=### Chunk size limit for sending requests to the bulk indexing endpoint, in bytes. Defaults to 104857600 (100 * 1024 * 1024) if not set.
+OPENSEARCH_BULK_MAX_RETRIES=### Maximum number of retries when sending requests to the bulk indexing endpoint. Defaults to 50 if not set.
+OPENSEARCH_INITIAL_ADMIN_PASSWORD=### If using a local Docker OpenSearch instance, this must be set (for versions >= 2.12.0).
+OPENSEARCH_REQUEST_TIMEOUT=### Only used for OpenSearch requests that tend to take longer than the default timeout of 10 seconds, such as bulk or index refresh requests. Defaults to 120 seconds if not set.
+STATUS_UPDATE_INTERVAL=### The ingest process logs the # of records indexed every nth record. Set this env variable to any integer to change the frequency of logging status updates. Can be useful for development/debugging. Defaults to 1000 if not set.
+TIMDEX_OPENSEARCH_ENDPOINT=### If using a local Docker OpenSearch instance, this isn't needed. Otherwise set to OpenSearch instance endpoint without the http scheme (e.g., "search-timdex-env-1234567890.us-east-1.es.amazonaws.com"). Can also be passed directly to the CLI via the `--url` option.
+SENTRY_DSN=### If set to a valid Sentry DSN, enables Sentry exception monitoring. This is not needed for local development.
 ```
 
 ## CLI commands
@@ -153,6 +141,7 @@ Usage: tim [OPTIONS] COMMAND [ARGS]...
 ╭─ Bulk record processing commands ──────────────────────╮
 │ bulk-index    Bulk index records into an index.        │
 │ bulk-delete   Bulk delete records from an index.       │
+│ bulk-update   Bulk update records from an index.       │
 ╰─────────────────────────────────────────────────────────╯
 ```
````
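
The inline `###` comments double as default documentation. A hypothetical sketch of how those defaults resolve (illustrative only; the real lookups live in TIM's config module, which this commit does not touch):

```python
import os

# Hypothetical mirror of the documented defaults; not TIM's actual config code.
OPENSEARCH_BULK_MAX_CHUNK_BYTES = int(
    os.getenv("OPENSEARCH_BULK_MAX_CHUNK_BYTES", str(100 * 1024 * 1024))  # 104857600
)
OPENSEARCH_BULK_MAX_RETRIES = int(os.getenv("OPENSEARCH_BULK_MAX_RETRIES", "50"))
OPENSEARCH_REQUEST_TIMEOUT = int(os.getenv("OPENSEARCH_REQUEST_TIMEOUT", "120"))
STATUS_UPDATE_INTERVAL = int(os.getenv("STATUS_UPDATE_INTERVAL", "1000"))
```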

compose.yaml (+2)

```diff
@@ -10,6 +10,7 @@ services:
       - discovery.type=single-node
       - bootstrap.memory_lock=true
       - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
+      - OPENSEARCH_INITIAL_ADMIN_PASSWORD=${OPENSEARCH_INITIAL_ADMIN_PASSWORD}
     volumes:
       - opensearch-local-data:/usr/share/opensearch/data
     networks:
@@ -21,6 +22,7 @@ services:
     environment:
       - "DISABLE_SECURITY_DASHBOARDS_PLUGIN=true"
       - 'OPENSEARCH_HOSTS=["http://opensearch:9200"]'
+      - OPENSEARCH_INITIAL_ADMIN_PASSWORD=${OPENSEARCH_INITIAL_ADMIN_PASSWORD}
     networks:
       - opensearch-local-net
 volumes:
```
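
With the admin password threaded into both services, a quick connectivity check against the local instance could look like the following sketch (opensearch-py is already a dependency; the host, port, and TLS settings here are assumptions about the compose defaults, not values from this commit):

```python
import os

from opensearchpy import OpenSearch

# Assumes the local compose stack from this repo is up (`make local-opensearch`)
client = OpenSearch(
    hosts=[{"host": "localhost", "port": 9200}],
    http_auth=("admin", os.environ["OPENSEARCH_INITIAL_ADMIN_PASSWORD"]),
    use_ssl=True,        # assumption: the security plugin serves HTTPS by default
    verify_certs=False,  # assumption: local demo certs are self-signed
    ssl_show_warn=False,
)
print(client.info())
```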

pyproject.toml (+2, −3)

```diff
@@ -7,7 +7,7 @@ disallow_untyped_defs = true
 exclude = ["tests/"]
 
 [[tool.mypy.overrides]]
-module = ["ijson", "smart_open"]
+module = ["ijson", "smart_open", "timdex_dataset_api"]
 ignore_missing_imports = true
 
 [tool.pytest.ini_options]
@@ -27,8 +27,6 @@ select = ["ALL", "PT"]
 
 ignore = [
     # default
-    "ANN101",
-    "ANN102",
     "COM812",
     "D107",
     "N812",
@@ -41,6 +39,7 @@ ignore = [
     "D102",
     "D103",
     "D104",
+    "G004",
     "PLR0912",
     "PLR0913",
     "PLR0915",
```

tests/test_cli.py (+97)

```diff
@@ -1,8 +1,11 @@
+import json
 import re
+from unittest.mock import MagicMock, patch
 
 from freezegun import freeze_time
 
 from tim.cli import main
+from tim.errors import BulkIndexingError
 
 from .conftest import EXIT_CODES, my_vcr
 
@@ -256,3 +259,97 @@ def test_bulk_delete_with_source_success(caplog, runner):
         "from index 'alma-2022-09-01t00-00-00'" in caplog.text
     )
     assert "Bulk deletion complete!" in caplog.text
+
+
+@patch("timdex_dataset_api.dataset.TIMDEXDataset.load")
+@patch("tim.helpers.validate_bulk_cli_options")
+@patch("tim.opensearch.bulk_delete")
+@patch("tim.opensearch.bulk_index")
+def test_bulk_update_with_source_success(
+    mock_bulk_index,
+    mock_bulk_delete,
+    mock_validate_bulk_cli_options,
+    mock_timdex_dataset,
+    caplog,
+    monkeypatch,
+    runner,
+):
+    monkeypatch.delenv("TIMDEX_OPENSEARCH_ENDPOINT", raising=False)
+    mock_bulk_index.return_value = {
+        "created": 1000,
+        "updated": 0,
+        "errors": 0,
+        "total": 1000,
+    }
+    mock_bulk_delete.return_value = {"deleted": 0, "errors": 0, "total": 0}
+    mock_validate_bulk_cli_options.return_value = "alma"
+    mock_timdex_dataset.return_value = MagicMock()
+
+    result = runner.invoke(
+        main,
+        [
+            "bulk-update",
+            "--source",
+            "alma",
+            "--run-date",
+            "2024-12-01",
+            "--run-id",
+            "abc123",
+            "s3://test-timdex-bucket/dataset",
+        ],
+    )
+    assert result.exit_code == EXIT_CODES["success"]
+    assert (
+        "Bulk update complete: "
+        f'{{"index": {json.dumps(mock_bulk_index())}, '
+        f'"delete": {json.dumps(mock_bulk_delete())}}}' in caplog.text
+    )
+
+
+@patch("timdex_dataset_api.dataset.TIMDEXDataset.load")
+@patch("tim.helpers.validate_bulk_cli_options")
+@patch("tim.opensearch.bulk_delete")
+@patch("tim.opensearch.bulk_index")
+def test_bulk_update_with_source_raise_bulk_indexing_error(
+    mock_bulk_index,
+    mock_bulk_delete,
+    mock_validate_bulk_cli_options,
+    mock_timdex_dataset,
+    caplog,
+    monkeypatch,
+    runner,
+):
+    monkeypatch.delenv("TIMDEX_OPENSEARCH_ENDPOINT", raising=False)
+    mock_bulk_index.side_effect = BulkIndexingError(
+        record="alma:0", index="index", error="exception"
+    )
+    mock_bulk_delete.return_value = {"deleted": 0, "errors": 0, "total": 0}
+    mock_validate_bulk_cli_options.return_value = "alma"
+    mock_timdex_dataset.return_value = MagicMock()
+
+    index_results_default = {
+        "created": 0,
+        "updated": 0,
+        "errors": 0,
+        "total": 0,
+    }
+
+    result = runner.invoke(
+        main,
+        [
+            "bulk-update",
+            "--source",
+            "alma",
+            "--run-date",
+            "2024-12-01",
+            "--run-id",
+            "abc123",
+            "s3://test-timdex-bucket/dataset",
+        ],
+    )
+    assert result.exit_code == EXIT_CODES["success"]
+    assert (
+        "Bulk update complete: "
+        f'{{"index": {json.dumps(index_results_default)}, '
+        f'"delete": {json.dumps(mock_bulk_delete())}}}' in caplog.text
+    )
```
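
A subtlety in the assertions above: calling a patched mock, as in `json.dumps(mock_bulk_index())`, simply returns its stubbed `return_value`, so the expected log line is built from the very dict the CLI received. A minimal illustration:

```python
from unittest.mock import MagicMock

mock_bulk_index = MagicMock(
    return_value={"created": 1000, "updated": 0, "errors": 0, "total": 1000}
)

# Every call yields the stubbed dict, so tests can reuse it when composing
# the expected "Bulk update complete: ..." log message.
assert mock_bulk_index() == mock_bulk_index.return_value
```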

tim/cli.py (+68, −1)

```diff
@@ -1,13 +1,16 @@
 # ruff: noqa: TRY003, EM101
+import json
 import logging
 from datetime import timedelta
 from time import perf_counter
 
 import rich_click as click
+from timdex_dataset_api import TIMDEXDataset
 
 from tim import errors, helpers
 from tim import opensearch as tim_os
 from tim.config import PRIMARY_ALIAS, VALID_SOURCES, configure_logger, configure_sentry
+from tim.errors import BulkIndexingError
 
 logger = logging.getLogger(__name__)
 
@@ -23,7 +26,7 @@
     },
     {
         "name": "Bulk record processing commands",
-        "commands": ["bulk-index", "bulk-delete"],
+        "commands": ["bulk-index", "bulk-delete", "bulk-update"],
     },
 ]
 }
@@ -252,6 +255,7 @@ def promote(ctx: click.Context, index: str, alias: list[str]) -> None:
 # Bulk record processing commands
 
 
+# NOTE: FEATURE FLAG: 'bulk_index' supports ETL v1
 @main.command()
 @click.option("-i", "--index", help="Name of the index to bulk index records into.")
 @click.option(
@@ -295,6 +299,7 @@ def bulk_index(ctx: click.Context, index: str, source: str, filepath: str) -> No
     )
 
 
+# NOTE: FEATURE FLAG: 'bulk_delete' supports ETL v1
 @main.command()
 @click.option("-i", "--index", help="Name of the index to bulk delete records from.")
 @click.option(
@@ -334,3 +339,65 @@ def bulk_delete(ctx: click.Context, index: str, source: str, filepath: str) -> N
         results["deleted"],
         results["total"],
     )
+
+
+@main.command()
+@click.option(
+    "-i",
+    "--index",
+    help="Name of the index on which to perform bulk indexing and deletion.",
+)
+@click.option(
+    "-s",
+    "--source",
+    type=click.Choice(VALID_SOURCES),
+    help="Source whose primary-aliased index to bulk index records into.",
+)
+@click.option("-d", "--run-date", help="Run date, formatted as YYYY-MM-DD.")
+@click.option("-rid", "--run-id", help="Run ID.")
+@click.argument("dataset_path", type=click.Path())
+@click.pass_context
+def bulk_update(
+    ctx: click.Context,
+    index: str,
+    source: str,
+    run_date: str,
+    run_id: str,
+    dataset_path: str,
+) -> None:
+    """Bulk update records for an index.
+
+    Must provide either the name of an existing index in the cluster or a valid source.
+    If source is provided, it will perform indexing and/or deletion of records for
+    the primary-aliased index for the source.
+
+    The method will read transformed records from a TIMDEXDataset
+    located at dataset_path using the 'timdex-dataset-api' library. The dataset
+    is filtered by run date and run ID.
+
+    Logs an error and aborts if the provided index doesn't exist in the cluster.
+    """
+    client = ctx.obj["CLIENT"]
+    index = helpers.validate_bulk_cli_options(index, source, client)
+
+    logger.info(f"Bulk updating records from dataset '{dataset_path}' into '{index}'")
+
+    index_results = {"created": 0, "updated": 0, "errors": 0, "total": 0}
+    delete_results = {"deleted": 0, "errors": 0, "total": 0}
+
+    td = TIMDEXDataset(location=dataset_path)
+    td.load(run_date=run_date, run_id=run_id)
+
+    # bulk index records
+    records_to_index = td.read_transformed_records_iter(action="index")
+    try:
+        index_results.update(tim_os.bulk_index(client, index, records_to_index))
+    except BulkIndexingError as exception:
+        logger.info(f"Bulk indexing failed: {exception}")
+
+    # bulk delete records
+    records_to_delete = td.read_dicts_iter(columns=["timdex_record_id"], action="delete")
+    delete_results.update(tim_os.bulk_delete(client, index, records_to_delete))
+
+    summary_results = {"index": index_results, "delete": delete_results}
+    logger.info(f"Bulk update complete: {json.dumps(summary_results)}")
```
