Skip to content

Commit

Permalink
test: speedup celery tests (#12885)
Browse files Browse the repository at this point in the history
* speedup celery tests

* refactor wait for success
  • Loading branch information
kstrz committed Feb 3, 2021
1 parent b5b0c2c commit 742d560
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions tests/celery_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,6 @@ def test_run_sync_query_cta_config(setup_sqllab, ctas_method):
)

assert query.select_sql == get_select_star(tmp_table_name, schema=CTAS_SCHEMA_NAME)
time.sleep(CELERY_SLEEP_TIME)
results = run_sql(query.select_sql)
assert QueryStatus.SUCCESS == results["status"], result

Expand All @@ -231,9 +230,8 @@ def test_run_async_query_cta_config(setup_sqllab, ctas_method):
QUERY, cta=True, ctas_method=ctas_method, async_=True, tmp_table=tmp_table_name,
)

time.sleep(CELERY_SLEEP_TIME)
query = wait_for_success(result)

query = get_query_by_id(result["query"]["serverId"])
assert QueryStatus.SUCCESS == query.status
assert get_select_star(tmp_table_name, schema=CTAS_SCHEMA_NAME) == query.select_sql
assert (
Expand All @@ -252,9 +250,8 @@ def test_run_async_cta_query(setup_sqllab, ctas_method):
QUERY, cta=True, ctas_method=ctas_method, async_=True, tmp_table=table_name
)

time.sleep(CELERY_SLEEP_TIME)
query = wait_for_success(result)

query = get_query_by_id(result["query"]["serverId"])
assert QueryStatus.SUCCESS == query.status
assert get_select_star(table_name) in query.select_sql

Expand All @@ -274,9 +271,8 @@ def test_run_async_cta_query_with_lower_limit(setup_sqllab, ctas_method):
result = run_sql(
QUERY, cta=True, ctas_method=ctas_method, async_=True, tmp_table=tmp_table
)
time.sleep(CELERY_SLEEP_TIME)
query = wait_for_success(result)

query = get_query_by_id(result["query"]["serverId"])
assert QueryStatus.SUCCESS == query.status

assert get_select_star(tmp_table) == query.select_sql
Expand Down Expand Up @@ -429,3 +425,12 @@ def my_task():

def delete_tmp_view_or_table(name: str, db_object_type: str):
db.get_engine().execute(f"DROP {db_object_type} IF EXISTS {name}")


def wait_for_success(result):
for _ in range(CELERY_SLEEP_TIME * 2):
time.sleep(0.5)
query = get_query_by_id(result["query"]["serverId"])
if QueryStatus.SUCCESS == query.status:
break
return query

0 comments on commit 742d560

Please sign in to comment.