Add possibility to execute multiple slice queries together to KeyColumnValueStore [cql-tests] [tp-tests]

- Adds the possibility to execute groups of slice queries that share the same key sets together
- Implements parallel execution of all provided slice queries in the CQL storage backend
- Adds a basic implementation (i.e. the current non-optimized implementation) for every other storage backend that doesn't have an optimized implementation yet
- Adds a query grouping algorithm (`MultiSliceQueriesGroupingUtil`) (required for #3816; it also minimizes the creation of duplicate key collections)

Fixes #3824
Related to #3816

Signed-off-by: Oleksandr Porunov <[email protected]>
porunov committed Jun 22, 2023
1 parent 6b420c5 commit 98d409b
Showing 33 changed files with 2,171 additions and 90 deletions.
14 changes: 14 additions & 0 deletions docs/changelog.md
@@ -314,6 +314,20 @@ mandatory and cannot be disabled. The default ExecutorService core pool size is
the default value is considered to be optimal unless users want to artificially limit parallelism of CQL results deserialization
jobs.

##### A new multi-query method added to `KeyColumnValueStore` (affects storage adapter implementations only)

A new method `Map<SliceQuery, Map<StaticBuffer, EntryList>> getMultiSlices(MultiKeysQueryGroups<StaticBuffer, SliceQuery> multiKeysQueryGroups, StoreTransaction txh)`
has been added to `KeyColumnValueStore` and is now the preferred way to execute multi-queries (batch queries) that contain multiple slice queries.
When a multi-query executes more than one slice query per execution, those slice queries are grouped by identical key sets,
and `getMultiSlices` is called with the resulting groups, each pairing one set of keys with the different `SliceQuery` instances to run against it.
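
The grouping idea can be illustrated with a minimal sketch. It is not the `MultiSliceQueriesGroupingUtil` algorithm from this commit; it only assumes that queries requested for exactly the same keys end up in one group. Plain `String` values stand in for `StaticBuffer` keys and `SliceQuery` instances, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SliceQueryGroupingSketch {

    /**
     * Groups queries that target exactly the same set of keys.
     * Input: query name -> keys the query must be executed for (hypothetical stand-ins).
     * Output: key set -> all queries that share this key set.
     */
    public static Map<Set<String>, List<String>> groupBySameKeySet(Map<String, List<String>> keysPerQuery) {
        Map<Set<String>, List<String>> groups = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : keysPerQuery.entrySet()) {
            // Key order does not matter, so compare the keys as a set.
            Set<String> keySet = new HashSet<>(entry.getValue());
            groups.computeIfAbsent(keySet, k -> new ArrayList<>()).add(entry.getKey());
        }
        return groups;
    }

    public static void main(String[] args) {
        Map<String, List<String>> keysPerQuery = new LinkedHashMap<>();
        keysPerQuery.put("foo", List.of("v1", "v2", "v3"));
        keysPerQuery.put("fooBar", List.of("v3", "v2", "v1"));
        keysPerQuery.put("barFoo", List.of("v1"));

        // "foo" and "fooBar" share one key set; "barFoo" forms its own group.
        Map<Set<String>, List<String>> groups = groupBySameKeySet(keysPerQuery);
        groups.forEach((keys, queries) -> System.out.println(keys + " -> " + queries));
    }
}
```

With such groups, a backend needs to build each key collection only once and can issue all queries of a group against it.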

Note that if the storage backend doesn't have the multiQuery feature enabled, the method won't be used, so it is not necessary to implement it.

If the storage backend has the multiQuery feature enabled, it is highly recommended to override the default (non-optimized) implementation
and optimize this method so that all the slice queries for all the requested keys execute in the shortest time possible
(for example, by using asynchronous programming, slice query grouping, multi-threaded execution, or any other technique that
is efficient for the respective storage adapter).
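
One of the techniques named above, asynchronous execution, might look roughly like the following sketch. It is not the CQL implementation from this commit and does not use the real JanusGraph types: keys, queries, and results are plain strings and lists, and `fetchSlice` is a hypothetical stand-in for a single backend read.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiFunction;

public class ParallelMultiSlicesSketch {

    /**
     * Runs every (query, key) read concurrently and collects the results as
     * query -> (key -> entries). fetchSlice is a hypothetical single-read function
     * and must not return null.
     */
    public static Map<String, Map<String, List<String>>> getMultiSlices(
            List<String> keys,
            List<String> queries,
            BiFunction<String, String, List<String>> fetchSlice,
            ExecutorService executor) {
        Map<String, Map<String, List<String>>> result = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (String query : queries) {
            Map<String, List<String>> perKey = new ConcurrentHashMap<>();
            result.put(query, perKey);
            for (String key : keys) {
                // Submit each read without waiting for the previous one.
                futures.add(CompletableFuture.runAsync(
                    () -> perKey.put(key, fetchSlice.apply(key, query)), executor));
            }
        }
        // Block only once, after all reads have been submitted.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return result;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            Map<String, Map<String, List<String>>> slices = getMultiSlices(
                List.of("v1", "v2"),
                List.of("foo", "fooBar"),
                (key, query) -> List.of(key + ":" + query),
                executor);
            System.out.println(slices);
        } finally {
            executor.shutdown();
        }
    }
}
```

A real adapter would more likely rely on the asynchronous API of its driver than on a thread pool; the sketch only shows the shape of submitting all reads first and joining once.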

##### Removal of deprecated classes/methods/functionalities

###### Methods
@@ -616,7 +616,7 @@ public static void evaluateQuery(JanusGraphVertexQuery query, RelationCategory r
int subQueryCounter = 0;
for (SimpleQueryProfiler subProfiler : profiler) {
assertNotNull(subProfiler);
- if (subProfiler.getGroupName().equals(QueryProfiler.OPTIMIZATION)) continue;
+ if (subProfiler.getGroupName().equals(QueryProfiler.OPTIMIZATION) || Boolean.TRUE.equals(subProfiler.getAnnotation(QueryProfiler.MULTI_SLICES_ANNOTATION))) continue;
if (subQuerySpecs.length == 2) { //0=>fitted, 1=>ordered
assertEquals(subQuerySpecs[0], subProfiler.getAnnotation(QueryProfiler.FITTED_ANNOTATION));
assertEquals(subQuerySpecs[1], subProfiler.getAnnotation(QueryProfiler.ORDERED_ANNOTATION));
@@ -197,7 +197,6 @@

import static org.apache.tinkerpop.gremlin.process.traversal.Order.asc;
import static org.apache.tinkerpop.gremlin.process.traversal.Order.desc;
- import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.has;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.hasNot;
import static org.apache.tinkerpop.gremlin.structure.Direction.BOTH;
import static org.apache.tinkerpop.gremlin.structure.Direction.IN;
@@ -5817,62 +5816,62 @@ private void testLimitBatchSizeForHasStep(int numV, int barrierSize, int limit,
option(PROPERTY_PREFETCHING), false,
option(HAS_STEP_BATCH_MODE), MultiQueryHasStepStrategyMode.REQUIRED_PROPERTIES_ONLY.getConfigName());
// `foo` and `fooBar` properties are prefetched separately
- assertEquals(6, countBackendQueriesOfSize(barrierSize, profile.getMetrics()));
- assertEquals(2, countBackendQueriesOfSize(numV - 3 * barrierSize, profile.getMetrics()));
+ assertEquals(3, countBackendQueriesOfSize(barrierSize * 2, profile.getMetrics()));
+ assertEquals(1, countBackendQueriesOfSize((numV - 3 * barrierSize) * 2, profile.getMetrics()));

// test batching for 3 `has()` steps of required properties only
profile = testLimitedBatch(() -> graph.traversal().V(cs).barrier(barrierSize).has("foo", "bar").has("fooBar", "Bar").has("barFoo", "Foo"),
option(USE_MULTIQUERY), true, option(LIMITED_BATCH), true,
option(PROPERTY_PREFETCHING), false,
option(HAS_STEP_BATCH_MODE), MultiQueryHasStepStrategyMode.REQUIRED_PROPERTIES_ONLY.getConfigName());
// `foo`, `fooBar`, and `barFoo` properties are prefetched separately
- assertEquals(9, countBackendQueriesOfSize(barrierSize, profile.getMetrics()));
- assertEquals(3, countBackendQueriesOfSize(numV - 3 * barrierSize, profile.getMetrics()));
+ assertEquals(3, countBackendQueriesOfSize(barrierSize * 3, profile.getMetrics()));
+ assertEquals(1, countBackendQueriesOfSize((numV - 3 * barrierSize) * 3, profile.getMetrics()));

// test `has()` step `REQUIRED_AND_NEXT_PROPERTIES` mode with not following any properties steps
profile = testLimitedBatch(() -> graph.traversal().V(cs).barrier(barrierSize).has("foo", "bar").has("fooBar", "Bar"),
option(USE_MULTIQUERY), true, option(LIMITED_BATCH), true,
option(PROPERTY_PREFETCHING), false,
option(HAS_STEP_BATCH_MODE), MultiQueryHasStepStrategyMode.REQUIRED_AND_NEXT_PROPERTIES.getConfigName());
// `foo` and `fooBar` properties are prefetched separately
- assertEquals(6, countBackendQueriesOfSize(barrierSize, profile.getMetrics()));
- assertEquals(2, countBackendQueriesOfSize(numV - 3 * barrierSize, profile.getMetrics()));
+ assertEquals(3, countBackendQueriesOfSize(barrierSize * 2, profile.getMetrics()));
+ assertEquals(1, countBackendQueriesOfSize((numV - 3 * barrierSize) * 2, profile.getMetrics()));

// test `has()` step `REQUIRED_AND_NEXT_PROPERTIES` mode with following valueMap step
profile = testLimitedBatch(() -> graph.traversal().V(cs).barrier(barrierSize).has("foo", "bar").has("fooBar", "Bar").valueMap("barFoo"),
option(USE_MULTIQUERY), true, option(LIMITED_BATCH), true,
option(PROPERTY_PREFETCHING), false,
option(HAS_STEP_BATCH_MODE), MultiQueryHasStepStrategyMode.REQUIRED_AND_NEXT_PROPERTIES.getConfigName());
// `foo`, `fooBar`, and `barFoo` properties are prefetched separately
- assertEquals(9, countBackendQueriesOfSize(barrierSize, profile.getMetrics()));
- assertEquals(3, countBackendQueriesOfSize(numV - 3 * barrierSize, profile.getMetrics()));
+ assertEquals(3, countBackendQueriesOfSize(barrierSize * 3, profile.getMetrics()));
+ assertEquals(1, countBackendQueriesOfSize((numV - 3 * barrierSize) * 3, profile.getMetrics()));

// test `has()` step `REQUIRED_AND_NEXT_PROPERTIES` mode with following properties step
profile = testLimitedBatch(() -> graph.traversal().V(cs).barrier(barrierSize).has("foo", "bar").has("fooBar", "Bar").properties("barFoo"),
option(USE_MULTIQUERY), true, option(LIMITED_BATCH), true,
option(PROPERTY_PREFETCHING), false,
option(HAS_STEP_BATCH_MODE), MultiQueryHasStepStrategyMode.REQUIRED_AND_NEXT_PROPERTIES.getConfigName());
// `foo`, `fooBar`, and `barFoo` properties are prefetched separately
- assertEquals(9, countBackendQueriesOfSize(barrierSize, profile.getMetrics()));
- assertEquals(3, countBackendQueriesOfSize(numV - 3 * barrierSize, profile.getMetrics()));
+ assertEquals(3, countBackendQueriesOfSize(barrierSize * 3, profile.getMetrics()));
+ assertEquals(1, countBackendQueriesOfSize((numV - 3 * barrierSize) * 3, profile.getMetrics()));

// test `has()` step `REQUIRED_AND_NEXT_PROPERTIES` mode with following values step
profile = testLimitedBatch(() -> graph.traversal().V(cs).barrier(barrierSize).has("foo", "bar").has("fooBar", "Bar").values("barFoo"),
option(USE_MULTIQUERY), true, option(LIMITED_BATCH), true,
option(PROPERTY_PREFETCHING), false,
option(HAS_STEP_BATCH_MODE), MultiQueryHasStepStrategyMode.REQUIRED_AND_NEXT_PROPERTIES.getConfigName());
// `foo`, `fooBar`, and `barFoo` properties are prefetched separately
- assertEquals(9, countBackendQueriesOfSize(barrierSize, profile.getMetrics()));
- assertEquals(3, countBackendQueriesOfSize(numV - 3 * barrierSize, profile.getMetrics()));
+ assertEquals(3, countBackendQueriesOfSize(barrierSize * 3, profile.getMetrics()));
+ assertEquals(1, countBackendQueriesOfSize((numV - 3 * barrierSize) * 3, profile.getMetrics()));

// test `has()` step `REQUIRED_AND_NEXT_PROPERTIES` mode with following elementMap step
profile = testLimitedBatch(() -> graph.traversal().V(cs).barrier(barrierSize).has("foo", "bar").has("fooBar", "Bar").elementMap("barFoo"),
option(USE_MULTIQUERY), true, option(LIMITED_BATCH), true,
option(PROPERTY_PREFETCHING), false,
option(HAS_STEP_BATCH_MODE), MultiQueryHasStepStrategyMode.REQUIRED_AND_NEXT_PROPERTIES.getConfigName());
// `foo`, `fooBar`, and `barFoo` properties are prefetched separately
- assertEquals(9, countBackendQueriesOfSize(barrierSize, profile.getMetrics()));
- assertEquals(3, countBackendQueriesOfSize(numV - 3 * barrierSize, profile.getMetrics()));
+ assertEquals(3, countBackendQueriesOfSize(barrierSize * 3, profile.getMetrics()));
+ assertEquals(1, countBackendQueriesOfSize((numV - 3 * barrierSize) * 3, profile.getMetrics()));

// test `has()` step `REQUIRED_AND_NEXT_PROPERTIES` mode with following valueMap step (all properties)
profile = testLimitedBatch(() -> graph.traversal().V(cs).barrier(barrierSize).has("foo", "bar").has("fooBar", "Bar").valueMap(),
@@ -5974,8 +5973,8 @@ private void testLimitBatchSizeForHasStep(int numV, int barrierSize, int limit,
option(PROPERTY_PREFETCHING), false,
option(HAS_STEP_BATCH_MODE), MultiQueryHasStepStrategyMode.REQUIRED_AND_NEXT_PROPERTIES_OR_ALL.getConfigName());
// `foo`, `fooBar`, and `barFoo` properties are prefetched separately
- assertEquals(9, countBackendQueriesOfSize(barrierSize, profile.getMetrics()));
- assertEquals(3, countBackendQueriesOfSize(numV - 3 * barrierSize, profile.getMetrics()));
+ assertEquals(3, countBackendQueriesOfSize(barrierSize * 3, profile.getMetrics()));
+ assertEquals(1, countBackendQueriesOfSize((numV - 3 * barrierSize) * 3, profile.getMetrics()));

// test `has()` step `REQUIRED_AND_NEXT_PROPERTIES_OR_ALL` mode with not following values step
profile = testLimitedBatch(() -> graph.traversal().V(cs).barrier(barrierSize).has("foo", "bar").has("fooBar", "Bar")
@@ -6093,6 +6092,70 @@ private void testLimitBatchSizeForMultiQueryOfConnectiveSteps(JanusGraphVertex[]
assertEquals((int) Math.ceil((double) Math.min(bs.length, limit) / barrierSize), countOptimizationQueries(profile.getMetrics()));
}

@Test
public void testMultiSliceDBCachedRequests(){
clopen(option(DB_CACHE), false);
mgmt.makeVertexLabel("testVertex").make();
finishSchema();
int numV = 100;
JanusGraphVertex a = graph.addVertex();
JanusGraphVertex[] bs = new JanusGraphVertex[numV];
JanusGraphVertex[] cs = new JanusGraphVertex[numV];
for (int i = 0; i < numV; ++i) {
bs[i] = graph.addVertex();
cs[i] = graph.addVertex("testVertex");
cs[i].property("foo", "bar");
cs[i].property("fooBar", "Bar");
cs[i].property("barFoo", "Foo");
a.addEdge("knows", bs[i]);
bs[i].addEdge("knows", cs[i]);
}

clopen(option(USE_MULTIQUERY), true, option(LIMITED_BATCH), true,
option(DB_CACHE), true, option(DB_CACHE_TIME), 1000000L,
option(HAS_STEP_BATCH_MODE), MultiQueryHasStepStrategyMode.REQUIRED_PROPERTIES_ONLY.getConfigName(),
option(PROPERTIES_BATCH_MODE), MultiQueryPropertiesStrategyMode.REQUIRED_PROPERTIES_ONLY.getConfigName());
assertEquals(numV, graph.traversal().V(cs).values("foo").toList().size());
graph.tx().rollback();
assertEquals(numV, graph.traversal().V(cs).values("foo").toList().size());
graph.tx().rollback();
assertEquals(numV * 2, graph.traversal().V(cs).values("foo", "fooBar").toList().size());
graph.tx().rollback();
assertEquals(numV * 2, graph.traversal().V(cs).values("foo", "fooBar").toList().size());
graph.tx().rollback();
assertEquals(numV * 3, graph.traversal().V(cs).values("foo", "barFoo", "fooBar").toList().size());
graph.tx().rollback();
assertEquals(numV * 3, graph.traversal().V(cs).values("foo", "barFoo", "fooBar").toList().size());

clopen(option(USE_MULTIQUERY), true, option(LIMITED_BATCH), true,
option(DB_CACHE), true, option(DB_CACHE_TIME), 1000000L,
option(HAS_STEP_BATCH_MODE), MultiQueryHasStepStrategyMode.REQUIRED_PROPERTIES_ONLY.getConfigName(),
option(PROPERTIES_BATCH_MODE), MultiQueryPropertiesStrategyMode.REQUIRED_PROPERTIES_ONLY.getConfigName());
assertEquals(numV, graph.traversal().V(cs).values("foo").toList().size());
graph.tx().rollback();
assertEquals(numV * 3, graph.traversal().V(cs).values("foo", "barFoo", "fooBar").toList().size());
graph.tx().rollback();

clopen(option(USE_MULTIQUERY), true, option(LIMITED_BATCH), true,
option(DB_CACHE), true, option(DB_CACHE_TIME), 1000000L,
option(HAS_STEP_BATCH_MODE), MultiQueryHasStepStrategyMode.REQUIRED_PROPERTIES_ONLY.getConfigName(),
option(PROPERTIES_BATCH_MODE), MultiQueryPropertiesStrategyMode.REQUIRED_PROPERTIES_ONLY.getConfigName());
assertEquals(3, graph.traversal().V(cs[1], cs[5], cs[10]).values("foo").toList().size());
graph.tx().rollback();
assertEquals(3, graph.traversal().V(cs[1], cs[7], cs[10]).values("foo").toList().size());
graph.tx().rollback();
assertEquals(6, graph.traversal().V(cs[1], cs[6], cs[8]).values("foo", "fooBar").toList().size());
graph.tx().rollback();
assertEquals(6, graph.traversal().V(cs[1], cs[7], cs[10]).values("foo", "fooBar").toList().size());
graph.tx().rollback();
assertEquals(9, graph.traversal().V(cs[1], cs[5], cs[10]).values("foo", "barFoo", "fooBar").toList().size());
graph.tx().rollback();
assertEquals(numV * 3, graph.traversal().V(cs).values("foo", "barFoo", "fooBar").toList().size());
graph.tx().rollback();
assertEquals(numV * 2, graph.traversal().V(cs).values("foo", "fooBar").toList().size());
assertEquals(numV * 3, graph.traversal().V().values("foo", "barFoo", "fooBar").toList().size());
}

private TraversalMetrics testLimitedBatch(Supplier<GraphTraversal<?, ?>> traversal, Object... settings){
assertEqualResultWithAndWithoutLimitBatchSize(traversal);
if(settings.length == 0){
@@ -7791,7 +7854,7 @@ public void testVertexCentricQueryProfiling() {
}};
mMultiLabels.getAnnotations().remove("percentDur");
assertEquals(annotations, mMultiLabels.getAnnotations());
- assertEquals(3, mMultiLabels.getNested().size());
+ assertEquals(4, mMultiLabels.getNested().size());
Metrics friendMetrics = (Metrics) mMultiLabels.getNested().toArray()[1];
assertEquals("OR-query", friendMetrics.getName());
// FIXME: assertTrue(friendMetrics.getDuration(TimeUnit.MICROSECONDS) > 0);
@@ -7810,6 +7873,14 @@ public void testVertexCentricQueryProfiling() {
put("query", "friend-no-index:SliceQuery[0x7180,0x7181)"); // no vertex-centric index found
}};
assertEquals(annotations, friendNoIndexMetrics.getAnnotations());
Metrics backendQueryMetrics = (Metrics) mMultiLabels.getNested().toArray()[3];
assertEquals("backend-query", backendQueryMetrics.getName());
Map<String, Object> backendQueryAnnotations = backendQueryMetrics.getAnnotations();
assertEquals(3, backendQueryAnnotations.size());
assertEquals("true", backendQueryAnnotations.get("multiSlices"));
assertEquals(2, backendQueryAnnotations.get("queriesAmount"));
// The order of the grouped queries is not guaranteed, so accept both orderings.
assertTrue(backendQueryAnnotations.get("queries").equals("[2069:byTime:SliceQuery[0xB0E0FF7FFFFF9B,0xB0E0FF7FFFFF9C), friend-no-index:SliceQuery[0x7180,0x7181)]") ||
backendQueryAnnotations.get("queries").equals("[friend-no-index:SliceQuery[0x7180,0x7181), 2069:byTime:SliceQuery[0xB0E0FF7FFFFF9B,0xB0E0FF7FFFFF9C)]"));
}

@Test

1 comment on commit 98d409b

@github-actions

Benchmark

| Benchmark suite | Current: 98d409b | Previous: 3a7ba53 | Ratio |
|-|-|-|-|
| `org.janusgraph.JanusGraphSpeedBenchmark.basicAddAndDelete` | 14777.51113772866 ms/op | 20132.68260361965 ms/op | 0.73 |
| `org.janusgraph.GraphCentricQueryBenchmark.getVertices` | 1380.7539098525629 ms/op | 1617.8956294193085 ms/op | 0.85 |
| `org.janusgraph.MgmtOlapJobBenchmark.runClearIndex` | 219.13727245217393 ms/op | 222.76279991304347 ms/op | 0.98 |
| `org.janusgraph.MgmtOlapJobBenchmark.runReindex` | 461.3728925930303 ms/op | 541.8996275500001 ms/op | 0.85 |
| `org.janusgraph.JanusGraphSpeedBenchmark.basicCount` | 354.15739462782807 ms/op | 368.0788711730627 ms/op | 0.96 |
| `org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection` | 4339.982463577545 ms/op | | |
| `org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingEmitRepeatSteps` | 30118.99498364445 ms/op | | |
| `org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithSmallBatch` | 23008.702452833335 ms/op | | |
| `org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.vertexCentricPropertiesFetching` | 40694.64488569999 ms/op | | |
| `org.janusgraph.CQLMultiQueryBenchmark.getAllElementsTraversedFromOuterVertex` | 15614.86522907647 ms/op | | |
| `org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithDoubleUnion` | 640.4218382146752 ms/op | | |
| `org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithUnlimitedBatch` | 4173.289054289941 ms/op | | |
| `org.janusgraph.CQLMultiQueryBenchmark.getNames` | 15020.539389873426 ms/op | 19643.66567439762 ms/op | 0.76 |
| `org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesThreePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection` | 6977.270539935606 ms/op | | |
| `org.janusgraph.CQLMultiQueryBenchmark.getVerticesFilteredByAndStep` | 683.6432060171409 ms/op | | |
| `org.janusgraph.CQLMultiQueryBenchmark.getVerticesFromMultiNestedRepeatStepStartingFromSingleVertex` | 21089.80216258 ms/op | | |
| `org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithCoalesceUsage` | 591.6684639691728 ms/op | | |
| `org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection` | 21060.89587275 ms/op | | |
| `org.janusgraph.CQLMultiQueryBenchmark.getIdToOutVerticesProjection` | 407.4965431192044 ms/op | | |
| `org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithUnlimitedBatch` | 24361.753701466663 ms/op | | |
| `org.janusgraph.CQLMultiQueryBenchmark.getNeighborNames` | 14915.023356591606 ms/op | 19824.388658859913 ms/op | 0.75 |
| `org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingRepeatUntilSteps` | 15979.780932136513 ms/op | | |
| `org.janusgraph.CQLMultiQueryBenchmark.getAdjacentVerticesLocalCounts` | 15707.786933848482 ms/op | | |

This comment was automatically generated by workflow using github-action-benchmark.
