Merge pull request #63 from openclimatefix/feature/issue-31
add chunk functionality for met office archiving
jcamier authored Feb 23, 2025
2 parents df05d79 + 0e8d821 commit a8808ba
Showing 2 changed files with 28 additions and 7 deletions.
14 changes: 11 additions & 3 deletions src/open_data_pvnet/main.py
@@ -261,12 +261,20 @@ def parallel_archive(
             raise future.exception()


-def handle_monthly_consolidation(provider: str, year: int, month: int, **kwargs):
+def handle_monthly_consolidation(**kwargs):
     """Handle consolidating data into zarr.zip files."""
+    logger.debug(f"Received kwargs: {kwargs}")
     chunks = parse_chunks(kwargs.get("chunks"))
     base_path = Path("data")
+    year = kwargs.get("year")
+    month = kwargs.get("month")
     day = kwargs.get("day")

+    logger.debug(f"Extracted values - year: {year}, month: {month}")
+
+    if year is None or month is None:
+        raise ValueError("Year and month must be specified for consolidation")
+
     try:
         if day is not None:
             # Consolidate a single day
@@ -284,9 +292,9 @@ def handle_monthly_consolidation(provider: str, year: int, month: int, **kwargs):
                 for file in successful_files:
                     logger.info(f"- {file}")

-                # Now create the monthly file
+                # Now create the monthly file with safe_chunks=False
                 logger.info("\nCreating monthly consolidated file")
-                monthly_file = merge_days_to_month(base_path, year, month, chunks)
+                monthly_file = merge_days_to_month(base_path, year, month, chunks, safe_chunks=False)
                 logger.info(f"Successfully created monthly file: {monthly_file}")
             else:
                 logger.warning("No daily files were created, cannot create monthly file")
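The reworked handle_monthly_consolidation pulls everything from **kwargs and feeds the raw chunks argument through parse_chunks, which is not shown in this diff. A minimal sketch of what such a helper might look like, assuming the argument is a comma-separated string such as "time:24,step:1" (the format and signature are assumptions, not taken from the repository):

    from typing import Optional

    def parse_chunks(chunks_arg: Optional[str]) -> Optional[dict]:
        """Hypothetical parser turning "time:24,step:1" into {"time": 24, "step": 1}."""
        if not chunks_arg:
            return None  # let the writer fall back to default chunking
        chunks = {}
        for pair in chunks_arg.split(","):
            dim, size = pair.split(":")
            chunks[dim.strip()] = int(size)
        return chunks

Whatever the real implementation looks like, the result is an optional dict that merge_days_to_month can pass straight to xarray's chunking machinery.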
21 changes: 17 additions & 4 deletions src/open_data_pvnet/utils/data_downloader.py
@@ -274,14 +274,19 @@ def load_zarr_data_for_day(  # noqa: C901
             logger.warning(f"Error closing store: {e}")


-def save_consolidated_zarr(dataset: xr.Dataset, output_path: Union[str, Path]) -> None:
+def save_consolidated_zarr(
+    dataset: xr.Dataset, output_path: Union[str, Path], safe_chunks: bool = False
+) -> None:
     """Save a consolidated dataset to zarr format.

     Args:
         dataset: The xarray Dataset to save
         output_path: Path where to save the zarr archive
+        safe_chunks: If False, allow chunk sizes that might overlap dask chunks
     """
     logger.info(f"Saving consolidated dataset to {output_path}")
+    temp_dir = None
+
     try:
         # Get the first variable name from the dataset
         first_var = list(dataset.data_vars)[0]
@@ -303,7 +308,14 @@ def save_consolidated_zarr(

         # Save to temporary zarr directory
         logger.info("Writing to temporary directory...")
-        dataset.to_zarr(temp_dir, mode="w", encoding=encoding, compute=True, consolidated=True)
+        dataset.to_zarr(
+            temp_dir,
+            mode="w",
+            encoding=encoding,
+            compute=True,
+            consolidated=True,
+            safe_chunks=safe_chunks,
+        )

         # Create zip archive
         logger.info("Creating zip archive...")
@@ -320,7 +332,7 @@ def save_consolidated_zarr(

     finally:
         # Cleanup temporary directory
-        if temp_dir.exists():
+        if temp_dir is not None and temp_dir.exists():
             shutil.rmtree(temp_dir)
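The other fix in this function is defensive: temp_dir now starts as None (see the earlier hunk), so if an exception is raised before the temporary directory is created, the finally block no longer calls .exists() on an unset name. A stripped-down illustration of the pattern, with placeholder work standing in for the zarr and zip steps:

    import shutil
    import tempfile
    from pathlib import Path

    temp_dir = None
    try:
        # Anything raising before this assignment leaves temp_dir as None,
        # so the cleanup below must not assume it holds a Path.
        temp_dir = Path(tempfile.mkdtemp())
        ...  # write the zarr store, then zip it up
    finally:
        if temp_dir is not None and temp_dir.exists():
            shutil.rmtree(temp_dir)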


@@ -424,6 +436,7 @@ def merge_days_to_month(
     year: int,
     month: int,
     chunks: Optional[dict] = None,
+    safe_chunks: bool = False,
 ) -> Path:
     """Merge all daily zarr.zip files in a month into a single monthly zarr.zip file."""
     logger.info(f"\nMerging daily files for {year}-{month:02d}")
@@ -489,7 +502,7 @@ def merge_days_to_month(

     # Save consolidated monthly file
     logger.info(f"Saving monthly dataset to: {monthly_output}")
-    save_consolidated_zarr(monthly_dataset, monthly_output)
+    save_consolidated_zarr(monthly_dataset, monthly_output, safe_chunks=safe_chunks)
     logger.info(f"Successfully created monthly file: {monthly_output}")

     # Close all datasets
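merge_days_to_month itself only grows the pass-through parameter; its core job remains opening every daily zarr.zip in the month and concatenating along time before handing the result to save_consolidated_zarr. A rough sketch of that shape, assuming zarr v2's ZipStore and a data/YYYY/MM directory layout (both assumptions, not taken from the diff):

    from pathlib import Path

    import xarray as xr
    import zarr

    def open_month(base_path: Path, year: int, month: int) -> xr.Dataset:
        """Illustrative only: concatenate a month of daily zarr.zip files along time."""
        month_dir = base_path / f"{year}" / f"{month:02d}"  # layout is an assumption
        stores = [zarr.ZipStore(str(p), mode="r") for p in sorted(month_dir.glob("*.zarr.zip"))]
        datasets = [xr.open_zarr(store) for store in stores]
        return xr.concat(datasets, dim="time")

The real function also applies the parsed chunks and closes the opened datasets afterwards, as the surrounding context lines suggest.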
