Skip to content

Commit

Permalink
refactor: calculate timesteps separately (#284)
Browse files Browse the repository at this point in the history
Process one timestep at a time in v2
  • Loading branch information
jsolaas authored Nov 16, 2023
1 parent f1af9e6 commit bd9d684
Show file tree
Hide file tree
Showing 26 changed files with 2,393 additions and 1,810 deletions.
133 changes: 28 additions & 105 deletions src/libecalc/common/priority_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,147 +2,70 @@
import typing
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from functools import reduce
from typing import Dict, Generic, List, TypeVar

import numpy as np

from libecalc.common.priorities import Priorities, PriorityID
from libecalc.common.units import Unit
from libecalc.common.utils.rates import (
TimeSeriesBoolean,
TimeSeriesString,
)
from libecalc.common.priorities import PriorityID

TResult = TypeVar("TResult")
TPriority = TypeVar("TPriority")

ComponentID = str


@dataclass
class PriorityOptimizerResult(Generic[TResult]):
priorities_used: TimeSeriesString
priority_used: PriorityID
priority_results: List[typing.Any] # TODO: typing. This is the consumer results merged based on priorities used


@dataclass
class EvaluatorResult(Generic[TResult]):
id: ComponentID
result: TResult
is_valid: TimeSeriesBoolean


class PriorityOptimizer(Generic[TResult, TPriority]):
@staticmethod
def get_component_ids(
priority_results: Dict[datetime, Dict[PriorityID, Dict[ComponentID, TResult]]]
) -> List[ComponentID]:
component_ids = []
for timestep in priority_results:
for priority in priority_results[timestep]:
for component_id in priority_results[timestep][priority].keys():
if component_id not in component_ids:
component_ids.append(component_id)
return component_ids

def collect_consumer_results(
self,
priorities_used: TimeSeriesString,
priority_results: Dict[datetime, Dict[PriorityID, Dict[ComponentID, TResult]]],
) -> List[typing.Any]: # TODO: Any type since we don't have access to component_result within TResult
"""
Merge consumer results into a single result per consumer based on the operational settings used. I.e. pick results
from the correct operational setting result and merge into a single result per consumer.
Args:
priorities_used:
priority_results:
Returns: List of merged consumer results
is_valid: bool

"""
component_ids = self.get_component_ids(priority_results)

consumer_results: Dict[ComponentID, typing.Any] = {}
for component_id in component_ids:
for timestep_index, timestep in enumerate(priorities_used.timesteps):
priority_used = priorities_used.values[timestep_index]
prev_result = consumer_results.get(component_id)
consumer_result_subset = priority_results[timestep][priority_used][
component_id
].component_result # TODO: Accessing something the type does not make clear exists

if prev_result is None:
consumer_results[component_id] = consumer_result_subset
else:
consumer_results[component_id] = prev_result.merge(consumer_result_subset)

return list(consumer_results.values())

class PriorityOptimizer(Generic[TResult]):
def optimize(
self,
timesteps: List[datetime],
priorities: Priorities[TPriority],
evaluator: typing.Callable[[datetime, TPriority], List[EvaluatorResult[TResult]]],
priorities: List[PriorityID],
evaluator: typing.Callable[[PriorityID], List[EvaluatorResult[TResult]]],
) -> PriorityOptimizerResult:
"""
Given a list of priorities, evaluate each priority using the evaluator. If the result of an evaluation is valid
the priority is selected, if invalid try the next priority.
We process each timestep separately.
It will default to the last priority if all settings fails
Args:
timesteps: The timesteps that we want to figure out which priority to use for.
priorities: Dict of priorities, key is used to identify the priority in the results.
priorities: List of priorities
evaluator: The evaluator function gives a list of results back, each result with its own unique id.
Returns:
PriorityOptimizerResult: result containing priorities used and a list of the results merged on priorities
used,
"""
is_valid = TimeSeriesBoolean(timesteps=timesteps, values=[False] * len(timesteps), unit=Unit.NONE)
priorities_used = TimeSeriesString(
timesteps=timesteps,
values=[list(priorities.keys())[-1]] * len(timesteps),
unit=Unit.NONE,
)
priority_results: Dict[datetime, Dict[PriorityID, Dict[str, TResult]]] = defaultdict(dict)

for timestep_index, timestep in enumerate(timesteps):
priority_results[timestep] = defaultdict(dict)
for priority_name, priority_value in priorities.items():
evaluator_results = evaluator(timestep, priority_value)
for evaluator_result in evaluator_results:
priority_results[timestep][priority_name][evaluator_result.id] = evaluator_result.result

# Check if consumers are valid for this operational setting, should be valid for all consumers
all_evaluator_results_valid = reduce(
operator.mul, [evaluator_result.is_valid for evaluator_result in evaluator_results]
)
all_evaluator_results_valid_indices = np.nonzero(all_evaluator_results_valid.values)[0]
all_evaluator_results_valid_indices_period_shifted = [
axis_indices + timestep_index for axis_indices in all_evaluator_results_valid_indices
]

# Remove already valid indices, so we don't overwrite priority used with the latest valid
new_valid_indices = [
i for i in all_evaluator_results_valid_indices_period_shifted if not is_valid.values[i]
]

# Register the valid timesteps as valid and keep track of the operational setting used
is_valid[new_valid_indices] = True
priorities_used[new_valid_indices] = priority_name

if all(is_valid.values):
# quit as soon as all time-steps are valid. This means that we do not need to test all settings.
break
priority_used = priorities[-1]
priority_results: Dict[PriorityID, Dict[str, TResult]] = defaultdict(dict)

for priority in priorities:
evaluator_results = evaluator(priority)
for evaluator_result in evaluator_results:
priority_results[priority][evaluator_result.id] = evaluator_result.result

# Check if consumers are valid for this priority, should be valid for all consumers
all_evaluator_results_valid = reduce(
operator.mul, [evaluator_result.is_valid for evaluator_result in evaluator_results]
)

if all_evaluator_results_valid:
priority_used = priority
# quit as soon as all time-steps are valid. This means that we do not need to test all settings.
break
return PriorityOptimizerResult(
priorities_used=priorities_used,
priority_results=self.collect_consumer_results(
priorities_used=priorities_used, priority_results=priority_results
),
priority_used=priority_used,
priority_results=[
ecalc_model_result.component_result for ecalc_model_result in priority_results[priority_used].values()
],
)
87 changes: 66 additions & 21 deletions src/libecalc/common/stream_conditions.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,35 @@
import operator
from datetime import datetime
from functools import reduce
from typing import List, Literal, Optional
from typing import List, Optional

from pydantic import BaseModel, Extra
from typing_extensions import Self

from libecalc.common.string.string_utils import to_camel_case
from libecalc.common.string.string_utils import generate_id, to_camel_case
from libecalc.common.utils.rates import TimeSeriesFloat, TimeSeriesStreamDayRate
from libecalc.domain.stream_conditions import (
Density,
Pressure,
Rate, # TODO: import from domain, domain also imports from common
StreamConditions,
Temperature,
)


class StreamConditions(BaseModel):
class TimeSeriesStreamConditions(BaseModel):
class Config:
extra = Extra.forbid
alias_generator = to_camel_case
allow_population_by_field_name = True

id: str
name: str
rate: Optional[TimeSeriesStreamDayRate]
pressure: Optional[TimeSeriesFloat]
temperature: Optional[TimeSeriesFloat]
fluid_density: Optional[TimeSeriesFloat] = None

def mix(self, *other_streams: "StreamConditions") -> "StreamConditions":
def mix(self, *other_streams: "TimeSeriesStreamConditions") -> "TimeSeriesStreamConditions":
"""
Mix two streams. This needs to be expanded to handle fluids (density, composition, etc.).
Expand Down Expand Up @@ -50,14 +58,15 @@ def mix(self, *other_streams: "StreamConditions") -> "StreamConditions":
# TODO: return a warning object with the specific timesteps?
raise ValueError("Increasing pressure when mixing streams. That should not happen.")

return StreamConditions(
return TimeSeriesStreamConditions(
id=generate_id(*[stream.id for stream in streams]),
name=f"Mixed-{'-'.join(stream.name for stream in streams)}",
rate=reduce(operator.add, [stream.rate for stream in streams]),
pressure=target_pressure,
fluid_density=self.fluid_density, # TODO: Check that they are equal? Or handle it?
)

def get_subset_for_timestep(self, current_timestep: datetime) -> Self:
def for_timestep(self, current_timestep: datetime) -> StreamConditions:
"""
For a given timestep, get the stream that is relevant for that timestep only.
Expand All @@ -68,28 +77,64 @@ def get_subset_for_timestep(self, current_timestep: datetime) -> Self:
"""
return StreamConditions(
id=self.id,
name=self.name,
rate=self.rate.for_timestep(current_timestep) if self.rate is not None else None,
pressure=self.pressure.for_timestep(current_timestep) if self.pressure is not None else None,
fluid_density=self.fluid_density.for_timestep(current_timestep) if self.fluid_density is not None else None,
timestep=current_timestep,
rate=Rate(value=self.rate.for_timestep(current_timestep).values[0], unit=self.rate.unit)
if self.rate is not None
else None,
pressure=Pressure(value=self.pressure.for_timestep(current_timestep).values[0], unit=self.pressure.unit)
if self.pressure is not None
else None,
density=Density(
value=self.fluid_density.for_timestep(current_timestep).values[0], unit=self.fluid_density.unit
)
if self.fluid_density is not None
else None,
temperature=Temperature(
value=self.temperature.for_timestep(current_timestep).values[0], unit=self.temperature.unit
)
if self.temperature is not None
else None,
)

@classmethod
def from_stream_condition(cls, stream_conditions: StreamConditions) -> "TimeSeriesStreamConditions":
return TimeSeriesStreamConditions(
id=stream_conditions.id,
name=stream_conditions.name,
rate=TimeSeriesStreamDayRate(
timesteps=[stream_conditions.timestep],
values=[stream_conditions.rate.value],
unit=stream_conditions.rate.unit,
),
pressure=TimeSeriesFloat(
timesteps=[stream_conditions.timestep],
values=[stream_conditions.pressure.value],
unit=stream_conditions.pressure.unit,
),
temperature=TimeSeriesFloat(
timesteps=[stream_conditions.timestep],
values=[stream_conditions.temperature.value],
unit=stream_conditions.temperature.unit,
)
if stream_conditions.temperature is not None
else None,
fluid_density=TimeSeriesFloat(
timesteps=[stream_conditions.timestep],
values=[stream_conditions.density.value],
unit=stream_conditions.density.unit,
)
if stream_conditions.density is not None
else None,
)

@classmethod
def mix_all(cls, streams: List["StreamConditions"]) -> "StreamConditions":
def mix_all(cls, streams: List["TimeSeriesStreamConditions"]) -> "TimeSeriesStreamConditions":
if len(streams) == 0:
raise ValueError("No streams to mix")
if len(streams) == 1:
return streams[0].copy()

first, *rest = streams
return first.copy().mix(*rest)


class Stage(BaseModel):
class Config:
extra = Extra.forbid
alias_generator = to_camel_case
allow_population_by_field_name = True

name: Literal["inlet", "before_choke", "outlet"]
stream: StreamConditions
4 changes: 4 additions & 0 deletions src/libecalc/common/utils/rates.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,10 @@ def __eq__(self, other: object) -> bool:
and self.unit == other.unit
)

def append(self, timestep: datetime, value: TimeSeriesValue):
self.timesteps.append(timestep)
self.values.append(value)


class TimeSeriesString(TimeSeries[str]):
def resample(self, freq: Frequency, include_start_date: bool, include_end_date: bool) -> Self:
Expand Down
4 changes: 2 additions & 2 deletions src/libecalc/core/consumers/base/component.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from abc import ABC, abstractmethod
from typing import List

from libecalc.common.stream_conditions import StreamConditions
from libecalc.common.stream_conditions import TimeSeriesStreamConditions
from libecalc.common.utils.rates import TimeSeriesFloat
from libecalc.core.result import EcalcModelResult
from libecalc.dto import VariablesMap
Expand All @@ -21,7 +21,7 @@ class BaseConsumerWithoutOperationalSettings(ABC):
id: str

@abstractmethod
def get_max_rate(self, inlet_stream: StreamConditions, target_pressure: TimeSeriesFloat) -> List[float]:
def get_max_rate(self, inlet_stream: TimeSeriesStreamConditions, target_pressure: TimeSeriesFloat) -> List[float]:
...

@abstractmethod
Expand Down
Loading

0 comments on commit bd9d684

Please sign in to comment.