Skip to content

Commit

Permalink
Merge remote-tracking branch 'origine-main/main' into Feature-add-Pos…
Browse files Browse the repository at this point in the history
…tgresql-support

# Conflicts:
#	openstef_dbc/data_interface.py
  • Loading branch information
TheoPascoli committed Sep 4, 2024
2 parents 0de32ae + 05b3b0f commit edf2bdd
Show file tree
Hide file tree
Showing 11 changed files with 330 additions and 60 deletions.
13 changes: 6 additions & 7 deletions openstef_dbc/data_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,13 +260,12 @@ def check_influx_available(self):

def exec_sql_query(self, query: str, params: dict = None):
try:
connection = self.sql_engine.connect()
if params is None:
params = {}
cursor = connection.execute(query, **params)
connection.close()
if cursor.cursor is not None:
return pd.DataFrame(cursor.fetchall())
with self.sql_engine.connect() as connection:
if params is None:
params = {}
cursor = connection.execute(query, **params)
if cursor.cursor is not None:
return pd.DataFrame(cursor.fetchall())
except sqlalchemy.exc.OperationalError as e:
self.logger.error("Lost connection to {} database".format(self.sql_db_type.upper()), exc_info=e)
raise
Expand Down
7 changes: 5 additions & 2 deletions openstef_dbc/services/ems.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ def get_load_sid(
datetime_end (datetime): End datetime.
forecast_resolution (str): The forecast resolution, for example '15T'
aggregated (boolean): Should the results be aggregated per sid or not.
average_output:
average_output (boolean): Should the agregation be an average or not.
include_n_entries_column (boolean): Should the number of systems agregated be returned or not.
Returns:
- pd.DataFrame(index=datetimeIndex, columns)
Expand Down Expand Up @@ -208,7 +209,9 @@ def get_load_pid(

# Use optimized load retrieval if possible
if aggregated and not ignore_factor:
return self._get_load_pid_optimized(pid, datetime_start, datetime_end)
return self._get_load_pid_optimized(
pid, datetime_start, datetime_end, forecast_resolution
)

# Get systems that belong to this prediction
systems = Systems().get_systems_by_pid(pid)
Expand Down
29 changes: 19 additions & 10 deletions openstef_dbc/services/model_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def get_model_input(
datetime_start: str = None,
datetime_end: str = None,
forecast_resolution: str = "15min",
market_price: str = "APX",
) -> pd.DataFrame:
"""Get model input.
Expand All @@ -44,7 +45,8 @@ def get_model_input(
datetime_end (datetime, optional): End datetime. Defaults to None.
forecast_resolution (str, optional): Time resolution of model input
(see pandas Date Offset frequency strings). Defaults to "15min".
market_price (str, optional): Name of the market place if market data is requested.
Default to "APX".
Returns:
pd.DataFrame: Model input.
"""
Expand Down Expand Up @@ -79,6 +81,8 @@ def get_model_input(
datetime_end=datetime_end,
forecast_resolution=forecast_resolution,
location=location,
country=country,
market_price=market_price,
)

# Create model input with datetime index
Expand All @@ -92,6 +96,7 @@ def get_model_input(
else:
self.logger.warning("No load data returned, fill with NaN.")
model_input["load"] = np.nan

# Add predictors
model_input = pd.concat([model_input, predictors], axis=1)

Expand All @@ -107,6 +112,7 @@ def get_solar_input(
datetime_start: datetime = None,
sid: str = None,
country: str = "NL",
source: str = "optimum",
) -> pd.DataFrame:
"""This function retrieves the radiation and cloud forecast for the nearest weather location
and the relevant pvdata from a specific system or region.
Expand All @@ -121,6 +127,10 @@ def get_solar_input(
- datetime_start: datetime of forecast
- source: preferred weather source as a string, default for wind is DSN
- country (str, optional): Country of were the location is located. Defaults to "NL", The Netherlands.
- source (str or list of str): which weather models should be used.
Options: "OWM", "DSN", "WUN", "harmonie", "harm_arome", "harm_arome_fallback", "icon", "optimum",
Default: 'optimum'. This combines harmonie, harm_arome, icon and DSN,
taking the (heuristicly) best available source for each moment in time
"""
if datetime_start is None:
datetime_start = datetime.utcnow()
Expand Down Expand Up @@ -149,13 +159,7 @@ def get_solar_input(
end = datetime_start + timedelta(minutes=forecast_horizon)

weather_data = Weather().get_weather_data(
location,
weather_params,
start,
end,
source="optimum",
resolution="15min",
country=country,
location, weather_params, start, end, source, forecast_resolution, country
)

# Interpolate weather data to 15 minute values
Expand All @@ -169,7 +173,12 @@ def get_solar_input(
# Get PV_load from influx (end time at start of forecast)
end = datetime_start
pvdata = Ems().get_load_sid(
sid, start, end, "15T", aggregated=True, average_output=radius == 0
sid,
start,
end,
forecast_resolution,
aggregated=True,
average_output=radius == 0,
)

# If no load was found return None
Expand Down Expand Up @@ -228,7 +237,7 @@ def get_wind_input(
datetime_start=datetime_start,
datetime_end=datetime_end,
source=source,
resolution="15min",
resolution=forecast_resolution,
country=country,
)

Expand Down
27 changes: 20 additions & 7 deletions openstef_dbc/services/predictor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ def get_predictors(
datetime_end: datetime.datetime,
forecast_resolution: Optional[str] = None,
location: Union[str, Tuple[float, float]] = None,
predictor_groups: Union[List[PredictorGroups], List[str], None] = None,
country: str = "NL",
source: Union[List[str], str] = "optimum",
predictor_groups: Union[List[PredictorGroups], List[str], None] = None,
market_price: str = "APX",
) -> pd.DataFrame:
"""Get predictors.
Expand All @@ -43,9 +45,15 @@ def get_predictors(
Defaults to None.
country (str, optional): Country code (for weather data).
Defaults to "NL".
source (str or list of str): which weather models should be used.
Options: "OWM", "DSN", "WUN", "harmonie", "harm_arome", "harm_arome_fallback", "icon", "optimum",
Default: 'optimum'. This combines harmonie, harm_arome, icon and DSN,
taking the (heuristicly) best available source for each moment in time
predictor_groups (Optional[List[str]], optional): The groups of predictors
to include (see the PredictorGroups enum for allowed values). When set to
None or not given all predictor groups will be returned. Defaults to None.
market_price (str, optional): Name of the market place if market data is requested.
Default to "APX".
Returns:
pd.DataFrame: Requested predictors with timezone aware datetime index.
Expand Down Expand Up @@ -76,18 +84,19 @@ def get_predictors(
location=location,
country=country,
forecast_resolution=forecast_resolution,
source=source,
)
predictors.append(weather_data_predictors)

if PredictorGroups.MARKET_DATA in predictor_groups:
market_data_predictors = self.get_market_data(
datetime_start, datetime_end, forecast_resolution=forecast_resolution
datetime_start, datetime_end, forecast_resolution, market_price
)
predictors.append(market_data_predictors)

if PredictorGroups.LOAD_PROFILES in predictor_groups:
load_profiles_predictors = self.get_load_profiles(
datetime_start, datetime_end, forecast_resolution=forecast_resolution
datetime_start, datetime_end, forecast_resolution
)
predictors.append(load_profiles_predictors)

Expand All @@ -98,9 +107,10 @@ def get_market_data(
datetime_start: datetime.datetime,
datetime_end: datetime.datetime,
forecast_resolution: str = None,
market_price: str = "APX",
) -> pd.DataFrame:
electricity_price = self.get_electricity_price(
datetime_start, datetime_end, forecast_resolution
datetime_start, datetime_end, forecast_resolution, market_price
)

return electricity_price
Expand All @@ -110,16 +120,18 @@ def get_electricity_price(
datetime_start: datetime.datetime,
datetime_end: datetime.datetime,
forecast_resolution: str = None,
market_price: str = "APX",
) -> pd.DataFrame:
bind_params = {
"_start": datetime_start,
"_stop": datetime_end,
"market_price": market_price,
}

query = f"""
from(bucket: "forecast_latest/autogen")
|> range(start: {bind_params["_start"].strftime('%Y-%m-%dT%H:%M:%SZ')}, stop: {bind_params["_stop"].strftime('%Y-%m-%dT%H:%M:%SZ')})
|> filter(fn: (r) => r._measurement == "marketprices" and r._field == "Price" and r.Name=="APX")
|> filter(fn: (r) => r._measurement == "marketprices" and r._field == "Price" and r.Name=="{bind_params["market_price"]}")
"""

result = _DataInterface.get_instance().exec_influx_query(query, bind_params)
Expand All @@ -137,7 +149,7 @@ def get_electricity_price(
start=datetime_start, end=datetime_end, freq=forecast_resolution
)
)
electricity_price.rename(columns=dict(Price="APX"), inplace=True)
electricity_price.rename(columns=dict(Price=market_price), inplace=True)

if forecast_resolution and electricity_price.empty is False:
electricity_price = electricity_price.resample(forecast_resolution).ffill()
Expand Down Expand Up @@ -243,6 +255,7 @@ def get_weather_data(
location: Union[Tuple[float, float], str],
forecast_resolution: str = None,
country: str = "NL",
source: Union[List[str], str] = "optimum",
) -> pd.DataFrame:
# Get weather data
weather_params = [
Expand All @@ -266,7 +279,7 @@ def get_weather_data(
weather_params,
datetime_start,
datetime_end,
source="optimum",
source=source,
country=country,
)

Expand Down
28 changes: 22 additions & 6 deletions openstef_dbc/services/splitting.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ def get_wind_ref(
datetime_start: datetime,
datetime_end: datetime,
country: str = "NL",
forecast_resolution: str = "15T",
source: str = "optimum",
) -> pd.DataFrame:
"""Function that gets windspeed data from the influx database and converts it to windref data suitable
for splitting energy.
Expand All @@ -84,7 +86,8 @@ def get_wind_ref(
["windspeed_100m"],
datetime_start=datetime_start,
datetime_end=datetime_end,
source="optimum",
source=source,
resolution=forecast_resolution,
country=country,
)

Expand All @@ -104,6 +107,7 @@ def _get_solar_ref(
location: Union[Tuple[float, float], str],
datetime_start: datetime = None,
datetime_end: datetime = None,
forecast_resolution: str = "15T",
) -> pd.DataFrame:
"""Function that gets PV data from the influx database and converts it to solar_ref data suitable
for splitting energy.
Expand Down Expand Up @@ -132,7 +136,7 @@ def _get_solar_ref(
sid=systems["sid"],
datetime_start=datetime_start,
datetime_end=datetime_end,
forecast_resolution="15T",
forecast_resolution=forecast_resolution,
aggregated=True,
average_output=True,
)
Expand All @@ -149,26 +153,37 @@ def get_input_energy_splitting(
pj: dict,
datetime_start: datetime = None,
datetime_end: datetime = None,
forecast_resolution: str = "15T",
ignore_factor: bool = False,
country: str = "NL",
source: str = "optimum",
) -> pd.DataFrame:
if datetime_start is None:
datetime_start = datetime.utcnow() - timedelta(days=90)
if datetime_end is None:
datetime_end = datetime.utcnow()

# Get standard load profiles (StandaardJaarVerbruik in Dutch)
sjv = Predictor().get_load_profiles(datetime_start, datetime_end)
sjv = Predictor().get_load_profiles(
datetime_start, datetime_end, forecast_resolution
)

# Get windpower reference
wind_ref = self.get_wind_ref(
(pj["lat"], pj["lon"]), datetime_start, datetime_end
(pj["lat"], pj["lon"]),
datetime_start,
datetime_end,
country,
forecast_resolution,
source,
)

# Get load data
load = Ems().get_load_pid(
pj["id"],
datetime_start,
datetime_end,
forecast_resolution,
ignore_factor=ignore_factor,
)

Expand All @@ -177,10 +192,11 @@ def get_input_energy_splitting(
location=(pj["lat"], pj["lon"]),
datetime_start=datetime_start,
datetime_end=datetime_end,
forecast_resolution=forecast_resolution,
)

# Resample to 15min
solar_ref = solar_ref.resample("15T").mean()
# Resample to forecast_resolution
solar_ref = solar_ref.resample(forecast_resolution).mean()

# Invert solar_ref and windref as electricity is produced and not consumed
solar_ref *= -1
Expand Down
Loading

0 comments on commit edf2bdd

Please sign in to comment.