Skip to content

Commit

Permalink
Pass SSL arg to all requests in DruidOperator (#39066)
Browse files Browse the repository at this point in the history
* Pass SSL arg to all requests in DruidOperator

* Remove unneeded test

* Lint

* Fix test

* Fix tests

* Add true test as per dirrao's comment

* Use call_count == 1

---------

Co-authored-by: Daniel Bell <[email protected]>
  • Loading branch information
danielbe11 and Daniel Bell authored May 5, 2024
1 parent 54e8376 commit e396f06
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 10 deletions.
6 changes: 4 additions & 2 deletions airflow/providers/apache/druid/hooks/druid.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,15 @@ def submit_indexing_job(

sec = 0
while running:
req_status = requests.get(druid_task_status_url, auth=self.get_auth())
req_status = requests.get(druid_task_status_url, auth=self.get_auth(), verify=self.get_verify())

self.log.info("Job still running for %s seconds...", sec)

if self.max_ingestion_time and sec > self.max_ingestion_time:
# ensure that the job gets killed if the max ingestion time is exceeded
requests.post(f"{url}/{druid_task_id}/shutdown", auth=self.get_auth())
requests.post(
f"{url}/{druid_task_id}/shutdown", auth=self.get_auth(), verify=self.get_verify()
)
raise AirflowException(f"Druid ingestion took more than {self.max_ingestion_time} seconds")

time.sleep(self.timeout)
Expand Down
60 changes: 52 additions & 8 deletions tests/providers/apache/druid/hooks/test_druid.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,25 +99,69 @@ def test_submit_sql_based_ingestion_ok(self, requests_mock):
assert task_post.call_count == 1
assert status_check.call_count == 1

def test_submit_with_correct_ssl_arg(self, requests_mock):
def test_submit_with_false_ssl_arg(self, requests_mock):
# Timeout so that all three requests are sent
self.db_hook.timeout = 1
self.db_hook.max_ingestion_time = 5
self.db_hook.verify_ssl = False

task_post = requests_mock.post(
"http://druid-overlord:8081/druid/indexer/v1/task",
text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}',
)
status_check = requests_mock.get(
"http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status",
text='{"status":{"status": "SUCCESS"}}',
text='{"status":{"status": "RUNNING"}}',
)
shutdown_post = requests_mock.post(
"http://druid-overlord:8081/druid/indexer/v1/task/"
"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/shutdown",
text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}',
)

self.db_hook.submit_indexing_job("Long json file")
with pytest.raises(AirflowException):
self.db_hook.submit_indexing_job("Long json file")

# PGH005: false positive on ``requests_mock`` argument `called_once`
assert task_post.call_count == 1
assert status_check.call_count == 1
if task_post.called_once:
verify_ssl = task_post.request_history[0].verify
assert False is verify_ssl
assert False is task_post.request_history[0].verify

assert status_check.call_count > 1
assert False is status_check.request_history[0].verify

assert shutdown_post.call_count == 1
assert False is shutdown_post.request_history[0].verify

def test_submit_with_true_ssl_arg(self, requests_mock):
# Timeout so that all three requests are sent
self.db_hook.timeout = 1
self.db_hook.max_ingestion_time = 5
self.db_hook.verify_ssl = True

task_post = requests_mock.post(
"http://druid-overlord:8081/druid/indexer/v1/task",
text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}',
)
status_check = requests_mock.get(
"http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status",
text='{"status":{"status": "RUNNING"}}',
)
shutdown_post = requests_mock.post(
"http://druid-overlord:8081/druid/indexer/v1/task/"
"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/shutdown",
text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}',
)

with pytest.raises(AirflowException):
self.db_hook.submit_indexing_job("Long json file")

assert task_post.call_count == 1
assert True is task_post.request_history[0].verify

assert status_check.call_count > 1
assert True is status_check.request_history[0].verify

assert shutdown_post.call_count == 1
assert True is shutdown_post.request_history[0].verify

def test_submit_correct_json_body(self, requests_mock):
task_post = requests_mock.post(
Expand Down

0 comments on commit e396f06

Please sign in to comment.