Skip to content

Commit

Permalink
use capabilitiesMap instead of capabilities
Browse files Browse the repository at this point in the history
  • Loading branch information
linzhou-db committed Oct 9, 2024
1 parent d5ef283 commit 5ae145c
Showing 1 changed file with 20 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,14 @@ case class ParsedDeltaSharingTablePath(
* @param version the table version of the shared table.
* @param respondedFormat the sharing format (parquet or delta), used to parse the lines.
* @param lines all lines in the response.
* @param capabilities value of delta-sharing-capabilities in the response header
* @param capabilitiesMap Map parsed from the value of delta-sharing-capabilities in the
* response header
*/
case class ParsedDeltaSharingResponse(
version: Long,
respondedFormat: String,
lines: Seq[String],
capabilities: Option[String])
capabilitiesMap: Map[String, String])

private[sharing] trait PaginationResponse {
def nextPageToken: Option[String]
Expand Down Expand Up @@ -858,7 +859,7 @@ class DeltaSharingRestClient(
target: String,
requireVersion: Boolean,
setIncludeEndStreamAction: Boolean): ParsedDeltaSharingResponse = {
val (version, capabilities, lines) = getResponse(
val (version, capabilitiesMap, lines) = getResponse(
new HttpGet(target), setIncludeEndStreamAction = setIncludeEndStreamAction
)

Expand All @@ -871,9 +872,9 @@ class DeltaSharingRestClient(
0L
}
},
respondedFormat = getRespondedFormat(capabilities),
respondedFormat = getRespondedFormat(capabilitiesMap),
lines,
capabilities = capabilities
capabilitiesMap = capabilitiesMap
)
response
}
Expand Down Expand Up @@ -988,7 +989,7 @@ class DeltaSharingRestClient(
val json = JsonUtils.toJson(data)
httpPost.setHeader("Content-type", "application/json")
httpPost.setEntity(new StringEntity(json, UTF_8))
val (version, capabilities, lines) = getResponse(
val (version, capabilitiesMap, lines) = getResponse(
httpPost, setIncludeEndStreamAction = setIncludeEndStreamAction
)

Expand All @@ -998,15 +999,18 @@ class DeltaSharingRestClient(
"Cannot find Delta-Table-Version in the header" + getDsQueryIdForLogging
)
},
respondedFormat = getRespondedFormat(capabilities),
respondedFormat = getRespondedFormat(capabilitiesMap),
lines,
capabilities = capabilities
capabilitiesMap = capabilitiesMap
)
response
}

private def checkEndStreamAction(capabilities: Option[String], lines: Seq[String]): Unit = {
val includeEndStreamActionHeader = getRespondedIncludeEndStreamActionHeader(capabilities)
private def checkEndStreamAction(
capabilities: Option[String],
capabilitiesMap: Map[String, String],
lines: Seq[String]): Unit = {
val includeEndStreamActionHeader = getRespondedIncludeEndStreamActionHeader(capabilitiesMap)
includeEndStreamActionHeader match {
case Some(true) =>
val lastLineAction = JsonUtils.fromJson[SingleAction](lines.last)
Expand All @@ -1033,16 +1037,14 @@ class DeltaSharingRestClient(
}
}

private def getRespondedFormat(capabilities: Option[String]): String = {
val capabilitiesMap = parseDeltaSharingCapabilities(capabilities)
private def getRespondedFormat(capabilitiesMap: Map[String, String]): String = {
capabilitiesMap.get(RESPONSE_FORMAT).getOrElse(RESPONSE_FORMAT_PARQUET)
}

// includeEndStreamActionHeader indicates whether the last line is required to be an
// EndStreamAction, parsed from the response header.
private def getRespondedIncludeEndStreamActionHeader(
capabilities: Option[String]): Option[Boolean] = {
val capabilitiesMap = parseDeltaSharingCapabilities(capabilities)
capabilitiesMap: Map[String, String]): Option[Boolean] = {
capabilitiesMap.get(DELTA_SHARING_INCLUDE_END_STREAM_ACTION).map(_.toBoolean)
}

Expand Down Expand Up @@ -1124,7 +1126,7 @@ class DeltaSharingRestClient(
allowNoContent: Boolean = false,
fetchAsOneString: Boolean = false,
setIncludeEndStreamAction: Boolean = false
): (Option[Long], Option[String], Seq[String]) = {
): (Option[Long], Map[String, String], Seq[String]) = {
// Reset dsQueryId before calling RetryUtils, and before prepareHeaders.
dsQueryId = Some(UUID.randomUUID().toString().split('-').head)
RetryUtils.runWithExponentialBackoff(numRetries, maxRetryDuration) {
Expand Down Expand Up @@ -1187,14 +1189,15 @@ class DeltaSharingRestClient(
val capabilities = Option(
response.getFirstHeader(DELTA_SHARING_CAPABILITIES_HEADER)
).map(_.getValue)
val capabilitiesMap = parseDeltaSharingCapabilities(capabilities)
if (setIncludeEndStreamAction) {
checkEndStreamAction(capabilities, lines)
checkEndStreamAction(capabilities, capabilitiesMap, lines)
}
(
Option(
response.getFirstHeader(RESPONSE_TABLE_VERSION_HEADER_KEY)
).map(_.getValue.toLong),
capabilities,
capabilitiesMap,
lines
)
} finally {
Expand Down

0 comments on commit 5ae145c

Please sign in to comment.