Feature/s3 alternative download #55

Merged: 8 commits, Feb 7, 2025
2 changes: 2 additions & 0 deletions .github/workflows/pytest.yml
@@ -29,7 +29,9 @@ jobs:
run: |
python -m pip install --upgrade pip
pip install poetry
poetry self add setuptools
poetry install
poetry run pip install setuptools
- name: pytest
run: |
poetry run pytest -p no:warnings --tb=line
147 changes: 108 additions & 39 deletions amazon_s3/s3reader.py
@@ -13,7 +13,7 @@
from typing import Any, Callable, Dict, List, Optional, Union
from urllib.parse import quote, unquote
from saia_ingest.config import Defaults
from saia_ingest.utils import detect_file_extension
from saia_ingest.utils import detect_file_extension, do_get, parse_date
from saia_ingest.profile_utils import get_bearer_token, get_json_response_from_url
from saia_ingest.file_utils import calculate_file_hash

@@ -61,6 +61,7 @@ def __init__(
source_doc_id: Optional[str] = None,
alternative_document_service: Optional[Dict[str, str]] = None,
detect_file_duplication: Optional[bool] = False,
skip_storage_download: Optional[bool] = False,
**kwargs: Any,
) -> None:
"""Initialize S3 bucket and key, along with credentials if needed.
@@ -123,6 +124,14 @@ def __init__(
self.source_base_url = source_base_url
self.source_doc_id = source_doc_id
self.detect_file_duplication = detect_file_duplication
self.skip_storage_download = skip_storage_download
self.headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}
self.metadata_key_element = None
self.description_template = None
self.timestamp_tag = 'publishdate'
self.extension_tag = 'fileextension'

self.s3 = None
self.s3_client = None
@@ -202,6 +211,7 @@ def get_files_from_url(self) -> list[str]:
file_paths = []
downloaded_files = []
doc_nums = []
prefix = ""

bearer_params = self.alternative_document_service.get('bearer_token', None)
if bearer_params is None:
@@ -213,6 +223,13 @@
self.bearer_scope = bearer_params.get('scope', None)
self.bearer_grant_type = bearer_params.get('grant_type', None)

metadata_whitelist_items = self.alternative_document_service.get('metadata_whitelist_items', [])
self.timestamp_tag = self.alternative_document_service.get('timestamp_tag', 'publishdate')
self.extension_tag = self.alternative_document_service.get('extension_tag', 'fileextension')
self.metadata_key_element = self.alternative_document_service.get('metadata_key_element', 'documentid')
self.description_template = self.alternative_document_service.get('description_template', None)
self.skip_storage_download = self.alternative_document_service.get('skip_storage_download', False)

key_vault_params = self.alternative_document_service.get('key_vault', None)
if key_vault_params is not None:
key_vault_name = key_vault_params.get('name', None)
@@ -313,7 +330,7 @@ def get_files_from_url(self) -> list[str]:
doc_name = item.get('docname', '')
file_type = item.get('filetype', 'None')

if file_type is not None and not self.is_supported_extension(file_type.lower()):
if not self.skip_storage_download and file_type is not None and not self.is_supported_extension(file_type.lower()):
skip_count += 1
self.skip_dict[doc_num] = item
logging.getLogger().warning(f"{doc_num} '{doc_name}' with '{file_type}' extension discarded")
@@ -326,18 +343,56 @@
complete_file_path = f"{temp_dir}/{doc_num}.{extension}"
if os.path.exists(complete_file_path):
continue
try:
self.download_s3_file(doc_num, temp_dir, downloaded_files)
doc_nums.append(doc_num)
except Exception as e:
self.error_count += 1
self.error_dict[doc_num] = item
logging.getLogger().error(f"Error downloading {original_key} '{doc_name}' {e}")
continue

# add item to be processed later
self.element_ids.add(doc_num)
self.element_dict[doc_num] = item
if self.skip_storage_download:
doc_num = item.get('document_id', None)
url = item.get('url', None)
filename_with_extension = os.path.basename(url)
doc_name, file_extension = os.path.splitext(filename_with_extension)
file_extension = file_extension.lstrip(".")
file_type = self.get_file_extension(filename_with_extension)
original_key = f"{self.prefix}/{doc_num}" if self.prefix else doc_num
try:
response = do_get(None, url, self.headers)
if response.status_code == 200:
filename = f"{doc_num}.{file_type}"
complete_file_path = f"{temp_dir}/{filename}"
content = response.content
with open(complete_file_path, 'wb') as file:
file.write(content)
downloaded_files.append(complete_file_path)
doc_nums.append(doc_num)
complete_metadata_file_path = f"{temp_dir}/{doc_num}{self.json_extension}"
filtered_metadata_item = self.get_metadata_whitelist_items(item, metadata_whitelist_items)
filtered_metadata_item.update({self.extension_tag: file_type})
self.write_object_to_file(filtered_metadata_item, complete_metadata_file_path)
if self.use_augment_metadata:
self.element_ids.add(doc_num)
self.element_dict[doc_num] = filtered_metadata_item
user_metadata = self.augment_metadata(temp_dir, filename, filtered_metadata_item, self.timestamp_tag)
if user_metadata:
self.write_object_to_file(user_metadata, complete_metadata_file_path)
else:
logging.getLogger().error(f"Failed to download file. Status code: {response.status_code}")
continue
except Exception as e:
logging.getLogger().error(f"Error downloading {doc_num} '{doc_name}' {e}")
continue
else:
try:
prefix = f"{self.prefix}/{doc_num}" if self.prefix else doc_num
self.download_s3_file(prefix, temp_dir, downloaded_files)
doc_nums.append(doc_num)
except Exception as e:
self.error_count += 1
self.error_dict[doc_num] = item
logging.getLogger().error(f"Error downloading {original_key} '{doc_name}' {e}")
continue

if not self.skip_storage_download:
# add item to be processed later
self.element_ids.add(doc_num)
self.element_dict[doc_num] = item

logging.getLogger().debug(f" {original_key} to {doc_num}")

@@ -350,7 +405,7 @@ def get_files_from_url(self) -> list[str]:
_ = self.save_debug(self.element_dict, prefix='denodo')

if self.process_files:
_ = self.rename_files(downloaded_files, temp_dir, self.excluded_exts, None, self.json_extension, self.prefix + '/', 'fileextension')
_ = self.rename_files(downloaded_files, temp_dir, self.excluded_exts, None, self.json_extension, prefix)

if self.element_ids is not None and len(self.element_ids) > 0:
file_paths = []
@@ -365,6 +420,14 @@ def get_files_from_url(self) -> list[str]:
file_paths = [os.path.join(temp_dir, f) for f in os.listdir(temp_dir) if os.path.isfile(os.path.join(temp_dir, f)) and not f.endswith((self.json_extension, self.metadata_extension))]
return file_paths

def get_metadata_whitelist_items(self, initial: dict, metadata_whitelist: list[str]) -> dict:
"""
Filters the given dictionary, keeping only keys that exist in metadata_whitelist.

:param initial: Dictionary to be filtered.
:param metadata_whitelist: List of keys to keep.
:return: A new dictionary containing only the whitelisted keys.
"""
return {k: v for k, v in initial.items() if k in metadata_whitelist}
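
A quick illustration of the whitelist filtering above; the item keys and values here are hypothetical and only show the behaviour:

item = {
    "document_id": "123",
    "url": "https://example.com/reports/annual.pdf",
    "internal_notes": "dropped, not whitelisted",
}
whitelist = ["document_id", "url"]
filtered = {k: v for k, v in item.items() if k in whitelist}
# filtered == {"document_id": "123", "url": "https://example.com/reports/annual.pdf"}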

def save_debug(self, serialized_docs: any, prefix:str) -> str:
debug_folder = os.path.join(os.getcwd(), 'debug')
@@ -389,7 +452,7 @@ def get_files(self) -> list[str]:
if self.use_local_folder:

if self.process_files:
_ = self.rename_files(None, self.local_folder, self.excluded_exts, None, self.json_extension, self.prefix + '/', 'fileextension')
_ = self.rename_files(None, self.local_folder, self.excluded_exts, None, self.json_extension, self.prefix + '/')

for f in os.listdir(self.local_folder):
f_extension = self.get_file_extension(f)
@@ -411,12 +474,14 @@

if self.key:
logging.getLogger().info(f"key: '{self.key}'")
self.download_s3_file(self.key, temp_dir, file_paths)
original_key = f"{self.prefix}/{self.key}" if self.prefix else self.key
self.download_s3_file(original_key, temp_dir, file_paths)
count = 1
elif self.keys:
logging.getLogger().info(f"keys: '{len(self.keys)}'")
for key in self.keys:
self.download_s3_file(key, temp_dir, file_paths)
original_key = f"{self.prefix}/{key}" if self.prefix else key
self.download_s3_file(original_key, temp_dir, file_paths)
count = len(self.keys)
else:
bucket = self.s3.Bucket(self.bucket)
@@ -464,7 +529,7 @@ def get_files(self) -> list[str]:
self.skip_count = skip_count

if self.process_files:
renamed_files = self.rename_files(file_paths, temp_dir, self.excluded_exts, None, self.json_extension, self.prefix + '/', 'fileextension')
renamed_files = self.rename_files(file_paths, temp_dir, self.excluded_exts, None, self.json_extension, self.prefix + '/')

if file_paths is None:
file_paths = [os.path.join(temp_dir, f) for f in os.listdir(temp_dir) if os.path.isfile(os.path.join(temp_dir, f)) and not f.endswith((self.json_extension, self.metadata_extension))]
@@ -477,7 +542,9 @@ def download_s3_file(self, key: str, temp_dir: str, file_paths: list):
"""Download a single file"""
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/download_file.html#S3.Client.download_file
filepath = f"{temp_dir}/{key}"
original_key = f"{self.prefix}/{key}" if self.prefix else key
folder_path = os.path.dirname(filepath)
os.makedirs(folder_path, exist_ok=True)
original_key = key
try:
self.s3.meta.client.download_file(self.bucket, original_key, filepath)
file_paths.append(filepath)
@@ -531,7 +598,6 @@ def rename_files(
main_extension: str,
metadata_extension: str,
key_prefix: str,
extension_tag: str = 'fileextension'
) -> list[str]:
'''Process all files in a folder, renaming them and adding metadata files'''
if not os.path.exists(folder_path):
@@ -549,12 +615,10 @@
file_name = os.path.splitext(os.path.basename(file_item))[0]
files.append(file_name)

timestamp_tag = 'publishdate'

renamed_files = []
# Process each file
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_parallel_executions) as executor:
futures = [executor.submit(self.rename_file, folder_path, excluded_exts, main_extension, metadata_extension, key_prefix, file_item, timestamp_tag, extension_tag) for file_item in files]
futures = [executor.submit(self.rename_file, folder_path, excluded_exts, main_extension, metadata_extension, key_prefix, file_item, self.timestamp_tag, self.extension_tag) for file_item in files]
for future in concurrent.futures.as_completed(futures):
try:
file_path = future.result()
@@ -656,10 +720,9 @@ def augment_metadata(
# Remove entries where the value is not desired
initial_metadata = {k: v for k, v in input_metadata.items() if v not in [None, 'null', '']}
try:
id = initial_metadata.get('documentid', '')
id = initial_metadata.get(self.metadata_key_element, '')
date_string = initial_metadata.get(timestamp_tag, '')
date_string_description = date_string
date_string_format = "%m/%d/%Y"
doc_url = None

merge_metadata_elements = self.alternative_document_service.get('merge_metadata', False)
@@ -674,18 +737,18 @@
if mapping:
initial_metadata = self.update_with_mapping(initial_metadata, mapping)
date_string = initial_metadata.get(timestamp_tag, date_string)
date_string_format = "%Y-%m-%dT%H:%M:%S"
doc_url = initial_metadata.get('docurl', None)

if date_string is not None:
# Change from MM/DD/YYYY to YYYYMMDD format
date_object = datetime.strptime(date_string, date_string_format)
formatted_date = date_object.strftime("%Y%m%d")
date_string_description = f"{date_object.month}/{date_object.day}/{date_object.year}"
year = date_object.strftime("%Y")
# Add year
initial_metadata[timestamp_tag] = formatted_date
initial_metadata['year'] = year
try:
formatted_date, day, month, year = parse_date(date_string)
date_string_description = f"{month}/{day}/{year}"
initial_metadata['date_description'] = date_string_description
# Add year
initial_metadata[timestamp_tag] = formatted_date
initial_metadata['year'] = year
except ValueError as e:
logging.getLogger().error(f"{e}")

if self.detect_file_duplication:
file_path = f"{folder_path}/{document_name}"
@@ -700,12 +763,18 @@
source_url = f"{self.source_base_url}?{self.source_doc_id}={id}&CONTDISP=INLINE"
initial_metadata['url'] = source_url

name = initial_metadata.get('filename', id)
activity = initial_metadata.get('disclosureactivity', '')
description = f"{name} | {date_string_description} | {activity}"

initial_metadata['description'] = description
self.generate_description(initial_metadata, date_string_description, id)
except Exception as e:
logging.getLogger().error(f"Error augmenting metadata for '{document_name}' from {initial_metadata} Error: {e}")

return initial_metadata

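The date handling above now delegates to parse_date from saia_ingest.utils, which is not part of this diff. Below is a hypothetical sketch consistent with the call site (it must return a YYYYMMDD string plus day, month and year, and accept both the MM/DD/YYYY and ISO formats the previous inline code handled); the real helper may differ:

from datetime import datetime

def parse_date(date_string: str):
    """Hypothetical sketch only; the real parse_date lives in saia_ingest.utils."""
    for fmt in ("%m/%d/%Y", "%Y-%m-%dT%H:%M:%S"):
        try:
            date_object = datetime.strptime(date_string, fmt)
            # YYYYMMDD plus the parts used for the description and the 'year' metadata
            return date_object.strftime("%Y%m%d"), date_object.day, date_object.month, date_object.year
        except ValueError:
            continue
    raise ValueError(f"Unsupported date format: {date_string}")
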
def generate_description(self, initial_metadata, date_string_description, id: Any):

if self.description_template is not None:
description = self.description_template.format(**initial_metadata)
else:
name = initial_metadata.get('filename', id)
activity = initial_metadata.get('disclosureactivity', '')
description = f"{name} | {date_string_description} | {activity}"
initial_metadata['description'] = description
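
Putting the new options together, the alternative_document_service dictionary read at the top of get_files_from_url could carry the keys introduced in this PR roughly as follows; this is a sketch with illustrative values, and the bearer_token and key_vault sections are omitted:

alternative_document_service = {
    "skip_storage_download": True,            # download each item's 'url' directly instead of S3
    "metadata_key_element": "document_id",    # id field used in metadata and duplicate detection
    "timestamp_tag": "publishdate",           # metadata field holding the publish date
    "extension_tag": "fileextension",         # metadata field holding the file extension
    "metadata_whitelist_items": ["document_id", "url", "publishdate"],
    # optional; expanded with str.format(**metadata), falls back to "name | date | activity"
    "description_template": "{filename} | {date_description} | {disclosureactivity}",
}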
8 changes: 4 additions & 4 deletions saia_ingest/file_utils.py
@@ -32,7 +32,7 @@ def are_files_identical(file1: Path, file2: Path) -> bool:
return False


def load_hashes_from_json(folder: Path) -> Dict[str, Any]:
def load_hashes_from_json(folder: Path, id: str = 'documentid') -> Dict[str, Any]:
"""Load all existing hashes from JSON files in the folder."""
hash_index = {}
duplicate_count = 0
@@ -42,15 +42,16 @@ def load_hashes_from_json(folder: Path) -> Dict[str, Any]:
data = json.load(f)
if Defaults.FILE_HASH in data:
file_hash = data[Defaults.FILE_HASH]
document_id = data["documentid"]
document_id = data[id]
if file_hash in hash_index:
duplicate_count += 1
logging.getLogger().warning(f"{document_id} duplicate detected: using {hash_index[file_hash]}")
else:
hash_index[file_hash] = document_id
except (json.JSONDecodeError, FileNotFoundError) as e:
print(f"Error reading {json_file}: {e}")
logging.getLogger().warning(f"{duplicate_count} duplicates found")
if duplicate_count > 0:
logging.getLogger().warning(f"{duplicate_count} duplicates found")
return hash_index


@@ -66,6 +67,5 @@ def check_for_duplicates(new_file: Path, folder: Path) -> bool:
print(f"Duplicate found! {new_file} matches {hash_index[new_file_hash]}")
return True
else:
print(f"No duplicates found for {new_file}.")
return False

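For reference, load_hashes_from_json scans the per-document JSON metadata files written during download, and the id field it keys on is now configurable instead of being hard-coded to 'documentid'. A hedged usage sketch; the field names are illustrative and the hash key is whatever Defaults.FILE_HASH resolves to:

from pathlib import Path

# A metadata sidecar such as ./downloads/123.json might look like (illustrative):
# {"document_id": "123", "publishdate": "20240215", "file_hash": "9f86d081..."}
hash_index = load_hashes_from_json(Path("./downloads"), id="document_id")
# hash_index maps each file hash to the first document id seen with it,
# e.g. {"9f86d081...": "123"}; later files with the same hash only log a warning.
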
2 changes: 1 addition & 1 deletion saia_ingest/ingestor.py
@@ -553,7 +553,7 @@ def ingest_s3(

saia_file_ids_to_delete = search_failed_to_delete(file_paths)
if detect_file_duplication and len(file_paths) > 0:
hash_index = load_hashes_from_json(Path(download_dir))
hash_index = load_hashes_from_json(Path(download_dir), loader.metadata_key_element)
duplicate_ids = []
for new_file in file_paths[:]: # Iterate over a copy
new_file_hash = calculate_file_hash(new_file)
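
The body of the duplicate-detection loop in ingest_s3 is cut off in the excerpt above. The sketch below shows one way such a loop could use the hash index; it is not the PR's actual code:

# Hypothetical continuation, for illustration only.
for new_file in file_paths[:]:                 # iterate over a copy so removal is safe
    new_file_hash = calculate_file_hash(new_file)
    if new_file_hash in hash_index:
        duplicate_ids.append(hash_index[new_file_hash])
        file_paths.remove(new_file)            # skip re-ingesting a file already present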