parallelize using partitions #1863

Open · wants to merge 2 commits into master
55 changes: 22 additions & 33 deletions cognite/client/_api/relationships.py
@@ -1,8 +1,6 @@
 from __future__ import annotations
 
 import itertools
-import warnings
-from functools import partial
 from typing import TYPE_CHECKING, Iterator, Literal, Sequence, overload
 
 from cognite.client._api_client import APIClient
@@ -16,8 +14,7 @@
 )
 from cognite.client.data_classes.labels import LabelFilter
 from cognite.client.data_classes.relationships import RelationshipCore
-from cognite.client.utils._auxiliary import is_unlimited, split_into_chunks
-from cognite.client.utils._concurrency import execute_tasks
+from cognite.client.utils._auxiliary import is_unlimited, remove_duplicates_keep_order, split_into_chunks
 from cognite.client.utils._identifier import IdentifierSequence
 from cognite.client.utils._validation import assert_type, process_data_set_ids
 from cognite.client.utils.useful_types import SequenceNotStr
@@ -296,8 +293,11 @@ def list(
             labels=labels,
         ).dump(camel_case=True)
 
-        target_external_ids, source_external_ids = target_external_ids or [], source_external_ids or []
-        if all(len(xids) <= self._LIST_SUBQUERY_LIMIT for xids in (target_external_ids, source_external_ids)):
+        # The API is fine with duplicated target/source IDs - but since we fetch in chunks, we must ensure
+        # we don't ask for the same across chunks so that we don't return duplicates back to the user:
+        unique_target_xids = remove_duplicates_keep_order(target_external_ids or [])
+        unique_source_xids = remove_duplicates_keep_order(source_external_ids or [])
+        if all(len(xids) <= self._LIST_SUBQUERY_LIMIT for xids in (unique_target_xids, unique_source_xids)):
             return self._list(
                 list_cls=RelationshipList,
                 resource_cls=Relationship,
@@ -312,41 +312,30 @@
                 f"Querying more than {self._LIST_SUBQUERY_LIMIT} source_external_ids/target_external_ids is only "
                 f"supported for unlimited queries (pass -1 / None / inf instead of {limit})"
             )
-        tasks = []
-        target_chunks = split_into_chunks(target_external_ids, self._LIST_SUBQUERY_LIMIT) or [[]]
-        source_chunks = split_into_chunks(source_external_ids, self._LIST_SUBQUERY_LIMIT) or [[]]
+        target_chunks = split_into_chunks(unique_target_xids, self._LIST_SUBQUERY_LIMIT) or [[]]
+        source_chunks = split_into_chunks(unique_source_xids, self._LIST_SUBQUERY_LIMIT) or [[]]
 
         # All sources (if any) must be checked against all targets (if any). When either is not
         # given, we must exhaustively list all matching just the source or the target:
+        results = []
         for target_xids, source_xids in itertools.product(target_chunks, source_chunks):
             task_filter = filter.copy()
-            if target_external_ids:  # keep null if it was
+            if unique_target_xids:  # keep null if it was
                 task_filter["targetExternalIds"] = target_xids
-            if source_external_ids:
+            if unique_source_xids:
                 task_filter["sourceExternalIds"] = source_xids
-            tasks.append({"filter": task_filter})
-
-        if partitions is not None:
-            warnings.warn(
-                f"When one or both of source/target external IDs have more than {self._LIST_SUBQUERY_LIMIT} "
-                "elements, `partitions` is ignored",
-                UserWarning,
+            results.extend(
+                self._list(
+                    list_cls=RelationshipList,
+                    resource_cls=Relationship,
+                    method="POST",
+                    limit=None,
+                    filter=task_filter,
+                    partitions=partitions,
+                    other_params={"fetchResources": fetch_resources},
+                ).data
             )
-        tasks_summary = execute_tasks(
-            partial(
-                self._list,
-                list_cls=RelationshipList,
-                resource_cls=Relationship,
-                method="POST",
-                limit=None,
-                partitions=None,  # Otherwise, workers will spawn workers -> deadlock (singleton threadpool)
-                other_params={"fetchResources": fetch_resources},
-            ),
-            tasks,
-            max_workers=self._config.max_workers,
-        )
-        tasks_summary.raise_compound_exception_if_failed_tasks()
-        return RelationshipList(tasks_summary.joined_results(), cognite_client=self._cognite_client)
+        return RelationshipList(results, cognite_client=self._cognite_client)
 
     @overload
     def create(self, relationship: Relationship | RelationshipWrite) -> Relationship: ...
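Note on the approach: the removed code fanned tasks out over the SDK's singleton thread pool via `execute_tasks`, which is why it had to force `partitions=None` inside each task (workers spawning workers against the same pool can deadlock) and to warn that a user-supplied `partitions` was ignored. The new code loops over the chunk combinations serially and forwards `partitions` to every request, moving the parallelism server-side. A minimal, self-contained sketch of the pattern, where `SUBQUERY_LIMIT` and `fetch_relationships` are hypothetical stand-ins for `self._LIST_SUBQUERY_LIMIT` and the SDK's internal `self._list`:

from __future__ import annotations

import itertools
from typing import Any, Sequence

SUBQUERY_LIMIT = 1000  # assumed stand-in for self._LIST_SUBQUERY_LIMIT


def split_into_chunks(seq: Sequence[str], size: int) -> list[list[str]]:
    # same contract as the SDK helper of the same name
    return [list(seq[i : i + size]) for i in range(0, len(seq), size)]


def fetch_relationships(task_filter: dict[str, Any], partitions: int | None) -> list[dict]:
    # hypothetical stand-in for self._list: one POST /relationships/list
    # request, parallelized server-side when `partitions` is set
    raise NotImplementedError


def list_all(sources: list[str], targets: list[str], partitions: int | None) -> list[dict]:
    # sources/targets are assumed already deduplicated (see remove_duplicates_keep_order)
    results: list[dict] = []
    # `or [[]]` keeps the cartesian product non-empty when one side is not given
    source_chunks = split_into_chunks(sources, SUBQUERY_LIMIT) or [[]]
    target_chunks = split_into_chunks(targets, SUBQUERY_LIMIT) or [[]]
    for src, tgt in itertools.product(source_chunks, target_chunks):
        task_filter: dict[str, Any] = {}
        if src:
            task_filter["sourceExternalIds"] = src
        if tgt:
            task_filter["targetExternalIds"] = tgt
        # serial loop over chunk combinations: each request fans out via
        # `partitions` on the server, so no client-side worker pool is needed
        results.extend(fetch_relationships(task_filter, partitions))
    return results

One trade-off worth noting: the chunk combinations themselves are now fetched sequentially, so overall throughput rests on how much parallelism `partitions` buys within each individual request.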
2 changes: 1 addition & 1 deletion cognite/client/utils/_auxiliary.py
@@ -222,7 +222,7 @@ def find_duplicates(seq: Iterable[THashable]) -> set[THashable]:
     return {x for x in seq if x in seen or add(x)}
 
 
-def remove_duplicates_keep_order(seq: Sequence[THashable]) -> list[THashable]:
+def remove_duplicates_keep_order(seq: SequenceNotStr[THashable]) -> list[THashable]:
     seen: set[THashable] = set()
     add = seen.add
     return [x for x in seq if x not in seen and not add(x)]
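Note on the annotation change: a `str` is itself a `Sequence[str]`, so the old signature let a bare string through the type checker, where it would be deduplicated character by character; `SequenceNotStr` exists to reject that at type-check time. A runnable illustration of the helper's semantics, with a plain `list` annotation standing in for `SequenceNotStr` to keep it independent of the SDK's type machinery:

from typing import Hashable, TypeVar

THashable = TypeVar("THashable", bound=Hashable)


def remove_duplicates_keep_order(seq: list[THashable]) -> list[THashable]:
    seen: set[THashable] = set()
    add = seen.add  # set.add returns None, so `not add(x)` is always True
    return [x for x in seq if x not in seen and not add(x)]


print(remove_duplicates_keep_order(["b", "a", "b", "c", "a"]))  # -> ['b', 'a', 'c']

First occurrences win, so the relative order the caller passed in is preserved, which is exactly what the chunked fetching above relies on to avoid requesting the same external ID in two different chunks.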