Dps Fetch Refactor #7: Extend utils with new functionality (#1045)
* Extend utils with new functionality

* add hashable bound to type var
haakonvt authored Sep 30, 2022
1 parent c281ac0 commit f0b0525
Showing 4 changed files with 94 additions and 40 deletions.
32 changes: 24 additions & 8 deletions cognite/client/utils/_auxiliary.py
@@ -15,13 +15,16 @@
import warnings
from decimal import Decimal
from types import ModuleType
-from typing import Any, Dict, List, Sequence, Tuple, Union
+from typing import Any, Dict, Hashable, Iterator, List, Sequence, Set, Tuple, TypeVar, Union
from urllib.parse import quote

import cognite.client
from cognite.client.exceptions import CogniteImportError
from cognite.client.utils._version_checker import get_newest_version_in_major_release

+T = TypeVar("T")
+THashable = TypeVar("THashable", bound=Hashable)


@functools.lru_cache(maxsize=128)
def to_camel_case(snake_case_string: str) -> str:
@@ -35,11 +38,12 @@ def to_snake_case(camel_case_string: str) -> str:
    return re.sub("([a-z0-9])([A-Z])", r"\1_\2", s1).lower()


-def convert_all_keys_to_camel_case(d: dict) -> dict:
-    new_d = {}
-    for k, v in d.items():
-        new_d[to_camel_case(k)] = v
-    return new_d
+def convert_all_keys_to_camel_case(d: Dict[str, Any]) -> Dict[str, Any]:
+    return dict(zip(map(to_camel_case, d.keys()), d.values()))


+def convert_all_keys_to_snake_case(d: Dict[str, Any]) -> Dict[str, Any]:
+    return dict(zip(map(to_snake_case, d.keys()), d.values()))
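
A quick doctest-style sketch of the two rewritten helpers (import path taken from the file header above). The zip of keys() and values() is safe because both dict views iterate in the same insertion order:

    >>> from cognite.client.utils._auxiliary import (
    ...     convert_all_keys_to_camel_case,
    ...     convert_all_keys_to_snake_case,
    ... )
    >>> convert_all_keys_to_camel_case({"external_id": "abc", "last_updated_time": 123})
    {'externalId': 'abc', 'lastUpdatedTime': 123}
    >>> convert_all_keys_to_snake_case({"externalId": "abc", "lastUpdatedTime": 123})
    {'external_id': 'abc', 'last_updated_time': 123}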


def json_dump_default(x: Any) -> Any:
@@ -125,11 +129,12 @@ def _check_client_has_newest_major_version() -> None:
    )


-def random_string(size: int = 100) -> str:
-    return "".join(random.choice(string.ascii_uppercase + string.digits) for _ in range(size))
+def random_string(size: int = 100, sample_from: str = string.ascii_uppercase + string.digits) -> str:
+    return "".join(random.choices(sample_from, k=size))


class PriorityQueue:
+    # TODO: Just use queue.PriorityQueue()
    def __init__(self) -> None:
        self.__heap: List[Any] = []
        self.__id = 0
@@ -146,6 +151,11 @@ def __bool__(self) -> bool:
        return len(self.__heap) > 0


+def split_into_n_parts(seq: Sequence[T], /, n: int) -> Iterator[Sequence[T]]:
+    # NB: Chaotic sampling: jumps n for each starting position
+    yield from (seq[i::n] for i in range(n))
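
As the comment warns, the parts interleave (stride n) rather than stay contiguous, and the first len(seq) % n parts get one extra element:

    >>> [list(part) for part in split_into_n_parts([1, 2, 3, 4, 5, 6, 7], n=3)]
    [[1, 4, 7], [2, 5], [3, 6]]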


def split_into_chunks(collection: Union[List, Dict], chunk_size: int) -> List[Union[List, Dict]]:
    chunks: List[Union[List, Dict]] = []
    if isinstance(collection, list):
@@ -173,3 +183,9 @@ def convert_true_match(true_match: Union[dict, list, Tuple[Union[int, str], Unio
        return true_match
    else:
        raise ValueError("true_matches should be a dictionary or a two-element list: found {}".format(true_match))


+def find_duplicates(seq: Sequence[THashable]) -> Set[THashable]:
+    seen: Set[THashable] = set()
+    add = seen.add  # skip future attr lookups for perf
+    return set(x for x in seq if x in seen or add(x))
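
Since set.add returns None (falsy), the generator keeps x only when it was already seen, so the result is exactly the items that occur more than once:

    >>> sorted(find_duplicates([1, 2, 2, 3, 3, 3]))
    [2, 3]
    >>> sorted(find_duplicates("abracadabra"))
    ['a', 'b', 'r']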
2 changes: 1 addition & 1 deletion cognite/client/utils/_concurrency.py
@@ -33,7 +33,7 @@ def raise_compound_exception_if_failed_tasks(
        str_format_element_fn: Optional[Callable] = None,
    ) -> None:
        if not self.exceptions:
-            return
+            return None
        task_unwrap_fn = (lambda x: x) if task_unwrap_fn is None else task_unwrap_fn
        if task_list_element_unwrap_fn is not None:
            successful = []
23 changes: 20 additions & 3 deletions cognite/client/utils/_identifier.py
@@ -1,5 +1,5 @@
import numbers
-from typing import Dict, Generic, Iterable, List, Optional, Sequence, TypeVar, Union, cast, overload
+from typing import Dict, Generic, Iterable, List, Optional, Sequence, Tuple, TypeVar, Union, cast, overload

T_ID = TypeVar("T_ID", int, str)

@@ -25,12 +25,29 @@ def load(cls, id: Optional[int] = None, external_id: Optional[str] = None) -> "I
    def as_primitive(self) -> T_ID:
        return self.__value

-    def as_dict(self) -> Dict[str, T_ID]:
+    def as_dict(self, camel_case: bool = True) -> Dict[str, T_ID]:
        if isinstance(self.__value, str):
-            return {"externalId": self.__value}
+            if camel_case:
+                return {"externalId": self.__value}
+            return {"external_id": self.__value}
        else:
            return {"id": self.__value}

+    def as_tuple(self, camel_case: bool = True) -> Tuple[str, T_ID]:
+        if isinstance(self.__value, str):
+            if camel_case:
+                return ("externalId", self.__value)
+            return ("external_id", self.__value)
+        else:
+            return ("id", self.__value)

+    def __str__(self) -> str:
+        identifier_type, identifier = self.as_tuple(camel_case=False)
+        return f"{type(self).__name__}({identifier_type}={identifier!r})"
+
+    def __repr__(self) -> str:
+        return str(self)
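
A sketch of the new camel_case toggle and the dunder methods, using Identifier.load as shown in the hunk header above; the printed class name may differ if load returns a subclass:

    >>> from cognite.client.utils._identifier import Identifier
    >>> ident = Identifier.load(external_id="pump-42")
    >>> ident.as_dict()
    {'externalId': 'pump-42'}
    >>> ident.as_dict(camel_case=False)
    {'external_id': 'pump-42'}
    >>> ident.as_tuple(camel_case=False)
    ('external_id', 'pump-42')
    >>> str(ident)
    "Identifier(external_id='pump-42')"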


class ExternalId(Identifier[str]):
    ...
77 changes: 49 additions & 28 deletions cognite/client/utils/_time.py
@@ -1,48 +1,41 @@
import numbers
import re
import time
import warnings
from datetime import datetime, timezone
-from typing import Dict, List, Optional, Union
+from typing import Dict, List, Optional, Tuple, Union

-_unit_in_ms_without_week = {"s": 1000, "m": 60000, "h": 3600000, "d": 86400000}
-_unit_in_ms = {**_unit_in_ms_without_week, "w": 604800000}
+UNIT_IN_MS_WITHOUT_WEEK = {"s": 1000, "m": 60000, "h": 3600000, "d": 86400000}
+UNIT_IN_MS = {**UNIT_IN_MS_WITHOUT_WEEK, "w": 604800000}

+MIN_TIMESTAMP_MS = -2208988800000
+MAX_TIMESTAMP_MS = 2556143999999


def datetime_to_ms(dt: datetime) -> int:
-    if dt.tzinfo is None:
-        warnings.warn(
-            "Interpreting given naive datetime as UTC instead of local time (against Python default behaviour). "
-            "This will change in the next major release (4.0.0). Please use (timezone) aware datetimes "
-            "or convert it yourself to integer (number of milliseconds since epoch, leap seconds excluded).",
-            FutureWarning,
-        )
-        dt = dt.replace(tzinfo=timezone.utc)
+    """Converts datetime object to milliseconds since epoch.
+
+    Args:
+        dt (datetime): Naive or aware datetime object. Naive datetimes are interpreted as local time.
+
+    Returns:
+        ms: Milliseconds since epoch (negative for time prior to 1970-01-01)
+    """
    return int(1000 * dt.timestamp())
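
With the FutureWarning gone, an aware datetime converts deterministically; a naive one now follows Python's default local-time interpretation, so its result depends on the machine's timezone:

    >>> from datetime import datetime, timezone
    >>> datetime_to_ms(datetime(2022, 9, 30, tzinfo=timezone.utc))
    1664496000000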


def ms_to_datetime(ms: Union[int, float]) -> datetime:
    """Converts milliseconds since epoch to datetime object.

    Args:
-        ms (Union[int, float]): Milliseconds since epoch
+        ms (Union[int, float]): Milliseconds since epoch.

    Returns:
-        datetime: Naive datetime object in UTC.
+        datetime: Aware datetime object in UTC.
    """
-    if ms < 0:
-        raise ValueError("ms must be greater than or equal to zero.")
+    if not (MIN_TIMESTAMP_MS <= ms <= MAX_TIMESTAMP_MS):
+        raise ValueError(f"`ms` does not satisfy: {MIN_TIMESTAMP_MS} <= ms <= {MAX_TIMESTAMP_MS}")
-    warnings.warn(
-        "This function, `ms_to_datetime` returns a naive datetime object in UTC. This is against "
-        "the default interpretation of naive datetimes in Python (i.e. local time). This behaviour will "
-        "change to returning timezone-aware datetimes in UTC in the next major release (4.0.0).",
-        FutureWarning,
-    )
-    return datetime.utcfromtimestamp(ms / 1000)
+    return datetime.utcfromtimestamp(ms / 1000).replace(tzinfo=timezone.utc)
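
Round-tripping through the new aware-UTC behavior, plus the tightened range check:

    >>> ms_to_datetime(1664496000000)
    datetime.datetime(2022, 9, 30, 0, 0, tzinfo=datetime.timezone.utc)
    >>> ms_to_datetime(MAX_TIMESTAMP_MS + 1)
    Traceback (most recent call last):
        ...
    ValueError: `ms` does not satisfy: -2208988800000 <= ms <= 2556143999999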


def time_string_to_ms(pattern: str, string: str, unit_in_ms: Dict[str, int]) -> Optional[int]:
@@ -56,7 +49,7 @@ def time_string_to_ms(pattern: str, string: str, unit_in_ms: Dict[str, int]) ->


def granularity_to_ms(granularity: str) -> int:
-    ms = time_string_to_ms(r"(\d+)({})", granularity, _unit_in_ms_without_week)
+    ms = time_string_to_ms(r"(\d+)({})", granularity, UNIT_IN_MS_WITHOUT_WEEK)
    if ms is None:
        raise ValueError(
            "Invalid granularity format: `{}`. Must be on format <integer>(s|m|h|d). E.g. '5m', '3h' or '1d'.".format(
@@ -75,7 +68,7 @@ def time_ago_to_ms(time_ago_string: str) -> int:
"""Returns millisecond representation of time-ago string"""
if time_ago_string == "now":
return 0
ms = time_string_to_ms(r"(\d+)({})-ago", time_ago_string, _unit_in_ms)
ms = time_string_to_ms(r"(\d+)({})-ago", time_ago_string, UNIT_IN_MS)
if ms is None:
raise ValueError(
"Invalid time-ago format: `{}`. Must be on format <integer>(s|m|h|d|w)-ago or 'now'. E.g. '3d-ago' or '1w-ago'.".format(
@@ -112,7 +105,7 @@ def timestamp_to_ms(timestamp: Union[int, float, str, datetime]) -> int:


def _convert_time_attributes_in_dict(item: Dict) -> Dict:
-    TIME_ATTRIBUTES = [
+    TIME_ATTRIBUTES = {
        "start_time",
        "end_time",
        "last_updated_time",
@@ -121,7 +114,7 @@ def _convert_time_attributes_in_dict(item: Dict) -> Dict:
"scheduled_execution_time",
"source_created_time",
"source_modified_time",
]
}
new_item = {}
for k, v in item.items():
if k in TIME_ATTRIBUTES:
@@ -142,3 +135,31 @@ def convert_time_attributes_to_datetime(item: Union[Dict, List[Dict]]) -> Union[
            new_items.append(_convert_time_attributes_in_dict(el))
        return new_items
    raise TypeError("item must be dict or list of dicts")
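
A sketch of the wrapper in use; the conversion body of _convert_time_attributes_in_dict is cut off above, so treat the exact result as illustrative (1600000000000 ms is 2020-09-13 12:26:40 UTC):

    item = {"last_updated_time": 1600000000000, "name": "pump"}
    converted = convert_time_attributes_to_datetime(item)
    # converted["last_updated_time"] is now a datetime; converted["name"] is left untouched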


+def align_start_and_end_for_granularity(start: int, end: int, granularity: str) -> Tuple[int, int]:
+    # Note the API always aligns `start` with 1s, 1m, 1h or 1d (even when given e.g. 73h)
+    remainder = start % granularity_unit_to_ms(granularity)
+    if remainder:
+        # Floor `start` when not exactly at boundary
+        start -= remainder
+    gms = granularity_to_ms(granularity)
+    remainder = (end - start) % gms
+    if remainder:
+        # Ceil `end` when not exactly at boundary decided by `start + N * granularity`
+        end += gms - remainder
+    return start, end
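
A worked example, assuming granularity_unit_to_ms (not shown in this diff) returns the size of one unit, so granularity_unit_to_ms("2m") == 60_000 while granularity_to_ms("2m") == 120_000:

    start, end = align_start_and_end_for_granularity(90_000, 360_000, "2m")
    # start == 60_000   (floored to the 1m boundary: 90_000 - 30_000)
    # end   == 420_000  (300_000 ms is not a multiple of 120_000, so end is ceiled by 60_000)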


+def split_time_range(start: int, end: int, n_splits: int, granularity_in_ms: int) -> List[int]:
+    if n_splits < 1:
+        raise ValueError(f"Cannot split into less than 1 piece, got {n_splits=}")
+    tot_ms = end - start
+    if n_splits * granularity_in_ms > tot_ms:
+        raise ValueError(
+            f"Given time interval ({tot_ms=}) could not be split as `{n_splits=}` times `{granularity_in_ms=}` "
+            "is larger than the interval itself."
+        )
+    # Find a `delta_ms` that's a multiple of granularity in ms (trivial for raw queries).
+    delta_ms = granularity_in_ms * round(tot_ms / n_splits / granularity_in_ms)
+    return [*(start + delta_ms * i for i in range(n_splits)), end]
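
Doctest-style check of the arithmetic: tot_ms = 1_000_000, so delta_ms = 60_000 * round(1_000_000 / 4 / 60_000) = 240_000, and the final boundary is always the exact end:

    >>> split_time_range(0, 1_000_000, 4, 60_000)
    [0, 240000, 480000, 720000, 1000000]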
