Skip to content

Commit

Permalink
rename includeEndStreamAction to endStreamActionEnabled
Browse files Browse the repository at this point in the history
  • Loading branch information
linzhou-db committed Oct 9, 2024
1 parent e60043a commit 09cde9d
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ class DeltaSharingRestClient(
readerFeatures: String = "",
queryTablePaginationEnabled: Boolean = false,
maxFilesPerReq: Int = 100000,
includeEndStreamAction: Boolean = true,
endStreamActionEnabled: Boolean = true,
enableAsyncQuery: Boolean = false,
asyncQueryPollIntervalMillis: Long = 10000L,
asyncQueryMaxDuration: Long = 600000L,
Expand All @@ -197,7 +197,7 @@ class DeltaSharingRestClient(
tokenRenewalThresholdInSeconds: Int = 600
) extends DeltaSharingClient with Logging {

logInfo(s"DeltaSharingRestClient with includeEndStreamAction: $includeEndStreamAction, " +
logInfo(s"DeltaSharingRestClient with endStreamActionEnabled: $endStreamActionEnabled, " +
s"enableAsyncQuery:$enableAsyncQuery")

import DeltaSharingRestClient._
Expand Down Expand Up @@ -501,7 +501,7 @@ class DeltaSharingRestClient(
(version, respondedFormat, lines)
} else {
val response = getNDJsonPost(
target = target, data = request, setIncludeEndStreamAction = true
target = target, data = request, setIncludeEndStreamAction = endStreamActionEnabled
)
val (filteredLines, _) = maybeExtractEndStreamAction(response.lines)
(response.version, response.respondedFormat, filteredLines)
Expand Down Expand Up @@ -557,7 +557,7 @@ class DeltaSharingRestClient(
getNDJsonWithAsync(table, targetUrl, request)
} else {
val response = getNDJsonPost(
target = targetUrl, data = request, setIncludeEndStreamAction = true
target = targetUrl, data = request, setIncludeEndStreamAction = endStreamActionEnabled
)
(response.version, response.respondedFormat, response.lines, None)
}
Expand Down Expand Up @@ -657,7 +657,9 @@ class DeltaSharingRestClient(
)
getCDFFilesByPage(target)
} else {
val response = getNDJson(target, requireVersion = false, setIncludeEndStreamAction = true)
val response = getNDJson(
target, requireVersion = false, setIncludeEndStreamAction = endStreamActionEnabled
)
val (filteredLines, _) = maybeExtractEndStreamAction(response.lines)
(response.version, response.respondedFormat, filteredLines)
}
Expand Down Expand Up @@ -712,7 +714,9 @@ class DeltaSharingRestClient(

// Fetch first page
var updatedUrl = s"$targetUrl&maxFiles=$maxFilesPerReq"
val response = getNDJson(updatedUrl, requireVersion = false, setIncludeEndStreamAction = true)
val response = getNDJson(
updatedUrl, requireVersion = false, setIncludeEndStreamAction = endStreamActionEnabled
)
var (filteredLines, endStreamAction) = maybeExtractEndStreamAction(response.lines)
if (endStreamAction.isEmpty) {
logWarning(
Expand Down Expand Up @@ -741,7 +745,7 @@ class DeltaSharingRestClient(
expectedProtocol = protocol,
expectedMetadata = metadata,
pageNumber = numPages,
setIncludeEndStreamAction = true
setIncludeEndStreamAction = endStreamActionEnabled
)
allLines.appendAll(res._1)
endStreamAction = res._2
Expand Down Expand Up @@ -876,7 +880,7 @@ class DeltaSharingRestClient(
lines,
capabilities = capabilities
)
if (includeEndStreamAction && setIncludeEndStreamAction) {
if (setIncludeEndStreamAction) {
checkEndStreamAction(response)
}
response
Expand Down Expand Up @@ -1008,14 +1012,14 @@ class DeltaSharingRestClient(
lines,
capabilities = capabilities
)
if (includeEndStreamAction && setIncludeEndStreamAction) {
if (setIncludeEndStreamAction) {
checkEndStreamAction(response)
}
response
}

private def checkEndStreamAction(response: ParsedDeltaSharingResponse): Unit = {
// Only perform additional check when includeEndStreamAction = true
// Only perform additional check when endStreamActionEnabled = true
response.includeEndStreamActionHeader match {
case Some(true) =>
val lastLineAction = JsonUtils.fromJson[SingleAction](response.lines.last)
Expand Down Expand Up @@ -1233,7 +1237,7 @@ class DeltaSharingRestClient(
capabilities = capabilities :+ s"$DELTA_SHARING_CAPABILITIES_ASYNC_READ=true"
}

if (includeEndStreamAction && setIncludeEndStreamAction) {
if (setIncludeEndStreamAction) {
capabilities = capabilities :+ s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true"
}

Expand Down Expand Up @@ -1349,7 +1353,7 @@ object DeltaSharingRestClient extends Logging {
val queryTablePaginationEnabled = ConfUtils.queryTablePaginationEnabled(sqlConf)
val maxFilesPerReq = ConfUtils.maxFilesPerQueryRequest(sqlConf)
val useAsyncQuery = ConfUtils.useAsyncQuery(sqlConf)
val includeEndStreamAction = ConfUtils.includeEndStreamAction(sqlConf)
val endStreamActionEnabled = ConfUtils.includeEndStreamAction(sqlConf)
val asyncQueryMaxDurationMillis = ConfUtils.asyncQueryTimeout(sqlConf)
val asyncQueryPollDurationMillis = ConfUtils.asyncQueryPollIntervalMillis(sqlConf)

Expand Down Expand Up @@ -1388,7 +1392,7 @@ object DeltaSharingRestClient extends Logging {
readerFeatures,
java.lang.Boolean.valueOf(queryTablePaginationEnabled),
java.lang.Integer.valueOf(maxFilesPerReq),
java.lang.Boolean.valueOf(includeEndStreamAction),
java.lang.Boolean.valueOf(endStreamActionEnabled),
java.lang.Boolean.valueOf(useAsyncQuery),
java.lang.Long.valueOf(asyncQueryPollDurationMillis),
java.lang.Long.valueOf(asyncQueryMaxDurationMillis),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
assert(h.contains(" java/"))
}

def getEndStreamActionHeader(includeEndStreamAction: Boolean): String = {
if (includeEndStreamAction) {
def getEndStreamActionHeader(endStreamActionEnabled: Boolean): String = {
if (endStreamActionEnabled) {
s";$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true"
} else {
""
Expand All @@ -96,9 +96,9 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
request: HttpRequestBase,
responseFormat: String,
readerFeatures: String,
includeEndStreamAction: Boolean): Unit = {
endStreamActionEnabled: Boolean): Unit = {
val expected = s"${RESPONSE_FORMAT}=$responseFormat$readerFeatures" +
getEndStreamActionHeader(includeEndStreamAction)
getEndStreamActionHeader(endStreamActionEnabled)
val h = request.getFirstHeader(DELTA_SHARING_CAPABILITIES_HEADER)
assert(h.getValue == expected)
}
Expand All @@ -108,39 +108,39 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
(true, false),
(false, true),
(false, false)
).foreach { case (forStreaming, includeEndStreamAction) =>
).foreach { case (forStreaming, endStreamActionEnabled) =>
var client = new DeltaSharingRestClient(
testProfileProvider,
forStreaming = forStreaming,
includeEndStreamAction = includeEndStreamAction,
endStreamActionEnabled = endStreamActionEnabled,
readerFeatures = "willBeIgnored")
.prepareHeaders(httpRequest, setIncludeEndStreamAction = includeEndStreamAction)
.prepareHeaders(httpRequest, setIncludeEndStreamAction = endStreamActionEnabled)
checkUserAgent(client, forStreaming)
checkDeltaSharingCapabilities(client, "parquet", "", includeEndStreamAction)
checkDeltaSharingCapabilities(client, "parquet", "", endStreamActionEnabled)

val readerFeatures = "deletionVectors,columnMapping,timestampNTZ"
client = new DeltaSharingRestClient(
testProfileProvider,
forStreaming = forStreaming,
includeEndStreamAction = includeEndStreamAction,
endStreamActionEnabled = endStreamActionEnabled,
responseFormat = RESPONSE_FORMAT_DELTA,
readerFeatures = readerFeatures)
.prepareHeaders(httpRequest, setIncludeEndStreamAction = includeEndStreamAction)
.prepareHeaders(httpRequest, setIncludeEndStreamAction = endStreamActionEnabled)
checkUserAgent(client, forStreaming)
checkDeltaSharingCapabilities(
client, "delta", s";$READER_FEATURES=$readerFeatures", includeEndStreamAction
client, "delta", s";$READER_FEATURES=$readerFeatures", endStreamActionEnabled
)

client = new DeltaSharingRestClient(
testProfileProvider,
forStreaming = forStreaming,
includeEndStreamAction = includeEndStreamAction,
endStreamActionEnabled = endStreamActionEnabled,
responseFormat = s"$RESPONSE_FORMAT_DELTA,$RESPONSE_FORMAT_PARQUET",
readerFeatures = readerFeatures)
.prepareHeaders(httpRequest, setIncludeEndStreamAction = includeEndStreamAction)
.prepareHeaders(httpRequest, setIncludeEndStreamAction = endStreamActionEnabled)
checkUserAgent(client, forStreaming)
checkDeltaSharingCapabilities(
client, s"delta,parquet", s";$READER_FEATURES=$readerFeatures", includeEndStreamAction
client, s"delta,parquet", s";$READER_FEATURES=$readerFeatures", endStreamActionEnabled
)
}
}
Expand Down

0 comments on commit 09cde9d

Please sign in to comment.