Skip to content

Commit

Permalink
Create reanalysis_data.py
Browse files Browse the repository at this point in the history
  • Loading branch information
aclerc committed Apr 22, 2024
1 parent ec95c1f commit 4e0b0eb
Showing 1 changed file with 187 additions and 0 deletions.
187 changes: 187 additions & 0 deletions wind_up/reanalysis_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
from dataclasses import dataclass

import pandas as pd

from wind_up.constants import (
REANALYSIS_WD_COL,
REANALYSIS_WS_COL,
ROWS_PER_HOUR,
TIMEBASE_PD_TIMEDELTA,
TIMESTAMP_COL,
)
from wind_up.models import PlotConfig, WindUpConfig
from wind_up.plots.reanalysis_plots import plot_find_best_shift_and_corr, plot_wf_and_reanalysis_sample_timeseries


def reanalysis_upsample(reanalysis_raw_df: pd.DataFrame) -> pd.DataFrame:
reanalysis_df = reanalysis_raw_df.resample(TIMEBASE_PD_TIMEDELTA, label="left").last()
upsample_factor = round(len(reanalysis_df) / len(reanalysis_raw_df))
# extend the end of the index e.g. by 50 minutes for 10 minute timebase
reanalysis_df = pd.concat(
[
reanalysis_df,
pd.DataFrame(
index=pd.date_range(
start=reanalysis_df.index[-1] + pd.Timedelta(TIMEBASE_PD_TIMEDELTA),
periods=upsample_factor - 1,
freq=TIMEBASE_PD_TIMEDELTA,
),
),
],
)
reanalysis_df = reanalysis_df.ffill(limit=upsample_factor - 1)
if "100_m_hws_mean_mps" in reanalysis_df.columns and "100_m_hwd_mean_deg-n_true" in reanalysis_df.columns:
reanalysis_df = reanalysis_df.rename(
columns={"100_m_hws_mean_mps": REANALYSIS_WS_COL, "100_m_hwd_mean_deg-n_true": REANALYSIS_WD_COL},
)
elif "50_m_hws_mean_mps" in reanalysis_df.columns and "50_m_hwd_mean_deg-n_true" in reanalysis_df.columns:
reanalysis_df = reanalysis_df.rename(
columns={"50_m_hws_mean_mps": REANALYSIS_WS_COL, "50_m_hwd_mean_deg-n_true": REANALYSIS_WD_COL},
)
else:
msg = "reanalysis wind speed and direction columns not found"
raise RuntimeError(msg)
reanalysis_df.index.name = TIMESTAMP_COL
return reanalysis_df


def find_best_shift_and_corr(
*,
wf_ws_df: pd.DataFrame,
reanalysis_df: pd.DataFrame,
wf_name: str,
datastream_id: str,
plot_cfg: PlotConfig | None,
wf_ws_lower_limit: float = 0,
) -> tuple[float, int]:
ws_filt_df = wf_ws_df["WindSpeedMean"][wf_ws_df["WindSpeedMean"] >= wf_ws_lower_limit].to_frame()
best_s = -99
best_corr = -99.0
shifts = []
corrs = []
for s in range(round(-24 * ROWS_PER_HOUR), round(24 * ROWS_PER_HOUR)):
this_corr = float(ws_filt_df.corrwith(reanalysis_df[REANALYSIS_WS_COL].shift(s)).squeeze())
shifts.append(s)
corrs.append(this_corr)
if this_corr > best_corr:
best_corr = this_corr
best_s = s
if plot_cfg is not None:
plot_find_best_shift_and_corr(
wf_ws_df=ws_filt_df,
reanalysis_df=reanalysis_df,
shifts=shifts,
corrs=corrs,
wf_name=wf_name,
datastream_id=datastream_id,
best_corr=best_corr,
best_s=best_s,
plot_cfg=plot_cfg,
)

print(f"{datastream_id} best correlation is {best_corr:.6f} with a shift of {best_s}")

return best_corr, best_s


def calc_wf_mean_wind_speed_df(
wf_df: pd.DataFrame,
*,
num_turbines: int,
allowed_data_coverage_width: float,
) -> pd.DataFrame:
wf_ws_df = wf_df.groupby(TIMESTAMP_COL).agg(
WindSpeedMean=pd.NamedAgg(column="WindSpeedMean", aggfunc=lambda x: x.mean()),
data_coverage=pd.NamedAgg(column="WindSpeedMean", aggfunc=lambda x: x.count() / num_turbines),
)
median_data_coverage = wf_ws_df["data_coverage"].median()
wf_ws_df = wf_ws_df[(wf_ws_df["data_coverage"] - median_data_coverage).abs() < allowed_data_coverage_width / 2]
return wf_ws_df.dropna()


def calc_filename_from_dsid_and_dates(dsid: str, date_from: pd.Timestamp, date_to: pd.Timestamp) -> str:
return f"{dsid}_{date_from.strftime('%Y%m%d')}_{date_to.strftime('%Y%m%d')}.parquet"


def get_dsid_and_dates_from_filename(filename: str) -> tuple[str, pd.Timestamp, pd.Timestamp]:
fname = filename.replace(".parquet", "")
date_to = fname.split("_")[-1]
date_from = fname.split("_")[-2]
dsid = "_".join(fname.split("_")[:-2])
return (
dsid,
pd.to_datetime(date_from, format="%Y%m%d").tz_localize("UTC"),
pd.to_datetime(date_to, format="%Y%m%d").tz_localize("UTC"),
)


@dataclass
class ReanalysisDataset:
id: str
data: pd.DataFrame


def add_reanalysis_data(
wf_df: pd.DataFrame,
*,
reanalysis_datasets: list[ReanalysisDataset],
cfg: WindUpConfig,
plot_cfg: PlotConfig | None,
require_full_coverage: bool = False,
) -> pd.DataFrame:
data_coverage_width = 0.1
wf_ws_df = calc_wf_mean_wind_speed_df(
wf_df,
num_turbines=len(cfg.asset.wtgs),
allowed_data_coverage_width=data_coverage_width,
)
max_data_coverage_width = 0.3
while len(wf_ws_df) < (60 * 24 * ROWS_PER_HOUR) and data_coverage_width < max_data_coverage_width:
data_coverage_width += 0.05
wf_ws_df = calc_wf_mean_wind_speed_df(
wf_df,
num_turbines=len(cfg.asset.wtgs),
allowed_data_coverage_width=data_coverage_width,
)

best_id = None
best_df = None
best_corr = -99.0
best_s = None
for reanalysis_dataset in reanalysis_datasets:
_starts_later = reanalysis_dataset.data.index.min() > cfg.lt_first_dt_utc_start
_ends_earlier = reanalysis_dataset.data.index.max() < cfg.analysis_last_dt_utc_start
if (_starts_later or _ends_earlier) and require_full_coverage:
continue # skip to next dataset

dsid = reanalysis_dataset.id
this_reanalysis_df = reanalysis_upsample(reanalysis_dataset.data)

this_corr, this_s = find_best_shift_and_corr(
wf_ws_df=wf_ws_df,
reanalysis_df=this_reanalysis_df,
wf_name=cfg.asset.name,
datastream_id=dsid,
plot_cfg=plot_cfg,
)
if this_corr > best_corr:
best_id = dsid
best_df = this_reanalysis_df
best_corr = this_corr
best_s = this_s

if best_df is None:
msg = "no best_id found"
raise RuntimeError(msg)

print(f"{best_id} has best correlation: {best_corr:.3f} with a shift of {best_s}")

wf_and_reanalysis_df = wf_df.merge(
best_df.shift(best_s)[[REANALYSIS_WS_COL, REANALYSIS_WD_COL]],
how="left",
left_index=True,
right_index=True,
)
if plot_cfg is not None:
plot_wf_and_reanalysis_sample_timeseries(wf_df=wf_and_reanalysis_df, plot_cfg=plot_cfg)
return wf_and_reanalysis_df

0 comments on commit 4e0b0eb

Please sign in to comment.