Skip to content

Commit

Permalink
improve logging and fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
linzhou-db committed Oct 5, 2024
1 parent 8a1d012 commit af870a6
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ class DeltaSharingRestClient(
var (filteredLines, endStreamAction) = maybeExtractEndStreamAction(lines)
if (endStreamAction.isEmpty) {
logWarning(
s"EndStreamAction is not returned in the response" + getDsQueryIdForLogging
s"EndStreamAction is not returned in the paginated response" + getDsQueryIdForLogging
)
}

Expand Down Expand Up @@ -608,7 +608,7 @@ class DeltaSharingRestClient(
endStreamAction = res._2
if (endStreamAction.isEmpty) {
logWarning(
s"EndStreamAction is not returned in the response" + getDsQueryIdForLogging
s"EndStreamAction is not returned in the paginated response" + getDsQueryIdForLogging
)
}
// Throw an error if the first page is expiring before we get all pages
Expand Down Expand Up @@ -704,7 +704,7 @@ class DeltaSharingRestClient(
var (filteredLines, endStreamAction) = maybeExtractEndStreamAction(response.lines)
if (endStreamAction.isEmpty) {
logWarning(
s"EndStreamAction is not returned in the response" + getDsQueryIdForLogging
s"EndStreamAction is not returned in the paginated response" + getDsQueryIdForLogging
)
}
val protocol = filteredLines(0)
Expand Down Expand Up @@ -734,7 +734,7 @@ class DeltaSharingRestClient(
endStreamAction = res._2
if (endStreamAction.isEmpty) {
logWarning(
s"EndStreamAction is not returned in the response" + getDsQueryIdForLogging
s"EndStreamAction is not returned in the paginated response" + getDsQueryIdForLogging
)
}
// Throw an error if the first page is expiring before we get all pages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,37 +84,62 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
assert(h.contains(" java/"))
}

def checkDeltaSharingCapabilities(request: HttpRequestBase, expected: String): Unit = {
def getEndStreamActionHeader(includeEndStreamAction: Boolean): String = {
if (includeEndStreamAction) {
s";$DELTA_SHARING_END_STREAM_ACTION=true"
} else {
""
}
}

def checkDeltaSharingCapabilities(
request: HttpRequestBase,
responseFormat: String,
readerFeatures: String,
includeEndStreamAction: Boolean): Unit = {
val expected = s"${RESPONSE_FORMAT}=$responseFormat$readerFeatures" +
getEndStreamActionHeader(includeEndStreamAction)
val h = request.getFirstHeader(DELTA_SHARING_CAPABILITIES_HEADER)
assert(h.getValue == expected)
}

var httpRequestBase = new DeltaSharingRestClient(
testProfileProvider, forStreaming = false, readerFeatures = "willBeIgnored").prepareHeaders(httpRequest)
checkUserAgent(httpRequestBase, false)
checkDeltaSharingCapabilities(httpRequestBase, s"${RESPONSE_FORMAT}=parquet;$DELTA_SHARING_END_STREAM_ACTION=true")
Seq(
(true, true),
(true, false),
(false, true),
(false, false),
).foreach { case (forStreaming, includeEndStreamAction) =>
var client = new DeltaSharingRestClient(
testProfileProvider,
forStreaming = forStreaming,
includeEndStreamAction = includeEndStreamAction,
readerFeatures = "willBeIgnored").prepareHeaders(httpRequest)
checkUserAgent(client, forStreaming)
checkDeltaSharingCapabilities(client, "parquet", "", includeEndStreamAction)

val readerFeatures = "deletionVectors,columnMapping,timestampNTZ"
httpRequestBase = new DeltaSharingRestClient(
testProfileProvider,
forStreaming = true,
responseFormat = RESPONSE_FORMAT_DELTA,
readerFeatures = readerFeatures).prepareHeaders(httpRequest)
checkUserAgent(httpRequestBase, true)
checkDeltaSharingCapabilities(
httpRequestBase, s"$RESPONSE_FORMAT=delta;$READER_FEATURES=$readerFeatures;$DELTA_SHARING_END_STREAM_ACTION=true"
)
val readerFeatures = "deletionVectors,columnMapping,timestampNTZ"
client = new DeltaSharingRestClient(
testProfileProvider,
forStreaming = forStreaming,
includeEndStreamAction = includeEndStreamAction,
responseFormat = RESPONSE_FORMAT_DELTA,
readerFeatures = readerFeatures).prepareHeaders(httpRequest)
checkUserAgent(client, forStreaming)
checkDeltaSharingCapabilities(
client, "delta", s";$READER_FEATURES=$readerFeatures", includeEndStreamAction
)

httpRequestBase = new DeltaSharingRestClient(
testProfileProvider,
forStreaming = true,
responseFormat = s"$RESPONSE_FORMAT_DELTA,$RESPONSE_FORMAT_PARQUET",
readerFeatures = readerFeatures,
includeEndStreamAction = false).prepareHeaders(httpRequest)
checkUserAgent(httpRequestBase, true)
checkDeltaSharingCapabilities(
httpRequestBase, s"responseformat=delta,parquet;readerfeatures=$readerFeatures"
)
client = new DeltaSharingRestClient(
testProfileProvider,
forStreaming = forStreaming,
includeEndStreamAction = includeEndStreamAction,
responseFormat = s"$RESPONSE_FORMAT_DELTA,$RESPONSE_FORMAT_PARQUET",
readerFeatures = readerFeatures).prepareHeaders(httpRequest)
checkUserAgent(client, forStreaming)
checkDeltaSharingCapabilities(
client, s"delta,parquet", s";$READER_FEATURES=$readerFeatures", includeEndStreamAction
)
}
}

integrationTest("listAllTables") {
Expand Down

0 comments on commit af870a6

Please sign in to comment.