feature: iterate datapoints using DatapointsAPI.__call__ #2010

Open
haakonvt wants to merge 3 commits into master from add-dps-iter-method

Conversation

@haakonvt (Contributor) commented Nov 6, 2024

`Iterate through datapoints in chunks, for one or more time series. <https://developer.cognite.com/api#tag/Time-series/operation/getMultiTimeSeriesDatapoints>`_

Note:
    Control memory usage by specifying ``chunk_size_time_series`` (how many time series to iterate simultaneously) and
    ``chunk_size_datapoints`` (how many datapoints to yield per iteration). Note that in order to make efficient use of
    the API request limits, this method will never hold fewer than 100k datapoints in memory at a time, per time series.

    If you run with memory constraints, use ``return_arrays=True`` (the default).

    No empty chunk will ever be returned.

Args:
    queries (DatapointsQuery | Sequence[DatapointsQuery]): Query, or queries, using id, external_id or instance_id for time series to fetch data for. Individual settings in the DatapointsQuery take precedence over top-level settings. The options 'limit' and 'include_outside_points' are not supported.
    start (int | str | datetime.datetime | None): Inclusive start. Default: 1970-01-01 UTC.
    end (int | str | datetime.datetime | None): Exclusive end. Default: "now"
    aggregates (Aggregate | str | list[Aggregate | str] | None): Single aggregate or list of aggregates to retrieve. Available options: ``average``, ``continuous_variance``, ``count``, ``count_bad``, ``count_good``, ``count_uncertain``, ``discrete_variance``, ``duration_bad``, ``duration_good``, ``duration_uncertain``, ``interpolation``, ``max``, ``min``, ``step_interpolation``, ``sum`` and ``total_variation``. Default: None (raw datapoints returned)
    granularity (str | None): The granularity to fetch aggregates at. Can be given as an abbreviation or spelled out for clarity: ``s/second(s)``, ``m/minute(s)``, ``h/hour(s)``, ``d/day(s)``, ``w/week(s)``, ``mo/month(s)``, ``q/quarter(s)``, or ``y/year(s)``. Examples: ``30s``, ``5m``, ``1day``, ``2weeks``. Default: None.
    timezone (str | datetime.timezone | ZoneInfo | None): For raw datapoints, which timezone to use when displaying (will not affect what is retrieved). For aggregates, which timezone to align to for granularity 'hour' and longer. Align to the start of the hour, day or month. For timezones of type Region/Location, like 'Europe/Oslo', pass a string or ``ZoneInfo`` instance. The aggregate duration will then vary, typically due to daylight saving time. You can also use a fixed offset from UTC by passing a string like '+04:00', 'UTC-7' or 'UTC-02:30' or an instance of ``datetime.timezone``. Note: Historical timezones with second offset are not supported, and timezones with minute offsets (e.g. UTC+05:30 or Asia/Kolkata) may take longer to execute.
    target_unit (str | None): The unit_external_id of the datapoints returned. If the time series does not have a unit_external_id that can be converted to the target_unit, an error will be returned. Cannot be used with target_unit_system.
    target_unit_system (str | None): The unit system of the datapoints returned. Cannot be used with target_unit.
    ignore_unknown_ids (bool): Whether to ignore missing time series rather than raising an exception. Default: False
    include_status (bool): Also return the status code, an integer, for each datapoint in the response. Only relevant for raw datapoint queries, not aggregates.
    ignore_bad_datapoints (bool): Treat datapoints with a bad status code as if they do not exist. If set to false, raw queries will include bad datapoints in the response, and aggregates will in general omit the time period between a bad datapoint and the next good datapoint. Also, the period between a bad datapoint and the previous good datapoint will be considered constant. Default: True.
    treat_uncertain_as_bad (bool): Treat datapoints with uncertain status codes as bad. If false, treat datapoints with uncertain status codes as good. Used for both raw queries and aggregates. Default: True.
    chunk_size_datapoints (int): The number of datapoints per time series to yield per iteration. Must evenly divide 100k OR be an integer multiple of 100k. Default: 100_000.
    chunk_size_time_series (int | None): The max number of time series to yield per iteration (varies as time series get exhausted, but is always at least 1). Default: None (all given queries are returned).
    return_arrays (bool): Whether to return the datapoints as numpy arrays. Default: True.

Yields:
    DatapointsArray | DatapointsArrayList | Datapoints | DatapointsList: If ``return_arrays=True`` (the default), a ``DatapointsArray`` object containing the datapoints chunk, or a ``DatapointsArrayList`` if multiple time series were asked for. When False, a ``Datapoints`` object containing the datapoints chunk, or a ``DatapointsList`` if multiple time series were asked for. If ``ignore_unknown_ids=True``, a single time series is requested, and it is not found, iteration stops immediately.

Examples:

    Iterate through the datapoints of a single time series with external_id="foo", in chunks of 25k:

        >>> from cognite.client import CogniteClient
        >>> from cognite.client.data_classes import DatapointsQuery
        >>> client = CogniteClient()
        >>> query = DatapointsQuery(external_id="foo", start="2w-ago")
        >>> for chunk in client.time_series.data(query, chunk_size_datapoints=25_000):
        ...     pass  # do something with the datapoints chunk

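    The method also supports aggregates. A minimal sketch, assuming a numeric time series with external_id="foo"
    exists: fetch hourly averages and maxima aligned to the Europe/Oslo timezone, using only the parameters
    documented above (the chunk size of 25k is just an illustration):

        >>> for chunk in client.time_series.data(
        ...     DatapointsQuery(external_id="foo", start="60d-ago"),
        ...     aggregates=["average", "max"],
        ...     granularity="1h",
        ...     timezone="Europe/Oslo",
        ...     chunk_size_datapoints=25_000,
        ... ):
        ...     pass  # each chunk holds the requested aggregates, e.g. chunk.average
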
    Iterate through datapoints from multiple time series, without returning them as numpy arrays. As time series
    get exhausted (no more data), they are dropped from the returned "chunk list", but the order of the remaining
    time series is preserved.

    If you run with ``chunk_size_time_series=None``, an easy way to check whether a given time series has been
    exhausted is to use the ``.get`` method, as illustrated below:

        >>> from cognite.client.data_classes.data_modeling import NodeId
        >>> queries = [
        ...     DatapointsQuery(id=123),
        ...     DatapointsQuery(external_id="foo"),
        ...     DatapointsQuery(instance_id=NodeId("my-space", "my-ts-xid"))
        ... ]
        >>> for chunk_lst in client.time_series.data(queries, return_arrays=False):
        ...     if chunk_lst.get(id=123) is None:
        ...         print("Time series with id=123 has no more datapoints!")

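    Unit conversion can be requested during iteration as well, via ``target_unit`` (or ``target_unit_system``). A
    minimal sketch, assuming the time series with external_id="foo" has a unit that can be converted to the
    illustrative unit external id "temperature:deg_c":

        >>> for chunk in client.time_series.data(
        ...     DatapointsQuery(external_id="foo"),
        ...     target_unit="temperature:deg_c",  # assumed/illustrative unit external id
        ...     ignore_unknown_ids=True,  # stop silently if the time series is missing
        ... ):
        ...     pass  # datapoints are returned in the converted unit
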
    A likely use case for iterating datapoints is to clone data from one project to another, while keeping a low memory
    footprint and without having to write custom logic involving count aggregates (which do not work for string data)
    and time-domain splitting.

    Here's an example of how to do so efficiently, while not ignoring bad and uncertain data (``ignore_bad_datapoints=False``)
    and copying status codes (automatically taken care of when requested, ``include_status=True``). The only assumption is that
    the time series are already created in the target project.

        >>> from cognite.client.utils import MIN_TIMESTAMP_MS, MAX_TIMESTAMP_MS
        >>> target_client = CogniteClient(...)
        >>> to_copy = client.time_series.list(data_set_external_ids="my-use-case")
        >>> queries = [DatapointsQuery(external_id=ts.external_id) for ts in to_copy]
        >>> for dps_chunk in client.time_series.data(
        ...     queries,  # may be several thousand time series...
        ...     chunk_size_time_series=20,  # control memory usage by specifying how many to iterate at a time
        ...     chunk_size_datapoints=100_000,
        ...     include_status=True,
        ...     ignore_bad_datapoints=False,
        ...     start=MIN_TIMESTAMP_MS,
        ...     end=MAX_TIMESTAMP_MS + 1,  # end is exclusive
        ... ):
        ...     target_client.time_series.data.insert_multiple(
        ...         [{"external_id": dps.external_id, "datapoints": dps} for dps in dps_chunk]
        ...     )
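
    If the time series do not yet exist in the target project, a rough sketch of creating them first, copying only a
    few basic fields (which fields you need to carry over is an assumption here; newer SDK versions may prefer the
    dedicated write class):

        >>> from cognite.client.data_classes import TimeSeries
        >>> target_client.time_series.create(
        ...     [
        ...         TimeSeries(
        ...             external_id=ts.external_id,
        ...             name=ts.name,
        ...             is_string=ts.is_string,
        ...             is_step=ts.is_step,
        ...             unit=ts.unit,
        ...         )
        ...         for ts in to_copy
        ...     ]
        ... )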

@haakonvt changed the title to draft: add DatapointsAPI.__call__ on Nov 6, 2024
@haakonvt force-pushed the add-dps-iter-method branch 2 times, most recently from c9814f5 to 266c6bc on November 6, 2024 23:02
@haakonvt changed the title from draft: add DatapointsAPI.__call__ to feature: iterate datapoints using DatapointsAPI.__call__ on Nov 7, 2024
@haakonvt haakonvt marked this pull request as ready for review November 7, 2024 14:30
@haakonvt haakonvt requested review from a team as code owners November 7, 2024 14:30