-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Send vertex label to user log processor for all mutations #3264
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -141,9 +141,11 @@ | |
import org.janusgraph.graphdb.types.CompositeIndexType; | ||
import org.janusgraph.graphdb.types.StandardEdgeLabelMaker; | ||
import org.janusgraph.graphdb.types.StandardPropertyKeyMaker; | ||
import org.janusgraph.graphdb.types.VertexLabelVertex; | ||
import org.janusgraph.graphdb.types.system.BaseVertexLabel; | ||
import org.janusgraph.graphdb.types.system.ImplicitKey; | ||
import org.janusgraph.graphdb.types.vertices.JanusGraphSchemaVertex; | ||
import org.janusgraph.graphdb.vertices.AbstractVertexUtil; | ||
import org.janusgraph.graphdb.vertices.CacheVertex; | ||
import org.janusgraph.testutil.FeatureFlag; | ||
import org.janusgraph.testutil.JanusGraphFeature; | ||
|
@@ -4871,6 +4873,8 @@ public void simpleLogTest(final boolean withLogFailure) throws InterruptedExcept | |
|
||
PropertyKey weight = tx.makePropertyKey("weight").dataType(Float.class).cardinality(Cardinality.SINGLE).make(); | ||
EdgeLabel knows = tx.makeEdgeLabel("knows").make(); | ||
String testVertexLabel = "testVertex"; | ||
tx.makeVertexLabel(testVertexLabel).make(); | ||
JanusGraphVertex n1 = tx.addVertex("weight", 10.5); | ||
tx.addProperties(knows, weight); | ||
newTx(); | ||
|
@@ -4885,7 +4889,7 @@ public void simpleLogTest(final boolean withLogFailure) throws InterruptedExcept | |
final long v1id = getId(v1); | ||
txTimes[1] = times.getTime(); | ||
tx2 = graph.buildTransaction().logIdentifier(userLogName).start(); | ||
JanusGraphVertex v2 = tx2.addVertex("weight", 222.2); | ||
JanusGraphVertex v2 = (JanusGraphVertex) tx2.traversal().addV(testVertexLabel).property("weight", 222.2).next(); | ||
v2.addEdge("knows", getV(tx2, v1id)); | ||
tx2.commit(); | ||
final long v2id = getId(v2); | ||
|
@@ -4998,8 +5002,8 @@ public void read(Message message) { | |
assertEquals(0, userLogMsgCounter.get()); | ||
} else { | ||
assertEquals(4, userLogMsgCounter.get()); | ||
assertEquals(7, userChangeCounter.get(Change.ADDED).get()); | ||
assertEquals(4, userChangeCounter.get(Change.REMOVED).get()); | ||
assertEquals(8, userChangeCounter.get(Change.ADDED).get()); | ||
assertEquals(5, userChangeCounter.get(Change.REMOVED).get()); | ||
} | ||
|
||
clopen(option(VERBOSE_TX_RECOVERY), true); | ||
|
@@ -5057,6 +5061,8 @@ public void read(Message message) { | |
txNo = 2; | ||
//v2 addition transaction | ||
assertEquals(1, Iterables.size(changes.getVertices(Change.ADDED))); | ||
String vertexLabel = changes.getVertices(Change.ADDED).iterator().next().label(); | ||
assertEquals(testVertexLabel, vertexLabel); | ||
Comment on lines
+5064
to
+5065
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This two lines would fail without the business logic change because |
||
assertEquals(0, Iterables.size(changes.getVertices(Change.REMOVED))); | ||
assertEquals(2, Iterables.size(changes.getVertices(Change.ANY))); | ||
assertEquals(2, Iterables.size(changes.getRelations(Change.ADDED))); | ||
|
@@ -5077,6 +5083,8 @@ public void read(Message message) { | |
//v2 deletion transaction | ||
assertEquals(0, Iterables.size(changes.getVertices(Change.ADDED))); | ||
assertEquals(1, Iterables.size(changes.getVertices(Change.REMOVED))); | ||
String vertexLabel = changes.getVertices(Change.REMOVED).iterator().next().label(); | ||
assertEquals(testVertexLabel, vertexLabel); | ||
Comment on lines
+5086
to
+5087
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This two lines would fail without the business logic change because |
||
assertEquals(2, Iterables.size(changes.getVertices(Change.ANY))); | ||
assertEquals(0, Iterables.size(changes.getRelations(Change.ADDED))); | ||
assertEquals(2, Iterables.size(changes.getRelations(Change.REMOVED))); | ||
|
@@ -5105,12 +5113,12 @@ public void read(Message message) { | |
final JanusGraphVertex v = Iterables.getOnlyElement(changes.getVertices(Change.ANY)); | ||
assertEquals(v1id, getId(v)); | ||
JanusGraphEdge e1 | ||
= Iterables.getOnlyElement(changes.getEdges(v, Change.REMOVED, Direction.OUT, "knows")); | ||
= Iterables.getOnlyElement(changes.getEdges(v, Change.REMOVED, OUT, "knows")); | ||
assertFalse(e1.property("weight").isPresent()); | ||
assertEquals(v, e1.vertex(Direction.IN)); | ||
e1 = Iterables.getOnlyElement(changes.getEdges(v, Change.ADDED, Direction.OUT, "knows")); | ||
assertEquals(v, e1.vertex(IN)); | ||
e1 = Iterables.getOnlyElement(changes.getEdges(v, Change.ADDED, OUT, "knows")); | ||
assertEquals(44.4, e1.<Float>value("weight").doubleValue(), 0.01); | ||
assertEquals(v, e1.vertex(Direction.IN)); | ||
assertEquals(v, e1.vertex(IN)); | ||
} | ||
|
||
//See only current state of graph in transaction | ||
|
@@ -5123,7 +5131,7 @@ public void read(Message message) { | |
// assertTrue(txNo + " - " + v2, v2 == null || v2.isRemoved()); | ||
} | ||
assertEquals(111.1, v11.<Float>value("weight").doubleValue(), 0.01); | ||
assertCount(1, v11.query().direction(Direction.OUT).edges()); | ||
assertCount(1, v11.query().direction(OUT).edges()); | ||
|
||
userLogCount.incrementAndGet(); | ||
}).build(); | ||
|
@@ -8242,6 +8250,68 @@ public void testIndexStoreForceInvalidationFromDBCache() throws InterruptedExcep | |
graph2.close(); | ||
} | ||
|
||
@Test | ||
public void testIndexWithIndexOnlyConstraintForceInvalidationFromDBCache() throws InterruptedException, ExecutionException { | ||
if (features.hasLocking() || !features.isDistributed()) { | ||
return; | ||
} | ||
|
||
String indexPropName = "indexedProp"; | ||
String vertexLabelName = "vertexLabelForIndexOnlyConstraint"; | ||
String indexName = "indexWithIndexOnlyConstraint"; | ||
PropertyKey indexedProp = mgmt.makePropertyKey(indexPropName).dataType(Integer.class).cardinality(Cardinality.SINGLE).make(); | ||
VertexLabel vertexLabel = mgmt.makeVertexLabel(vertexLabelName).make(); | ||
mgmt.buildIndex(indexName, Vertex.class).addKey(indexedProp).indexOnly(vertexLabel).buildCompositeIndex(); | ||
finishSchema(); | ||
ManagementSystem.awaitGraphIndexStatus(graph, indexName).call(); | ||
mgmt.updateIndex(mgmt.getGraphIndex(indexName), SchemaAction.REINDEX).get(); | ||
finishSchema(); | ||
|
||
StandardJanusGraph graph1 = openInstanceWithDBCacheEnabled("testIndexWithIndexOnlyConstraintForceInvalidationFromDBCache1"); | ||
StandardJanusGraph graph2 = openInstanceWithDBCacheEnabled("testIndexWithIndexOnlyConstraintForceInvalidationFromDBCache2"); | ||
|
||
JanusGraphVertex v1 = graph1.addVertex(vertexLabelName); | ||
v1.property(indexPropName, 1); | ||
|
||
graph1.tx().commit(); | ||
|
||
// Cache data | ||
JanusGraphTransaction tx2 = graph2.newTransaction(); | ||
assertEquals(1L, tx2.traversal().V().hasLabel(vertexLabelName).has(indexPropName, 1).count().next()); | ||
tx2.commit(); | ||
|
||
// Remove vertex | ||
JanusGraphTransaction tx1 = graph1.newTransaction(); | ||
tx1.traversal().V(v1.id()).drop().iterate(); | ||
tx1.commit(); | ||
|
||
// Check that cached indexed vertex in graph2 was not refreshed | ||
tx2 = graph2.newTransaction(); | ||
assertEquals(1L, tx2.traversal().V().hasLabel(vertexLabelName).has(indexPropName, 1).count().next()); | ||
|
||
// Try to invalidate data without vertex label | ||
invalidateUpdatedVertexProperty(graph2, v1.longId(), indexPropName, 1, -1); | ||
tx2.rollback(); | ||
|
||
tx2 = graph2.newTransaction(); | ||
// Check that invalidation didn't work | ||
assertEquals(1L, tx2.traversal().V().hasLabel(vertexLabelName).has(indexPropName, 1).count().next()); | ||
tx2.rollback(); | ||
|
||
tx2 = graph2.newTransaction(); | ||
// Invalidate data using vertex label | ||
invalidateUpdatedVertexProperty(graph2, v1.longId(), indexPropName, 1, -1, vertexLabelName); | ||
tx2.commit(); | ||
|
||
tx2 = graph2.newTransaction(); | ||
// Check that invalidation worked | ||
assertEquals(0L, tx2.traversal().V().hasLabel(vertexLabelName).has(indexPropName, 1).count().next()); | ||
tx2.rollback(); | ||
|
||
graph1.close(); | ||
graph2.close(); | ||
} | ||
|
||
@Test | ||
public void testFullDBCacheInvalidation() throws InterruptedException, ExecutionException { | ||
if (features.hasLocking() || !features.isDistributed()) { | ||
|
@@ -8339,10 +8409,18 @@ public void testFullDBCacheInvalidation() throws InterruptedException, Execution | |
} | ||
|
||
private void invalidateUpdatedVertexProperty(StandardJanusGraph graph, long vertexIdUpdated, String propertyNameUpdated, Object previousPropertyValue, Object newPropertyValue){ | ||
invalidateUpdatedVertexProperty(graph, vertexIdUpdated, propertyNameUpdated, previousPropertyValue, newPropertyValue, null); | ||
} | ||
|
||
private void invalidateUpdatedVertexProperty(StandardJanusGraph graph, long vertexIdUpdated, String propertyNameUpdated, Object previousPropertyValue, Object newPropertyValue, String vertexLabelName){ | ||
JanusGraphTransaction tx = graph.newTransaction(); | ||
JanusGraphManagement graphMgmt = graph.openManagement(); | ||
PropertyKey propertyKey = graphMgmt.getPropertyKey(propertyNameUpdated); | ||
CacheVertex cacheVertex = new CacheVertex((StandardJanusGraphTx) tx, vertexIdUpdated, ElementLifeCycle.Loaded); | ||
if(vertexLabelName != null){ | ||
VertexLabel vertexLabel = graphMgmt.getVertexLabel(vertexLabelName); | ||
AbstractVertexUtil.cacheInternalVertexLabel(cacheVertex, (VertexLabelVertex) vertexLabel); | ||
} | ||
StandardVertexProperty propertyPreviousVal = new StandardVertexProperty(propertyKey.longId(), propertyKey, cacheVertex, previousPropertyValue, ElementLifeCycle.Removed); | ||
StandardVertexProperty propertyNewVal = new StandardVertexProperty(propertyKey.longId(), propertyKey, cacheVertex, newPropertyValue, ElementLifeCycle.New); | ||
IndexSerializer indexSerializer = graph.getIndexSerializer(); | ||
|
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -16,6 +16,7 @@ | |||
|
||||
import com.google.common.base.Preconditions; | ||||
import org.apache.commons.lang3.StringUtils; | ||||
import org.janusgraph.core.VertexLabel; | ||||
import org.janusgraph.core.log.Change; | ||||
import org.janusgraph.diskstorage.ReadBuffer; | ||||
import org.janusgraph.diskstorage.StaticBuffer; | ||||
|
@@ -26,6 +27,7 @@ | |||
import org.janusgraph.graphdb.database.serialize.DataOutput; | ||||
import org.janusgraph.graphdb.database.serialize.Serializer; | ||||
import org.janusgraph.graphdb.internal.InternalRelation; | ||||
import org.janusgraph.graphdb.internal.InternalVertex; | ||||
import org.janusgraph.graphdb.log.StandardTransactionId; | ||||
import org.janusgraph.graphdb.transaction.StandardJanusGraphTx; | ||||
import org.janusgraph.graphdb.transaction.TransactionConfiguration; | ||||
|
@@ -85,7 +87,10 @@ public StaticBuffer serializeModifications(Serializer serializer, LogTxStatus st | |||
private static void logRelations(DataOutput out, final Collection<InternalRelation> relations, StandardJanusGraphTx tx) { | ||||
VariableLong.writePositive(out,relations.size()); | ||||
for (InternalRelation rel : relations) { | ||||
VariableLong.writePositive(out,rel.getVertex(0).longId()); | ||||
InternalVertex vertex = rel.getVertex(0); | ||||
VariableLong.writePositive(out,vertex.longId()); | ||||
VertexLabel vertexLabel = vertex.vertexLabel(); | ||||
VariableLong.writePositive(out, vertexLabel.hasId() ? vertexLabel.longId() : 0L); | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's fine, and I believe we should cache vertex labels aggressively. But just to understand more about the intention here: cannot the receiver execute a backend query to fetch vertex label? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's not guaranteed that there will be such vertex in the graph because the vertex might be already removed.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the vertex is already removed, why do we need the vertex label on the receiver side? I guess my real question is, can you elaborate on the following statement?
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To invalidate an index which is constrained to a specific label we need to know the label of the removed / updated / added vertex There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we have There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Gotcha. Do you have a code snippet showing how we could invalidate normal index entries in DB cache? I thought index entries with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @li-boxuan added There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not 100% sure but I believe vertex label is not needed here. In Collection<IndexUpdate> indexUpdates = indexSerializer.getIndexUpdates(cacheVertex, Arrays.asList(propertyPreviousVal, propertyNewVal)); to fetch a collection of index records. In janusgraph/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/IndexSerializer.java Line 190 in 11e40fc
This makes sense when we want to generate a new index entry, but it is not necessary when we try to remove an existing index entry. If you temporarily comment out this line, I believe your test case would pass even if the LogEntry does not contain vertex label. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @li-boxuan you are right. I guess in this case it's better to add another method to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @li-boxuan I opened the PR which adds possibility to get Index updates with disabled constraints here #3279 |
||||
org.janusgraph.diskstorage.Entry entry = tx.getEdgeSerializer().writeRelation(rel, 0, tx); | ||||
BufferUtil.writeEntry(out,entry); | ||||
} | ||||
|
@@ -238,9 +243,10 @@ private static List<Modification> readModifications(Change state, ReadBuffer in, | |||
long size = VariableLong.readPositive(in); | ||||
List<Modification> mods = new ArrayList<>((int) size); | ||||
for (int i = 0; i < size; i++) { | ||||
long vid = VariableLong.readPositive(in); | ||||
long vertexId = VariableLong.readPositive(in); | ||||
long labelId = VariableLong.readPositive(in); | ||||
org.janusgraph.diskstorage.Entry entry = BufferUtil.readEntry(in,serializer); | ||||
mods.add(new Modification(state,vid,entry)); | ||||
mods.add(new Modification(state,vertexId,labelId,entry)); | ||||
} | ||||
return mods; | ||||
} | ||||
|
@@ -269,11 +275,13 @@ public static class Modification { | |||
|
||||
public final Change state; | ||||
public final long outVertexId; | ||||
public final long outVertexLabelId; | ||||
public final org.janusgraph.diskstorage.Entry relationEntry; | ||||
|
||||
private Modification(Change state, long outVertexId, org.janusgraph.diskstorage.Entry relationEntry) { | ||||
private Modification(Change state, long outVertexId, long outVertexLabelId, org.janusgraph.diskstorage.Entry relationEntry) { | ||||
this.state = state; | ||||
this.outVertexId = outVertexId; | ||||
this.outVertexLabelId = outVertexLabelId; | ||||
this.relationEntry = relationEntry; | ||||
} | ||||
} | ||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,9 +18,11 @@ | |
import org.apache.tinkerpop.gremlin.structure.Direction; | ||
import org.janusgraph.core.EdgeLabel; | ||
import org.janusgraph.core.PropertyKey; | ||
import org.janusgraph.core.VertexLabel; | ||
import org.janusgraph.core.log.Change; | ||
import org.janusgraph.diskstorage.Entry; | ||
import org.janusgraph.graphdb.database.log.TransactionLogHeader; | ||
import org.janusgraph.graphdb.idmanagement.IDManager; | ||
import org.janusgraph.graphdb.internal.ElementLifeCycle; | ||
import org.janusgraph.graphdb.internal.InternalRelation; | ||
import org.janusgraph.graphdb.internal.InternalRelationType; | ||
|
@@ -31,19 +33,30 @@ | |
import org.janusgraph.graphdb.relations.StandardEdge; | ||
import org.janusgraph.graphdb.relations.StandardVertexProperty; | ||
import org.janusgraph.graphdb.transaction.StandardJanusGraphTx; | ||
import org.janusgraph.graphdb.types.VertexLabelVertex; | ||
import org.janusgraph.graphdb.vertices.AbstractVertex; | ||
import org.janusgraph.graphdb.vertices.AbstractVertexUtil; | ||
|
||
/** | ||
* @author Matthias Broecheler ([email protected]) | ||
*/ | ||
public class ModificationDeserializer { | ||
|
||
|
||
public static InternalRelation parseRelation(TransactionLogHeader.Modification modification, StandardJanusGraphTx tx) { | ||
Change state = modification.state; | ||
assert state.isProper(); | ||
long outVertexId = modification.outVertexId; | ||
Entry relEntry = modification.relationEntry; | ||
InternalVertex outVertex = tx.getInternalVertex(outVertexId); | ||
if(outVertex instanceof AbstractVertex){ | ||
long outVertexLabelId = modification.outVertexLabelId; | ||
if(IDManager.VertexIDType.VertexLabel.is(outVertexLabelId)){ | ||
VertexLabel vertexLabel = tx.getExistingVertexLabel(outVertexLabelId); | ||
if(vertexLabel instanceof VertexLabelVertex){ | ||
AbstractVertexUtil.cacheInternalVertexLabel((AbstractVertex) outVertex, (VertexLabelVertex) vertexLabel); | ||
} | ||
} | ||
} | ||
//Special relation parsing, compare to {@link RelationConstructor} | ||
RelationCache relCache = tx.getEdgeSerializer().readRelation(relEntry, false, tx); | ||
assert relCache.direction == Direction.OUT; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This only affects transaction logs started by users, right? I think so but just wanted to make sure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It actually changes the mutation of two transactions logs:
tx.log-tx = true
.