Skip to content

Commit

Permalink
Add additional logging in DeltaSharingClient and DeltaSharingSource (#…
Browse files Browse the repository at this point in the history
…586)

* Add additional logging in DeltaSharingClient and DeltaSharingSource

* fix lint
  • Loading branch information
linzhou-db authored Oct 4, 2024
1 parent faf6912 commit 8a1d012
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ package io.delta.sharing.client
import java.io.{BufferedReader, InputStreamReader}
import java.net.{URL, URLEncoder}
import java.nio.charset.StandardCharsets.UTF_8
import java.sql.Timestamp
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter.{ISO_DATE, ISO_DATE_TIME}
import java.util.UUID

import scala.collection.mutable.{ArrayBuffer, ListBuffer}
Expand All @@ -46,6 +43,17 @@ import io.delta.sharing.client.util.{ConfUtils, JsonUtils, RetryUtils, Unexpecte

/** An interface to fetch Delta metadata from remote server. */
trait DeltaSharingClient {

protected var dsQueryId: Option[String] = None

def getQueryId: String = {
dsQueryId.getOrElse("dsQueryIdNotSet")
}

protected def getDsQueryIdForLogging: String = {
s" for query($dsQueryId)."
}

def listAllTables(): Seq[Table]

def getTableVersion(table: Table, startingTimestamp: Option[String] = None): Long
Expand Down Expand Up @@ -198,8 +206,6 @@ class DeltaSharingRestClient(
// Convert the responseFormat to a Seq to be used later.
private val responseFormatSet = responseFormat.split(",").toSet

private var dsQueryId: Option[String] = None

private lazy val client = {
val clientBuilder: HttpClientBuilder = if (sslTrustAll) {
val sslBuilder = new SSLContextBuilder()
Expand Down Expand Up @@ -299,7 +305,7 @@ class DeltaSharingRestClient(
val (version, _, _) = getResponse(new HttpGet(target), true, true)
version.getOrElse {
throw new IllegalStateException(s"Cannot find " +
s"${RESPONSE_TABLE_VERSION_HEADER_KEY} in the header")
s"${RESPONSE_TABLE_VERSION_HEADER_KEY} in the header," + getDsQueryIdForLogging)
}
}

Expand All @@ -311,10 +317,10 @@ class DeltaSharingRestClient(
private def checkRespondedFormat(respondedFormat: String, rpc: String, table: String): Unit = {
if (!responseFormatSet.contains(respondedFormat)) {
logError(s"RespondedFormat($respondedFormat) is different from requested " +
s"responseFormat($responseFormat) for $rpc for table $table.")
s"responseFormat($responseFormat) for $rpc for table $table," + getDsQueryIdForLogging)
throw new IllegalArgumentException("The responseFormat returned from the delta sharing " +
s"server doesn't match the requested responseFormat: respondedFormat($respondedFormat)" +
s" != requestedFormat($responseFormat).")
s" != requestedFormat($responseFormat)," + getDsQueryIdForLogging)
}
}

Expand All @@ -338,8 +344,8 @@ class DeltaSharingRestClient(
table = s"${table.share}.${table.schema}.${table.name}"
)
if (response.lines.size != 2) {
throw new IllegalStateException(s"received more than two lines:${response.lines.size}, " +
s"for query($dsQueryId).")
throw new IllegalStateException(s"received more than two lines:${response.lines.size}," +
getDsQueryIdForLogging)
}

if (response.respondedFormat == RESPONSE_FORMAT_DELTA) {
Expand All @@ -365,7 +371,8 @@ class DeltaSharingRestClient(
if (protocol.minReaderVersion > DeltaSharingProfile.CURRENT) {
throw new IllegalArgumentException(s"The table requires a newer version" +
s" ${protocol.minReaderVersion} to read. But the current release supports version " +
s"is ${DeltaSharingProfile.CURRENT} and below. Please upgrade to a newer release.")
s"is ${DeltaSharingProfile.CURRENT} and below. Please upgrade to a newer release." +
getDsQueryIdForLogging)
}
}

Expand Down Expand Up @@ -440,7 +447,7 @@ class DeltaSharingRestClient(
if (action.file != null) {
files.append(action.file)
} else {
throw new IllegalStateException(s"Unexpected Line:${line}")
throw new IllegalStateException(s"Unexpected Line:${line}" + getDsQueryIdForLogging)
}
}
DeltaTableFiles(
Expand Down Expand Up @@ -481,7 +488,8 @@ class DeltaSharingRestClient(
val (version, respondedFormat, lines) = if (queryTablePaginationEnabled) {
logInfo(
s"Making paginated queryTable from version $startingVersion requests for table " +
s"${table.share}.${table.schema}.${table.name} with maxFiles=$maxFilesPerReq"
s"${table.share}.${table.schema}.${table.name} with maxFiles=$maxFilesPerReq, " +
s"for query($dsQueryId)."
)
val (version, respondedFormat, lines, _) = getFilesByPage(table, target, request)
(version, respondedFormat, lines)
Expand Down Expand Up @@ -512,7 +520,8 @@ class DeltaSharingRestClient(
case a: AddFileForCDF => addFiles.append(a)
case r: RemoveFile => removeFiles.append(r)
case m: Metadata => additionalMetadatas.append(m)
case _ => throw new IllegalStateException(s"Unexpected Line:${line}")
case _ => throw new IllegalStateException(
s"Unexpected Line:${line}" + getDsQueryIdForLogging)
}
}
DeltaTableFiles(
Expand Down Expand Up @@ -546,7 +555,7 @@ class DeltaSharingRestClient(
var (filteredLines, endStreamAction) = maybeExtractEndStreamAction(lines)
if (endStreamAction.isEmpty) {
logWarning(
s"EndStreamAction is not returned in the response for paginated query($dsQueryId)."
s"EndStreamAction is not returned in the response" + getDsQueryIdForLogging
)
}

Expand Down Expand Up @@ -599,18 +608,20 @@ class DeltaSharingRestClient(
endStreamAction = res._2
if (endStreamAction.isEmpty) {
logWarning(
s"EndStreamAction is not returned in the response for paginated query($dsQueryId)."
s"EndStreamAction is not returned in the response" + getDsQueryIdForLogging
)
}
// Throw an error if the first page is expiring before we get all pages
if (minUrlExpirationTimestamp.exists(_ <= System.currentTimeMillis())) {
throw new IllegalStateException("Unable to fetch all pages before minimum url expiration.")
throw new IllegalStateException(
"Unable to fetch all pages before minimum url expiration." + getDsQueryIdForLogging
)
}
}

// TODO: remove logging once changes are rolled out
logInfo(s"Took ${System.currentTimeMillis() - start} ms to query $numPages pages " +
s"of ${allLines.size} files")
s"of ${allLines.size} files," + getDsQueryIdForLogging)
(version, respondedFormat, allLines.toSeq, refreshToken)
}

Expand All @@ -629,7 +640,8 @@ class DeltaSharingRestClient(
// TODO: remove logging once changes are rolled out
logInfo(
s"Making paginated queryTableChanges requests for table " +
s"${table.share}.${table.schema}.${table.name} with maxFiles=$maxFilesPerReq"
s"${table.share}.${table.schema}.${table.name} with maxFiles=$maxFilesPerReq, " +
s"for query($dsQueryId)."
)
getCDFFilesByPage(target)
} else {
Expand Down Expand Up @@ -663,7 +675,8 @@ class DeltaSharingRestClient(
case a: AddFileForCDF => addFiles.append(a)
case r: RemoveFile => removeFiles.append(r)
case m: Metadata => additionalMetadatas.append(m)
case _ => throw new IllegalStateException(s"Unexpected Line:${line}")
case _ => throw new IllegalStateException(
s"Unexpected Line:${line}," + getDsQueryIdForLogging)
}
}
DeltaTableFiles(
Expand Down Expand Up @@ -691,7 +704,7 @@ class DeltaSharingRestClient(
var (filteredLines, endStreamAction) = maybeExtractEndStreamAction(response.lines)
if (endStreamAction.isEmpty) {
logWarning(
s"EndStreamAction is not returned in the response for paginated query($dsQueryId)."
s"EndStreamAction is not returned in the response" + getDsQueryIdForLogging
)
}
val protocol = filteredLines(0)
Expand Down Expand Up @@ -721,19 +734,21 @@ class DeltaSharingRestClient(
endStreamAction = res._2
if (endStreamAction.isEmpty) {
logWarning(
s"EndStreamAction is not returned in the response for paginated query($dsQueryId)."
s"EndStreamAction is not returned in the response" + getDsQueryIdForLogging
)
}
// Throw an error if the first page is expiring before we get all pages
if (minUrlExpirationTimestamp.exists(_ <= System.currentTimeMillis())) {
throw new IllegalStateException("Unable to fetch all pages before minimum url expiration.")
throw new IllegalStateException(
"Unable to fetch all pages before minimum url expiration," + getDsQueryIdForLogging
)
}
}

// TODO: remove logging once changes are rolled out
logInfo(
s"Took ${System.currentTimeMillis() - start} ms to query $numPages pages " +
s"of ${allLines.size} files"
s"of ${allLines.size} files," + getDsQueryIdForLogging
)
(response.version, response.respondedFormat, allLines.toSeq)
}
Expand All @@ -756,7 +771,7 @@ class DeltaSharingRestClient(
getNDJson(targetUrl, requireVersion = false)
}
logInfo(s"Took ${System.currentTimeMillis() - start} to fetch ${pageNumber}th page " +
s"of ${response.lines.size} lines.")
s"of ${response.lines.size} lines," + getDsQueryIdForLogging)

// Validate that version/format/protocol/metadata in the response don't change across pages
if (response.version != expectedVersion ||
Expand All @@ -768,7 +783,7 @@ class DeltaSharingRestClient(
|Received inconsistent version/format/protocol/metadata across pages.
|Expected: version $expectedVersion, $expectedRespondedFormat,
|$expectedProtocol, $expectedMetadata. Actual: version ${response.version},
|${response.respondedFormat}, ${response.lines}""".stripMargin
|${response.respondedFormat}, ${response.lines},$getDsQueryIdForLogging""".stripMargin
logError(s"Error while fetching next page files at url $targetUrl " +
s"with body(${JsonUtils.toJson(requestBody.orNull)}: $errorMsg)")
throw new IllegalStateException(errorMsg)
Expand Down Expand Up @@ -826,7 +841,7 @@ class DeltaSharingRestClient(
version = version.getOrElse {
if (requireVersion) {
throw new IllegalStateException(s"Cannot find " +
s"${RESPONSE_TABLE_VERSION_HEADER_KEY} in the header")
s"${RESPONSE_TABLE_VERSION_HEADER_KEY} in the header" + getDsQueryIdForLogging)
} else {
0L
}
Expand Down Expand Up @@ -948,7 +963,9 @@ class DeltaSharingRestClient(

val response = ParsedDeltaSharingResponse(
version = version.getOrElse {
throw new IllegalStateException("Cannot find Delta-Table-Version in the header")
throw new IllegalStateException(
"Cannot find Delta-Table-Version in the header" + getDsQueryIdForLogging
)
},
respondedFormat = respondedFormat,
includedEndStreamAction = includedEndStreamAction,
Expand All @@ -970,20 +987,20 @@ class DeltaSharingRestClient(
s"${DELTA_SHARING_END_STREAM_ACTION}=true in the " +
s"header, server responded with the header set to true(${response.capabilities}, " +
s"and ${response.lines.size} lines, and last line parsed as " +
s"${lastLineAction.unwrap.getClass()}, for query($dsQueryId).")
s"${lastLineAction.unwrap.getClass()}," + getDsQueryIdForLogging)
}
logInfo(
s"Successfully verified endStreamAction in the response for query($dsQueryId)."
s"Successfully verified endStreamAction in the response" + getDsQueryIdForLogging
)
case Some(false) =>
logWarning(s"Client sets ${DELTA_SHARING_END_STREAM_ACTION}=true in the " +
s"header, but the server responded with the header set to false(" +
s"${response.capabilities}), for query($dsQueryId)."
s"${response.capabilities})," + getDsQueryIdForLogging
)
case None =>
logWarning(s"Client sets ${DELTA_SHARING_END_STREAM_ACTION}=true in the" +
s" header, but server didn't respond with the header(${response.capabilities}) " +
s"for query($dsQueryId)."
s" header, but server didn't respond with the header(${response.capabilities}), " +
s"for query($dsQueryId)."
)
}
}
Expand Down Expand Up @@ -1015,7 +1032,7 @@ class DeltaSharingRestClient(
val (_, _, response) = getResponse(new HttpGet(target), false, true)
if (response.size != 1) {
throw new IllegalStateException(
"Unexpected response for target: " + target + ", response=" + response
s"Unexpected response for target:$target, response=$response" + getDsQueryIdForLogging
)
}
JsonUtils.fromJson[R](response(0))
Expand Down Expand Up @@ -1103,7 +1120,8 @@ class DeltaSharingRestClient(
}
} catch {
case e: org.apache.http.ConnectionClosedException =>
val error = s"Request to delta sharing server failed due to ${e}."
val error = s"Request to delta sharing server failed for query($dsQueryId) " +
s"due to ${e}."
logError(error)
lineBuffer += error
lineBuffer.toList
Expand All @@ -1123,7 +1141,8 @@ class DeltaSharingRestClient(
// Only show the last 100 lines in the error to keep it contained.
val responseToShow = lines.drop(lines.size - 100).mkString("\n")
throw new UnexpectedHttpStatus(
s"HTTP request failed with status: $status $responseToShow. $additionalErrorInfo",
s"HTTP request failed with status: $status $responseToShow. $additionalErrorInfo"
+ getDsQueryIdForLogging,
statusCode)
}
(
Expand Down Expand Up @@ -1166,7 +1185,7 @@ class DeltaSharingRestClient(
}

if (enableAsyncQuery) {
capabilities = capabilities :+ s"$DELTA_SHARING_CAPABILITIES_ASYNC_READ=true"
capabilities = capabilities :+ s"$DELTA_SHARING_CAPABILITIES_ASYNC_READ=true"
}

if (includeEndStreamAction) {
Expand Down
Loading

0 comments on commit 8a1d012

Please sign in to comment.