diff --git a/src/main/java/apoc/index/IndexUpdateTransactionEventHandler.java b/src/main/java/apoc/index/IndexUpdateTransactionEventHandler.java index 14ccee93cd..047fb65c47 100644 --- a/src/main/java/apoc/index/IndexUpdateTransactionEventHandler.java +++ b/src/main/java/apoc/index/IndexUpdateTransactionEventHandler.java @@ -10,7 +10,6 @@ import org.neo4j.graphdb.event.TransactionEventHandler; import org.neo4j.graphdb.index.Index; import org.neo4j.graphdb.index.IndexManager; -import org.neo4j.helpers.collection.Iterables; import org.neo4j.kernel.internal.GraphDatabaseAPI; import org.neo4j.kernel.lifecycle.LifeSupport; import org.neo4j.logging.Log; @@ -20,7 +19,10 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; -import java.util.stream.StreamSupport; +import java.util.stream.Stream; + +import static org.neo4j.helpers.collection.Iterables.stream; +import static org.neo4j.helpers.collection.Iterators.*; /** * a transaction event handler that updates manual indexes based on configuration in graph properties @@ -31,15 +33,13 @@ public class IndexUpdateTransactionEventHandler extends TransactionEventHandler.Adapter>> { private final GraphDatabaseService graphDatabaseService; - private final Log log; private final boolean async; private final BlockingQueue> indexCommandQueue = new LinkedBlockingQueue<>(100); private Map>>> indexesByLabelAndProperty; - public IndexUpdateTransactionEventHandler(GraphDatabaseAPI graphDatabaseService, Log log, boolean async) { + public IndexUpdateTransactionEventHandler(GraphDatabaseAPI graphDatabaseService, boolean async) { this.graphDatabaseService = graphDatabaseService; - this.log = log; this.async = async; Pools.SCHEDULED.scheduleAtFixedRate(() -> indexesByLabelAndProperty = initIndexConfiguration(),10,10, TimeUnit.SECONDS); } @@ -58,7 +58,7 @@ public Collection> beforeCommit(TransactionData data) throws Exce getIndexesByLabelAndProperty(); Collection> state = async ? new LinkedList<>() : null; - iterateNodePropertyChange(data.assignedNodeProperties(), (index, node, key, value, oldValue) -> indexUpdate(state, aVoid -> { + iterateNodePropertyChange(stream(data.assignedNodeProperties()),(index, node, key, value, oldValue) -> indexUpdate(state, aVoid -> { if (oldValue != null) { index.remove(node, key); index.remove(node, FreeTextSearch.KEY); @@ -67,17 +67,18 @@ public Collection> beforeCommit(TransactionData data) throws Exce index.add(node, FreeTextSearch.KEY, value); })); - iterateNodePropertyChange(data.removedNodeProperties(), (index, node, key, value, oldValue) -> indexUpdate(state, aVoid -> { + // filter out removedNodeProperties from node deletions + iterateNodePropertyChange(stream(data.removedNodeProperties()).filter(nodePropertyEntry -> !contains(data.deletedNodes().iterator(), nodePropertyEntry.entity())), (index, node, key, value, oldValue) -> indexUpdate(state, aVoid -> { index.remove(node, key); index.remove(node, FreeTextSearch.KEY); })); - iterateLabelChanges(data.assignedLabels(), Iterables.asSet(data.createdNodes()), (index, node, key, value, ignore) -> indexUpdate(state, aVoid -> { + iterateLabelChanges(stream(data.assignedLabels()).filter(labelEntry -> !contains(data.createdNodes().iterator(), labelEntry.node())), (index, node, key, value, ignore) -> indexUpdate(state, aVoid -> { index.add(node, key, value); index.add(node, FreeTextSearch.KEY, value); })); - iterateLabelChanges(data.removedLabels(), Iterables.asSet(data.deletedNodes()), (index, node, key, value, ignore) -> indexUpdate(state, aVoid -> { + iterateLabelChanges(stream(data.removedLabels()).filter(labelEntry -> !contains(data.deletedNodes().iterator(), labelEntry.node())), (index, node, key, value, ignore) -> indexUpdate(state, aVoid -> { index.remove(node, key); index.remove(node, FreeTextSearch.KEY); })); @@ -98,8 +99,9 @@ public void afterCommit(TransactionData data, Collection> state) } } - private void iterateNodePropertyChange(Iterable> propertyEntries, IndexFunction, Node, String, Object, Object> function) { - propertyEntries.forEach(nodePropertyEntry -> { + private void iterateNodePropertyChange(Stream> stream, + IndexFunction, Node, String, Object, Object> function) { + stream.forEach(nodePropertyEntry -> { final Node entity = nodePropertyEntry.entity(); final String key = nodePropertyEntry.key(); final Object value = nodePropertyEntry.value(); @@ -110,24 +112,22 @@ private void iterateNodePropertyChange(Iterable> propertyEnt }); } - private void iterateLabelChanges(Iterable labelEntries, Set blacklist, IndexFunction, Node, String, Object, Void> function) { - StreamSupport.stream(labelEntries.spliterator(), false) - .filter(labelEntry -> !blacklist.contains(labelEntry.node())) - .forEach(labelEntry -> { - final Map>> propertyIndicesMap = indexesByLabelAndProperty.get(labelEntry.label().name()); - if (propertyIndicesMap!=null) { - final Node entity = labelEntry.node(); - for (String key : entity.getPropertyKeys()) { - Collection> indices = propertyIndicesMap.get(key); - if (indices != null) { - for (Index index : indices) { - Object value = entity.getProperty(key); - function.apply(index, entity, key, value, null); - } - } + private void iterateLabelChanges(Stream stream, IndexFunction, Node, String, Object, Void> function) { + stream.forEach(labelEntry -> { + final Map>> propertyIndicesMap = indexesByLabelAndProperty.get(labelEntry.label().name()); + if (propertyIndicesMap != null) { + final Node entity = labelEntry.node(); + for (String key : entity.getPropertyKeys()) { + Collection> indices = propertyIndicesMap.get(key); + if (indices != null) { + for (Index index : indices) { + Object value = entity.getProperty(key); + function.apply(index, entity, key, value, null); } } - }); + } + } + }); } private Collection> findIndicesAffectedBy(Iterable