
Fix support for macros with dots in DataProcJobBuilder #28970

Closed

Conversation

@rafalh (Contributor) commented Jan 16, 2023:

Do not sanitize the job name in DataProcJobBuilder, because it can contain Jinja macros. Move sanitization to the DataprocJobBaseOperator default value for the job_name parameter to keep supporting task groups.

Fixes #28810
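
For illustration, a minimal sketch (simplified, not the actual provider code) of the idea: only the operator's default job name, derived from task_id, is sanitized, while explicitly passed names flow through untouched.

def default_job_name(task_id: str) -> str:
    # Task groups prefix task IDs with "group.", and Dataproc job IDs may
    # only contain letters, digits, underscores and hyphens, so the default
    # replaces dots with underscores.
    return task_id.replace(".", "_")

print(default_job_name("group.task"))  # -> group_task
print(default_job_name("plain_task"))  # -> plain_task (unchanged)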

boring-cyborg (bot) added the area:providers and provider:google (Google, including GCP, related issues) labels on Jan 16, 2023
rafalh force-pushed the fix-dataproc-job-builder-macros branch from f0ccecd to b6bb0e9 on January 16, 2023 12:22
rafalh marked this pull request as ready for review on January 16, 2023 14:39
Comment on lines -1009 to -1017
@pytest.mark.parametrize(
"job_name",
[
pytest.param("name", id="simple"),
pytest.param("name_with_dash", id="name with underscores"),
pytest.param("group.name", id="name with dot"),
pytest.param("group.name_with_dash", id="name with dot and underscores"),
],
)
A reviewer (Contributor) commented:

Why do we need to remove these test cases?

@rafalh (Contributor, Author) replied:

They were added together with the sanitization (dot substitution) in DataProcJobBuilder. Now that I have removed sanitization from that class, all test cases with dots would fail. I simply reverted this test to its state before the DataProcJobBuilder changes and instead added a test that checks whether the proper job_id is passed from the operator.
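
For illustration, a sketch of the kind of test described (the builder signature and job-dict layout here are recalled from the provider code and may not match it exactly):

from airflow.providers.google.cloud.hooks.dataproc import DataProcJobBuilder

def test_builder_keeps_dots_in_job_name():
    # With sanitization removed from the builder, a dotted (or templated)
    # name should reach the job reference untouched; replacing dots is now
    # the responsibility of the operator-level default.
    builder = DataProcJobBuilder(
        project_id="test-project",
        task_id="group.task",
        cluster_name="cluster",
        job_type="pyspark_job",
        properties={},
    )
    assert builder.job["job"]["reference"]["job_id"].startswith("group.task")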

The reviewer (Contributor) replied:

These were added to avoid a regression of #23439.
How can you be sure job_name works with these values if there is no coverage for these cases?

@rafalh (Contributor, Author) commented Jan 19, 2023:

@eladkal I moved the replacement to the DataprocJobBaseOperator default value for job_name and added another test that checks whether the job_name generated in DataprocJobBaseOperator has its dots replaced by _, so that issue should still be fixed. The issue explicitly says that it is the default job name that is broken:

DataprocJobBaseOperator have default of using task_id for job name

And that's exactly what this PR fixes, letting people who use DataProcJobBuilder directly pass any job name they want, including templates containing dots.

rafalh requested review from Taragolis and eladkal, and removed the request for Taragolis, on January 24, 2023 11:33
@rafalh (Contributor, Author) commented Jan 30, 2023:

@Taragolis @eladkal Please review my change. Note that it makes migration from old job operators like DataprocSubmitPySparkJobOperator (which are deprecated according to the code in __init__) to DataprocSubmitJobOperator easier. DataprocSubmitPySparkJobOperator generates the job name based on the task ID (ti.task_id). When using DataprocSubmitJobOperator with DataProcJobBuilder, that is currently not possible without workarounds; e.g. this code won't work:

task1 = DataprocSubmitJobOperator(
    task_id="foo",
    job=DataProcJobBuilder(task_id="{{ task.task_id }}", ...).build(),
    ...
)

Of course I can duplicate the task ID, but I shouldn't have to, considering that templates work in the job field and there was no problem with using templates in the job name until #23791 was merged.
My PR deals with this problem by removing job name sanitization and fixing the default value provided by the old job operators in case a grouped task is used. It should still fix the problem that #23791 addressed while bringing back the old behavior in DataProcJobBuilder.
Theoretically it is not backward compatible, because if a user uses DataProcJobBuilder directly and passes a broken name (e.g. one with a dot) in the task_id field of the constructor or via set_job_name, it will stop working, but:

  1. it was the old behavior and no one complained until task groups were introduced, so I don't think it will affect users
  2. I think a good API should fail for invalid input, not try to fix it

@Taragolis (Contributor) commented:
I tried to understand the initial issue #28810:

I am passing string containing macros to DataProcJobBuilder task_id parameter:
{{ (dag.dag_id + '-' + task.task_id.replace('_', '-'))[:90] }}

Is this PR meant to allow something like this?

DataProcJobBuilder(
    task_id="{{ (dag.dag_id + '-' + task.task_id.replace('_', '-'))[:90] }}",
    ...
)
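
For reference, what that template resolves to can be sketched with plain Jinja2 outside Airflow (values illustrative):

from types import SimpleNamespace
from jinja2 import Template

tpl = Template("{{ (dag.dag_id + '-' + task.task_id.replace('_', '-'))[:90] }}")
print(tpl.render(dag=SimpleNamespace(dag_id="my_dag"),
                 task=SimpleNamespace(task_id="my_task")))
# -> my_dag-my-task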

@rafalh (Contributor, Author) commented Feb 1, 2023:

@Taragolis Yes, that's exactly what I am trying to do. It worked for a long time and regressed after #23791 was merged.

@Taragolis (Contributor) commented:
Sorry for the late response; these changes are still unclear to me.

I had a look at DataProcJobBuilder and its usage in DataprocJobBaseOperator, and I can say that all operators based on DataprocJobBaseOperator:

  1. Call the DataprocJobBaseOperator.create_job_template method in execute, so all templates should be resolved at that point.
  2. Have been deprecated for a very long time ([AIRFLOW-5691] Rewrite Dataproc operators to use python library #6371), and the recommendation is to switch to DataprocSubmitJobOperator, which does not use DataProcJobBuilder at all.

@rafalh (Contributor, Author) commented Mar 9, 2023:

@Taragolis I am aware of those things. In our case we use the non-deprecated DataprocSubmitJobOperator, but to avoid creating an untyped job dict by hand we use DataProcJobBuilder, which AFAIK is not deprecated. If it were deprecated we wouldn't have used it and this issue wouldn't occur. We configure DataProcJobBuilder at DAG parsing time and expect all templates to be passed down to the job dict and resolved during operator execution. It worked that way until sanitization was added to the builder.
Example code:

task1 = DataprocSubmitJobOperator(
    task_id="foo",
    job=DataProcJobBuilder(task_id="{{ task.task_id }}", ...).build(),
    ...
)

@Taragolis (Contributor) commented:
I guess you found an undocumented feature, and like many undocumented features it might stop working at any moment.
DataprocSubmitJobOperator.job expects a dict, and there is no information or suggestion to use the DataProcJobBuilder.build method for this purpose:

:param job: Required. The job resource.
If a dict is provided, it must be of the same form as the protobuf message
:class:`~google.cloud.dataproc_v1.types.Job`
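
A minimal example of that documented usage (field names follow google.cloud.dataproc_v1.types.Job; the project, cluster and bucket values are placeholders):

from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

job = {
    "reference": {"project_id": "my-project", "job_id": "my_job_{{ ds_nodash }}"},
    "placement": {"cluster_name": "my-cluster"},
    "pyspark_job": {"main_python_file_uri": "gs://my-bucket/main.py"},
}

submit = DataprocSubmitJobOperator(
    task_id="submit_job",
    job=job,  # plain dict; "job" is a templated field, so macros render at run time
    region="us-central1",
    project_id="my-project",
)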

@potiuk (Member) commented Mar 12, 2023:

I think using DataProcJobBuilder instead of a "raw" dictionary is a really nice feature to have; I believe it was the original intention of the builder, and since it is used like that internally by the "specific" operators, there should generally be no problem with using it directly. So I would see this as a nice new "feature" to add. Yes, it worked (to some extent) before sanitization, but that was more of an accident than intention, and sanitization indeed broke it.

Why don't we turn it into a "real" feature:

  • document that it can be done
  • implement a way to sanitize the job name in a different place

I believe (correct me if I am wrong) we can sanitize the job name here:

    def execute(self, context: Context):
        self.log.info("Submitting job")
        self.hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
        # We could sanitize the job name here
        job_object = self.hook.submit_job(

Is there any reason we cannot move the sanitization inside the execute method? Would that break anything other than delaying sanitization to the moment of execution? This is usually what we do with potentially templateable fields: the benefits of their being templated far outweigh "early" sanitization during parsing, and we have generally pushed such code into the execute() method in multiple places.
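
A standalone sketch of that idea (the helper name and regex are hypothetical, not provider code): by the time execute() runs, templated fields have been rendered, so the sanitizer sees the final value rather than the raw Jinja expression.

import re

def sanitize_job_name(rendered_name: str) -> str:
    # Dataproc job IDs may only contain letters, digits, underscores and
    # hyphens; replace anything else (notably dots from task-group prefixes
    # or rendered templates) with underscores.
    return re.sub(r"[^a-zA-Z0-9_-]", "_", rendered_name)

print(sanitize_job_name("group.task"))      # -> group_task
print(sanitize_job_name("my-dag-my-task"))  # -> my-dag-my-task (unchanged)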

@potiuk (Member) commented Mar 12, 2023:

This approach would also have the advantage that even job names passed by hand would be sanitized and "." removed.

@Taragolis (Contributor) commented:

I think this is really nice feature to have to use the DataProcJobBuillder instead of "raw" dictionary and it was

Nah, it wouldn't work with the current implementation. Moreover, if a DataProcJobBuilder object is passed directly to an operator, rather than built in the execute method (as it is implemented in the deprecated operators), we cannot easily resolve the common Airflow question when sanitizing: "Is this a template, a value, or an XComArg, and what should we do if it is not rendered yet?"
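
The parse-time ambiguity is easy to demonstrate (illustrative snippet): sanitizing before rendering corrupts the template itself.

# At DAG parse time the builder cannot tell whether it holds a final value
# or an unrendered template; replacing characters breaks the Jinja
# expression before it can ever be rendered.
name = "{{ task.task_id }}"
print(name.replace(".", "_"))  # -> {{ task_task_id }} -- the original expression is destroyed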

So I would suggest removing everything related to the deprecated stuff in the next major version, rather than trying to fix/adapt it to the current operators. WDYT?

github-actions (bot) commented:
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

github-actions (bot) added the stale label (Stale PRs per the .github/workflows/stale.yml policy file) on Apr 30, 2023
github-actions (bot) closed this on May 6, 2023
Labels
area:providers, provider:google (Google, including GCP, related issues), stale (Stale PRs per the .github/workflows/stale.yml policy file)

Development
Successfully merging this pull request may close this issue: Broken macro support in DataProcJobBuilder task_id parameter

4 participants