diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala index 55f962b2a52c8..46aeaeff43d2f 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala @@ -530,6 +530,25 @@ class SparkConnectClientSuite extends ConnectFunSuite with BeforeAndAfterEach { assert(reattachableIter.resultComplete) } + test("SPARK-48056: Client execute gets INVALID_HANDLE.SESSION_NOT_FOUND and proceeds") { + startDummyServer(0) + client = SparkConnectClient + .builder() + .connectionString(s"sc://localhost:${server.getPort}") + .enableReattachableExecute() + .build() + service.errorToThrowOnExecute = Some( + new StatusRuntimeException( + Status.INTERNAL.withDescription("INVALID_HANDLE.SESSION_NOT_FOUND"))) + + val plan = buildPlan("select * from range(1)") + val iter = client.execute(plan) + val reattachableIter = + ExecutePlanResponseReattachableIterator.fromIterator(iter) + reattachableIter.foreach(_ => ()) + assert(reattachableIter.resultComplete) + } + test("GRPC stub unary call throws error immediately") { // Spark Connect error retry handling depends on the error being returned from the unary // call immediately. @@ -609,6 +628,8 @@ class DummySparkConnectService() extends SparkConnectServiceGrpc.SparkConnectSer private val inputArtifactRequests: mutable.ListBuffer[AddArtifactsRequest] = mutable.ListBuffer.empty + var errorToThrowOnExecute: Option[Throwable] = None + private[sql] def getAndClearLatestInputPlan(): proto.Plan = { val plan = inputPlan inputPlan = null @@ -624,6 +645,13 @@ class DummySparkConnectService() extends SparkConnectServiceGrpc.SparkConnectSer override def executePlan( request: ExecutePlanRequest, responseObserver: StreamObserver[ExecutePlanResponse]): Unit = { + if (errorToThrowOnExecute.isDefined) { + val error = errorToThrowOnExecute.get + errorToThrowOnExecute = None + responseObserver.onError(error) + return + } + // Reply with a dummy response using the same client ID val requestSessionId = request.getSessionId val operationId = if (request.hasOperationId) { diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala index 74f13272a3655..f3c13c9c2c4d8 100644 --- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala +++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala @@ -42,7 +42,8 @@ import org.apache.spark.sql.connect.client.GrpcRetryHandler.RetryException * ReattachExecute request. ReattachExecute request is provided the responseId of last returned * ExecutePlanResponse on the iterator to return a new iterator from server that continues after * that. If the initial ExecutePlan did not even reach the server, and hence reattach fails with - * INVALID_HANDLE.OPERATION_NOT_FOUND, we attempt to retry ExecutePlan. + * INVALID_HANDLE.OPERATION_NOT_FOUND or INVALID_HANDLE.SESSION_NOT_FOUND, we attempt to retry + * ExecutePlan. * * In reattachable execute the server does buffer some responses in case the client needs to * backtrack. To let server release this buffer sooner, this iterator asynchronously sends @@ -66,7 +67,8 @@ class ExecutePlanResponseReattachableIterator( // Add operation id, if not present. // with operationId set by the client, the client can use it to try to reattach on error // even before getting the first response. If the operation in fact didn't even reach the - // server, that will end with INVALID_HANDLE.OPERATION_NOT_FOUND error. + // server, that will end with INVALID_HANDLE.OPERATION_NOT_FOUND or + // INVALID_HANDLE.SESSION_NOT_FOUND error. UUID.randomUUID.toString } @@ -234,10 +236,14 @@ class ExecutePlanResponseReattachableIterator( } catch { case ex: StatusRuntimeException if Option(StatusProto.fromThrowable(ex)) - .exists(_.getMessage.contains("INVALID_HANDLE.OPERATION_NOT_FOUND")) => + .exists(ex => { + ex.getMessage.contains("INVALID_HANDLE.OPERATION_NOT_FOUND") || + ex.getMessage.contains("INVALID_HANDLE.SESSION_NOT_FOUND") + }) => if (lastReturnedResponseId.isDefined) { throw new IllegalStateException( - "OPERATION_NOT_FOUND on the server but responses were already received from it.", + "OPERATION_NOT_FOUND/SESSION_NOT_FOUND on the server but responses were already " + + "received from it.", ex) } // Try a new ExecutePlan, and throw upstream for retry.