Make postgresql default engine args comply with SA 2.0 #38362
Conversation
I do not think we ever use row count after executemany. I am not 100% sure (maybe others who know SQLAlchemy better can confirm) - but I believe the only place where SQLAlchemy runs executemany for updates would be in our "bulk_" methods. The only place I could find where a bulk method would return anything is (in dag.py):

```python
return cls.bulk_write_to_db(dags=dags, session=session)
```

But after closer inspection it always returns None - because that's what `bulk_write_to_db` returns. I also looked at the places where we use `rowcount` after an update:

```python
deactivated_dagmodel = session.execute(
    update(DagModel)
    .where(DagModel.dag_id.in_(to_deactivate))
    .values(is_active=False)
    .execution_options(synchronize_session="fetch")
)
deactivated = deactivated_dagmodel.rowcount
```

```python
num_failed = session.execute(
    update(Job)
    .where(
        Job.job_type == "SchedulerJob",
        Job.state == JobState.RUNNING,
        Job.latest_heartbeat < (timezone.utcnow() - timedelta(seconds=timeout)),
    )
    .values(state=JobState.FAILED)
).rowcount
```

and:

```python
update(TI)
.where(
    TI.state == TaskInstanceState.DEFERRED,
    TI.trigger_timeout < timezone.utcnow(),
)
.values(
    state=TaskInstanceState.SCHEDULED,
    next_method="__fail__",
    next_kwargs={"error": "Trigger/execution timeout"},
    trigger_id=None,
)
```

```python
count += session.execute(
    update(TI)
    .where(
        TI.dag_id == self.dag_id,
        TI.run_id == self.run_id,
        tuple_in_condition((TI.task_id, TI.map_index), schedulable_ti_ids_chunk),
    )
    .values(state=TaskInstanceState.SCHEDULED)
    .execution_options(synchronize_session=False)
).rowcount
```

and:

```python
session.execute(
    update(TI)
    .where(
        TI.dag_id == self.dag_id,
        TI.run_id == self.run_id,
        tuple_in_condition((TI.task_id, TI.map_index), dummy_ti_ids_chunk),
    )
    .values(
        state=TaskInstanceState.SUCCESS,
        start_date=timezone.utcnow(),
        end_date=timezone.utcnow(),
        duration=0,
    )
    .execution_options(
        synchronize_session=False,
    )
).rowcount
```

Here is the doc.

It would be good if others (@ashb @dstandish @kaxil @uranusjr) could take a look as well and see if they can confirm that we should not be affected.
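To illustrate the distinction this relies on, here is a minimal sketch using stdlib `sqlite3` instead of psycopg2/PostgreSQL (hypothetical table and values, not Airflow's actual schema): all the snippets above read `rowcount` after a *single* multi-row UPDATE statement, which is the well-defined case; psycopg2's executemany fast-path helpers are where rowcount accuracy is documented to vary.

```python
import sqlite3

# Single multi-row UPDATE, mirroring the "deactivate DAGs" pattern above.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag (dag_id TEXT, is_active INTEGER)")
conn.executemany("INSERT INTO dag VALUES (?, 1)", [("a",), ("b",), ("c",)])

to_deactivate = ("a", "b")
cur = conn.execute(
    "UPDATE dag SET is_active = 0 WHERE dag_id IN (?, ?)", to_deactivate
)
# rowcount is the affected-row total for the single statement.
deactivated = cur.rowcount
print(deactivated)  # 2
```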
LGTM. But I think we need more confirmation that it should be safe to change the default
Note this is not a change in the default, so behavior should remain the same in 1.4. Here is a snippet for local validation:

```python
from sqlalchemy import create_engine
from importlib.metadata import version

SQL_ALCHEMY_CONN = "postgresql+psycopg2://postgres:[email protected]:25433/airflow"

args = {
    "current": {
        "executemany_mode": "values",
        "executemany_values_page_size": 10000,
        "executemany_batch_page_size": 2000,
    },
    "proposed": {
        "executemany_mode": "values_plus_batch",
        "executemany_values_page_size": 10000,
        "executemany_batch_page_size": 2000,
    },
    "not-set": {},
}

print(f"SQLAlchemy version: {version('sqlalchemy')}")
for name, pg_default_args in args.items():
    print(f" {name} ".center(72, "="))
    try:
        engine = create_engine(SQL_ALCHEMY_CONN, **pg_default_args, future=True)
    except Exception as e:
        print(f"Input: {pg_default_args.get('executemany_mode')!r}, error: {e}")
    else:
        print(f"Input: {pg_default_args.get('executemany_mode')!r}, output: {engine.dialect.executemany_mode}")
        engine.dispose()
```

Outputs on different versions
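As a quick sanity check on the validation snippet above (a plain-dict sketch, no database needed): the "current" and "proposed" arg sets differ only in the mode, not in either page size.

```python
# The two arg sets from the validation snippet above.
current = {
    "executemany_mode": "values",
    "executemany_values_page_size": 10000,
    "executemany_batch_page_size": 2000,
}
proposed = {
    "executemany_mode": "values_plus_batch",
    "executemany_values_page_size": 10000,
    "executemany_batch_page_size": 2000,
}
changed = {k for k in current if current[k] != proposed[k]}
print(changed)  # {'executemany_mode'}
```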
Yeah, it just postpones the error to another value - in SA 2.0 it should be
I am convinced then :)
And a modified snippet - regular tests still can't run with SA20 😢

```python
from sqlalchemy import create_engine
from importlib.metadata import version

from airflow.settings import DEFAULT_ENGINE_ARGS

SQL_ALCHEMY_CONN = "postgresql+psycopg2://postgres:[email protected]:25433/airflow"

args = {
    "current": {
        "executemany_mode": "values",
        "executemany_values_page_size": 10000,
        "executemany_batch_page_size": 2000,
    },
    "from-airflow-settings": DEFAULT_ENGINE_ARGS.get("postgresql"),
    "not-set": {},
}

print(f"SQLAlchemy version: {version('sqlalchemy')}")
for name, pg_default_args in args.items():
    print(f" {name} ".center(72, "="))
    print(f"Input: {pg_default_args}")
    try:
        engine = create_engine(SQL_ALCHEMY_CONN, **pg_default_args)
    except Exception as e:
        print(f"Error: {e}")
    else:
        print(f"executemany_mode: {getattr(engine.dialect, 'executemany_mode', 'NOT-EXISTS')}")
        print(f"executemany_batch_page_size: {getattr(engine.dialect, 'executemany_batch_page_size', 'NOT-EXISTS')}")
        print(f"insertmanyvalues_page_size: {getattr(engine.dialect, 'insertmanyvalues_page_size', 'NOT-EXISTS')}")
        print(f"executemany_values_page_size: {getattr(engine.dialect, 'executemany_values_page_size', 'NOT-EXISTS')}")
        engine.dispose()
```

```
SQLAlchemy version: 2.0.28
=============================== current ================================
Input: {'executemany_mode': 'values', 'executemany_values_page_size': 10000, 'executemany_batch_page_size': 2000}
Error: Invalid value for 'executemany_mode': 'values'
======================== from-airflow-settings =========================
Input: {'executemany_mode': 'values_plus_batch', 'insertmanyvalues_page_size': 10000, 'executemany_batch_page_size': 2000}
executemany_mode: symbol('EXECUTEMANY_VALUES_PLUS_BATCH')
executemany_batch_page_size: 2000
insertmanyvalues_page_size: 10000
executemany_values_page_size: NOT-EXISTS
=============================== not-set ================================
Input: {}
executemany_mode: symbol('EXECUTEMANY_VALUES')
executemany_batch_page_size: 100
insertmanyvalues_page_size: 1000
executemany_values_page_size: NOT-EXISTS
```

```
SQLAlchemy version: 1.4.52
=============================== current ================================
Input: {'executemany_mode': 'values', 'executemany_values_page_size': 10000, 'executemany_batch_page_size': 2000}
executemany_mode: symbol('executemany_values_plus_batch')
executemany_batch_page_size: 2000
insertmanyvalues_page_size: NOT-EXISTS
executemany_values_page_size: 10000
======================== from-airflow-settings =========================
Input: {'executemany_mode': 'values_plus_batch', 'executemany_values_page_size': 10000, 'executemany_batch_page_size': 2000}
executemany_mode: symbol('executemany_values_plus_batch')
executemany_batch_page_size: 2000
insertmanyvalues_page_size: NOT-EXISTS
executemany_values_page_size: 10000
=============================== not-set ================================
Input: {}
executemany_mode: symbol('executemany_values')
executemany_batch_page_size: 100
insertmanyvalues_page_size: NOT-EXISTS
executemany_values_page_size: 1000
```
* Change Postgres default `executemany_mode` to `'values_plus_batch'`
* `executemany_values_page_size` -> `insertmanyvalues_page_size` for SA20

`'values'` is the old mode from SQLAlchemy <= 1.3:
SA 1.3: https://docs.sqlalchemy.org/en/13/dialects/postgresql.html#psycopg2-fast-execution-helpers
SA 1.4: https://docs.sqlalchemy.org/en/14/dialects/postgresql.html#psycopg2-fast-execution-helpers
SA 2.0: https://docs.sqlalchemy.org/en/20/dialects/postgresql.html#psycopg2-fast-execution-helpers
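The rename can be sketched as follows (a hypothetical helper, not the PR's actual code): pick the page-size keyword based on the installed SQLAlchemy major version, since `executemany_values_page_size` from 1.x became `insertmanyvalues_page_size` in 2.0, while `'values_plus_batch'` is accepted by both.

```python
def pg_engine_args(sqlalchemy_version: str) -> dict:
    """Hypothetical helper: Postgres engine kwargs valid for the given
    SQLAlchemy version. 'values_plus_batch' works on both 1.4 and 2.0,
    but the values page-size keyword was renamed in 2.0."""
    major = int(sqlalchemy_version.split(".")[0])
    args = {
        "executemany_mode": "values_plus_batch",
        "executemany_batch_page_size": 2000,
    }
    if major >= 2:
        args["insertmanyvalues_page_size"] = 10000
    else:
        args["executemany_values_page_size"] = 10000
    return args

print(pg_engine_args("2.0.28"))
print(pg_engine_args("1.4.52"))
```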
It still works with 1.4; however, it presumably fails in 2.0.
As far as I understand, `values_plus_batch` is the same as `values` in 1.4: sqlalchemy/sqlalchemy#5401

This one is a continuation of: #38066

closes: #38064 ?
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards-incompatible changes, please leave a note in a newsfragment file, named `{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in newsfragments.