Skip to content

Commit

Permalink
chore: numpy ndarray typing
Browse files Browse the repository at this point in the history
  • Loading branch information
olelod committed May 31, 2023
1 parent 250928c commit 6f2e92b
Show file tree
Hide file tree
Showing 39 changed files with 459 additions and 370 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import numpy as np
from numpy import float64
from numpy.typing import NDArray

"""
NOTE! A "list util" class is not the best, but maybe we should try to
Expand Down Expand Up @@ -56,7 +57,9 @@ def group_data_by_value_at_index(index: int, row_based_data: List[List[Any]]) ->
return chart_grouped_by_index


def elementwise_sum(*vectors: Sequence[Optional[float]], timesteps: Optional[List[datetime]] = None) -> np.ndarray:
def elementwise_sum(
*vectors: Sequence[Optional[float]], timesteps: Optional[List[datetime]] = None
) -> NDArray[np.float64]:
"""Sum up multiple vectors elementwise.
E.g. if we provide three lists [1,20], [2,10], [1,30], the result will be [1+2+1,20+10+30] = [4,60]
Expand All @@ -81,7 +84,7 @@ def elementwise_sum(*vectors: Sequence[Optional[float]], timesteps: Optional[Lis

def elementwise_multiplication(
*vectors: Sequence[Optional[float]], timesteps: Optional[List[datetime]] = None
) -> np.ndarray:
) -> NDArray[np.float64]:
"""Multiply multiple vectors elementwise.
E.g. if we provide three lists [1,20], [2,10], [1,30], the result will be [1*2*1,20*10*30] = [2,6000]
Expand All @@ -104,7 +107,7 @@ def elementwise_multiplication(
return result


def array_to_list(result_array: Union[np.ndarray, List[np.ndarray], None]) -> Optional[List]:
def array_to_list(result_array: Union[NDArray[np.float64], List[NDArray[np.float64]], None]) -> Optional[List]:
"""Method to convert numpy arrays and list of numpy arrays into lists (or list of lists). Method is used recurrsivly on lists so needs to handle None aswell.
Args:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
import pandas as pd
from libecalc.common.exceptions import ProgrammingError
from libecalc.common.units import UnitConstants
from numpy.typing import ArrayLike
from numpy.typing import ArrayLike, NDArray


def calculate_delta_days(time_vector: ArrayLike) -> np.ndarray:
def calculate_delta_days(time_vector: ArrayLike) -> NDArray[np.float64]:
return np.array([x.total_seconds() / UnitConstants.SECONDS_IN_A_DAY for x in np.diff(time_vector)])


Expand Down
3 changes: 2 additions & 1 deletion src/ecalc/libraries/libecalc/common/libecalc/common/units.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@

import numpy as np
from libecalc.common.logger import logger
from numpy.typing import NDArray
from pydantic.validators import enum_validator

TInput = TypeVar("TInput", bound=Union[int, float, np.ndarray, list])
TInput = TypeVar("TInput", bound=Union[int, float, NDArray[np.float64], list])


def _type_handler(unit_func: Callable[[TInput], TInput]) -> Callable[[TInput], TInput]:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from typing import Union

import numpy as np
from numpy.typing import NDArray


def transform_linear(
values: Union[np.ndarray, float],
values: Union[NDArray[np.float64], float],
constant: float = 0.0,
factor: float = 1.0,
) -> Union[np.ndarray, float]:
) -> Union[NDArray[np.float64], float]:
"""Linear transformation of an array. May typically be used for energy functions to adjust a result
according to given energy_usage_adjustment_constant and energy_usage_adjustment_factor.
"""
Expand Down
27 changes: 14 additions & 13 deletions src/ecalc/libraries/libecalc/common/libecalc/common/utils/rates.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from libecalc.common.time_utils import Frequency, Period, calculate_delta_days
from libecalc.common.units import Unit
from libecalc.dto.types import RateType
from numpy.typing import NDArray
from pydantic import validator
from pydantic.fields import ModelField
from pydantic.generics import GenericModel
Expand All @@ -35,7 +36,7 @@

class Rates:
@staticmethod
def to_stream_day(calendar_day_rates: np.ndarray, regularity: List[float]) -> np.ndarray:
def to_stream_day(calendar_day_rates: NDArray[np.float64], regularity: List[float]) -> NDArray[np.float64]:
"""
Convert (production) rate from calendar day to stream day
Expand All @@ -50,7 +51,7 @@ def to_stream_day(calendar_day_rates: np.ndarray, regularity: List[float]) -> np
return np.divide(calendar_day_rates, regularity, out=np.zeros_like(calendar_day_rates), where=regularity != 0.0) # type: ignore[comparison-overlap]

@staticmethod
def to_calendar_day(stream_day_rates: np.ndarray, regularity: List[float]) -> np.ndarray:
def to_calendar_day(stream_day_rates: NDArray[np.float64], regularity: List[float]) -> NDArray[np.float64]:
"""Convert (production) rate from stream day to calendar day.
Args:
Expand All @@ -63,7 +64,7 @@ def to_calendar_day(stream_day_rates: np.ndarray, regularity: List[float]) -> np
return stream_day_rates * np.asarray(regularity, dtype=np.float64)

@staticmethod
def forward_fill_nan_values(rates: np.ndarray) -> np.ndarray:
def forward_fill_nan_values(rates: NDArray[np.float64]) -> NDArray[np.float64]:
"""
Forward fill Nan-values
Expand All @@ -77,8 +78,8 @@ def forward_fill_nan_values(rates: np.ndarray) -> np.ndarray:

@staticmethod
def to_volumes(
rates: Union[List[float], List[TimeSeriesValue], np.ndarray], time_steps: Iterable[datetime]
) -> np.ndarray:
rates: Union[List[float], List[TimeSeriesValue], NDArray[np.float64]], time_steps: Iterable[datetime]
) -> NDArray[np.float64]:
"""
Computes the volume between two dates from the corresponding rates, according to the given frequency.
Note that the code does not perform any interpolation or extrapolation,
Expand All @@ -98,8 +99,8 @@ def to_volumes(

@staticmethod
def compute_cumulative(
volumes: Union[List[float], np.ndarray[float, numpy.dtype[numpy.float64]]]
) -> np.ndarray[float, numpy.dtype[numpy.float64]]:
volumes: Union[List[float], NDArray[np.float64][float, numpy.dtype[numpy.float64]]]
) -> NDArray[np.float64][float, numpy.dtype[numpy.float64]]:
"""
Compute cumulative volumes from a list of periodic volumes
Expand All @@ -116,8 +117,8 @@ def compute_cumulative(

@staticmethod
def compute_cumulative_volumes_from_daily_rates(
rates: Union[List[float], List[TimeSeriesValue], np.ndarray], time_steps: Iterable[datetime]
) -> np.ndarray:
rates: Union[List[float], List[TimeSeriesValue], NDArray[np.float64]], time_steps: Iterable[datetime]
) -> NDArray[np.float64]:
"""
Compute cumulative production volumes based on production rates and the corresponding dates.
The production rates are assumed to be constant between the different dates.
Expand Down Expand Up @@ -444,15 +445,15 @@ def reindex(self, time_steps: List[datetime]) -> Self:
if self.timesteps[0] <= time_step <= self.timesteps[-1] and time_step not in self.timesteps:
raise ValueError(f"Could not reindex volumes. Missing time step `{time_step}`.")

cumulative_volumes = Rates.compute_cumulative(self.values) # type: ignore[arg-type]
cumulative_volumes = Rates.compute_cumulative(self.values)

re_indexed_cumulative_values = pd.Series(index=self.timesteps, data=cumulative_volumes).reindex(time_steps)

# Diffing cumulative volume in order to go back to volumes per period.
re_indexed_volumes = re_indexed_cumulative_values.diff().shift(-1)[:-1]

return self.__class__(
timesteps=re_indexed_volumes.index.to_pydatetime().tolist(), # type: ignore
timesteps=re_indexed_volumes.index.to_pydatetime().tolist(),
values=re_indexed_volumes.tolist(),
unit=self.unit,
)
Expand All @@ -471,7 +472,7 @@ def cumulative(self) -> TimeSeriesVolumesCumulative:
"""
return TimeSeriesVolumesCumulative(
timesteps=self.timesteps,
values=list(Rates.compute_cumulative(self.values)), # type: ignore[arg-type]
values=list(Rates.compute_cumulative(self.values)),
unit=self.unit,
)

Expand Down Expand Up @@ -747,7 +748,7 @@ def resample(self, freq: Frequency) -> TimeSeriesRate:
else:
return new_time_series.to_stream_day()

def __getitem__(self, indices: Union[slice, int, List[int], np.ndarray]) -> TimeSeriesRate:
def __getitem__(self, indices: Union[slice, int, List[int], NDArray[np.float64]]) -> TimeSeriesRate:
if isinstance(indices, slice):
return self.__class__(
timesteps=self.timesteps[indices],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
CompressorOperationalSettings,
)
from libecalc.dto.core_specs.pump.operational_settings import PumpOperationalSettings
from numpy.typing import NDArray

Consumer = TypeVar("Consumer", bound=Union[Compressor, Pump])
ConsumerOperationalSettings = TypeVar(
Expand Down Expand Up @@ -302,7 +303,9 @@ def _topologically_sort_consumers_by_crossover(crossover: List[int], consumers:
return [consumers[consumer_index] for consumer_index in nx.topological_sort(graph)]

@staticmethod
def _get_crossover_rates(max_rate: List[float], rates: List[List[float]]) -> Tuple[np.ndarray, List[List[float]]]:
def _get_crossover_rates(
max_rate: List[float], rates: List[List[float]]
) -> Tuple[NDArray[np.float64], List[List[float]]]:
total_rate = np.sum(rates, axis=0)
crossover_rate = np.where(total_rate > max_rate, total_rate - max_rate, 0)
rates_within_capacity = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from libecalc.core.result import GeneratorSetResult
from libecalc.dto.types import RateType
from libecalc.dto.variables import VariablesMap
from numpy.typing import NDArray


class Genset:
Expand All @@ -29,7 +30,7 @@ def __init__(
def evaluate(
self,
variables_map: VariablesMap,
power_requirement: np.ndarray,
power_requirement: NDArray[np.float64],
) -> GeneratorSetResult:
"""Warning! We are converting energy usage to NaN when the energy usage models has invalid timesteps. this will
probably be changed soon.
Expand Down Expand Up @@ -88,7 +89,9 @@ def evaluate(
),
)

def evaluate_fuel_rate(self, power_requirement: np.ndarray, variables_map: dto.VariablesMap) -> np.ndarray:
def evaluate_fuel_rate(
self, power_requirement: NDArray[np.float64], variables_map: dto.VariablesMap
) -> NDArray[np.float64]:
result = np.full_like(power_requirement, fill_value=np.nan).astype(float)
for period, model in self.temporal_generator_set_model.items():
if Period.intersects(period, variables_map.period):
Expand All @@ -97,8 +100,8 @@ def evaluate_fuel_rate(self, power_requirement: np.ndarray, variables_map: dto.V
return result

def evaluate_power_capacity_margin(
self, power_requirement: np.ndarray, variables_map: dto.VariablesMap
) -> np.ndarray:
self, power_requirement: NDArray[np.float64], variables_map: dto.VariablesMap
) -> NDArray[np.float64]:
result = np.zeros_like(power_requirement).astype(float)
for period, model in self.temporal_generator_set_model.items():
if Period.intersects(period, variables_map.period):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from libecalc.dto import VariablesMap
from libecalc.dto.base import ComponentType
from libecalc.dto.types import ConsumptionType
from numpy.typing import NDArray


def get_operational_settings_used_from_consumer_result(
Expand Down Expand Up @@ -314,7 +315,7 @@ def reindex_time_vector(
time_vector: Iterable[datetime],
new_time_vector: Iterable[datetime],
fillna: Union[float, str] = 0.0,
) -> np.ndarray:
) -> NDArray[np.float64]:
"""Based on a consumer time function result (EnergyFunctionResult), the corresponding time vector and
the consumer time vector, we calculate the actual consumer (consumption) rate.
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@
ConsumerFunctionType,
)
from libecalc.core.models.results.base import EnergyFunctionResult
from numpy.typing import NDArray
from pydantic import BaseModel


class ConditionsAndPowerLossResult(NamedTuple):
condition: np.ndarray
power_loss_factor: np.ndarray
energy_usage_after_condition_before_power_loss_factor: np.ndarray
resulting_energy_usage: np.ndarray
resulting_power_usage: Optional[np.ndarray]
condition: NDArray[np.float64]
power_loss_factor: NDArray[np.float64]
energy_usage_after_condition_before_power_loss_factor: NDArray[np.float64]
resulting_energy_usage: NDArray[np.float64]
resulting_power_usage: Optional[NDArray[np.float64]]


class ConsumerFunctionResultBase(BaseModel):
Expand All @@ -30,16 +31,16 @@ class ConsumerFunctionResultBase(BaseModel):

typ: ConsumerFunctionType

time_vector: np.ndarray
is_valid: np.ndarray
energy_usage: np.ndarray
energy_usage_before_power_loss_factor: Optional[np.ndarray]
condition: Optional[np.ndarray]
power_loss_factor: Optional[np.ndarray]
time_vector: NDArray[np.float64]
is_valid: NDArray[np.float64]
energy_usage: NDArray[np.float64]
energy_usage_before_power_loss_factor: Optional[NDArray[np.float64]]
condition: Optional[NDArray[np.float64]]
power_loss_factor: Optional[NDArray[np.float64]]
energy_function_result: Optional[Union[EnergyFunctionResult, List[EnergyFunctionResult]]]

# New! to support fuel to power rate...for e.g. compressors emulating turbine
power: Optional[np.ndarray]
power: Optional[NDArray[np.float64]]

class Config:
arbitrary_types_allowed = True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@
)
from libecalc.dto import VariablesMap
from libecalc.expression import Expression
from numpy.typing import NDArray


def calculate_energy_usage_with_conditions_and_power_loss(
variables_map: VariablesMap,
energy_usage: np.ndarray,
energy_usage: NDArray[np.float64],
condition_expression: Expression,
power_loss_factor_expression: Expression,
power_usage: Optional[np.ndarray] = None,
power_usage: Optional[NDArray[np.float64]] = None,
) -> ConditionsAndPowerLossResult:
condition = get_condition_from_expression(
variables_map=variables_map,
Expand Down Expand Up @@ -64,7 +65,7 @@ def calculate_energy_usage_with_conditions_and_power_loss(
def get_condition_from_expression(
variables_map: VariablesMap,
condition_expression: Expression,
) -> Optional[np.ndarray]:
) -> Optional[NDArray[np.int_]]:
"""Evaluate condition expression and compute resulting condition vector.
Args:
Expand All @@ -80,12 +81,12 @@ def get_condition_from_expression(
)
condition = (condition != 0).astype(int)
else:
condition = None
return None

return condition
return np.array(condition)


def apply_condition(input_array: np.ndarray, condition: Optional[np.ndarray]) -> np.ndarray:
def apply_condition(input_array: NDArray[np.float64], condition: Optional[NDArray[np.float64]]) -> NDArray[np.float64]:
"""Apply condition to input array in the following way:
- Input values kept as is if condition is 1
- Input values set to 0 if condition is 0
Expand All @@ -110,7 +111,7 @@ def apply_condition(input_array: np.ndarray, condition: Optional[np.ndarray]) ->
def get_power_loss_factor_from_expression(
variables_map: VariablesMap,
power_loss_factor_expression: Expression,
) -> Optional[np.ndarray]:
) -> Optional[NDArray[np.float64]]:
"""Evaluate power loss factor expression and compute resulting power loss factor vector.
Args:
Expand All @@ -120,18 +121,18 @@ def get_power_loss_factor_from_expression(
Returns:
Assembled power loss factor vector
"""
power_loss_factor = (
power_loss_factor_expression.evaluate(
if power_loss_factor_expression is not None:
power_loss_factor = power_loss_factor_expression.evaluate(
variables=variables_map.variables, fill_length=len(variables_map.time_vector)
)
if power_loss_factor_expression is not None
else None
)

return power_loss_factor
else:
return None
return np.array(power_loss_factor)


def apply_power_loss_factor(energy_usage: np.ndarray, power_loss_factor: Optional[np.ndarray]) -> np.ndarray:
def apply_power_loss_factor(
energy_usage: NDArray[np.float64], power_loss_factor: Optional[NDArray[np.float64]]
) -> NDArray[np.float64]:
"""Apply resulting required power taking a (cable/motor...) power loss factor into account.
Args:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
)
from libecalc.core.models.results import CompressorTrainResult, PumpModelResult
from libecalc.core.models.results.base import EnergyFunctionResult
from numpy.typing import NDArray


def get_single_consumer_models(
Expand Down Expand Up @@ -122,7 +123,7 @@ def get_operational_settings_results_from_consumer_result(

def map_energy_function_results(
result: EnergyFunctionResult,
time_vector: np.ndarray,
time_vector: NDArray[np.float64],
name: str,
) -> List[core_results.ConsumerModelResult]:
"""Returns a list of results that are specific to each consumer. This can be details for compressor trains with
Expand Down
Loading

0 comments on commit 6f2e92b

Please sign in to comment.