[AIRFLOW-5945] Make inbuilt OperatorLinks work when using Serialization #6715
Conversation
What do you think about saving the generated links in a separate column in the database after task execution? I think it would be nicer because we would not be storing additional partial data, but the final data that interests us. This could be realized independently of DAG serialization. In addition, it would simplify the logic of handling these links. Currently it is not possible to fetch all links for a given task at once; we fetch one link per HTTP request. https://github.com/apache/airflow/blob/master/airflow/www/templates/airflow/dag.html#L492-L518
This is actually the first solution I mentioned to Ash, and I thought it was the way to go :) Unfortunately Ash mentioned a use-case that ruled it out, which was, I think: "an S3 signed URL can have an expiration of 15 minutes and must change after that". @ashb can probably explain that in more detail, but due to that use-case, storing the links in the DB wouldn't work.
Codecov Report
@@ Coverage Diff @@
## master #6715 +/- ##
==========================================
- Coverage 84.83% 84.54% -0.29%
==========================================
Files 669 669
Lines 37738 37834 +96
==========================================
- Hits 32014 31988 -26
- Misses 5724 5846 +122
Continue to review full report at Codecov.
Yeah, we did think about that as a solution (which means we should probably add a comment/"architecture decision record" somewhere), but the issue is that I know of a link I want to add to EMR that isn't fully static: linking from EmrAddStepOperator to the job logs in S3, which involves signing a URL that will only be valid for 15 minutes. (This uses the Airflow server's AWS credentials, rather than needing the user to have the right permissions, which doesn't really work easily with S3.) Because of this, only allowing static links in the DB would not work for this use case.
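To illustrate why a link like this can't be precomputed and stored in the database, here is a minimal, hypothetical sketch of an expiring signed URL using a plain HMAC scheme (simplified; not AWS's actual SigV4 presigning, and the host, path, and key below are made up). The signature embeds an expiry timestamp, so the link must be regenerated on each request:

```python
import hashlib
import hmac
import time
from urllib.parse import urlencode

# Hypothetical signing key; in the real use-case this role is played by
# the Airflow server's AWS credentials.
SECRET = b"hypothetical-signing-key"

def sign_log_url(path, ttl_seconds=15 * 60, now=None):
    """Build an expiring signed link, in the spirit of an S3 presigned URL.

    Simplified HMAC scheme for illustration only, not AWS's real algorithm.
    """
    expires = int(now if now is not None else time.time()) + ttl_seconds
    # Sign the path together with the expiry so neither can be tampered with.
    sig = hmac.new(SECRET, f"{path}:{expires}".encode(), hashlib.sha256).hexdigest()
    query = urlencode({"expires": expires, "sig": sig})
    return f"https://logs.example.com{path}?{query}"

# The URL changes every time it is generated, so a value stored at task
# execution time would go stale after the TTL.
url = sign_log_url("/emr/steps/s-123/stderr.gz")
print(url)
```

Because the output depends on the current time, the only stable thing to persist is the recipe for building the link (the serialized link class plus its data), which is exactly what this PR does.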
# If OperatorLinks is defined in Plugins but not in the Operator
# set the Operator links attribute
if op_extra_links_from_plugin and "_operator_links_sources" not in encoded_op:
    setattr(op, "operator_extra_links", list(op_extra_links_from_plugin.values()))
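To make the precedence easier to follow, here is a plain-dict sketch of the decision the code above makes (function and argument names are mine, not from the PR): plugin-registered links are used only as a fallback when the serialized operator carries no link sources of its own.

```python
def resolve_operator_links(encoded_op, op_extra_links_from_plugin):
    """Simplified sketch of the fallback logic, not the actual PR code.

    encoded_op: the serialized operator dict.
    op_extra_links_from_plugin: mapping of link class path -> link instance,
    collected from registered plugins.
    """
    if op_extra_links_from_plugin and "_operator_links_sources" not in encoded_op:
        # No links were serialized with the operator: fall back to the
        # links registered via plugins.
        return list(op_extra_links_from_plugin.values())
    # Otherwise the serialized sources win.
    return encoded_op.get("_operator_links_sources", [])

# Plugin links apply only when the operator serialized none of its own.
print(resolve_operator_links({}, {"pkg.MyLink": "plugin-link"}))
```

So defining a link both in a plugin and on the operator does not duplicate it: the serialized operator's own sources take precedence.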
What if we define Links in a plugin and in the operator?
I'm finding this code (including the existing code) a bit hard to follow. Hmmmm.
If we define links both in a plugin and in the operator, then the following code gets executed:
https://github.com/apache/airflow/pull/6715/files#diff-f58748b66b2d3d00c8132103faea223fR114-R121
I can add more detailed comments if it is not clear.
for _operator_links_source in encoded_op_links:
    _operator_link_class, data = list(_operator_links_source.items())[0]
    try:
        single_link = import_string(_operator_link_class)
I'm not sure if we should do this here (the import string) - my first thought was that we should look through the loaded plugins only.
It is needed for the inbuilt OperatorLinks (BigQuery and Qubole) to work
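For context, import_string resolves a dotted class path (e.g. the serialized name of a BigQuery or Qubole link class) back to the class object, which is why inbuilt links need it rather than a plugin lookup. A minimal stdlib sketch of what such a helper does (not Airflow's exact implementation):

```python
from importlib import import_module

def import_string(dotted_path):
    """Resolve a dotted path like "collections.OrderedDict" to the object.

    Sketch of the idea only; Airflow ships its own version of this helper.
    """
    module_path, _, attr_name = dotted_path.rpartition(".")
    if not module_path:
        raise ImportError(f"{dotted_path} doesn't look like a module path")
    # Import the module, then pull the named attribute (class) off it.
    return getattr(import_module(module_path), attr_name)

print(import_string("collections.OrderedDict"))
```

A plugin-only lookup would miss classes that ship with Airflow itself, since those are never registered through the plugin machinery.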
Somewhere I'd like to see us do a "static" deserialization test that includes links - both ones defined in the operator, and one registered via a plugin - like we have with serialized_simple_dag_ground_truth.
(It might be time to move those to JSON files rather than having them in the test file? Don't mind either way.)
Added
op_predefined_extra_links = {}

for _operator_links_source in encoded_op_links:
    _operator_link_class, data = list(_operator_links_source.items())[0]
We should probably add this to our JSON Schema. Something like this is the part of the schema we want for op_links:
{
  "type": "array",
  "items": {
    "type": "object",
    "minProperties": 1,
    "maxProperties": 1
  }
}
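To sketch what this fragment would enforce, here is a hedged example using the third-party jsonschema package (the serialized link class path in the sample data is illustrative): each serialized link must be an object with exactly one property, mapping the link class path to its data.

```python
from jsonschema import ValidationError, validate

# The schema fragment proposed above for the op_links array.
OP_LINKS_SCHEMA = {
    "type": "array",
    "items": {"type": "object", "minProperties": 1, "maxProperties": 1},
}

# A single-key mapping of link class path -> data validates fine.
valid = [{"some.module.SomeConsoleLink": {"key": "value"}}]
validate(instance=valid, schema=OP_LINKS_SCHEMA)

# An empty object violates minProperties and is rejected.
try:
    validate(instance=[{}], schema=OP_LINKS_SCHEMA)
except ValidationError:
    print("rejected")
```

This would catch malformed serialized links at load time, before the deserializer tries to pull a class path out of each entry.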
This line took me a bit to work out what it's doing -- it modifies the dict in place, but _operator_link_class, data = _operator_links_source.popitem() works and is a bit clearer to me. What do you think?
(Don't know if that works for py2, which isn't a problem here but is when backporting this.)
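For a single-key dict the two forms yield the same pair; the difference is that popitem() is destructive while indexing into items() leaves the dict intact. A small sketch (the class path is a made-up example):

```python
# Non-destructive: the dict keeps its entry after unpacking.
d1 = {"pkg.module.SomeLink": {"param": 1}}
link_class, data = list(d1.items())[0]
assert d1  # unchanged

# Destructive: popitem() removes the (key, value) pair as it returns it.
d2 = {"pkg.module.SomeLink": {"param": 1}}
link_class2, data2 = d2.popitem()
assert not d2  # now empty

# For a one-entry dict both forms extract the same pair.
assert (link_class, data) == (link_class2, data2)
```

Since each _operator_links_source here holds exactly one entry and is not reused afterwards, the in-place mutation is harmless, and popitem() reads more directly.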
Done
Make sure you have checked all steps below.
Jira
Description
The inbuilt Operator links were not working when using DAG Serialization (one of the known limitations of DAG Serialization). The work-around previously was to add those OperatorLinks via an Airflow Plugin.
This PR fixes that, so the inbuilt links work out of the box.
Tests
Added
Commits
Documentation