Skip to content

Commit

Permalink
fix: Close streams fetched from data-providers (#18552)
Browse files Browse the repository at this point in the history
Fixes DataCommunicator API by handling the streams fetched from data-providers inside try-with-resources blocks.
Unclosed streams might still leak from the DataView API; that should be fixed in each component that uses the API.

Fixes #18551
  • Loading branch information
heruan authored and vaadin-bot committed Jan 30, 2024
1 parent 9e20637 commit 1eb9abb
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -610,8 +610,10 @@ public T getItem(int index) {
* because the backend can have the item on that index (we simply
* not yet fetched this item during the scrolling).
*/
return (T) getDataProvider().fetch(buildQuery(index, 1)).findFirst()
.orElse(null);
try (Stream<T> stream = getDataProvider()
.fetch(buildQuery(index, 1))) {
return stream.findFirst().orElse(null);
}
}
}

Expand Down Expand Up @@ -1010,18 +1012,20 @@ protected Stream<T> fetchFromProvider(int offset, int limit) {
int page = 0;
do {
final int newOffset = offset + page * pageSize;
Stream<T> dataProviderStream = doFetchFromDataProvider(
newOffset, pageSize);
// Stream.Builder is not thread safe, so for parallel stream
// we need to first collect items before adding them
if (dataProviderStream.isParallel()) {
getLogger().debug(
"Data provider {} has returned parallel stream on 'fetch' call",
getDataProvider().getClass());
dataProviderStream.collect(Collectors.toList())
.forEach(addItemAndCheckConsumer);
} else {
dataProviderStream.forEach(addItemAndCheckConsumer);
try (Stream<T> dataProviderStream = doFetchFromDataProvider(
newOffset, pageSize)) {
// Stream.Builder is not thread safe, so for parallel
// stream we need to first collect items before adding
// them
if (dataProviderStream.isParallel()) {
getLogger().debug(
"Data provider {} has returned parallel stream on 'fetch' call",
getDataProvider().getClass());
dataProviderStream.collect(Collectors.toList())
.forEach(addItemAndCheckConsumer);
} else {
dataProviderStream.forEach(addItemAndCheckConsumer);
}
}
page++;
} while (page < pages
Expand All @@ -1040,8 +1044,10 @@ protected Stream<T> fetchFromProvider(int offset, int limit) {
getLogger().debug(
"Data provider {} has returned parallel stream on 'fetch' call",
getDataProvider().getClass());
stream = stream.collect(Collectors.toList()).stream();
assert !stream.isParallel();
try (Stream<T> parallelStream = stream) {
stream = parallelStream.collect(Collectors.toList()).stream();
assert !stream.isParallel();
}
}

SizeVerifier verifier = new SizeVerifier<>(limit);
Expand Down Expand Up @@ -1476,17 +1482,20 @@ private Activation activate(Range range) {

// XXX Explicitly refresh anything that is updated
List<String> activeKeys = new ArrayList<>(range.length());
fetchFromProvider(range.getStart(), range.length()).forEach(bean -> {
boolean mapperHasKey = keyMapper.has(bean);
String key = keyMapper.key(bean);
if (mapperHasKey) {
// Ensure latest instance from provider is used
keyMapper.refresh(bean);
passivatedByUpdate.values().stream()
.forEach(set -> set.remove(key));
}
activeKeys.add(key);
});
try (Stream<T> stream = fetchFromProvider(range.getStart(),
range.length())) {
stream.forEach(bean -> {
boolean mapperHasKey = keyMapper.has(bean);
String key = keyMapper.key(bean);
if (mapperHasKey) {
// Ensure latest instance from provider is used
keyMapper.refresh(bean);
passivatedByUpdate.values().stream()
.forEach(set -> set.remove(key));
}
activeKeys.add(key);
});
}
boolean needsSizeRecheck = activeKeys.size() < range.length();
return new Activation(activeKeys, needsSizeRecheck);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,22 @@ public interface DataView<T> extends Serializable {
/**
* Get the full data available to the component. Data is filtered and sorted
* the same way as in the component.
* <p>
* Consumers of the returned stream are responsible for closing it when all
* the stream operations are done to ensure that any resources feeding the
* stream are properly released. Failure to close the stream might lead to
* resource leaks.
* <p>
* It is strongly recommended to use a try-with-resources block to
* automatically close the stream after its terminal operation has been
* executed. Below is an example of how to properly use and close the
* stream:
*
* <pre>{@code
* try (Stream<T> stream = dataView.getItems()) {
* stream.forEach(System.out::println); // Example terminal operation
* }
* }</pre>
*
* @return filtered and sorted data set
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,16 +255,18 @@ private List<String> activate(Range range) {
// XXX Explicitly refresh anything that is updated
List<String> activeKeys = new ArrayList<>(range.length());

fetchItems.apply(parentKey, range).forEach(bean -> {
boolean mapperHasKey = keyMapper.has(bean);
String key = keyMapper.key(bean);
if (mapperHasKey) {
// Ensure latest instance from provider is used
keyMapper.refresh(bean);
passivatedByUpdate.values().forEach(set -> set.remove(key));
}
activeKeys.add(key);
});
try (Stream<T> stream = fetchItems.apply(parentKey, range)) {
stream.forEach(bean -> {
boolean mapperHasKey = keyMapper.has(bean);
String key = keyMapper.key(bean);
if (mapperHasKey) {
// Ensure latest instance from provider is used
keyMapper.refresh(bean);
passivatedByUpdate.values().forEach(set -> set.remove(key));
}
activeKeys.add(key);
});
}
return activeKeys;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,8 +534,9 @@ private Stream<T> getFlatChildrenStream(T parent) {
private Stream<T> getFlatChildrenStream(T parent, boolean includeParent) {
List<T> childList = Collections.emptyList();
if (isExpanded(parent)) {
childList = doFetchDirectChildren(parent)
.collect(Collectors.toList());
try (Stream<T> stream = doFetchDirectChildren(parent)) {
childList = stream.collect(Collectors.toList());
}
if (childList.isEmpty()) {
removeChildren(parent == null ? null
: getDataProvider().getId(parent));
Expand Down Expand Up @@ -563,8 +564,9 @@ private Stream<T> getChildrenStream(T parent, Range range,
boolean includeParent) {
List<T> childList = Collections.emptyList();
if (isExpanded(parent)) {
childList = doFetchDirectChildren(parent, range)
.collect(Collectors.toList());
try (Stream<T> stream = doFetchDirectChildren(parent, range)) {
childList = stream.collect(Collectors.toList());
}
if (childList.isEmpty()) {
removeChildren(parent == null ? null
: getDataProvider().getId(parent));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,18 @@ public void getItem_withUndefinedSizeAndSorting() {
dataCommunicator.getItem(2));
}

@Test
public void getItem_streamIsClosed() {
// Flag flipped by the provider's Stream.onClose handler (see
// createDataProvider(AtomicBoolean)).
AtomicBoolean streamIsClosed = new AtomicBoolean();
dataCommunicator.setDataProvider(createDataProvider(streamIsClosed),
null);

fakeClientCommunication();
dataCommunicator.getItem(0);

// getItem must close the stream it fetched from the data provider.
Assert.assertTrue(streamIsClosed.get());
}

@Test
public void itemCountEstimateAndStep_defaults() {
Assert.assertEquals(dataCommunicator.getItemCountEstimate(),
Expand Down Expand Up @@ -1353,6 +1365,18 @@ public void fetchFromProvider_itemCountLessThanTwoPages_correctItemsReturned() {

}

@Test
public void fetchFromProvider_streamIsClosed() {
// Flag flipped by the provider's Stream.onClose handler (see
// createDataProvider(AtomicBoolean)).
AtomicBoolean streamIsClosed = new AtomicBoolean();
dataCommunicator.setDataProvider(createDataProvider(streamIsClosed),
null);
dataCommunicator.setRequestedRange(0, 50);

// Triggers fetchFromProvider for the requested range.
fakeClientCommunication();

// The stream fetched during the range update must have been closed.
Assert.assertTrue(streamIsClosed.get());
}

@Test
public void fetchEnabled_getItemCount_stillReturnsItemsCount() {
dataCommunicator.setFetchEnabled(false);
Expand Down Expand Up @@ -1737,6 +1761,11 @@ public Stream<Item> fetch(Query<Item, Object> query) {
}

// Convenience overload for tests that do not need to observe stream
// closing; the throwaway AtomicBoolean is simply ignored.
private AbstractDataProvider<Item, Object> createDataProvider() {
return createDataProvider(new AtomicBoolean());
}

private AbstractDataProvider<Item, Object> createDataProvider(
AtomicBoolean streamIsClosed) {
return new AbstractDataProvider<Item, Object>() {
@Override
public boolean isInMemory() {
Expand All @@ -1752,7 +1781,8 @@ public int size(Query<Item, Object> query) {
// Produces one Item per index in the queried range; the onClose hook
// sets the flag so tests can verify that the caller closed the stream.
public Stream<Item> fetch(Query<Item, Object> query) {
return asParallelIfRequired(IntStream.range(query.getOffset(),
query.getLimit() + query.getOffset()))
.mapToObj(Item::new)
.onClose(() -> streamIsClosed.set(true));
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -221,6 +223,33 @@ public void uniqueKeyProviderIsNotSet_keysGeneratedByKeyMapper() {
communicator.getKeyMapper().get(i)));
}

@Test
public void expandRoot_streamIsClosed() {
// Flag flipped by the provider's Stream.onClose handler below.
AtomicBoolean streamIsClosed = new AtomicBoolean();

// Wrap the tree data provider so every fetched children stream
// reports when it is closed.
dataProvider = new TreeDataProvider<>(treeData) {

@Override
public Stream<Item> fetchChildren(
HierarchicalQuery<Item, SerializablePredicate<Item>> query) {
return super.fetchChildren(query)
.onClose(() -> streamIsClosed.set(true));
}
};

communicator.setDataProvider(dataProvider, null);

// Expand the root and let the communicator fetch its children.
communicator.expand(ROOT);
fakeClientCommunication();

communicator.setParentRequestedRange(0, 50, ROOT);
fakeClientCommunication();

communicator.reset();

// Every children stream fetched above must have been closed.
Assert.assertTrue(streamIsClosed.get());
}

@Test
public void expandRoot_filterOutAllChildren_clearCalled() {
parentClearCalled = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
Expand Down Expand Up @@ -288,6 +289,42 @@ public void getExpandedItems_tryToAddItemsToCollection_shouldThrowException() {
expandedItems.add(new TreeNode("third-1"));
}

@Test
public void fetchHierarchyItems_streamIsClosed() {
// Flag flipped by the provider's Stream.onClose handler below.
AtomicBoolean streamIsClosed = new AtomicBoolean();
// Wrap the tree data provider so the fetched children stream reports
// when it is closed.
mapper = new HierarchyMapper<>(new TreeDataProvider<>(data) {
@Override
public Stream<Node> fetchChildren(
HierarchicalQuery<Node, SerializablePredicate<Node>> query) {
return super.fetchChildren(query)
.onClose(() -> streamIsClosed.set(true));
}
});
Node rootNode = testData.get(0);
mapper.expand(rootNode);
// count() is a terminal operation that consumes the hierarchy stream.
mapper.fetchHierarchyItems(rootNode, Range.between(0, 10)).count();

// The underlying provider stream must have been closed by the mapper.
Assert.assertTrue(streamIsClosed.get());
}

@Test
public void fetchChildItems_streamIsClosed() {
// Flag flipped by the provider's Stream.onClose handler below.
AtomicBoolean streamIsClosed = new AtomicBoolean();
// Wrap the tree data provider so the fetched children stream reports
// when it is closed.
mapper = new HierarchyMapper<>(new TreeDataProvider<>(data) {
@Override
public Stream<Node> fetchChildren(
HierarchicalQuery<Node, SerializablePredicate<Node>> query) {
return super.fetchChildren(query)
.onClose(() -> streamIsClosed.set(true));
}
});
Node rootNode = testData.get(0);
mapper.expand(rootNode);
mapper.fetchChildItems(rootNode, Range.between(0, 10));

// fetchChildItems must close the provider stream internally.
Assert.assertTrue(streamIsClosed.get());
}

private void expand(Node node) {
insertRows(mapper.expand(node, mapper.getIndexOf(node).orElse(null)));
}
Expand Down

0 comments on commit 1eb9abb

Please sign in to comment.