Skip to content

Commit

Permalink
fixes #349 (#350) index tracking when deleting nodes
Browse files Browse the repository at this point in the history
* adding test for #349
* fixes #349

filter removedNodeProperties for those belonging to `deletedNodes`
  • Loading branch information
sarmbruster authored and jexp committed Apr 2, 2017
1 parent edd7d51 commit 0f3eedd
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 30 deletions.
59 changes: 29 additions & 30 deletions src/main/java/apoc/index/IndexUpdateTransactionEventHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.neo4j.graphdb.event.TransactionEventHandler;
import org.neo4j.graphdb.index.Index;
import org.neo4j.graphdb.index.IndexManager;
import org.neo4j.helpers.collection.Iterables;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.logging.Log;
Expand All @@ -20,7 +19,10 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;
import java.util.stream.Stream;

import static org.neo4j.helpers.collection.Iterables.stream;
import static org.neo4j.helpers.collection.Iterators.*;

/**
* a transaction event handler that updates manual indexes based on configuration in graph properties
Expand All @@ -31,15 +33,13 @@
public class IndexUpdateTransactionEventHandler extends TransactionEventHandler.Adapter<Collection<Consumer<Void>>> {

private final GraphDatabaseService graphDatabaseService;
private final Log log;
private final boolean async;

private final BlockingQueue<Consumer<Void>> indexCommandQueue = new LinkedBlockingQueue<>(100);
private Map<String, Map<String, Collection<Index<Node>>>> indexesByLabelAndProperty;

public IndexUpdateTransactionEventHandler(GraphDatabaseAPI graphDatabaseService, Log log, boolean async) {
public IndexUpdateTransactionEventHandler(GraphDatabaseAPI graphDatabaseService, boolean async) {
this.graphDatabaseService = graphDatabaseService;
this.log = log;
this.async = async;
Pools.SCHEDULED.scheduleAtFixedRate(() -> indexesByLabelAndProperty = initIndexConfiguration(),10,10, TimeUnit.SECONDS);
}
Expand All @@ -58,7 +58,7 @@ public Collection<Consumer<Void>> beforeCommit(TransactionData data) throws Exce
getIndexesByLabelAndProperty();
Collection<Consumer<Void>> state = async ? new LinkedList<>() : null;

iterateNodePropertyChange(data.assignedNodeProperties(), (index, node, key, value, oldValue) -> indexUpdate(state, aVoid -> {
iterateNodePropertyChange(stream(data.assignedNodeProperties()),(index, node, key, value, oldValue) -> indexUpdate(state, aVoid -> {
if (oldValue != null) {
index.remove(node, key);
index.remove(node, FreeTextSearch.KEY);
Expand All @@ -67,17 +67,18 @@ public Collection<Consumer<Void>> beforeCommit(TransactionData data) throws Exce
index.add(node, FreeTextSearch.KEY, value);
}));

iterateNodePropertyChange(data.removedNodeProperties(), (index, node, key, value, oldValue) -> indexUpdate(state, aVoid -> {
// filter out removedNodeProperties from node deletions
iterateNodePropertyChange(stream(data.removedNodeProperties()).filter(nodePropertyEntry -> !contains(data.deletedNodes().iterator(), nodePropertyEntry.entity())), (index, node, key, value, oldValue) -> indexUpdate(state, aVoid -> {
index.remove(node, key);
index.remove(node, FreeTextSearch.KEY);
}));

iterateLabelChanges(data.assignedLabels(), Iterables.asSet(data.createdNodes()), (index, node, key, value, ignore) -> indexUpdate(state, aVoid -> {
iterateLabelChanges(stream(data.assignedLabels()).filter(labelEntry -> !contains(data.createdNodes().iterator(), labelEntry.node())), (index, node, key, value, ignore) -> indexUpdate(state, aVoid -> {
index.add(node, key, value);
index.add(node, FreeTextSearch.KEY, value);
}));

iterateLabelChanges(data.removedLabels(), Iterables.asSet(data.deletedNodes()), (index, node, key, value, ignore) -> indexUpdate(state, aVoid -> {
iterateLabelChanges(stream(data.removedLabels()).filter(labelEntry -> !contains(data.deletedNodes().iterator(), labelEntry.node())), (index, node, key, value, ignore) -> indexUpdate(state, aVoid -> {
index.remove(node, key);
index.remove(node, FreeTextSearch.KEY);
}));
Expand All @@ -98,8 +99,9 @@ public void afterCommit(TransactionData data, Collection<Consumer<Void>> state)
}
}

private void iterateNodePropertyChange(Iterable<PropertyEntry<Node>> propertyEntries, IndexFunction<Index<Node>, Node, String, Object, Object> function) {
propertyEntries.forEach(nodePropertyEntry -> {
private void iterateNodePropertyChange(Stream<PropertyEntry<Node>> stream,
IndexFunction<Index<Node>, Node, String, Object, Object> function) {
stream.forEach(nodePropertyEntry -> {
final Node entity = nodePropertyEntry.entity();
final String key = nodePropertyEntry.key();
final Object value = nodePropertyEntry.value();
Expand All @@ -110,24 +112,22 @@ private void iterateNodePropertyChange(Iterable<PropertyEntry<Node>> propertyEnt
});
}

private void iterateLabelChanges(Iterable<LabelEntry> labelEntries, Set<Node> blacklist, IndexFunction<Index<Node>, Node, String, Object, Void> function) {
StreamSupport.stream(labelEntries.spliterator(), false)
.filter(labelEntry -> !blacklist.contains(labelEntry.node()))
.forEach(labelEntry -> {
final Map<String, Collection<Index<Node>>> propertyIndicesMap = indexesByLabelAndProperty.get(labelEntry.label().name());
if (propertyIndicesMap!=null) {
final Node entity = labelEntry.node();
for (String key : entity.getPropertyKeys()) {
Collection<Index<Node>> indices = propertyIndicesMap.get(key);
if (indices != null) {
for (Index<Node> index : indices) {
Object value = entity.getProperty(key);
function.apply(index, entity, key, value, null);
}
}
private void iterateLabelChanges(Stream<LabelEntry> stream, IndexFunction<Index<Node>, Node, String, Object, Void> function) {
stream.forEach(labelEntry -> {
final Map<String, Collection<Index<Node>>> propertyIndicesMap = indexesByLabelAndProperty.get(labelEntry.label().name());
if (propertyIndicesMap != null) {
final Node entity = labelEntry.node();
for (String key : entity.getPropertyKeys()) {
Collection<Index<Node>> indices = propertyIndicesMap.get(key);
if (indices != null) {
for (Index<Node> index : indices) {
Object value = entity.getProperty(key);
function.apply(index, entity, key, value, null);
}
}
});
}
}
});
}

private Collection<Index<Node>> findIndicesAffectedBy(Iterable<Label> labels, String key) {
Expand Down Expand Up @@ -203,13 +203,12 @@ public void start() {
boolean enabled = ApocConfiguration.isEnabled("autoIndex.enabled");
if (enabled) {
boolean async = ApocConfiguration.isEnabled("autoIndex.async");
final Log userLog = log;
indexUpdateTransactionEventHandler = new IndexUpdateTransactionEventHandler(db, userLog, async);
indexUpdateTransactionEventHandler = new IndexUpdateTransactionEventHandler(db, async);
if (async) {
startIndexTrackingThread(db, indexUpdateTransactionEventHandler.getIndexCommandQueue(),
Long.parseLong(ApocConfiguration.get("autoIndex.async_rollover_opscount", "10000")),
Long.parseLong(ApocConfiguration.get("autoIndex.async_rollover_millis", "5000")),
userLog
log
);
}
db.registerTransactionEventHandler(indexUpdateTransactionEventHandler);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package apoc.index;

import apoc.util.TestUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.test.TestGraphDatabaseFactory;

import static apoc.util.TestUtil.*;
import static org.junit.Assert.*;

public class IndexUpdateTransactionEventHandlerTest {

private GraphDatabaseService db;

@Before
public void setUp() throws Exception {
db = new TestGraphDatabaseFactory().newImpermanentDatabase();
db.registerTransactionEventHandler(new IndexUpdateTransactionEventHandler((GraphDatabaseAPI) db, false));
TestUtil.registerProcedure(db, FreeTextSearch.class);
}

@After
public void tearDown() {
db.shutdown();
}

@Test
public void shouldDeletingIndexedNodesSucceed() {
// setup: create index, add a node
testCallEmpty(db, "call apoc.index.addAllNodesExtended('search_index',{City:['name']},{autoUpdate:true})", null);
testCallEmpty(db, "create (c:City{name:\"Made Up City\",url:\"/places/nowhere/made-up-city\"})", null);

// check if we find the node
testCallCount(db, "start n=node:search_index('name:\"Made Up\"') return n", null, 1);

// when
TestUtil.testCall(db, "match (c:City{name:'Made Up City'}) delete c return count(c) as count", map -> assertEquals(1L, map.get("count")));

// nothing found in the index after deletion
testCallCount(db, "start n=node:search_index('name:\"Made Up\"') return n", null, 0);

}
}

0 comments on commit 0f3eedd

Please sign in to comment.