Skip to content

Commit

Permalink
change behaviour of SPARQLProtocolWorker for streaming request bodies
Browse files Browse the repository at this point in the history
  • Loading branch information
nck-mlcnv committed Sep 21, 2023
1 parent abe29cc commit a302807
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ public String value() {

public record QueryStringWrapper(int index, String query) {}
public record QueryStreamWrapper(int index, InputStream queryInputStream) {}
public record QueryStreamSupplierWrapper(int index, Supplier<InputStream> queryStreamSupplier) {}


protected final Logger LOGGER = LoggerFactory.getLogger(QueryHandler.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.*;
import java.util.concurrent.*;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class SPARQLProtocolWorker extends HttpWorker {
Expand Down Expand Up @@ -68,6 +69,21 @@ public HttpRequest buildHttpRequest(InputStream queryStream,
ConnectionConfig connection,
String requestHeader) throws URISyntaxException, IOException {
HttpRequest.Builder request = HttpRequest.newBuilder().timeout(timeout);

class CustomStreamSupplier {
boolean used = false; // assume, that the stream will only be used again, if the first request failed, because of the client
public Supplier<InputStream> getStreamSupplier() {
if (!used) {
used = true;
return () -> queryStream;
}
else
return () -> null;
}
}

final var streamSupplier = new CustomStreamSupplier();

if (requestHeader != null)
request.header("Accept", requestHeader);
if (connection.user() != null)
Expand All @@ -93,7 +109,7 @@ public HttpRequest buildHttpRequest(InputStream queryStream,
case POST_QUERY -> {
request.uri(connection.endpoint())
.header("Content-Type", "application/sparql-query")
.POST(HttpRequest.BodyPublishers.ofByteArray(queryStream.readAllBytes())); // InputStream BodyPublisher won't work
.POST(HttpRequest.BodyPublishers.ofInputStream(streamSupplier.getStreamSupplier()));
}
case POST_URL_ENC_UPDATE -> {
request.uri(connection.endpoint())
Expand All @@ -106,7 +122,7 @@ public HttpRequest buildHttpRequest(InputStream queryStream,
case POST_UPDATE -> {
request.uri(connection.endpoint())
.header("Content-Type", "application/sparql-update")
.POST(HttpRequest.BodyPublishers.ofByteArray(queryStream.readAllBytes()));
.POST(HttpRequest.BodyPublishers.ofInputStream(streamSupplier.getStreamSupplier()));
}
}
return request.build();
Expand Down Expand Up @@ -264,12 +280,27 @@ private ExecutionStats executeQuery(Duration timeout, boolean discardOnFailure)


private HttpExecutionResult executeHttpRequest(Duration timeout) {
final var queryHandle = config().queries().getNextQueryStreamSupplier();
final QueryHandler.QueryStreamWrapper queryHandle;
try {
queryHandle = config().queries().getNextQueryStream();
} catch (IOException e) {
return new HttpExecutionResult(
config().queries().getCurrentQueryID(),
Optional.empty(),
Instant.now(),
Duration.ZERO,
Optional.empty(),
OptionalLong.empty(),
OptionalLong.empty(),
Optional.of(e)
);
}

final HttpRequest request;

try {
request = requestFactory.buildHttpRequest(
queryHandle.queryStreamSupplier(),
queryHandle.queryInputStream(),
timeout,
config().connection(),
config().acceptHeader()
Expand Down

0 comments on commit a302807

Please sign in to comment.