diff --git a/docs/changelog.md b/docs/changelog.md index 270878b754..47ad2105d8 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -171,6 +171,24 @@ A new optimization has been added to compute aggregations (min, max, sum and avg If the index backend is Elasticsearch, a `double` value is used to hold the result. As a result, aggregations on long numbers greater than 2^53 are approximate. In this case, if the accurate result is essential, the optimization can be disabled by removing the strategy `JanusGraphMixedIndexAggStrategy`: `g.traversal().withoutStrategies(JanusGraphMixedIndexAggStrategy.class)`. +##### Breaking change for transaction logs processing + +[Transaction Log](advanced-topics/transaction-log.md) processing has a breaking change. +Previously for all mutated vertices a default vertex label `vertex` has been used in `ChangeState` events. +The new approach stores vertex labels in logs as well which increases storage size for +8 bytes per each relation and +makes all previously stored transaction logs incompatible with the new structure. +To pass this breaking change it's necessary to ensure that transaction logs stored via previous JanusGraph versions are +not processed via JanusGraph version >= 1.0.0. +One of the possible ways to ensure previous logs are not processed is to use `setStartTimeNow` +to process only newest logs. Another way could be to process all previous logs, store log identifier state and start +processing logs from the latest log ensuring that the latest logs are send by the JanusGraph version >= 1.0.0. +For example: +```java +LogProcessorBuilder logProcessorBuilder = logProcessorFramework + .addLogProcessor("myLogProcessorIdentifier") + .setStartTimeNow(); +``` + ### Version 0.6.3 (Release Date: ???) ```xml tab='Maven' diff --git a/janusgraph-backend-testutils/src/main/java/org/janusgraph/graphdb/JanusGraphTest.java b/janusgraph-backend-testutils/src/main/java/org/janusgraph/graphdb/JanusGraphTest.java index 5599e187bd..e1e317bfaa 100644 --- a/janusgraph-backend-testutils/src/main/java/org/janusgraph/graphdb/JanusGraphTest.java +++ b/janusgraph-backend-testutils/src/main/java/org/janusgraph/graphdb/JanusGraphTest.java @@ -141,9 +141,11 @@ import org.janusgraph.graphdb.types.CompositeIndexType; import org.janusgraph.graphdb.types.StandardEdgeLabelMaker; import org.janusgraph.graphdb.types.StandardPropertyKeyMaker; +import org.janusgraph.graphdb.types.VertexLabelVertex; import org.janusgraph.graphdb.types.system.BaseVertexLabel; import org.janusgraph.graphdb.types.system.ImplicitKey; import org.janusgraph.graphdb.types.vertices.JanusGraphSchemaVertex; +import org.janusgraph.graphdb.vertices.AbstractVertexUtil; import org.janusgraph.graphdb.vertices.CacheVertex; import org.janusgraph.testutil.FeatureFlag; import org.janusgraph.testutil.JanusGraphFeature; @@ -4871,6 +4873,8 @@ public void simpleLogTest(final boolean withLogFailure) throws InterruptedExcept PropertyKey weight = tx.makePropertyKey("weight").dataType(Float.class).cardinality(Cardinality.SINGLE).make(); EdgeLabel knows = tx.makeEdgeLabel("knows").make(); + String testVertexLabel = "testVertex"; + tx.makeVertexLabel(testVertexLabel).make(); JanusGraphVertex n1 = tx.addVertex("weight", 10.5); tx.addProperties(knows, weight); newTx(); @@ -4885,7 +4889,7 @@ public void simpleLogTest(final boolean withLogFailure) throws InterruptedExcept final long v1id = getId(v1); txTimes[1] = times.getTime(); tx2 = graph.buildTransaction().logIdentifier(userLogName).start(); - JanusGraphVertex v2 = tx2.addVertex("weight", 222.2); + JanusGraphVertex v2 = (JanusGraphVertex) tx2.traversal().addV(testVertexLabel).property("weight", 222.2).next(); v2.addEdge("knows", getV(tx2, v1id)); tx2.commit(); final long v2id = getId(v2); @@ -4998,8 +5002,8 @@ public void read(Message message) { assertEquals(0, userLogMsgCounter.get()); } else { assertEquals(4, userLogMsgCounter.get()); - assertEquals(7, userChangeCounter.get(Change.ADDED).get()); - assertEquals(4, userChangeCounter.get(Change.REMOVED).get()); + assertEquals(8, userChangeCounter.get(Change.ADDED).get()); + assertEquals(5, userChangeCounter.get(Change.REMOVED).get()); } clopen(option(VERBOSE_TX_RECOVERY), true); @@ -5057,6 +5061,8 @@ public void read(Message message) { txNo = 2; //v2 addition transaction assertEquals(1, Iterables.size(changes.getVertices(Change.ADDED))); + String vertexLabel = changes.getVertices(Change.ADDED).iterator().next().label(); + assertEquals(testVertexLabel, vertexLabel); assertEquals(0, Iterables.size(changes.getVertices(Change.REMOVED))); assertEquals(2, Iterables.size(changes.getVertices(Change.ANY))); assertEquals(2, Iterables.size(changes.getRelations(Change.ADDED))); @@ -5077,6 +5083,8 @@ public void read(Message message) { //v2 deletion transaction assertEquals(0, Iterables.size(changes.getVertices(Change.ADDED))); assertEquals(1, Iterables.size(changes.getVertices(Change.REMOVED))); + String vertexLabel = changes.getVertices(Change.REMOVED).iterator().next().label(); + assertEquals(testVertexLabel, vertexLabel); assertEquals(2, Iterables.size(changes.getVertices(Change.ANY))); assertEquals(0, Iterables.size(changes.getRelations(Change.ADDED))); assertEquals(2, Iterables.size(changes.getRelations(Change.REMOVED))); @@ -5105,12 +5113,12 @@ public void read(Message message) { final JanusGraphVertex v = Iterables.getOnlyElement(changes.getVertices(Change.ANY)); assertEquals(v1id, getId(v)); JanusGraphEdge e1 - = Iterables.getOnlyElement(changes.getEdges(v, Change.REMOVED, Direction.OUT, "knows")); + = Iterables.getOnlyElement(changes.getEdges(v, Change.REMOVED, OUT, "knows")); assertFalse(e1.property("weight").isPresent()); - assertEquals(v, e1.vertex(Direction.IN)); - e1 = Iterables.getOnlyElement(changes.getEdges(v, Change.ADDED, Direction.OUT, "knows")); + assertEquals(v, e1.vertex(IN)); + e1 = Iterables.getOnlyElement(changes.getEdges(v, Change.ADDED, OUT, "knows")); assertEquals(44.4, e1.value("weight").doubleValue(), 0.01); - assertEquals(v, e1.vertex(Direction.IN)); + assertEquals(v, e1.vertex(IN)); } //See only current state of graph in transaction @@ -5123,7 +5131,7 @@ public void read(Message message) { // assertTrue(txNo + " - " + v2, v2 == null || v2.isRemoved()); } assertEquals(111.1, v11.value("weight").doubleValue(), 0.01); - assertCount(1, v11.query().direction(Direction.OUT).edges()); + assertCount(1, v11.query().direction(OUT).edges()); userLogCount.incrementAndGet(); }).build(); @@ -8242,6 +8250,68 @@ public void testIndexStoreForceInvalidationFromDBCache() throws InterruptedExcep graph2.close(); } + @Test + public void testIndexWithIndexOnlyConstraintForceInvalidationFromDBCache() throws InterruptedException, ExecutionException { + if (features.hasLocking() || !features.isDistributed()) { + return; + } + + String indexPropName = "indexedProp"; + String vertexLabelName = "vertexLabelForIndexOnlyConstraint"; + String indexName = "indexWithIndexOnlyConstraint"; + PropertyKey indexedProp = mgmt.makePropertyKey(indexPropName).dataType(Integer.class).cardinality(Cardinality.SINGLE).make(); + VertexLabel vertexLabel = mgmt.makeVertexLabel(vertexLabelName).make(); + mgmt.buildIndex(indexName, Vertex.class).addKey(indexedProp).indexOnly(vertexLabel).buildCompositeIndex(); + finishSchema(); + ManagementSystem.awaitGraphIndexStatus(graph, indexName).call(); + mgmt.updateIndex(mgmt.getGraphIndex(indexName), SchemaAction.REINDEX).get(); + finishSchema(); + + StandardJanusGraph graph1 = openInstanceWithDBCacheEnabled("testIndexWithIndexOnlyConstraintForceInvalidationFromDBCache1"); + StandardJanusGraph graph2 = openInstanceWithDBCacheEnabled("testIndexWithIndexOnlyConstraintForceInvalidationFromDBCache2"); + + JanusGraphVertex v1 = graph1.addVertex(vertexLabelName); + v1.property(indexPropName, 1); + + graph1.tx().commit(); + + // Cache data + JanusGraphTransaction tx2 = graph2.newTransaction(); + assertEquals(1L, tx2.traversal().V().hasLabel(vertexLabelName).has(indexPropName, 1).count().next()); + tx2.commit(); + + // Remove vertex + JanusGraphTransaction tx1 = graph1.newTransaction(); + tx1.traversal().V(v1.id()).drop().iterate(); + tx1.commit(); + + // Check that cached indexed vertex in graph2 was not refreshed + tx2 = graph2.newTransaction(); + assertEquals(1L, tx2.traversal().V().hasLabel(vertexLabelName).has(indexPropName, 1).count().next()); + + // Try to invalidate data without vertex label + invalidateUpdatedVertexProperty(graph2, v1.longId(), indexPropName, 1, -1); + tx2.rollback(); + + tx2 = graph2.newTransaction(); + // Check that invalidation didn't work + assertEquals(1L, tx2.traversal().V().hasLabel(vertexLabelName).has(indexPropName, 1).count().next()); + tx2.rollback(); + + tx2 = graph2.newTransaction(); + // Invalidate data using vertex label + invalidateUpdatedVertexProperty(graph2, v1.longId(), indexPropName, 1, -1, vertexLabelName); + tx2.commit(); + + tx2 = graph2.newTransaction(); + // Check that invalidation worked + assertEquals(0L, tx2.traversal().V().hasLabel(vertexLabelName).has(indexPropName, 1).count().next()); + tx2.rollback(); + + graph1.close(); + graph2.close(); + } + @Test public void testFullDBCacheInvalidation() throws InterruptedException, ExecutionException { if (features.hasLocking() || !features.isDistributed()) { @@ -8339,10 +8409,18 @@ public void testFullDBCacheInvalidation() throws InterruptedException, Execution } private void invalidateUpdatedVertexProperty(StandardJanusGraph graph, long vertexIdUpdated, String propertyNameUpdated, Object previousPropertyValue, Object newPropertyValue){ + invalidateUpdatedVertexProperty(graph, vertexIdUpdated, propertyNameUpdated, previousPropertyValue, newPropertyValue, null); + } + + private void invalidateUpdatedVertexProperty(StandardJanusGraph graph, long vertexIdUpdated, String propertyNameUpdated, Object previousPropertyValue, Object newPropertyValue, String vertexLabelName){ JanusGraphTransaction tx = graph.newTransaction(); JanusGraphManagement graphMgmt = graph.openManagement(); PropertyKey propertyKey = graphMgmt.getPropertyKey(propertyNameUpdated); CacheVertex cacheVertex = new CacheVertex((StandardJanusGraphTx) tx, vertexIdUpdated, ElementLifeCycle.Loaded); + if(vertexLabelName != null){ + VertexLabel vertexLabel = graphMgmt.getVertexLabel(vertexLabelName); + AbstractVertexUtil.cacheInternalVertexLabel(cacheVertex, (VertexLabelVertex) vertexLabel); + } StandardVertexProperty propertyPreviousVal = new StandardVertexProperty(propertyKey.longId(), propertyKey, cacheVertex, previousPropertyValue, ElementLifeCycle.Removed); StandardVertexProperty propertyNewVal = new StandardVertexProperty(propertyKey.longId(), propertyKey, cacheVertex, newPropertyValue, ElementLifeCycle.New); IndexSerializer indexSerializer = graph.getIndexSerializer(); diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/log/TransactionLogHeader.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/log/TransactionLogHeader.java index b19cf644dd..8f5af53c0d 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/log/TransactionLogHeader.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/log/TransactionLogHeader.java @@ -16,6 +16,7 @@ import com.google.common.base.Preconditions; import org.apache.commons.lang3.StringUtils; +import org.janusgraph.core.VertexLabel; import org.janusgraph.core.log.Change; import org.janusgraph.diskstorage.ReadBuffer; import org.janusgraph.diskstorage.StaticBuffer; @@ -26,6 +27,7 @@ import org.janusgraph.graphdb.database.serialize.DataOutput; import org.janusgraph.graphdb.database.serialize.Serializer; import org.janusgraph.graphdb.internal.InternalRelation; +import org.janusgraph.graphdb.internal.InternalVertex; import org.janusgraph.graphdb.log.StandardTransactionId; import org.janusgraph.graphdb.transaction.StandardJanusGraphTx; import org.janusgraph.graphdb.transaction.TransactionConfiguration; @@ -85,7 +87,10 @@ public StaticBuffer serializeModifications(Serializer serializer, LogTxStatus st private static void logRelations(DataOutput out, final Collection relations, StandardJanusGraphTx tx) { VariableLong.writePositive(out,relations.size()); for (InternalRelation rel : relations) { - VariableLong.writePositive(out,rel.getVertex(0).longId()); + InternalVertex vertex = rel.getVertex(0); + VariableLong.writePositive(out,vertex.longId()); + VertexLabel vertexLabel = vertex.vertexLabel(); + VariableLong.writePositive(out, vertexLabel.hasId() ? vertexLabel.longId() : 0L); org.janusgraph.diskstorage.Entry entry = tx.getEdgeSerializer().writeRelation(rel, 0, tx); BufferUtil.writeEntry(out,entry); } @@ -238,9 +243,10 @@ private static List readModifications(Change state, ReadBuffer in, long size = VariableLong.readPositive(in); List mods = new ArrayList<>((int) size); for (int i = 0; i < size; i++) { - long vid = VariableLong.readPositive(in); + long vertexId = VariableLong.readPositive(in); + long labelId = VariableLong.readPositive(in); org.janusgraph.diskstorage.Entry entry = BufferUtil.readEntry(in,serializer); - mods.add(new Modification(state,vid,entry)); + mods.add(new Modification(state,vertexId,labelId,entry)); } return mods; } @@ -269,11 +275,13 @@ public static class Modification { public final Change state; public final long outVertexId; + public final long outVertexLabelId; public final org.janusgraph.diskstorage.Entry relationEntry; - private Modification(Change state, long outVertexId, org.janusgraph.diskstorage.Entry relationEntry) { + private Modification(Change state, long outVertexId, long outVertexLabelId, org.janusgraph.diskstorage.Entry relationEntry) { this.state = state; this.outVertexId = outVertexId; + this.outVertexLabelId = outVertexLabelId; this.relationEntry = relationEntry; } } diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/log/ModificationDeserializer.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/log/ModificationDeserializer.java index 7ff3a36f54..8b706ecc7c 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/log/ModificationDeserializer.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/log/ModificationDeserializer.java @@ -18,9 +18,11 @@ import org.apache.tinkerpop.gremlin.structure.Direction; import org.janusgraph.core.EdgeLabel; import org.janusgraph.core.PropertyKey; +import org.janusgraph.core.VertexLabel; import org.janusgraph.core.log.Change; import org.janusgraph.diskstorage.Entry; import org.janusgraph.graphdb.database.log.TransactionLogHeader; +import org.janusgraph.graphdb.idmanagement.IDManager; import org.janusgraph.graphdb.internal.ElementLifeCycle; import org.janusgraph.graphdb.internal.InternalRelation; import org.janusgraph.graphdb.internal.InternalRelationType; @@ -31,19 +33,30 @@ import org.janusgraph.graphdb.relations.StandardEdge; import org.janusgraph.graphdb.relations.StandardVertexProperty; import org.janusgraph.graphdb.transaction.StandardJanusGraphTx; +import org.janusgraph.graphdb.types.VertexLabelVertex; +import org.janusgraph.graphdb.vertices.AbstractVertex; +import org.janusgraph.graphdb.vertices.AbstractVertexUtil; /** * @author Matthias Broecheler (me@matthiasb.com) */ public class ModificationDeserializer { - public static InternalRelation parseRelation(TransactionLogHeader.Modification modification, StandardJanusGraphTx tx) { Change state = modification.state; assert state.isProper(); long outVertexId = modification.outVertexId; Entry relEntry = modification.relationEntry; InternalVertex outVertex = tx.getInternalVertex(outVertexId); + if(outVertex instanceof AbstractVertex){ + long outVertexLabelId = modification.outVertexLabelId; + if(IDManager.VertexIDType.VertexLabel.is(outVertexLabelId)){ + VertexLabel vertexLabel = tx.getExistingVertexLabel(outVertexLabelId); + if(vertexLabel instanceof VertexLabelVertex){ + AbstractVertexUtil.cacheInternalVertexLabel((AbstractVertex) outVertex, (VertexLabelVertex) vertexLabel); + } + } + } //Special relation parsing, compare to {@link RelationConstructor} RelationCache relCache = tx.getEdgeSerializer().readRelation(relEntry, false, tx); assert relCache.direction == Direction.OUT; diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/types/vertices/JanusGraphSchemaVertex.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/types/vertices/JanusGraphSchemaVertex.java index 7a20846951..b2b1b3c377 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/types/vertices/JanusGraphSchemaVertex.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/types/vertices/JanusGraphSchemaVertex.java @@ -20,7 +20,6 @@ import com.google.common.collect.Iterables; import com.google.common.collect.ListMultimap; import org.apache.tinkerpop.gremlin.structure.Direction; -import org.apache.tinkerpop.gremlin.structure.Vertex; import org.janusgraph.core.JanusGraphEdge; import org.janusgraph.core.JanusGraphVertex; import org.janusgraph.core.JanusGraphVertexProperty; @@ -34,6 +33,7 @@ import org.janusgraph.graphdb.types.TypeDefinitionCategory; import org.janusgraph.graphdb.types.TypeDefinitionDescription; import org.janusgraph.graphdb.types.TypeDefinitionMap; +import org.janusgraph.graphdb.types.VertexLabelVertex; import org.janusgraph.graphdb.types.indextype.CompositeIndexTypeWrapper; import org.janusgraph.graphdb.types.indextype.MixedIndexTypeWrapper; import org.janusgraph.graphdb.types.system.BaseKey; @@ -73,7 +73,7 @@ public String name() { } @Override - protected Vertex getVertexLabelInternal() { + protected VertexLabelVertex getVertexLabelInternal() { return null; } diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/vertices/AbstractVertex.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/vertices/AbstractVertex.java index 0364d0c554..e8245c7851 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/vertices/AbstractVertex.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/vertices/AbstractVertex.java @@ -46,6 +46,7 @@ public abstract class AbstractVertex extends AbstractElement implements Internal private final StandardJanusGraphTx tx; + private volatile VertexLabelVertex internalCachedVertexLabel; protected AbstractVertex(StandardJanusGraphTx tx, long id) { super(id); @@ -129,15 +130,21 @@ public String label() { return vertexLabel().name(); } - protected Vertex getVertexLabelInternal() { - return Iterables.getOnlyElement(tx().query(this).noPartitionRestriction().type(BaseLabel.VertexLabelEdge).direction(Direction.OUT).vertices(),null); + protected VertexLabelVertex getVertexLabelInternal() { + // Only if we `internalCachedVertexLabel` is not null it means that the vertex is guaranteed to have that label + // because `label` is immutable. In all other cases we need to try to fetch the vertex label even if it's a + // repeated operation. + if(internalCachedVertexLabel == null){ + internalCachedVertexLabel = (VertexLabelVertex) Iterables.getOnlyElement(tx().query(this) + .noPartitionRestriction().type(BaseLabel.VertexLabelEdge).direction(Direction.OUT).vertices(),null); + } + return internalCachedVertexLabel; } @Override public VertexLabel vertexLabel() { - Vertex label = getVertexLabelInternal(); - if (label==null) return BaseVertexLabel.DEFAULT_VERTEXLABEL; - else return (VertexLabelVertex)label; + VertexLabelVertex label = getVertexLabelInternal(); + return label == null ? BaseVertexLabel.DEFAULT_VERTEXLABEL : label; } @Override @@ -200,10 +207,9 @@ public Iterator> properties(String... keys) { public Iterator vertices(final Direction direction, final String... edgeLabels) { return (Iterator)query().direction(direction).labels(edgeLabels).vertices().iterator(); - } - - - + void cacheInternalVertexLabel(VertexLabelVertex internalCachedVertexLabel){ + this.internalCachedVertexLabel = internalCachedVertexLabel; + } } diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/vertices/AbstractVertexUtil.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/vertices/AbstractVertexUtil.java new file mode 100644 index 0000000000..e0ad8daa88 --- /dev/null +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/vertices/AbstractVertexUtil.java @@ -0,0 +1,27 @@ +// Copyright 2022 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.graphdb.vertices; + +import org.janusgraph.graphdb.types.VertexLabelVertex; + +public class AbstractVertexUtil { + + private AbstractVertexUtil(){} + + public static void cacheInternalVertexLabel(AbstractVertex abstractVertex, VertexLabelVertex internalCachedVertexLabel){ + abstractVertex.cacheInternalVertexLabel(internalCachedVertexLabel); + } + +}