
Fix KubernetesPodOperator reattachment #10230

Merged: 5 commits into apache:master on Aug 11, 2020

Conversation

dimberman
Contributor

In 1.10.11 we introduced a bug where the KubernetesPodOperator
was not properly reattaching to existing pods due to implementation errors.

This fix allows users to control reattachment via the
`reattach_on_restart` config.



Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.

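For context, a minimal usage sketch of the flag on the operator (values are illustrative; the import path matches the 1.10 contrib location used elsewhere in this thread):

from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

task = KubernetesPodOperator(
    namespace='default',
    image='ubuntu:16.04',
    cmds=['bash', '-cx'],
    arguments=['echo hello'],
    name='example-pod',
    task_id='example_task',
    # When True, a restarted worker looks for an already-running pod for this
    # task instance and reattaches to it instead of launching a new one.
    reattach_on_restart=True,
)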
@boring-cyborg boring-cyborg bot added the k8s label Aug 7, 2020
@dimberman dimberman requested review from kaxil and ashb August 7, 2020 21:30
@dimberman
Contributor Author

cc: @danccooper

@danccooper

LGTM, logic is much clearer, thank you.

One thing to consider is the comment by @dakov here: #6377 (comment)

Perhaps on line 280 where we check for 0 or 1 existing pod & raise otherwise, we should only raise if reattach_on_restart is True? As if it is False then we probably don't care & we will create another pod anyway. What do you think?
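For reference, a minimal sketch of that suggestion (a hypothetical helper with illustrative names; not the merged code):

from airflow.exceptions import AirflowException

def check_existing_pods(pod_list, label_selector, reattach_on_restart):
    # Only treat multiple matching pods as an error when we actually intend
    # to reattach; with reattach disabled a fresh pod is created anyway.
    if len(pod_list.items) > 1 and reattach_on_restart:
        raise AirflowException(
            'More than one pod running with labels: {}'.format(label_selector))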

@dimberman
Contributor Author

> LGTM, logic is much clearer, thank you.
>
> One thing to consider is the comment by @dakov here: #6377 (comment)
>
> Perhaps on line 280 where we check for 0 or 1 existing pod & raise otherwise, we should only raise if reattach_on_restart is True? As if it is False then we probably don't care & we will create another pod anyway. What do you think?

Yeah that makes sense. Tbh I'll be surprised if many people turn off reattach_on_restart as it seems like the logical step to take.

@dimberman dimberman requested a review from kaxil August 9, 2020 16:39
@danccooper

Thanks @dimberman LGTM 👍

@kaxil kaxil merged commit 8cd2be9 into apache:master Aug 11, 2020
@kaxil kaxil deleted the remove_k8s_try_number_check branch August 11, 2020 14:01
dimberman added a commit that referenced this pull request Aug 11, 2020
kaxil pushed a commit that referenced this pull request Aug 11, 2020
kaxil pushed a commit that referenced this pull request Aug 15, 2020
@FloChehab
Contributor

FloChehab commented Aug 24, 2020

Hi @dimberman,

I was doing more Airflow testing and I think this PR also addresses issue #10325 (which I was hitting on an older Airflow version). That's pretty great (we had issues in production with this the other day)!

Unfortunately, I can still hit issues with the KubernetesPodOperator (with the latest 1.10.12rc):

  • Process: start Airflow, trigger the DAG with the KubernetesPodOperator, kill everything except the pod running the task, wait for the task to complete (status Completed on the Kubernetes API).
  • When the scheduler is restarted, the task seems to be stuck in "up_for_retry"; if I restart the scheduler again, it is marked as success ([scheduler] [2020-08-24 17:36:16,190] {base_executor.py:157} DEBUG - Changing state: ('bug_kuberntes_pod_operator', 'task', datetime.datetime(2020, 8, 24, 17, 27, 49, 493579, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 2)). Weird; this seems to be 100% reproducible (I've tried it three times).
  • I've also experienced a situation (still with the latest 1.10.12rc) where the task was marked "completed" on the Kubernetes side and "running" on the Airflow side (for at least 30+ minutes; afterwards it was marked as failed), without scheduler restarts if I remember correctly (that's how I found the issue above in the first place). I haven't reproduced it yet. Could there be weird edge cases where this can happen? (I am working with the latest chart + Celery executor + KEDA.)

@dimberman
Contributor Author

Hi @FloChehab,

Can you please post the scheduler logs for the scheduler where the task is stuck in up_for_retry, plus the DAG code? It seems odd that on the second restart it would come out as a success; I just want to make sure.

@kaxil
Member

kaxil commented Aug 24, 2020

Just to be clear @FloChehab, were these issues introduced in 1.10.11 or the 1.10.12rcs?

If not, we will still definitely fix it, but will continue releasing 1.10.12.

@FloChehab
Contributor

FloChehab commented Aug 24, 2020

> Hi @FloChehab,
>
> Can you please post the scheduler logs for the scheduler where the task is stuck in up_for_retry, plus the DAG code? It seems odd that on the second restart it would come out as a success; I just want to make sure.

Here you go:

Dag:

from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.kubernetes.secret import Secret
from airflow.models import DAG
from airflow.utils.dates import days_ago


default_args = {
    'owner': 'Airflow',
    'start_date': days_ago(2),
    'retries': 3
}

with DAG(
    dag_id='bug_kuberntes_pod_operator',
    default_args=default_args,
    schedule_interval=None
) as dag:
    k = KubernetesPodOperator(
        namespace='dev-airflow-helm',
        image="ubuntu:16.04",
        cmds=["bash", "-cx"],
        arguments=["sleep 100"],
        name="airflow-test-pod",
        task_id="task",
        get_logs=True,
        is_delete_operator_pod=True,
    )

Logs during first restart (stuck in up_for_retry in DB and UI)

second_restart.log

For some reason I didn't get the `DEBUG - Changing state:` line this time; but the task was correctly labelled "success" after the second restart.

@FloChehab
Contributor

> Just to be clear @FloChehab, were these issues introduced in 1.10.11 or the 1.10.12rcs?
>
> If not, we will still definitely fix it, but will continue releasing 1.10.12.

Sure, that's what I was thinking too. Regarding the version where it was introduced, I'd say that before 1.10.12 we had a much bigger problem, so I definitely don't see a blocker for releasing 1.10.12.

@FloChehab
Contributor

(But I can't say whether the issue was present in Airflow >1.10.2 and <1.10.12, as I haven't tested those versions.)

@FloChehab
Contributor

FloChehab commented Aug 24, 2020

Same phenomenon with the LocalExecutor (I cleaned all the Persistent Volume Claims before testing with the LocalExecutor):

State in the DB just before the scheduler (after the second restart) picks up the task:

postgres=# select * from task_instance where state = 'up_for_retry';
-[ RECORD 1 ]---+------------------------------
task_id         | task
dag_id          | bug_kuberntes_pod_operator
execution_date  | 2020-08-24 19:02:51.616716+00
start_date      | 2020-08-24 19:02:57.048154+00
end_date        | 2020-08-24 19:05:56.199493+00
duration        | 179.151339
state           | up_for_retry
try_number      | 1
hostname        | 
unixname        | airflow
job_id          | 2
pool            | default_pool
queue           | celery
priority_weight | 1
operator        | KubernetesPodOperator
queued_dttm     | 2020-08-24 19:02:53.814477+00
pid             | 628
max_tries       | 3
executor_config | \x80057d942e
pool_slots      | 1

EDIT: in the LocalExecutor case I am starting the webserver together with the scheduler, so don't get confused by what the logs sometimes say.

@dimberman
Contributor Author

Thank you @FloChehab. I think since this feature was already broken in 1.10.11 we're not going to block the 1.10.12 release for this, though this should be a necessary fix for 1.10.13

@FloChehab
Contributor

> Thank you @FloChehab. I think since this feature was already broken in 1.10.11 we're not going to block the 1.10.12 release for this, though this should be a necessary fix for 1.10.13

👍 I have not encountered the case where 2 pods end up running the same task simultaneously while testing the latest 1.10.12rc (which can cause some real inconsistencies in our case -- we had this issue on Friday on an old 1.10.2 airflow). So no blocker for me here. I will add some comments in the issue tomorrow.

@FloChehab
Contributor

FloChehab commented Sep 8, 2020

@dimberman Do you have instructions on how to install Airflow reproducibly? (So that we compare the same thing -- I am not familiar with installing Airflow manually.) I am using the official Helm chart + LocalExecutor + the latest apache/airflow:1.10.12-python3.8. Judging by the git diff between v1-10-test and v1-10-stable, there shouldn't be many changes impacting this issue. And I definitely have it with my setup (just retested) on 1.10.12.

@FloChehab
Contributor

@dimberman
Contributor Author

@FloChehab what I did was the following:

  1. Install airflow source and git checkout apache/v1-10-test
  2. Installed postgres on my mac and ran pg_ctl -D /usr/local/var/postgres start
  3. Set the following values on my airflow.cfg
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
executor = LocalExecutor

# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engine, more information
# their website
sql_alchemy_conn = postgres://localhost:5432/airflow
  4. Started two terminals and ran "airflow webserver" and "airflow scheduler"

@FloChehab
Contributor

FloChehab commented Sep 8, 2020

So I have something a bit magical going on:

  • Same setup as you,
  • Same process.

However I don't even need to restart the webserver or the scheduler:

  • I start the task
  • Wait for the pod to be in running state (on a remote cluster)
  • Stop the scheduler / webserver
  • Check in the DB the state of the task => running
  • Pod finishes
  • State in the DB => success
  • Pod is deleted

I don't really get what is going on; nor how a pod in a remote cluster can talk to my local Airflow DB (which shouldn't be what is happening). I must have some Airflow process in the background monitoring the pod, but I can't seem to find it... Too much weird stuff going on.

@dimberman
Contributor Author

Hmm... this might have to do with airflow leaving behind a zombie process, so it's harder to get a real interruption when running locally. Will test that now.

@dimberman
Contributor Author

So ok, funny enough, I think because we added an on_kill to the KubernetesPodOperator, it now kills the pod if the process dies. Not sure if that counts as a solution or not, gonna need to think about this.

@dimberman
Contributor Author

Oh wait it's not the on_kill.

It's these lines

try:
    launcher.start_pod(
        pod,
        startup_timeout=self.startup_timeout_seconds)
    final_state, result = launcher.monitor_pod(pod=pod, get_logs=self.get_logs)
except AirflowException as ex:
    if self.log_events_on_failure:
        for event in launcher.read_pod_events(pod).items:
            self.log.error("Pod Event: %s - %s", event.reason, event.message)
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
finally:
    # This cleanup runs even when the exception was triggered by a SIGTERM,
    # so the pod is deleted whenever is_delete_operator_pod is set.
    if self.is_delete_operator_pod:
        launcher.delete_pod(pod)
return final_state, pod, result

@dimberman
Contributor Author

So if it receives an error from a SIGTERM, it deletes the pod because of is_delete_operator_pod.
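A minimal, self-contained sketch of that interaction (toy stand-ins only, assuming the surrounding task runner converts SIGTERM into an exception the way Airflow does):

import signal

def simulate_task(is_delete_operator_pod=True):
    # Toy stand-in for the operator's execute path (illustration only).

    def handle_sigterm(signum, frame):
        # Mirrors Airflow turning SIGTERM into an exception inside the task.
        raise RuntimeError('Task received SIGTERM signal')

    signal.signal(signal.SIGTERM, handle_sigterm)

    def delete_pod():
        # Stand-in for launcher.delete_pod(pod).
        print('pod deleted')

    try:
        print('monitoring pod...')
        # A SIGTERM arriving here raises via handle_sigterm.
    finally:
        # The finally clause runs even when the exception came from SIGTERM,
        # so is_delete_operator_pod=True removes a pod that may still be running.
        if is_delete_operator_pod:
            delete_pod()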

@FloChehab
Contributor

Hmm, I am not sure I would do that. I think the lifetime of the worker/"object" that is starting/monitoring the pod shouldn't impact the pod itself (we have use cases with very long jobs started from Airflow on Kubernetes, and I don't think they would play nicely with this).

@dimberman
Contributor Author

Yeah, agreed. For now, if you set is_delete_operator_pod to False it fixes it.
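For example, applied to the repro DAG above (only the flag differs from the original task; an illustrative workaround sketch, not the long-term fix):

k = KubernetesPodOperator(
    namespace='dev-airflow-helm',
    image='ubuntu:16.04',
    cmds=['bash', '-cx'],
    arguments=['sleep 100'],
    name='airflow-test-pod',
    task_id='task',
    get_logs=True,
    # Keep the pod around so a restarted worker can find and reattach to it.
    is_delete_operator_pod=False,
)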

@FloChehab
Contributor

So, with is_delete_operator_pod=False and doing the same process (including manually killing the zombie process), I do have the bug I was describing: it took me 4 scheduler restarts to have the task go from "up_for_retry" to "running", then quickly "success".

scheduler.log

@dimberman
Contributor Author

@FloChehab what happens if you run this with the Helm chart, get to the "up_for_retry" state, and then manually rerun the task with "Ignore All Deps"?

[Screenshot: Screen Shot 2020-09-08 at 12.48.01 PM]

@FloChehab
Contributor

Ok let's see :)

@FloChehab
Contributor

(just need a bit more time to build the production image for 1.10-test)

@FloChehab
Contributor

Just tested with 1.10.12 (while the image is building) and is_delete_operator_pod=False. This time the task seemed stuck in "running" on the first scheduler restart. And I got this when I tried the suggested action. I guess I am going to test with a Celery setup:

 Only works with the Celery or Kubernetes executors, sorry 

@FloChehab
Contributor

So this time, with the image from v1-10-test + Helm + KEDA:

  • The task is stuck in "running" on scheduler restart (no tasks are queued on Redis).
  • With run + ignore all deps => the task gets queued on Redis again, and is quickly set to success as expected.

@FloChehab
Contributor

And the scheduler logs on restart:
scheduler.log

@dimberman I have to stop my investigations for today, but I'll be more than happy to help tomorrow.

@dimberman
Contributor Author

@FloChehab Ok that's a good sign (thank you btw). One more question, have you tried leaving the task in up_for_retry and seeing if the scheduler eventually picks it up?

@ashb @kaxil this seems like it might just be the scheduler retry_timeout yeah? Like the clock to retry a failed task starts when the scheduler restarts and just takes a few minutes?

@FloChehab
Contributor

FloChehab commented Sep 8, 2020

> @FloChehab Ok that's a good sign (thank you btw). One more question, have you tried leaving the task in up_for_retry and seeing if the scheduler eventually picks it up?

Ok, I'll try that last one (on 1.10.12 + LocalExecutor + is_delete_operator_pod=True -- otherwise I won't get the task stuck in up_for_retry), take a swim, and come back.

@FloChehab
Contributor

FloChehab commented Sep 8, 2020

@dimberman You were right! After ~10 minutes it got picked out of the "up_for_retry" state.

I guess I was a bit confused by the logs showing that the scheduler was running yet not picking up "up_for_retry" tasks.

partial-scheduler.log

EDIT: must have been 5 minutes actually.

@FloChehab
Contributor

And the default retry_delay seems to be 300s, so everything seems to be OK. Let's just try with a shorter retry delay.
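For reference, the shorter delay can be set per task via default_args (a sketch based on the repro DAG above; 10s is just a test value):

from datetime import timedelta

default_args = {
    'owner': 'Airflow',
    'retries': 3,
    # Airflow's default retry_delay is 300 seconds (5 minutes); shorten it
    # so a task in up_for_retry is rescheduled sooner.
    'retry_delay': timedelta(seconds=10),
}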

@FloChehab
Contributor

FloChehab commented Sep 8, 2020

So, I've set retry_delay to 10s. On scheduler restart the task is stuck in the "running" state for ~4 minutes (while being "completed" on the Kubernetes side before the scheduler restart), then it switches to up_for_retry and finally, 10s later, everything is fine.

@FloChehab
Contributor

Hi,

So to sum up:

@subhendudey

I am facing the same issue with KubernetesExecutor + KubernetesPodOperator. The only error I can see is in the scheduler log, where it says:

[2020-09-11 05:36:13,724] {scheduler_job.py:1351} ERROR - Executor reports task instance <TaskInstance: kubernetes_sample.passing-task 2020-09-11 05:30:59.027877+00:00 [queued]> finished (failed) although the task says its queued. Was the task killed externally?
[2020-09-11 05:36:13,724] {dagbag.py:417} INFO - Filling up the DagBag from /root/airflow/dags/kubernetes_sample.py
[2020-09-11 05:36:13,734] {taskinstance.py:1150} ERROR - Executor reports task instance <TaskInstance: kubernetes_sample.passing-task 2020-09-11 05:30:59.027877+00:00 [queued]> finished (failed) although the task says its queued. Was the task killed externally?

Has anyone faced this issue and found a solution?

@dimberman dimberman added this to the Airflow 1.10.13 milestone Sep 16, 2020
@kaxil kaxil added provider:cncf-kubernetes Kubernetes provider related issues and removed area:k8s labels Nov 18, 2020
cfei18 pushed a commit to cfei18/incubator-airflow that referenced this pull request Mar 5, 2021