This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

set RestartPolicy=Never for executor #367

Merged

Conversation

honkiko

@honkiko honkiko commented Jul 4, 2017

In the current implementation the RestartPolicy of the executor pod is
not set, so the default value "OnFailure" is in effect. This causes a
problem.

If an executor terminates unexpectedly, for example by exiting with
java.lang.OutOfMemoryError, it is restarted by k8s with the same
executor ID. When the new executor tries to fetch a block held by the
previous executor, ShuffleBlockFetcherIterator.splitLocalRemoteBlocks()
treats it as a local block and tries to read it from its local dir.
But the executor's local dir has changed, because a randomly generated
ID is part of the local dir path. A FetchFailedException is raised and
the stage fails.

The recurring error message:

17/06/29 01:54:56 WARN KubernetesTaskSetManager: Lost task 0.1 in stage 2.0 (TID 7, 172.16.75.92, executor 1): FetchFailed(BlockManagerId(1, 172.16.75.92, 40539, None), shuffleId=2, mapId=0, reduceId=0, message=
org.apache.spark.shuffle.FetchFailedException: /data2/spark/blockmgr-0e228d3c-8727-422e-aa97-2841a877c42a/32/shuffle_2_0_0.index (No such file or directory)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
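
For context, here is a minimal, hypothetical sketch of the executor-ID-based local/remote split that produces this failure; the names and structure below are illustrative, not the actual ShuffleBlockFetcherIterator code:

```scala
// Hypothetical, simplified sketch -- not the real ShuffleBlockFetcherIterator code.
case class BlockManagerId(executorId: String, host: String, port: Int)

def splitLocalRemote(
    localId: BlockManagerId,
    blocksByAddress: Seq[(BlockManagerId, Seq[String])]): (Seq[String], Seq[String]) = {
  val (local, remote) = blocksByAddress.partition {
    // A block counts as "local" when its owner has the same executor ID as us.
    // After a k8s restart the new pod reuses the old executor ID, so blocks
    // written by the crashed pod are classified as local, even though the
    // randomly named blockmgr-<uuid> directory they live in belonged to the
    // old pod and no longer matches the new pod's local dir.
    case (address, _) => address.executorId == localId.executorId
  }
  (local.flatMap(_._2), remote.flatMap(_._2))
}
```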

What changes were proposed in this pull request?

set RestartPolicy=Never for executor
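
Roughly, with the fabric8 Kubernetes client used by the scheduler backend, the change amounts to setting the restart policy on the executor pod spec. The builder chain below is a trimmed-down illustration (the real spec in KubernetesClusterSchedulerBackend.scala also configures containers, env vars, volumes, owner references, etc.); only the withRestartPolicy("Never") call is the substance of the change:

```scala
import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}

// Illustrative sketch of an executor pod spec with RestartPolicy=Never.
def buildExecutorPod(executorId: String, image: String): Pod =
  new PodBuilder()
    .withNewMetadata()
      .withName(s"spark-executor-$executorId")
    .endMetadata()
    .withNewSpec()
      // Without an explicit policy the pod-level default applies and a crashed
      // executor is restarted in place with the same executor ID. "Never" lets
      // the Spark driver request a fresh executor (new ID, new block manager
      // directories) instead.
      .withRestartPolicy("Never")
      .addNewContainer()
        .withName("executor")
        .withImage(image)
      .endContainer()
    .endSpec()
    .build()
```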

How was this patch tested?

Tested on my local testbed; the executor RestartPolicy is now "Never".
This should work together with #244.
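
One way to double-check the policy from code (a sketch using the fabric8 client; the namespace and pod name below are placeholders):

```scala
import io.fabric8.kubernetes.client.DefaultKubernetesClient

// Read back a running executor pod and verify its restart policy.
val client = new DefaultKubernetesClient()
val policy = client.pods()
  .inNamespace("default")          // placeholder namespace
  .withName("spark-executor-1")    // placeholder executor pod name
  .get()
  .getSpec
  .getRestartPolicy
assert(policy == "Never")
client.close()
```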

Please review http://spark.apache.org/contributing.html before opening a pull request.

@erikerlandson
Member

This seems like it will work in the Dynamic Allocation scenario; if dynamic allocation isn't on, does this mean the cluster size remains diminished by one?

@foxish
Member

foxish commented Jul 6, 2017

We added this for the driver in #303, but the expectation was that the executor would recover. However, it seems that having it reconnect with the same ID doesn't really let us accomplish that. This is okay as long as executor recovery semantics handle this and respawn lost executors anyway. @varunkatta can you confirm that we'll get the same behavior even if we disable executor restarts?

@foxish
Member

foxish commented Jul 7, 2017

/cc @kimoonkim

@ash211

ash211 commented Jul 19, 2017

Minor merge conflict, but +1 from me. We shouldn't be restarting executors with the same ID, so it's better to do the restart at the Spark driver level than in k8s.

@foxish
Member

foxish commented Jul 19, 2017

Will merge on green

@foxish foxish merged commit e1ff2f0 into apache-spark-on-k8s:branch-2.1-kubernetes Jul 19, 2017
@varunkatta
Member

Executor recovery should work even with restartPolicy set to Never. This change is good to go, I think.

foxish pushed a commit that referenced this pull request Jul 24, 2017
ifilonenko pushed a commit to ifilonenko/spark that referenced this pull request Feb 26, 2019
puneetloya pushed a commit to puneetloya/spark that referenced this pull request Mar 11, 2019