refactor: implemented retrieve
doctrino committed Jun 21, 2024
1 parent c5bcd98 commit 2577642
Showing 4 changed files with 40 additions and 8 deletions.
16 changes: 13 additions & 3 deletions cognite/client/_api/datapoint_tasks.py
@@ -46,6 +46,7 @@
_DatapointsPayloadItem,
)
from cognite.client.utils._auxiliary import is_unlimited
from cognite.client.utils._identifier import InstanceId
from cognite.client.utils._text import convert_all_keys_to_snake_case, to_snake_case
from cognite.client.utils._time import (
ZoneInfo,
@@ -87,6 +88,7 @@
DatapointsExternalId = Union[
str, DatapointsQuery, Dict[str, Any], SequenceNotStr[Union[str, DatapointsQuery, Dict[str, Any]]]
]
DatapointsInstanceId = Union[InstanceId, Sequence[InstanceId]]


@dataclass
@@ -100,6 +102,7 @@ class _FullDatapointsQuery:
end: int | str | datetime.datetime | None = None
id: DatapointsId | None = None
external_id: DatapointsExternalId | None = None
instance_id: InstanceId | Sequence[InstanceId] | None = None
aggregates: Aggregate | str | list[Aggregate | str] | None = None
granularity: str | None = None
timezone: str | datetime.timezone | ZoneInfo | None = None
@@ -117,9 +120,11 @@ def is_single_identifier(self) -> bool:
# No lists given and exactly one of id/xid was given:
return (
isinstance(self.id, (dict, DatapointsQuery, numbers.Integral))
and self.external_id is None
and (self.external_id is None and self.instance_id is None)
or isinstance(self.external_id, (dict, DatapointsQuery, str))
and self.id is None
and (self.id is None and self.instance_id is None)
or isinstance(self.instance_id, InstanceId)
and (self.id is None and self.external_id is None)
)

@cached_property
@@ -146,12 +151,17 @@ def parse_into_queries(self) -> list[DatapointsQuery]:
queries.extend(self._parse(id_, arg_name="id", exp_type=numbers.Integral))
if (xid := self.external_id) is not None:
queries.extend(self._parse(xid, arg_name="external_id", exp_type=str))
if (iid := self.instance_id) is not None:
queries.extend(self._parse(iid, arg_name="instance_id", exp_type=InstanceId))
if queries:
return queries
raise ValueError("Pass at least one time series `id` or `external_id`!")

def _parse(
self, id_or_xid: DatapointsId | DatapointsExternalId, arg_name: Literal["id", "external_id"], exp_type: type
self,
id_or_xid: DatapointsId | DatapointsExternalId | DatapointsInstanceId,
arg_name: Literal["id", "external_id", "instance_id"],
exp_type: type,
) -> list[DatapointsQuery]:
user_queries: SequenceNotStr[int | str | DatapointsQuery | dict[str, Any]]
if isinstance(id_or_xid, (dict, DatapointsQuery, exp_type)):
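
A minimal sketch (not part of the diff) of how the extended is_single_identifier check is expected to behave now that instance_id is a third identifier kind. Assumptions: _FullDatapointsQuery is the internal dataclass shown above, is_single_identifier is exposed as a property, and InstanceId takes (space, external_id):

    from cognite.client._api.datapoint_tasks import _FullDatapointsQuery
    from cognite.client.utils._identifier import InstanceId

    # Exactly one identifier kind given, and not a sequence -> single identifier.
    single = _FullDatapointsQuery(instance_id=InstanceId("my-space", "my-ts"))
    assert single.is_single_identifier

    # Mixing identifier kinds (or passing a sequence) -> treated as a multi-item request.
    mixed = _FullDatapointsQuery(id=123, instance_id=InstanceId("my-space", "my-ts"))
    assert not mixed.is_single_identifier
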
25 changes: 21 additions & 4 deletions cognite/client/_api/datapoints.py
@@ -90,18 +90,20 @@
_TResLst = TypeVar("_TResLst", DatapointsList, DatapointsArrayList)


def select_dps_fetch_strategy(dps_client: DatapointsAPI, full_query: _FullDatapointsQuery) -> DpsFetchStrategy:
def select_dps_fetch_strategy(
dps_client: DatapointsAPI, full_query: _FullDatapointsQuery, cdf_version: str | None = None
) -> DpsFetchStrategy:
all_queries = full_query.parse_into_queries()
full_query.validate(all_queries, dps_limit_raw=dps_client._DPS_LIMIT_RAW, dps_limit_agg=dps_client._DPS_LIMIT_AGG)
agg_queries, raw_queries = split_queries_into_raw_and_aggs(all_queries)

# Running mode is decided based on how many time series are requested VS. number of workers:
if len(all_queries) <= (max_workers := dps_client._config.max_workers):
# Start shooting requests from the hip immediately:
return EagerDpsFetcher(dps_client, all_queries, agg_queries, raw_queries, max_workers)
return EagerDpsFetcher(dps_client, all_queries, agg_queries, raw_queries, max_workers, cdf_version)
# Fetch a smaller, chunked batch of dps from all time series - which allows us to do some rudimentary
# guesstimation of dps density - then chunk away:
return ChunkingDpsFetcher(dps_client, all_queries, agg_queries, raw_queries, max_workers)
return ChunkingDpsFetcher(dps_client, all_queries, agg_queries, raw_queries, max_workers, cdf_version)


def split_queries_into_raw_and_aggs(all_queries: _TSQueryList) -> tuple[_TSQueryList, _TSQueryList]:
@@ -119,13 +121,15 @@ def __init__(
agg_queries: _TSQueryList,
raw_queries: _TSQueryList,
max_workers: int,
cdf_version: str | None = None,
) -> None:
self.dps_client = dps_client
self.all_queries = all_queries
self.agg_queries = agg_queries
self.raw_queries = raw_queries
self.max_workers = max_workers
self.n_queries = len(all_queries)
self.cdf_version = cdf_version

def fetch_all_datapoints(self) -> DatapointsList:
pool = ConcurrencySettings.get_executor(max_workers=self.max_workers)
@@ -142,13 +146,18 @@ def fetch_all_datapoints_numpy(self) -> DatapointsArrayList:
)

def _request_datapoints(self, payload: _DatapointsPayload) -> Sequence[DataPointListItem]:
headers: dict | None = None
if self.cdf_version:
headers = {"cdf-version": self.cdf_version}

(res := DataPointListResponse()).MergeFromString(
self.dps_client._do_request(
json=payload,
method="POST",
url_path=f"{self.dps_client._RESOURCE_PATH}/list",
accept="application/protobuf",
timeout=self.dps_client._config.timeout,
headers=headers,
).content
)
return res.items
@@ -532,6 +541,7 @@ def retrieve(
| DatapointsQuery
| dict[str, Any]
| SequenceNotStr[str | DatapointsQuery | dict[str, Any]] = None,
instance_id: None | InstanceId | Sequence[InstanceId] = None,
start: int | str | datetime.datetime | None = None,
end: int | str | datetime.datetime | None = None,
aggregates: Aggregate | str | list[Aggregate | str] | None = None,
@@ -563,6 +573,7 @@ def retrieve(
Args:
id (None | int | DatapointsQuery | dict[str, Any] | Sequence[int | DatapointsQuery | dict[str, Any]]): Id, dict (with id) or (mixed) sequence of these. See examples below.
external_id (None | str | DatapointsQuery | dict[str, Any] | SequenceNotStr[str | DatapointsQuery | dict[str, Any]]): External id, dict (with external id) or (mixed) sequence of these. See examples below.
instance_id (None | InstanceId | Sequence[InstanceId]): Instance id or sequence of instance ids. May be combined with `id` and `external_id`; see examples below.
start (int | str | datetime.datetime | None): Inclusive start. Default: 1970-01-01 UTC.
end (int | str | datetime.datetime | None): Exclusive end. Default: "now"
aggregates (Aggregate | str | list[Aggregate | str] | None): Single aggregate or list of aggregates to retrieve. Available options: ``average``, ``continuous_variance``, ``count``, ``count_bad``, ``count_good``,
@@ -742,6 +753,7 @@ def retrieve(
end=end,
id=id,
external_id=external_id,
instance_id=instance_id,
aggregates=aggregates,
granularity=granularity,
timezone=timezone,
@@ -754,7 +766,12 @@
ignore_bad_datapoints=ignore_bad_datapoints,
treat_uncertain_as_bad=treat_uncertain_as_bad,
)
fetcher = select_dps_fetch_strategy(self, full_query=query)
cdf_version: str | None = None
if instance_id:
cdf_version = "alpha"
self._use_instance_api()

fetcher = select_dps_fetch_strategy(self, full_query=query, cdf_version=cdf_version)

dps_lst = fetcher.fetch_all_datapoints()
if not query.is_single_identifier:
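
A hypothetical caller-side example (not part of the diff) of the new instance_id argument to retrieve(). The space and external id values are placeholders, and the alpha "cdf-version" header shown above is attached internally when instance_id is given:

    from cognite.client import CogniteClient
    from cognite.client.utils._identifier import InstanceId

    client = CogniteClient()  # assumes configuration via environment/global config
    dps = client.time_series.data.retrieve(
        instance_id=InstanceId("my-space", "my-timeseries"),
        start="30d-ago",
        end="now",
    )
    if dps is not None:  # None is only possible when ignore_unknown_ids=True
        print(dps.timestamp[:5], dps.value[:5])
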
2 changes: 1 addition & 1 deletion cognite/client/data_classes/datapoints.py
@@ -198,7 +198,7 @@ def __hash__(self) -> int:

@classmethod
# TODO: Remove in next major version (require use of DatapointsQuery directly)
def from_dict(cls, dct: dict[str, Any], id_type: Literal["id", "external_id"]) -> Self:
def from_dict(cls, dct: dict[str, Any], id_type: Literal["id", "external_id", "instance_id"]) -> Self:
if id_type not in dct:
if (arg_name_cc := to_camel_case(id_type)) not in dct:
raise KeyError(f"Missing required key `{id_type}` in dict: {dct}.")
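
A sketch (not part of the diff) of the widened from_dict helper, assumed here to live on DatapointsQuery. Passing the value as an InstanceId object is an assumption based on exp_type=InstanceId in _parse above:

    from cognite.client.data_classes import DatapointsQuery
    from cognite.client.utils._identifier import InstanceId

    query = DatapointsQuery.from_dict(
        {"instance_id": InstanceId("my-space", "my-ts")},  # camelCase "instanceId" is also accepted
        id_type="instance_id",
    )
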
5 changes: 5 additions & 0 deletions tests/tests_integration/test_api/test_datapoints.py
@@ -2736,3 +2736,8 @@ def test_dict_format_with_status_codes_using_insert_multiple(self, cognite_clien

def test_insert_datapoints_with_instance_id(self, alpha_client: CogniteClient, instance_ts_id: InstanceId) -> None:
alpha_client.time_series.data.insert([(0, 0.0), (1.0, 1.0)], instance_id=instance_ts_id)

retrieved = alpha_client.time_series.data.retrieve(instance_id=instance_ts_id)

assert retrieved.timestamp == [0, 1]
assert retrieved.value == [0.0, 1.0]
