Add optional result handler to database hooks #15581
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything please check our Contribution Guide (https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst)
The following is an example of an alternative implementation that works with the existing codebase, specifically addressing the requirement of calling a stored procedure that has one or more out parameters. Simply use a dummy parameter with the right Python type for each out parameter (e.g. the integer 0):

    from typing import Iterable, Mapping, Optional, Union

    from airflow.models import BaseOperator
    from airflow.providers.oracle.hooks.oracle import OracleHook
    from airflow.utils.decorators import apply_defaults


    class OracleCallOperator(BaseOperator):
        ui_color = '#ededed'

        @apply_defaults
        def __init__(
            self,
            *,
            function: str,
            oracle_conn_id: str = 'oracle_default',
            parameters: Optional[Union[Mapping, Iterable]] = None,
            autocommit: bool = False,
            **kwargs,
        ) -> None:
            super().__init__(**kwargs)
            self.oracle_conn_id = oracle_conn_id
            self.function = function
            self.autocommit = autocommit
            self.parameters = parameters

        def execute(self, context) -> Optional[Union[list, dict]]:
            self.log.info('Executing: %s', self.function)
            hook = OracleHook(oracle_conn_id=self.oracle_conn_id)
            # Treat "no parameters" as an empty sequence so len() below is safe.
            parameters = self.parameters if self.parameters is not None else []
            # One bind placeholder per parameter: ":name" for a dict,
            # ":1", ":2", ... for a positional sequence.
            args = ", ".join(
                f":{name}" for name in (
                    parameters if isinstance(parameters, dict)
                    else range(1, len(parameters) + 1)
                )
            )
            sql = f"BEGIN {self.function}({args}); END;"
            with hook.get_conn() as conn, conn.cursor() as cursor:
                cursor.execute(sql, parameters)
                # After execution the bind variables hold the out values.
                if isinstance(cursor.bindvars, list):
                    return [v.getvalue() for v in cursor.bindvars]
                if isinstance(cursor.bindvars, dict):
                    return {n: v.getvalue() for (n, v) in cursor.bindvars.items()}
            return None
Just making sure we won't push this breaking change to Oracle Provider.
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.
The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.
The Workflow run is cancelling this PR. Building images for the PR has failed. Follow the workflow link to check the reason.
@potiuk I decided to remove the more opinionated part of this PR – I think it's better to start with a more basic approach and then see if it's useful to add more functionality to the Operator-level later on.
Hey @malthe - isn't that change now something different :) ? From what I see - what's left is just adding handlers to the DB hooks. Which I think is very good and we should merge it before 2.1 so that the next wave of providers, which will be 2.1+ only, can make use of it. Would you mind changing the commit title/message? We have a major outage of GitHub Actions now, I hope it will be solved soon and finally we will be able to merge it: https://www.githubstatus.com/incidents/zbpwygxwb3gw
Some unit tests covering passing a handler in would have been nice, but given that path is simple we can merge this anyway to get it in 2.1
@malthe Could you follow up this PR with a unit test to cover passing in a handler please?
@ashb there is a rather rudimentary unit test added
Oh no that's good. I missed it, sorry!
@malthe - there are some static checks failing about trailing whitespace - can you please fix? I hope it's going to be the last one!
@malthe - plus the unit tests are failing with int comparison on Mocks :) https://github.com/apache/airflow/pull/15581/checks?check_run_id=2599840179#step:6:12848
But the CI seems to finally be alive & kicking. Seems like GitHub had some big troubles with Actions that culminated in a weekend outage and seems to be fixed now.
And @malthe -> not sure what timezone you are in, but we would love to release 2.1 rc1 tonight and get that change in so that when we release next wave of providers with >= 2.1 we could already use this feature.
Copenhagen :D. So there is a chance ;).
This allows retrieving result rows, or other post-query access to the cursor object.
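To make the idea concrete, here is a minimal sketch of an optional result handler, written against the standard-library `sqlite3` module rather than Airflow. The function name `run_with_handler` and its signature are assumptions made for illustration; this is not the hook's actual implementation.

```python
import sqlite3
from contextlib import closing


def run_with_handler(conn, sql, parameters=None, handler=None):
    """Execute one statement; if a handler is given, return handler(cursor).

    Hypothetical helper for illustration only (not Airflow's hook code).
    """
    with closing(conn.cursor()) as cursor:
        if parameters is None:
            cursor.execute(sql)
        else:
            cursor.execute(sql, parameters)
        # The handler is called while the cursor is still open,
        # so it can fetch rows or inspect other cursor state.
        result = handler(cursor) if handler is not None else None
    conn.commit()
    return result


conn = sqlite3.connect(":memory:")
run_with_handler(conn, "CREATE TABLE greeting (hello TEXT, world TEXT)")
run_with_handler(conn, "INSERT INTO greeting VALUES (?, ?)", ("Hello", "World"))
rows = run_with_handler(
    conn,
    "SELECT hello, world FROM greeting",
    handler=lambda cur: cur.fetchall(),
)
```

Without a handler the helper returns `None`, so callers that do not need the result are unaffected.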
I just pushed a fix to your PR -- fixed the style, moved test from oracle to base db hook, and tested scalar and list types. Oh and added Changelog as we've already merged it (not normally something you have to worry about) :)
    mock_run.assert_called_once_with(sql, autocommit=autocommit, parameters=parameters)
    assert len(mock_run.mock_calls) == 1
    assert mock_run.mock_calls[0].args[1] == sql
Oh, .args doesn't exist on Py3.6
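For context, the `.args` and `.kwargs` properties on `unittest.mock` call objects were added in Python 3.8. Call objects are also plain tuples, so one way to write the check that works on older versions is to unpack them. The sketch below uses a bare `Mock` and made-up argument values; it is not the test that was committed.

```python
from unittest import mock

mock_run = mock.Mock()
mock_run("SELECT 1", autocommit=True, parameters=None)

# Each entry in mock_calls is a (name, args, kwargs) tuple, so it can be
# unpacked without relying on the .args/.kwargs properties.
name, args, kwargs = mock_run.mock_calls[0]
sql_arg = args[0]

# call_args is an (args, kwargs) pair for the most recent call.
last_args, last_kwargs = mock_run.call_args
```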
The Workflow run is cancelling this PR. Building images for the PR has failed. Follow the workflow link to check the reason.
@ashb wait so it seemed like you already fixed most or all of the remaining issues?
Two random failures. Merging.
Cool! Thanks @ashb !
I got this working, it looked like this:

    def handler(cur):
        hello, world = cur.fetchone()
        message = f"{hello} {world}!"
        return message

    @task
    def mytask():
        hook = SqliteHook(sqlite_conn_id="my_conn_id")
        message = hook.run(my_sql, handler=handler)
        print(message)

As it stands, I don't think this works:

    SqliteOperator(
        task_id="some_id",
        sql=my_sql,
        sqlite_conn_id="sqlite_conn_id",
        handler=handler  # <--- no such kwarg
    )

Do we want to add the handler kwarg to the operators too? Not that the return value would be helpful there, but users could call
@MatrixManAtYrService Yeah, as it stands now this hasn't been exposed to any of the Operators -- it was mostly designed as an extra tool in the db hook for writing operator features.
If it's written as part of the operator, then you can make a closure to get the context vars you need.
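A rough sketch of the closure idea, using `sqlite3` in place of a hook so it runs on its own. The `make_handler` name and the plain dict standing in for the task context are assumptions for illustration; inside an operator the closure would be built in `execute(self, context)` and passed to the hook's `run` as `handler`.

```python
import sqlite3


def make_handler(context):
    """Build a handler that can read context values through a closure."""

    def handler(cursor):
        row = cursor.fetchone()
        # `context` is captured from the enclosing scope; the handler
        # itself only ever receives the cursor.
        return f"{context['ds']}: {row[0]}"

    return handler


# A plain dict stands in for the Airflow task context here (assumption).
fake_context = {"ds": "2021-05-17"}

conn = sqlite3.connect(":memory:")
cursor = conn.execute("SELECT 'ok'")
message = make_handler(fake_context)(cursor)
```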
@MatrixManAtYrService with the new Taskflow API, coding up a custom task using the hooks directly seems like a simpler approach than trying to add a closure to the operator. I had actually started out with an implementation that made the operator smarter about handling a query result but realized halfway that it's not really that helpful.
This is useful when executing a statement where out variables are required – for example a stored procedure.
Where an out parameter is required, pass a Python value of the required type (for example 0 for an integer, or "dummy" for a string).
Note that this scheme is supported by the Oracle driver itself. In the future, a more elaborate scheme can be implemented which could allow the use of https://cx-oracle.readthedocs.io/en/latest/api_manual/cursor.html#Cursor.var.
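As a sketch of how a handler could collect the out values after such a call: it reads the cursor's `bindvars` and calls `getvalue()` on each, the same attributes used in the operator example earlier in this conversation. `fetch_bind_values`, `FakeVar` and `FakeCursor` are hypothetical names; the two fake classes only exist so the sketch runs without an Oracle database.

```python
def fetch_bind_values(cursor):
    """Return the current value of every bind variable on the cursor."""
    bindvars = cursor.bindvars
    if isinstance(bindvars, dict):
        # Named binds: keep the names.
        return {name: var.getvalue() for name, var in bindvars.items()}
    # Positional binds: keep the order.
    return [var.getvalue() for var in bindvars]


class FakeVar:
    """Stand-in for a driver bind variable (assumption, not cx_Oracle)."""

    def __init__(self, value):
        self._value = value

    def getvalue(self):
        return self._value


class FakeCursor:
    """Stand-in cursor exposing only the `bindvars` attribute."""

    def __init__(self, bindvars):
        self.bindvars = bindvars


positional = fetch_bind_values(FakeCursor([FakeVar(3), FakeVar("done")]))
named = fetch_bind_values(FakeCursor({"total": FakeVar(3)}))
```

With a real hook, the function would presumably be passed as `handler=fetch_bind_values` together with the dummy-typed parameters; that combination has not been verified here against an Oracle database.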