ThreadPoolExecutor locks/hangs when getting all items from client.relationships.list #1831

Closed
korvalds opened this issue Jun 26, 2024 · 6 comments

@korvalds

System information:

  • OS: Linux/5.15.153.1-microsoft-standard-WSL2-x86_64-64bit
  • Python Version: CPython/3.10.12
  • SDK Version: CognitePythonSDK/7.52.0

Describe the bug
ThreadPoolExecutor locks/hangs when fetching all items (limit=-1) from the client.relationships.list endpoint with partitions=10 and source_external_ids=[...], whenever len(source_external_id_list) > self._LIST_SUBQUERY_LIMIT.

To Reproduce
Runnable code reproducing the error.

from cognite.client import CogniteClient

client = CogniteClient()

workorders = client.events.list(
    data_set_external_ids=["some_id"],
    type="work_order",
    metadata={"woMainAsset": "asset_name"},
    limit=-1,
    partitions=10
).to_pandas(camel_case=True)

ops = client.relationships.list(
    data_set_external_ids=["some_id"],
    source_types=["event"],
    target_types=["event"],
    source_external_ids=workorders["externalId"].tolist(), # this is maybe 50000 ids
    limit=-1,
    partitions=10,
).to_pandas(camel_case=True)

The call then hangs indefinitely until the process is killed:

  File "/home/dev/test_cognite_sdk_client.py", line 111, in <module>
    ops = client.relationships.list(
  File "/home/dev/.virtualenvs/core/lib/python3.10/site-packages/cognite/client/_api/relationships.py", line 353, in list
    tasks_summary = execute_tasks(
  File "/home/dev/.virtualenvs/core/lib/python3.10/site-packages/cognite/client/utils/_concurrency.py", line 300, in execute_tasks
    for fut in as_completed(futures_dct):
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 245, in as_completed
    waiter.event.wait(wait_timeout)
  File "/usr/lib/python3.10/threading.py", line 607, in wait
    signaled = self._cond.wait(timeout)
  File "/usr/lib/python3.10/threading.py", line 320, in wait
    waiter.acquire()
KeyboardInterrupt

Exception ignored in: <module 'threading' from '/usr/lib/python3.10/threading.py'>
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1537, in _shutdown
    atexit_call()
  File "/usr/lib/python3.10/concurrent/futures/thread.py", line 31, in _python_exit
    t.join()
  File "/usr/lib/python3.10/threading.py", line 1096, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.10/threading.py", line 1116, in _wait_for_tstate_lock
    if lock.acquire(block, timeout):
KeyboardInterrupt:

Expected behavior
Should return all relationships.

Additional context
I think the problem lies in the execute_tasks implementation, where a single executor is returned from get_thread_pool_executor every time. Since client.relationships.list has nested calls to execute_tasks (execute_tasks(..., self._list, ...) -> self._list_partitioned -> execute_tasks(get_partition, ...)), the threads lock up.

A solution could be to use a context manager inside execute_tasks:

with concurrent.futures.ThreadPoolExecutor(max_workers) as executor:
    ...
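
To see why the shared pool hangs, here is a minimal standalone sketch (not SDK code) of the nested-submission deadlock: a task running on a fixed-size pool submits a subtask to the same pool and blocks on its result, so once all workers are blocked this way, the subtasks can never be scheduled.

from concurrent.futures import ThreadPoolExecutor

shared_pool = ThreadPoolExecutor(max_workers=2)  # stands in for the SDK's shared executor

def inner(i):
    return i * i

def outer(i):
    # Submits to the same pool it is currently running on, then blocks
    return shared_pool.submit(inner, i).result()

# With 2 workers and 2 outer tasks, both workers block in .result() while the
# inner tasks sit in the queue forever -> hangs, just like the traceback above
futures = [shared_pool.submit(outer, i) for i in range(2)]
print([f.result() for f in futures])  # never reached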

korvalds added the bug label Jun 26, 2024
@haakonvt (Contributor) commented Jul 16, 2024

Hi @korvalds and thanks for the bug report! Sorry for not responding earlier (vacation 🌴 ). This issue will be prioritized and I expect a fix to be out later this week.

@haakonvt (Contributor)

@korvalds a fix has now been released; would you mind retrying with 7.54.2?

@korvalds (Author)

@haakonvt ok, thanks. Will try next week when I'm back from vacation 😎

@korvalds (Author)

@haakonvt Hi! I have tested it, but I'm not sure the fix is good enough. Long comment below 😅

The main motivation for using the partitions=10 argument is to prevent 429 and timeout errors when using the API. This lock issue was first encountered while investigating 429 errors, when I tried following the documentation. With partitions, the number of 429s/retries seems to be reduced, and fetching is also much faster.

In your fix, the partitions argument is ignored when the source/target external IDs number more than self._LIST_SUBQUERY_LIMIT. That seems contrary to the point of using partitions exactly when the number of requests increases, or am I missing something here? And is this not also prone to the same 429 errors as before? (I don't know the specific values of the current rate limits, I just run into them. Do you have some facts on rate limiting?)

With your fix, how is this code any different from not using partitions at all? I.e. partitions=None takes the same amount of time. The lock was an issue with the singleton thread pool; what is its purpose, and why not use a context manager instead?

@haakonvt (Contributor)

@korvalds First of all, I understand your concern regarding the removal of partitions; however, I do believe my reasoning for it is sound, and I'll try to explain.

Since the API allows a maximum of 1000 source or target ids, whenever the user exceeds this amount, we need to create subqueries and merge their results client-side. The number of subqueries scales quadratically, as each source chunk must be fetched against every target chunk. This quickly creates a lot of requests.

If each of these individual requests were also to use partitions, the total number of requests would grow even faster - and that's why I removed it. We still fetch the subqueries in parallel; we just don't parallelize each one further with partitions (see the sketch below).
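
A rough sketch of that quadratic growth (the chunk helper and id lists below are hypothetical, not the SDK's actual code):

def chunk(ids, size=1000):  # 1000 = the API's max ids per request, per the above
    return [ids[i:i + size] for i in range(0, len(ids), size)]

source_ids = [f"src-{i}" for i in range(50_000)]  # 50 chunks
target_ids = [f"tgt-{i}" for i in range(50_000)]  # 50 chunks

# Every source chunk must be fetched against every target chunk:
subqueries = [
    {"sourceExternalIds": s, "targetExternalIds": t}
    for s in chunk(source_ids)
    for t in chunk(target_ids)
]
print(len(subqueries))  # 2500 requests - and partitions=10 on each would mean 25000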

Rate limiting does not distinguish between whether partitions is used or not; only the total request volume matters.

I can see that the new implementation can be slower in the region between the two "fetch regimes":

  • When the number of ids is fewer than 1000 (for both target and source), partitions is now actually used, leading to better performance (it was silently ignored previously).
  • When the number of ids is above 1000, but not by a lot, there can certainly be cases that fetch slower than before. Note, however, that max_workers was not respected by the previous code: each task would spin up its own thread pool, hammering away at the API (after your initial burst request budget is spent, you might get severely rate limited).
  • When the number of ids is far above 1000, the new method should be faster due to less rate limiting (we need fewer queries, and the responses better utilize the max limit on resources that can be returned).

For maintainability, there is also a good argument to not make the fetching logic too complicated, but I'm happy to discuss other ideas and strategies for how to make this as performant as possible!

If you have a specific scenario that fetches significantly slower than it used to (compared with a quite old SDK version, I guess), I would be very interested in seeing/doing some benchmarking on it!

@korvalds (Author) commented Aug 2, 2024

@haakonvt to summarize: I will close this issue with the fix in 7.54.2.

For those interested, this issue was tested with two different strategies for executing parallel requests: the fix in master, and the concurrency-strategy-testing-relationships branch.

The results (in my use case) showed that the approach in concurrency-strategy-testing-relationships was about 7 times faster. This is because the parallelization happens at the request level, which limits the amount of pagination required: instead of waiting for the nextCursor value from the previous request, the response is distributed across more partitions, reducing the number of sequential requests.

Depending on your cognite-sdk version, splitting source_external_id_list beforehand may be worth a try:

from cognite.client.data_classes import Relationship, RelationshipList

source_external_id_list = workorders["externalId"].tolist()  # this is maybe 50000 ids
subquery_limit = client.relationships._LIST_SUBQUERY_LIMIT
results = []
for si in range(0, max(1, len(source_external_id_list)), subquery_limit):
    results.extend(client.relationships._list(
        list_cls=RelationshipList,
        resource_cls=Relationship,
        method="POST",
        filter={
            "dataSetIds": [{"externalId": "some_id"}],
            "sourceTypes": ["event"],
            "targetTypes": ["event"],
            "sourceExternalIds": source_external_id_list[si: si + subquery_limit],
        },
        limit=-1,
        partitions=3,
    ))
ops = RelationshipList(results, cognite_client=client).to_pandas(camel_case=True)

korvalds closed this as completed Aug 2, 2024