Commit 6bb7d8b

Feature/s3 alternative download (#55)
* add new parameters for an alternative download
* add skip_storage_download property
* remove messages and better parametrization
* update prefix usage
* fix 404 error when downloading using key or keys items
* fix ModuleNotFoundError: No module named 'pkg_resources'
1 parent 45b5d46 commit 6bb7d8b

File tree: 5 files changed (+157 -49 lines)

.github/workflows/pytest.yml (+2)

@@ -29,7 +29,9 @@ jobs:
       run: |
         python -m pip install --upgrade pip
         pip install poetry
+        poetry self add setuptools
         poetry install
+        poetry run pip install setuptools
     - name: pytest
       run: |
         poetry run pytest -p no:warnings --tb=line
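Both added lines make setuptools available inside the Poetry-managed CI environment; this corresponds to the last bullet of the commit message, since pkg_resources is provided by setuptools and is no longer pre-installed on recent Python versions. A minimal check of that failure mode (a sketch, not part of the workflow):

# Sketch: reproduce the error the workflow change guards against.
try:
    import pkg_resources  # shipped with setuptools
    print("pkg_resources available")
except ModuleNotFoundError as exc:
    print(f"{exc} -> install setuptools (e.g. 'pip install setuptools')")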

amazon_s3/s3reader.py (+108 -39)
@@ -13,7 +13,7 @@
 from typing import Any, Callable, Dict, List, Optional, Union
 from urllib.parse import quote, unquote
 from saia_ingest.config import Defaults
-from saia_ingest.utils import detect_file_extension
+from saia_ingest.utils import detect_file_extension, do_get, parse_date
 from saia_ingest.profile_utils import get_bearer_token, get_json_response_from_url
 from saia_ingest.file_utils import calculate_file_hash

@@ -61,6 +61,7 @@ def __init__(
         source_doc_id: Optional[str] = None,
         alternative_document_service: Optional[Dict[str, str]] = None,
         detect_file_duplication: Optional[bool] = False,
+        skip_storage_download: Optional[bool] = False,
         **kwargs: Any,
     ) -> None:
         """Initialize S3 bucket and key, along with credentials if needed.
@@ -123,6 +124,14 @@ def __init__(
         self.source_base_url = source_base_url
         self.source_doc_id = source_doc_id
         self.detect_file_duplication = detect_file_duplication
+        self.skip_storage_download = skip_storage_download
+        self.headers = {
+            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
+        }
+        self.metadata_key_element = None
+        self.description_template = None
+        self.timestamp_tag = 'publishdate'
+        self.extension_tag = 'fileextension'

         self.s3 = None
         self.s3_client = None
@@ -202,6 +211,7 @@ def get_files_from_url(self) -> list[str]:
         file_paths = []
         downloaded_files = []
         doc_nums = []
+        prefix = ""

         bearer_params = self.alternative_document_service.get('bearer_token', None)
         if bearer_params is None:
@@ -213,6 +223,13 @@ def get_files_from_url(self) -> list[str]:
         self.bearer_scope = bearer_params.get('scope', None)
         self.bearer_grant_type = bearer_params.get('grant_type', None)

+        metadata_whitelist_items = self.alternative_document_service.get('metadata_whitelist_items', [])
+        self.timestamp_tag = self.alternative_document_service.get('timestamp_tag', 'publishdate')
+        self.extension_tag = self.alternative_document_service.get('extension_tag', 'fileextension')
+        self.metadata_key_element = self.alternative_document_service.get('metadata_key_element', 'documentid')
+        self.description_template = self.alternative_document_service.get('description_template', None)
+        self.skip_storage_download = self.alternative_document_service.get('skip_storage_download', False)
+
         key_vault_params = self.alternative_document_service.get('key_vault', None)
         if key_vault_params is not None:
             key_vault_name = key_vault_params.get('name', None)
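The block above reads the new tuning knobs for the alternative download path out of the alternative_document_service configuration dictionary. A hedged sketch of what that dictionary might contain (key names come from the .get() calls above; all values are illustrative):

# Illustrative only: keys mirror the .get() calls above, values are made up.
alternative_document_service = {
    "metadata_whitelist_items": ["documentid", "docname", "publishdate"],
    "timestamp_tag": "publishdate",            # default
    "extension_tag": "fileextension",          # default
    "metadata_key_element": "documentid",      # default
    "description_template": "{filename} | {publishdate}",
    "skip_storage_download": True,
    # bearer_token and key_vault sections (seen in the surrounding context) also live here
}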
@@ -313,7 +330,7 @@ def get_files_from_url(self) -> list[str]:
             doc_name = item.get('docname', '')
             file_type = item.get('filetype', 'None')

-            if file_type is not None and not self.is_supported_extension(file_type.lower()):
+            if not self.skip_storage_download and file_type is not None and not self.is_supported_extension(file_type.lower()):
                 skip_count += 1
                 self.skip_dict[doc_num] = item
                 logging.getLogger().warning(f"{doc_num} '{doc_name}' with '{file_type}' extension discarded")
@@ -326,18 +343,56 @@ def get_files_from_url(self) -> list[str]:
             complete_file_path = f"{temp_dir}/{doc_num}.{extension}"
             if os.path.exists(complete_file_path):
                 continue
-            try:
-                self.download_s3_file(doc_num, temp_dir, downloaded_files)
-                doc_nums.append(doc_num)
-            except Exception as e:
-                self.error_count += 1
-                self.error_dict[doc_num] = item
-                logging.getLogger().error(f"Error downloading {original_key} '{doc_name}' {e}")
-                continue

-            # add item to be processed later
-            self.element_ids.add(doc_num)
-            self.element_dict[doc_num] = item
+            if self.skip_storage_download:
+                doc_num = item.get('document_id', None)
+                url = item.get('url', None)
+                filename_with_extension = os.path.basename(url)
+                doc_name, file_extension = os.path.splitext(filename_with_extension)
+                file_extension = file_extension.lstrip(".")
+                file_type = self.get_file_extension(filename_with_extension)
+                original_key = f"{self.prefix}/{doc_num}" if self.prefix else doc_num
+                try:
+                    response = do_get(None, url, self.headers)
+                    if response.status_code == 200:
+                        filename = f"{doc_num}.{file_type}"
+                        complete_file_path = f"{temp_dir}/{filename}"
+                        content = response.content
+                        with open(complete_file_path, 'wb') as file:
+                            file.write(content)
+                        downloaded_files.append(complete_file_path)
+                        doc_nums.append(doc_num)
+                        complete_metadata_file_path = f"{temp_dir}/{doc_num}{self.json_extension}"
+                        filtered_metadata_item = self.get_metadata_whitelist_items(item, metadata_whitelist_items)
+                        filtered_metadata_item.update({self.extension_tag: file_type})
+                        self.write_object_to_file(filtered_metadata_item, complete_metadata_file_path)
+                        if self.use_augment_metadata:
+                            self.element_ids.add(doc_num)
+                            self.element_dict[doc_num] = filtered_metadata_item
+                            user_metadata = self.augment_metadata(temp_dir, filename, filtered_metadata_item, self.timestamp_tag)
+                            if user_metadata:
+                                self.write_object_to_file(user_metadata, complete_metadata_file_path)
+                    else:
+                        logging.getLogger().error(f"Failed to download file. Status code: {response.status_code}")
+                        continue
+                except Exception as e:
+                    logging.getLogger().error(f"Error downloading {doc_num} '{doc_name}' {e}")
+                    continue
+            else:
+                try:
+                    prefix = f"{self.prefix}/{doc_num}" if self.prefix else doc_num
+                    self.download_s3_file(prefix, temp_dir, downloaded_files)
+                    doc_nums.append(doc_num)
+                except Exception as e:
+                    self.error_count += 1
+                    self.error_dict[doc_num] = item
+                    logging.getLogger().error(f"Error downloading {original_key} '{doc_name}' {e}")
+                    continue
+
+            if not self.skip_storage_download:
+                # add item to be processed later
+                self.element_ids.add(doc_num)
+                self.element_dict[doc_num] = item

             logging.getLogger().debug(f" {original_key} to {doc_num}")

@@ -350,7 +405,7 @@ def get_files_from_url(self) -> list[str]:
             _ = self.save_debug(self.element_dict, prefix='denodo')

         if self.process_files:
-            _ = self.rename_files(downloaded_files, temp_dir, self.excluded_exts, None, self.json_extension, self.prefix + '/', 'fileextension')
+            _ = self.rename_files(downloaded_files, temp_dir, self.excluded_exts, None, self.json_extension, prefix)

         if self.element_ids is not None and len(self.element_ids) > 0:
             file_paths = []
@@ -365,6 +420,14 @@ def get_files_from_url(self) -> list[str]:
             file_paths = [os.path.join(temp_dir, f) for f in os.listdir(temp_dir) if os.path.isfile(os.path.join(temp_dir, f)) and not f.endswith((self.json_extension, self.metadata_extension))]
         return file_paths

+    def get_metadata_whitelist_items(self, initial: dict, metadata_whitelist: list[str]) -> dict:
+        """
+        Filters the given dictionary, keeping only keys that exist in metadata_whitelist.
+
+        :param initial: Dictionary to be filtered.
+        :return: A new dictionary containing only the whitelisted keys.
+        """
+        return {k: v for k, v in initial.items() if k in metadata_whitelist}

     def save_debug(self, serialized_docs: any, prefix:str) -> str:
         debug_folder = os.path.join(os.getcwd(), 'debug')
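get_metadata_whitelist_items is a plain dictionary filter: only whitelisted keys survive into the metadata sidecar. For example, using the same comprehension as the method (field values are illustrative):

item = {"documentid": "123", "docname": "report", "internal_notes": "draft"}
metadata_whitelist = ["documentid", "docname"]
filtered = {k: v for k, v in item.items() if k in metadata_whitelist}
# filtered == {"documentid": "123", "docname": "report"}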
@@ -389,7 +452,7 @@ def get_files(self) -> list[str]:
         if self.use_local_folder:

             if self.process_files:
-                _ = self.rename_files(None, self.local_folder, self.excluded_exts, None, self.json_extension, self.prefix + '/', 'fileextension')
+                _ = self.rename_files(None, self.local_folder, self.excluded_exts, None, self.json_extension, self.prefix + '/')

             for f in os.listdir(self.local_folder):
                 f_extension = self.get_file_extension(f)
@@ -411,12 +474,14 @@ def get_files(self) -> list[str]:

         if self.key:
             logging.getLogger().info(f"key: '{self.key}'")
-            self.download_s3_file(self.key, temp_dir, file_paths)
+            original_key = f"{self.prefix}/{self.key}" if self.prefix else self.key
+            self.download_s3_file(original_key, temp_dir, file_paths)
             count = 1
         elif self.keys:
             logging.getLogger().info(f"keys: '{len(self.keys)}'")
             for key in self.keys:
-                self.download_s3_file(key, temp_dir, file_paths)
+                original_key = f"{self.prefix}/{key}" if self.prefix else key
+                self.download_s3_file(original_key, temp_dir, file_paths)
             count = len(self.keys)
         else:
             bucket = self.s3.Bucket(self.bucket)
@@ -464,7 +529,7 @@ def get_files(self) -> list[str]:
         self.skip_count = skip_count

         if self.process_files:
-            renamed_files = self.rename_files(file_paths, temp_dir, self.excluded_exts, None, self.json_extension, self.prefix + '/', 'fileextension')
+            renamed_files = self.rename_files(file_paths, temp_dir, self.excluded_exts, None, self.json_extension, self.prefix + '/')

         if file_paths is None:
             file_paths = [os.path.join(temp_dir, f) for f in os.listdir(temp_dir) if os.path.isfile(os.path.join(temp_dir, f)) and not f.endswith((self.json_extension, self.metadata_extension))]
@@ -477,7 +542,9 @@ def download_s3_file(self, key: str, temp_dir: str, file_paths: list):
         """Download a single file"""
         # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/download_file.html#S3.Client.download_file
         filepath = f"{temp_dir}/{key}"
-        original_key = f"{self.prefix}/{key}" if self.prefix else key
+        folder_path = os.path.dirname(filepath)
+        os.makedirs(folder_path, exist_ok=True)
+        original_key = key
         try:
             self.s3.meta.client.download_file(self.bucket, original_key, filepath)
             file_paths.append(filepath)
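With this change, callers pass the fully prefixed key and download_s3_file uses it verbatim, both as the S3 object key and as the relative path on disk. Since a prefixed key contains '/', the local folder must be created first: boto3's download_file opens the target file itself and fails when the directory is missing. A minimal standalone illustration of the pattern (bucket and key names are placeholders):

import os
import boto3  # download_file(Bucket, Key, Filename) does not create missing local folders

def fetch(bucket: str, key: str, temp_dir: str) -> str:
    filepath = os.path.join(temp_dir, key)                 # key may contain '/' (prefix)
    os.makedirs(os.path.dirname(filepath), exist_ok=True)  # ensure the folder exists
    boto3.client("s3").download_file(bucket, key, filepath)
    return filepath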
@@ -531,7 +598,6 @@ def rename_files(
         main_extension: str,
         metadata_extension: str,
         key_prefix: str,
-        extension_tag: str = 'fileextension'
     ) -> list[str]:
         '''Process all files in a folder, renaming them and adding metadata files'''
         if not os.path.exists(folder_path):
@@ -549,12 +615,10 @@ def rename_files(
             file_name = os.path.splitext(os.path.basename(file_item))[0]
             files.append(file_name)

-        timestamp_tag = 'publishdate'
-
         renamed_files = []
         # Process each file
         with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_parallel_executions) as executor:
-            futures = [executor.submit(self.rename_file, folder_path, excluded_exts, main_extension, metadata_extension, key_prefix, file_item, timestamp_tag, extension_tag) for file_item in files]
+            futures = [executor.submit(self.rename_file, folder_path, excluded_exts, main_extension, metadata_extension, key_prefix, file_item, self.timestamp_tag, self.extension_tag) for file_item in files]
             for future in concurrent.futures.as_completed(futures):
                 try:
                     file_path = future.result()
@@ -656,10 +720,9 @@ def augment_metadata(
         # Remove entries where the value is not desired
         initial_metadata = {k: v for k, v in input_metadata.items() if v not in [None, 'null', '']}
         try:
-            id = initial_metadata.get('documentid', '')
+            id = initial_metadata.get(self.metadata_key_element, '')
             date_string = initial_metadata.get(timestamp_tag, '')
             date_string_description = date_string
-            date_string_format = "%m/%d/%Y"
             doc_url = None

             merge_metadata_elements = self.alternative_document_service.get('merge_metadata', False)
@@ -674,18 +737,18 @@ def augment_metadata(
             if mapping:
                 initial_metadata = self.update_with_mapping(initial_metadata, mapping)
                 date_string = initial_metadata.get(timestamp_tag, date_string)
-                date_string_format = "%Y-%m-%dT%H:%M:%S"
                 doc_url = initial_metadata.get('docurl', None)

             if date_string is not None:
-                # Change from MM/DD/YYYY to YYYYMMDD format
-                date_object = datetime.strptime(date_string, date_string_format)
-                formatted_date = date_object.strftime("%Y%m%d")
-                date_string_description = f"{date_object.month}/{date_object.day}/{date_object.year}"
-                year = date_object.strftime("%Y")
-                # Add year
-                initial_metadata[timestamp_tag] = formatted_date
-                initial_metadata['year'] = year
+                try:
+                    formatted_date, day, month, year = parse_date(date_string)
+                    date_string_description = f"{month}/{day}/{year}"
+                    initial_metadata['date_description'] = date_string_description
+                    # Add year
+                    initial_metadata[timestamp_tag] = formatted_date
+                    initial_metadata['year'] = year
+                except ValueError as e:
+                    logging.getLogger().error(f"{e}")

             if self.detect_file_duplication:
                 file_path = f"{folder_path}/{document_name}"
@@ -700,12 +763,18 @@ def augment_metadata(
                 source_url = f"{self.source_base_url}?{self.source_doc_id}={id}&CONTDISP=INLINE"
                 initial_metadata['url'] = source_url

-            name = initial_metadata.get('filename', id)
-            activity = initial_metadata.get('disclosureactivity', '')
-            description = f"{name} | {date_string_description} | {activity}"
-
-            initial_metadata['description'] = description
+            self.generate_description(initial_metadata, date_string_description, id)
         except Exception as e:
             logging.getLogger().error(f"Error augmenting metadata for '{document_name}' from {initial_metadata} Error: {e}")

         return initial_metadata
+
+    def generate_description(self, initial_metadata, date_string_description, id:any):
+
+        if not self.description_template is None:
+            description = self.description_template.format(**initial_metadata)
+        else:
+            name = initial_metadata.get('filename', id)
+            activity = initial_metadata.get('disclosureactivity', '')
+            description = f"{name} | {date_string_description} | {activity}"
+        initial_metadata['description'] = description
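generate_description lets a configured description_template override the previously hard-coded "name | date | activity" string. str.format(**initial_metadata) fills named placeholders from the metadata dictionary, so a template can reference any key that survives the whitelist. For example (illustrative values):

metadata = {"filename": "report.pdf", "publishdate": "20240115", "year": "2024"}
template = "{filename} (published {publishdate})"  # would come from description_template
print(template.format(**metadata))                 # -> report.pdf (published 20240115)
# A placeholder missing from the metadata raises KeyError, so templates should only
# reference keys that are guaranteed to be present.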

saia_ingest/file_utils.py (+4 -4)
@@ -32,7 +32,7 @@ def are_files_identical(file1: Path, file2: Path) -> bool:
         return False


-def load_hashes_from_json(folder: Path) -> Dict[str, Any]:
+def load_hashes_from_json(folder: Path, id:str='documentid') -> Dict[str, Any]:
     """Load all existing hashes from JSON files in the folder."""
     hash_index = {}
     duplicate_count = 0
@@ -42,15 +42,16 @@ def load_hashes_from_json(folder: Path) -> Dict[str, Any]:
                 data = json.load(f)
                 if Defaults.FILE_HASH in data:
                     file_hash = data[Defaults.FILE_HASH]
-                    document_id = data["documentid"]
+                    document_id = data[id]
                     if file_hash in hash_index:
                         duplicate_count += 1
                         logging.getLogger().warning(f"{document_id} duplicate detected: using {hash_index[file_hash]}")
                     else:
                         hash_index[file_hash] = document_id
         except (json.JSONDecodeError, FileNotFoundError) as e:
             print(f"Error reading {json_file}: {e}")
-    logging.getLogger().warning(f"{duplicate_count} duplicates found")
+    if duplicate_count > 0:
+        logging.getLogger().warning(f"{duplicate_count} duplicates found")
     return hash_index


@@ -66,6 +67,5 @@ def check_for_duplicates(new_file: Path, folder: Path) -> bool:
         print(f"Duplicate found! {new_file} matches {hash_index[new_file_hash]}")
         return True
     else:
-        print(f"No duplicates found for {new_file}.")
         return False

saia_ingest/ingestor.py (+1 -1)
@@ -553,7 +553,7 @@ def ingest_s3(

     saia_file_ids_to_delete = search_failed_to_delete(file_paths)
     if detect_file_duplication and len(file_paths) > 0:
-        hash_index = load_hashes_from_json(Path(download_dir))
+        hash_index = load_hashes_from_json(Path(download_dir), loader.metadata_key_element)
         duplicate_ids = []
         for new_file in file_paths[:]:  # Iterate over a copy
             new_file_hash = calculate_file_hash(new_file)
