Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass SSL arg to all requests in DruidOperator #39066

6 changes: 4 additions & 2 deletions airflow/providers/apache/druid/hooks/druid.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,15 @@ def submit_indexing_job(

sec = 0
while running:
req_status = requests.get(druid_task_status_url, auth=self.get_auth())
req_status = requests.get(druid_task_status_url, auth=self.get_auth(), verify=self.get_verify())
danielbe11 marked this conversation as resolved.
Show resolved Hide resolved

self.log.info("Job still running for %s seconds...", sec)

if self.max_ingestion_time and sec > self.max_ingestion_time:
# ensure that the job gets killed if the max ingestion time is exceeded
requests.post(f"{url}/{druid_task_id}/shutdown", auth=self.get_auth())
requests.post(
f"{url}/{druid_task_id}/shutdown", auth=self.get_auth(), verify=self.get_verify()
)
raise AirflowException(f"Druid ingestion took more than {self.max_ingestion_time} seconds")

time.sleep(self.timeout)
Expand Down
27 changes: 21 additions & 6 deletions tests/providers/apache/druid/hooks/test_druid.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,24 +100,39 @@ def test_submit_sql_based_ingestion_ok(self, requests_mock):
assert status_check.call_count == 1

def test_submit_with_correct_ssl_arg(self, requests_mock):
# Timeout so that all three requests are sent
self.db_hook.timeout = 1
self.db_hook.max_ingestion_time = 5
self.db_hook.verify_ssl = False

task_post = requests_mock.post(
"http://druid-overlord:8081/druid/indexer/v1/task",
text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}',
)
status_check = requests_mock.get(
"http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status",
text='{"status":{"status": "SUCCESS"}}',
text='{"status":{"status": "RUNNING"}}',
)
shutdown_post = requests_mock.post(
"http://druid-overlord:8081/druid/indexer/v1/task/"
"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/shutdown",
text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}',
)

self.db_hook.submit_indexing_job("Long json file")
with pytest.raises(AirflowException):
self.db_hook.submit_indexing_job("Long json file")

# PGH005: false positive on ``requests_mock`` argument `called_once`
assert task_post.call_count == 1
assert status_check.call_count == 1
if task_post.called_once:
verify_ssl = task_post.request_history[0].verify
assert False is verify_ssl
assert False is task_post.request_history[0].verify

assert status_check.call_count > 1
if status_check.call_count > 1:
assert False is status_check.request_history[0].verify

assert shutdown_post.call_count == 1
if shutdown_post.called_once:
assert False is shutdown_post.request_history[0].verify

def test_submit_correct_json_body(self, requests_mock):
task_post = requests_mock.post(
Expand Down