Release memory where possible in RestElasticSearchClient
Closes JanusGraph#4684

Signed-off-by: Allan Clements <[email protected]>
criminosis committed Sep 27, 2024
1 parent 4a576f6 commit 75d6f51
Showing 3 changed files with 67 additions and 32 deletions.
@@ -17,8 +17,6 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.PeekingIterator;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpStatus;
 import org.apache.http.entity.ByteArrayEntity;
@@ -50,17 +48,16 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -426,28 +423,41 @@ class RequestBytes {
         @VisibleForTesting
         int getSerializedSize() {
             int serializedSize = this.requestBytes.length;
-            serializedSize+= 1; //For follow-up NEW_LINE_BYTES
+            serializedSize += 1; //For follow-up NEW_LINE_BYTES
             if (this.requestSource != null) {
                 serializedSize += this.requestSource.length;
-                serializedSize+= 1; //For follow-up NEW_LINE_BYTES
+                serializedSize += 1; //For follow-up NEW_LINE_BYTES
             }
             return serializedSize;
         }
 
-        private void writeTo(OutputStream outputStream) throws IOException {
-            outputStream.write(this.requestBytes);
-            outputStream.write(NEW_LINE_BYTES);
+        private int writeTo(byte[] target, int initialOffset) {
+            int offset = initialOffset;
+            System.arraycopy(this.requestBytes, 0, target, offset, this.requestBytes.length);
+            offset += this.requestBytes.length;
+            System.arraycopy(NEW_LINE_BYTES, 0, target, offset, NEW_LINE_BYTES.length);
+            offset += NEW_LINE_BYTES.length;
             if (this.requestSource != null) {
-                outputStream.write(requestSource);
-                outputStream.write(NEW_LINE_BYTES);
+                System.arraycopy(this.requestSource, 0, target, offset, this.requestSource.length);
+                offset += this.requestSource.length;
+                System.arraycopy(NEW_LINE_BYTES, 0, target, offset, NEW_LINE_BYTES.length);
+                offset += NEW_LINE_BYTES.length;
             }
+            return offset;
         }
     }
 
-    private Pair<String, byte[]> buildBulkRequestInput(List<RequestBytes> requests, String ingestPipeline) throws IOException {
-        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    private Pair<String, byte[]> buildBulkRequestInput(List<RequestBytes> requests, String ingestPipeline) {
+        int totalBytes = requests.stream().mapToInt(RequestBytes::getSerializedSize).sum();
+        //By making a singular array we copy into avoids any dynamically expanded growth of the array that may overshoot
+        //how much memory we actually need, additionally it also avoids a final copy at the end normally done by
+        //ByteArrayOutputStream's toByteArray()
+        byte[] bytes = new byte[totalBytes];
+        int offset = 0;
         for (final RequestBytes request : requests) {
-            request.writeTo(outputStream);
+            //We can't remove the element from the collection like we do elsewhere, because we need to retain the
+            //serialized form in case of an error so the error can be paired to the originating request based on index
+            offset = request.writeTo(bytes, offset);
         }
 
         final StringBuilder bulkRequestQueryParameters = new StringBuilder();
@@ -458,7 +468,7 @@ private Pair<String, byte[]> buildBulkRequestInput(List<RequestBytes> requests,
             APPEND_OP.apply(bulkRequestQueryParameters).append("refresh=").append(bulkRefresh);
         }
         final String bulkRequestPath = REQUEST_SEPARATOR + "_bulk" + bulkRequestQueryParameters;
-        return Pair.with(bulkRequestPath, outputStream.toByteArray());
+        return Pair.with(bulkRequestPath, bytes);
     }
 
     private List<Triplet<Object, Integer, RequestBytes>> pairErrorsWithSubmittedMutation(
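
The core idea in the buildBulkRequestInput change is to size the output array exactly before copying: ByteArrayOutputStream starts with a small internal buffer and doubles it as it fills, so it can transiently hold nearly twice the bytes actually needed, and its toByteArray() performs one more full copy at the end. Below is a minimal standalone sketch of the same size-first pattern, using hypothetical Chunk and NEW_LINE names rather than the JanusGraph classes:

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

final class Chunk {
    private static final byte[] NEW_LINE = "\n".getBytes(StandardCharsets.UTF_8);
    private final byte[] payload;

    Chunk(String s) {
        this.payload = s.getBytes(StandardCharsets.UTF_8);
    }

    int serializedSize() {
        return payload.length + NEW_LINE.length;
    }

    // Copies this chunk plus a trailing newline into target and returns the next free
    // offset, mirroring the writeTo(byte[], int) contract in the diff above.
    int writeTo(byte[] target, int offset) {
        System.arraycopy(payload, 0, target, offset, payload.length);
        offset += payload.length;
        System.arraycopy(NEW_LINE, 0, target, offset, NEW_LINE.length);
        return offset + NEW_LINE.length;
    }

    static byte[] flatten(List<Chunk> chunks) {
        // One exact-size array: no doubling-growth overshoot, no final toByteArray() copy.
        int total = chunks.stream().mapToInt(Chunk::serializedSize).sum();
        byte[] out = new byte[total];
        int offset = 0;
        for (Chunk c : chunks) {
            offset = c.writeTo(out, offset);
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] flat = flatten(Arrays.asList(new Chunk("{\"index\":{}}"), new Chunk("{\"f\":1}")));
        System.out.print(new String(flat, StandardCharsets.UTF_8));
    }
}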
@@ -490,14 +500,20 @@ class BulkRequestChunker implements Iterator<List<RequestBytes>> {
         //There is no "correct" number of actions to perform in a single bulk request. Experiment with different
         // settings to find the optimal size for your particular workload. Note that Elasticsearch limits the maximum
         // size of a HTTP request to 100mb by default
-        private final PeekingIterator<RequestBytes> requestIterator;
+        private final ListIterator<RequestBytes> requestIterator;
         private final int[] exceptionallyLargeRequests;
+        private RequestBytes peeked;
 
         @VisibleForTesting
         BulkRequestChunker(List<ElasticSearchMutation> requests) throws JsonProcessingException {
             List<RequestBytes> serializedRequests = new ArrayList<>(requests.size());
             List<Integer> requestSizesThatWereTooLarge = new ArrayList<>();
-            for (ElasticSearchMutation request : requests) {
+            ListIterator<ElasticSearchMutation> requestsIter = requests.listIterator();
+            while (requestsIter.hasNext()) {
+                ElasticSearchMutation request = requestsIter.next();
+                //Remove the element from the collection so the collection's reference to it doesn't hold it from being
+                //GC'ed after it has been converted to its serialized form
+                requestsIter.set(null);
                 RequestBytes requestBytes = new RequestBytes(request);
                 int requestSerializedSize = requestBytes.getSerializedSize();
                 if (requestSerializedSize <= bulkChunkSerializedLimitBytes) {
@@ -507,7 +523,7 @@ class BulkRequestChunker implements Iterator<List<RequestBytes>> {
                     requestSizesThatWereTooLarge.add(requestSerializedSize);
                 }
             }
-            this.requestIterator = Iterators.peekingIterator(serializedRequests.iterator());
+            this.requestIterator = serializedRequests.listIterator();
             //Condense request sizes that are too large into an int array to remove Boxed & List memory overhead
             this.exceptionallyLargeRequests = requestSizesThatWereTooLarge.isEmpty() ? null :
                 requestSizesThatWereTooLarge.stream().mapToInt(Integer::intValue).toArray();
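
The constructor change relies on ListIterator.set(null) to clear each slot as soon as the mutation has been serialized, so the original ElasticSearchMutation objects become collectible while the loop is still running instead of surviving until the caller's list goes out of scope. A contrived sketch of the same pattern, with hypothetical names:

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.ListIterator;

public class ReleaseWhileTransforming {
    // 'inputs' must be a mutable list; set(null) drops the list's reference to each
    // element so it can be GC'ed once its serialized form has been produced.
    static List<byte[]> serializeAll(List<String> inputs) {
        List<byte[]> out = new ArrayList<>(inputs.size());
        ListIterator<String> it = inputs.listIterator();
        while (it.hasNext()) {
            String next = it.next();
            it.set(null); // release the slot; only 'next' still references the element
            out.add(next.getBytes(StandardCharsets.UTF_8));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> inputs = new ArrayList<>(Arrays.asList("a", "b"));
        System.out.println(serializeAll(inputs).size()); // 2; inputs is now [null, null]
    }
}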
@@ -517,20 +533,31 @@
         public boolean hasNext() {
             //Make sure hasNext() still returns true if exceptionally large requests were attempted to be submitted
             //This allows next() to throw after all well sized requests have been chunked for submission
-            return requestIterator.hasNext() || exceptionallyLargeRequests != null;
+            return peeked != null || requestIterator.hasNext() || exceptionallyLargeRequests != null;
         }
 
         @Override
         public List<RequestBytes> next() {
             List<RequestBytes> serializedRequests = new ArrayList<>();
             int chunkSerializedTotal = 0;
-            while (requestIterator.hasNext()) {
-                RequestBytes peeked = requestIterator.peek();
+            //If we peeked at something but stopped on it, then add it to this list
+            if (peeked != null) {
                 chunkSerializedTotal += peeked.getSerializedSize();
+                serializedRequests.add(peeked);
+                peeked = null;
+            }
+
+            while (requestIterator.hasNext()) {
+                RequestBytes next = requestIterator.next();
+                //Remove the element from the collection, so the iterator doesn't prevent it from being GC'ed
+                //due to the reference to it in the collection
+                requestIterator.set(null);
+                chunkSerializedTotal += next.getSerializedSize();
                 if (chunkSerializedTotal <= bulkChunkSerializedLimitBytes) {
-                    serializedRequests.add(requestIterator.next());
+                    serializedRequests.add(next);
                 } else {
                     //Adding this element would exceed the limit, so return the chunk
+                    this.peeked = next;
                     return serializedRequests;
                 }
             }
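
next() now consumes elements eagerly and carries the one that overflowed the current chunk in the peeked field, rather than peeking without consuming as the removed PeekingIterator did; that matters because consumed slots are nulled out and could not safely be re-read. A condensed, hypothetical chunker showing just the carry-over mechanics:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class SizeChunker implements Iterator<List<byte[]>> {
    private final Iterator<byte[]> source;
    private final int limitBytes;
    private byte[] carried; // plays the role of the 'peeked' field above

    SizeChunker(Iterator<byte[]> source, int limitBytes) {
        this.source = source;
        this.limitBytes = limitBytes;
    }

    @Override
    public boolean hasNext() {
        return carried != null || source.hasNext();
    }

    @Override
    public List<byte[]> next() {
        List<byte[]> chunk = new ArrayList<>();
        int total = 0;
        if (carried != null) { // the element that overflowed the previous chunk leads this one
            total += carried.length;
            chunk.add(carried);
            carried = null;
        }
        while (source.hasNext()) {
            byte[] next = source.next();
            total += next.length;
            if (total <= limitBytes) {
                chunk.add(next);
            } else {
                carried = next; // stash for the next chunk instead of consuming twice
                return chunk;
            }
        }
        return chunk;
    }

    public static void main(String[] args) {
        SizeChunker chunker = new SizeChunker(
            Arrays.asList(new byte[3], new byte[3], new byte[3]).iterator(), 5);
        while (chunker.hasNext()) {
            System.out.println(chunker.next().size()); // prints 1 three times: two 3-byte items exceed a 5-byte limit
        }
    }
}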

@@ -35,7 +35,9 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
@@ -152,10 +154,11 @@ public void testThrowingIfSingleBulkItemIsLargerThanLimit() throws IOException {
             //This payload is too large to send given the set limit, since it is a single item we can't split it
             IntStream.range(0, bulkLimit * 10).forEach(value -> payloadBuilder.append("a"));
             Assertions.assertThrows(IllegalArgumentException.class, () -> restClientUnderTest.bulkRequest(
-                Collections.singletonList(
-                    ElasticSearchMutation.createIndexRequest("some_index", "some_type", "some_doc_id",
-                        Collections.singletonMap("someKey", payloadBuilder.toString()))
-                ), null), "Should have thrown due to bulk request item being too large");
+                Stream.of(
+                    ElasticSearchMutation.createIndexRequest("some_index", "some_type", "some_doc_id",
+                        Collections.singletonMap("someKey", payloadBuilder.toString())))
+                    .collect(Collectors.toList()),
+                null), "Should have thrown due to bulk request item being too large");
         }
     }
 }
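
The tests switch from Collections.singletonList to a Stream-collected list because BulkRequestChunker's constructor now calls set(null) on the supplied list's iterator, and singletonList is immutable. A small demonstration of the difference (hypothetical class name):

import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SingletonListPitfall {
    public static void main(String[] args) {
        ListIterator<String> immutable = Collections.singletonList("doc").listIterator();
        immutable.next();
        try {
            immutable.set(null); // singletonList rejects mutation
        } catch (UnsupportedOperationException e) {
            System.out.println("set() rejected on singletonList");
        }

        // Collectors.toList() yields a mutable list in practice (an ArrayList), so set() works.
        List<String> mutable = Stream.of("doc").collect(Collectors.toList());
        ListIterator<String> it = mutable.listIterator();
        it.next();
        it.set(null);
        System.out.println(mutable); // [null]
    }
}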

@@ -143,8 +143,10 @@ public void testRetryOnConfiguredErrorStatus() throws IOException {
             when(restClientMock.performRequest(any()))
                 .thenThrow(responseException)
                 .thenThrow(expectedFinalException);
-            restClientUnderTest.bulkRequest(Collections.singletonList(
-                ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id")),
+            restClientUnderTest.bulkRequest(
+                Stream.of(
+                    ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id"))
+                    .collect(Collectors.toList()),
                 null);
             Assertions.fail("Should have thrown the expected exception after retry");
         } catch (Exception actualException) {
@@ -173,8 +175,10 @@ public void testRetriesExhaustedReturnsLastRetryException() throws IOException {
                 .thenThrow(responseException);
 
 
-            restClientUnderTest.bulkRequest(Collections.singletonList(
-                ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id")),
+            restClientUnderTest.bulkRequest(
+                Stream.of(
+                    ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id"))
+                    .collect(Collectors.toList()),
                 null);
             Assertions.fail("Should have thrown the expected exception after retry");
         } catch (Exception e) {
@@ -194,8 +198,9 @@ public void testNonRetryErrorCodeException() throws IOException {
             //prime the restClientMock again after it's reset after creation
             when(restClientMock.performRequest(any()))
                 .thenThrow(responseException);
-            restClientUnderTest.bulkRequest(Collections.singletonList(
-                ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id")),
+            restClientUnderTest.bulkRequest(
+                Stream.of(ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id")).
+                    collect(Collectors.toList()),
                 null);
             Assertions.fail("Should have thrown the expected exception");
         } catch (Exception e) {

1 comment on commit 75d6f51

@github-actions
Benchmark

| Benchmark suite | Current: 75d6f51 | Previous: 4a576f6 | Ratio |
| --- | --- | --- | --- |
| org.janusgraph.JanusGraphSpeedBenchmark.basicAddAndDelete | 12918.027465784486 ms/op | 12387.503702656188 ms/op | 1.04 |
| org.janusgraph.GraphCentricQueryBenchmark.getVertices | 926.1012468984785 ms/op | 881.5787764770677 ms/op | 1.05 |
| org.janusgraph.MgmtOlapJobBenchmark.runClearIndex | 216.44049560072463 ms/op | 216.02639356231884 ms/op | 1.00 |
| org.janusgraph.MgmtOlapJobBenchmark.runReindex | 329.7595239017857 ms/op | 319.4162031316667 ms/op | 1.03 |
| org.janusgraph.JanusGraphSpeedBenchmark.basicCount | 265.5710353387699 ms/op | 214.43612039488983 ms/op | 1.24 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection | 5441.060422495235 ms/op | 4620.418355055224 ms/op | 1.18 |
| org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingEmitRepeatSteps | 16907.915232946663 ms/op | 16143.572659017893 ms/op | 1.05 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithSmallBatch | 21079.679674174447 ms/op | 20253.546099167273 ms/op | 1.04 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.vertexCentricPropertiesFetching | 57564.07564556667 ms/op | 56628.47083503333 ms/op | 1.02 |
| org.janusgraph.CQLMultiQueryDropBenchmark.dropVertices | 1657.8034654630944 ms/op | 1521.104838248883 ms/op | 1.09 |
| org.janusgraph.CQLMultiQueryBenchmark.getAllElementsTraversedFromOuterVertex | 8373.540031478788 ms/op | 7631.26260980391 ms/op | 1.10 |
| org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithDoubleUnion | 397.25358385631375 ms/op | 376.440884295281 ms/op | 1.06 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithUnlimitedBatch | 4513.230792941702 ms/op | 4082.0649584542043 ms/op | 1.11 |
| org.janusgraph.CQLMultiQueryBenchmark.getNames | 8495.168730747038 ms/op | 7858.0254477321005 ms/op | 1.08 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesThreePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection | 5766.51883461338 ms/op | 5530.8370018318055 ms/op | 1.04 |
| org.janusgraph.CQLMultiQueryBenchmark.getLabels | 7269.195997930431 ms/op | 6651.821376948318 ms/op | 1.09 |
| org.janusgraph.CQLMultiQueryBenchmark.getVerticesFilteredByAndStep | 439.0443792180341 ms/op | 413.53179240439476 ms/op | 1.06 |
| org.janusgraph.CQLMultiQueryBenchmark.getVerticesFromMultiNestedRepeatStepStartingFromSingleVertex | 13322.397292615356 ms/op | 12753.082837076472 ms/op | 1.04 |
| org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithCoalesceUsage | 368.86824918366545 ms/op | 355.9563125681389 ms/op | 1.04 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection | 17048.064459043526 ms/op | 14119.051614935463 ms/op | 1.21 |
| org.janusgraph.CQLMultiQueryBenchmark.getIdToOutVerticesProjection | 256.9368886404652 ms/op | 250.88565330056014 ms/op | 1.02 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithUnlimitedBatch | 13954.238491693777 ms/op | 14488.354575075491 ms/op | 0.96 |
| org.janusgraph.CQLMultiQueryBenchmark.getNeighborNames | 8462.619389556665 ms/op | 7873.668866501788 ms/op | 1.07 |
| org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingRepeatUntilSteps | 9316.801316835756 ms/op | 8690.493274357672 ms/op | 1.07 |
| org.janusgraph.CQLMultiQueryBenchmark.getAdjacentVerticesLocalCounts | 8562.246337052797 ms/op | 8127.869559427535 ms/op | 1.05 |

This comment was automatically generated by a workflow using github-action-benchmark.
