[SPARK-48056][CONNECT][FOLLOW-UP] Scala Client re-execute plan if a SESSION_NOT_FOUND error is raised and no partial response was received

### What changes were proposed in this pull request?

This change lets a Scala Spark Connect client reattempt execution of a plan when it receives a SESSION_NOT_FOUND error from the Spark Connect service if it has not received any partial responses.

This is the Scala counterpart of the earlier fix for the same issue, #46297.
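
The decision described above can be sketched as a small, self-contained model. This is an illustration only, not the client code from this commit: `ReattachOutcome`, `ReattachDecision`, and `decide` are hypothetical names, and the real iterator works with gRPC exceptions rather than plain strings.

```scala
// Hypothetical, simplified model of the retry decision (not the actual client code).
sealed trait ReattachOutcome
case object RetryExecutePlan extends ReattachOutcome // safe to send ExecutePlan again
case object FailIllegalState extends ReattachOutcome // responses were already consumed
case object Rethrow extends ReattachOutcome          // unrelated error, propagate as-is

object ReattachDecision {
  // Error-class substrings meaning the server does not know the operation or the session.
  private val notFoundMarkers = Seq(
    "INVALID_HANDLE.OPERATION_NOT_FOUND",
    "INVALID_HANDLE.SESSION_NOT_FOUND")

  def decide(errorMessage: String, lastReturnedResponseId: Option[String]): ReattachOutcome = {
    val notFound = notFoundMarkers.exists(marker => errorMessage.contains(marker))
    if (!notFound) Rethrow
    else if (lastReturnedResponseId.isDefined) FailIllegalState
    else RetryExecutePlan
  }
}
```

Re-execution is only chosen when no response ID has been recorded, because replaying a plan after partial results were delivered could hand duplicate rows to the caller.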

### Why are the changes needed?

Spark Connect clients often get a spurious error from the Spark Connect service when the service is busy or the network is congested. After such an error, the client immediately attempts to reattach even though the service is not yet aware of the client, and the query fails.

### Does this PR introduce _any_ user-facing change?

Previously, a Scala Spark Connect client would fail with the error code "INVALID_HANDLE.SESSION_NOT_FOUND" on its very first request to the service; with this change, the client automatically retries.

### How was this patch tested?

Attached unit test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46971 from changgyoopark-db/SPARK-48056.

Authored-by: Changgyoo Park <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
changgyoopark-db authored and zhengruifeng committed Jun 14, 2024
1 parent dd8b05f commit 2d2bedf
Showing 2 changed files with 38 additions and 4 deletions.
@@ -530,6 +530,25 @@ class SparkConnectClientSuite extends ConnectFunSuite with BeforeAndAfterEach {
     assert(reattachableIter.resultComplete)
   }

+  test("SPARK-48056: Client execute gets INVALID_HANDLE.SESSION_NOT_FOUND and proceeds") {
+    startDummyServer(0)
+    client = SparkConnectClient
+      .builder()
+      .connectionString(s"sc://localhost:${server.getPort}")
+      .enableReattachableExecute()
+      .build()
+    service.errorToThrowOnExecute = Some(
+      new StatusRuntimeException(
+        Status.INTERNAL.withDescription("INVALID_HANDLE.SESSION_NOT_FOUND")))
+
+    val plan = buildPlan("select * from range(1)")
+    val iter = client.execute(plan)
+    val reattachableIter =
+      ExecutePlanResponseReattachableIterator.fromIterator(iter)
+    reattachableIter.foreach(_ => ())
+    assert(reattachableIter.resultComplete)
+  }
+
   test("GRPC stub unary call throws error immediately") {
     // Spark Connect error retry handling depends on the error being returned from the unary
     // call immediately.
@@ -609,6 +628,8 @@ class DummySparkConnectService() extends SparkConnectServiceGrpc.SparkConnectSer
   private val inputArtifactRequests: mutable.ListBuffer[AddArtifactsRequest] =
     mutable.ListBuffer.empty

+  var errorToThrowOnExecute: Option[Throwable] = None
+
   private[sql] def getAndClearLatestInputPlan(): proto.Plan = {
     val plan = inputPlan
     inputPlan = null
@@ -624,6 +645,13 @@ class DummySparkConnectService() extends SparkConnectServiceGrpc.SparkConnectSer
   override def executePlan(
       request: ExecutePlanRequest,
       responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
+    if (errorToThrowOnExecute.isDefined) {
+      val error = errorToThrowOnExecute.get
+      errorToThrowOnExecute = None
+      responseObserver.onError(error)
+      return
+    }
+
     // Reply with a dummy response using the same client ID
     val requestSessionId = request.getSessionId
     val operationId = if (request.hasOperationId) {
@@ -42,7 +42,8 @@ import org.apache.spark.sql.connect.client.GrpcRetryHandler.RetryException
  * ReattachExecute request. ReattachExecute request is provided the responseId of last returned
  * ExecutePlanResponse on the iterator to return a new iterator from server that continues after
  * that. If the initial ExecutePlan did not even reach the server, and hence reattach fails with
- * INVALID_HANDLE.OPERATION_NOT_FOUND, we attempt to retry ExecutePlan.
+ * INVALID_HANDLE.OPERATION_NOT_FOUND or INVALID_HANDLE.SESSION_NOT_FOUND, we attempt to retry
+ * ExecutePlan.
  *
  * In reattachable execute the server does buffer some responses in case the client needs to
  * backtrack. To let server release this buffer sooner, this iterator asynchronously sends
@@ -66,7 +67,8 @@ class ExecutePlanResponseReattachableIterator(
     // Add operation id, if not present.
     // with operationId set by the client, the client can use it to try to reattach on error
     // even before getting the first response. If the operation in fact didn't even reach the
-    // server, that will end with INVALID_HANDLE.OPERATION_NOT_FOUND error.
+    // server, that will end with INVALID_HANDLE.OPERATION_NOT_FOUND or
+    // INVALID_HANDLE.SESSION_NOT_FOUND error.
     UUID.randomUUID.toString
   }

@@ -234,10 +236,14 @@ class ExecutePlanResponseReattachableIterator(
     } catch {
       case ex: StatusRuntimeException
           if Option(StatusProto.fromThrowable(ex))
-            .exists(_.getMessage.contains("INVALID_HANDLE.OPERATION_NOT_FOUND")) =>
+            .exists(ex => {
+              ex.getMessage.contains("INVALID_HANDLE.OPERATION_NOT_FOUND") ||
+              ex.getMessage.contains("INVALID_HANDLE.SESSION_NOT_FOUND")
+            }) =>
         if (lastReturnedResponseId.isDefined) {
           throw new IllegalStateException(
-            "OPERATION_NOT_FOUND on the server but responses were already received from it.",
+            "OPERATION_NOT_FOUND/SESSION_NOT_FOUND on the server but responses were already " +
+              "received from it.",
             ex)
         }
         // Try a new ExecutePlan, and throw upstream for retry.