Snowwormhole operation #33

Open
wants to merge 2 commits into base: main
56 changes: 53 additions & 3 deletions README.md
@@ -3,8 +3,58 @@

SnowCast team SkiingPeople

# Goal
## Overview

Predict SWE with reliable accuracy and stability.
This document provides an overview of a comprehensive full-stack workflow designed for forecasting snow water equivalent (SWE). The workflow includes multiple nodes representing different stages and components involved in data processing, model creation, training, prediction, and deployment.

# Team Members
## Getting Started

### Prerequisites

- Python 3.8 or higher

### Installation

- Install Geoweaver:
  - Download and install Geoweaver from geoweaver.dev.

- Download the Workflow:
  - Download the ZIP file containing the workflow from the release page (a scripted alternative is sketched below).
  - Extract the ZIP file to your desired location.

### Loading and Running the Workflow

- Open Geoweaver:
  - Launch Geoweaver on your machine.

- Import the Workflow:
  - Use the import button in Geoweaver to load the latest version of the workflow from the extracted directory.

- Run the Workflow:
  - After importing, you can view the workflow nodes and their configurations in Geoweaver.
  - Click the run button in Geoweaver to execute the workflow step-by-step as configured.
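
If you prefer to script the download step, a minimal sketch using only the Python standard library is shown below; the release URL is a placeholder and should be replaced with the asset URL from the release page.

```python
# Minimal sketch of the "download and extract" step using only the standard
# library. The release URL below is a placeholder, not the real asset URL.
import urllib.request
import zipfile
from pathlib import Path

release_url = "https://example.com/path/to/workflow.zip"  # placeholder: copy from the release page
zip_path = Path("workflow.zip")
extract_dir = Path("snowcast_workflow")

urllib.request.urlretrieve(release_url, zip_path)  # download the ZIP asset
with zipfile.ZipFile(zip_path) as zf:
    zf.extractall(extract_dir)                     # unpack the workflow files
print(f"Workflow extracted to {extract_dir.resolve()}; import this directory in Geoweaver")
```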

### Workflow Description

The workflow begins with data collection and integration from various sources. Nodes such as data_sentinel2, data_terrainFeatures, data_gee_modis_station_only, data_gee_sentinel1_station_only, and others handle the gathering and preprocessing of satellite data, terrain features, and meteorological data. These datasets are crucial for accurate model training and predictions.

Next, the workflow transitions to model creation and training. Nodes like model_creation_lstm, model_creation_ghostnet, model_creation_xgboost, and several others are used to set up different types of machine learning models. The model_train_validate node is then executed to train and validate these models using the integrated data. This step ensures that the models are well-trained and capable of making accurate predictions.

Finally, the workflow focuses on generating predictions and deploying the results. The model_predict node is used to produce predictions based on the trained models. Real-time data testing is performed using nodes such as amsr_testing_realtime and gridmet_testing. Visualization and deployment of results are managed by nodes like convert_results_to_images and deploy_images_to_website, allowing users to easily interpret and share the outcomes.
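
The ordering of these three stages can be summarized in a short sketch. The node names come from the workflow itself; the `run_node` helper is hypothetical, standing in for Geoweaver executing the corresponding node script.

```python
# Illustrative ordering of the workflow stages only; run_node is a hypothetical
# stand-in for Geoweaver executing the corresponding node script.
def run_node(name: str) -> None:
    print(f"running node: {name}")

# 1. Data collection and integration
for node in ["data_sentinel2", "data_terrainFeatures",
             "data_gee_modis_station_only", "data_gee_sentinel1_station_only"]:
    run_node(node)

# 2. Model creation, training, and validation
for node in ["model_creation_lstm", "model_creation_ghostnet",
             "model_creation_xgboost", "model_train_validate"]:
    run_node(node)

# 3. Prediction, real-time testing, and deployment
for node in ["model_predict", "amsr_testing_realtime", "gridmet_testing",
             "convert_results_to_images", "deploy_images_to_website"]:
    run_node(node)
```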

## Contributing

We welcome contributions to enhance and expand this workflow. To contribute, follow these steps:

- Fork the repository.
- Create a new branch (`git checkout -b feature/your-feature`).
- Commit your changes (`git commit -am 'Add new feature'`).
- Push to the branch (`git push origin feature/your-feature`).
- Create a new Pull Request.

By participating in this project, you can help improve data processing, model training, and prediction accuracy. Your contributions will enable others to benefit from advanced data analysis and machine learning techniques.

## License
This project is licensed under the MIT License - see the LICENSE file for details.

## Acknowledgments
Thanks to all contributors and open-source projects that made this workflow possible. Your efforts in data collection, model development, and community support are greatly appreciated.
211 changes: 156 additions & 55 deletions code/amsr_features.py
@@ -5,7 +5,12 @@
import numpy as np
import pandas as pd
from datetime import datetime
import dask
import dask.dataframe as dd
from dask import delayed
import dask.bag as db
import xarray as xr
from snowcast_utils import work_dir, train_start_date, train_end_date

def copy_he5_files(source_dir, destination_dir):
'''
@@ -49,63 +54,159 @@ def find_closest_index(target_latitude, target_longitude, lat_grid, lon_grid):

return lat_idx, lon_idx, lat_grid[lat_idx, lon_idx], lon_grid[lat_idx, lon_idx]

def extract_amsr_values_save_to_csv(amsr_data_dir, output_csv_file):
'''
Extract AMSR data values and save them to a CSV file.

Args:
amsr_data_dir (str): The directory containing AMSR .he5 files.
output_csv_file (str): The path to the output CSV file.

Returns:
None
'''
def create_snotel_station_to_amsr_mapper(
new_base_station_list_file,
target_csv_path
):
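'''
Map each SNOTEL/CDEC station to its closest AMSR grid cell.

The mapping is computed once from a sample AMSR .he5 grid and cached to
target_csv_path; if that file already exists it is simply reloaded.

Args:
new_base_station_list_file (str): CSV listing station latitude/longitude.
target_csv_path (str): Path where the station-to-AMSR mapping CSV is cached.

Returns:
pandas.DataFrame: One row per station with the nearest AMSR cell indices and coordinates.
'''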
station_data = pd.read_csv(new_base_station_list_file)


date = "2022-10-01"
date = date.replace("-", ".")
he5_date = date.replace(".", "")

# Check if the CSV already exists

if os.path.exists(target_csv_path):
print(f"File {target_csv_path} already exists, skipping..")
df = pd.read_csv(target_csv_path)
return df

target_amsr_hdf_path = f"{work_dir}/amsr_testing/testing_amsr_{date}.he5"
if os.path.exists(target_amsr_hdf_path):
print(f"File {target_amsr_hdf_path} already exists, skip downloading..")
else:
cmd = f"curl --output {target_amsr_hdf_path} -b ~/.urs_cookies -c ~/.urs_cookies -L -n -O https://n5eil01u.ecs.nsidc.org/AMSA/AU_DySno.001/{date}/AMSR_U2_L3_DailySnow_B02_{he5_date}.he5"
print(f'Running command: {cmd}')
subprocess.run(cmd, shell=True)

df = pd.DataFrame(columns=['amsr_lat', 'amsr_lon',
'amsr_lat_idx', 'amsr_lon_idx',
'station_lat', 'station_lon'])
# Read the HDF
file = h5py.File(target_amsr_hdf_path, 'r')
hem_group = file['HDFEOS/GRIDS/Northern Hemisphere']
lat = hem_group['lat'][:]
lon = hem_group['lon'][:]

# Replace NaN values with 0
lat = np.nan_to_num(lat, nan=0.0)
lon = np.nan_to_num(lon, nan=0.0)

# Convert the AMSR grid into our gridMET 1km grid
for idx, row in station_data.iterrows():
target_lat = row['latitude']
target_lon = row['longitude']

# compare the performance and find the fastest way to search nearest point
closest_lat_idx, closest_lon_idx, closest_lat, closest_lon = find_closest_index(target_lat, target_lon, lat, lon)
df.loc[len(df.index)] = [closest_lat,
closest_lon,
closest_lat_idx,
closest_lon_idx,
target_lat,
target_lon]

# Save the new converted AMSR to CSV file
df.to_csv(target_csv_path, index=False)

print('AMSR mapper csv is created.')
return df


def extract_amsr_values_save_to_csv(amsr_data_dir, output_csv_file, new_base_station_list_file, start_date, end_date):
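'''
Extract AMSR SWE values for every mapped station over the training period and save them to a CSV file.

Args:
amsr_data_dir (str): Directory containing the downloaded AMSR .he5 files.
output_csv_file (str): Path to the output CSV file.
new_base_station_list_file (str): CSV listing the active SNOTEL/CDEC stations.
start_date (str): Start of the training period in %Y-%m-%d format.
end_date (str): End of the training period in %Y-%m-%d format.

Returns:
None
'''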
if os.path.exists(output_csv_file):
os.remove(output_csv_file)

# Open the output CSV file in append mode using the csv module
with open(output_csv_file, 'a', newline='') as csvfile:
# Create a csv writer object
writer = csv.writer(csvfile)

# If the file is empty, write the header row
if os.path.getsize(output_csv_file) == 0:
writer.writerow(["date", "lat", "lon", "amsr_swe"])

# Loop through all the .he5 files in the directory
for filename in os.listdir(amsr_data_dir):
if filename.endswith('.he5'):
file_path = os.path.join(amsr_data_dir, filename)
print(file_path)

data_field_ds = xr.open_dataset(file_path, group='/HDFEOS/GRIDS/Northern Hemisphere/Data Fields')
swe_df = data_field_ds["SWE_NorthernDaily"].values

latlon_ds = xr.open_dataset(file_path, group='/HDFEOS/GRIDS/Northern Hemisphere')
lat_df = latlon_ds["lat"].values
lon_df = latlon_ds["lon"].values

swe_variable = data_field_ds['SWE_NorthernDaily'].assign_coords(
lat=latlon_ds['lat'],
lon=latlon_ds['lon']
)

date_str = filename.split('_')[-1].split('.')[0]
date = datetime.strptime(date_str, '%Y%m%d')
df_val = swe_variable.to_dataframe()

swe_variable = swe_variable.to_dataframe().reset_index()
for idx, row in station_data.iterrows():
desired_lat = row['lat']
desired_lon = row['lon']
swe_variable['distance'] = np.sqrt((swe_variable['lat'] - desired_lat)**2 + (swe_variable['lon'] - desired_lon)**2)
closest_row = swe_variable.loc[swe_variable['distance'].idxmin()]
writer.writerow([date, desired_lat, desired_lon, closest_row['SWE_NorthernDaily']])

amsr_data_dir = '/home/chetana/gridmet_test_run/amsr'
output_csv_file = '/home/chetana/gridmet_test_run/training_amsr_data_tmp.csv'

station_cell_mapping = pd.read_csv('/home/chetana/gridmet_test_run/training_test_cords.csv')

extract_amsr_values_save_to_csv(amsr_data_dir, output_csv_file)
target_csv_path = f'{work_dir}/training_snotel_station_to_amsr_mapper.csv'
mapper_df = create_snotel_station_to_amsr_mapper(new_base_station_list_file,
target_csv_path)

# station_data = pd.read_csv(new_base_station_list_file)

start_date = datetime.strptime(start_date, "%Y-%m-%d")
end_date = datetime.strptime(end_date, "%Y-%m-%d")

# Create a Dask DataFrame
dask_station_data = dd.from_pandas(mapper_df, npartitions=1)

# Function to process each file
def process_file(filename):
file_path = os.path.join(amsr_data_dir, filename)
print(file_path)

file = h5py.File(file_path, 'r')
hem_group = file['HDFEOS/GRIDS/Northern Hemisphere']

date_str = filename.split('_')[-1].split('.')[0]
date = datetime.strptime(date_str, '%Y%m%d')

if not (start_date <= date <= end_date):
print(f"{date} is not in the training period, skipping..")
return None

new_date_str = date.strftime("%Y-%m-%d")
swe = hem_group['Data Fields/SWE_NorthernDaily'][:]
flag = hem_group['Data Fields/Flags_NorthernDaily'][:]
# Create an empty Pandas DataFrame with the desired columns
result_df = pd.DataFrame(columns=['date', 'lat', 'lon', 'AMSR_SWE'])

# Sample loop to add rows to the Pandas DataFrame using dask.delayed
@delayed
def process_row(row, swe, new_date_str):
closest_lat_idx = int(row['amsr_lat_idx'])
closest_lon_idx = int(row['amsr_lon_idx'])
closest_swe = swe[closest_lat_idx, closest_lon_idx]

return pd.DataFrame([[
new_date_str,
row['station_lat'],
row['station_lon'],
closest_swe]],
columns=result_df.columns
)


# List of delayed computations
delayed_results = [process_row(row, swe, new_date_str) for _, row in mapper_df.iterrows()]

# Compute the delayed results and concatenate them into a Pandas DataFrame
result_df = dask.compute(*delayed_results)
result_df = pd.concat(result_df, ignore_index=True)

# Print the final Pandas DataFrame
#print(result_df)

return result_df

# Get the list of files
files = [f for f in os.listdir(amsr_data_dir) if f.endswith('.he5')]

# Create a Dask Bag from the files
dask_bag = db.from_sequence(files, npartitions=2)

# Process files in parallel
processed_data = dask_bag.map(process_file).filter(lambda x: x is not None).compute()

# Concatenate the processed data
combined_df = pd.concat(processed_data, ignore_index=True)

# Save the combined DataFrame to a CSV file
combined_df.to_csv(output_csv_file, index=False)

print(f"Merged data saved to {output_csv_file}")


if __name__ == "__main__":
amsr_data_dir = '/home/chetana/gridmet_test_run/amsr'
new_base_station_list_file = f"{work_dir}/all_snotel_cdec_stations_active_in_westus.csv"
new_base_df = pd.read_csv(new_base_station_list_file)
print(new_base_df.head())
output_csv_file = f"{new_base_station_list_file}_amsr_dask.csv"

start_date = train_start_date
end_date = train_end_date

extract_amsr_values_save_to_csv(amsr_data_dir, output_csv_file, new_base_station_list_file, start_date, end_date)
