From cc6b1a7ccba153c5387c5ae8b396ad15145b0da3 Mon Sep 17 00:00:00 2001 From: Lin Zhou <87341375+linzhou-db@users.noreply.github.com> Date: Sat, 12 Oct 2024 18:55:24 -0700 Subject: [PATCH] Backport includeEndStreamAction in DeltaSharingService to branch-0.7 (#595) * Backport endstreamaction to branch-0.7 * fix cdf * Client side changes * Backport additional logging to 0.7 * Set default of includeEndStreamAction to false * Set default to false in test * fix ConfUtilsSuite * fix lint --- .../sharing/server/DeltaSharingService.scala | 55 +++- .../internal/DeltaSharedTableLoader.scala | 20 +- .../server/DeltaSharingServiceSuite.scala | 279 +++++++++++------- .../sharing/spark/DeltaSharingClient.scala | 208 ++++++++++--- .../sharing/spark/DeltaSharingErrors.scala | 3 + .../sharing/spark/DeltaSharingSource.scala | 69 +++-- .../delta/sharing/spark/RemoteDeltaLog.scala | 13 +- .../delta/sharing/spark/util/ConfUtils.scala | 11 + .../delta/sharing/spark/util/RetryUtils.scala | 3 + .../spark/DeltaSharingRestClientSuite.scala | 73 ++++- .../sharing/spark/DeltaSharingSuite.scala | 3 +- .../spark/TestDeltaSharingClient.scala | 3 +- .../sharing/spark/util/ConfUtilsSuite.scala | 18 ++ .../sharing/spark/util/RetryUtilsSuite.scala | 3 + 14 files changed, 556 insertions(+), 205 deletions(-) diff --git a/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala b/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala index 0b5e3c3a1..45a0e0e18 100644 --- a/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala +++ b/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala @@ -285,7 +285,8 @@ class DeltaSharingService(serverConfig: ServerConfig) { startingVersion = None, endingVersion = None, includeRefreshToken = false, - refreshToken = None + refreshToken = None, + includeEndStreamAction = false ) streamingOutput(Some(v), actions) } @@ -293,6 +294,7 @@ class DeltaSharingService(serverConfig: ServerConfig) { 
@Post("/shares/{share}/schemas/{schema}/tables/{table}/query") @ConsumesJson def listFiles( + req: HttpRequest, @Param("share") share: String, @Param("schema") schema: String, @Param("table") table: String, @@ -343,6 +345,10 @@ class DeltaSharingService(serverConfig: ServerConfig) { ) } } + val capabilitiesMap = getDeltaSharingCapabilitiesMap( + req.headers().get(DELTA_SHARING_CAPABILITIES_HEADER) + ) + val includeEndStreamAction = getRequestEndStreamAction(capabilitiesMap) val (version, actions) = deltaSharedTableLoader.loadTable(tableConfig).query( includeFiles = true, request.predicateHints, @@ -353,7 +359,8 @@ class DeltaSharingService(serverConfig: ServerConfig) { request.startingVersion, request.endingVersion, request.includeRefreshToken.getOrElse(false), - request.refreshToken + request.refreshToken, + includeEndStreamAction = includeEndStreamAction ) if (version < tableConfig.startVersion) { throw new DeltaSharingIllegalArgumentException( @@ -362,12 +369,13 @@ class DeltaSharingService(serverConfig: ServerConfig) { } logger.info(s"Took ${System.currentTimeMillis - start} ms to load the table " + s"and sign ${actions.length - 2} urls for table $share/$schema/$table") - streamingOutput(Some(version), actions) + streamingOutput(Some(version), actions, includeEndStreamAction) } @Get("/shares/{share}/schemas/{schema}/tables/{table}/changes") @ConsumesJson def listCdfFiles( + req: HttpRequest, @Param("share") share: String, @Param("schema") schema: String, @Param("table") table: String, @@ -384,6 +392,10 @@ class DeltaSharingService(serverConfig: ServerConfig) { s"$share.$schema.$table") } + val capabilitiesMap = getDeltaSharingCapabilitiesMap( + req.headers().get(DELTA_SHARING_CAPABILITIES_HEADER) + ) + val includeEndStreamAction = getRequestEndStreamAction(capabilitiesMap) val (v, actions) = deltaSharedTableLoader.loadTable(tableConfig).queryCDF( getCdfOptionsMap( Option(startingVersion), @@ -391,21 +403,32 @@ class DeltaSharingService(serverConfig: 
ServerConfig) { Option(startingTimestamp), Option(endingTimestamp) ), - includeHistoricalMetadata = Try(includeHistoricalMetadata.toBoolean).getOrElse(false) + includeHistoricalMetadata = Try(includeHistoricalMetadata.toBoolean).getOrElse(false), + includeEndStreamAction = includeEndStreamAction ) logger.info(s"Took ${System.currentTimeMillis - start} ms to load the table cdf " + s"and sign ${actions.length - 2} urls for table $share/$schema/$table") - streamingOutput(Some(v), actions) + streamingOutput(Some(v), actions, includeEndStreamAction) } - private def streamingOutput(version: Option[Long], actions: Seq[SingleAction]): HttpResponse = { + private def streamingOutput( + version: Option[Long], + actions: Seq[SingleAction], + includeEndStreamAction: Boolean = false): HttpResponse = { + val capabilities = if (includeEndStreamAction) { + s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true" + } else { + "" + } val headers = if (version.isDefined) { createHeadersBuilderForTableVersion(version.get) .set(HttpHeaderNames.CONTENT_TYPE, DELTA_TABLE_METADATA_CONTENT_TYPE) + .set(DELTA_SHARING_CAPABILITIES_HEADER, capabilities) .build() } else { ResponseHeaders.builder(200) .set(HttpHeaderNames.CONTENT_TYPE, DELTA_TABLE_METADATA_CONTENT_TYPE) + .set(DELTA_SHARING_CAPABILITIES_HEADER, capabilities) .build() } ResponseConversionUtil.streamingFrom( @@ -420,12 +443,27 @@ class DeltaSharingService(serverConfig: ServerConfig) { }, ServiceRequestContext.current().blockingTaskExecutor()) } + + private def getDeltaSharingCapabilitiesMap(headerString: String): Map[String, String] = { + if (headerString == null) { + return Map.empty[String, String] + } + headerString.toLowerCase().split(DELTA_SHARING_CAPABILITIES_DELIMITER) + .map(_.split("=")) + .filter(_.size == 2) + .map { splits => + (splits(0), splits(1)) + }.toMap + } } object DeltaSharingService { val DELTA_TABLE_VERSION_HEADER = "Delta-Table-Version" val DELTA_TABLE_METADATA_CONTENT_TYPE = "application/x-ndjson; charset=utf-8" + 
val DELTA_SHARING_CAPABILITIES_HEADER = "delta-sharing-capabilities" + val DELTA_SHARING_INCLUDE_END_STREAM_ACTION = "includeendstreamaction" + val DELTA_SHARING_CAPABILITIES_DELIMITER = ";" val SPARK_STRUCTURED_STREAMING = "SparkStructuredStreaming" @@ -547,6 +585,11 @@ object DeltaSharingService { endingTimestamp.map(DeltaDataSource.CDF_END_TIMESTAMP_KEY -> _)).toMap } + private[server] def getRequestEndStreamAction( + headerCapabilities: Map[String, String]): Boolean = { + headerCapabilities.get(DELTA_SHARING_INCLUDE_END_STREAM_ACTION).exists(_.toBoolean) + } + def main(args: Array[String]): Unit = { val ns = parser.parseArgsOrFail(args) val serverConfigPath = ns.getString("config") diff --git a/server/src/main/scala/io/delta/standalone/internal/DeltaSharedTableLoader.scala b/server/src/main/scala/io/delta/standalone/internal/DeltaSharedTableLoader.scala index 60d5842fd..d8131e4ca 100644 --- a/server/src/main/scala/io/delta/standalone/internal/DeltaSharedTableLoader.scala +++ b/server/src/main/scala/io/delta/standalone/internal/DeltaSharedTableLoader.scala @@ -185,6 +185,7 @@ class DeltaSharedTable( snapshot.version } + // scalastyle:off argcount def query( includeFiles: Boolean, predicateHints: Seq[String], @@ -195,7 +196,8 @@ class DeltaSharedTable( startingVersion: Option[Long], endingVersion: Option[Long], includeRefreshToken: Boolean, - refreshToken: Option[String] + refreshToken: Option[String], + includeEndStreamAction: Boolean ): (Long, Seq[model.SingleAction]) = withClassLoader { // TODO Support `limitHint` if (Seq(version, timestamp, startingVersion).filter(_.isDefined).size >= 2) { @@ -245,7 +247,7 @@ class DeltaSharedTable( if (startingVersion.isDefined) { // Only read changes up to snapshot.version, and ignore changes that are committed during // queryDataChangeSinceStartVersion. 
- queryDataChangeSinceStartVersion(startingVersion.get, endingVersion) + queryDataChangeSinceStartVersion(startingVersion.get, endingVersion, includeEndStreamAction) } else if (includeFiles) { val ts = if (isVersionQuery) { val timestampsByVersion = DeltaSharingHistoryManager.getTimestampsByVersion( @@ -305,6 +307,8 @@ class DeltaSharedTable( ) ) Seq(model.EndStreamAction(refreshTokenStr).wrap) + } else if (includeEndStreamAction) { + Seq(model.EndStreamAction(null).wrap) } else { Nil } @@ -319,7 +323,8 @@ class DeltaSharedTable( private def queryDataChangeSinceStartVersion( startingVersion: Long, - endingVersion: Option[Long] + endingVersion: Option[Long], + includeEndStreamAction: Boolean ): Seq[model.SingleAction] = { var latestVersion = tableVersion if (startingVersion > latestVersion) { @@ -388,12 +393,16 @@ class DeltaSharedTable( case _ => () } } + if (includeEndStreamAction) { + actions.append(model.EndStreamAction(null).wrap) + } actions.toSeq } def queryCDF( cdfOptions: Map[String, String], - includeHistoricalMetadata: Boolean = false + includeHistoricalMetadata: Boolean = false, + includeEndStreamAction: Boolean = false ): (Long, Seq[model.SingleAction]) = withClassLoader { val actions = ListBuffer[model.SingleAction]() @@ -495,6 +504,9 @@ class DeltaSharedTable( actions.append(modelRemoveFile.wrap) } } + if (includeEndStreamAction) { + actions.append(model.EndStreamAction(null).wrap) + } start -> actions.toSeq } diff --git a/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala b/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala index 2c78fcf55..affaae6ea 100644 --- a/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala +++ b/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala @@ -26,11 +26,11 @@ import javax.net.ssl._ import scala.collection.mutable.ArrayBuffer import com.linecorp.armeria.server.Server -import io.delta.standalone.internal.DeltaCDFErrors 
import org.apache.commons.io.IOUtils import org.scalatest.{BeforeAndAfterAll, FunSuite} import scalapb.json4s.JsonFormat +import io.delta.sharing.server.DeltaSharingService.DELTA_SHARING_INCLUDE_END_STREAM_ACTION import io.delta.sharing.server.config.ServerConfig import io.delta.sharing.server.model._ import io.delta.sharing.server.protocol._ @@ -111,13 +111,15 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll { url: String, method: Option[String] = None, data: Option[String] = None, - expectedTableVersion: Option[Long] = None): String = { + expectedTableVersion: Option[Long] = None, + includeEndStreamAction: Boolean = false): String = { readHttpContent( url, method, data, expectedTableVersion, - "application/x-ndjson; charset=utf-8" + "application/x-ndjson; charset=utf-8", + includeEndStreamAction = includeEndStreamAction ) } @@ -127,9 +129,14 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll { method: Option[String], data: Option[String] = None, expectedTableVersion: Option[Long] = None, - expectedContentType: String): String = { + expectedContentType: String, + includeEndStreamAction: Boolean = false): String = { val connection = new URL(url).openConnection().asInstanceOf[HttpsURLConnection] connection.setRequestProperty("Authorization", s"Bearer ${TestResource.testAuthorizationToken}") + if (includeEndStreamAction) { + val deltaSharingCapabilities = s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true" + connection.setRequestProperty("delta-sharing-capabilities", deltaSharingCapabilities) + } method.foreach(connection.setRequestMethod) data.foreach { d => connection.setDoOutput(true) @@ -155,6 +162,12 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll { expectedTableVersion.foreach { v => assert(v.toString == deltaTableVersion) } + if (includeEndStreamAction) { + val responseCapabilities = connection.getHeaderField("delta-sharing-capabilities") + val expectedCapabilities = 
s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true" + assert(responseCapabilities == expectedCapabilities, + s"Incorrect header: $responseCapabilities") + } content } @@ -501,51 +514,94 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll { } integrationTest("table1 - non partitioned - /shares/{share}/schemas/{schema}/tables/{table}/query") { - val p = - """ - |{ - | "predicateHints": [ - | "date = CAST('2021-04-28' AS DATE)" - | ] - |} - |""".stripMargin - val response = readNDJson(requestPath("/shares/share1/schemas/default/tables/table1/query"), Some("POST"), Some(p), Some(2)) - val lines = response.split("\n") - val protocol = lines(0) - val metadata = lines(1) - val expectedProtocol = Protocol(minReaderVersion = 1).wrap - assert(expectedProtocol == JsonUtils.fromJson[SingleAction](protocol)) - val expectedMetadata = Metadata( - id = "ed96aa41-1d81-4b7f-8fb5-846878b4b0cf", - format = Format(), - schemaString = """{"type":"struct","fields":[{"name":"eventTime","type":"timestamp","nullable":true,"metadata":{}},{"name":"date","type":"date","nullable":true,"metadata":{}}]}""", - partitionColumns = Nil).wrap - assert(expectedMetadata == JsonUtils.fromJson[SingleAction](metadata)) - val files = lines.drop(2) - val actualFiles = files.map(f => JsonUtils.fromJson[SingleAction](f).file) - assert(actualFiles.size == 2) - val expectedFiles = Seq( - AddFile( - url = actualFiles(0).url, - expirationTimestamp = actualFiles(0).expirationTimestamp, - id = "061cb3683a467066995f8cdaabd8667d", - partitionValues = Map.empty, - size = 781, - stats = """{"numRecords":1,"minValues":{"eventTime":"2021-04-28T06:32:22.421Z","date":"2021-04-28"},"maxValues":{"eventTime":"2021-04-28T06:32:22.421Z","date":"2021-04-28"},"nullCount":{"eventTime":0,"date":0}}""" - ), - AddFile( - url = actualFiles(1).url, - expirationTimestamp = actualFiles(1).expirationTimestamp, - id = "e268cbf70dbaa6143e7e9fa3e2d3b00e", - partitionValues = Map.empty, - size = 781, - stats = 
"""{"numRecords":1,"minValues":{"eventTime":"2021-04-28T06:32:02.070Z","date":"2021-04-28"},"maxValues":{"eventTime":"2021-04-28T06:32:02.070Z","date":"2021-04-28"},"nullCount":{"eventTime":0,"date":0}}""" + Seq(true, false).foreach { includeEndStreamAction => + val p = + """ + |{ + | "predicateHints": [ + | "date = CAST('2021-04-28' AS DATE)" + | ] + |} + |""".stripMargin + val response = readNDJson( + requestPath("/shares/share1/schemas/default/tables/table1/query"), + Some("POST"), + Some(p), + Some(2), + includeEndStreamAction = includeEndStreamAction) + var lines = response.split("\n") + val protocol = lines(0) + val metadata = lines(1) + val expectedProtocol = Protocol(minReaderVersion = 1).wrap + assert(expectedProtocol == JsonUtils.fromJson[SingleAction](protocol)) + val expectedMetadata = Metadata( + id = "ed96aa41-1d81-4b7f-8fb5-846878b4b0cf", + format = Format(), + schemaString = """{"type":"struct","fields":[{"name":"eventTime","type":"timestamp","nullable":true,"metadata":{}},{"name":"date","type":"date","nullable":true,"metadata":{}}]}""", + partitionColumns = Nil).wrap + assert(expectedMetadata == JsonUtils.fromJson[SingleAction](metadata)) + if (includeEndStreamAction) { + val endAction = JsonUtils.fromJson[SingleAction](lines.last).endStreamAction + assert(endAction != null, lines.last) + lines = lines.dropRight(1) + } + val files = lines.drop(2) + val actualFiles = files.map(f => JsonUtils.fromJson[SingleAction](f).file) + assert(actualFiles.size == 2) + val expectedFiles = Seq( + AddFile( + url = actualFiles(0).url, + expirationTimestamp = actualFiles(0).expirationTimestamp, + id = "061cb3683a467066995f8cdaabd8667d", + partitionValues = Map.empty, + size = 781, + stats = """{"numRecords":1,"minValues":{"eventTime":"2021-04-28T06:32:22.421Z","date":"2021-04-28"},"maxValues":{"eventTime":"2021-04-28T06:32:22.421Z","date":"2021-04-28"},"nullCount":{"eventTime":0,"date":0}}""" + ), + AddFile( + url = actualFiles(1).url, + expirationTimestamp = 
actualFiles(1).expirationTimestamp, + id = "e268cbf70dbaa6143e7e9fa3e2d3b00e", + partitionValues = Map.empty, + size = 781, + stats = """{"numRecords":1,"minValues":{"eventTime":"2021-04-28T06:32:02.070Z","date":"2021-04-28"},"maxValues":{"eventTime":"2021-04-28T06:32:02.070Z","date":"2021-04-28"},"nullCount":{"eventTime":0,"date":0}}""" + ) ) - ) - assert(actualFiles.count(_.expirationTimestamp != null) == 2) - assert(expectedFiles == actualFiles.toList) - verifyPreSignedUrl(actualFiles(0).url, 781) - verifyPreSignedUrl(actualFiles(1).url, 781) + assert(actualFiles.count(_.expirationTimestamp != null) == 2) + assert(expectedFiles == actualFiles.toList) + verifyPreSignedUrl(actualFiles(0).url, 781) + verifyPreSignedUrl(actualFiles(1).url, 781) + } + } + + integrationTest("refresh query returns the same set of files as initial query") { + Seq(true, false).foreach { includeEndStreamAction => + val initialResponse = readNDJson( + requestPath("/shares/share1/schemas/default/tables/table1/query"), + Some("POST"), + Some("""{"includeRefreshToken": true}"""), + Some(2), + includeEndStreamAction = includeEndStreamAction + ).split("\n") + assert(initialResponse.length == 5) + val endAction = JsonUtils.fromJson[SingleAction](initialResponse.last).endStreamAction + assert(endAction.refreshToken != null) + + val refreshResponse = readNDJson( + requestPath("/shares/share1/schemas/default/tables/table1/query"), + Some("POST"), + Some(s"""{"includeRefreshToken": true, "refreshToken": "${endAction.refreshToken}"}"""), + Some(2) + ).split("\n") + assert(refreshResponse.length == 5) + // protocol + assert(initialResponse(0) == refreshResponse(0)) + // metadata + assert(initialResponse(1) == refreshResponse(1)) + // files + val initialFiles = initialResponse.slice(2, 4).map(f => JsonUtils.fromJson[SingleAction](f).file) + val refreshedFiles = refreshResponse.slice(2, 4).map(f => JsonUtils.fromJson[SingleAction](f).file) + assert(initialFiles.map(_.id) sameElements 
refreshedFiles.map(_.id)) + } } integrationTest("table2 - partitioned - /shares/{share}/schemas/{schema}/tables/{table}/metadata") { @@ -1386,63 +1442,76 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll { } integrationTest("cdf_table_cdf_enabled_changes - query table changes") { - val response = readNDJson(requestPath("/shares/share8/schemas/default/tables/cdf_table_cdf_enabled/changes?startingVersion=0&endingVersion=3"), Some("GET"), None, Some(0)) - val lines = response.split("\n") - val protocol = lines(0) - val metadata = lines(1) - val expectedProtocol = Protocol(minReaderVersion = 1).wrap - assert(expectedProtocol == JsonUtils.fromJson[SingleAction](protocol)) - val expectedMetadata = Metadata( - id = "16736144-3306-4577-807a-d3f899b77670", - format = Format(), - schemaString = """{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"age","type":"integer","nullable":true,"metadata":{}},{"name":"birthday","type":"date","nullable":true,"metadata":{}}]}""", - configuration = Map("enableChangeDataFeed" -> "true"), - partitionColumns = Nil, - version = 5).wrap - assert(expectedMetadata == JsonUtils.fromJson[SingleAction](metadata)) - val files = lines.drop(2) - assert(files.size == 5) - verifyAddCDCFile( - files(0), - size = 1301, - partitionValues = Map.empty, - version = 2, - timestamp = 1651272655000L - ) - verifyAddCDCFile( - files(1), - size = 1416, - partitionValues = Map.empty, - version = 3, - timestamp = 1651272660000L - ) - verifyAddFile( - files(2), - size = 1030, - stats = - """{"numRecords":1,"minValues":{"name":"1","age":1,"birthday":"2020-01-01"},"maxValues":{"name":"1","age":1,"birthday":"2020-01-01"},"nullCount":{"name":0,"age":0,"birthday":0}}""", - partitionValues = Map.empty, - version = 1, - timestamp = 1651272635000L - ) - verifyAddFile( - files(3), - size = 1030, - stats = - 
"""{"numRecords":1,"minValues":{"name":"2","age":2,"birthday":"2020-01-01"},"maxValues":{"name":"2","age":2,"birthday":"2020-01-01"},"nullCount":{"name":0,"age":0,"birthday":0}}""", - partitionValues = Map.empty, - version = 1, - timestamp = 1651272635000L - ) - verifyAddFile( - files(4), - size = 1030, - stats = - """{"numRecords":1,"minValues":{"name":"3","age":3,"birthday":"2020-01-01"},"maxValues":{"name":"3","age":3,"birthday":"2020-01-01"},"nullCount":{"name":0,"age":0,"birthday":0}}""", - partitionValues = Map.empty, - version = 1, - timestamp = 1651272635000L - ) + Seq(true, false).foreach { includeEndStreamAction => + val response = readNDJson( + requestPath("/shares/share8/schemas/default/tables/cdf_table_cdf_enabled/changes?startingVersion=0&endingVersion=3"), + Some("GET"), + None, + Some(0), + includeEndStreamAction = includeEndStreamAction + ) + var lines = response.split("\n") + val protocol = lines(0) + val metadata = lines(1) + val expectedProtocol = Protocol(minReaderVersion = 1).wrap + assert(expectedProtocol == JsonUtils.fromJson[SingleAction](protocol)) + val expectedMetadata = Metadata( + id = "16736144-3306-4577-807a-d3f899b77670", + format = Format(), + schemaString = """{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"age","type":"integer","nullable":true,"metadata":{}},{"name":"birthday","type":"date","nullable":true,"metadata":{}}]}""", + configuration = Map("enableChangeDataFeed" -> "true"), + partitionColumns = Nil, + version = 5).wrap + assert(expectedMetadata == JsonUtils.fromJson[SingleAction](metadata)) + if (includeEndStreamAction) { + val endAction = JsonUtils.fromJson[SingleAction](lines.last).endStreamAction + assert(endAction != null) + lines = lines.dropRight(1) + } + val files = lines.drop(2) + assert(files.size == 5) + verifyAddCDCFile( + files(0), + size = 1301, + partitionValues = Map.empty, + version = 2, + timestamp = 1651272655000L + ) + verifyAddCDCFile( + files(1), + 
size = 1416, + partitionValues = Map.empty, + version = 3, + timestamp = 1651272660000L + ) + verifyAddFile( + files(2), + size = 1030, + stats = + """{"numRecords":1,"minValues":{"name":"1","age":1,"birthday":"2020-01-01"},"maxValues":{"name":"1","age":1,"birthday":"2020-01-01"},"nullCount":{"name":0,"age":0,"birthday":0}}""", + partitionValues = Map.empty, + version = 1, + timestamp = 1651272635000L + ) + verifyAddFile( + files(3), + size = 1030, + stats = + """{"numRecords":1,"minValues":{"name":"2","age":2,"birthday":"2020-01-01"},"maxValues":{"name":"2","age":2,"birthday":"2020-01-01"},"nullCount":{"name":0,"age":0,"birthday":0}}""", + partitionValues = Map.empty, + version = 1, + timestamp = 1651272635000L + ) + verifyAddFile( + files(4), + size = 1030, + stats = + """{"numRecords":1,"minValues":{"name":"3","age":3,"birthday":"2020-01-01"},"maxValues":{"name":"3","age":3,"birthday":"2020-01-01"},"nullCount":{"name":0,"age":0,"birthday":0}}""", + partitionValues = Map.empty, + version = 1, + timestamp = 1651272635000L + ) + } } diff --git a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingClient.scala b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingClient.scala index ee9ffa6fa..48a73340e 100644 --- a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingClient.scala +++ b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingClient.scala @@ -45,6 +45,17 @@ import io.delta.sharing.spark.util.{JsonUtils, RetryUtils, UnexpectedHttpStatus} /** An interface to fetch Delta metadata from remote server. */ private[sharing] trait DeltaSharingClient { + + protected var dsQueryId: Option[String] = None + + def getQueryId: String = { + dsQueryId.getOrElse("dsQueryIdNotSet") + } + + protected def getDsQueryIdForLogging: String = { + s" for query($dsQueryId)." 
+ } + def listAllTables(): Seq[Table] def getTableVersion(table: Table, startingTimestamp: Option[String] = None): Long @@ -103,14 +114,16 @@ private[spark] class DeltaSharingRestClient( numRetries: Int = 10, maxRetryDuration: Long = Long.MaxValue, sslTrustAll: Boolean = false, - forStreaming: Boolean = false + forStreaming: Boolean = false, + endStreamActionEnabled: Boolean = false ) extends DeltaSharingClient with Logging { + + logInfo(s"DeltaSharingRestClient with endStreamActionEnabled: $endStreamActionEnabled.") + import DeltaSharingRestClient._ @volatile private var created = false - private var queryId: Option[String] = None - private lazy val client = { val clientBuilder: HttpClientBuilder = if (sslTrustAll) { val sslBuilder = new SSLContextBuilder() @@ -200,9 +213,15 @@ private[spark] class DeltaSharingRestClient( val target = getTargetUrl(s"/shares/$encodedShareName/schemas/$encodedSchemaName/tables/" + s"$encodedTableName/version$encodedParam") - val (version, _) = getResponse(new HttpGet(target), true, true) + val (version, _) = getResponse( + new HttpGet(target), + allowNoContent = true, + fetchAsOneString = true, + setIncludeEndStreamAction = false + ) version.getOrElse { - throw new IllegalStateException("Cannot find Delta-Table-Version in the header") + throw new IllegalStateException(s"Cannot find $RESPONSE_TABLE_VERSION_HEADER_KEY in the " + + "header," + getDsQueryIdForLogging) } } @@ -212,12 +231,15 @@ private[spark] class DeltaSharingRestClient( val encodedTableName = URLEncoder.encode(table.name, "UTF-8") val target = getTargetUrl( s"/shares/$encodedShareName/schemas/$encodedSchemaName/tables/$encodedTableName/metadata") - val (version, lines) = getNDJson(target) + val (version, lines) = getNDJson( + target, requireVersion = true, setIncludeEndStreamAction = false + ) val protocol = JsonUtils.fromJson[SingleAction](lines(0)).protocol checkProtocol(protocol) val metadata = JsonUtils.fromJson[SingleAction](lines(1)).metaData if (lines.size != 2) 
{ - throw new IllegalStateException("received more than two lines") + throw new IllegalStateException(s"received more than two lines:${lines.size}," + + getDsQueryIdForLogging) } DeltaTableMetadata(version, protocol, metadata) } @@ -226,7 +248,8 @@ private[spark] class DeltaSharingRestClient( if (protocol.minReaderVersion > DeltaSharingRestClient.CURRENT) { throw new IllegalArgumentException(s"The table requires a newer version" + s" ${protocol.minReaderVersion} to read. But the current release supports version " + - s"is ${DeltaSharingProfile.CURRENT} and below. Please upgrade to a newer release.") + s"is ${DeltaSharingProfile.CURRENT} and below. Please upgrade to a newer release." + + getDsQueryIdForLogging) } } @@ -245,7 +268,7 @@ private[spark] class DeltaSharingRestClient( val encodedTableName = URLEncoder.encode(table.name, "UTF-8") val target = getTargetUrl( s"/shares/$encodedShareName/schemas/$encodedSchemaName/tables/$encodedTableName/query") - val (version, lines) = getNDJson( + val (version, lines) = getNDJsonPost( target, QueryTableRequest( predicates, @@ -257,7 +280,8 @@ private[spark] class DeltaSharingRestClient( jsonPredicateHints, Some(includeRefreshToken), refreshToken - ) + ), + setIncludeEndStreamAction = endStreamActionEnabled ) val (filteredLines, endStreamAction) = maybeExtractEndStreamAction(lines) val refreshTokenOpt = endStreamAction.flatMap { e => @@ -266,7 +290,7 @@ private[spark] class DeltaSharingRestClient( } } if (includeRefreshToken && refreshTokenOpt.isEmpty) { - logWarning("includeRefreshToken=true but refresh token is not returned.") + logWarning("includeRefreshToken=true but refresh token is not returned " + getQueryIdString) } require(versionAsOf.isEmpty || versionAsOf.get == version) val protocol = JsonUtils.fromJson[SingleAction](filteredLines(0)).protocol @@ -278,7 +302,7 @@ private[spark] class DeltaSharingRestClient( if (action.file != null) { files.append(action.file) } else { - throw new 
IllegalStateException(s"Unexpected Line:${line}") + throw new IllegalStateException(s"Unexpected Line:${line}" + getDsQueryIdForLogging) } } DeltaTableFiles(version, protocol, metadata, files.toSeq, refreshToken = refreshTokenOpt) @@ -294,7 +318,7 @@ private[spark] class DeltaSharingRestClient( val encodedTableName = URLEncoder.encode(table.name, "UTF-8") val target = getTargetUrl( s"/shares/$encodedShareName/schemas/$encodedSchemaName/tables/$encodedTableName/query") - val (version, lines) = getNDJson( + val (version, lines) = getNDJsonPost( target, QueryTableRequest( /* predicateHint */ Nil, @@ -306,21 +330,24 @@ private[spark] class DeltaSharingRestClient( /* jsonPredicateHints */ None, /* includeRefreshToken */ None, /* refreshToken */ None - ) + ), + setIncludeEndStreamAction = endStreamActionEnabled ) - val protocol = JsonUtils.fromJson[SingleAction](lines(0)).protocol + val (filteredLines, _) = maybeExtractEndStreamAction(lines) + val protocol = JsonUtils.fromJson[SingleAction](filteredLines(0)).protocol checkProtocol(protocol) - val metadata = JsonUtils.fromJson[SingleAction](lines(1)).metaData + val metadata = JsonUtils.fromJson[SingleAction](filteredLines(1)).metaData val addFiles = ArrayBuffer[AddFileForCDF]() val removeFiles = ArrayBuffer[RemoveFile]() val additionalMetadatas = ArrayBuffer[Metadata]() - lines.drop(2).foreach { line => + filteredLines.drop(2).foreach { line => val action = JsonUtils.fromJson[SingleAction](line).unwrap action match { case a: AddFileForCDF => addFiles.append(a) case r: RemoveFile => removeFiles.append(r) case m: Metadata => additionalMetadatas.append(m) - case _ => throw new IllegalStateException(s"Unexpected Line:${line}") + case _ => throw new IllegalStateException(s"Unexpected Line:${line}" + + getDsQueryIdForLogging) } } DeltaTableFiles( @@ -344,23 +371,27 @@ private[spark] class DeltaSharingRestClient( val target = getTargetUrl( 
s"/shares/$encodedShare/schemas/$encodedSchema/tables/$encodedTable/changes?$encodedParams") - val (version, lines) = getNDJson(target, requireVersion = false) - val protocol = JsonUtils.fromJson[SingleAction](lines(0)).protocol + val (version, lines) = getNDJson( + target, requireVersion = false, setIncludeEndStreamAction = endStreamActionEnabled + ) + val (filteredLines, _) = maybeExtractEndStreamAction(lines) + val protocol = JsonUtils.fromJson[SingleAction](filteredLines(0)).protocol checkProtocol(protocol) - val metadata = JsonUtils.fromJson[SingleAction](lines(1)).metaData + val metadata = JsonUtils.fromJson[SingleAction](filteredLines(1)).metaData val addFiles = ArrayBuffer[AddFileForCDF]() val cdfFiles = ArrayBuffer[AddCDCFile]() val removeFiles = ArrayBuffer[RemoveFile]() val additionalMetadatas = ArrayBuffer[Metadata]() - lines.drop(2).foreach { line => + filteredLines.drop(2).foreach { line => val action = JsonUtils.fromJson[SingleAction](line).unwrap action match { case c: AddCDCFile => cdfFiles.append(c) case a: AddFileForCDF => addFiles.append(a) case r: RemoveFile => removeFiles.append(r) case m: Metadata => additionalMetadatas.append(m) - case _ => throw new IllegalStateException(s"Unexpected Line:${line}") + case _ => throw new IllegalStateException(s"Unexpected Line:${line}" + + getDsQueryIdForLogging) } } DeltaTableFiles( @@ -398,33 +429,50 @@ private[spark] class DeltaSharingRestClient( }.mkString("&") } - private def getNDJson(target: String, requireVersion: Boolean = true): (Long, Seq[String]) = { - val (version, lines) = getResponse(new HttpGet(target)) + private def getNDJson( + target: String, + requireVersion: Boolean, + setIncludeEndStreamAction: Boolean): (Long, Seq[String]) = { + val (version, lines) = getResponse( + new HttpGet(target), setIncludeEndStreamAction = setIncludeEndStreamAction + ) version.getOrElse { if (requireVersion) { - throw new IllegalStateException("Cannot find Delta-Table-Version in the header") + throw new 
IllegalStateException(s"Cannot find $RESPONSE_TABLE_VERSION_HEADER_KEY in the " + + s"header," + getDsQueryIdForLogging) } else { 0L } } -> lines } - private def getNDJson[T: Manifest](target: String, data: T): (Long, Seq[String]) = { + private def getNDJsonPost[T: Manifest]( + target: String, + data: T, + setIncludeEndStreamAction: Boolean): (Long, Seq[String]) = { val httpPost = new HttpPost(target) val json = JsonUtils.toJson(data) httpPost.setHeader("Content-type", "application/json") httpPost.setEntity(new StringEntity(json, UTF_8)) - val (version, lines) = getResponse(httpPost) + val (version, lines) = getResponse( + httpPost, setIncludeEndStreamAction = setIncludeEndStreamAction + ) version.getOrElse { - throw new IllegalStateException("Cannot find Delta-Table-Version in the header") + throw new IllegalStateException(s"Cannot find $RESPONSE_TABLE_VERSION_HEADER_KEY in the " + + s"header," + getDsQueryIdForLogging) } -> lines } private def getJson[R: Manifest](target: String): R = { - val (_, response) = getResponse(new HttpGet(target), false, true) + val (_, response) = getResponse( + new HttpGet(target), + allowNoContent = false, + fetchAsOneString = true, + setIncludeEndStreamAction = false + ) if (response.size != 1) { throw new IllegalStateException( - "Unexpected response for target: " + target + ", response=" + response + s"Unexpected response for target:$target, response=$response" + getDsQueryIdForLogging ) } JsonUtils.fromJson[R](response(0)) @@ -452,7 +500,8 @@ private[spark] class DeltaSharingRestClient( } } - private[spark] def prepareHeaders(httpRequest: HttpRequestBase): HttpRequestBase = { + private[spark] def prepareHeaders( + httpRequest: HttpRequestBase, setIncludeEndStreamAction: Boolean): HttpRequestBase = { val customeHeaders = profileProvider.getCustomHeaders if (customeHeaders.contains(HttpHeaders.AUTHORIZATION) || customeHeaders.contains(HttpHeaders.USER_AGENT)) { @@ -464,7 +513,7 @@ private[spark] class DeltaSharingRestClient( val 
headers = Map( HttpHeaders.AUTHORIZATION -> s"Bearer ${profileProvider.getProfile.bearerToken}", HttpHeaders.USER_AGENT -> getUserAgent() - ) ++ customeHeaders + ) ++ customeHeaders ++ constructDeltaSharingCapabilities(setIncludeEndStreamAction) headers.foreach(header => httpRequest.setHeader(header._1, header._2)) httpRequest @@ -482,15 +531,16 @@ private[spark] class DeltaSharingRestClient( private def getResponse( httpRequest: HttpRequestBase, allowNoContent: Boolean = false, - fetchAsOneString: Boolean = false + fetchAsOneString: Boolean = false, + setIncludeEndStreamAction: Boolean = false ): (Option[Long], Seq[String]) = { - // Reset queryId before calling RetryUtils, and before prepareHeaders. - queryId = Some(UUID.randomUUID().toString().split('-').head) + // Reset dsQueryId before calling RetryUtils, and before prepareHeaders. + dsQueryId = Some(UUID.randomUUID().toString().split('-').head) RetryUtils.runWithExponentialBackoff(numRetries, maxRetryDuration) { val profile = profileProvider.getProfile val response = client.execute( getHttpHost(profile.endpoint), - prepareHeaders(httpRequest), + prepareHeaders(httpRequest, setIncludeEndStreamAction), HttpClientContext.create() ) try { @@ -518,7 +568,8 @@ private[spark] class DeltaSharingRestClient( } } catch { case e: org.apache.http.ConnectionClosedException => - val error = s"Request to delta sharing server failed due to ${e}." + val error = s"Request to delta sharing server failed$getDsQueryIdForLogging" + + s" due to ${e}." logError(error) lineBuffer += error lineBuffer.toList @@ -538,10 +589,19 @@ private[spark] class DeltaSharingRestClient( // Only show the last 100 lines in the error to keep it contained. val responseToShow = lines.drop(lines.size - 100).mkString("\n") throw new UnexpectedHttpStatus( - s"HTTP request failed with status: $status $responseToShow. 
$additionalErrorInfo", + s"HTTP request failed with status: $status" + + Seq(getDsQueryIdForLogging, additionalErrorInfo, responseToShow).mkString(" "), statusCode) } - Option(response.getFirstHeader("Delta-Table-Version")).map(_.getValue.toLong) -> lines + if (setIncludeEndStreamAction) { + val capabilities = Option( + response.getFirstHeader(DELTA_SHARING_CAPABILITIES_HEADER) + ).map(_.getValue) + val capabilitiesMap = parseDeltaSharingCapabilities(capabilities) + checkEndStreamAction(capabilities, capabilitiesMap, lines) + } + Option(response.getFirstHeader(RESPONSE_TABLE_VERSION_HEADER_KEY)).map( + _.getValue.toLong) -> lines } finally { response.close() } @@ -560,7 +620,69 @@ private[spark] class DeltaSharingRestClient( } private def getQueryIdString: String = { - s"QueryId-${queryId.getOrElse("not_set")}" + s"QueryId-${dsQueryId.getOrElse("not_set")}" + } + + private def checkEndStreamAction( + capabilities: Option[String], + capabilitiesMap: Map[String, String], + lines: Seq[String]): Unit = { + val includeEndStreamActionHeader = getRespondedIncludeEndStreamActionHeader(capabilitiesMap) + includeEndStreamActionHeader match { + case Some(true) => + val lastLineAction = JsonUtils.fromJson[SingleAction](lines.last) + if (lastLineAction.endStreamAction == null) { + throw new MissingEndStreamActionException(s"Client sets " + + s"${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the " + + s"header, server responded with the header set to true(${capabilities}, " + + s"and ${lines.size} lines, and last line parsed as " + + s"${lastLineAction.unwrap.getClass()}," + getDsQueryIdForLogging) + } + logInfo( + s"Successfully verified endStreamAction in the response" + getDsQueryIdForLogging + ) + case Some(false) => + logWarning(s"Client sets ${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the " + + s"header, but the server responded with the header set to false(" + + s"${capabilities})," + getDsQueryIdForLogging + ) + case None => + logWarning(s"Client sets 
${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the" + + s" header, but server didn't respond with the header(${capabilities}), " + + getDsQueryIdForLogging + ) + } + } + + // includeEndStreamActionHeader indicates whether the last line is required to be an + // EndStreamAction, parsed from the response header. + private def getRespondedIncludeEndStreamActionHeader( + capabilitiesMap: Map[String, String]): Option[Boolean] = { + capabilitiesMap.get(DELTA_SHARING_INCLUDE_END_STREAM_ACTION).map(_.toBoolean) + } + + private def parseDeltaSharingCapabilities(capabilities: Option[String]): Map[String, String] = { + if (capabilities.isEmpty) { + return Map.empty[String, String] + } + capabilities.get.toLowerCase().split(DELTA_SHARING_CAPABILITIES_DELIMITER) + .map(_.split("=")) + .filter(_.size == 2) + .map { splits => + (splits(0), splits(1)) + }.toMap + } + + // The value for delta-sharing-capabilities header, semicolon separated capabilities. + // Each capability is in the format of "key=value1,value2", values are separated by comma. 
+ // Example: "capability1=value1;capability2=value3,value4,value5" + private def constructDeltaSharingCapabilities( + setIncludeEndStreamAction: Boolean): Map[String, String] = { + if (setIncludeEndStreamAction) { + Map(DELTA_SHARING_CAPABILITIES_HEADER -> s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true") + } else { + Map.empty[String, String] + } } def close(): Unit = { @@ -578,6 +700,10 @@ private[spark] object DeltaSharingRestClient extends Logging { val CURRENT = 1 val SPARK_STRUCTURED_STREAMING = "Delta-Sharing-SparkStructuredStreaming" + val RESPONSE_TABLE_VERSION_HEADER_KEY = "Delta-Table-Version" + val DELTA_SHARING_CAPABILITIES_HEADER = "delta-sharing-capabilities" + val DELTA_SHARING_INCLUDE_END_STREAM_ACTION = "includeendstreamaction" + val DELTA_SHARING_CAPABILITIES_DELIMITER = ";" lazy val USER_AGENT = { try { diff --git a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingErrors.scala b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingErrors.scala index 7f394f13d..73469defb 100644 --- a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingErrors.scala +++ b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingErrors.scala @@ -18,6 +18,9 @@ package io.delta.sharing.spark import org.apache.spark.sql.types.StructType +class MissingEndStreamActionException(message: String) extends IllegalStateException(message) + + object DeltaSharingErrors { def nonExistentDeltaSharingTable(tableId: String): Throwable = { new IllegalStateException(s"Delta sharing table ${tableId} doesn't exist. 
" + diff --git a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala index 5e9f64b31..12c0162e3 100644 --- a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala +++ b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala @@ -18,6 +18,7 @@ package io.delta.sharing.spark // scalastyle:off import.ordering.noEmptyLine import java.lang.ref.WeakReference +import java.util.UUID import scala.collection.mutable.ArrayBuffer @@ -111,6 +112,8 @@ case class DeltaSharingSource( assert(deltaLog.client.getForStreaming, "forStreaming must be true for client in DeltaSharingSource.") + private val sourceId = Some(UUID.randomUUID().toString().split('-').head) + // The snapshot that's used to construct the dataframe, constructed when source is initialized. // Use latest snapshot instead of snapshot at startingVersion, to allow easy recovery from // failures on schema incompatibility. @@ -159,9 +162,11 @@ case class DeltaSharingSource( val interval = 30000.max( ConfUtils.streamingQueryTableVersionIntervalSeconds(spark.sessionState.conf) * 1000 ) + logInfo(s"Configured queryTableVersionIntervalMilliSeconds:${interval}," + + getTableInfoForLogging) if (interval < 30000) { - throw new IllegalArgumentException("QUERY_TABLE_VERSION_INTERVAL_MILLIS must not be less " + - "than 30 seconds.") + throw new IllegalArgumentException(s"QUERY_TABLE_VERSION_INTERVAL_MILLIS($interval) must " + + "not be less than 30 seconds." 
+ getTableInfoForLogging) } interval } @@ -176,6 +181,13 @@ case class DeltaSharingSource( TableRefreshResult(Map.empty[String, String], None, None) } + private lazy val getTableInfoForLogging: String = + s" for table(id:$tableId, name:${deltaLog.table.toString}, source:$sourceId)" + + private def getQueryIdForLogging: String = { + s", with queryId(${deltaLog.client.getQueryId})" + } + // Check the latest table version from the delta sharing server through the client.getTableVersion // RPC. Adding a minimum interval of QUERY_TABLE_VERSION_INTERVAL_MILLIS between two consecutive // rpcs to avoid traffic jam on the delta sharing server. @@ -184,13 +196,14 @@ case class DeltaSharingSource( if (lastGetVersionTimestamp == -1 || (currentTimeMillis - lastGetVersionTimestamp) >= QUERY_TABLE_VERSION_INTERVAL_MILLIS) { val serverVersion = deltaLog.client.getTableVersion(deltaLog.table) - logInfo(s"Got table version $serverVersion from Delta Sharing Server.") + logInfo(s"Got table version $serverVersion from Delta Sharing Server," + + getTableInfoForLogging) if (serverVersion < 0) { throw new IllegalStateException(s"Delta Sharing Server returning negative table version:" + - s"$serverVersion.") + s"$serverVersion," + getTableInfoForLogging) } else if (serverVersion < latestTableVersion) { logWarning(s"Delta Sharing Server returning smaller table version:$serverVersion < " + - s"$latestTableVersion.") + s"$latestTableVersion," + getTableInfoForLogging) } latestTableVersion = serverVersion lastGetVersionTimestamp = currentTimeMillis @@ -255,8 +268,8 @@ case class DeltaSharingSource( logWarning(s"The asked file(" + s"$fromVersion, $fromIndex, $isStartingVersion) is not included in sortedFetchedFiles[" + s"(${headFile.version}, ${headFile.index}, ${headFile.isSnapshot}) to " + - s"(${lastFile.version}, ${lastFile.index}, ${lastFile.isSnapshot})], " + - s"for table(id:$tableId, name:${deltaLog.table.toString})") + s"(${lastFile.version}, ${lastFile.index}, 
${lastFile.isSnapshot})]," + + getTableInfoForLogging) sortedFetchedFiles = Seq.empty } else { return @@ -274,8 +287,7 @@ case class DeltaSharingSource( if (endingVersionForQuery < currentLatestVersion) { logInfo(s"Reducing ending version for delta sharing rpc from currentLatestVersion(" + s"$currentLatestVersion) to endingVersionForQuery($endingVersionForQuery), fromVersion:" + - s"$fromVersion, maxVersionsPerRpc:$maxVersionsPerRpc, " + - s"for table(id:$tableId, name:${deltaLog.table.toString})." + s"$fromVersion, maxVersionsPerRpc:$maxVersionsPerRpc," + getTableInfoForLogging ) } @@ -342,7 +354,7 @@ case class DeltaSharingSource( ): Unit = { synchronized { logInfo(s"Refreshing sortedFetchedFiles(size: ${sortedFetchedFiles.size}) with newIdToUrl(" + - s"size: ${newIdToUrl.size}), for table(id:$tableId, name:${deltaLog.table.toString}).") + s"size: ${newIdToUrl.size})," + getTableInfoForLogging + getQueryIdForLogging) lastQueryTableTimestamp = queryTimestamp minUrlExpirationTimestamp = newMinUrlExpiration if (!CachedTableManager.INSTANCE.isValidUrlExpirationTime(minUrlExpirationTimestamp)) { @@ -361,7 +373,7 @@ case class DeltaSharingSource( val newUrl = newIdToUrl.getOrElse( indexedFile.add.id, throw new IllegalStateException(s"cannot find url for id ${indexedFile.add.id} " + - s"when refreshing table ${deltaLog.path}") + s"when refreshing table ${deltaLog.path}," + getTableInfoForLogging) ) indexedFile.add.copy(url = newUrl) }, @@ -372,7 +384,7 @@ case class DeltaSharingSource( val newUrl = newIdToUrl.getOrElse( indexedFile.remove.id, throw new IllegalStateException(s"cannot find url for id ${indexedFile.remove.id} " + - s"when refreshing table ${deltaLog.path}") + s"when refreshing table ${deltaLog.path}," + getTableInfoForLogging) ) indexedFile.remove.copy(url = newUrl) }, @@ -383,7 +395,7 @@ case class DeltaSharingSource( val newUrl = newIdToUrl.getOrElse( indexedFile.cdc.id, throw new IllegalStateException(s"cannot find url for id ${indexedFile.cdc.id} " + - 
s"when refreshing table ${deltaLog.path}") + s"when refreshing table ${deltaLog.path}," + getTableInfoForLogging) ) indexedFile.cdc.copy(url = newUrl) }, @@ -392,7 +404,7 @@ case class DeltaSharingSource( ) } logInfo(s"Refreshed ${numUrlsRefreshed} urls in sortedFetchedFiles(size: " + - s"${sortedFetchedFiles.size}).") + s"${sortedFetchedFiles.size})," + getTableInfoForLogging) } } @@ -418,8 +430,8 @@ case class DeltaSharingSource( isStartingVersion: Boolean, endingVersionForQuery: Long): Unit = { logInfo(s"Fetching files with fromVersion($fromVersion), fromIndex($fromIndex), " + - s"isStartingVersion($isStartingVersion), endingVersionForQuery($endingVersionForQuery), " + - s"for table(id:$tableId, name:${deltaLog.table.toString})." + s"isStartingVersion($isStartingVersion), endingVersionForQuery($endingVersionForQuery)," + + getTableInfoForLogging ) resetGlobalTimestamp() if (isStartingVersion) { @@ -466,7 +478,7 @@ case class DeltaSharingSource( val numFiles = tableFiles.files.size logInfo( s"Fetched ${numFiles} files for table version ${tableFiles.version} from" + - " delta sharing server." + s" delta sharing server," + getTableInfoForLogging + getQueryIdForLogging ) tableFiles.files.sortWith(fileActionCompareFunc).zipWithIndex.foreach { case (file, index) if (index > fromIndex) => @@ -520,11 +532,13 @@ case class DeltaSharingSource( TableRefreshResult(idToUrl, minUrlExpiration, None) } - val allAddFiles = validateCommitAndFilterAddFiles(tableFiles).groupBy(a => a.version) + val filteredAddFiles = validateCommitAndFilterAddFiles(tableFiles) + val allAddFiles = filteredAddFiles.groupBy(a => a.version) logInfo( - s"Fetched and filtered ${allAddFiles.size} files from startingVersion " + + s"Fetched ${tableFiles.addFiles.size} files, filtered ${filteredAddFiles.size} " + + s"files in ${allAddFiles.size} versions from startingVersion " + s"${fromVersion} to endingVersion ${endingVersionForQuery} from " + - "delta sharing server." 
+ s"delta sharing server," + getTableInfoForLogging + getQueryIdForLogging ) for (v <- fromVersion to endingVersionForQuery) { val vAddFiles = allAddFiles.getOrElse(v, ArrayBuffer[AddFileForCDF]()) @@ -564,8 +578,7 @@ case class DeltaSharingSource( fromIndex: Long, endingVersionForQuery: Long): Unit = { logInfo(s"Fetching CDF files with fromVersion($fromVersion), fromIndex($fromIndex), " + - s"endingVersionForQuery($endingVersionForQuery), " + - s"for table(id:$tableId, name:${deltaLog.table.toString}).") + s"endingVersionForQuery($endingVersionForQuery)," + getTableInfoForLogging) resetGlobalTimestamp() val tableFiles = deltaLog.client.getCDFFiles( deltaLog.table, @@ -834,7 +847,7 @@ case class DeltaSharingSource( case cdf: AddCDCFile => cdfFiles.append(cdf) case add: AddFileForCDF => addFiles.append(add) case remove: RemoveFile => removeFiles.append(remove) - case f => throw new IllegalStateException(s"Unexpected File:${f}") + case f => throw new IllegalStateException(s"Unexpected File:${f},$getTableInfoForLogging") } } @@ -1010,8 +1023,8 @@ case class DeltaSharingSource( } override def getBatch(startOffsetOption: Option[Offset], end: Offset): DataFrame = { - logInfo(s"getBatch with startOffsetOption($startOffsetOption) and end($end), " + - s"for table(id:$tableId, name:${deltaLog.table.toString})") + logInfo(s"getBatch with startOffsetOption($startOffsetOption) and end($end)," + + getTableInfoForLogging) val endOffset = DeltaSharingSourceOffset(tableId, end) val (startVersion, startIndex, isStartingVersion, startSourceVersion) = if ( @@ -1037,8 +1050,8 @@ case class DeltaSharingSource( } else { val startOffset = DeltaSharingSourceOffset(tableId, startOffsetOption.get) if (startOffset == endOffset) { - logInfo(s"startOffset($startOffset) is the same as endOffset($endOffset) in getBatch, " + - s"for table(id:$tableId, name:${deltaLog.table.toString})") + logInfo(s"startOffset($startOffset) is the same as endOffset($endOffset) in getBatch," + + 
getTableInfoForLogging) previousOffset = endOffset // This happens only if we recover from a failure and `MicroBatchExecution` tries to call // us with the previous offsets. The returned DataFrame will be dropped immediately, so we @@ -1138,7 +1151,7 @@ case class DeltaSharingSource( } else if (options.startingTimestamp.isDefined) { val version = deltaLog.client.getTableVersion(deltaLog.table, options.startingTimestamp) logInfo(s"Got table version $version for timestamp ${options.startingTimestamp} " + - s"from Delta Sharing Server.") + s"from Delta Sharing Server," + getTableInfoForLogging) Some(version) } else { None diff --git a/spark/src/main/scala/io/delta/sharing/spark/RemoteDeltaLog.scala b/spark/src/main/scala/io/delta/sharing/spark/RemoteDeltaLog.scala index abdcaaad6..6d58afb4b 100644 --- a/spark/src/main/scala/io/delta/sharing/spark/RemoteDeltaLog.scala +++ b/spark/src/main/scala/io/delta/sharing/spark/RemoteDeltaLog.scala @@ -164,6 +164,7 @@ private[sharing] object RemoteDeltaLog { val numRetries = ConfUtils.numRetries(sqlConf) val maxRetryDurationMillis = ConfUtils.maxRetryDurationMillis(sqlConf) val timeoutInSeconds = ConfUtils.timeoutInSeconds(sqlConf) + val endStreamActionEnabled = ConfUtils.includeEndStreamAction(sqlConf) val clientClass = sqlConf.getConfString("spark.delta.sharing.client.class", @@ -172,13 +173,19 @@ private[sharing] object RemoteDeltaLog { val client: DeltaSharingClient = Class.forName(clientClass) .getConstructor(classOf[DeltaSharingProfileProvider], - classOf[Int], classOf[Int], classOf[Long], classOf[Boolean], classOf[Boolean]) - .newInstance(profileProvider, + classOf[Int], + classOf[Int], + classOf[Long], + classOf[Boolean], + classOf[Boolean], + classOf[Boolean] + ).newInstance(profileProvider, java.lang.Integer.valueOf(timeoutInSeconds), java.lang.Integer.valueOf(numRetries), java.lang.Long.valueOf(maxRetryDurationMillis), java.lang.Boolean.valueOf(sslTrustAll), - java.lang.Boolean.valueOf(forStreaming)) + 
java.lang.Boolean.valueOf(forStreaming), + java.lang.Boolean.valueOf(endStreamActionEnabled)) .asInstanceOf[DeltaSharingClient] new RemoteDeltaLog(deltaSharingTable, new Path(path + getFormattedTimestampWithUUID), client) } diff --git a/spark/src/main/scala/io/delta/sharing/spark/util/ConfUtils.scala b/spark/src/main/scala/io/delta/sharing/spark/util/ConfUtils.scala index ecda5ef3e..0a65fd1f3 100644 --- a/spark/src/main/scala/io/delta/sharing/spark/util/ConfUtils.scala +++ b/spark/src/main/scala/io/delta/sharing/spark/util/ConfUtils.scala @@ -30,6 +30,9 @@ object ConfUtils { val MAX_RETRY_DURATION_CONF = "spark.delta.sharing.network.maxRetryDuration" val MAX_RETRY_DURATION_DEFAULT_MILLIS = 10L * 60L* 1000L /* 10 mins */ + val INCLUDE_END_STREAM_ACTION_CONF = "spark.delta.sharing.query.includeEndStreamAction" + val INCLUDE_END_STREAM_ACTION_DEFAULT = "false" + val TIMEOUT_CONF = "spark.delta.sharing.network.timeout" val TIMEOUT_DEFAULT = "320s" @@ -65,6 +68,14 @@ object ConfUtils { maxDur } + def includeEndStreamAction(conf: Configuration): Boolean = { + conf.getBoolean(INCLUDE_END_STREAM_ACTION_CONF, INCLUDE_END_STREAM_ACTION_DEFAULT.toBoolean) + } + + def includeEndStreamAction(conf: SQLConf): Boolean = { + conf.getConfString(INCLUDE_END_STREAM_ACTION_CONF, INCLUDE_END_STREAM_ACTION_DEFAULT).toBoolean + } + def timeoutInSeconds(conf: Configuration): Int = { val timeoutStr = conf.get(TIMEOUT_CONF, TIMEOUT_DEFAULT) toTimeInSeconds(timeoutStr, TIMEOUT_CONF) diff --git a/spark/src/main/scala/io/delta/sharing/spark/util/RetryUtils.scala b/spark/src/main/scala/io/delta/sharing/spark/util/RetryUtils.scala index c311f5d37..bba453dc9 100644 --- a/spark/src/main/scala/io/delta/sharing/spark/util/RetryUtils.scala +++ b/spark/src/main/scala/io/delta/sharing/spark/util/RetryUtils.scala @@ -22,6 +22,8 @@ import scala.util.control.NonFatal import org.apache.spark.internal.Logging +import io.delta.sharing.spark.MissingEndStreamActionException + private[sharing] object RetryUtils 
extends Logging { // Expose it for testing @@ -70,6 +72,7 @@ private[sharing] object RetryUtils extends Logging { } else { false } + case _: MissingEndStreamActionException => true case _: java.net.SocketTimeoutException => true // do not retry on ConnectionClosedException because it can be caused by invalid json returned // from the delta sharing server. diff --git a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingRestClientSuite.scala b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingRestClientSuite.scala index 7de2659d2..9ead0db0c 100644 --- a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingRestClientSuite.scala +++ b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingRestClientSuite.scala @@ -19,7 +19,7 @@ package io.delta.sharing.spark import java.sql.Timestamp import org.apache.http.HttpHeaders -import org.apache.http.client.methods.HttpGet +import org.apache.http.client.methods.{HttpGet, HttpRequestBase} import io.delta.sharing.spark.model.{ AddCDCFile, @@ -37,21 +37,61 @@ import io.delta.sharing.spark.util.UnexpectedHttpStatus // scalastyle:off maxLineLength class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { + import DeltaSharingRestClient._ + integrationTest("Check headers") { - val httpRequest = new HttpGet("random_url") - - val client = new DeltaSharingRestClient(testProfileProvider, forStreaming = false) - var h = client.prepareHeaders(httpRequest).getFirstHeader(HttpHeaders.USER_AGENT).getValue - assert(!h.contains(DeltaSharingRestClient.SPARK_STRUCTURED_STREAMING)) - assert(h.contains("Delta-Sharing-Spark")) - assert(h.contains(" QueryId-")) - assert(h.contains(" Hadoop/")) - assert(h.contains(" Linux/")) - assert(h.contains(" java/")) - - val streamingClient = new DeltaSharingRestClient(testProfileProvider, forStreaming = true) - h = streamingClient.prepareHeaders(httpRequest).getFirstHeader(HttpHeaders.USER_AGENT).getValue - assert(h.contains(DeltaSharingRestClient.SPARK_STRUCTURED_STREAMING)) + def 
checkUserAgent(request: HttpRequestBase, containsStreaming: Boolean): Unit = { + val h = request.getFirstHeader(HttpHeaders.USER_AGENT).getValue + if (containsStreaming) { + assert(h.contains(SPARK_STRUCTURED_STREAMING) == containsStreaming) + } else { + assert(!h.contains(SPARK_STRUCTURED_STREAMING)) + assert(h.contains("Delta-Sharing-Spark")) + } + + assert(h.contains(" QueryId-")) + assert(h.contains(" Hadoop/")) + assert(h.contains(" Linux/")) + assert(h.contains(" java/")) + } + + def getEndStreamActionHeader(endStreamActionEnabled: Boolean): String = { + if (endStreamActionEnabled) { + s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true" + } else { + "" + } + } + + def checkDeltaSharingCapabilities( + request: HttpRequestBase, + endStreamActionEnabled: Boolean): Unit = { + if (endStreamActionEnabled) { + val expected = getEndStreamActionHeader(endStreamActionEnabled) + val h = request.getFirstHeader(DELTA_SHARING_CAPABILITIES_HEADER) + assert( + h.getValue == expected, s"actual header:$h, endStreamActionEnabled:$endStreamActionEnabled" + ) + } else { + assert(!request.containsHeader(DELTA_SHARING_CAPABILITIES_HEADER)) + } + } + + Seq( + (true, true), + (true, false), + (false, true), + (false, false) + ).foreach { case (forStreaming, endStreamActionEnabled) => + val httpRequest = new HttpGet("random_url") + val request = new DeltaSharingRestClient( + testProfileProvider, + forStreaming = forStreaming, + endStreamActionEnabled = endStreamActionEnabled + ).prepareHeaders(httpRequest, endStreamActionEnabled) + checkUserAgent(request, forStreaming) + checkDeltaSharingCapabilities(request, endStreamActionEnabled) + } } integrationTest("listAllTables") { @@ -770,7 +810,8 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { false ) }.getMessage - assert(errorMessage.contains("""400 Bad Request {"errorCode":"RESOURCE_DOES_NOT_EXIST"""")) + assert(errorMessage.contains("""400 Bad Request for query""")) + 
assert(errorMessage.contains("""{"errorCode":"RESOURCE_DOES_NOT_EXIST"""")) assert(errorMessage.contains("table files missing")) } finally { client.close() diff --git a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSuite.scala b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSuite.scala index 0d6b09238..53aaa8bc7 100644 --- a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSuite.scala +++ b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSuite.scala @@ -447,7 +447,8 @@ class DeltaSharingSuite extends QueryTest with SharedSparkSession with DeltaShar .option("startingVersion", 0).load(tablePath) checkAnswer(df, Nil) } - assert (ex.getMessage.contains("""400 Bad Request {"errorCode":"RESOURCE_DOES_NOT_EXIST"""")) + assert(ex.getMessage.contains("""400 Bad Request""")) + assert(ex.getMessage.contains("""{"errorCode":"RESOURCE_DOES_NOT_EXIST"""")) } integrationTest("azure support") { diff --git a/spark/src/test/scala/io/delta/sharing/spark/TestDeltaSharingClient.scala b/spark/src/test/scala/io/delta/sharing/spark/TestDeltaSharingClient.scala index 30ce21950..367eea599 100644 --- a/spark/src/test/scala/io/delta/sharing/spark/TestDeltaSharingClient.scala +++ b/spark/src/test/scala/io/delta/sharing/spark/TestDeltaSharingClient.scala @@ -36,7 +36,8 @@ class TestDeltaSharingClient( numRetries: Int = 10, maxRetryDuration: Long = Long.MaxValue, sslTrustAll: Boolean = false, - includeHistoricalMetadata: Boolean = false) extends DeltaSharingClient { + includeHistoricalMetadata: Boolean = false, + endStreamActionEnabled: Boolean = false) extends DeltaSharingClient { private val metadataString = """{"metaData":{"id":"93351cf1-c931-4326-88f0-d10e29e71b21","format": diff --git a/spark/src/test/scala/io/delta/sharing/spark/util/ConfUtilsSuite.scala b/spark/src/test/scala/io/delta/sharing/spark/util/ConfUtilsSuite.scala index 531c77174..70cfd1fa0 100644 --- a/spark/src/test/scala/io/delta/sharing/spark/util/ConfUtilsSuite.scala +++ 
b/spark/src/test/scala/io/delta/sharing/spark/util/ConfUtilsSuite.scala @@ -83,6 +83,24 @@ class ConfUtilsSuite extends SparkFunSuite { }.getMessage.contains(TIMEOUT_CONF) } + test("includeEndStreamAction") { + assert(includeEndStreamAction(newConf()) == false) + assert(includeEndStreamAction(newConf(Map(INCLUDE_END_STREAM_ACTION_CONF -> "true"))) == true) + assert(includeEndStreamAction(newConf(Map(INCLUDE_END_STREAM_ACTION_CONF -> "false"))) == false) + assert(includeEndStreamAction(newConf(Map(INCLUDE_END_STREAM_ACTION_CONF -> "rdm"))) == false) + + assert(includeEndStreamAction(newSqlConf()) == false) + assert( + includeEndStreamAction(newSqlConf(Map(INCLUDE_END_STREAM_ACTION_CONF -> "true"))) == true + ) + assert( + includeEndStreamAction(newSqlConf(Map(INCLUDE_END_STREAM_ACTION_CONF -> "false"))) == false + ) + intercept[IllegalArgumentException] { + includeEndStreamAction(newSqlConf(Map(INCLUDE_END_STREAM_ACTION_CONF -> "random"))) + }.getMessage.contains(INCLUDE_END_STREAM_ACTION_CONF) + } + test("maxConnections") { assert(maxConnections(newConf()) == MAX_CONNECTION_DEFAULT) assert(maxConnections(newConf(Map(MAX_CONNECTION_CONF -> "100"))) == 100) diff --git a/spark/src/test/scala/io/delta/sharing/spark/util/RetryUtilsSuite.scala b/spark/src/test/scala/io/delta/sharing/spark/util/RetryUtilsSuite.scala index 185e895e6..8a37db1df 100644 --- a/spark/src/test/scala/io/delta/sharing/spark/util/RetryUtilsSuite.scala +++ b/spark/src/test/scala/io/delta/sharing/spark/util/RetryUtilsSuite.scala @@ -22,6 +22,8 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.SparkFunSuite +import io.delta.sharing.spark.MissingEndStreamActionException + class RetryUtilsSuite extends SparkFunSuite { import RetryUtils._ @@ -34,6 +36,7 @@ class RetryUtilsSuite extends SparkFunSuite { assert(shouldRetry(new IOException)) assert(shouldRetry(new java.net.SocketTimeoutException)) assert(!shouldRetry(new RuntimeException)) + assert(shouldRetry(new 
MissingEndStreamActionException("missing"))) } test("runWithExponentialBackoff") {