diff --git a/cognite/client/_api/datapoint_constants.py b/cognite/client/_api/datapoint_constants.py new file mode 100644 index 0000000000..affe1d23e7 --- /dev/null +++ b/cognite/client/_api/datapoint_constants.py @@ -0,0 +1,87 @@ +from datetime import datetime +from typing import Dict, Iterable, List, Optional, TypedDict, Union + +try: + import numpy as np + import numpy.typing as npt + + NUMPY_IS_AVAILABLE = True +except ImportError: # pragma no cover + NUMPY_IS_AVAILABLE = False + +if NUMPY_IS_AVAILABLE: + NumpyDatetime64NSArray = npt.NDArray[np.datetime64] + NumpyInt64Array = npt.NDArray[np.int64] + NumpyFloat64Array = npt.NDArray[np.float64] + NumpyObjArray = npt.NDArray[np.object_] + +# Datapoints API-limits: +DPS_LIMIT_AGG = 10_000 +DPS_LIMIT = 100_000 +POST_DPS_OBJECTS_LIMIT = 10_000 +FETCH_TS_LIMIT = 100 +RETRIEVE_LATEST_LIMIT = 100 + + +ALL_SORTED_DP_AGGS = sorted( + [ + "average", + "max", + "min", + "count", + "sum", + "interpolation", + "step_interpolation", + "continuous_variance", + "discrete_variance", + "total_variation", + ] +) + + +class CustomDatapointsQuery(TypedDict, total=False): + # No field required + start: Union[int, str, datetime, None] + end: Union[int, str, datetime, None] + aggregates: Optional[List[str]] + granularity: Optional[str] + limit: Optional[int] + include_outside_points: Optional[bool] + ignore_unknown_ids: Optional[bool] + + +class DatapointsQueryId(CustomDatapointsQuery): + id: int # required field + + +class DatapointsQueryExternalId(CustomDatapointsQuery): + external_id: str # required field + + +class CustomDatapoints(TypedDict, total=False): + # No field required + start: int + end: int + aggregates: Optional[List[str]] + granularity: Optional[str] + limit: int + include_outside_points: bool + + +class DatapointsPayload(CustomDatapoints): + items: List[CustomDatapoints] + + +DatapointsTypes = Union[int, float, str] + + +class DatapointsFromAPI(TypedDict): + id: int + externalId: Optional[str] + isString: bool + isStep: bool + datapoints: List[Dict[str, DatapointsTypes]] + + +DatapointsIdTypes = Union[int, DatapointsQueryId, Iterable[Union[int, DatapointsQueryId]]] +DatapointsExternalIdTypes = Union[str, DatapointsQueryExternalId, Iterable[Union[str, DatapointsQueryExternalId]]] diff --git a/cognite/client/_api/datapoint_tasks.py b/cognite/client/_api/datapoint_tasks.py new file mode 100644 index 0000000000..9db1521daa --- /dev/null +++ b/cognite/client/_api/datapoint_tasks.py @@ -0,0 +1,1165 @@ +from __future__ import annotations + +import math +import numbers +import operator as op +import warnings +from abc import abstractmethod +from datetime import datetime +from functools import cached_property +from itertools import chain +from threading import Lock +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Dict, + Hashable, + Iterable, + Iterator, + List, + Literal, + NoReturn, + Optional, + Sequence, + Tuple, + TypeVar, + Union, + cast, + overload, +) + +from sortedcontainers import SortedDict, SortedList # type: ignore [import] + +from cognite.client._api.datapoint_constants import ( + DPS_LIMIT, + DPS_LIMIT_AGG, + NUMPY_IS_AVAILABLE, + CustomDatapoints, + CustomDatapointsQuery, + DatapointsExternalIdTypes, + DatapointsFromAPI, + DatapointsIdTypes, + DatapointsQueryExternalId, + DatapointsQueryId, + DatapointsTypes, +) +from cognite.client.data_classes.datapoints import Datapoints, DatapointsArray, DatapointsQuery +from cognite.client.utils._auxiliary import convert_all_keys_to_snake_case, to_camel_case +from 
cognite.client.utils._identifier import Identifier +from cognite.client.utils._time import ( + align_start_and_end_for_granularity, + granularity_to_ms, + split_time_range, + timestamp_to_ms, +) + +if NUMPY_IS_AVAILABLE: + import numpy as np + + +if TYPE_CHECKING: + import numpy.typing as npt + + +T = TypeVar("T") + + +class _SingleTSQueryValidator: + def __init__(self, user_query: DatapointsQuery) -> None: + self.user_query = user_query + self.defaults: CustomDatapointsQuery = dict( + start=user_query.start, + end=user_query.end, + limit=user_query.limit, + aggregates=user_query.aggregates, + granularity=user_query.granularity, + include_outside_points=user_query.include_outside_points, + ignore_unknown_ids=user_query.ignore_unknown_ids, + ) + + def validate_and_create_single_queries(self) -> List[_SingleTSQueryBase]: + queries = [] + if self.user_query.id is not None: + id_queries = self._validate_multiple_id(self.user_query.id) + queries.extend(id_queries) + if self.user_query.external_id is not None: + xid_queries = self._validate_multiple_xid(self.user_query.external_id) + queries.extend(xid_queries) + if queries: + return queries + raise ValueError("Pass at least one time series `id` or `external_id`!") + + def _validate_multiple_id(self, id: DatapointsIdTypes) -> List[_SingleTSQueryBase]: + return self._validate_id_or_xid(id, "id", numbers.Integral, is_external_id=False) + + def _validate_multiple_xid(self, external_id: DatapointsExternalIdTypes) -> List[_SingleTSQueryBase]: + return self._validate_id_or_xid(external_id, "external_id", str, is_external_id=True) + + def _validate_id_or_xid( + self, + id_or_xid: Union[DatapointsIdTypes, DatapointsExternalIdTypes], + arg_name: str, + exp_type: type, + is_external_id: bool, + ) -> List[_SingleTSQueryBase]: + + if isinstance(id_or_xid, (exp_type, dict)): + # Lazy - we postpone evaluation: + id_or_xid = [id_or_xid] # type: ignore [assignment] + + if not isinstance(id_or_xid, Sequence): + # We use Sequence which requires an odering of its iterable elements + self._raise_on_wrong_ts_identifier_type(id_or_xid, arg_name, exp_type) + + queries = [] + for ts in id_or_xid: + if isinstance(ts, exp_type): + # We merge 'defaults' and given ts-dict, ts-dict takes precedence: + ts_dct = {**self.defaults, arg_name: ts} + queries.append(self._validate_and_create_query(ts_dct)) # type: ignore [arg-type] + + elif isinstance(ts, dict): + ts_validated = self._validate_ts_query_dict_keys(ts, arg_name, exp_type) + ts_dct = {**self.defaults, **ts_validated} + queries.append(self._validate_and_create_query(ts_dct)) # type: ignore [arg-type] + else: # pragma: no cover + self._raise_on_wrong_ts_identifier_type(ts, arg_name, exp_type) + return queries + + @staticmethod + def _raise_on_wrong_ts_identifier_type( + id_or_xid: Union[DatapointsIdTypes, DatapointsExternalIdTypes], + arg_name: str, + exp_type: type, + ) -> NoReturn: + raise TypeError( + f"Got unsupported type {type(id_or_xid)}, as, or part of argument `{arg_name}`. Expected one of " + f"{exp_type}, {dict} or a (mixed) list of these, but got `{id_or_xid}`." + ) + + @staticmethod + def _validate_ts_query_dict_keys( + dct: Dict[str, Any], arg_name: str, exp_type: type + ) -> Union[DatapointsQueryId, DatapointsQueryExternalId]: + if arg_name not in dct: + if (arg_name_cc := to_camel_case(arg_name)) not in dct: + raise KeyError(f"Missing required key `{arg_name}` in dict: {dct}.") + # For backwards compatibility we accept identifiers in camel case: (Make copy to avoid side effects + # for user's input). 
Also means we need to return it. + dct[arg_name] = (dct := dct.copy()).pop(arg_name_cc) + + ts_identifier = dct[arg_name] + if not isinstance(ts_identifier, exp_type): + _SingleTSQueryValidator._raise_on_wrong_ts_identifier_type(ts_identifier, arg_name, exp_type) + + opt_dct_keys = {"start", "end", "aggregates", "granularity", "include_outside_points", "limit"} + bad_keys = set(dct) - opt_dct_keys - {arg_name} + if not bad_keys: + return dct # type: ignore [return-value] + raise KeyError( + f"Dict provided by argument `{arg_name}` included key(s) not understood: {sorted(bad_keys)}. " + f"Required key: `{arg_name}`. Optional: {list(opt_dct_keys)}." + ) + + def _validate_and_create_query( + self, dct: Union[DatapointsQueryId, DatapointsQueryExternalId] + ) -> _SingleTSQueryBase: + limit = self._verify_limit(dct["limit"]) + granularity, aggregates = dct["granularity"], dct["aggregates"] + + if not (granularity is None or isinstance(granularity, str)): + raise TypeError(f"Expected `granularity` to be of type `str` or None, not {type(granularity)}") + + elif not (aggregates is None or isinstance(aggregates, list)): + raise TypeError(f"Expected `aggregates` to be of type `list[str]` or None, not {type(aggregates)}") + + elif aggregates is None: + if granularity is None: + # Request is for raw datapoints: + raw_query = self._convert_parameters(dct, limit, is_raw=True) + if limit is None: + return _SingleTSQueryRawUnlimited(**raw_query) + return _SingleTSQueryRawLimited(**raw_query) + raise ValueError("When passing `granularity`, argument `aggregates` is also required.") + + # Aggregates must be a list at this point: + elif len(aggregates) == 0: + raise ValueError("Empty list of `aggregates` passed, expected at least one!") + + elif granularity is None: + raise ValueError("When passing `aggregates`, argument `granularity` is also required.") + + elif dct["include_outside_points"] is True: + raise ValueError("'Include outside points' is not supported for aggregates.") + # Request is for one or more aggregates: + agg_query = self._convert_parameters(dct, limit, is_raw=False) + if limit is None: + return _SingleTSQueryAggUnlimited(**agg_query) + return _SingleTSQueryAggLimited(**agg_query) + + def _convert_parameters( + self, + dct: Union[DatapointsQueryId, DatapointsQueryExternalId], + limit: Optional[int], + is_raw: bool, + ) -> Dict[str, Any]: + identifier = Identifier.of_either(dct.get("id"), dct.get("external_id")) # type: ignore [arg-type] + start, end = self._verify_time_range(dct["start"], dct["end"], dct["granularity"], is_raw, identifier) + converted = { + "identifier": identifier, + "start": start, + "end": end, + "limit": limit, + "ignore_unknown_ids": dct["ignore_unknown_ids"], + } + if is_raw: + converted["include_outside_points"] = dct["include_outside_points"] + else: + converted["aggregates"] = dct["aggregates"] + converted["granularity"] = dct["granularity"] + return converted + + def _verify_limit(self, limit: Optional[int]) -> Optional[int]: + if limit in {None, -1, math.inf}: + return None + elif isinstance(limit, numbers.Integral) and limit >= 0: # limit=0 is accepted by the API + try: + # We don't want weird stuff like numpy dtypes etc: + return int(limit) + except Exception: # pragma no cover + raise TypeError(f"Unable to convert given {limit=} to integer") + raise TypeError( + "Parameter `limit` must be a non-negative integer -OR- one of [None, -1, inf] to " + f"indicate an unlimited query. 
Got: {limit} with type: {type(limit)}" + ) + + def _verify_time_range( + self, + start: Union[int, str, datetime, None], + end: Union[int, str, datetime, None], + granularity: Optional[str], + is_raw: bool, + identifier: Identifier, + ) -> Tuple[int, int]: + if start is None: + start = 0 # 1970-01-01 + else: + start = timestamp_to_ms(start) + if end is None: + end = "now" + end = timestamp_to_ms(end) + + if end <= start: + raise ValueError( + f"Invalid time range, {end=} must be later than {start=} " + f"(from query: {identifier.as_dict(camel_case=False)})" + ) + if not is_raw: # API rounds aggregate query timestamps in a very particular fashion + start, end = align_start_and_end_for_granularity(start, end, cast(str, granularity)) + return start, end + + +class _SingleTSQueryBase: + def __init__( + self, + *, + identifier: Identifier, + start: int, + end: int, + max_query_limit: int, + limit: Optional[int], + include_outside_points: bool, + ignore_unknown_ids: bool, + ) -> None: + self.identifier = identifier + self.start = start + self.end = end + self.max_query_limit = max_query_limit + self.limit = limit + self.include_outside_points = include_outside_points + self.ignore_unknown_ids = ignore_unknown_ids + + self._is_missing: Optional[bool] = None + self._is_string: Optional[bool] = None + + if self.include_outside_points and self.limit is not None: + warnings.warn( + "When using `include_outside_points=True` with a finite `limit` you may get a large gap " + "between the last 'inside datapoint' and the 'after/outside' datapoint. Note also that the " + "up-to-two outside points come in addition to your given `limit`; asking for 5 datapoints might " + "yield 5, 6 or 7. It's a feature, not a bug ;)", + UserWarning, + ) + + @property + def capped_limit(self) -> int: + if self.limit is None: + return self.max_query_limit + return min(self.limit, self.max_query_limit) + + def override_max_query_limit(self, new_limit: int) -> None: + assert isinstance(new_limit, int) + self.max_query_limit = new_limit + + @property + @abstractmethod + def is_raw_query(self) -> bool: + ... + + @property + @abstractmethod + def ts_task_type(self) -> type[BaseConcurrentTask]: + ... 
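+    # As an illustration of the limit handling above (example values only):
+    # `_verify_limit` maps None, -1 and math.inf to None ("unlimited") and accepts
+    # limit=0, while `capped_limit` bounds each individual request, e.g. a raw
+    # query with limit=250_000 never asks for more than DPS_LIMIT per request:
+    # min(250_000, 100_000) == 100_000.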
+ + @property + def is_missing(self) -> bool: + if self._is_missing is None: + raise RuntimeError("Before making API-calls the `is_missing` status is unknown") + return self._is_missing + + @is_missing.setter + def is_missing(self, value: bool) -> None: + assert isinstance(value, bool) + self._is_missing = value + + @property + def is_string(self) -> bool: + if self._is_string is None: + raise RuntimeError( + "For queries asking for raw datapoints, the `is_string` status is unknown before " + "any API-calls have been made" + ) + return self._is_string + + @is_string.setter + def is_string(self, value: bool) -> None: + assert isinstance(value, bool) + self._is_string = value + + +class _SingleTSQueryRaw(_SingleTSQueryBase): + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs, max_query_limit=DPS_LIMIT) + self.aggregates = self.aggregates_cc = None + self.granularity = None + + @property + def is_raw_query(self) -> bool: + return True + + def to_payload(self) -> Dict[str, Any]: + return { + **self.identifier.as_dict(), + "start": self.start, + "end": self.end, + "limit": self.capped_limit, + "includeOutsidePoints": self.include_outside_points, + } + + +class _SingleTSQueryRawLimited(_SingleTSQueryRaw): + def __init__(self, *, limit: int, **kwargs: Any) -> None: + super().__init__(limit=limit, **kwargs) + assert isinstance(limit, int) + + @property + def ts_task_type(self) -> type[ParallelLimitedRawTask]: + return ParallelLimitedRawTask + + +class _SingleTSQueryRawUnlimited(_SingleTSQueryRaw): + def __init__(self, *, limit: None, **kwargs: Any) -> None: + super().__init__(limit=limit, **kwargs) + + @property + def ts_task_type(self) -> type[ParallelUnlimitedRawTask]: + return ParallelUnlimitedRawTask + + +class _SingleTSQueryAgg(_SingleTSQueryBase): + def __init__(self, *, aggregates: List[str], granularity: str, **kwargs: Any) -> None: + agg_query_settings = dict(include_outside_points=False, max_query_limit=DPS_LIMIT_AGG) + super().__init__(**kwargs, **agg_query_settings) # type: ignore [arg-type] + self.aggregates = aggregates + self.granularity = granularity + + @property + def is_raw_query(self) -> bool: + return False + + @cached_property + def aggregates_cc(self) -> List[str]: + return list(map(to_camel_case, self.aggregates)) + + def to_payload(self) -> Dict[str, Any]: + return { + **self.identifier.as_dict(), + "start": self.start, + "end": self.end, + "aggregates": self.aggregates_cc, + "granularity": self.granularity, + "limit": self.capped_limit, + "includeOutsidePoints": self.include_outside_points, + } + + +class _SingleTSQueryAggLimited(_SingleTSQueryAgg): + def __init__(self, *, limit: int, **kwargs: Any) -> None: + super().__init__(limit=limit, **kwargs) + assert isinstance(limit, int) + + @property + def ts_task_type(self) -> type[ParallelLimitedAggTask]: + return ParallelLimitedAggTask + + +class _SingleTSQueryAggUnlimited(_SingleTSQueryAgg): + def __init__(self, *, limit: None, **kwargs: Any) -> None: + super().__init__(limit=limit, **kwargs) + + @property + def ts_task_type(self) -> type[ParallelUnlimitedAggTask]: + return ParallelUnlimitedAggTask + + +class DpsUnpackFns: + ts: Callable[[Dict], int] = op.itemgetter("timestamp") + raw_dp: Callable[[Dict], DatapointsTypes] = op.itemgetter("value") + ts_dp_tpl: Callable[[Dict], Tuple[int, DatapointsTypes]] = op.itemgetter("timestamp", "value") + ts_count_tpl: Callable[[Dict], Tuple[int, int]] = op.itemgetter("timestamp", "count") + count: Callable[[Dict], int] = op.itemgetter("count") + + @staticmethod + 
def custom_from_aggregates( + lst: List[str], + ) -> Callable[[List[Dict[str, DatapointsTypes]]], Tuple[DatapointsTypes, ...]]: + return op.itemgetter(*lst) + + +class DefaultSortedDict(SortedDict): + def __init__(self, default_factory: Callable[[], T], /, **kw: Any): + self.default_factory = default_factory + super().__init__(**kw) + + def __missing__(self, key: Hashable) -> T: + self[key] = self.default_factory() + return self[key] + + +def dps_container() -> DefaultSortedDict: + """Initialises a new sorted container for datapoints storage""" + return DefaultSortedDict(list) + + +def subtask_lst() -> SortedList: + """Initialises a new sorted list for subtasks""" + return SortedList(key=op.attrgetter("subtask_idx")) + + +def create_array_from_dps_container(container: DefaultSortedDict) -> npt.NDArray: + return np.hstack(list(chain.from_iterable(container.values()))) + + +def create_aggregates_arrays_from_dps_container(container: DefaultSortedDict, n_aggs: int) -> List[npt.NDArray]: + all_aggs_arr = np.vstack(list(chain.from_iterable(container.values()))) + return list(map(np.ravel, np.hsplit(all_aggs_arr, n_aggs))) + + +def create_list_from_dps_container(container: DefaultSortedDict) -> List: + return list(chain.from_iterable(chain.from_iterable(container.values()))) + + +def create_aggregates_list_from_dps_container(container: DefaultSortedDict) -> Iterator[List[List]]: + concatenated = chain.from_iterable(chain.from_iterable(container.values())) + return map(list, zip(*concatenated)) # rows to columns + + +class BaseDpsFetchSubtask: + def __init__( + self, + start: int, + end: int, + identifier: Identifier, + parent: BaseConcurrentTask, + priority: int, + max_query_limit: int, + n_dps_left: float, + is_raw_query: bool, + ) -> None: + self.start = start + self.end = end + self.identifier = identifier + self.parent = parent + self.priority = priority + self.is_raw_query = is_raw_query + self.max_query_limit = max_query_limit + self.n_dps_left = n_dps_left + + self.is_done = False + + @abstractmethod + def get_next_payload(self) -> Optional[Dict[str, Any]]: + ... + + @abstractmethod + def store_partial_result(self, res: DatapointsFromAPI) -> Optional[List[SplittingFetchSubtask]]: + ... + + +class OutsideDpsFetchSubtask(BaseDpsFetchSubtask): + """Fetches outside points and stores in parent""" + + def __init__(self, **kwargs: Any) -> None: + outside_dps_settings = dict(priority=0, is_raw_query=True, max_query_limit=0, n_dps_left=0) + super().__init__(**kwargs, **outside_dps_settings) # type: ignore [arg-type] + + def get_next_payload(self) -> Optional[Dict[str, Any]]: + if self.is_done: + return None + return self._create_payload_item() + + def _create_payload_item(self) -> Dict[str, Any]: + return { + **self.identifier.as_dict(), + "start": self.start, + "end": self.end, + "limit": 0, # Not a bug; it just returns the outside points + "includeOutsidePoints": True, + } + + def store_partial_result(self, res: DatapointsFromAPI) -> None: + if dps := res["datapoints"]: + self.parent._extract_outside_points(dps) + self.is_done = True + + +class SerialFetchSubtask(BaseDpsFetchSubtask): + """Fetches datapoints serially until complete, nice and simple. 
Stores data in parent""" + + def __init__( + self, + *, + limit: Optional[int], + aggregates: Optional[List[str]], + granularity: Optional[str], + subtask_idx: Tuple[float, ...], + **kwargs: Any, + ) -> None: + n_dps_left = math.inf if limit is None else limit + super().__init__(**kwargs, n_dps_left=n_dps_left) + self.limit = limit + self.aggregates = aggregates + self.granularity = granularity + self.subtask_idx = subtask_idx + self.n_dps_fetched = 0 + self.agg_kwargs = {} + + self.next_start = self.start + if not self.is_raw_query: + self.agg_kwargs = {"aggregates": self.aggregates, "granularity": self.granularity} + + def get_next_payload(self) -> Optional[Dict[str, Any]]: + if self.is_done: + return None + remaining = self.parent.remaining_limit(self) + if self.parent.ts_info is not None and remaining == 0: + # Since last time this task fetched points, earlier tasks have already fetched >= limit dps. + # (If ts_info isn't known, it means we still have to send out a request, happens when given limit=0) + self.is_done, ts_task = True, self.parent + with self.parent.lock: # Keep sorted list `subtasks` from being mutated + _ = ts_task.is_done # Trigger a check of parent task + # Update all subsequent subtasks to "is done": + i_start = 1 + ts_task.subtasks.index(self) + for task in ts_task.subtasks[i_start:]: + task.is_done = True + return None + return self._create_payload_item(math.inf if remaining is None else remaining) + + def _create_payload_item(self, remaining_limit: float) -> Dict[str, Any]: + return { + **self.identifier.as_dict(), + "start": self.next_start, + "end": self.end, + "limit": min(remaining_limit, self.n_dps_left, self.max_query_limit), + **self.agg_kwargs, + } + + def store_partial_result(self, res: DatapointsFromAPI) -> None: + if self.parent.ts_info is None: + # In eager mode, first task to complete gets the honor to store ts info: + self.parent._store_ts_info(res) + + if not (dps := res["datapoints"]): + self.is_done = True + return None + + n, last_ts = len(dps), cast(int, dps[-1]["timestamp"]) + self.parent._unpack_and_store(self.subtask_idx, dps) + self._update_state_for_next_payload(last_ts, n) + if self._is_task_done(n): + self.is_done = True + + def _update_state_for_next_payload(self, last_ts: int, n: int) -> None: + self.next_start = last_ts + self.parent.offset_next # Move `start` to prepare for next query + self.n_dps_left -= n + self.n_dps_fetched += n # Used to quit limited queries asap + + def _is_task_done(self, n: int) -> bool: + return self.n_dps_left == 0 or n < self.max_query_limit or self.next_start == self.end + + +class SplittingFetchSubtask(SerialFetchSubtask): + """Fetches data serially, but splits its time domain ("divide and conquer") based on the density + of returned datapoints. 
Stores data in parent""" + + def __init__(self, *, max_splitting_factor: int = 10, **kwargs: Any) -> None: + super().__init__(**kwargs) + self.max_splitting_factor = max_splitting_factor + self.split_subidx: int = 0 # Actual value doesnt matter (any int will do) + + def store_partial_result(self, res: DatapointsFromAPI) -> Optional[List[SplittingFetchSubtask]]: # type: ignore [override] + self.prev_start = self.next_start + super().store_partial_result(res) + if not self.is_done: + last_ts = res["datapoints"][-1]["timestamp"] + return self._split_self_into_new_subtasks_if_needed(cast(int, last_ts)) + return None + + def _create_subtasks_idxs(self, n_new_tasks: int) -> Iterable[Tuple[float, ...]]: + """Since this task may decide to split itself multiple times, we count backwards to keep order + (we rely on tuple sorting logic). Example using `self.subtask_idx=(4,)`: + - First split into e.g. 3 parts: (4,-3), (4,-2), (4,-1) + - Next, split into 2: (4, -5) and (4, -4). These now sort before the first split.""" + end = self.split_subidx + self.split_subidx -= n_new_tasks + yield from ((*self.subtask_idx, i) for i in range(self.split_subidx, end)) + + def _split_self_into_new_subtasks_if_needed(self, last_ts: int) -> Optional[List[SplittingFetchSubtask]]: + # How many new tasks because of % of time range was fetched? + tot_ms = self.end - (start := self.prev_start) + part_ms = last_ts - start + ratio_retrieved = part_ms / tot_ms + n_new_pct = math.floor(1 / ratio_retrieved) + # How many new tasks because of limit left (if limit)? + n_new_lim = math.inf + if (remaining_limit := self.parent.remaining_limit(self)) is not None: + n_new_lim = math.ceil(remaining_limit / self.max_query_limit) + # We pick strictest criterion: + n_new_tasks = min(cast(int, n_new_lim), n_new_pct, self.max_splitting_factor + 1) # +1 for "self next" + if n_new_tasks <= 1: # No point in splitting; no faster than this task just continuing + return None + # Find a `delta_ms` thats a multiple of granularity in ms (trivial for raw queries): + boundaries = split_time_range(last_ts, self.end, n_new_tasks, self.parent.offset_next) + self.end = boundaries[1] # We shift end of 'self' backwards + static_params = { + "parent": self.parent, + "priority": self.priority, + "identifier": self.identifier, + "aggregates": self.aggregates, + "granularity": self.granularity, + "max_query_limit": self.max_query_limit, + "is_raw_query": self.is_raw_query, + } + split_idxs = self._create_subtasks_idxs(n_new_tasks) + new_subtasks = [ + SplittingFetchSubtask( + start=start, end=end, limit=remaining_limit, subtask_idx=idx, **static_params # type: ignore [arg-type] + ) + for start, end, idx in zip(boundaries[1:-1], boundaries[2:], split_idxs) + ] + self.parent.subtasks.update(new_subtasks) + return new_subtasks + + +class BaseConcurrentTask: + def __init__( + self, + query: Any, # subclasses assert correct type + eager_mode: bool, + use_numpy: bool, + first_dps_batch: Optional[DatapointsFromAPI] = None, + first_limit: Optional[int] = None, + ) -> None: + self.query = query + self.eager_mode = eager_mode + self.use_numpy = use_numpy + self.ts_info = None + self.ts_data = dps_container() + self.dps_data = dps_container() + self.subtasks = subtask_lst() + self.subtask_outside_points: Optional[OutsideDpsFetchSubtask] = None + self.raw_dtype: Optional[type] = None + self._is_done = False + self.lock = Lock() + + self.has_limit = self.query.limit is not None + # When running large queries (i.e. 
not "eager"), all time series have a first batch fetched before + # further subtasks are created. This gives us e.g. outside points (if asked for) and ts info: + if not self.eager_mode: + assert first_limit is not None and first_dps_batch is not None # mypy... + dps = first_dps_batch.pop("datapoints") # type: ignore [misc] + self.ts_info = first_dps_batch # Store just the ts info + self.raw_dtype = self._decide_dtype_from_is_string(first_dps_batch["isString"]) + if not dps: + self._is_done = True + return None + self._store_first_batch(dps, first_limit) + + @property + def n_dps_first_batch(self) -> int: + if self.eager_mode: + return 0 + return len(self.ts_data[(0,)][0]) + + @property + def is_done(self) -> bool: + if self.ts_info is None: + return False + elif self._is_done: + return True + elif self.subtask_outside_points and not self.subtask_outside_points.is_done: + return False + elif self.subtasks: + self._is_done = all(task.is_done for task in self.subtasks) + return self._is_done + + @is_done.setter + def is_done(self, value: bool) -> None: + self._is_done = value + + @property + @abstractmethod + def offset_next(self) -> int: + ... + + @abstractmethod + def get_result(self) -> Union[Datapoints, DatapointsArray]: + ... + + @abstractmethod + def _unpack_and_store(self, idx: Tuple[float, ...], dps: List[Dict[str, DatapointsTypes]]) -> None: + ... + + @abstractmethod + def _extract_outside_points(self, dps: List[Dict[str, DatapointsTypes]]) -> None: + ... + + @abstractmethod + def _find_number_of_subtasks_uniform_split(self, tot_ms: int, n_workers_per_queries: int) -> int: + ... + + def split_into_subtasks(self, max_workers: int, n_tot_queries: int) -> List[SplittingFetchSubtask]: + # Given e.g. a single time series, we want to put all our workers to work by splitting into lots of pieces! + # As the number grows - or we start combining multiple into the same query - we want to split less: + # we hold back to not create too many subtasks: + if self.is_done: + return [] + n_workers_per_queries = max(1, round(max_workers / n_tot_queries)) + subtasks = self._create_uniformly_split_subtasks(n_workers_per_queries) + self.subtasks.update(subtasks) + if self.eager_mode and self.query.include_outside_points: + # In eager mode we do not get the "first dps batch" to extract outside points from: + self.subtask_outside_points = OutsideDpsFetchSubtask( + start=self.query.start, + end=self.query.end, + identifier=self.query.identifier, + parent=self, + ) + # Append the outside subtask to returned subtasks so that it will be queued: + subtasks.append(self.subtask_outside_points) # type: ignore [arg-type] + return subtasks + + def _create_uniformly_split_subtasks(self, n_workers_per_queries: int) -> List[SplittingFetchSubtask]: + start = self.query.start if self.eager_mode else self.first_start + tot_ms = (end := self.query.end) - start + n_periods = self._find_number_of_subtasks_uniform_split(tot_ms, n_workers_per_queries) + boundaries = split_time_range(start, end, n_periods, self.offset_next) + limit = self.query.limit - self.n_dps_first_batch if self.has_limit else None + return [ + SplittingFetchSubtask( + start=start, + end=end, + limit=limit, + subtask_idx=(i,), + parent=self, + priority=i - 1 if self.has_limit else 0, # Prioritise in chrono. 
order + identifier=self.query.identifier, + aggregates=self.query.aggregates_cc, + granularity=self.query.granularity, + max_query_limit=self.query.max_query_limit, + is_raw_query=self.query.is_raw_query, + ) + for i, (start, end) in enumerate(zip(boundaries[:-1], boundaries[1:]), 1) + ] + + def _decide_dtype_from_is_string(self, is_string: bool) -> type: + return np.object_ if is_string else np.float64 + + def _store_ts_info(self, res: DatapointsFromAPI) -> None: + self.ts_info = {k: v for k, v in res.items() if k != "datapoints"} # type: ignore [assignment] + if self.use_numpy: + self.raw_dtype = self._decide_dtype_from_is_string(res["isString"]) + + def _store_first_batch(self, dps: List[Dict[str, DatapointsTypes]], first_limit: int) -> None: + # Set `start` for the first subtask: + self.first_start = cast(int, dps[-1]["timestamp"]) + self.offset_next + self._unpack_and_store((0,), dps) + + # Are we done after first batch? + if self.first_start == self.query.end: + self._is_done = True + elif self.has_limit and len(dps) <= self.query.limit <= first_limit: + self._is_done = True + elif len(dps) < first_limit: + self._is_done = True + + def remaining_limit(self, subtask: BaseDpsFetchSubtask) -> Optional[int]: + if not self.has_limit: + return None + # For limited queries: if the sum of fetched points of earlier tasks have already hit/surpassed + # `limit`, we know for sure we can cancel later/future tasks: + remaining = cast(int, self.query.limit) + with self.lock: # Keep sorted list `subtasks` from being mutated + for task in self.subtasks: + # Sum up to - but not including - given subtask: + if task is subtask or (remaining := remaining - task.n_dps_fetched) <= 0: + break + return max(0, remaining) + + +class ConcurrentLimitedMixin(BaseConcurrentTask): + @property + def is_done(self) -> bool: + if self.ts_info is None: + return False + elif self._is_done: + return True + elif self.subtask_outside_points and not self.subtask_outside_points.is_done: + return False + elif self.subtasks: + # Checking if subtasks are done is not enough; we need to check if the sum of + # "len of dps takewhile is_done" has reached the limit. Additionally, each subtask might + # need to fetch a lot of the time subdomain. 
We want to quit early also when the limit is + # reached in the first (chronologically) non-finished subtask: + i_first_in_progress = True + n_dps_to_fetch = cast(int, self.query.limit) - self.n_dps_first_batch + for i, task in enumerate(self.subtasks): + if not (task.is_done or i_first_in_progress): + break + if i_first_in_progress: + i_first_in_progress = False + + n_dps_to_fetch -= task.n_dps_fetched + if n_dps_to_fetch == 0: + self._is_done = True + # Update all consecutive subtasks to "is done": + for task in self.subtasks[i + 1 :]: + task.is_done = True + break + # Stop forward search as current task is not done, and limit was not reached: + # (We risk that the next task is already done, and will thus miscount) + if not i_first_in_progress: + break + else: + # All subtasks are done, but limit was -not- reached: + self._is_done = True + return self._is_done + + @is_done.setter + def is_done(self, value: bool) -> None: # Kill switch + self._is_done = value + + +class BaseConcurrentRawTask(BaseConcurrentTask): + def __init__(self, **kwargs: Any) -> None: + self.dp_outside_start: Optional[Tuple[int, DatapointsTypes]] = None + self.dp_outside_end: Optional[Tuple[int, DatapointsTypes]] = None + super().__init__(**kwargs) + + @property + def offset_next(self) -> int: + return 1 # 1 ms + + def _create_empty_result(self) -> Union[Datapoints, DatapointsArray]: + if not self.use_numpy: + return Datapoints(**convert_all_keys_to_snake_case(self.ts_info), timestamp=[], value=[]) + return DatapointsArray._load( + { + **cast(dict, self.ts_info), + "timestamp": np.array([], dtype=np.int64), + "value": np.array([], dtype=self.raw_dtype), + } + ) + + def _no_data_fetched(self) -> bool: + return not any((self.ts_data, self.dp_outside_start, self.dp_outside_end)) + + def get_result(self) -> Union[Datapoints, DatapointsArray]: + if self._no_data_fetched(): + return self._create_empty_result() + if self.has_limit: + self._cap_dps_at_limit() + if self.query.include_outside_points: + self._include_outside_points_in_result() + if self.use_numpy: + return DatapointsArray._load( + { + **cast(dict, self.ts_info), + "timestamp": create_array_from_dps_container(self.ts_data), + "value": create_array_from_dps_container(self.dps_data), + } + ) + return Datapoints( + **convert_all_keys_to_snake_case(self.ts_info), + timestamp=create_list_from_dps_container(self.ts_data), + value=create_list_from_dps_container(self.dps_data), + ) + + def _find_number_of_subtasks_uniform_split(self, tot_ms: int, n_workers_per_queries: int) -> int: + # It makes no sense to split beyond what the max-size of a query allows (for a maximally dense + # time series), but that is rarely useful as 100k dps is just 1 min 40 sec... 
we guess an + # average density of points at 1 dp/sec, giving us split-windows no smaller than ~1 day: + return min(n_workers_per_queries, math.ceil((tot_ms / 1000) / self.query.max_query_limit)) + + def _cap_dps_at_limit(self) -> None: + # Note 1: Outside points do not count towards given limit (API spec) + # Note 2: Lock not needed; called after pool is shut down + count = 0 + for i, (subtask_idx, sublist) in enumerate(self.ts_data.items()): + for j, seq in enumerate(sublist): + if count + len(seq) < self.query.limit: + count += len(seq) + continue + end = self.query.limit - count + self.ts_data[subtask_idx][j] = seq[:end] + self.dps_data[subtask_idx][j] = self.dps_data[subtask_idx][j][:end] + # Chop off later arrays (or lists) in same sublist (if any): + self.ts_data[subtask_idx] = self.ts_data[subtask_idx][: j + 1] + self.dps_data[subtask_idx] = self.dps_data[subtask_idx][: j + 1] + # Remove later sublists (if any). We keep using DefaultSortedDicts due to the possibility of + # having to insert/add 'outside points' later: + (new_ts := dps_container()).update(self.ts_data.items()[: i + 1]) # type: ignore [index] + (new_dps := dps_container()).update(self.dps_data.items()[: i + 1]) # type: ignore [index] + self.ts_data, self.dps_data = new_ts, new_dps + return None + + def _include_outside_points_in_result(self) -> None: + for point, idx in zip((self.dp_outside_start, self.dp_outside_end), (-math.inf, math.inf)): + if point: + ts, dp = [point[0]], [point[1]] + if self.use_numpy: + ts = np.array(ts, dtype=np.int64) + dp = np.array(dp, dtype=self.raw_dtype) + self.ts_data[(idx,)].append(ts) + self.dps_data[(idx,)].append(dp) + + def _unpack_and_store(self, idx: Tuple[float, ...], dps: List[Dict[str, DatapointsTypes]]) -> None: + if self.use_numpy: # Faster than feeding listcomp to np.array: + self.ts_data[idx].append(np.fromiter(map(DpsUnpackFns.ts, dps), dtype=np.int64, count=len(dps))) # type: ignore [arg-type] + self.dps_data[idx].append(np.fromiter(map(DpsUnpackFns.raw_dp, dps), dtype=self.raw_dtype, count=len(dps))) # type: ignore [arg-type] + else: + self.ts_data[idx].append(list(map(DpsUnpackFns.ts, dps))) + self.dps_data[idx].append(list(map(DpsUnpackFns.raw_dp, dps))) + + def _store_first_batch(self, dps: List[Dict[str, DatapointsTypes]], first_limit: int) -> None: + if self.query.is_raw_query and self.query.include_outside_points: + self._extract_outside_points(dps) + if not dps: # We might have only gotten outside points + self._is_done = True + return None + super()._store_first_batch(dps, first_limit) + + def _extract_outside_points(self, dps: List[Dict[str, DatapointsTypes]]) -> None: + first_ts = cast(int, dps[0]["timestamp"]) + if first_ts < self.query.start: + # We got a dp before `start`, this should not impact our count towards `limit`: + self.dp_outside_start = DpsUnpackFns.ts_dp_tpl(dps.pop(0)) # Slow pop :( + if dps: + last_ts = cast(int, dps[-1]["timestamp"]) + if last_ts >= self.query.end: # >= because `end` is exclusive + self.dp_outside_end = DpsUnpackFns.ts_dp_tpl(dps.pop()) # Fast pop :) + + +class ParallelUnlimitedRawTask(BaseConcurrentRawTask): + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + # This entire method just to tell mypy: + assert isinstance(self.query, _SingleTSQueryRawUnlimited) + + +class ParallelLimitedRawTask(ConcurrentLimitedMixin, BaseConcurrentRawTask): + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + # This entire method just to tell mypy: + assert isinstance(self.query, 
_SingleTSQueryRawLimited) + + def _find_number_of_subtasks_uniform_split(self, tot_ms: int, n_workers_per_queries: int) -> int: + # We make the guess that the time series has ~1 dp/sec and use this in combination with the given + # limit to not split into too many queries (highest throughput when each request is close to max limit) + n_estimate_periods = math.ceil((tot_ms / 1000) / self.query.max_query_limit) + remaining_limit = self.query.limit - self.n_dps_first_batch + n_periods = max(1, math.ceil(remaining_limit / self.query.max_query_limit)) + # Pick the smallest N from constraints: + return min(n_workers_per_queries, n_periods, n_estimate_periods) + + +class BaseConcurrentAggTask(BaseConcurrentTask): + def __init__(self, *, query: _SingleTSQueryAgg, use_numpy: bool, **kwargs: Any) -> None: + aggregates_cc = query.aggregates_cc + self._set_aggregate_vars(aggregates_cc, use_numpy) + super().__init__(query=query, use_numpy=use_numpy, **kwargs) + + @cached_property + def offset_next(self) -> int: + return granularity_to_ms(self.query.granularity) + + def _set_aggregate_vars(self, aggregates_cc: List[str], use_numpy: bool) -> None: + self.float_aggs = aggregates_cc[:] + self.is_count_query = "count" in self.float_aggs + if self.is_count_query: + self.count_data = dps_container() + self.float_aggs.remove("count") # Only aggregate that is integer, handle separately + + self.has_non_count_aggs = bool(self.float_aggs) + if self.has_non_count_aggs: + self.agg_unpack_fn = DpsUnpackFns.custom_from_aggregates(self.float_aggs) + + self.first_non_count_agg, *others = self.float_aggs + self.single_non_count_agg = not others + + if use_numpy: + if self.single_non_count_agg: + self.dtype_aggs = np.dtype(np.float64) # type: ignore [assignment] + else: # (.., 1) is deprecated for some reason + self.dtype_aggs = np.dtype((np.float64, len(self.float_aggs))) + + def _find_number_of_subtasks_uniform_split(self, tot_ms: int, n_workers_per_queries: int) -> int: + n_max_dps = tot_ms // self.offset_next # evenly divides + return min(n_workers_per_queries, math.ceil(n_max_dps / self.query.max_query_limit)) + + def _create_empty_result(self) -> Union[Datapoints, DatapointsArray]: + if self.use_numpy: + arr_dct = {"timestamp": np.array([], dtype=np.int64)} + if self.is_count_query: + arr_dct["count"] = np.array([], dtype=np.int64) + if self.has_non_count_aggs: + arr_dct.update({agg: np.array([], dtype=np.float64) for agg in self.float_aggs}) + return DatapointsArray._load({**cast(dict, self.ts_info), **arr_dct}) + + lst_dct = {"timestamp": []} + if self.is_count_query: + lst_dct["count"] = [] + if self.has_non_count_aggs: + lst_dct.update({agg: [] for agg in self.float_aggs}) + return Datapoints(**convert_all_keys_to_snake_case({**cast(dict, self.ts_info), **lst_dct})) + + def get_result(self) -> Union[Datapoints, DatapointsArray]: + if not self.ts_data or self.query.limit == 0: + return self._create_empty_result() + if self.has_limit: + self._cap_dps_at_limit() + + if self.use_numpy: + arr_dct = {"timestamp": create_array_from_dps_container(self.ts_data)} + if self.is_count_query: + arr_dct["count"] = create_array_from_dps_container(self.count_data) + if self.has_non_count_aggs: + arr_lst = create_aggregates_arrays_from_dps_container(self.dps_data, len(self.float_aggs)) + arr_dct.update(dict(zip(self.float_aggs, arr_lst))) + return DatapointsArray._load({**cast(dict, self.ts_info), **arr_dct}) + + lst_dct = {"timestamp": create_list_from_dps_container(self.ts_data)} + if self.is_count_query: + lst_dct["count"] = 
create_list_from_dps_container(self.count_data) + if self.has_non_count_aggs: + if self.single_non_count_agg: + lst_dct[self.first_non_count_agg] = create_list_from_dps_container(self.dps_data) + else: + aggs_iter = create_aggregates_list_from_dps_container(self.dps_data) + lst_dct.update(dict(zip(self.float_aggs, aggs_iter))) + return Datapoints(**convert_all_keys_to_snake_case({**cast(dict, self.ts_info), **lst_dct})) + + def _cap_dps_at_limit(self) -> None: + count, to_update = 0, ["ts_data"] + if self.is_count_query: + to_update.append("count_data") + if self.has_non_count_aggs: + to_update.append("dps_data") + + for i, (subtask_idx, sublist) in enumerate(self.ts_data.items()): + for j, arr in enumerate(sublist): + if count + len(arr) < self.query.limit: + count += len(arr) + continue + end = self.query.limit - count + + for attr in to_update: + data = getattr(self, attr) + data[subtask_idx][j] = data[subtask_idx][j][:end] + data[subtask_idx] = data[subtask_idx][: j + 1] + setattr(self, attr, dict(data.items()[: i + 1])) # regular dict (no further inserts) + return None + + def _unpack_and_store(self, idx: Tuple[float, ...], dps: List[Dict[str, DatapointsTypes]]) -> None: + if self.use_numpy: + self._unpack_and_store_numpy(idx, dps) + else: + self._unpack_and_store_basic(idx, dps) + + def _unpack_and_store_numpy(self, idx: Tuple[float, ...], dps: List[Dict[str, DatapointsTypes]]) -> None: + n = len(dps) + self.ts_data[idx].append(np.fromiter(map(DpsUnpackFns.ts, dps), dtype=np.int64, count=n)) # type: ignore [arg-type] + + if self.is_count_query: + try: + arr = np.fromiter(map(DpsUnpackFns.count, dps), dtype=np.int64, count=n) # type: ignore [arg-type] + except KeyError: + # An interval with no datapoints (hence count does not exist) has data from another aggregate... probably + # (step_)interpolation. Since the resulting agg. arrays share timestamp, we would have to cast count to float in + # order to store the missing values as NaNs... We don't want that, so we fill with zeros to keep correct dtype: + arr = np.array([dp.get("count", 0) for dp in dps], dtype=np.int64) + self.count_data[idx].append(arr) + + if self.has_non_count_aggs: + try: # Fast method uses multi-key unpacking: + arr = np.fromiter(map(self.agg_unpack_fn, dps), dtype=self.dtype_aggs, count=n) # type: ignore [arg-type] + except KeyError: # An aggregate is missing, fallback to slower `dict.get(agg)`. + # This can happen when certain aggs. are undefined, e.g. 
`interpolate` at first interval if rounded down + arr = np.array([tuple(map(dp.get, self.float_aggs)) for dp in dps], dtype=np.float64) + self.dps_data[idx].append(arr.reshape(n, len(self.float_aggs))) + + def _unpack_and_store_basic(self, idx: Tuple[float, ...], dps: List[Dict[str, DatapointsTypes]]) -> None: + self.ts_data[idx].append(list(map(DpsUnpackFns.ts, dps))) + + if self.is_count_query: + self.count_data[idx].append([dp.get("count", 0) for dp in dps]) + + if self.has_non_count_aggs: + try: + lst = list(map(self.agg_unpack_fn, dps)) + except KeyError: + if self.single_non_count_agg: + lst = [dp.get(self.first_non_count_agg) for dp in dps] + else: + lst = [tuple(map(dp.get, self.float_aggs)) for dp in dps] + self.dps_data[idx].append(lst) + + +class ParallelUnlimitedAggTask(BaseConcurrentAggTask): + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + # This entire method just to tell mypy: + assert isinstance(self.query, _SingleTSQueryAggUnlimited) + + +class ParallelLimitedAggTask(ConcurrentLimitedMixin, BaseConcurrentAggTask): + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + # This entire method just to tell mypy: + assert isinstance(self.query, _SingleTSQueryAggLimited) + + def _find_number_of_subtasks_uniform_split(self, tot_ms: int, n_workers_per_queries: int) -> int: + remaining_limit = self.query.limit - self.n_dps_first_batch + n_max_dps = min(remaining_limit, tot_ms // self.offset_next) + return max(1, min(n_workers_per_queries, math.ceil(n_max_dps / self.query.max_query_limit))) diff --git a/cognite/client/_api/datapoints.py b/cognite/client/_api/datapoints.py index 8492cf8996..f8babec054 100644 --- a/cognite/client/_api/datapoints.py +++ b/cognite/client/_api/datapoints.py @@ -1,20 +1,558 @@ -import copy +from __future__ import annotations + +import functools +import heapq +import itertools import math -import re as regexp +import statistics +from abc import ABC, abstractmethod +from concurrent.futures import CancelledError, as_completed +from copy import copy from datetime import datetime -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple, Union, cast - -import cognite.client.utils._time -from cognite.client import utils +from itertools import chain +from typing import ( + TYPE_CHECKING, + Any, + Dict, + Iterable, + Iterator, + List, + Literal, + Optional, + Sequence, + Set, + Tuple, + Union, + cast, + overload, +) + +from cognite.client._api.datapoint_constants import ( + DPS_LIMIT, + DPS_LIMIT_AGG, + FETCH_TS_LIMIT, + POST_DPS_OBJECTS_LIMIT, + RETRIEVE_LATEST_LIMIT, + CustomDatapoints, + DatapointsExternalIdTypes, + DatapointsFromAPI, + DatapointsIdTypes, + DatapointsPayload, +) +from cognite.client._api.datapoint_tasks import ( + BaseConcurrentTask, + SplittingFetchSubtask, + _SingleTSQueryBase, + _SingleTSQueryValidator, +) from cognite.client._api.synthetic_time_series import SyntheticDatapointsAPI from cognite.client._api_client import APIClient -from cognite.client.data_classes import Datapoints, DatapointsList, DatapointsQuery -from cognite.client.data_classes.datapoints import DatapointsExternalIdMaybeAggregate, DatapointsIdMaybeAggregate -from cognite.client.exceptions import CogniteAPIError +from cognite.client.data_classes import ( + Datapoints, + DatapointsArray, + DatapointsArrayList, + DatapointsList, + DatapointsQuery, +) +from cognite.client.exceptions import CogniteAPIError, CogniteNotFoundError +from cognite.client.utils._auxiliary import assert_type, local_import, 
split_into_chunks, split_into_n_parts +from cognite.client.utils._concurrency import collect_exc_info_and_raise, execute_tasks_concurrently from cognite.client.utils._identifier import Identifier, IdentifierSequence +from cognite.client.utils._priority_tpe import PriorityThreadPoolExecutor # type: ignore +from cognite.client.utils._time import timestamp_to_ms if TYPE_CHECKING: - import pandas + from concurrent.futures import Future + + import pandas as pd + + +TSQueryList = List[_SingleTSQueryBase] +PoolSubtaskType = Tuple[int, float, float, SplittingFetchSubtask] + + +def dps_fetch_selector( + dps_client: DatapointsAPI, + user_queries: Sequence[DatapointsQuery], +) -> DpsFetchStrategy: + max_workers = dps_client._config.max_workers + if max_workers < 1: # Dps fetching does not use fn `execute_tasks_concurrently`, so we must check: + raise RuntimeError(f"Invalid option for `{max_workers=}`. Must be at least 1") + all_queries, agg_queries, raw_queries = validate_and_split_user_queries(user_queries) + + # Running mode is decided based on how many time series are requested VS. number of workers: + if len(all_queries) <= max_workers: + # Start shooting requests from the hip immediately: + return EagerDpsFetcher(dps_client, all_queries, agg_queries, raw_queries, max_workers) + # Fetch a smaller, chunked batch of dps from all time series - which allows us to do some rudimentary + # guesstimation of dps density - then chunk away: + return ChunkingDpsFetcher(dps_client, all_queries, agg_queries, raw_queries, max_workers) + + +def validate_and_split_user_queries( + user_queries: Sequence[DatapointsQuery], +) -> Tuple[TSQueryList, TSQueryList, TSQueryList]: + split_qs: Tuple[TSQueryList, TSQueryList] = [], [] + all_queries = list( + chain.from_iterable( + query.validate_and_create_single_queries() for query in map(_SingleTSQueryValidator, user_queries) + ) + ) + for query in all_queries: + split_qs[query.is_raw_query].append(query) + return (all_queries, *split_qs) + + +class DpsFetchStrategy(ABC): + def __init__( + self, + dps_client: DatapointsAPI, + all_queries: TSQueryList, + agg_queries: TSQueryList, + raw_queries: TSQueryList, + max_workers: int, + ) -> None: + self.dps_client = dps_client + self.all_queries = all_queries + self.agg_queries = agg_queries + self.raw_queries = raw_queries + self.max_workers = max_workers + self.n_queries = len(all_queries) + + @overload + def fetch_all_datapoints(self, use_numpy: Literal[True]) -> DatapointsArrayList: + ... + + @overload + def fetch_all_datapoints(self, use_numpy: Literal[False]) -> DatapointsList: + ... + + def fetch_all_datapoints(self, use_numpy: bool) -> Union[DatapointsList, DatapointsArrayList]: + with PriorityThreadPoolExecutor(max_workers=self.max_workers) as pool: + ordered_results = self.fetch_all(pool, use_numpy) + return self._finalize_tasks(ordered_results, use_numpy) + + @overload + def _finalize_tasks( + self, + ordered_results: List[BaseConcurrentTask], + use_numpy: Literal[True], + ) -> DatapointsArrayList: + ... + + @overload + def _finalize_tasks( + self, + ordered_results: List[BaseConcurrentTask], + use_numpy: Literal[False], + ) -> DatapointsList: + ... 
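+    # To illustrate the strategy selection in `dps_fetch_selector` above (example
+    # numbers only): with max_workers=10, a request for 3 single-series queries is
+    # handled by EagerDpsFetcher (each series gets its own requests immediately),
+    # while a request for 1000 queries goes through ChunkingDpsFetcher, which first
+    # fetches a capped batch from every series in combined requests to estimate
+    # datapoint density before splitting further.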
+ + def _finalize_tasks( + self, + ordered_results: List[BaseConcurrentTask], + use_numpy: bool, + ) -> Union[DatapointsList, DatapointsArrayList]: + lst_class = DatapointsArrayList if use_numpy else DatapointsList + return lst_class( + [ts_task.get_result() for ts_task in ordered_results], + cognite_client=self.dps_client._cognite_client, + ) + + @abstractmethod + def fetch_all(self, pool: PriorityThreadPoolExecutor, use_numpy: bool) -> List[BaseConcurrentTask]: + ... + + @abstractmethod + def _create_initial_tasks(self, pool: PriorityThreadPoolExecutor, use_numpy: bool) -> Tuple[Dict, Dict]: + ... + + +class EagerDpsFetcher(DpsFetchStrategy): + def request_datapoints_jit( + self, + task: SplittingFetchSubtask, + payload: Optional[CustomDatapoints] = None, + ) -> List[Optional[DatapointsFromAPI]]: + # Note: We delay getting the next payload as much as possible; this way, when we count number of + # points left to fetch JIT, we have the most up-to-date estimate (and may quit early): + if (item := task.get_next_payload()) is None: + return [None] + + (payload := copy(payload) or {})["items"] = [item] # type: ignore [typeddict-item] + return self.dps_client._post( + self.dps_client._RESOURCE_PATH + "/list", json=cast(Dict[str, Any], payload) + ).json()["items"] + + def fetch_all(self, pool: PriorityThreadPoolExecutor, use_numpy: bool) -> List[BaseConcurrentTask]: + futures_dct, ts_task_lookup = self._create_initial_tasks(pool, use_numpy) + + # Run until all top level tasks are complete: + while futures_dct: + future = next(as_completed(futures_dct)) + ts_task = (subtask := futures_dct.pop(future)).parent + res = self._get_result_with_exception_handling(future, ts_task, ts_task_lookup, futures_dct) + if res is None: + continue + # We may dynamically split subtasks based on what % of time range was returned: + if new_subtasks := subtask.store_partial_result(res): + self._queue_new_subtasks(pool, futures_dct, new_subtasks) + if ts_task.is_done: # "Parent" ts task might be done before a subtask is finished + if all(parent.is_done for parent in ts_task_lookup.values()): + pool.shutdown(wait=False) + break + if ts_task.has_limit: + # For finished limited queries, cancel all unstarted futures for same parent: + self._cancel_futures_for_finished_ts_task(ts_task, futures_dct) + continue + elif subtask.is_done: + continue + self._queue_new_subtasks(pool, futures_dct, [subtask]) + # Return only non-missing time series tasks in correct order given by `all_queries`: + return list(filter(None, map(ts_task_lookup.get, self.all_queries))) + + def _create_initial_tasks( + self, + pool: PriorityThreadPoolExecutor, + use_numpy: bool, + ) -> Tuple[Dict[Future, SplittingFetchSubtask], Dict[_SingleTSQueryBase, BaseConcurrentTask]]: + futures_dct: Dict[Future, SplittingFetchSubtask] = {} + ts_task_lookup, payload = {}, {"ignoreUnknownIds": False} + for query in self.all_queries: + ts_task = ts_task_lookup[query] = query.ts_task_type(query=query, eager_mode=True, use_numpy=use_numpy) + for subtask in ts_task.split_into_subtasks(self.max_workers, self.n_queries): + future = pool.submit(self.request_datapoints_jit, subtask, payload, priority=subtask.priority) + futures_dct[future] = subtask + return futures_dct, ts_task_lookup + + def _queue_new_subtasks( + self, + pool: PriorityThreadPoolExecutor, + futures_dct: Dict[Future, SplittingFetchSubtask], + new_subtasks: List[SplittingFetchSubtask], + ) -> None: + for task in new_subtasks: + future = pool.submit(self.request_datapoints_jit, task, priority=task.priority) + 
futures_dct[future] = task + + def _get_result_with_exception_handling( + self, + future: Future, + ts_task: BaseConcurrentTask, + ts_task_lookup: Dict[_SingleTSQueryBase, BaseConcurrentTask], + futures_dct: Dict[Future, SplittingFetchSubtask], + ) -> Optional[DatapointsFromAPI]: + try: + return future.result()[0] + except CancelledError: + return None + except CogniteAPIError as e: + if not (e.code == 400 and e.missing and ts_task.query.ignore_unknown_ids): + collect_exc_info_and_raise([e]) + elif ts_task.is_done: + return None + ts_task.is_done = True + del ts_task_lookup[ts_task.query] + self._cancel_futures_for_finished_ts_task(ts_task, futures_dct) + return None + + def _cancel_futures_for_finished_ts_task( + self, ts_task: BaseConcurrentTask, futures_dct: Dict[Future, SplittingFetchSubtask] + ) -> None: + for future, subtask in futures_dct.copy().items(): + # TODO: Change to loop over parent.subtasks? + if subtask.parent is ts_task: + future.cancel() + del futures_dct[future] + + +class ChunkingDpsFetcher(DpsFetchStrategy): + def __init__(self, *args: Any) -> None: + super().__init__(*args) + # To chunk efficiently, we have subtask pools (heap queues) that we use to prioritise subtasks + # when building/combining subtasks into a full query: + self.raw_subtask_pool: List[PoolSubtaskType] = [] + self.agg_subtask_pool: List[PoolSubtaskType] = [] + self.subtask_pools = (self.agg_subtask_pool, self.raw_subtask_pool) + # Combined partial queries storage (chunked, but not enough to fill a request): + self.next_items: List[Dict[str, Any]] = [] + self.next_subtasks: List[SplittingFetchSubtask] = [] + + self.counter = itertools.count() + + def fetch_all(self, pool: PriorityThreadPoolExecutor, use_numpy: bool) -> List[BaseConcurrentTask]: + # The initial tasks are important - as they tell us which time series are missing, + # which are string etc. We use this info when we choose the best fetch-strategy. 
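+        # Concretely, each item in this first batch carries the time series info
+        # (id, externalId, isString, isStep) plus a capped number of datapoints
+        # (its `first_limit`); missing series simply come back as absent items since
+        # the initial request uses ignoreUnknownIds=True, and the datapoint count
+        # returned per series feeds the rudimentary density guesstimate used when
+        # splitting into further subtasks.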
+ ts_task_lookup, missing_to_raise = {}, set() + initial_query_limits, initial_futures_dct = self._create_initial_tasks(pool) + + for future in as_completed(initial_futures_dct): + res = future.result() + chunk_agg_qs, chunk_raw_qs = initial_futures_dct.pop(future) + new_ts_tasks, chunk_missing = self._create_ts_tasks_and_handle_missing( + res, chunk_agg_qs, chunk_raw_qs, initial_query_limits, use_numpy + ) + missing_to_raise.update(chunk_missing) + ts_task_lookup.update(new_ts_tasks) + + if missing_to_raise: + raise CogniteNotFoundError(not_found=[q.identifier.as_dict(camel_case=False) for q in missing_to_raise]) + + if ts_tasks_left := self._update_queries_with_new_chunking_limit(ts_task_lookup): + self._add_to_subtask_pools( + chain.from_iterable( + task.split_into_subtasks(max_workers=self.max_workers, n_tot_queries=len(ts_tasks_left)) + for task in ts_tasks_left + ) + ) + futures_dct: Dict[Future, List[SplittingFetchSubtask]] = {} + self._queue_new_subtasks(pool, futures_dct) + self._fetch_until_complete(pool, futures_dct, ts_task_lookup) + # Return only non-missing time series tasks in correct order given by `all_queries`: + return list(filter(None, map(ts_task_lookup.get, self.all_queries))) + + def _fetch_until_complete( + self, + pool: PriorityThreadPoolExecutor, + futures_dct: Dict[Future, List[SplittingFetchSubtask]], + ts_task_lookup: Dict[_SingleTSQueryBase, BaseConcurrentTask], + ) -> None: + while futures_dct: + future = next(as_completed(futures_dct)) + res_lst, subtask_lst = future.result(), futures_dct.pop(future) + for subtask, res in zip(subtask_lst, res_lst): + # We may dynamically split subtasks based on what % of time range was returned: + if new_subtasks := subtask.store_partial_result(res): + self._add_to_subtask_pools(new_subtasks) + if not subtask.is_done: + self._add_to_subtask_pools([subtask]) + # Check each parent in current batch once if we may cancel some queued subtasks: + if done_ts_tasks := {sub.parent for sub in subtask_lst if sub.parent.is_done}: + self._cancel_subtasks(done_ts_tasks) + + self._queue_new_subtasks(pool, futures_dct) + + if all(task.is_done for task in ts_task_lookup.values()): + pool.shutdown(wait=False) + return None + + def request_datapoints(self, payload: DatapointsPayload) -> List[Optional[DatapointsFromAPI]]: + return self.dps_client._post( + self.dps_client._RESOURCE_PATH + "/list", json=cast(Dict[str, Any], payload) + ).json()["items"] + + def _create_initial_tasks( + self, pool: PriorityThreadPoolExecutor + ) -> Tuple[Dict[_SingleTSQueryBase, int], Dict[Future, Tuple[TSQueryList, TSQueryList]]]: + initial_query_limits: Dict[_SingleTSQueryBase, int] = {} + initial_futures_dct: Dict[Future, Tuple[TSQueryList, TSQueryList]] = {} + # Optimal queries uses the entire worker pool. 
We may be forced to use more (queue) when we + # can't fit all individual time series (maxes out at `FETCH_TS_LIMIT * max_workers`): + n_queries = max(self.max_workers, math.ceil(self.n_queries / FETCH_TS_LIMIT)) + splitter = functools.partial(split_into_n_parts, n=n_queries) + for query_chunks in zip(splitter(self.agg_queries), splitter(self.raw_queries)): + # Agg and raw limits are independent in the query, so we max out on both: + items = [] + for queries, max_lim in zip(query_chunks, [DPS_LIMIT_AGG, DPS_LIMIT]): + maxed_limits = self._find_initial_query_limits( + [q.capped_limit for q in queries], max_lim # type: ignore [attr-defined] + ) + initial_query_limits.update( + chunk_query_limits := dict(zip(queries, maxed_limits)) # type: ignore [arg-type] + ) + items.extend( + [ + {**q.to_payload(), "limit": lim} # type: ignore [attr-defined] + for q, lim in chunk_query_limits.items() + ] + ) + + payload = {"ignoreUnknownIds": True, "items": items} + future = pool.submit(self.request_datapoints, payload, priority=0) + initial_futures_dct[future] = query_chunks # type: ignore [assignment] + return initial_query_limits, initial_futures_dct + + def _create_ts_tasks_and_handle_missing( + self, + results: List[DatapointsFromAPI], + chunk_agg_qs: TSQueryList, + chunk_raw_qs: TSQueryList, + initial_query_limits: Dict[_SingleTSQueryBase, int], + use_numpy: bool, + ) -> Tuple[Dict[_SingleTSQueryBase, BaseConcurrentTask], Set[_SingleTSQueryBase]]: + if len(results) == len(chunk_agg_qs) + len(chunk_raw_qs): + to_raise: Set[_SingleTSQueryBase] = set() + else: + # We have at least 1 missing time series: + chunk_agg_qs, chunk_raw_qs, to_raise = self._handle_missing_ts(results, chunk_agg_qs, chunk_raw_qs) + self._update_queries_is_string(results, chunk_raw_qs) + # Align initial results with corresponding queries and create tasks: + ts_tasks = { + query: query.ts_task_type( + query=query, + eager_mode=False, + use_numpy=use_numpy, + first_dps_batch=res, + first_limit=initial_query_limits[query], + ) + for res, query in zip(results, chain(chunk_agg_qs, chunk_raw_qs)) + } + return ts_tasks, to_raise + + def _add_to_subtask_pools(self, new_subtasks: Iterable[SplittingFetchSubtask]) -> None: + for task in new_subtasks: + # We leverage how tuples are compared to prioritise items. First `priority`, then `payload limit` + # (to easily group smaller queries), then counter to always break ties, but keep order (never use tasks themselves): + limit = min(task.n_dps_left, task.max_query_limit) + new_subtask: PoolSubtaskType = (task.priority, limit, next(self.counter), task) + heapq.heappush(self.subtask_pools[task.is_raw_query], new_subtask) + + def _queue_new_subtasks( + self, pool: PriorityThreadPoolExecutor, futures_dct: Dict[Future, List[SplittingFetchSubtask]] + ) -> None: + qsize = pool._work_queue.qsize() # Approximate size of the queue (number of unstarted tasks) + if qsize > 2 * self.max_workers: + # Each worker has more than 2 tasks already awaiting in the thread pool queue already, so we + # hold off on combining new subtasks just yet (allows better prioritisation as more new tasks arrive). 
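+            # (Example, assuming max_workers=10: we back off while more than 20 requests are still pending;
+            # the partial-payload flush below only kicks in once 5 or fewer remain.)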
+ return None + # When pool queue has few awaiting tasks, we empty the subtasks pool into a partial request: + return_partial_payload = qsize <= min(5, math.ceil(self.max_workers / 2)) + combined_requests = self._combine_subtasks_into_requests(return_partial_payload) + + for payload, subtask_lst, priority in combined_requests: + future = pool.submit(self.request_datapoints, payload, priority=priority) + futures_dct[future] = subtask_lst + + def _combine_subtasks_into_requests( + self, + return_partial_payload: bool, + ) -> Iterator[Tuple[DatapointsPayload, List[SplittingFetchSubtask], float]]: + + while any(self.subtask_pools): # As long as both are not empty + payload_at_max_items, payload_is_full = False, [False, False] + for task_pool, request_max_limit, is_raw in zip( + self.subtask_pools, (DPS_LIMIT_AGG, DPS_LIMIT), [False, True] + ): + if not task_pool: + continue + limit_used = 0 + if self.next_items: # Happens when we continue building on a previous "partial payload" + limit_used = sum( # Tally up either raw or agg query `limit_used` + item["limit"] + for item, task in zip(self.next_items, self.next_subtasks) + if task.is_raw_query is is_raw + ) + while task_pool: + if len(self.next_items) + 1 > FETCH_TS_LIMIT: + payload_at_max_items = True + break + # Highest priority task is always at index 0 (heap magic): + *_, next_task = task_pool[0] + next_payload = next_task.get_next_payload() + if next_payload is None or next_task.is_done: + # Parent task finished before subtask and has been marked done already: + heapq.heappop(task_pool) # Pop to remove from heap + continue + next_limit = next_payload["limit"] + if limit_used + next_limit <= request_max_limit: + self.next_items.append(next_payload) + self.next_subtasks.append(next_task) + limit_used += next_limit + heapq.heappop(task_pool) + else: + payload_is_full[is_raw] = True # type: ignore [has-type] + break + + payload_done = ( + payload_at_max_items + or all(payload_is_full) + or (payload_is_full[0] and not self.raw_subtask_pool) + or (payload_is_full[1] and not self.agg_subtask_pool) + or (return_partial_payload and not any(self.subtask_pools)) + ) + if payload_done: + if not len(self.next_subtasks): + # Happens with limited queries as more and more "later" tasks get cancelled. 
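+                    # Nothing was combined, so stop yielding rather than emit an empty payload.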
+ break + priority = statistics.mean(task.priority for task in self.next_subtasks) + payload: DatapointsPayload = {"items": self.next_items[:]} # type: ignore [typeddict-item] + yield payload, self.next_subtasks[:], priority + + self.next_items, self.next_subtasks = [], [] + break + + def _update_queries_with_new_chunking_limit( + self, ts_task_lookup: Dict[_SingleTSQueryBase, BaseConcurrentTask] + ) -> List[BaseConcurrentTask]: + queries = [query for query, task in ts_task_lookup.items() if not task.is_done] + tot_raw = sum(q.is_raw_query for q in queries) + tot_agg = len(queries) - tot_raw + n_raw_chunk = min(FETCH_TS_LIMIT, math.ceil((tot_raw or 1) / 10)) + n_agg_chunk = min(FETCH_TS_LIMIT, math.ceil((tot_agg or 1) / 10)) + max_limit_raw = math.floor(DPS_LIMIT / n_raw_chunk) + max_limit_agg = math.floor(DPS_LIMIT_AGG / n_agg_chunk) + for query in queries: + if query.is_raw_query: + query.override_max_query_limit(max_limit_raw) + else: + query.override_max_query_limit(max_limit_agg) + return [ts_task_lookup[query] for query in queries] + + def _cancel_subtasks(self, done_ts_tasks: Set[BaseConcurrentTask]) -> None: + for ts_task in done_ts_tasks: + # We do -not- want to iterate/mutate the heapqs, so we mark subtasks as done instead: + for subtask in ts_task.subtasks: + subtask.is_done = True + + @staticmethod + def _find_initial_query_limits(limits: List[int], max_limit: int) -> List[int]: + actual_lims = [0] * len(limits) + not_done = set(range(len(limits))) + while not_done: + part = max_limit // len(not_done) + if not part: + # We still might not have not reached max_limit, but we can no longer distribute evenly + break + rm_idx = set() + for i in not_done: + i_part = min(part, limits[i]) # A query of limit=10 does not need more of max_limit than 10 + actual_lims[i] += i_part + max_limit -= i_part + if i_part == limits[i]: + rm_idx.add(i) + else: + limits[i] -= i_part + not_done -= rm_idx + return actual_lims + + @staticmethod + def _update_queries_is_string(res: List[DatapointsFromAPI], queries: TSQueryList) -> None: + is_string = {("id", r["id"]) for r in res if r["isString"]}.union( + ("externalId", r["externalId"]) for r in res if r["isString"] + ) + for q in queries: + q.is_string = q.identifier.as_tuple() in is_string + + @staticmethod + def _handle_missing_ts( + res: List[DatapointsFromAPI], + agg_queries: TSQueryList, + raw_queries: TSQueryList, + ) -> Tuple[TSQueryList, TSQueryList, Set[_SingleTSQueryBase]]: + missing, to_raise = set(), set() + not_missing = {("id", r["id"]) for r in res}.union(("externalId", r["externalId"]) for r in res) + for query in chain(agg_queries, raw_queries): + # Update _SingleTSQueryBase objects with `is_missing` status: + query.is_missing = query.identifier.as_tuple() not in not_missing + if query.is_missing: + missing.add(query) + # We might be handling multiple simultaneous top-level queries, each with a + # different settings for "ignore unknown": + if not query.ignore_unknown_ids: + to_raise.add(query) + agg_queries = [q for q in agg_queries if not q.is_missing] + raw_queries = [q for q in raw_queries if not q.is_missing] + return agg_queries, raw_queries, to_raise class DatapointsAPI(APIClient): @@ -22,79 +560,223 @@ class DatapointsAPI(APIClient): def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) - self._DPS_LIMIT_AGG = 10000 - self._DPS_LIMIT = 100000 - self._POST_DPS_OBJECTS_LIMIT = 10000 - self._RETRIEVE_LATEST_LIMIT = 100 self.synthetic = SyntheticDatapointsAPI( self._config, 
api_version=self._api_version, cognite_client=self._cognite_client ) def retrieve( self, - start: Union[int, str, datetime], - end: Union[int, str, datetime], - id: DatapointsIdMaybeAggregate = None, - external_id: DatapointsExternalIdMaybeAggregate = None, - aggregates: List[str] = None, - granularity: str = None, - include_outside_points: bool = None, - limit: int = None, + *, + id: Optional[DatapointsIdTypes] = None, + external_id: Optional[DatapointsExternalIdTypes] = None, + start: Union[int, str, datetime, None] = None, + end: Union[int, str, datetime, None] = None, + aggregates: Optional[List[str]] = None, + granularity: Optional[str] = None, + limit: Optional[int] = None, + include_outside_points: bool = False, ignore_unknown_ids: bool = False, ) -> Union[None, Datapoints, DatapointsList]: - """`Get datapoints for one or more time series. `_ + """`Retrieve datapoints for one or more time series. `_ + + **Note**: All arguments are optional, as long as at least one identifier is given. When passing aggregates, granularity must also be given. + When passing dict objects with specific parameters, these will take precedence. See examples below. - Note that you cannot specify the same ids/external_ids multiple times. + **Performance hint:**: For better performance and memory usage, consider using `retrieve_arrays(...)` which uses `numpy.ndarrays` for data storage. Args: - start (Union[int, str, datetime]): Inclusive start. - end (Union[int, str, datetime]): Exclusive end. - id (DatapointsIdMaybeAggregate): Id or list of ids. Can also be object - specifying aggregates. See example below. - external_id (DatapointsExternalIdMaybeAggregate): External id or list of external - ids. Can also be object specifying aggregates. See example below. - aggregates (List[str]): List of aggregate functions to apply. - granularity (str): The granularity to fetch aggregates at. e.g. '1s', '2h', '10d'. - include_outside_points (bool): Whether or not to include outside points. - limit (int): Maximum number of datapoints to return for each time series. - ignore_unknown_ids (bool): Ignore IDs and external IDs that are not found rather than throw an exception. + start (Union[int, str, datetime]): Inclusive start. Default: 1970-01-01 UTC. + end (Union[int, str, datetime]): Exclusive end. Default: "now" + id (DatapointsIdTypes): Id, dict (with id) or (mixed) list of these. See examples below. + external_id (DatapointsExternalIdTypes): External id, dict (with external id) or (mixed) list of these. See examples below. + aggregates (List[str]): List of aggregate functions to apply. Default: No aggregates (raw datapoints) + granularity (str): The granularity to fetch aggregates at. e.g. '1s', '2h', '10d'. Default: None. + limit (int): Maximum number of datapoints to return for each time series. Default: None (no limit) + include_outside_points (bool): Whether or not to include outside points. Not allowed when fetching aggregates. Default: False + ignore_unknown_ids (bool): Whether or not to ignore missing time series rather than raising an exception. Default: False Returns: - Union[None, Datapoints, DatapointsList]: A Datapoints object containing the requested data, or a list of such objects. If `ignore_unknown_id` is True, single id is requested and it is not found, the function will return `None`. + Union[None, Datapoints, DatapointsList]: A `Datapoints` object containing the requested data, or a `DatapointsList` if multiple + time series were asked for. 
If `ignore_unknown_ids` is `True`, a single time series is requested, and it is not found, the function
+                will return `None`. The ordering is first ids, then external_ids.

        Examples:

-            You can get specify the ids of the datapoints you wish to retrieve in a number of ways. In this example
-            we are using the time-ago format to get raw data for the time series with id 1::
+            You can specify the identifiers of the datapoints you wish to retrieve in a number of ways. In this example
+            we are using the time-ago format to get raw data for the time series with id=42 from 2 weeks ago up until now::

                >>> from cognite.client import CogniteClient
-                >>> c = CogniteClient()
-                >>> dps = c.datapoints.retrieve(id=1, start="2w-ago", end="now")
+                >>> client = CogniteClient()
+                >>> dps = client.time_series.data.retrieve(id=42, start="2w-ago")

-            We can also get aggregated values, such as average. Here we are getting daily averages for all of 2018 for
+            You can also get aggregated values, such as the average. Here we are getting daily averages for all of 2018 for
            two different time series. Note that we are fetching them using their external ids::

-                >>> from cognite.client import CogniteClient
-                >>> c = CogniteClient()
-                >>> dps = c.datapoints.retrieve(external_id=["abc", "def"],
-                ...                             start=datetime(2018,1,1),
-                ...                             end=datetime(2019,1,1),
-                ...                             aggregates=["average"],
-                ...                             granularity="1d")
+                >>> from datetime import datetime, timezone
+                >>> utc = timezone.utc
+                >>> dps = client.time_series.data.retrieve(
+                ...     external_id=["foo", "bar"],
+                ...     start=datetime(2018, 1, 1, tzinfo=utc),
+                ...     end=datetime(2019, 1, 1, tzinfo=utc),
+                ...     aggregates=["average"],
+                ...     granularity="1d")
+
+            Note that all parameters (except `ignore_unknown_ids`) can be individually set if you pass (one or more) dictionaries.
+            If you also pass top-level parameters, these will be overwritten by the individual parameters (when both exist). You are
+            free to mix ids and external ids.
+
+            Let's say you want different aggregates and end-times for a few time series:
+
+                >>> dps = client.time_series.data.retrieve(
+                ...     id=[
+                ...         {"id": 42, "end": "2d-ago", "aggregates": ["average"]},
+                ...         {"id": 11, "end": "1d-ago", "aggregates": ["min", "max", "count"]},
+                ...     ],
+                ...     external_id={"external_id": "foo", "aggregates": ["max"]},
+                ...     start="5d-ago",
+                ...     granularity="1h")
+
+            When requesting multiple time series, an easy way to get the datapoints of a specific one is to use the `.get` method
+            on the returned `DatapointsList` object, then specify if you want `id` or `external_id`. Note: If you fetch a time series
+            by using `id`, you can still access it with its `external_id` (and the opposite way around)::
+
+                >>> dps_lst = client.time_series.data.retrieve(
+                ...     id=[42, 43, ..., 500], start="2w-ago")
+                >>> ts_350 = dps_lst.get(id=350)  # `Datapoints` object
+
+            ...but what happens if you request duplicate `id`s or `external_id`s? Let's say you need to fetch data from multiple
+            disconnected periods, e.g. stock data only from recessions. In this case the `.get` method will return a list of `Datapoints`
+            instead (similar to how slicing works with non-unique indexes on Pandas DataFrames):
+
+                >>> dps_lst = client.time_series.data.retrieve(
+                ...     id=[
+                ...         42, 43, 44, 45,
+                ...         {"id": 350, "start": datetime(1907, 10, 14, tzinfo=utc), "end": datetime(1907, 11, 6, tzinfo=utc)},
+                ...         {"id": 350, "start": datetime(1929, 9, 4, tzinfo=utc), "end": datetime(1929, 11, 13, tzinfo=utc)},
+                ...
]) + >>> ts_44 = dps_lst.get(id=44) # Single `Datapoints` object + >>> ts_350_lst = dps_lst.get(id=350) # List of two `Datapoints` objects + + The last example showcases the great flexibility of the `retrieve` endpoint, with a very custom query. If you also want to + specify multiple values for `ignore_unknown_ids`, you'll need to use the `.query` endpoint. + + >>> ts1 = 1337 + >>> ts2 = { + ... "id": 42, + ... "start": -12345, # Overrides `start` argument below + ... "end": "1h-ago", + ... "limit": 1000, # Overrides `limit` argument below + ... "include_outside_points": True + ... } + >>> ts3 = { + ... "id": 11, + ... "end": "1h-ago", + ... "aggregates": ["max"], + ... "granularity": "42h", + ... "include_outside_points": False + ... } + >>> dps = client.time_series.data.retrieve( + ... id=[ts1, ts2, ts3], start="2w-ago", limit=None + ... ) + """ + query = DatapointsQuery( + start=start, + end=end, + id=id, + external_id=external_id, + aggregates=aggregates, + granularity=granularity, + limit=limit, + include_outside_points=include_outside_points, + ignore_unknown_ids=ignore_unknown_ids, + ) + fetcher = dps_fetch_selector(self, user_queries=[query]) + dps_list = fetcher.fetch_all_datapoints(use_numpy=False) + if not query.is_single_identifier: + return dps_list + elif not dps_list and ignore_unknown_ids: + return None + return dps_list[0] + + def retrieve_arrays( + self, + *, + id: Optional[DatapointsIdTypes] = None, + external_id: Optional[DatapointsExternalIdTypes] = None, + start: Union[int, str, datetime, None] = None, + end: Union[int, str, datetime, None] = None, + aggregates: Optional[List[str]] = None, + granularity: Optional[str] = None, + limit: Optional[int] = None, + include_outside_points: bool = False, + ignore_unknown_ids: bool = False, + ) -> Union[None, DatapointsArray, DatapointsArrayList]: + """`Retrieve datapoints for one or more time series. `_ + + **Note**: This method requires `numpy`. + + Args: + start (Union[int, str, datetime]): Inclusive start. Default: 1970-01-01 UTC. + end (Union[int, str, datetime]): Exclusive end. Default: "now" + id (DatapointsIdTypes): Id, dict (with id) or (mixed) list of these. See examples below. + external_id (DatapointsExternalIdTypes): External id, dict (with external id) or (mixed) list of these. See examples below. + aggregates (List[str]): List of aggregate functions to apply. Default: No aggregates (raw datapoints) + granularity (str): The granularity to fetch aggregates at. e.g. '1s', '2h', '10d'. Default: None. + limit (int): Maximum number of datapoints to return for each time series. Default: None (no limit) + include_outside_points (bool): Whether or not to include outside points. Not allowed when fetching aggregates. Default: False + ignore_unknown_ids (bool): Whether or not to ignore missing time series rather than raising an exception. Default: False + + Returns: + Union[None, DatapointsArray, DatapointsArrayList]: A `DatapointsArray` object containing the requested data, or a `DatapointsArrayList` if multiple + time series were asked for. If `ignore_unknown_ids` is `True`, a single time series is requested and it is not found, the function + will return `None`. The ordering is first ids, then external_ids. + + Examples: - If you want different aggregates for different time series specify your ids like this:: + **Note:** For more usage examples, see `DatapointsAPI.retrieve` method (which accepts exactly the same arguments). 
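+
+            Since the datapoints are returned as `numpy` arrays, you can operate on them directly. As a small
+            illustration (assuming the time series with id=42 exists and holds numeric data), fetch the raw
+            datapoints from the last day, then count them and compute their mean::
+
+                >>> from cognite.client import CogniteClient
+                >>> client = CogniteClient()
+                >>> dps = client.time_series.data.retrieve_arrays(id=42, start="1d-ago")
+                >>> n_dps, mean_value = dps.value.size, dps.value.mean()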
+ + Get weekly `min` and `max` aggregates for a time series with id=42 since the year 2000, then compute the range of values: >>> from cognite.client import CogniteClient - >>> c = CogniteClient() - >>> dps = c.datapoints.retrieve(id=[{"id": 1, "aggregates": ["average"]}, - ... {"id": 1, "aggregates": ["min"]}], - ... external_id={"externalId": "1", "aggregates": ["max"]}, - ... start="1d-ago", end="now", granularity="1h") - """ - fetcher = DatapointsFetcher(client=self) + >>> from datetime import datetime, timezone + >>> client = CogniteClient() + >>> dps = client.time_series.data.retrieve_arrays( + ... id=42, + ... start=datetime(2020, 1, 1, tzinfo=timezone.utc), + ... aggregates=["min", "max"], + ... granularity="7d") + >>> weekly_range = dps.max - dps.min + + Get up-to 2 million raw datapoints for the last 48 hours for a noisy time series with external_id="ts-noisy", + then use a small and wide moving average filter to smooth it out: - _, is_single_id = fetcher._process_ts_identifiers(id, external_id) + >>> import numpy as np + >>> dps = client.time_series.data.retrieve_arrays( + ... external_id="ts-noisy", + ... start="2d-ago", + ... limit=2_000_000) + >>> smooth = np.convolve(dps.value, np.ones(5) / 5) # doctest: +SKIP + >>> smoother = np.convolve(dps.value, np.ones(20) / 20) # doctest: +SKIP + + Get raw datapoints for multiple time series, that may or may not exist, from the last 2 hours, then find the + largest gap between two consecutive values for all time series, also taking the previous value into account (outside point). + + >>> id_lst = [42, 43, 44] + >>> dps_lst = client.time_series.data.retrieve_arrays( + ... id=id_lst, + ... start="2h-ago", + ... include_outside_points=True, + ... ignore_unknown_ids=True) + >>> largest_gaps = [np.max(np.diff(dps.timestamp)) for dps in dps_lst] + + Get raw datapoints for a time series with external_id="bar" from the last 10 weeks, then convert to a `pandas.Series` + (you can of course also use the `to_pandas()` convenience method if you want a `pandas.DataFrame`): + >>> import pandas as pd + >>> dps = client.time_series.data.retrieve_arrays(external_id="bar", start="10w-ago") + >>> series = pd.Series(dps.value, index=dps.timestamp) + """ + local_import("numpy") # Verify that numpy is available or raise CogniteImportError query = DatapointsQuery( start=start, end=end, @@ -102,16 +784,186 @@ def retrieve( external_id=external_id, aggregates=aggregates, granularity=granularity, + limit=limit, include_outside_points=include_outside_points, + ignore_unknown_ids=ignore_unknown_ids, + ) + fetcher = dps_fetch_selector(self, user_queries=[query]) + dps_list = fetcher.fetch_all_datapoints(use_numpy=True) + if not query.is_single_identifier: + return dps_list + elif not dps_list and ignore_unknown_ids: + return None + return dps_list[0] + + def retrieve_dataframe( + self, + *, + id: Optional[DatapointsIdTypes] = None, + external_id: Optional[DatapointsExternalIdTypes] = None, + start: Union[int, str, datetime, None] = None, + end: Union[int, str, datetime, None] = None, + aggregates: Optional[List[str]] = None, + granularity: Optional[str] = None, + limit: Optional[int] = None, + include_outside_points: bool = False, + ignore_unknown_ids: bool = False, + uniform_index: bool = False, + include_aggregate_name: bool = True, + column_names: Literal["id", "external_id"] = "external_id", + ) -> pd.DataFrame: + """Get datapoints directly in a pandas dataframe (convenience method wrapping the `retrieve_arrays` method). 
+
+        **Note**: If you have duplicated time series in your query, the dataframe columns will also contain duplicates.
+
+        Args:
+            start (Union[int, str, datetime]): Inclusive start. Default: 1970-01-01 UTC.
+            end (Union[int, str, datetime]): Exclusive end. Default: "now"
+            id (DatapointsIdTypes): Id, dict (with id) or (mixed) list of these. See examples below.
+            external_id (DatapointsExternalIdTypes): External id, dict (with external id) or (mixed) list of these. See examples below.
+            aggregates (List[str]): List of aggregate functions to apply. Default: No aggregates (raw datapoints)
+            granularity (str): The granularity to fetch aggregates at. e.g. '1s', '2h', '10d'. Default: None.
+            limit (int): Maximum number of datapoints to return for each time series. Default: None (no limit)
+            include_outside_points (bool): Whether or not to include outside points. Not allowed when fetching aggregates. Default: False
+            ignore_unknown_ids (bool): Whether or not to ignore missing time series rather than raising an exception. Default: False
+            uniform_index (bool): If only querying aggregates AND a single granularity is used AND no limit is used, specifying `uniform_index=True` will return a dataframe with an
+                equidistant datetime index from the earliest `start` to the latest `end` (missing values will be NaNs). If these requirements are not met, a ValueError is raised. Default: False
+            include_aggregate_name (bool): Include 'aggregate' in the column name, e.g. `my-ts|average`. Ignored for raw time series. Default: True
+            column_names ("id" | "external_id"): Use either ids or external ids as column names. Time series missing external id will use id as backup. Default: "external_id"
+
+        Returns:
+            pandas.DataFrame: A pandas DataFrame containing the requested time series. The ordering of columns is ids first, then external_ids.
+                For time series with multiple aggregates, they will be sorted in alphabetical order ("average" before "max").
+
+        Examples:
+
+            Get a pandas dataframe using a single id, and use this id as column name, with no more than 100 datapoints::
+
+                >>> from cognite.client import CogniteClient
+                >>> client = CogniteClient()
+                >>> df = client.time_series.data.retrieve_dataframe(
+                ...     id=12345,
+                ...     start="2w-ago",
+                ...     end="now",
+                ...     limit=100,
+                ...     column_names="id")
+
+            Get the pandas dataframe with a uniform index (fixed spacing between points) of 1 day, for two time series with
+            individually specified aggregates, from 1990 through 2020::
+
+                >>> from datetime import datetime, timezone
+                >>> df = client.time_series.data.retrieve_dataframe(
+                ...     external_id=[
+                ...         {"external_id": "foo", "aggregates": ["discrete_variance"]},
+                ...         {"external_id": "bar", "aggregates": ["total_variation", "continuous_variance"]},
+                ...     ],
+                ...     granularity="1d",
+                ...     start=datetime(1990, 1, 1, tzinfo=timezone.utc),
+                ...     end=datetime(2020, 12, 31, tzinfo=timezone.utc),
+                ...     uniform_index=True)
+
+            Get a pandas dataframe containing the 'average' aggregate for two time series using a 30 day granularity,
+            starting Jan 1, 1970 all the way up to present, without having the aggregate name in the column names::
+
+                >>> df = client.time_series.data.retrieve_dataframe(
+                ...     external_id=["foo", "bar"],
+                ...     aggregates=["average"],
+                ...     granularity="30d",
+                ...
include_aggregate_name=False) + """ + _, pd = local_import("numpy", "pandas") # Verify that deps are available or raise CogniteImportError + if column_names not in {"id", "external_id"}: + raise ValueError(f"Given parameter {column_names=} must be one of 'id' or 'external_id'") + + query = DatapointsQuery( + start=start, + end=end, + id=id, + external_id=external_id, + aggregates=aggregates, + granularity=granularity, limit=limit, + include_outside_points=include_outside_points, ignore_unknown_ids=ignore_unknown_ids, ) - dps_list = fetcher.fetch(query) - if is_single_id: - if len(dps_list) == 0 and ignore_unknown_ids is True: - return None - return dps_list[0] - return dps_list + fetcher = dps_fetch_selector(self, user_queries=[query]) + if uniform_index: + grans_given = set(q.granularity for q in fetcher.all_queries) + is_limited = any(q.limit is not None for q in fetcher.all_queries) + if fetcher.raw_queries or len(grans_given) > 1 or is_limited: + raise ValueError( + "Cannot return a uniform index when asking for aggregates with multiple granularities " + f"({grans_given}) OR when (partly) querying raw datapoints OR when a finite limit is used." + ) + df = fetcher.fetch_all_datapoints(use_numpy=True).to_pandas(column_names, include_aggregate_name) + if not uniform_index: + return df + + start = pd.Timestamp(min(q.start for q in fetcher.agg_queries), unit="ms") + end = pd.Timestamp(max(q.end for q in fetcher.agg_queries), unit="ms") + (granularity,) = grans_given + # Pandas understand "Cognite granularities" except `m` (minutes) which we must translate: + return df.reindex(pd.date_range(start=start, end=end, freq=granularity.replace("m", "T"), inclusive="left")) + + @overload + def query( + self, + query: Union[Sequence[DatapointsQuery], DatapointsQuery], + use_numpy: Literal[False], + ) -> DatapointsList: + ... + + @overload + def query( + self, + query: Union[Sequence[DatapointsQuery], DatapointsQuery], + use_numpy: Literal[True], + ) -> DatapointsArrayList: + ... + + def query( + self, + query: Union[Sequence[DatapointsQuery], DatapointsQuery], + use_numpy: bool = False, + ) -> Union[DatapointsList, DatapointsArrayList]: + """Get datapoints for one or more time series by passing query objects directly. + + **Note**: Before version 5.0.0, this method was the only way to retrieve datapoints easily with individual fetch settings. + This is no longer the case: `query` only differs from `retrieve` in that you can specify different values for `ignore_unknown_ids` for the multiple + query objects you pass, which is quite a niche feature. Since this is a boolean parameter, the only real use case is to pass exactly + two queries to this method; the "can be" missing and the "can't be" missing groups. If you do not need this functionality, + stick with the `retrieve` and `retrieve_arrays` endpoint. + + Args: + query (Union[DatapointsQuery, Sequence[DatapointsQuery]): The queries for datapoints + use_numpy (bool): Override fetching method to take advantage of `numpy`. If True, returns `DatapointsArrayList` instead of `DatapointsList`. Default: False. + + Returns: + Union[DatapointsList, DatapointsArrayList]: The requested datapoints. Note that you always get a single list of datapoints objects returned with type dictated + by the `use_numpy` argument. The order is the ids of the first query, then the external ids of the first query, then so on for the next queries. 
+
+        Examples:
+
+            This method is useful if one group of one or more time series can be missing AND another can't be missing::
+
+                >>> from cognite.client import CogniteClient
+                >>> from cognite.client.data_classes import DatapointsQuery
+                >>> c = CogniteClient()
+                >>> query1 = DatapointsQuery(id=[111, 222], start="2d-ago", end="now", ignore_unknown_ids=False)
+                >>> query2 = DatapointsQuery(external_id="foo", start=2900, end="now", ignore_unknown_ids=True)
+                >>> res_lst = c.time_series.data.query([query1, query2])
+
+            To return datapoints stored in `numpy` arrays, pass the `use_numpy` argument:
+
+                >>> res_arrays = c.time_series.data.query([query1, query2], use_numpy=True)
+        """
+        if isinstance(query, DatapointsQuery):
+            query = [query]
+        fetcher = dps_fetch_selector(self, user_queries=query)
+        return fetcher.fetch_all_datapoints(use_numpy=use_numpy)

    def retrieve_latest(
        self,
@@ -123,9 +975,9 @@ def retrieve_latest(
        """`Get the latest datapoint for one or more time series `_

        Args:
-            id (Union[int, List[int]]: Id or list of ids.
+            id (Union[int, List[int]]): Id or list of ids.
            external_id (Union[str, List[str]): External id or list of external ids.
-            before: Union[int, str, datetime]: Get latest datapoint before this time.
+            before (Union[int, str, datetime]): Get latest datapoint before this time.
            ignore_unknown_ids (bool): Ignore IDs and external IDs that are not found rather than throw an exception.

        Returns:
@@ -138,40 +990,38 @@ def retrieve_latest(
                >>> from cognite.client import CogniteClient
                >>> c = CogniteClient()
-                >>> res = c.datapoints.retrieve_latest(id=1)[0]
+                >>> res = c.time_series.data.retrieve_latest(id=1)[0]

            You can also get the first datapoint before a specific time::

                >>> from cognite.client import CogniteClient
                >>> c = CogniteClient()
-                >>> res = c.datapoints.retrieve_latest(id=1, before="2d-ago")[0]
+                >>> res = c.time_series.data.retrieve_latest(id=1, before="2d-ago")[0]

            If you need the latest datapoint for multiple time series, simply give a list of ids.
Note that we are using external ids here, but either will work:: >>> from cognite.client import CogniteClient >>> c = CogniteClient() - >>> res = c.datapoints.retrieve_latest(external_id=["abc", "def"]) + >>> res = c.time_series.data.retrieve_latest(external_id=["abc", "def"]) >>> latest_abc = res[0][0] >>> latest_def = res[1][0] """ - before = cognite.client.utils._time.timestamp_to_ms(before) if before else None + before = timestamp_to_ms(before) if before else None id_seq = IdentifierSequence.load(id, external_id) all_ids = id_seq.as_dicts() if before: for id_ in all_ids: - id_.update({"before": before}) + id_["before"] = before tasks = [ { "url_path": self._RESOURCE_PATH + "/latest", "json": {"items": chunk, "ignoreUnknownIds": ignore_unknown_ids}, } - for chunk in utils._auxiliary.split_into_chunks(all_ids, self._RETRIEVE_LATEST_LIMIT) + for chunk in split_into_chunks(all_ids, RETRIEVE_LATEST_LIMIT) ] - tasks_summary = utils._concurrency.execute_tasks_concurrently( - self._post, tasks, max_workers=self._config.max_workers - ) + tasks_summary = execute_tasks_concurrently(self._post, tasks, max_workers=self._config.max_workers) if tasks_summary.exceptions: raise tasks_summary.exceptions[0] res = tasks_summary.joined_results(lambda res: res.json()["items"]) @@ -179,44 +1029,11 @@ def retrieve_latest( return Datapoints._load(res[0], cognite_client=self._cognite_client) return DatapointsList._load(res, cognite_client=self._cognite_client) - def query( - self, query: Union[DatapointsQuery, List[DatapointsQuery]] - ) -> Union[DatapointsList, List[DatapointsList]]: - """Get datapoints for one or more time series - - This method is different from get() in that you can specify different start times, end times, and granularities - for each requested time series. - - Args: - query (Union[DatapointsQuery, List[DatapointsQuery]): List of datapoint queries. - - Returns: - Union[DatapointsList, List[DatapointsList]]: The requested DatapointsList(s). - - Examples: - - This method is useful if you want to get multiple time series, but you want to specify different starts, - ends, or granularities for each. e.g.:: - - >>> from cognite.client import CogniteClient - >>> from cognite.client.data_classes import DatapointsQuery - >>> c = CogniteClient() - >>> queries = [DatapointsQuery(id=1, start="2d-ago", end="now"), - ... DatapointsQuery(external_id="abc", - ... start="10d-ago", - ... end="now", - ... aggregates=["average"], - ... granularity="1m")] - >>> res = c.datapoints.query(queries) - """ - fetcher = DatapointsFetcher(self) - if isinstance(query, DatapointsQuery): - return fetcher.fetch(query) - return fetcher.fetch_multiple(query) - def insert( self, datapoints: Union[ + Datapoints, + DatapointsArray, List[Dict[Union[int, float, datetime], Union[int, float, str]]], List[Tuple[Union[int, float, datetime], Union[int, float, str]]], ], @@ -229,7 +1046,7 @@ def insert( Args: datapoints(Union[List[Dict], List[Tuple],Datapoints]): The datapoints you wish to insert. Can either be a list of tuples, - a list of dictionaries, or a Datapoints object. See examples below. + a list of dictionaries, a Datapoints object or a DatapointsArray object. See examples below. id (int): Id of time series to insert datapoints into. external_id (str): External id of time series to insert datapoint into. 
@@ -243,38 +1060,43 @@ def insert( >>> from cognite.client import CogniteClient + >>> from datetime import datetime, timezone >>> c = CogniteClient() - >>> # with datetime objects - >>> datapoints = [(datetime(2018,1,1), 1000), (datetime(2018,1,2), 2000)] - >>> c.datapoints.insert(datapoints, id=1) - >>> # with ms since epoch + >>> # With datetime objects: + >>> datapoints = [ + ... (datetime(2018,1,1, tzinfo=timezone.utc), 1000), + ... (datetime(2018,1,2, tzinfo=timezone.utc), 2000), + ... ] + >>> c.time_series.data.insert(datapoints, id=1) + >>> # With ms since epoch: >>> datapoints = [(150000000000, 1000), (160000000000, 2000)] - >>> c.datapoints.insert(datapoints, id=2) + >>> c.time_series.data.insert(datapoints, id=2) Or they can be a list of dictionaries:: - >>> from cognite.client import CogniteClient - >>> c = CogniteClient() - >>> # with datetime objects - >>> datapoints = [{"timestamp": datetime(2018,1,1), "value": 1000}, - ... {"timestamp": datetime(2018,1,2), "value": 2000}] - >>> c.datapoints.insert(datapoints, external_id="abc") - >>> # with ms since epoch - >>> datapoints = [{"timestamp": 150000000000, "value": 1000}, - ... {"timestamp": 160000000000, "value": 2000}] - >>> c.datapoints.insert(datapoints, external_id="def") + >>> datapoints = [ + ... {"timestamp": 150000000000, "value": 1000}, + ... {"timestamp": 160000000000, "value": 2000}, + ... ] + >>> c.time_series.data.insert(datapoints, external_id="def") - Or they can be a Datapoints object:: + Or they can be a Datapoints or DatapointsArray object (raw datapoints only):: - >>> from cognite.client import CogniteClient - >>> c = CogniteClient() - >>> data = c.datapoints.retrieve(external_id="abc",start=datetime(2018,1,1),end=datetime(2018,2,2)) - >>> c.datapoints.insert(data, external_id="def") + >>> data = c.time_series.data.retrieve(external_id="abc", start="1w-ago", end="now") + >>> c.time_series.data.insert(data, external_id="def") """ post_dps_object = Identifier.of_either(id, external_id).as_dict() - if isinstance(datapoints, Datapoints): - datapoints = [(t, v) for t, v in zip(datapoints.timestamp, datapoints.value)] - post_dps_object.update({"datapoints": datapoints}) + if isinstance(datapoints, (Datapoints, DatapointsArray)): + if datapoints.value is None: + raise ValueError( + "When inserting data using a `Datapoints` or `DatapointsArray` object, only raw datapoints are supported" + ) + if isinstance(datapoints, Datapoints): + datapoints = list(zip(datapoints.timestamp, datapoints.value)) # type: ignore [arg-type] + else: + ts = datapoints.timestamp.astype("datetime64[ms]").astype("int64") + datapoints = list(zip(ts, datapoints.value)) # type: ignore [arg-type] + post_dps_object["datapoints"] = datapoints dps_poster = DatapointsPoster(self) dps_poster.insert([post_dps_object]) @@ -294,30 +1116,19 @@ def insert_multiple(self, datapoints: List[Dict[str, Union[str, int, List]]]) -> the value:: >>> from cognite.client import CogniteClient + >>> from datetime import datetime, timezone >>> c = CogniteClient() >>> datapoints = [] - >>> # with datetime objects and id - >>> datapoints.append({"id": 1, "datapoints": [(datetime(2018,1,1), 1000), (datetime(2018,1,2), 2000)]}) + >>> # With datetime objects and id + >>> datapoints.append( + ... {"id": 1, "datapoints": [ + ... (datetime(2018,1,1,tzinfo=timezone.utc), 1000), + ... (datetime(2018,1,2,tzinfo=timezone.utc), 2000) + ... 
]}) >>> # with ms since epoch and externalId >>> datapoints.append({"externalId": 1, "datapoints": [(150000000000, 1000), (160000000000, 2000)]}) - - >>> c.datapoints.insert_multiple(datapoints) - - Or they can be a list of dictionaries:: - - >>> from cognite.client import CogniteClient - >>> c = CogniteClient() - - >>> datapoints = [] - >>> # with datetime objects and external id - >>> datapoints.append({"externalId": "1", "datapoints": [{"timestamp": datetime(2018,1,1), "value": 1000}, - ... {"timestamp": datetime(2018,1,2), "value": 2000}]}) - >>> # with ms since epoch and id - >>> datapoints.append({"id": 1, "datapoints": [{"timestamp": 150000000000, "value": 1000}, - ... {"timestamp": 160000000000, "value": 2000}]}) - - >>> c.datapoints.insert_multiple(datapoints) + >>> c.time_series.data.insert_multiple(datapoints) """ dps_poster = DatapointsPoster(self) dps_poster.insert(datapoints) @@ -342,14 +1153,14 @@ def delete_range( >>> from cognite.client import CogniteClient >>> c = CogniteClient() - >>> c.datapoints.delete_range(start="1w-ago", end="now", id=1) + >>> c.time_series.data.delete_range(start="1w-ago", end="now", id=1) """ - start = utils._time.timestamp_to_ms(start) - end = utils._time.timestamp_to_ms(end) + start = timestamp_to_ms(start) + end = timestamp_to_ms(end) assert end > start, "end must be larger than start" - delete_dps_object = Identifier.of_either(id, external_id).as_dict() - delete_dps_object.update({"inclusiveBegin": start, "exclusiveEnd": end}) + identifier = Identifier.of_either(id, external_id).as_dict() + delete_dps_object = {**identifier, "inclusiveBegin": start, "exclusiveEnd": end} self._delete_datapoints_ranges([delete_dps_object]) def delete_ranges(self, ranges: List[Dict[str, Any]]) -> None: @@ -369,7 +1180,7 @@ def delete_ranges(self, ranges: List[Dict[str, Any]]) -> None: >>> c = CogniteClient() >>> ranges = [{"id": 1, "start": "2d-ago", "end": "now"}, ... {"externalId": "abc", "start": "2d-ago", "end": "now"}] - >>> c.datapoints.delete_ranges(ranges) + >>> c.time_series.data.delete_ranges(ranges) """ valid_ranges = [] for range in ranges: @@ -381,8 +1192,8 @@ def delete_ranges(self, ranges: List[Dict[str, Any]]) -> None: id = range.get("id") external_id = range.get("externalId") valid_range = Identifier.of_either(id, external_id).as_dict() - start = utils._time.timestamp_to_ms(range["start"]) - end = utils._time.timestamp_to_ms(range["end"]) + start = timestamp_to_ms(range["start"]) + end = timestamp_to_ms(range["end"]) valid_range.update({"inclusiveBegin": start, "exclusiveEnd": end}) valid_ranges.append(valid_range) self._delete_datapoints_ranges(valid_ranges) @@ -390,234 +1201,7 @@ def delete_ranges(self, ranges: List[Dict[str, Any]]) -> None: def _delete_datapoints_ranges(self, delete_range_objects: List[Union[Dict]]) -> None: self._post(url_path=self._RESOURCE_PATH + "/delete", json={"items": delete_range_objects}) - def retrieve_dataframe( - self, - start: Union[int, str, datetime], - end: Union[int, str, datetime], - aggregates: List[str], - granularity: str, - id: DatapointsIdMaybeAggregate = None, - external_id: DatapointsExternalIdMaybeAggregate = None, - limit: int = None, - include_aggregate_name: bool = True, - complete: str = None, - ignore_unknown_ids: bool = False, - ) -> "pandas.DataFrame": - """Get a pandas dataframe describing the requested data. - - Note that you cannot specify the same ids/external_ids multiple times. - - Args: - start (Union[int, str, datetime]): Inclusive start. 
- end (Union[int, str, datetime]): Exclusive end. - aggregates (List[str]): List of aggregate functions to apply. - granularity (str): The granularity to fetch aggregates at. e.g. '1s', '2h', '10d'. - id (Union[int, List[int], Dict[str, Any], List[Dict[str, Any]]]): Id or list of ids. Can also be object - specifying aggregates. See example below. - external_id (Union[str, List[str], Dict[str, Any], List[Dict[str, Any]]]): External id or list of external - ids. Can also be object specifying aggregates. See example below. - limit (int): Maximum number of datapoints to return for each time series. - include_aggregate_name (bool): Include 'aggregate' in the column name. Defaults to True and should only be set to False when only a single aggregate is requested per id/external id. - complete (str): Post-processing of the dataframe. - ignore_unknown_ids (bool): Ignore IDs and external IDs that are not found rather than throw an exception. - - Pass 'fill' to insert missing entries into the index, and complete data where possible (supports interpolation, stepInterpolation, count, sum, totalVariation). - - Pass 'fill,dropna' to additionally drop rows in which any aggregate for any time series has missing values (typically rows at the start and end for interpolation aggregates). - This option guarantees that all returned dataframes have the exact same shape and no missing values anywhere, and is only supported for aggregates sum, count, totalVariation, interpolation and stepInterpolation. - - Returns: - pandas.DataFrame: The requested dataframe - - Examples: - - Get a pandas dataframe:: - - >>> from cognite.client import CogniteClient - >>> c = CogniteClient() - >>> df = c.datapoints.retrieve_dataframe(id=[1,2,3], start="2w-ago", end="now", - ... aggregates=["average","sum"], granularity="1h") - - Get a pandas dataframe with the index regularly spaced at 1 minute intervals, missing values completed and without the aggregate name in the columns:: - - >>> from cognite.client import CogniteClient - >>> c = CogniteClient() - >>> df = c.datapoints.retrieve_dataframe(id=[1,2,3], start="2w-ago", end="now", - ... 
aggregates=["interpolation"], granularity="1m", include_aggregate_name=False, complete="fill,dropna") - """ - pd = cast(Any, utils._auxiliary.local_import("pandas")) - - if id is not None: - id_dpl = self.retrieve( - id=id, - start=start, - end=end, - aggregates=aggregates, - granularity=granularity, - limit=limit, - ignore_unknown_ids=ignore_unknown_ids, - ) - if id_dpl is None: - id_dpl = DatapointsList([]) - id_df = id_dpl.to_pandas(column_names="id") - else: - id_df = pd.DataFrame() - id_dpl = DatapointsList([]) - - if external_id is not None: - external_id_dpl = self.retrieve( - external_id=external_id, - start=start, - end=end, - aggregates=aggregates, - granularity=granularity, - limit=limit, - ignore_unknown_ids=ignore_unknown_ids, - ) - if external_id_dpl is None: - external_id_dpl = DatapointsList([]) - external_id_df = external_id_dpl.to_pandas() - else: - external_id_df = pd.DataFrame() - external_id_dpl = DatapointsList([]) - - df = pd.concat([id_df, external_id_df], axis="columns") - - complete_list = [s.strip() for s in (complete or "").split(",")] - if set(complete_list) - {"fill", "dropna", ""}: - raise ValueError("complete should be 'fill', 'fill,dropna' or Falsy") - - if "fill" in complete_list and df.shape[0] > 1: - ag_used_by_id = { - dp.id: [attr for attr, _ in dp._get_non_empty_data_fields(get_empty_lists=True)] - for dpl in [id_dpl, external_id_dpl] - for dp in (dpl.data if isinstance(dpl, DatapointsList) else [dpl]) - } - is_step_dict = { - str(field): bool(dp.is_step) - for dpl in [id_dpl, external_id_dpl] - for dp in (dpl.data if isinstance(dpl, DatapointsList) else [dpl]) - for field in [dp.id, dp.external_id] - if field - } - df = self._dataframe_fill(df, granularity, is_step_dict) - - if "dropna" in complete_list: - self._dataframe_safe_dropna(df, set([ag for id, ags in ag_used_by_id.items() for ag in ags])) - - if not include_aggregate_name: - Datapoints._strip_aggregate_names(df) - - return df - - def _dataframe_fill( - self, df: "pandas.DataFrame", granularity: str, is_step_dict: Dict[str, bool] - ) -> "pandas.DataFrame": - np, pd = utils._auxiliary.local_import("numpy", "pandas") - df = df.reindex( - np.arange( - df.index[0], - df.index[-1] + pd.Timedelta(microseconds=1), - pd.Timedelta(microseconds=cognite.client.utils._time.granularity_to_ms(granularity) * 1000), - ), - copy=False, - ) - df.fillna({c: 0 for c in df.columns if regexp.search(c, r"\|(sum|totalVariation|count)$")}, inplace=True) - int_cols = [c for c in df.columns if regexp.search(c, r"\|interpolation$")] - - def _linear_interpolation_col(col: str) -> str: - match = regexp.match(r"(.*)\|\w+$", col) - assert match - return match.group(1) - - lin_int_cols = [c for c in int_cols if not is_step_dict[_linear_interpolation_col(c)]] - step_int_cols = [c for c in df.columns if regexp.search(c, r"\|stepInterpolation$")] + list( - set(int_cols) - set(lin_int_cols) - ) - if lin_int_cols: - df[lin_int_cols] = df[lin_int_cols].interpolate(limit_area="inside") - df[step_int_cols] = df[step_int_cols].ffill() - return df - - def _dataframe_safe_dropna(self, df: "pandas.DataFrame", aggregates_used: Set[str]) -> None: - supported_aggregates = ["sum", "count", "total_variation", "interpolation", "step_interpolation"] - not_supported = set(aggregates_used) - set(supported_aggregates + ["timestamp"]) - if not_supported: - raise ValueError( - "The aggregate(s) {} is not supported for dataframe completion with dropna, only {} are".format( - [utils._auxiliary.to_camel_case(a) for a in not_supported], - 
[utils._auxiliary.to_camel_case(a) for a in supported_aggregates], - ) - ) - df.dropna(inplace=True) - - def retrieve_dataframe_dict( - self, - start: Union[int, str, datetime], - end: Union[int, str, datetime], - aggregates: List[str], - granularity: str, - id: DatapointsIdMaybeAggregate = None, - external_id: DatapointsExternalIdMaybeAggregate = None, - limit: int = None, - ignore_unknown_ids: bool = False, - complete: str = None, - ) -> Dict[str, "pandas.DataFrame"]: # noqa: F821 - """Get a dictionary of aggregate: pandas dataframe describing the requested data. - - Args: - start (Union[int, str, datetime]): Inclusive start. - end (Union[int, str, datetime]): Exclusive end. - aggregates (List[str]): List of aggregate functions to apply. - granularity (str): The granularity to fetch aggregates at. e.g. '1s', '2h', '10d'. - id (Union[int, List[int], Dict[str, Any], List[Dict[str, Any]]]: Id or list of ids. Can also be object specifying aggregates. - external_id (Union[str, List[str], Dict[str, Any], List[Dict[str, Any]]]): External id or list of external ids. Can also be object specifying aggregates. - limit (int): Maximum number of datapoints to return for each time series. - ignore_unknown_ids (bool): Ignore IDs and external IDs that are not found rather than throw an exception. - complete (str): Post-processing of the dataframe. - - Pass 'fill' to insert missing entries into the index, and complete data where possible (supports interpolation, stepInterpolation, count, sum, totalVariation). - - Pass 'fill,dropna' to additionally drop rows in which any aggregate for any time series has missing values (typically rows at the start and end for interpolation aggregates). - This option guarantees that all returned dataframes have the exact same shape and no missing values anywhere, and is only supported for aggregates sum, count, totalVariation, interpolation and stepInterpolation. - - Returns: - Dict[str,pandas.DataFrame]: A dictionary of aggregate: dataframe. - - Examples: - - Get a dictionary of pandas dataframes, with the index evenly spaced at 1h intervals, missing values completed in the middle and incomplete entries dropped at the start and end:: - - >>> from cognite.client import CogniteClient - >>> c = CogniteClient() - >>> dfs = c.datapoints.retrieve_dataframe_dict(id=[1,2,3], start="2w-ago", end="now", - ... aggregates=["interpolation","count"], granularity="1h", complete="fill,dropna") - """ - all_aggregates = aggregates - for queries in [id, external_id]: - if isinstance(queries, list) and queries and isinstance(queries[0], dict): - for it in queries: - for ag in cast(dict, it).get("aggregates", []): - if ag not in all_aggregates: - all_aggregates.append(ag) - - df = self.retrieve_dataframe( - start, - end, - aggregates, - granularity, - id, - external_id, - limit, - include_aggregate_name=True, - complete=complete, - ignore_unknown_ids=ignore_unknown_ids, - ) - return {ag: df.filter(like="|" + ag).rename(columns=lambda s: s[: -len(ag) - 1]) for ag in all_aggregates} - - def insert_dataframe( - self, dataframe: "pandas.DataFrame", external_id_headers: bool = False, dropna: bool = False - ) -> None: + def insert_dataframe(self, df: pd.DataFrame, external_id_headers: bool = True, dropna: bool = True) -> None: """Insert a dataframe. The index of the dataframe must contain the timestamps. The names of the remaining columns specify the ids or external ids of @@ -626,10 +1210,9 @@ def insert_dataframe( Said time series must already exist. 
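+        The dataframe index is assumed to be datetime-like (e.g. a `pandas.DatetimeIndex`); it is converted
+        to milliseconds since epoch before the datapoints are inserted.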
Args: - dataframe (pandas.DataFrame): Pandas DataFrame Object containing the time series. - external_id_headers (bool): Set to True if the column headers are external ids rather than internal ids. - Defaults to False. - dropna (bool): Set to True to skip NaNs in the given DataFrame, applied per column. + df (pandas.DataFrame): Pandas DataFrame object containing the time series. + external_id_headers (bool): Interpret the column names as external id. Pass False if using ids. Default: True. + dropna (bool): Set to True to ignore NaNs in the given DataFrame, applied per column. Default: True. Returns: None @@ -640,27 +1223,25 @@ def insert_dataframe( >>> import numpy as np >>> import pandas as pd >>> from cognite.client import CogniteClient - >>> from datetime import datetime, timedelta >>> >>> c = CogniteClient() - >>> ts_id = 123 - >>> start = datetime(2018, 1, 1) - >>> x = pd.DatetimeIndex([start + timedelta(days=d) for d in range(100)]) - >>> y = np.random.normal(0, 1, 100) - >>> df = pd.DataFrame({ts_id: y}, index=x) - >>> c.datapoints.insert_dataframe(df) + >>> ts_xid = "my-foo-ts" + >>> idx = pd.date_range(start="2018-01-01", periods=100, freq="1d") + >>> noise = np.random.normal(0, 1, 100) + >>> df = pd.DataFrame({ts_xid: noise}, index=idx) + >>> c.time_series.data.insert_dataframe(df) """ - np = cast(Any, utils._auxiliary.local_import("numpy")) - assert not np.isinf(dataframe.select_dtypes(include=[np.number])).any( - axis=None - ), "Dataframe contains Infinity. Remove them in order to insert the data." + np = cast(Any, local_import("numpy")) + if np.isinf(df.select_dtypes(include=[np.number])).any(axis=None): + raise ValueError("Dataframe contains one or more (+/-) Infinity. Remove them in order to insert the data.") if not dropna: - assert not dataframe.isnull().any( - axis=None - ), "Dataframe contains NaNs. Remove them or pass `dropna=True` in order to insert the data." + if df.isnull().any(axis=None): + raise ValueError( + "Dataframe contains one or more NaNs. Remove or pass `dropna=True` in order to insert the data." 
+ ) dps = [] - idx = dataframe.index.values.astype("datetime64[ms]").astype(np.int64) - for column_id, col in dataframe.iteritems(): + idx = df.index.to_numpy("datetime64[ms]").astype(np.int64) + for column_id, col in df.iteritems(): mask = col.notna() datapoints = list(zip(idx[mask], col[mask])) if not datapoints: @@ -720,32 +1301,32 @@ def _validate_and_format_datapoints( List[Tuple[Union[int, float, datetime], Union[int, float, str]]], ], ) -> List[Tuple[int, Any]]: - utils._auxiliary.assert_type(datapoints, "datapoints", [list]) + assert_type(datapoints, "datapoints", [list]) assert len(datapoints) > 0, "No datapoints provided" - utils._auxiliary.assert_type(datapoints[0], "datapoints element", [tuple, dict]) + assert_type(datapoints[0], "datapoints element", [tuple, dict]) valid_datapoints = [] if isinstance(datapoints[0], tuple): - valid_datapoints = [(cognite.client.utils._time.timestamp_to_ms(t), v) for t, v in datapoints] + valid_datapoints = [(timestamp_to_ms(t), v) for t, v in datapoints] elif isinstance(datapoints[0], dict): for dp in datapoints: dp = cast(Dict[str, Any], dp) assert "timestamp" in dp, "A datapoint is missing the 'timestamp' key" assert "value" in dp, "A datapoint is missing the 'value' key" - valid_datapoints.append((cognite.client.utils._time.timestamp_to_ms(dp["timestamp"]), dp["value"])) + valid_datapoints.append((timestamp_to_ms(dp["timestamp"]), dp["value"])) return valid_datapoints def _bin_datapoints(self, dps_object_list: List[Dict[str, Any]]) -> List[List[Dict[str, Any]]]: for dps_object in dps_object_list: - for i in range(0, len(dps_object["datapoints"]), self.client._DPS_LIMIT): + for i in range(0, len(dps_object["datapoints"]), DPS_LIMIT): dps_object_chunk = {k: dps_object[k] for k in ["id", "externalId"] if k in dps_object} - dps_object_chunk["datapoints"] = dps_object["datapoints"][i : i + self.client._DPS_LIMIT] + dps_object_chunk["datapoints"] = dps_object["datapoints"][i : i + DPS_LIMIT] for bin in self.bins: if bin.will_fit(len(dps_object_chunk["datapoints"])): bin.add(dps_object_chunk) break else: - bin = DatapointsBin(self.client._DPS_LIMIT, self.client._POST_DPS_OBJECTS_LIMIT) + bin = DatapointsBin(DPS_LIMIT, POST_DPS_OBJECTS_LIMIT) bin.add(dps_object_chunk) self.bins.append(bin) binned_dps_object_list = [] @@ -757,7 +1338,7 @@ def _insert_datapoints_concurrently(self, dps_object_lists: List[List[Dict[str, tasks = [] for dps_object_list in dps_object_lists: tasks.append((dps_object_list,)) - summary = utils._concurrency.execute_tasks_concurrently( + summary = execute_tasks_concurrently( self._insert_datapoints, tasks, max_workers=self.client._config.max_workers ) summary.raise_compound_exception_if_failed_tasks( @@ -772,341 +1353,3 @@ def _insert_datapoints(self, post_dps_objects: List[Dict[str, Any]]) -> None: self.client._post(url_path=self.client._RESOURCE_PATH, json={"items": post_dps_objects}) for it in post_dps_objects: del it["datapoints"] - - -class _DPWindow: - def __init__(self, start: int, end: int, limit: int = cast(int, float("inf"))) -> None: - self.start = start - self.end = end - self.limit = limit - - def __eq__(self, other: Any) -> bool: - return [self.start, self.end, self.limit] == [other.start, other.end, other.limit] - - -class _DPTask: - def __init__( - self, - client: DatapointsAPI, - start: Union[int, str, datetime], - end: Union[int, str, datetime], - ts_item: dict, - aggregates: Optional[List[str]], - granularity: Optional[str], - include_outside_points: Optional[bool], - limit: Optional[int], - 
ignore_unknown_ids: Optional[bool], - ): - self.start = cognite.client.utils._time.timestamp_to_ms(start) - self.end = cognite.client.utils._time.timestamp_to_ms(end) - self.aggregates = ts_item.get("aggregates") or aggregates - self.ts_item = {k: v for k, v in ts_item.items() if k in ["id", "externalId"]} - self.granularity = granularity - self.include_outside_points = include_outside_points - self.limit = cast(int, limit or float("inf")) - self.ignore_unknown_ids = ignore_unknown_ids - - self.client = client - self.request_limit = client._DPS_LIMIT_AGG if self.aggregates else client._DPS_LIMIT - self.missing = False - self.results: List[Datapoints] = [] - self.point_before = Datapoints() - self.point_after = Datapoints() - - def next_start_offset(self) -> int: - return cognite.client.utils._time.granularity_to_ms(self.granularity) if self.granularity else 1 - - def store_partial_result(self, raw_data: Dict[str, Any], start: int, end: int) -> Tuple[int, Optional[int]]: - expected_fields = self.aggregates or ["value"] - - if self.include_outside_points and raw_data["datapoints"]: - # assumes first query has full start/end range - copy_data = copy.copy(raw_data) # shallow copy - if raw_data["datapoints"][0]["timestamp"] < start: - if not self.point_before: - copy_data["datapoints"] = raw_data["datapoints"][:1] - self.point_before = Datapoints._load( - copy_data, expected_fields, cognite_client=self.client._cognite_client - ) - raw_data["datapoints"] = raw_data["datapoints"][1:] - if raw_data["datapoints"] and raw_data["datapoints"][-1]["timestamp"] >= end: - if not self.point_after: - copy_data["datapoints"] = raw_data["datapoints"][-1:] - self.point_after = Datapoints._load( - copy_data, expected_fields, cognite_client=self.client._cognite_client - ) - raw_data["datapoints"] = raw_data["datapoints"][:-1] - - self.results.append(Datapoints._load(raw_data, expected_fields, cognite_client=self.client._cognite_client)) - last_timestamp = raw_data["datapoints"] and raw_data["datapoints"][-1]["timestamp"] - return len(raw_data["datapoints"]), last_timestamp - - def mark_missing(self) -> Tuple[int, None]: # for ignore unknown ids - self.missing = True - return 0, None # as in store partial result - - def result(self) -> Datapoints: - def custom_sort_key(x: Datapoints) -> Union[int, float]: - if x.timestamp: - return x.timestamp[0] - return 0 - - dps = self.point_before - for res in sorted(self.results, key=custom_sort_key): - dps._extend(res) - dps._extend(self.point_after) - if len(dps) > self.limit: - dps = cast(Datapoints, dps[: self.limit]) - return dps - - def as_tuple(self) -> Tuple[int, int, dict, Optional[List[str]], Optional[str], Optional[bool], Optional[int]]: - return ( - self.start, - self.end, - self.ts_item, - self.aggregates, - self.granularity, - self.include_outside_points, - self.limit, - ) - - -class DatapointsFetcher: - def __init__(self, client: DatapointsAPI): - self.client = client - - def fetch(self, query: DatapointsQuery) -> DatapointsList: - return self.fetch_multiple([query])[0] - - def fetch_multiple(self, queries: List[DatapointsQuery]) -> List[DatapointsList]: - task_lists = [self._create_tasks(q) for q in queries] - self._fetch_datapoints(sum(task_lists, [])) - return self._get_dps_results(task_lists) - - def _create_tasks(self, query: DatapointsQuery) -> List[_DPTask]: - ts_items, _ = self._process_ts_identifiers(query.id, query.external_id) - tasks = [ - _DPTask( - self.client, - query.start, - query.end, - ts_item, - query.aggregates, - query.granularity, - 
-
-
-class DatapointsFetcher:
-    def __init__(self, client: DatapointsAPI):
-        self.client = client
-
-    def fetch(self, query: DatapointsQuery) -> DatapointsList:
-        return self.fetch_multiple([query])[0]
-
-    def fetch_multiple(self, queries: List[DatapointsQuery]) -> List[DatapointsList]:
-        task_lists = [self._create_tasks(q) for q in queries]
-        self._fetch_datapoints(sum(task_lists, []))
-        return self._get_dps_results(task_lists)
-
-    def _create_tasks(self, query: DatapointsQuery) -> List[_DPTask]:
-        ts_items, _ = self._process_ts_identifiers(query.id, query.external_id)
-        tasks = [
-            _DPTask(
-                self.client,
-                query.start,
-                query.end,
-                ts_item,
-                query.aggregates,
-                query.granularity,
-                query.include_outside_points,
-                query.limit,
-                query.ignore_unknown_ids,
-            )
-            for ts_item in ts_items
-        ]
-        self._validate_tasks(tasks)
-        self._preprocess_tasks(tasks)
-        return tasks
-
-    @staticmethod
-    def _validate_tasks(tasks: List[_DPTask]) -> None:
-        identifiers_seen = set()
-        for t in tasks:
-            identifier = utils._auxiliary.unwrap_identifer(t.ts_item)
-            if identifier in identifiers_seen:
-                raise ValueError("Time series identifier '{}' is duplicated in query".format(identifier))
-            identifiers_seen.add(identifier)
-            if t.aggregates is not None and t.granularity is None:
-                raise ValueError("When specifying aggregates, granularity must also be provided.")
-            if t.granularity is not None and not t.aggregates:
-                raise ValueError("When specifying granularity, aggregates must also be provided.")
-
-    def _preprocess_tasks(self, tasks: List[_DPTask]) -> None:
-        for t in tasks:
-            new_start = cognite.client.utils._time.timestamp_to_ms(t.start)
-            new_end = cognite.client.utils._time.timestamp_to_ms(t.end)
-            if t.aggregates:
-                assert t.granularity
-                new_start = self._align_with_granularity_unit(new_start, t.granularity)
-                new_end = self._align_with_granularity_unit(new_end, t.granularity)
-            t.start = new_start
-            t.end = new_end
-
-    def _get_dps_results(self, task_lists: List[List[_DPTask]]) -> List[DatapointsList]:
-        return [
-            DatapointsList([t.result() for t in tl if not t.missing], cognite_client=self.client._cognite_client)
-            for tl in task_lists
-        ]
-
-    def _fetch_datapoints(self, tasks: List[_DPTask]) -> None:
-        tasks_summary = utils._concurrency.execute_tasks_concurrently(
-            self._fetch_dps_initial_and_return_remaining_tasks,
-            [(t,) for t in tasks],
-            max_workers=self.client._config.max_workers,
-        )
-        if tasks_summary.exceptions:
-            raise tasks_summary.exceptions[0]
-
-        remaining_tasks_with_windows = tasks_summary.joined_results()
-        if len(remaining_tasks_with_windows) > 0:
-            self._fetch_datapoints_for_remaining_queries(remaining_tasks_with_windows)
-
-    def _fetch_dps_initial_and_return_remaining_tasks(self, task: _DPTask) -> List[Tuple[_DPTask, _DPWindow]]:
-        ndp_in_first_task, last_timestamp = self._get_datapoints(task, None, True)
-        if ndp_in_first_task < task.request_limit:
-            return []
-        remaining_user_limit = task.limit - ndp_in_first_task
-        assert last_timestamp
-        task.start = last_timestamp + task.next_start_offset()
-        queries = self._split_task_into_windows(cast(int, task.results[0].id), task, remaining_user_limit)
-        return queries
-
-    def _fetch_datapoints_for_remaining_queries(self, tasks_with_windows: List[Tuple[_DPTask, _DPWindow]]) -> None:
-        tasks_summary = utils._concurrency.execute_tasks_concurrently(
-            self._get_datapoints_with_paging, tasks_with_windows, max_workers=self.client._config.max_workers
-        )
-        if tasks_summary.exceptions:
-            raise tasks_summary.exceptions[0]
-
-    @staticmethod
-    def _align_with_granularity_unit(ts: int, granularity: str) -> int:
-        gms = cognite.client.utils._time.granularity_unit_to_ms(granularity)
-        if ts % gms == 0:
-            return ts
-        return ts - (ts % gms) + gms
-
-    def _split_task_into_windows(
-        self, id: int, task: _DPTask, remaining_user_limit: int
-    ) -> List[Tuple[_DPTask, _DPWindow]]:
-        windows = self._get_windows(id, task, remaining_user_limit)
-        return [(task, w) for w in windows]
-
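The removed `_preprocess_tasks` relies on `_align_with_granularity_unit` to round an aggregate query's start and end up to a whole granularity unit before any windowing happens. A condensed sketch of that rounding; the unit-to-milliseconds table below is an assumption standing in for the SDK's `granularity_unit_to_ms` helper:

    UNIT_MS = {"s": 1_000, "m": 60_000, "h": 3_600_000, "d": 86_400_000}

    def align_up(ts_ms: int, granularity: str) -> int:
        # Align to the *unit* only, e.g. "12h" aligns to 1h boundaries.
        gms = UNIT_MS[granularity[-1]]
        remainder = ts_ms % gms
        return ts_ms if remainder == 0 else ts_ms - remainder + gms

    # align_up(3_600_001, "2h") == 7_200_000: one ms past an hour boundary snaps to the next hour.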
-    def _get_windows(self, id: int, task: _DPTask, remaining_user_limit: int) -> List[_DPWindow]:
-        if remaining_user_limit <= 0:
-            return []
-        if task.start >= task.end:
-            return []
-        count_granularity = "1d"
-        if task.granularity and cognite.client.utils._time.granularity_to_ms(
-            "1d"
-        ) < cognite.client.utils._time.granularity_to_ms(task.granularity):
-            count_granularity = task.granularity
-        try:
-            count_task = _DPTask(
-                self.client, task.start, task.end, {"id": id}, ["count"], count_granularity, False, None, False
-            )
-            self._get_datapoints_with_paging(count_task, _DPWindow(task.start, task.end))
-            res = count_task.result()
-        except CogniteAPIError:
-            res = Datapoints()
-        if len(res) == 0:  # string based series or aggregates not yet calculated
-            return [_DPWindow(task.start, task.end, remaining_user_limit)]
-        assert res.count is not None
-        counts = list(zip(res.timestamp, res.count))
-        windows = []
-        total_count = 0
-        current_window_count = 0
-        window_start = task.start
-        granularity_ms = cognite.client.utils._time.granularity_to_ms(task.granularity) if task.granularity else None
-        agg_count = lambda count: int(
-            min(
-                math.ceil(cognite.client.utils._time.granularity_to_ms(count_granularity) / cast(int, granularity_ms)),
-                count,
-            )
-        )
-        for i, (ts, count) in enumerate(counts):
-            if ts < task.start:  # API rounds time stamps down, so some of the first day may have been retrieved already
-                count = 0
-
-            if i < len(counts) - 1:
-                next_timestamp = counts[i + 1][0]
-                next_raw_count = counts[i + 1][1]
-                next_count = next_raw_count if task.granularity is None else agg_count(next_raw_count)
-            else:
-                next_timestamp = task.end
-                next_count = 0
-            current_count = count if task.granularity is None else agg_count(count)
-            total_count += current_count
-            current_window_count += current_count
-            if current_window_count + next_count > task.request_limit or i == len(counts) - 1:
-                window_end = int(next_timestamp)
-                if task.granularity:
-                    window_end = self._align_window_end(task.start, int(next_timestamp), task.granularity)
-                windows.append(_DPWindow(window_start, window_end, remaining_user_limit))
-                window_start = window_end
-                current_window_count = 0
-                if total_count >= remaining_user_limit:
-                    break
-        return windows
-
-    @staticmethod
-    def _align_window_end(start: int, end: int, granularity: str) -> int:
-        gms = cognite.client.utils._time.granularity_to_ms(granularity)
-        diff = end - start
-        end -= diff % gms
-        return end
-
-    def _get_datapoints_with_paging(self, task: _DPTask, window: _DPWindow) -> None:
-        ndp_retrieved_total = 0
-        while window.end > window.start and ndp_retrieved_total < window.limit:
-            ndp_retrieved, last_time = self._get_datapoints(task, window)
-            if ndp_retrieved < min(window.limit, task.request_limit):
-                break
-            window.limit -= ndp_retrieved
-            assert last_time
-            window.start = last_time + task.next_start_offset()
-
-    def _get_datapoints(
-        self, task: _DPTask, window: _DPWindow = None, first_page: bool = False
-    ) -> Tuple[int, Optional[int]]:
-        window = window or _DPWindow(task.start, task.end, task.limit)
-        payload = {
-            "items": [task.ts_item],
-            "start": window.start,
-            "end": window.end,
-            "aggregates": task.aggregates,
-            "granularity": task.granularity,
-            "includeOutsidePoints": task.include_outside_points and first_page,
-            "ignoreUnknownIds": task.ignore_unknown_ids,
-            "limit": min(window.limit, task.request_limit),
-        }
-        res = self.client._post(self.client._RESOURCE_PATH + "/list", json=payload).json()["items"]
-        if not res and task.ignore_unknown_ids:
-            return task.mark_missing()
-        else:
-            return task.store_partial_result(res[0], window.start, window.end)
-
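The removed `_get_windows` is the heart of the old fetch strategy: it requests `count` aggregates over the remaining range and cuts the range into windows whose accumulated count should fit in one request. A simplified sketch of that splitting loop (plain integers for timestamps; the real code additionally rescales counts for aggregate queries, aligns window ends to the granularity, and stops once the user limit is covered):

    from typing import List, Tuple

    def split_into_windows(counts: List[Tuple[int, int]], start: int, end: int,
                           request_limit: int) -> List[Tuple[int, int]]:
        windows, window_start, running = [], start, 0
        for i, (_, count) in enumerate(counts):
            running += count
            # Peek at the next bucket to decide whether it would overflow this window.
            next_ts, next_count = counts[i + 1] if i + 1 < len(counts) else (end, 0)
            if running + next_count > request_limit or i == len(counts) - 1:
                windows.append((window_start, next_ts))
                window_start, running = next_ts, 0
        return windows

    # split_into_windows([(0, 60), (10, 60), (20, 60)], 0, 30, request_limit=100)
    # -> [(0, 10), (10, 20), (20, 30)]  (each window holds at most 100 datapoints)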
-    @staticmethod
-    def _process_ts_identifiers(
-        ids: Optional[DatapointsIdMaybeAggregate], external_ids: Optional[DatapointsExternalIdMaybeAggregate]
-    ) -> Tuple[List[Dict], bool]:
-        is_list = False
-        items = []
-
-        if isinstance(ids, List):
-            is_list = True
-            for id_item in ids:
-                items.append(DatapointsFetcher._process_single_ts_item(id_item, False))
-        elif ids is not None:
-            items.append(DatapointsFetcher._process_single_ts_item(ids, False))
-
-        if isinstance(external_ids, List):
-            is_list = True
-            for ext_id_item in external_ids:
-                items.append(DatapointsFetcher._process_single_ts_item(ext_id_item, True))
-        elif external_ids is not None:
-            items.append(DatapointsFetcher._process_single_ts_item(external_ids, True))
-
-        return items, not is_list and len(items) == 1
-
-    @staticmethod
-    def _process_single_ts_item(item: Union[int, str, dict], external: bool) -> Dict[str, Any]:
-        item_type = "externalId" if external else "id"
-        id_type = str if external else int
-        if isinstance(item, id_type):
-            return {item_type: item}
-        elif isinstance(item, Dict):
-            for key in item:
-                if key not in [item_type, "aggregates"]:
-                    raise ValueError("Unknown key '{}' in {} dict argument".format(key, item_type))
-            if item_type not in item:
-                raise ValueError(
-                    "When passing a dict to the {} argument, '{}' must be specified.".format(item_type, item_type)
-                )
-            return item
-        raise TypeError("Invalid type '{}' for argument '{}'".format(type(item), item_type))
diff --git a/cognite/client/utils/_concurrency.py b/cognite/client/utils/_concurrency.py
index 99de2bb193..c973bc323f 100644
--- a/cognite/client/utils/_concurrency.py
+++ b/cognite/client/utils/_concurrency.py
@@ -110,7 +110,9 @@ def collect_exc_info_and_raise(
 def execute_tasks_concurrently(
     func: Callable, tasks: Union[Sequence[Tuple], List[Dict]], max_workers: int
 ) -> TasksSummary:
-    assert max_workers > 0, "Number of workers should be >= 1, was {}".format(max_workers)
+    if max_workers < 1:
+        raise RuntimeError(f"Number of workers should be >= 1, was {max_workers}")
+
     with ThreadPoolExecutor(max_workers) as p:
         futures = []
         for task in tasks: