-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor: time series collection, resource handling
Create domain objects for TimeSeriesResource, TimeSeriesCollection++. This should make the behavior more clear, and provide more flexibility in the future. Previously, time_series_collection.py and time_series_collection_mapper.py did a lot of stuff. In addition to dealing with the resource data and validating that, the yaml data was also validated. As this was bundled it was difficult to reuse some of the logic. Each separate step should now be available by using the correct class.
- Loading branch information
Showing
22 changed files
with
956 additions
and
989 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
from datetime import datetime | ||
from operator import itemgetter | ||
from typing import List, Tuple | ||
|
||
from scipy.interpolate import interp1d | ||
from typing_extensions import Self | ||
|
||
from libecalc.common.list.list_utils import transpose | ||
from libecalc.dto.types import InterpolationType | ||
|
||
|
||
class TimeSeries: | ||
def __init__( | ||
self, | ||
reference_id: str, | ||
time_vector: List[datetime], | ||
series: List[float], | ||
extrapolate: bool, | ||
interpolation_type: InterpolationType, | ||
): | ||
self.reference_id = reference_id | ||
self.time_vector = time_vector | ||
self.series = series | ||
self._extrapolate = extrapolate | ||
self._interpolation_type = interpolation_type | ||
|
||
@staticmethod | ||
def _get_interpolation_kind(rate_interpolation_type: InterpolationType) -> str: | ||
if rate_interpolation_type == InterpolationType.LINEAR: | ||
return "linear" | ||
elif rate_interpolation_type == InterpolationType.RIGHT: | ||
return "previous" | ||
elif rate_interpolation_type == InterpolationType.LEFT: | ||
return "next" | ||
else: | ||
raise ValueError(f"Invalid interpolation type, got {rate_interpolation_type}.") | ||
|
||
def _interpolate(self, time_vector: List[datetime], rate_interpolation_type: InterpolationType) -> List[float]: | ||
interpolation_kind = self._get_interpolation_kind( | ||
rate_interpolation_type=rate_interpolation_type, | ||
) | ||
|
||
start_time = self.time_vector[0] | ||
|
||
setup_times: List[float] | ||
if len(self.time_vector) == 1: | ||
# add dummy time 1 second later | ||
setup_times = [0, 1] | ||
setup_y = 2 * self.series | ||
else: | ||
# Interpolator x variable is number of seconds from first date time | ||
setup_times = [(time - start_time).total_seconds() for time in self.time_vector] | ||
setup_y = self.series | ||
|
||
interpolator = interp1d(x=setup_times, y=setup_y, kind=interpolation_kind) | ||
target_times = [(time - start_time).total_seconds() for time in time_vector] | ||
return list(interpolator(target_times)) | ||
|
||
def fit_to_time_vector( | ||
self, | ||
time_vector: List[datetime], | ||
) -> Self: | ||
start, end = self.time_vector[0], self.time_vector[-1] | ||
number_of_entries_before, entries_between, number_of_entries_after = split_time_vector( | ||
time_vector, start=start, end=end | ||
) | ||
|
||
if self._extrapolate: | ||
extrapolation_after_value = self.series[-1] | ||
else: | ||
extrapolation_after_value = 0.0 | ||
|
||
before_values = [0.0] * number_of_entries_before | ||
between_values = self._interpolate( | ||
time_vector=entries_between, rate_interpolation_type=self._interpolation_type | ||
) | ||
after_values = [extrapolation_after_value] * number_of_entries_after | ||
|
||
return self.__class__( | ||
reference_id=self.reference_id, | ||
time_vector=time_vector, | ||
series=[*before_values, *between_values, *after_values], | ||
extrapolate=self._extrapolate, | ||
interpolation_type=self._interpolation_type, | ||
) | ||
|
||
def sort(self) -> Self: | ||
sort_columns = [self.time_vector, self.series] | ||
sort_rows = transpose(sort_columns) | ||
sorted_rows = sorted(sort_rows, key=itemgetter(0)) | ||
sorted_columns = transpose(sorted_rows) | ||
self.time_vector = sorted_columns[0] | ||
self.series = sorted_columns[1] | ||
return self | ||
|
||
|
||
def split_time_vector( | ||
time_vector: List[datetime], | ||
start: datetime, | ||
end: datetime, | ||
) -> Tuple[int, List[datetime], int]: | ||
"""Find the entries between start and end, also counting the number of entries before start and after end.""" | ||
number_of_entries_before = len([date for date in time_vector if date < start]) | ||
number_of_entries_after = len([date for date in time_vector if date > end]) | ||
entries_between = [date for date in time_vector if start <= date <= end] | ||
return number_of_entries_before, entries_between, number_of_entries_after |
91 changes: 91 additions & 0 deletions
91
src/libecalc/presentation/yaml/domain/time_series_collection.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
from datetime import datetime | ||
from typing import List | ||
|
||
from typing_extensions import Self, assert_never | ||
|
||
from libecalc.common.errors.exceptions import InvalidResource | ||
from libecalc.dto.types import InterpolationType | ||
from libecalc.presentation.yaml.domain.time_series import TimeSeries | ||
from libecalc.presentation.yaml.domain.time_series_exceptions import TimeSeriesNotFound | ||
from libecalc.presentation.yaml.domain.time_series_provider import TimeSeriesProvider | ||
from libecalc.presentation.yaml.domain.time_series_resource import TimeSeriesResource | ||
from libecalc.presentation.yaml.resource import Resource | ||
from libecalc.presentation.yaml.validation_errors import ValidationError | ||
from libecalc.presentation.yaml.yaml_types.time_series.yaml_time_series import ( | ||
YamlDefaultTimeSeriesCollection, | ||
YamlMiscellaneousTimeSeriesCollection, | ||
YamlTimeSeriesCollection, | ||
) | ||
|
||
|
||
class InvalidTimeSeriesCollection(ValidationError): | ||
def __init__(self, name: str, message: str): | ||
super().__init__(f"Invalid time series '{name}': {message}") | ||
|
||
|
||
class TimeSeriesCollection(TimeSeriesProvider): | ||
""" | ||
TimeSeriesCollection is a collection of time series (TimeSeriesResource) and common properties for all the time | ||
series in the collection. | ||
""" | ||
|
||
def __init__( | ||
self, | ||
name: str, | ||
resource: TimeSeriesResource, | ||
interpolation: InterpolationType, | ||
extrapolation: bool, | ||
influence_time_vector: bool, | ||
): | ||
self.name = name | ||
self._resource = resource | ||
self._interpolation = interpolation | ||
self._extrapolation = extrapolation | ||
self._influence_time_vector = influence_time_vector | ||
|
||
def should_influence_time_vector(self) -> bool: | ||
return self._influence_time_vector | ||
|
||
def get_time_vector(self) -> List[datetime]: | ||
return self._resource.get_time_vector() | ||
|
||
def get_time_series_references(self) -> List[str]: | ||
return self._resource.get_headers() | ||
|
||
def get_time_series(self, time_series_id: str) -> TimeSeries: | ||
try: | ||
return TimeSeries( | ||
reference_id=f"{self.name};{time_series_id}", | ||
time_vector=self.get_time_vector(), | ||
series=self._resource.get_column(time_series_id), | ||
extrapolate=self._extrapolation, | ||
interpolation_type=self._interpolation, | ||
).sort() | ||
except InvalidResource as e: | ||
raise TimeSeriesNotFound( | ||
f"Unable to find time series with reference '{time_series_id}' in collection '{self.name}'" | ||
) from e | ||
|
||
@classmethod | ||
def from_yaml(cls, resource: Resource, yaml_collection: YamlTimeSeriesCollection) -> Self: | ||
try: | ||
time_series_resource = TimeSeriesResource(resource) | ||
time_series_resource.validate() | ||
except InvalidResource as e: | ||
raise InvalidTimeSeriesCollection(yaml_collection.name, str(e)) from e | ||
|
||
if isinstance(yaml_collection, YamlDefaultTimeSeriesCollection): | ||
interpolation = InterpolationType.RIGHT | ||
extrapolation = False | ||
elif isinstance(yaml_collection, YamlMiscellaneousTimeSeriesCollection): | ||
interpolation = InterpolationType[yaml_collection.interpolation_type] | ||
extrapolation = yaml_collection.extrapolation if yaml_collection.extrapolation is not None else False | ||
else: | ||
assert_never(yaml_collection) | ||
return cls( | ||
name=yaml_collection.name, | ||
resource=time_series_resource, | ||
interpolation=interpolation, | ||
extrapolation=extrapolation, | ||
influence_time_vector=yaml_collection.influence_time_vector, | ||
) |
56 changes: 56 additions & 0 deletions
56
src/libecalc/presentation/yaml/domain/time_series_collections.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
from datetime import datetime | ||
from typing import Dict, List, Set | ||
|
||
from libecalc.common.errors.exceptions import InvalidResource | ||
from libecalc.presentation.yaml.domain.time_series import TimeSeries | ||
from libecalc.presentation.yaml.domain.time_series_collection import InvalidTimeSeriesCollection, TimeSeriesCollection | ||
from libecalc.presentation.yaml.domain.time_series_exceptions import TimeSeriesNotFound | ||
from libecalc.presentation.yaml.domain.time_series_provider import TimeSeriesProvider | ||
from libecalc.presentation.yaml.resource import Resource | ||
from libecalc.presentation.yaml.yaml_types.time_series.yaml_time_series import YamlTimeSeriesCollection | ||
|
||
|
||
class TimeSeriesCollections(TimeSeriesProvider): | ||
""" | ||
TimeSeriesCollections keeps several TimeSeriesCollection classes and can provide info about those, such as all time | ||
steps in all collections. | ||
""" | ||
|
||
def __init__(self, time_series: List[YamlTimeSeriesCollection], resources: Dict[str, Resource]): | ||
time_series_collections: Dict[str, TimeSeriesCollection] = {} | ||
for time_series_collection in time_series: | ||
resource = resources[time_series_collection.file] | ||
try: | ||
time_series_collections[time_series_collection.name] = TimeSeriesCollection.from_yaml( | ||
resource=resource, | ||
yaml_collection=time_series_collection, | ||
) | ||
except InvalidResource as e: | ||
# Catch validation when initializing TimeSeriesResource | ||
raise InvalidTimeSeriesCollection(time_series_collection.name, str(e)) from e | ||
self._time_series_collections = time_series_collections | ||
|
||
def get_time_series_references(self) -> List[str]: | ||
time_series_references = [] | ||
for collection in self._time_series_collections.values(): | ||
for time_series_reference in collection.get_time_series_references(): | ||
time_series_references.append(f"{collection.name};{time_series_reference}") | ||
return time_series_references | ||
|
||
def get_time_series(self, time_series_id: str) -> TimeSeries: | ||
reference_id_parts = time_series_id.split(";") | ||
if len(reference_id_parts) != 2: | ||
raise TimeSeriesNotFound(time_series_id) | ||
[collection_id, time_series_id] = reference_id_parts | ||
|
||
if collection_id not in self._time_series_collections: | ||
raise TimeSeriesNotFound(time_series_id) | ||
|
||
return self._time_series_collections[collection_id].get_time_series(time_series_id) | ||
|
||
def get_time_vector(self) -> Set[datetime]: | ||
time_vector: Set[datetime] = set() | ||
for time_series_collection in self._time_series_collections.values(): | ||
if time_series_collection.should_influence_time_vector(): | ||
time_vector = time_vector.union(time_series_collection.get_time_vector()) | ||
return time_vector |
Oops, something went wrong.