[AIRFLOW-1467] Dynamic pooling via allowing tasks to use more than one pool slot (depending upon the need) #7160
Conversation
Codecov Report
```
@@            Coverage Diff            @@
##           master    #7160    +/-   ##
========================================
- Coverage   85.41%   84.42%      -1%
========================================
  Files         753      753
  Lines       39685    39693       +8
========================================
- Hits        33898    33509     -389
- Misses       5787     6184     +397
```
Continue to review the full report at Codecov.
Hello @dimberman, could you please review this PR? This is the same PR as the earlier #6975. In this PR, I have updated the migration head as suggested by potiuk. Thank you.
The other thing you will need to do is add support for this column in the Serialized DAG format.
We should add this as an (optional) field in airflow/serialization/schema.json,
and most things should be handled already. The important thing to test is that the existing "ground truth" DAG in tests/serialization/test_dag_serialization.py
has this field set correctly when it is deserialized without having to update the JSON blob -- that ensures that this will behave itself when upgrading. Please add tests covering that.
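The upgrade-safety property being asked for can be sketched in plain Python (no Airflow imports; `deserialize_task` and the default constant are illustrative stand-ins, not the real serialization code): an optional field absent from an old JSON blob must come back with its default on deserialization.

```python
# Illustrative sketch only -- not the actual Airflow serialization code.
DEFAULT_POOL_SLOTS = 1

def deserialize_task(task_blob: dict) -> dict:
    # Optional fields fall back to their defaults when missing, so blobs
    # written before the field existed still round-trip correctly.
    return {
        "pool": task_blob.get("pool", "default_pool"),
        "pool_slots": task_blob.get("pool_slots", DEFAULT_POOL_SLOTS),
    }

old_blob = {"pool": "etl_pool"}   # serialized before the upgrade, no pool_slots key
task = deserialize_task(old_blob)
print(task["pool_slots"])          # 1
```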
airflow/models/taskinstance.py
Outdated
```
@@ -194,6 +195,11 @@ def __init__(self, task, execution_date, state=None):
        self.queue = task.queue
        self.pool = task.pool
        if hasattr(task, 'pool_capacity'):
```
This hasattr check shouldn't be needed -- you've added it to BaseOperator.
airflow/models/taskinstance.py
Outdated
```
@@ -194,6 +195,11 @@ def __init__(self, task, execution_date, state=None):
        self.queue = task.queue
        self.pool = task.pool
        if hasattr(task, 'pool_capacity'):
            self.pool_capacity = task.pool_capacity
            if task.pool_capacity < 1:
```
This check should be done when creating/setting it on the Task/operator, not here.
Moved the check to BaseOperator.
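The reviewer's suggestion can be sketched as follows (stripped-down, illustrative class names, not the actual Airflow implementation): validate the value once, when the operator is constructed, so TaskInstance can copy the attribute unconditionally with no hasattr() or range checks.

```python
# Illustrative sketch -- not the real Airflow classes.
class BaseOperator:
    def __init__(self, pool="default_pool", pool_slots=1):
        if pool_slots < 1:
            # Reject bad values at construction time, once.
            raise ValueError("pool_slots must be >= 1, got %r" % pool_slots)
        self.pool = pool
        self.pool_slots = pool_slots

class TaskInstance:
    def __init__(self, task):
        self.pool = task.pool
        # Always present and already validated -- no hasattr() needed here.
        self.pool_slots = task.pool_slots

ti = TaskInstance(BaseOperator(pool_slots=3))
print(ti.pool_slots)  # 3
```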
```
-        if open_slots <= 0:
+        if open_slots <= (ti.pool_capacity - 1):
             yield self._failing_status(
                 reason=("Not scheduling since there are %s open slots in pool %s",
```
Extend this message to say how many slots we are looking for.
Modified the message to:
```
reason=("Not scheduling since there are %s open slots in pool %s "
        "and require %s pool slots",
        open_slots, pool_name, ti.pool_slots)
```
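The resulting dependency check amounts to the following standalone sketch (`pool_has_room` is a hypothetical helper written for illustration, not an Airflow function; the real code yields a failing-status object rather than returning a string):

```python
# Simplified sketch of the scheduling dependency: a task needing N slots
# is only runnable when the pool has at least N open slots.
def pool_has_room(open_slots: int, required_slots: int, pool_name: str):
    if open_slots < required_slots:   # equivalent to open_slots <= required_slots - 1
        return ("Not scheduling since there are %s open slots in pool %s "
                "and require %s pool slots"
                % (open_slots, pool_name, required_slots))
    return None  # dependency met

print(pool_has_room(2, 3, "etl_pool"))  # failing reason, names how many slots are needed
print(pool_has_room(3, 3, "etl_pool"))  # None -> schedulable
```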
```
         self.assertFalse(PoolSlotsAvailableDep().is_met(ti=ti))

     @patch('airflow.models.Pool.open_slots', return_value=1)
     # pylint: disable=unused-argument
     def test_pooled_task_pass(self, mock_open_slots):
-        ti = Mock(pool='test_pool')
+        ti = Mock(pool='test_pool', pool_capacity=1)
```
Isn't 1 the default, meaning most of these changes in tests aren't needed?
Removed pool_capacity from most of the places, except a few where it is absolutely required.
airflow/models/baseoperator.py
Outdated
```
@@ -178,6 +178,9 @@ class derived from this one results in the creation of a task object,
     :param pool: the slot pool this task should run in, slot pools are a
         way to limit concurrency for certain tasks
     :type pool: str
+    :param pool_capacity: the number of pool slots this task should use (>= 1)
```
Capacity is the overall size of the pool, and in the logs etc. we talk about slots ("Not scheduling since there are %s open slots in pool %s"), so do you think this would be better named pool_slots?
WDYT @tooptoop4?
Yes, pool_slots sounds more reasonable. Renamed pool_capacity to pool_slots.
(sorry to add more work for you after the first PR was merged!)
I have added an automated test covering this case, so that in the future it will be apparent that you should do it: #7162
Thank you everyone for the direction. I am looking at them one by one.
I have added pool_slots in schema.json. I will wait for your inputs on #7162. Just to clarify: the test should create a DAG, deserialize it, and check it against the "ground truth" dict, and pool_slots needs to be added in the ground-truth dict as well.
Thanks for your patience @lokeshlal -> I think we would like to wait for @dimberman or @kaxil, who know the serialisation part best, and agree with them on how to handle the scenario of a field added to BaseOperator. It has happened for the first time since we introduced serialisation, so we do not have a fully hashed-out scenario! Kaxil is now travelling to India and has some holidays, so it might take some time (a few days maybe) until we synchronise. If you can bear with us a little more, that will be great! Thanks for the contribution, BTW. It looks great!
Thanks for waiting @lokeshlal. I am currently on leave but will be reviewing it very soon :)
So just update the JSON, @lokeshlal. Here is the current message that you will get when we merge #7162. Please follow it and let us know if it's a clear message.
The #7162 test is merged now - so please rebase, observe the test failing ;) and fix it then @lokeshlal
closes apache#13799

Without it, the migration from 1.10.14 to 2.0.0 can fail with the following error for old TIs:

```
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/airflow/jobs/scheduler_job.py", line 1275, in _execute
    self._run_scheduler_loop()
  File "/usr/local/lib/python3.6/dist-packages/airflow/jobs/scheduler_job.py", line 1377, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/usr/local/lib/python3.6/dist-packages/airflow/jobs/scheduler_job.py", line 1533, in _do_scheduling
    num_queued_tis = self._critical_section_execute_task_instances(session=session)
  File "/usr/local/lib/python3.6/dist-packages/airflow/jobs/scheduler_job.py", line 1132, in _critical_section_execute_task_instances
    queued_tis = self._executable_task_instances_to_queued(max_tis, session=session)
  File "/usr/local/lib/python3.6/dist-packages/airflow/utils/session.py", line 62, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/airflow/jobs/scheduler_job.py", line 1034, in _executable_task_instances_to_queued
    if task_instance.pool_slots > open_slots:
TypeError: '>' not supported between instances of 'NoneType' and 'int'
```

The workaround was to run manually:

```
UPDATE task_instance SET pool_slots = 1 WHERE pool_slots IS NULL;
```

This commit adds a DB migration to change the value to 1 for records with a NULL value, and makes the column NOT NULL.

This bug was caused by apache#7160
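The backfill this commit describes can be demonstrated with a self-contained sketch (stdlib sqlite3 and a toy two-column schema for illustration; the real fix is an Alembic migration, not this script):

```python
import sqlite3

# Toy stand-in for the task_instance table: one pre-upgrade row with a
# NULL pool_slots, one post-upgrade row with an explicit value.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (task_id TEXT, pool_slots INTEGER)")
conn.executemany("INSERT INTO task_instance VALUES (?, ?)",
                 [("old_ti", None), ("new_ti", 2)])

# The backfill from the commit message: NULL -> 1, so that the column
# can then safely be made NOT NULL and comparisons like
# `pool_slots > open_slots` never see None.
conn.execute("UPDATE task_instance SET pool_slots = 1 WHERE pool_slots IS NULL")

print(sorted(conn.execute("SELECT task_id, pool_slots FROM task_instance")))
# [('new_ti', 2), ('old_ti', 1)]
```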
This PR contains changes to Pool and TaskInstance that allow a task to use more than one pool slot.
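Conceptually, the change moves pool accounting from "one task, one slot" to per-task weights. A minimal standalone sketch (class and method names are hypothetical, not Airflow's API):

```python
# Hypothetical sketch of weighted pool accounting -- not Airflow's Pool class.
class Pool:
    def __init__(self, total_slots: int):
        self.total_slots = total_slots
        self.used_slots = 0

    def try_acquire(self, slots: int = 1) -> bool:
        """Reserve `slots` slots if the pool has room, else refuse."""
        if self.total_slots - self.used_slots < slots:
            return False
        self.used_slots += slots
        return True

pool = Pool(total_slots=4)
print(pool.try_acquire(slots=3))  # heavy task takes 3 of 4 slots -> True
print(pool.try_acquire(slots=2))  # only 1 slot left -> False
print(pool.try_acquire(slots=1))  # light task still fits -> True
```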
Use [AIRFLOW-XXXX] for document-only changes.
In case of a fundamental code change, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards-incompatible changes, please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.