diff --git a/janusgraph-backend-testutils/src/main/java/org/janusgraph/graphdb/JanusGraphTest.java b/janusgraph-backend-testutils/src/main/java/org/janusgraph/graphdb/JanusGraphTest.java index 18183f6c8d6..7a139b7d88d 100644 --- a/janusgraph-backend-testutils/src/main/java/org/janusgraph/graphdb/JanusGraphTest.java +++ b/janusgraph-backend-testutils/src/main/java/org/janusgraph/graphdb/JanusGraphTest.java @@ -96,7 +96,9 @@ import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration; import org.janusgraph.graphdb.database.EdgeSerializer; import org.janusgraph.graphdb.database.IndexRecordEntry; +import org.janusgraph.graphdb.database.IndexSerializer; import org.janusgraph.graphdb.database.StandardJanusGraph; +import org.janusgraph.graphdb.database.cache.CacheInvalidationService; import org.janusgraph.graphdb.database.index.IndexMutationType; import org.janusgraph.graphdb.database.index.IndexUpdate; import org.janusgraph.graphdb.database.log.LogTxMeta; @@ -129,6 +131,7 @@ import org.janusgraph.graphdb.relations.AbstractEdge; import org.janusgraph.graphdb.relations.RelationIdentifier; import org.janusgraph.graphdb.relations.StandardEdge; +import org.janusgraph.graphdb.relations.StandardVertexProperty; import org.janusgraph.graphdb.serializer.SpecialInt; import org.janusgraph.graphdb.serializer.SpecialIntSerializer; import org.janusgraph.graphdb.tinkerpop.optimize.step.JanusGraphEdgeVertexStep; @@ -196,6 +199,7 @@ import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.CUSTOM_ATTRIBUTE_CLASS; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.CUSTOM_SERIALIZER_CLASS; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.DB_CACHE; +import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.DB_CACHE_TIME; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.FORCE_INDEX_USAGE; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.HARD_MAX_LIMIT; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.IDS_STORE_NAME; @@ -206,6 +210,7 @@ import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.LOG_SEND_DELAY; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.MANAGEMENT_LOG; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.MAX_COMMIT_TIME; +import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.REPLACE_INSTANCE_IF_EXISTS; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.SCHEMA_CONSTRAINTS; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.SCRIPT_EVAL_ENABLED; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.SCRIPT_EVAL_ENGINE; @@ -215,6 +220,7 @@ import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.SYSTEM_LOG_TRANSACTIONS; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.TRANSACTION_LOG; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.TX_CACHE_SIZE; +import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.UNIQUE_INSTANCE_ID; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.USER_LOG; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.USE_MULTIQUERY; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.VERBOSE_TX_RECOVERY; @@ -8006,4 +8012,337 @@ public void testGremlinScriptEvaluationWithGremlinGroovyScriptEngine() { assertFalse(graph.traversal().V().hasNext()); } } + + @Test + public void testVertexPropertiesForceInvalidationFromDBCache() { + if (features.hasLocking() || !features.isDistributed()) { + return; + } + + JanusGraph graph1 = openInstanceWithDBCacheEnabled("testVertexPropertiesForceInvalidationFromDBCache1"); + JanusGraph graph2 = openInstanceWithDBCacheEnabled("testVertexPropertiesForceInvalidationFromDBCache2"); + + JanusGraphVertex v1 = graph1.addVertex(); + v1.property("name", "vertex1"); + JanusGraphVertex v2 = graph1.addVertex(); + v2.property("name", "vertex2"); + JanusGraphVertex v3 = graph1.addVertex(); + v3.property("name", "vertex3"); + + graph1.tx().commit(); + + // Cache vertices for graph1 + JanusGraphTransaction tx1 = graph1.newTransaction(); + tx1.traversal().V().valueMap().toList(); + tx1.commit(); + + // Cache vertices for graph2 + JanusGraphTransaction tx2 = graph2.newTransaction(); + tx2.traversal().V().valueMap().toList(); + tx2.commit(); + + tx1 = graph1.newTransaction(); + + // Update properties using graph1 + tx1.traversal().V(v1.id()).property("name", "vertex1_updated").iterate(); + tx1.traversal().V(v2.id()).property("name", "vertex2_updated").iterate(); + tx1.traversal().V(v3.id()).property("name", "vertex3_updated").iterate(); + + tx1.commit(); + + tx2 = graph2.newTransaction(); + + // Check that cached properties in graph2 were not refreshed + assertEquals(Arrays.asList("vertex1"), tx2.traversal().V(v1.id()).valueMap().next().get("name")); + assertEquals(Arrays.asList("vertex2"), tx2.traversal().V(v2.id()).valueMap().next().get("name")); + assertEquals(Arrays.asList("vertex3"), tx2.traversal().V(v3.id()).valueMap().next().get("name")); + + tx2.rollback(); + + // Invalidate cache for v1 only + graph2.getDBCacheInvalidationService().forceInvalidateVertexInEdgeStoreCache(v1.longId()); + + tx2 = graph2.newTransaction(); + + // Check that v1 cache was refreshed but cache for v2 and v3 was not refreshed + assertEquals(Arrays.asList("vertex1_updated"), tx2.traversal().V(v1.id()).valueMap().next().get("name")); + assertEquals(Arrays.asList("vertex2"), tx2.traversal().V(v2.id()).valueMap().next().get("name")); + assertEquals(Arrays.asList("vertex3"), tx2.traversal().V(v3.id()).valueMap().next().get("name")); + + tx2.rollback(); + + // Invalidate cache for v2 and v3 + graph2.getDBCacheInvalidationService().forceInvalidateVerticesInEdgeStoreCache(Arrays.asList(v2.longId(), v3.longId())); + + tx2 = graph2.newTransaction(); + + // Check that cache was refreshed for all three vertices + assertEquals(Arrays.asList("vertex1_updated"), tx2.traversal().V(v1.id()).valueMap().next().get("name")); + assertEquals(Arrays.asList("vertex2_updated"), tx2.traversal().V(v2.id()).valueMap().next().get("name")); + assertEquals(Arrays.asList("vertex3_updated"), tx2.traversal().V(v3.id()).valueMap().next().get("name")); + + tx2.rollback(); + + graph1.close(); + graph2.close(); + } + + @Test + public void testIndexStoreForceInvalidationFromDBCache() throws InterruptedException, ExecutionException { + if (features.hasLocking() || !features.isDistributed()) { + return; + } + + // Define schema with 2 properties and 2 indices which include the same property + String indexProp1Name = "indexedProp1"; + String indexProp2Name = "indexedProp2"; + String index1Name = "index1"; + String index2Name = "index2"; + PropertyKey indexedProp1 = mgmt.makePropertyKey(indexProp1Name).dataType(Integer.class).cardinality(Cardinality.SINGLE).make(); + PropertyKey indexedProp2 = mgmt.makePropertyKey(indexProp2Name).dataType(Integer.class).cardinality(Cardinality.SINGLE).make(); + mgmt.buildIndex(index1Name, Vertex.class).addKey(indexedProp1).buildCompositeIndex(); + mgmt.buildIndex(index2Name, Vertex.class).addKey(indexedProp1).addKey(indexedProp2).buildCompositeIndex(); + finishSchema(); + ManagementSystem.awaitGraphIndexStatus(graph, index1Name).call(); + ManagementSystem.awaitGraphIndexStatus(graph, index2Name).call(); + mgmt.updateIndex(mgmt.getGraphIndex(index1Name), SchemaAction.REINDEX).get(); + mgmt.updateIndex(mgmt.getGraphIndex(index2Name), SchemaAction.REINDEX).get(); + finishSchema(); + + StandardJanusGraph graph1 = openInstanceWithDBCacheEnabled("testIndexStoreForceInvalidationFromDBCache1"); + StandardJanusGraph graph2 = openInstanceWithDBCacheEnabled("testIndexStoreForceInvalidationFromDBCache2"); + + // Add 3 vertices which are indexed by 2 indexes + JanusGraphVertex v1 = graph1.addVertex(); + v1.property(indexProp1Name, 1); + v1.property(indexProp2Name, 1); + + JanusGraphVertex v2 = graph1.addVertex(); + v2.property(indexProp1Name, 2); + v2.property(indexProp2Name, 2); + + JanusGraphVertex v3 = graph1.addVertex(); + v3.property(indexProp1Name, 3); + v3.property(indexProp2Name, 3); + + graph1.tx().commit(); + + // Cache data + JanusGraphTransaction tx2 = graph2.newTransaction(); + // Cache index 1 for graph 2 + queryVertices(tx2, indexProp1Name, 1, 2, 3, -1, -2, -3); + // Cache index 2 and index 3 for graph 2 + queryVertices(tx2, indexProp1Name, Arrays.asList(1, 2, 3, -1, -2, -3), indexProp2Name, Arrays.asList(1, 2, 3)); + tx2.commit(); + + // Update indexProp1Name for all vertices using graph1 + JanusGraphTransaction tx1 = graph1.newTransaction(); + tx1.traversal().V(v1.id()).property(indexProp1Name, -1).iterate(); + tx1.traversal().V(v2.id()).property(indexProp1Name, -2).iterate(); + tx1.traversal().V(v3.id()).property(indexProp1Name, -3).iterate(); + tx1.commit(); + + // Check that cached indexed vertices in graph2 were not refreshed + tx2 = graph2.newTransaction(); + // Check that old (outdated) data is retrieved because of the cache + assertTrue(tx2.traversal().V().has(indexProp1Name, 1).hasNext()); + assertTrue(tx2.traversal().V().has(indexProp1Name, 2).hasNext()); + assertTrue(tx2.traversal().V().has(indexProp1Name, 3).hasNext()); + assertTrue(tx2.traversal().V().has(indexProp1Name, 1).has(indexProp2Name, 1).hasNext()); + assertTrue(tx2.traversal().V().has(indexProp1Name, 2).has(indexProp2Name, 2).hasNext()); + assertTrue(tx2.traversal().V().has(indexProp1Name, 3).has(indexProp2Name, 3).hasNext()); + // Check that new (updated) data is not retrieved because of the cache + assertFalse(tx2.traversal().V().has(indexProp1Name, -1).hasNext()); + assertFalse(tx2.traversal().V().has(indexProp1Name, -2).hasNext()); + assertFalse(tx2.traversal().V().has(indexProp1Name, -3).hasNext()); + assertFalse(tx2.traversal().V().has(indexProp1Name, -1).has(indexProp2Name, 1).hasNext()); + assertFalse(tx2.traversal().V().has(indexProp1Name, -2).has(indexProp2Name, 2).hasNext()); + assertFalse(tx2.traversal().V().has(indexProp1Name, -3).has(indexProp2Name, 3).hasNext()); + tx2.rollback(); + + invalidateUpdatedVertexProperty(graph2, v1.longId(), indexProp1Name, 1, -1); + + tx2 = graph2.newTransaction(); + // Check that cache for index1 and values `1`, `-1` was refreshed but cache for 2,3,-2,-3 was not refreshed + assertFalse(tx2.traversal().V().has(indexProp1Name, 1).hasNext()); + assertTrue(tx2.traversal().V().has(indexProp1Name, 2).hasNext()); + assertTrue(tx2.traversal().V().has(indexProp1Name, 3).hasNext()); + assertTrue(tx2.traversal().V().has(indexProp1Name, -1).hasNext()); + assertFalse(tx2.traversal().V().has(indexProp1Name, -2).hasNext()); + assertFalse(tx2.traversal().V().has(indexProp1Name, -3).hasNext()); + // Check that cache for index2 and values `1`, `-1` was refreshed but cache for 2,3,-2,-3 was not refreshed + assertFalse(tx2.traversal().V().has(indexProp1Name, 1).has(indexProp2Name, 1).hasNext()); + assertTrue(tx2.traversal().V().has(indexProp1Name, -1).has(indexProp2Name, 1).hasNext()); + assertTrue(tx2.traversal().V().has(indexProp1Name, 2).has(indexProp2Name, 2).hasNext()); + assertFalse(tx2.traversal().V().has(indexProp1Name, -2).has(indexProp2Name, 2).hasNext()); + assertTrue(tx2.traversal().V().has(indexProp1Name, 3).has(indexProp2Name, 3).hasNext()); + assertFalse(tx2.traversal().V().has(indexProp1Name, -3).has(indexProp2Name, 3).hasNext()); + tx2.rollback(); + + invalidateUpdatedVertexProperty(graph2, v2.longId(), indexProp1Name, 2, -2); + invalidateUpdatedVertexProperty(graph2, v3.longId(), indexProp1Name, 3, -3); + + tx2 = graph2.newTransaction(); + // Check that cache was refreshed for all 3 vertices + assertFalse(tx2.traversal().V().has(indexProp1Name, 1).hasNext()); + assertFalse(tx2.traversal().V().has(indexProp1Name, 2).hasNext()); + assertFalse(tx2.traversal().V().has(indexProp1Name, 3).hasNext()); + assertTrue(tx2.traversal().V().has(indexProp1Name, -1).hasNext()); + assertTrue(tx2.traversal().V().has(indexProp1Name, -2).hasNext()); + assertTrue(tx2.traversal().V().has(indexProp1Name, -3).hasNext()); + assertFalse(tx2.traversal().V().has(indexProp1Name, 1).has(indexProp2Name, 1).hasNext()); + assertTrue(tx2.traversal().V().has(indexProp1Name, -1).has(indexProp2Name, 1).hasNext()); + assertFalse(tx2.traversal().V().has(indexProp1Name, 2).has(indexProp2Name, 2).hasNext()); + assertTrue(tx2.traversal().V().has(indexProp1Name, -2).has(indexProp2Name, 2).hasNext()); + assertFalse(tx2.traversal().V().has(indexProp1Name, 3).has(indexProp2Name, 3).hasNext()); + assertTrue(tx2.traversal().V().has(indexProp1Name, -3).has(indexProp2Name, 3).hasNext()); + + graph1.close(); + graph2.close(); + } + + @Test + public void testFullDBCacheInvalidation() throws InterruptedException, ExecutionException { + if (features.hasLocking() || !features.isDistributed()) { + return; + } + + // Define schema with 2 properties and 2 indices which include the same property + String indexProp1Name = "someProp1"; + String index1Name = "someIndex1"; + PropertyKey indexedProp1 = mgmt.makePropertyKey(indexProp1Name).dataType(Integer.class).cardinality(Cardinality.SINGLE).make(); + mgmt.buildIndex(index1Name, Vertex.class).addKey(indexedProp1).buildCompositeIndex(); + finishSchema(); + ManagementSystem.awaitGraphIndexStatus(graph, index1Name).call(); + mgmt.updateIndex(mgmt.getGraphIndex(index1Name), SchemaAction.REINDEX).get(); + finishSchema(); + + StandardJanusGraph graph1 = openInstanceWithDBCacheEnabled("testIndexStoreForceInvalidationFromDBCache1"); + StandardJanusGraph graph2 = openInstanceWithDBCacheEnabled("testIndexStoreForceInvalidationFromDBCache2"); + + // Add 3 vertices which are indexed by 2 indexes + JanusGraphVertex v1 = graph1.addVertex(); + v1.property(indexProp1Name, 1); + + graph1.tx().commit(); + + // Cache data + JanusGraphTransaction tx2 = graph2.newTransaction(); + tx2.traversal().V(v1.id()).valueMap().next(); + tx2.traversal().V().has(indexProp1Name, 1).toList(); + tx2.traversal().V().has(indexProp1Name, -1).toList(); + tx2.commit(); + + // Update indexProp1Name using graph1 + JanusGraphTransaction tx1 = graph1.newTransaction(); + tx1.traversal().V(v1.id()).property(indexProp1Name, -1).iterate(); + tx1.commit(); + + // Check that cached indexed vertices in graph2 were not refreshed + tx2 = graph2.newTransaction(); + assertTrue(tx2.traversal().V().has(indexProp1Name, 1).hasNext()); + assertEquals(Arrays.asList(1), tx2.traversal().V(v1.id()).valueMap().next().get(indexProp1Name)); + assertFalse(tx2.traversal().V().has(indexProp1Name, -1).hasNext()); + tx2.rollback(); + + graph2.getDBCacheInvalidationService().clearEdgeStoreCache(); + + // Check only edgeStore was invalidated + tx2 = graph2.newTransaction(); + assertTrue(tx2.traversal().V().has(indexProp1Name, 1).hasNext()); + assertEquals(Arrays.asList(-1), tx2.traversal().V(v1.id()).valueMap().next().get(indexProp1Name)); + assertFalse(tx2.traversal().V().has(indexProp1Name, -1).hasNext()); + tx2.rollback(); + + graph2.getDBCacheInvalidationService().clearIndexStoreCache(); + + // Check both edgeStore and indexStore were invalidated + tx2 = graph2.newTransaction(); + assertFalse(tx2.traversal().V().has(indexProp1Name, 1).hasNext()); + assertEquals(Arrays.asList(-1), tx2.traversal().V(v1.id()).valueMap().next().get(indexProp1Name)); + assertTrue(tx2.traversal().V().has(indexProp1Name, -1).hasNext()); + tx2.rollback(); + + //// Check full cache clear method + + // Cache data + tx2 = graph2.newTransaction(); + tx2.traversal().V(v1.id()).valueMap().next(); + tx2.traversal().V().has(indexProp1Name, -1).toList(); + tx2.traversal().V().has(indexProp1Name, 2).toList(); + tx2.commit(); + + // Update indexProp1Name using graph1 again + tx1 = graph1.newTransaction(); + tx1.traversal().V(v1.id()).property(indexProp1Name, 2).iterate(); + tx1.commit(); + + // Check cached data is now outdated in graph2 + tx2 = graph2.newTransaction(); + assertFalse(tx2.traversal().V().has(indexProp1Name, 2).hasNext()); + assertEquals(Arrays.asList(-1), tx2.traversal().V(v1.id()).valueMap().next().get(indexProp1Name)); + assertTrue(tx2.traversal().V().has(indexProp1Name, -1).hasNext()); + tx2.rollback(); + + graph2.getDBCacheInvalidationService().clearDBCache(); + + // Check both edgeStore and indexStore caches were invalidated + tx2 = graph2.newTransaction(); + assertTrue(tx2.traversal().V().has(indexProp1Name, 2).hasNext()); + assertEquals(Arrays.asList(2), tx2.traversal().V(v1.id()).valueMap().next().get(indexProp1Name)); + assertFalse(tx2.traversal().V().has(indexProp1Name, -1).hasNext()); + tx2.rollback(); + + graph1.close(); + graph2.close(); + } + + private void invalidateUpdatedVertexProperty(StandardJanusGraph graph, long vertexIdUpdated, String propertyNameUpdated, Object previousPropertyValue, Object newPropertyValue){ + JanusGraphTransaction tx = graph.newTransaction(); + JanusGraphManagement graphMgmt = graph.openManagement(); + PropertyKey propertyKey = graphMgmt.getPropertyKey(propertyNameUpdated); + CacheVertex cacheVertex = new CacheVertex((StandardJanusGraphTx) tx, vertexIdUpdated, ElementLifeCycle.Loaded); + StandardVertexProperty propertyPreviousVal = new StandardVertexProperty(propertyKey.longId(), propertyKey, cacheVertex, previousPropertyValue, ElementLifeCycle.Removed); + StandardVertexProperty propertyNewVal = new StandardVertexProperty(propertyKey.longId(), propertyKey, cacheVertex, newPropertyValue, ElementLifeCycle.New); + IndexSerializer indexSerializer = graph.getIndexSerializer(); + + Collection indexUpdates = indexSerializer.getIndexUpdates(cacheVertex, Arrays.asList(propertyPreviousVal, propertyNewVal)); + CacheInvalidationService invalidationService = graph.getDBCacheInvalidationService(); + + for(IndexUpdate indexUpdate : indexUpdates){ + StaticBuffer keyToInvalidate = (StaticBuffer) indexUpdate.getKey(); + invalidationService.markKeyAsExpiredInIndexStore(keyToInvalidate); + } + + invalidationService.forceClearExpiredKeysInIndexStoreCache(); + invalidationService.forceInvalidateVertexInEdgeStoreCache(vertexIdUpdated); + + graphMgmt.rollback(); + tx.rollback(); + } + + private void queryVertices(JanusGraphTransaction tx, String propName, Object ... values){ + for(Object value : values){ + tx.traversal().V().has(propName, value).toList(); + } + } + + private void queryVertices(JanusGraphTransaction tx, String prop1Name, Iterable prop1Values, String prop2Name, Iterable prop2Values){ + for(Object prop1Value : prop1Values){ + for(Object prop2Value : prop2Values){ + tx.traversal().V().has(prop1Name, prop1Value).has(prop2Name, prop2Value).toList(); + tx.traversal().V().has(prop2Name, prop2Value).has(prop1Name, prop1Value).toList(); + } + } + } + + private StandardJanusGraph openInstanceWithDBCacheEnabled(String uniqueInstanceId){ + WriteConfiguration configCopy = config.copy(); + configCopy.set(ConfigElement.getPath(UNIQUE_INSTANCE_ID), uniqueInstanceId); + configCopy.set(ConfigElement.getPath(REPLACE_INSTANCE_IF_EXISTS), true); + configCopy.set(ConfigElement.getPath(DB_CACHE), true); + configCopy.set(ConfigElement.getPath(DB_CACHE_TIME), 0); + configCopy.set(ConfigElement.getPath(STORAGE_BATCH), false); + return (StandardJanusGraph) JanusGraphFactory.open(configCopy); + } } diff --git a/janusgraph-core/src/main/java/org/janusgraph/core/JanusGraph.java b/janusgraph-core/src/main/java/org/janusgraph/core/JanusGraph.java index deec3032f62..52d082003d2 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/core/JanusGraph.java +++ b/janusgraph-core/src/main/java/org/janusgraph/core/JanusGraph.java @@ -18,6 +18,7 @@ import org.apache.tinkerpop.gremlin.util.Gremlin; import org.janusgraph.core.schema.JanusGraphManagement; import org.janusgraph.graphdb.configuration.JanusGraphConstants; +import org.janusgraph.graphdb.database.cache.CacheInvalidationService; /** * JanusGraph graph database implementation of the Blueprint's interface. @@ -157,6 +158,8 @@ public interface JanusGraph extends Transaction { @Override void close() throws JanusGraphException; + CacheInvalidationService getDBCacheInvalidationService(); + /** * The version of this JanusGraph graph database * diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/Backend.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/Backend.java index 2e0592ec8fe..087b61de583 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/Backend.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/Backend.java @@ -708,4 +708,12 @@ static ExecutorService buildExecutorService(Configuration configuration){ } return executorService; } + + public KCVSCache getEdgeStoreCache(){ + return edgeStore; + } + + public KCVSCache getIndexStoreCache(){ + return indexStore; + } } diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/cache/ExpirationKCVSCache.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/cache/ExpirationKCVSCache.java index 4b3317c8ded..ed5c9d86fe2 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/cache/ExpirationKCVSCache.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/cache/ExpirationKCVSCache.java @@ -139,9 +139,16 @@ public Map getSlice(final List keys, final @Override public void clearCache() { + // We should not call `expiredKeys.clear();` directly because there could be a race condition + // where already invalidated cache but then added new entries into it and made some mutation before `expiredKeys.clear();` + // is finished which may result in getSlice to return previously cached result and not a new mutated result. + // Thus, we are clearing expired entries first, and then we are safe to invalidate the rest of non-expired entries. + // Moreover, we shouldn't create `penaltyCountdown = new CountDownLatch(PENALTY_THRESHOLD);` because the cleaning thread + // may await on the previous `penaltyCountdown` which may result in that thread never wake up to proceed with + // probabilistic cleaning. Thus, only that cleaning thread have to have a right to reinitialize `penaltyCountdown`. + forceClearExpiredCache(); + // It's always safe to invalidate full cache cache.invalidateAll(); - expiredKeys.clear(); - penaltyCountdown = new CountDownLatch(PENALTY_THRESHOLD); } @Override @@ -151,6 +158,31 @@ public void invalidate(StaticBuffer key, List entries) { if (Math.random()<1.0/INVALIDATE_KEY_FRACTION_PENALTY) penaltyCountdown.countDown(); } + @Override + public void forceClearExpiredCache() { + clearExpiredCache(false); + } + + private synchronized void clearExpiredCache(boolean withNewPenaltyCountdown) { + //Do clean up work by invalidating all entries for expired keys + final Map expiredKeysCopy = new HashMap<>(expiredKeys.size()); + for (Map.Entry expKey : expiredKeys.entrySet()) { + if (isBeyondExpirationTime(expKey.getValue())) + expiredKeys.remove(expKey.getKey(), expKey.getValue()); + else if (getAge(expKey.getValue())>= invalidationGracePeriodMS) + expiredKeysCopy.put(expKey.getKey(),expKey.getValue()); + } + for (KeySliceQuery ksq : cache.asMap().keySet()) { + if (expiredKeysCopy.containsKey(ksq.getKey())) cache.invalidate(ksq); + } + if(withNewPenaltyCountdown){ + penaltyCountdown = new CountDownLatch(PENALTY_THRESHOLD); + } + for (Map.Entry expKey : expiredKeysCopy.entrySet()) { + expiredKeys.remove(expKey.getKey(),expKey.getValue()); + } + } + @Override public void close() throws BackendException { cleanupThread.stopThread(); @@ -203,21 +235,7 @@ public void run() { if (stop) return; else throw new RuntimeException("Cleanup thread got interrupted",e); } - //Do clean up work by invalidating all entries for expired keys - final Map expiredKeysCopy = new HashMap<>(expiredKeys.size()); - for (Map.Entry expKey : expiredKeys.entrySet()) { - if (isBeyondExpirationTime(expKey.getValue())) - expiredKeys.remove(expKey.getKey(), expKey.getValue()); - else if (getAge(expKey.getValue())>= invalidationGracePeriodMS) - expiredKeysCopy.put(expKey.getKey(),expKey.getValue()); - } - for (KeySliceQuery ksq : cache.asMap().keySet()) { - if (expiredKeysCopy.containsKey(ksq.getKey())) cache.invalidate(ksq); - } - penaltyCountdown = new CountDownLatch(PENALTY_THRESHOLD); - for (Map.Entry expKey : expiredKeysCopy.entrySet()) { - expiredKeys.remove(expKey.getKey(),expKey.getValue()); - } + clearExpiredCache(true); } } diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/cache/KCVSCache.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/cache/KCVSCache.java index da8351ca753..f5956a670cc 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/cache/KCVSCache.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/cache/KCVSCache.java @@ -57,7 +57,9 @@ protected void incActionBy(int by, CacheMetricsAction action, StoreTransaction t public abstract void clearCache(); - protected abstract void invalidate(StaticBuffer key, List entries); + public abstract void invalidate(StaticBuffer key, List entries); + + public abstract void forceClearExpiredCache(); @Override public void mutate(StaticBuffer key, List additions, List deletions, StoreTransaction txh) throws BackendException { diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/cache/NoKCVSCache.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/cache/NoKCVSCache.java index 4ad0f7c096c..ba115788107 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/cache/NoKCVSCache.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/cache/NoKCVSCache.java @@ -34,7 +34,11 @@ public void clearCache() { } @Override - protected void invalidate(StaticBuffer key, List entries) { + public void invalidate(StaticBuffer key, List entries) { + } + + @Override + public void forceClearExpiredCache() { } } diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java index 5d4d4b18bad..9ede2bb0e27 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java @@ -66,6 +66,8 @@ import org.janusgraph.diskstorage.util.StaticArrayEntry; import org.janusgraph.diskstorage.util.time.TimestampProvider; import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration; +import org.janusgraph.graphdb.database.cache.CacheInvalidationService; +import org.janusgraph.graphdb.database.cache.KCVSCacheInvalidationService; import org.janusgraph.graphdb.database.cache.SchemaCache; import org.janusgraph.graphdb.database.idassigner.VertexIDAssigner; import org.janusgraph.graphdb.database.idhandling.IDHandler; @@ -160,6 +162,8 @@ public class StandardJanusGraph extends JanusGraphBlueprintsGraph { private final IDManager idManager; private final VertexIDAssigner idAssigner; private final TimestampProvider times; + private final CacheInvalidationService cacheInvalidationService; + //Serializers protected final IndexSerializer indexSerializer; @@ -200,6 +204,9 @@ public StandardJanusGraph(GraphDatabaseConfiguration configuration) { this.idAssigner = config.getIDAssigner(backend); this.idManager = idAssigner.getIDManager(); + this.cacheInvalidationService = new KCVSCacheInvalidationService( + backend.getEdgeStoreCache(), backend.getIndexStoreCache(), idManager); + this.serializer = config.getSerializer(); StoreFeatures storeFeatures = backend.getStoreFeatures(); this.indexSerializer = new IndexSerializer(configuration.getConfiguration(), this.serializer, @@ -298,6 +305,11 @@ public synchronized void close() throws JanusGraphException { } } + @Override + public CacheInvalidationService getDBCacheInvalidationService() { + return cacheInvalidationService; + } + private synchronized void closeInternal() { if (!isOpen) return; diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/cache/CacheInvalidationService.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/cache/CacheInvalidationService.java new file mode 100644 index 00000000000..498ef9154c1 --- /dev/null +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/cache/CacheInvalidationService.java @@ -0,0 +1,192 @@ +// Copyright 2022 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.graphdb.database.cache; + +import org.janusgraph.core.log.Change; +import org.janusgraph.diskstorage.StaticBuffer; + +/** + * Cache invalidation service for manual JanusGraph database-level cache invalidation. + * Use with great care because improper invalidation may result in stale data left in db-cache. + * This service wraps two different caches called as `edgeStore` and `indexStore` which form a + * single database-level cache (can be enabled via `cache.db-cache` configuration property). + * When db-cache is disabled this service doesn't make any effective changes because the cache is + * fully disabled in that case. + *

+ * This class provides method for data invalidation for both `edgeStore` cache and `indexStore` cache + * but invalidating entries in one cache doesn't invalidate entries in another cache. + * Thus, for proper invalidation you need to invalidate necessary keys for both `edgeStore` and `indexStore`. + *

+ * EdgeStore accepts keys where a key is an encoded vertex id. It's usually easy to invalidate EdgeStore because + * it doesn't require you to know more information to form a key except knowing a vertex id. + *

+ * IndexStore accepts keys where a key is an encoded IndexUpdate key. To form IndexUpdate key you need to know + * the next information: vertex id, updated property name, previous property value and / or new property value. + * Thus forming IndexUpdate key to invalidate IndexStore may be more complicated. + *

+ * EdgeStore caches properties and edges for vertices. + * IndexStore caches results for queries which use indices. + *

+ * See JavaDoc on methods to learn how to properly form a `key` to invalidate cache in `edgeStore` or `indexStore`. + */ +public interface CacheInvalidationService { + + /** + * Marks specific vertex as expired in `edgeStore` cache. + * It will make sure that any retrieved properties and edges associated with this vertex will be invalidated in vertex cache. + *

+ * Warning! This doesn't invalidate `indexStore` cache. Thus, any queries which are using indices may still return + * stale data. See {@link #markKeyAsExpiredInIndexStore(StaticBuffer)} JavaDoc to learn how to invalidate data for + * `indexStore`. + * + * @param vertexId vertex id to expire in `edgeStore` cache + */ + void markVertexAsExpiredInEdgeStore(Long vertexId); + + /** + * Marks specific key as expired in `edgeStore` cache. + * It will make sure that any retrieved properties and edges associated with this key will be invalidated in vertex cache. + *

+ * Warning! This doesn't invalidate `indexStore` cache. Thus, any queries which are using indices may still return + * stale data. See {@link #markKeyAsExpiredInIndexStore(StaticBuffer)} JavaDoc to learn how to invalidate data for + * `indexStore`. + *

+ * {@link org.janusgraph.graphdb.idmanagement.IDManager#getKey(long)} can be used to form a `key` from vertex id. + * Alternatively, a method {@link #markVertexAsExpiredInEdgeStore(Long)} can be used which converts vertex id into + * the `key` before passing the key to this method. + *

+ * In case vertices invalidation is needed by processing transaction logs via {@link org.janusgraph.core.log.ChangeState} + * then the method {@link org.janusgraph.core.log.ChangeState#getVertices(Change)} can be used to retrieve all + * changed vertices and passing their ids to {@link #markVertexAsExpiredInEdgeStore(Long)}. + * + * @param key key to expire in `edgeStore` cache + */ + void markKeyAsExpiredInEdgeStore(StaticBuffer key); + + /** + * Marks specific key as expired in `indexStore` cache. + * It will make sure that any retrieved data associated with this key will be invalidated in index cache. + *

+ * Warning! This doesn't invalidate `edgeStore` cache. Thus, trying to return properties or edges for the vertex + * may still return stale data. See {@link #markKeyAsExpiredInEdgeStore(StaticBuffer)} JavaDoc to learn how to invalidate + * data for `edgeStore`. + *

+ * To form a `key` for invalidation it is needed to know vertex id, updated property name, previous value (if there was any), + * new value (if there is any) from which IndexUpdate key can be formed. + *

+ * Usually this information can be found in a retrieved mutation logs which are passed via {@link org.janusgraph.core.log.ChangeState} + * (described in `Transaction Log` documentation of JanusGraph). In case `indexStore` invalidation should triggered + * from processing transaction logs via `ChangeState` then it can be done using an example like below: + * + *

+     * IndexSerializer indexSerializer = (StandardJanusGraph) graph.getIndexSerializer();
+     * CacheInvalidationService invalidationService = graph.getDBCacheInvalidationService();
+     * changeState.getRelations(Change.ANY).forEach(janusGraphRelation -> {
+     *     if(janusGraphRelation.isProperty() && janusGraphRelation instanceof InternalRelation){
+     *         JanusGraphVertexProperty property = (JanusGraphVertexProperty) janusGraphRelation;
+     *         if(property.element() instanceof InternalVertex){
+     *             Collection indexUpdates = indexSerializer.getIndexUpdates((InternalVertex) property.element(),
+     *                 Collections.singleton((InternalRelation) property));
+     *
+     *             for(IndexUpdate indexUpdate : indexUpdates){
+     *                 StaticBuffer keyToInvalidate = (StaticBuffer) indexUpdate.getKey();
+     *                 invalidationService.markKeyAsExpiredInIndexStore(keyToInvalidate);
+     *             }
+     *
+     *             invalidationService.forceClearExpiredKeysInIndexStoreCache();
+     *             invalidationService.forceInvalidateVertexInEdgeStoreCache(property.element().longId());
+     *         }
+     *     }
+     * });
+     * 
+ * + * It is also possible to trigger `indexStore` invalidation by forming vertex and a property yourself. The example + * below can be used as a reference. + * + *
+     * public void invalidateUpdatedVertexProperty(StandardJanusGraph graph, long vertexIdUpdated, String propertyNameUpdated, Object previousPropertyValue, Object newPropertyValue){
+     *     JanusGraphTransaction tx = graph.newTransaction();
+     *     JanusGraphManagement graphMgmt = graph.openManagement();
+     *     PropertyKey propertyKey = graphMgmt.getPropertyKey(propertyNameUpdated);
+     *     CacheVertex cacheVertex = new CacheVertex((StandardJanusGraphTx) tx, vertexIdUpdated, ElementLifeCycle.Loaded);
+     *     StandardVertexProperty propertyPreviousVal = new StandardVertexProperty(propertyKey.longId(), propertyKey, cacheVertex, previousPropertyValue, ElementLifeCycle.Removed);
+     *     StandardVertexProperty propertyNewVal = new StandardVertexProperty(propertyKey.longId(), propertyKey, cacheVertex, newPropertyValue, ElementLifeCycle.New);
+     *     IndexSerializer indexSerializer = graph.getIndexSerializer();
+     *
+     *     Collection indexUpdates = indexSerializer.getIndexUpdates(cacheVertex, Arrays.asList(propertyPreviousVal, propertyNewVal));
+     *     CacheInvalidationService invalidationService = graph.getDBCacheInvalidationService();
+     *
+     *     for(IndexUpdate indexUpdate : indexUpdates){
+     *         StaticBuffer keyToInvalidate = (StaticBuffer) indexUpdate.getKey();
+     *         invalidationService.markKeyAsExpiredInIndexStore(keyToInvalidate);
+     *     }
+     *
+     *     invalidationService.forceClearExpiredKeysInIndexStoreCache();
+     *     invalidationService.forceInvalidateVertexInEdgeStoreCache(vertexIdUpdated);
+     *
+     *     graphMgmt.rollback();
+     *     tx.rollback();
+     * }
+     * 
+ * + * @param key key to expire in `indexStore` cache + */ + void markKeyAsExpiredInIndexStore(StaticBuffer key); + + /** + * Instead of waiting for a probabilistic invalidation it triggers all cached queries scan and invalidation in `edgeStore`. + * This will remove any cached expired data. + */ + void forceClearExpiredKeysInEdgeStoreCache(); + + /** + * Instead of waiting for a probabilistic invalidation it triggers all cached queries scan and invalidation in `indexStore`. + * This will remove any cached expired data. + */ + void forceClearExpiredKeysInIndexStoreCache(); + + /** + * Marks a vertex as expired in `edgeStore` cache ({@link #markVertexAsExpiredInEdgeStore(Long)}) and triggers force + * clear of expired cache (i.e. {@link #forceClearExpiredKeysInEdgeStoreCache()}) + * + * @param vertexId vertex id to invalidate in `edgeStore` cache + */ + void forceInvalidateVertexInEdgeStoreCache(Long vertexId); + + /** + * Marks vertices as expired in `edgeStore` cache ({@link #markVertexAsExpiredInEdgeStore(Long)}) and triggers force + * clear of expired cache (i.e. {@link #forceClearExpiredKeysInEdgeStoreCache()}) + * + * @param vertexIds vertex ids to invalidate in `edgeStore` cache + */ + void forceInvalidateVerticesInEdgeStoreCache(Iterable vertexIds); + + /** + * Clears `edgeStore` cache fully + */ + void clearEdgeStoreCache(); + + /** + * Clears `indexStore` cache fully + */ + void clearIndexStoreCache(); + + /** + * Clears both `edgeStore` cache and `indexStore` cache fully. + * It is the same as calling {@link #clearEdgeStoreCache()} and {@link #clearIndexStoreCache()} + */ + void clearDBCache(); + +} diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/cache/KCVSCacheInvalidationService.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/cache/KCVSCacheInvalidationService.java new file mode 100644 index 00000000000..706c25944d5 --- /dev/null +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/cache/KCVSCacheInvalidationService.java @@ -0,0 +1,91 @@ +// Copyright 2022 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.graphdb.database.cache; + +import org.janusgraph.diskstorage.StaticBuffer; +import org.janusgraph.diskstorage.keycolumnvalue.cache.KCVSCache; +import org.janusgraph.graphdb.idmanagement.IDManager; + +import java.util.Collections; + +public class KCVSCacheInvalidationService implements CacheInvalidationService{ + + private final KCVSCache edgeStore; + private final KCVSCache indexStore; + + private final IDManager idManager; + + public KCVSCacheInvalidationService(KCVSCache edgeStore, KCVSCache indexStore, IDManager idManager) { + this.edgeStore = edgeStore; + this.indexStore = indexStore; + this.idManager = idManager; + } + + @Override + public void markVertexAsExpiredInEdgeStore(Long vertexId) { + StaticBuffer vertexIdKey = idManager.getKey(vertexId); + markKeyAsExpiredInEdgeStore(vertexIdKey); + } + + @Override + public void markKeyAsExpiredInEdgeStore(StaticBuffer key) { + edgeStore.invalidate(key, Collections.emptyList()); + } + + @Override + public void markKeyAsExpiredInIndexStore(StaticBuffer key) { + indexStore.invalidate(key, Collections.emptyList()); + } + + @Override + public void forceClearExpiredKeysInEdgeStoreCache() { + edgeStore.forceClearExpiredCache(); + } + + @Override + public void forceClearExpiredKeysInIndexStoreCache() { + indexStore.forceClearExpiredCache(); + } + + @Override + public void forceInvalidateVertexInEdgeStoreCache(Long vertexId) { + markVertexAsExpiredInEdgeStore(vertexId); + forceClearExpiredKeysInEdgeStoreCache(); + } + + @Override + public void forceInvalidateVerticesInEdgeStoreCache(Iterable vertexIds) { + for(Long vertexId : vertexIds){ + markVertexAsExpiredInEdgeStore(vertexId); + } + forceClearExpiredKeysInEdgeStoreCache(); + } + + @Override + public void clearEdgeStoreCache() { + edgeStore.clearCache(); + } + + @Override + public void clearIndexStoreCache() { + indexStore.clearCache(); + } + + @Override + public void clearDBCache() { + clearEdgeStoreCache(); + clearIndexStoreCache(); + } +}