Commit 2a825fa

Add full dps/time series support for instance_id (#1934)
haakonvt authored Sep 19, 2024
1 parent 6f3cad4 commit 2a825fa
Showing 19 changed files with 277 additions and 301 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
@@ -17,6 +17,16 @@ Changes are grouped as follows
 - `Fixed` for any bug fixes.
 - `Security` in case of vulnerabilities.

+## [7.61.0] - 2024-09-18
+### Changed
+- TimeSeriesAPI and DatapointsAPI support for `instance_id` reaches general availability (GA).
+### Added
+- `instance_id` can now be used freely alongside `id` and `external_id`, and is now accepted by
+  `retrieve`, `retrieve_arrays` and `retrieve_dataframe`.
+- `instance_id` now works in `to_pandas` methods, falling back to `external_id` and then `id`.
+### Fixed
+- A bug that caused all datapoints objects to load an empty `instance_id`.
+
 ## [7.60.6] - 2024-09-17
 ### Fixed
 - Fixed bug in `replace` upsert mode which caused objects to not be cleared.
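As a minimal sketch of what the GA'd support enables (the `NodeId` values are hypothetical, and the client is assumed to be configured already):

```python
from cognite.client import CogniteClient
from cognite.client.data_classes.data_modeling import NodeId

client = CogniteClient()  # assumes credentials/config are already set up

# Retrieve datapoints for a time series backed by a data-modeling node:
dps = client.time_series.data.retrieve(
    instance_id=NodeId("my-space", "my-ts-xid"),  # hypothetical node
    start="30d-ago",
    end="now",
)

# instance_id mixes freely with id/external_id, e.g. in retrieve_dataframe:
df = client.time_series.data.retrieve_dataframe(
    instance_id=NodeId("my-space", "another-ts"),
    external_id="legacy-ts-xid",
    aggregates="average",
    granularity="1h",
)
```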
68 changes: 33 additions & 35 deletions cognite/client/_api/datapoint_tasks.py
@@ -2,7 +2,6 @@

 import datetime
 import math
-import numbers
 import operator as op
 import warnings
 from abc import ABC, abstractmethod
@@ -15,7 +14,6 @@
     Any,
     Callable,
     DefaultDict,
-    Dict,
     Iterator,
     List,
     Literal,
@@ -46,8 +44,7 @@
     DatapointsQuery,
     _DatapointsPayloadItem,
 )
-from cognite.client.utils._auxiliary import is_unlimited
-from cognite.client.utils._identifier import InstanceId
+from cognite.client.utils._auxiliary import exactly_one_is_not_none, is_unlimited
 from cognite.client.utils._text import convert_all_keys_to_snake_case, to_snake_case
 from cognite.client.utils._time import (
     ZoneInfo,
@@ -85,11 +82,9 @@
 DatapointsRaw = Union[NumericDatapoints, StringDatapoints]

 RawDatapointValue = Union[float, str]
-DatapointsId = Union[int, DatapointsQuery, Dict[str, Any], Sequence[Union[int, DatapointsQuery, Dict[str, Any]]]]
-DatapointsExternalId = Union[
-    str, DatapointsQuery, Dict[str, Any], SequenceNotStr[Union[str, DatapointsQuery, Dict[str, Any]]]
-]
-DatapointsInstanceId = Union[InstanceId, Sequence[InstanceId]]
+DatapointsId = Union[int, DatapointsQuery, Sequence[Union[int, DatapointsQuery]]]
+DatapointsExternalId = Union[str, DatapointsQuery, SequenceNotStr[Union[str, DatapointsQuery]]]
+DatapointsInstanceId = Union[NodeId, DatapointsQuery, Sequence[Union[NodeId, DatapointsQuery]]]


 @dataclass
@dataclass
Expand All @@ -103,7 +98,7 @@ class _FullDatapointsQuery:
end: int | str | datetime.datetime | None = None
id: DatapointsId | None = None
external_id: DatapointsExternalId | None = None
instance_id: InstanceId | Sequence[InstanceId] | None = None
instance_id: DatapointsInstanceId | None = None
aggregates: Aggregate | str | list[Aggregate | str] | None = None
granularity: str | None = None
timezone: str | datetime.timezone | ZoneInfo | None = None
@@ -118,15 +113,15 @@

     @property
     def is_single_identifier(self) -> bool:
-        # No lists given and exactly one of id/xid was given:
-        return (
-            isinstance(self.id, (dict, DatapointsQuery, numbers.Integral))
-            and (self.external_id is None and self.instance_id is None)
-            or isinstance(self.external_id, (dict, DatapointsQuery, str))
-            and (self.id is None and self.instance_id is None)
-            or isinstance(self.instance_id, InstanceId)
-            and (self.id is None and self.external_id is None)
-        )
+        # Exactly one of the identifiers given...
+        if exactly_one_is_not_none(self.external_id, self.id, self.instance_id):
+            # ...and that one is not a sequence:
+            return not (
+                isinstance(self.id, Sequence)
+                or isinstance(self.external_id, SequenceNotStr)
+                or isinstance(self.instance_id, Sequence)
+            )
+        return False

     @cached_property
     def top_level_defaults(self) -> dict[str, Any]:
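The rewrite splits the check into two readable steps: exactly one identifier kind is given, and that identifier is not a sequence. A self-contained sketch of the same logic (`exactly_one_is_not_none` reimplemented here purely for illustration):

```python
from collections.abc import Sequence

def exactly_one_is_not_none(*args: object) -> bool:
    # Stand-in for cognite.client.utils._auxiliary.exactly_one_is_not_none:
    return sum(arg is not None for arg in args) == 1

def is_single_identifier(id_, external_id, instance_id) -> bool:
    if exactly_one_is_not_none(external_id, id_, instance_id):
        # str is itself a Sequence, hence the SequenceNotStr check in the real code:
        return not (
            isinstance(id_, Sequence)
            or (isinstance(external_id, Sequence) and not isinstance(external_id, str))
            or isinstance(instance_id, Sequence)
        )
    return False

assert is_single_identifier(123, None, None) is True
assert is_single_identifier([123, 456], None, None) is False  # a list is not "single"
assert is_single_identifier(123, "both-given", None) is False  # two identifier kinds
```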
@@ -149,35 +144,35 @@ def top_level_defaults(self) -> dict[str, Any]:
     def parse_into_queries(self) -> list[DatapointsQuery]:
         queries = []
         if (id_ := self.id) is not None:
-            queries.extend(self._parse(id_, arg_name="id", exp_type=numbers.Integral))
+            queries.extend(self._parse(id_, arg_name="id", exp_type=int))
         if (xid := self.external_id) is not None:
             queries.extend(self._parse(xid, arg_name="external_id", exp_type=str))
         if (iid := self.instance_id) is not None:
-            queries.extend(self._parse(iid, arg_name="instance_id", exp_type=InstanceId))
+            queries.extend(self._parse(iid, arg_name="instance_id", exp_type=NodeId))
         if queries:
             return queries
-        raise ValueError("Pass at least one time series `id` or `external_id`!")
+        raise ValueError("Pass at least one time series `id`, `external_id` or `instance_id`!")

     def _parse(
         self,
-        id_or_xid: DatapointsId | DatapointsExternalId | DatapointsInstanceId,
+        identifier: DatapointsId | DatapointsExternalId | DatapointsInstanceId,
         arg_name: Literal["id", "external_id", "instance_id"],
         exp_type: type,
     ) -> list[DatapointsQuery]:
-        user_queries: SequenceNotStr[int | str | DatapointsQuery | dict[str, Any]]
-        if isinstance(id_or_xid, (dict, DatapointsQuery, exp_type)):
+        user_queries: SequenceNotStr[int | str | NodeId | DatapointsQuery]
+        if isinstance(identifier, (dict, DatapointsQuery, exp_type)):
             # Lazy - we postpone evaluation:
-            user_queries = [cast(Union[int, str, DatapointsQuery, Dict[str, Any]], id_or_xid)]
+            user_queries = [identifier]

-        elif isinstance(id_or_xid, SequenceNotStr):
+        elif isinstance(identifier, SequenceNotStr):
             # We use Sequence because we require an ordering of elements
-            user_queries = id_or_xid
+            user_queries = identifier
         else:
-            self._raise_on_wrong_ts_identifier_type(id_or_xid, arg_name, exp_type)
+            self._raise_on_wrong_ts_identifier_type(identifier, arg_name, exp_type)

         parsed_queries = []
         for query in user_queries:
-            # We merge 'defaults' and given user query; the query takes precedence:
+            # We merge 'defaults' and the given user query; the query takes precedence:
             if isinstance(query, exp_type):
                 id_dct = {arg_name: query}
                 query = DatapointsQuery(**self.top_level_defaults, **id_dct)  # type: ignore [misc, arg-type]
Expand All @@ -196,13 +191,13 @@ def _parse(

@staticmethod
def _raise_on_wrong_ts_identifier_type(
id_or_xid: object, # This fn is only called when gotten the wrong type
identifier: object, # This fn is only called when gotten the wrong type
arg_name: str,
exp_type: type,
) -> NoReturn:
raise TypeError(
f"Got unsupported type {type(id_or_xid)}, as, or part of argument `{arg_name}`. Expected one of "
f"{exp_type}, {DatapointsQuery} or {dict} (deprecated), or a (mixed) list of these, but got `{id_or_xid}`."
f"Got unsupported type {type(identifier)}, as, or part of argument `{arg_name}`. Expected one of "
f"{exp_type}, {DatapointsQuery}, or a (mixed) list of these, but got `{identifier}`."
)

def validate(self, queries: list[DatapointsQuery], dps_limit_raw: int, dps_limit_agg: int) -> list[DatapointsQuery]:
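In `_parse`, bare identifiers are wrapped into a `DatapointsQuery` built from the call-level defaults, while per-item queries keep their own settings; the merge is "defaults first, query wins". A simplified dict-based sketch of that precedence (`DatapointsQuery` internals elided):

```python
top_level_defaults = {"start": "30d-ago", "end": "now", "limit": None}

def merge(defaults: dict, user_query: dict) -> dict:
    # Later keys win, so the per-item query overrides the call-level defaults:
    return {**defaults, **user_query}

merged = merge(top_level_defaults, {"external_id": "my-ts", "limit": 100})
assert merged["limit"] == 100        # per-item setting takes precedence
assert merged["start"] == "30d-ago"  # unset fields fall back to the defaults
```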
@@ -295,7 +290,7 @@ def _verify_and_convert_granularity(granularity: str | None) -> tuple[str | None
 def _verify_and_convert_limit(limit: int | None) -> int | None:
     if is_unlimited(limit):
         return None
-    elif isinstance(limit, numbers.Integral) and limit >= 0:  # limit=0 is accepted by the API
+    elif isinstance(limit, int) and limit >= 0:  # limit=0 is accepted by the API
         try:
             # We don't want weird stuff like numpy dtypes etc:
             return int(limit)
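A rough standalone equivalent of the limit handling, assuming `is_unlimited` treats None, -1 and math.inf as "return everything" (which matches how these markers are documented elsewhere in the SDK):

```python
import math

def verify_and_convert_limit(limit):
    # None, -1 and math.inf are assumed to be the SDK's "unlimited" markers:
    if limit is None or limit == -1 or limit == math.inf:
        return None
    if isinstance(limit, int) and limit >= 0:  # limit=0 is accepted by the API
        return int(limit)  # normalize int subclasses (e.g. bool, IntEnum) to plain int
    raise TypeError(f"limit must be a non-negative int or an unlimited marker, got {limit!r}")

assert verify_and_convert_limit(-1) is None
assert verify_and_convert_limit(0) == 0
assert verify_and_convert_limit(100) == 100
```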
@@ -465,7 +460,10 @@ def get_datapoints_from_proto(res: DataPointListItem) -> DatapointsAny:

 def get_ts_info_from_proto(res: DataPointListItem) -> dict[str, int | str | bool | NodeId | None]:
     # Note: When 'unit_external_id' is returned, regular 'unit' is ditched
-    instance_id = NodeId(res.instanceId.space, res.instanceId.externalId) if res.instanceId else None
+    if res.instanceId and res.instanceId.space:  # res.instanceId evaluates to True even when empty :eyes:
+        instance_id = NodeId(res.instanceId.space, res.instanceId.externalId)
+    else:
+        instance_id = None
     return {
         "id": res.id,
         "external_id": res.externalId,