Skip to content

Commit

Permalink
Backport includeEndStreamAction in DeltasharingService to branch-0.7 (#…
Browse files Browse the repository at this point in the history
…595)

* Backport endstreamaction to branch-0.7

* fix cdf

* Client side changes

* Backport additional logging to 0.7

* Set default of includeEndStreamAction to false

* Set default to false in test

* fix ConfUtilsSuite

* fix lint
  • Loading branch information
linzhou-db authored Oct 13, 2024
1 parent 6ee58e7 commit cc6b1a7
Show file tree
Hide file tree
Showing 14 changed files with 556 additions and 205 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -285,14 +285,16 @@ class DeltaSharingService(serverConfig: ServerConfig) {
startingVersion = None,
endingVersion = None,
includeRefreshToken = false,
refreshToken = None
refreshToken = None,
includeEndStreamAction = false
)
streamingOutput(Some(v), actions)
}

@Post("/shares/{share}/schemas/{schema}/tables/{table}/query")
@ConsumesJson
def listFiles(
req: HttpRequest,
@Param("share") share: String,
@Param("schema") schema: String,
@Param("table") table: String,
Expand Down Expand Up @@ -343,6 +345,10 @@ class DeltaSharingService(serverConfig: ServerConfig) {
)
}
}
val capabilitiesMap = getDeltaSharingCapabilitiesMap(
req.headers().get(DELTA_SHARING_CAPABILITIES_HEADER)
)
val includeEndStreamAction = getRequestEndStreamAction(capabilitiesMap)
val (version, actions) = deltaSharedTableLoader.loadTable(tableConfig).query(
includeFiles = true,
request.predicateHints,
Expand All @@ -353,7 +359,8 @@ class DeltaSharingService(serverConfig: ServerConfig) {
request.startingVersion,
request.endingVersion,
request.includeRefreshToken.getOrElse(false),
request.refreshToken
request.refreshToken,
includeEndStreamAction = includeEndStreamAction
)
if (version < tableConfig.startVersion) {
throw new DeltaSharingIllegalArgumentException(
Expand All @@ -362,12 +369,13 @@ class DeltaSharingService(serverConfig: ServerConfig) {
}
logger.info(s"Took ${System.currentTimeMillis - start} ms to load the table " +
s"and sign ${actions.length - 2} urls for table $share/$schema/$table")
streamingOutput(Some(version), actions)
streamingOutput(Some(version), actions, includeEndStreamAction)
}

@Get("/shares/{share}/schemas/{schema}/tables/{table}/changes")
@ConsumesJson
def listCdfFiles(
req: HttpRequest,
@Param("share") share: String,
@Param("schema") schema: String,
@Param("table") table: String,
Expand All @@ -384,28 +392,43 @@ class DeltaSharingService(serverConfig: ServerConfig) {
s"$share.$schema.$table")
}

val capabilitiesMap = getDeltaSharingCapabilitiesMap(
req.headers().get(DELTA_SHARING_CAPABILITIES_HEADER)
)
val includeEndStreamAction = getRequestEndStreamAction(capabilitiesMap)
val (v, actions) = deltaSharedTableLoader.loadTable(tableConfig).queryCDF(
getCdfOptionsMap(
Option(startingVersion),
Option(endingVersion),
Option(startingTimestamp),
Option(endingTimestamp)
),
includeHistoricalMetadata = Try(includeHistoricalMetadata.toBoolean).getOrElse(false)
includeHistoricalMetadata = Try(includeHistoricalMetadata.toBoolean).getOrElse(false),
includeEndStreamAction = includeEndStreamAction
)
logger.info(s"Took ${System.currentTimeMillis - start} ms to load the table cdf " +
s"and sign ${actions.length - 2} urls for table $share/$schema/$table")
streamingOutput(Some(v), actions)
streamingOutput(Some(v), actions, includeEndStreamAction)
}

private def streamingOutput(version: Option[Long], actions: Seq[SingleAction]): HttpResponse = {
private def streamingOutput(
version: Option[Long],
actions: Seq[SingleAction],
includeEndStreamAction: Boolean = false): HttpResponse = {
val capabilities = if (includeEndStreamAction) {
s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true"
} else {
""
}
val headers = if (version.isDefined) {
createHeadersBuilderForTableVersion(version.get)
.set(HttpHeaderNames.CONTENT_TYPE, DELTA_TABLE_METADATA_CONTENT_TYPE)
.set(DELTA_SHARING_CAPABILITIES_HEADER, capabilities)
.build()
} else {
ResponseHeaders.builder(200)
.set(HttpHeaderNames.CONTENT_TYPE, DELTA_TABLE_METADATA_CONTENT_TYPE)
.set(DELTA_SHARING_CAPABILITIES_HEADER, capabilities)
.build()
}
ResponseConversionUtil.streamingFrom(
Expand All @@ -420,12 +443,27 @@ class DeltaSharingService(serverConfig: ServerConfig) {
},
ServiceRequestContext.current().blockingTaskExecutor())
}

/**
 * Parses the raw value of the capabilities request header into a key/value map.
 *
 * The header is lower-cased, split on `DELTA_SHARING_CAPABILITIES_DELIMITER`, and each
 * token is split on `=`. Tokens that do not split into exactly two parts are ignored.
 * Whitespace around keys and values is stripped so that "a=b; c=d" and "a=b;c=d" parse
 * the same way.
 *
 * @param headerString raw header value; may be `null` when the header is absent
 * @return lower-cased capability name to lower-cased value; empty when the header is absent
 */
private def getDeltaSharingCapabilitiesMap(headerString: String): Map[String, String] = {
  Option(headerString).fold(Map.empty[String, String]) { header =>
    // Locale.ROOT keeps lower-casing independent of the JVM default locale (for example,
    // a Turkish locale maps "I" to a dotless "ı", which would break key lookups).
    header.toLowerCase(java.util.Locale.ROOT)
      .split(DELTA_SHARING_CAPABILITIES_DELIMITER)
      .map(_.split("="))
      .collect { case Array(key, value) => key.trim -> value.trim }
      .toMap
  }
}
}


object DeltaSharingService {
val DELTA_TABLE_VERSION_HEADER = "Delta-Table-Version"
val DELTA_TABLE_METADATA_CONTENT_TYPE = "application/x-ndjson; charset=utf-8"
val DELTA_SHARING_CAPABILITIES_HEADER = "delta-sharing-capabilities"
val DELTA_SHARING_INCLUDE_END_STREAM_ACTION = "includeendstreamaction"
val DELTA_SHARING_CAPABILITIES_DELIMITER = ";"

val SPARK_STRUCTURED_STREAMING = "SparkStructuredStreaming"

Expand Down Expand Up @@ -547,6 +585,11 @@ object DeltaSharingService {
endingTimestamp.map(DeltaDataSource.CDF_END_TIMESTAMP_KEY -> _)).toMap
}

/**
 * Reports whether the parsed capabilities ask for an end-stream action.
 *
 * Looks up `DELTA_SHARING_INCLUDE_END_STREAM_ACTION` in the given map and returns true only
 * when its value is "true" (case-insensitive). A missing key or any other value, including
 * malformed ones such as "1" or "yes", yields false rather than throwing, so a bad header
 * sent by a client cannot fail the request.
 *
 * @param headerCapabilities capability name to value, as parsed from the request header
 * @return true when the end-stream action was explicitly requested, false otherwise
 */
private[server] def getRequestEndStreamAction(
    headerCapabilities: Map[String, String]): Boolean = {
  headerCapabilities
    .get(DELTA_SHARING_INCLUDE_END_STREAM_ACTION)
    // Literal-first comparison is null-safe and never throws, unlike String#toBoolean.
    .exists(value => "true".equalsIgnoreCase(value))
}

def main(args: Array[String]): Unit = {
val ns = parser.parseArgsOrFail(args)
val serverConfigPath = ns.getString("config")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ class DeltaSharedTable(
snapshot.version
}

// scalastyle:off argcount
def query(
includeFiles: Boolean,
predicateHints: Seq[String],
Expand All @@ -195,7 +196,8 @@ class DeltaSharedTable(
startingVersion: Option[Long],
endingVersion: Option[Long],
includeRefreshToken: Boolean,
refreshToken: Option[String]
refreshToken: Option[String],
includeEndStreamAction: Boolean
): (Long, Seq[model.SingleAction]) = withClassLoader {
// TODO Support `limitHint`
if (Seq(version, timestamp, startingVersion).filter(_.isDefined).size >= 2) {
Expand Down Expand Up @@ -245,7 +247,7 @@ class DeltaSharedTable(
if (startingVersion.isDefined) {
// Only read changes up to snapshot.version, and ignore changes that are committed during
// queryDataChangeSinceStartVersion.
queryDataChangeSinceStartVersion(startingVersion.get, endingVersion)
queryDataChangeSinceStartVersion(startingVersion.get, endingVersion, includeEndStreamAction)
} else if (includeFiles) {
val ts = if (isVersionQuery) {
val timestampsByVersion = DeltaSharingHistoryManager.getTimestampsByVersion(
Expand Down Expand Up @@ -305,6 +307,8 @@ class DeltaSharedTable(
)
)
Seq(model.EndStreamAction(refreshTokenStr).wrap)
} else if (includeEndStreamAction) {
Seq(model.EndStreamAction(null).wrap)
} else {
Nil
}
Expand All @@ -319,7 +323,8 @@ class DeltaSharedTable(

private def queryDataChangeSinceStartVersion(
startingVersion: Long,
endingVersion: Option[Long]
endingVersion: Option[Long],
includeEndStreamAction: Boolean
): Seq[model.SingleAction] = {
var latestVersion = tableVersion
if (startingVersion > latestVersion) {
Expand Down Expand Up @@ -388,12 +393,16 @@ class DeltaSharedTable(
case _ => ()
}
}
if (includeEndStreamAction) {
actions.append(model.EndStreamAction(null).wrap)
}
actions.toSeq
}

def queryCDF(
cdfOptions: Map[String, String],
includeHistoricalMetadata: Boolean = false
includeHistoricalMetadata: Boolean = false,
includeEndStreamAction: Boolean = false
): (Long, Seq[model.SingleAction]) = withClassLoader {
val actions = ListBuffer[model.SingleAction]()

Expand Down Expand Up @@ -495,6 +504,9 @@ class DeltaSharedTable(
actions.append(modelRemoveFile.wrap)
}
}
if (includeEndStreamAction) {
actions.append(model.EndStreamAction(null).wrap)
}
start -> actions.toSeq
}

Expand Down
Loading

0 comments on commit cc6b1a7

Please sign in to comment.