Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport includeEndStreamAction in DeltasharingService to branch-0.7 #595

Merged
merged 8 commits into from
Oct 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -285,14 +285,16 @@ class DeltaSharingService(serverConfig: ServerConfig) {
startingVersion = None,
endingVersion = None,
includeRefreshToken = false,
refreshToken = None
refreshToken = None,
includeEndStreamAction = false
)
streamingOutput(Some(v), actions)
}

@Post("/shares/{share}/schemas/{schema}/tables/{table}/query")
@ConsumesJson
def listFiles(
req: HttpRequest,
@Param("share") share: String,
@Param("schema") schema: String,
@Param("table") table: String,
Expand Down Expand Up @@ -343,6 +345,10 @@ class DeltaSharingService(serverConfig: ServerConfig) {
)
}
}
val capabilitiesMap = getDeltaSharingCapabilitiesMap(
req.headers().get(DELTA_SHARING_CAPABILITIES_HEADER)
)
val includeEndStreamAction = getRequestEndStreamAction(capabilitiesMap)
val (version, actions) = deltaSharedTableLoader.loadTable(tableConfig).query(
includeFiles = true,
request.predicateHints,
Expand All @@ -353,7 +359,8 @@ class DeltaSharingService(serverConfig: ServerConfig) {
request.startingVersion,
request.endingVersion,
request.includeRefreshToken.getOrElse(false),
request.refreshToken
request.refreshToken,
includeEndStreamAction = includeEndStreamAction
)
if (version < tableConfig.startVersion) {
throw new DeltaSharingIllegalArgumentException(
Expand All @@ -362,12 +369,13 @@ class DeltaSharingService(serverConfig: ServerConfig) {
}
logger.info(s"Took ${System.currentTimeMillis - start} ms to load the table " +
s"and sign ${actions.length - 2} urls for table $share/$schema/$table")
streamingOutput(Some(version), actions)
streamingOutput(Some(version), actions, includeEndStreamAction)
}

@Get("/shares/{share}/schemas/{schema}/tables/{table}/changes")
@ConsumesJson
def listCdfFiles(
req: HttpRequest,
@Param("share") share: String,
@Param("schema") schema: String,
@Param("table") table: String,
Expand All @@ -384,28 +392,43 @@ class DeltaSharingService(serverConfig: ServerConfig) {
s"$share.$schema.$table")
}

val capabilitiesMap = getDeltaSharingCapabilitiesMap(
req.headers().get(DELTA_SHARING_CAPABILITIES_HEADER)
)
val includeEndStreamAction = getRequestEndStreamAction(capabilitiesMap)
val (v, actions) = deltaSharedTableLoader.loadTable(tableConfig).queryCDF(
getCdfOptionsMap(
Option(startingVersion),
Option(endingVersion),
Option(startingTimestamp),
Option(endingTimestamp)
),
includeHistoricalMetadata = Try(includeHistoricalMetadata.toBoolean).getOrElse(false)
includeHistoricalMetadata = Try(includeHistoricalMetadata.toBoolean).getOrElse(false),
includeEndStreamAction = includeEndStreamAction
)
logger.info(s"Took ${System.currentTimeMillis - start} ms to load the table cdf " +
s"and sign ${actions.length - 2} urls for table $share/$schema/$table")
streamingOutput(Some(v), actions)
streamingOutput(Some(v), actions, includeEndStreamAction)
}

private def streamingOutput(version: Option[Long], actions: Seq[SingleAction]): HttpResponse = {
private def streamingOutput(
version: Option[Long],
actions: Seq[SingleAction],
includeEndStreamAction: Boolean = false): HttpResponse = {
val capabilities = if (includeEndStreamAction) {
s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true"
} else {
""
}
val headers = if (version.isDefined) {
createHeadersBuilderForTableVersion(version.get)
.set(HttpHeaderNames.CONTENT_TYPE, DELTA_TABLE_METADATA_CONTENT_TYPE)
.set(DELTA_SHARING_CAPABILITIES_HEADER, capabilities)
.build()
} else {
ResponseHeaders.builder(200)
.set(HttpHeaderNames.CONTENT_TYPE, DELTA_TABLE_METADATA_CONTENT_TYPE)
.set(DELTA_SHARING_CAPABILITIES_HEADER, capabilities)
.build()
}
ResponseConversionUtil.streamingFrom(
Expand All @@ -420,12 +443,27 @@ class DeltaSharingService(serverConfig: ServerConfig) {
},
ServiceRequestContext.current().blockingTaskExecutor())
}

/**
 * Parses the raw value of the delta-sharing-capabilities header into a key/value map.
 *
 * The value is lower-cased, split on DELTA_SHARING_CAPABILITIES_DELIMITER, and every
 * `key=value` segment becomes one map entry. Segments that do not split into exactly
 * two parts on `=` are ignored.
 *
 * @param headerString raw header value; may be null when the request has no such header
 * @return lower-cased, whitespace-trimmed capability names mapped to their values, or an
 *         empty map when the header is absent
 */
private def getDeltaSharingCapabilitiesMap(headerString: String): Map[String, String] = {
  Option(headerString) match {
    case None =>
      Map.empty[String, String]
    case Some(header) =>
      // Locale.ROOT keeps lower-casing independent of the JVM default locale; under e.g. a
      // Turkish locale "I" lower-cases to a dotless "ı" and keys would stop matching.
      header.toLowerCase(java.util.Locale.ROOT)
        .split(DELTA_SHARING_CAPABILITIES_DELIMITER)
        .map(_.split("="))
        // Trim so that optional whitespace around the delimiter or "=" does not end up
        // inside the key or value.
        .collect { case Array(key, value) => (key.trim, value.trim) }
        .toMap
  }
}
}


object DeltaSharingService {
// Header name for the table version.
// NOTE(review): its use is not visible in this view; presumably set on responses - confirm.
val DELTA_TABLE_VERSION_HEADER = "Delta-Table-Version"
// Content-Type value set on the streamed responses built by streamingOutput.
val DELTA_TABLE_METADATA_CONTENT_TYPE = "application/x-ndjson; charset=utf-8"
// Header read from requests to discover client capabilities and echoed on streamed responses.
val DELTA_SHARING_CAPABILITIES_HEADER = "delta-sharing-capabilities"
// Capability key for requesting an EndStreamAction; kept lower-case because the header value
// is lower-cased before it is parsed.
val DELTA_SHARING_INCLUDE_END_STREAM_ACTION = "includeendstreamaction"
// Separator between individual `key=value` capabilities inside the capabilities header.
val DELTA_SHARING_CAPABILITIES_DELIMITER = ";"

val SPARK_STRUCTURED_STREAMING = "SparkStructuredStreaming"

Expand Down Expand Up @@ -547,6 +585,11 @@ object DeltaSharingService {
endingTimestamp.map(DeltaDataSource.CDF_END_TIMESTAMP_KEY -> _)).toMap
}

/**
 * Tells whether the parsed request capabilities ask for an EndStreamAction.
 *
 * @param headerCapabilities capability map parsed from the delta-sharing-capabilities header
 * @return true only when the DELTA_SHARING_INCLUDE_END_STREAM_ACTION entry is present and
 *         its value is "true" (case-insensitive); false when it is missing or anything else
 */
private[server] def getRequestEndStreamAction(
    headerCapabilities: Map[String, String]): Boolean = {
  // The value comes straight from a client-supplied header. `String.toBoolean` throws
  // IllegalArgumentException on anything other than true/false, so compare explicitly and
  // treat unrecognised values as "not requested" instead of failing the request.
  headerCapabilities
    .get(DELTA_SHARING_INCLUDE_END_STREAM_ACTION)
    .exists(_.trim.equalsIgnoreCase("true"))
}

def main(args: Array[String]): Unit = {
val ns = parser.parseArgsOrFail(args)
val serverConfigPath = ns.getString("config")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ class DeltaSharedTable(
snapshot.version
}

// scalastyle:off argcount
def query(
includeFiles: Boolean,
predicateHints: Seq[String],
Expand All @@ -195,7 +196,8 @@ class DeltaSharedTable(
startingVersion: Option[Long],
endingVersion: Option[Long],
includeRefreshToken: Boolean,
refreshToken: Option[String]
refreshToken: Option[String],
includeEndStreamAction: Boolean
): (Long, Seq[model.SingleAction]) = withClassLoader {
// TODO Support `limitHint`
if (Seq(version, timestamp, startingVersion).filter(_.isDefined).size >= 2) {
Expand Down Expand Up @@ -245,7 +247,7 @@ class DeltaSharedTable(
if (startingVersion.isDefined) {
// Only read changes up to snapshot.version, and ignore changes that are committed during
// queryDataChangeSinceStartVersion.
queryDataChangeSinceStartVersion(startingVersion.get, endingVersion)
queryDataChangeSinceStartVersion(startingVersion.get, endingVersion, includeEndStreamAction)
} else if (includeFiles) {
val ts = if (isVersionQuery) {
val timestampsByVersion = DeltaSharingHistoryManager.getTimestampsByVersion(
Expand Down Expand Up @@ -305,6 +307,8 @@ class DeltaSharedTable(
)
)
Seq(model.EndStreamAction(refreshTokenStr).wrap)
} else if (includeEndStreamAction) {
Seq(model.EndStreamAction(null).wrap)
} else {
Nil
}
Expand All @@ -319,7 +323,8 @@ class DeltaSharedTable(

private def queryDataChangeSinceStartVersion(
startingVersion: Long,
endingVersion: Option[Long]
endingVersion: Option[Long],
includeEndStreamAction: Boolean
): Seq[model.SingleAction] = {
var latestVersion = tableVersion
if (startingVersion > latestVersion) {
Expand Down Expand Up @@ -388,12 +393,16 @@ class DeltaSharedTable(
case _ => ()
}
}
if (includeEndStreamAction) {
actions.append(model.EndStreamAction(null).wrap)
}
actions.toSeq
}

def queryCDF(
cdfOptions: Map[String, String],
includeHistoricalMetadata: Boolean = false
includeHistoricalMetadata: Boolean = false,
includeEndStreamAction: Boolean = false
): (Long, Seq[model.SingleAction]) = withClassLoader {
val actions = ListBuffer[model.SingleAction]()

Expand Down Expand Up @@ -495,6 +504,9 @@ class DeltaSharedTable(
actions.append(modelRemoveFile.wrap)
}
}
if (includeEndStreamAction) {
actions.append(model.EndStreamAction(null).wrap)
}
start -> actions.toSeq
}

Expand Down
Loading
Loading