Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix support for macros with dots in DataProcJobBuilder #28970

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions airflow/providers/google/cloud/hooks/dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def __init__(
job_type: str,
properties: dict[str, str] | None = None,
) -> None:
name = f"{task_id.replace('.', '_')}_{uuid.uuid4()!s:.8}"
name = f"{task_id}_{uuid.uuid4()!s:.8}"
self.job_type = job_type
self.job: dict[str, Any] = {
"job": {
Expand Down Expand Up @@ -180,12 +180,11 @@ def set_python_main(self, main: str) -> None:

def set_job_name(self, name: str) -> None:
"""
Set Dataproc job name. Job name is sanitized, replacing dots by underscores.
Set Dataproc job name.

:param name: Job name.
"""
sanitized_name = f"{name.replace('.', '_')}_{uuid.uuid4()!s:.8}"
self.job["job"]["reference"]["job_id"] = sanitized_name
self.job["job"]["reference"]["job_id"] = f"{name}_{uuid.uuid4()!s:.8}"

def build(self) -> dict:
"""
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/operators/dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,7 @@ def __init__(
self,
*,
region: str,
job_name: str = "{{task.task_id}}_{{ds_nodash}}",
job_name: str = "{{task.task_id.replace('.', '_')}}_{{ds_nodash}}",
cluster_name: str = "cluster-1",
project_id: str | None = None,
dataproc_properties: dict | None = None,
Expand Down
12 changes: 2 additions & 10 deletions tests/providers/google/cloud/hooks/test_dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -1006,17 +1006,9 @@ def test_set_python_main(self):
self.builder.set_python_main(main)
assert main == self.builder.job["job"][self.job_type]["main_python_file_uri"]

@pytest.mark.parametrize(
"job_name",
[
pytest.param("name", id="simple"),
pytest.param("name_with_dash", id="name with underscores"),
pytest.param("group.name", id="name with dot"),
pytest.param("group.name_with_dash", id="name with dot and underscores"),
],
)
Comment on lines -1009 to -1017
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to remove these test cases?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They were added together with the sanitization (dot substitution) in DataProcJobBuilder. Now that I have removed the sanitization from that class, all test cases with dots would fail. I just reverted this test to the state it was in before the changes to DataProcJobBuilder, and added a test that checks whether the proper job_id is passed from the operator instead.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These were added to avoid a regression of #23439.
How can you be sure that job_name works with these values if there is no coverage for these cases?

Copy link
Contributor Author

@rafalh rafalh Jan 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eladkal I moved the replacement to DataprocJobBaseOperator default value for job_name and added another test that checks if job_name generated in DataprocJobBaseOperator has dots replaced by _ so that issue should still be fixed. That issue explicitly says that it's the default job name being broken:

DataprocJobBaseOperator have default of using task_id for job name

And that's exactly what this PR fixes, letting the people who use DataProcJobBuilder directly to pass any job name they want including templates containing dots.

@mock.patch(DATAPROC_STRING.format("uuid.uuid4"))
def test_set_job_name(self, mock_uuid, job_name):
def test_set_job_name(self, mock_uuid):
job_name = "name"
uuid = "test_uuid"
expected_job_name = f"{job_name}_{uuid[:8]}".replace(".", "_")
mock_uuid.return_value = uuid
Expand Down
22 changes: 21 additions & 1 deletion tests/providers/google/cloud/operators/test_dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
DataprocSubmitPySparkJobOperator,
DataprocSubmitSparkJobOperator,
DataprocSubmitSparkSqlJobOperator,
DataprocUpdateClusterOperator,
DataprocUpdateClusterOperator, DataprocJobBaseOperator,
)
from airflow.providers.google.cloud.triggers.dataproc import DataprocBaseTrigger
from airflow.providers.google.common.consts import GOOGLE_DEFAULT_DEFERRABLE_METHOD_NAME
Expand Down Expand Up @@ -1344,6 +1344,26 @@ def test_instantiate_inline_workflow_operator_extra_links(
assert ti.task.get_extra_links(ti, DataprocLink.name) == DATAPROC_WORKFLOW_LINK_EXPECTED


class TestDataprocJobBaseOperator(DataprocJobTestBase):
@mock.patch(DATAPROC_PATH.format("uuid.uuid4"))
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
def test_default_job_name(self, mock_hook, mock_uuid):
mock_uuid.return_value = TEST_JOB_ID
mock_hook.return_value.project_id = GCP_PROJECT
mock_uuid.return_value = TEST_JOB_ID
mock_hook.return_value.submit_job.return_value.reference.job_id = TEST_JOB_ID
self.extra_links_manager_mock.attach_mock(mock_hook, "hook")

op = DataprocJobBaseOperator(
task_id="group.task",
region=GCP_REGION,
gcp_conn_id=GCP_CONN_ID,
)
op.render_template_fields(context=self.mock_context)
job = op.create_job_template().build()
assert job["job"]["reference"]["job_id"] == f"group_task_{TEST_JOB_ID}"


class TestDataProcHiveOperator(unittest.TestCase):
query = "define sin HiveUDF('sin');"
variables = {"key": "value"}
Expand Down