From 638a0b0f89362a19d9a0f6a65e02c60a9d27be29 Mon Sep 17 00:00:00 2001 From: Nick Molcanov <32801560+nck-mlcnv@users.noreply.github.com> Date: Tue, 1 Oct 2024 15:24:47 +0200 Subject: [PATCH 1/7] Update LinearQuerySelector to also accept starting index --- .../selector/impl/LinearQuerySelector.java | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/aksw/iguana/cc/query/selector/impl/LinearQuerySelector.java b/src/main/java/org/aksw/iguana/cc/query/selector/impl/LinearQuerySelector.java index 3d3faad3..ac3001ed 100644 --- a/src/main/java/org/aksw/iguana/cc/query/selector/impl/LinearQuerySelector.java +++ b/src/main/java/org/aksw/iguana/cc/query/selector/impl/LinearQuerySelector.java @@ -14,25 +14,29 @@ public class LinearQuerySelector extends QuerySelector { public LinearQuerySelector(int size) { super(size); - index = -1; + index = 0; + } + + public LinearQuerySelector(int size, int startIndex) { + super(size); + index = startIndex; } @Override public int getNextIndex() { - index++; if (index >= this.size) { index = 0; } - return index; + return index++; } /** - * Return the current index. This is the index of the last returned query. If no query was returned yet, it returns - * -1. - * @return + * Return the current index. This is the index of the last returned query. + * + * @return the current index */ @Override public int getCurrentIndex() { - return index; + return index - 1; } } From 133ede7e6d05775fba523f8ef73fea701aaed6ed Mon Sep 17 00:00:00 2001 From: Nick Molcanov <32801560+nck-mlcnv@users.noreply.github.com> Date: Tue, 1 Oct 2024 15:29:33 +0200 Subject: [PATCH 2/7] Fix minor bug concerning failed results --- .../org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorker.java b/src/main/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorker.java index 565763f6..a4e84103 100644 --- a/src/main/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorker.java +++ b/src/main/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorker.java @@ -270,7 +270,7 @@ private HttpExecutionResult executeHttpRequest(Duration timeout) { try { request = requestFactory.buildHttpRequest(queryHandle); } catch (IOException | URISyntaxException e) { - return createFailedResultBeforeRequest(config.queries().getQuerySelectorInstance().getCurrentIndex(), e); + return createFailedResultBeforeRequest(querySelector.getCurrentIndex(), e); } // execute the request From 69952c09a6b052caa5d7ced5eca23014de831aa5 Mon Sep 17 00:00:00 2001 From: Nick Molcanov <32801560+nck-mlcnv@users.noreply.github.com> Date: Tue, 1 Oct 2024 15:30:32 +0200 Subject: [PATCH 3/7] Update QueryHandler and Stresstest to give each worker a starting index when executing queries linearly --- .../iguana/cc/query/handler/QueryHandler.java | 7 ++++++- .../aksw/iguana/cc/tasks/impl/Stresstest.java | 21 +++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/aksw/iguana/cc/query/handler/QueryHandler.java b/src/main/java/org/aksw/iguana/cc/query/handler/QueryHandler.java index 6930d3f1..41d1491d 100644 --- a/src/main/java/org/aksw/iguana/cc/query/handler/QueryHandler.java +++ b/src/main/java/org/aksw/iguana/cc/query/handler/QueryHandler.java @@ -157,6 +157,7 @@ public record QueryStreamWrapper(int index, boolean cached, Supplier { return new LinearQuerySelector(queryList.size()); } + case LINEAR -> { return new LinearQuerySelector(queryList.size(), totalWorkerCount != 0 ? (queryList.size() * workerCount++) / totalWorkerCount : 0); } case RANDOM -> { return new RandomQuerySelector(queryList.size(), config.seed() + workerCount++); } } diff --git a/src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java b/src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java index 1e93882e..a2ad7a61 100644 --- a/src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java +++ b/src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java @@ -2,6 +2,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import org.aksw.iguana.cc.metrics.Metric; +import org.aksw.iguana.cc.query.handler.QueryHandler; import org.aksw.iguana.cc.storage.Storage; import org.aksw.iguana.cc.tasks.Task; import org.aksw.iguana.cc.worker.HttpWorker; @@ -82,11 +83,31 @@ public Stresstest(String suiteID, long stresstestID, Config config, ResponseBody public void run() { if (!warmupWorkers.isEmpty()) { + // initialize query handlers + // count the number of workers for each query handler + final var queryHandlers = warmupWorkers.stream().map(w -> w.config().queries()).distinct().toList(); + queryHandlers.stream().map(qh1 -> + List.of(qh1, warmupWorkers.stream() + .map(HttpWorker::config) + .map(HttpWorker.Config::queries) + .filter(qh1::equals) + .count())) + .forEach(list -> ((QueryHandler) list.get(0)).setTotalWorkerCount((int) (long) list.get(1))); SPARQLProtocolWorker.initHttpClient(warmupWorkers.size()); var warmupResults = executeWorkers(warmupWorkers); // warmup results will be dismissed SPARQLProtocolWorker.closeHttpClient(); } + // initialize query handlers + // count the number of workers for each query handler + final var queryHandlers = workers.stream().map(w -> w.config().queries()).distinct().toList(); + queryHandlers.stream().map(qh1 -> + List.of(qh1, workers.stream() + .map(HttpWorker::config) + .map(HttpWorker.Config::queries) + .filter(qh1::equals) + .count())) + .forEach(list -> ((QueryHandler) list.get(0)).setTotalWorkerCount((int) (long) list.get(1))); SPARQLProtocolWorker.initHttpClient(workers.size()); var results = executeWorkers(workers); SPARQLProtocolWorker.closeHttpClient(); From 3851716fd3e3afcbff1c8f6424e88511228c31df Mon Sep 17 00:00:00 2001 From: Nick Molcanov <32801560+nck-mlcnv@users.noreply.github.com> Date: Tue, 1 Oct 2024 15:53:52 +0200 Subject: [PATCH 4/7] Fix the initialization of the query handlers --- .../aksw/iguana/cc/tasks/impl/Stresstest.java | 39 +++++++++---------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java b/src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java index a2ad7a61..1338c92c 100644 --- a/src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java +++ b/src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java @@ -45,6 +45,15 @@ public Stresstest(String suiteID, long stresstestID, Config config, ResponseBody // initialize workers if (config.warmupWorkers() != null) { + // initialize query handlers + // count the number of workers for each query handler + final var queryHandlers = config.warmupWorkers.stream().map(HttpWorker.Config::queries).distinct().toList(); + queryHandlers.stream().map(qh1 -> + List.of(qh1, config.warmupWorkers.stream() + .map(HttpWorker.Config::queries) + .filter(qh1::equals) + .count())) + .forEach(list -> ((QueryHandler) list.get(0)).setTotalWorkerCount((int) (long) list.get(1))); long workerId = 0; for (HttpWorker.Config workerConfig : config.warmupWorkers()) { for (int i = 0; i < workerConfig.number(); i++) { @@ -55,6 +64,15 @@ public Stresstest(String suiteID, long stresstestID, Config config, ResponseBody } for (HttpWorker.Config workerConfig : config.workers()) { + // initialize query handlers + // count the number of workers for each query handler + final var queryHandlers = config.workers.stream().map(HttpWorker.Config::queries).distinct().toList(); + queryHandlers.stream().map(qh1 -> + List.of(qh1, config.workers.stream() + .filter(w -> w.queries().equals(qh1)) + .mapToInt(HttpWorker.Config::number) + .sum())) + .forEach(list -> ((QueryHandler) list.get(0)).setTotalWorkerCount((int) list.get(1))); long workerId = 0; for (int i = 0; i < workerConfig.number(); i++) { var responseBodyProcessor = (workerConfig.parseResults()) ? responseBodyProcessorInstances.getProcessor(workerConfig.acceptHeader()) : null; @@ -83,31 +101,10 @@ public Stresstest(String suiteID, long stresstestID, Config config, ResponseBody public void run() { if (!warmupWorkers.isEmpty()) { - // initialize query handlers - // count the number of workers for each query handler - final var queryHandlers = warmupWorkers.stream().map(w -> w.config().queries()).distinct().toList(); - queryHandlers.stream().map(qh1 -> - List.of(qh1, warmupWorkers.stream() - .map(HttpWorker::config) - .map(HttpWorker.Config::queries) - .filter(qh1::equals) - .count())) - .forEach(list -> ((QueryHandler) list.get(0)).setTotalWorkerCount((int) (long) list.get(1))); SPARQLProtocolWorker.initHttpClient(warmupWorkers.size()); var warmupResults = executeWorkers(warmupWorkers); // warmup results will be dismissed SPARQLProtocolWorker.closeHttpClient(); } - - // initialize query handlers - // count the number of workers for each query handler - final var queryHandlers = workers.stream().map(w -> w.config().queries()).distinct().toList(); - queryHandlers.stream().map(qh1 -> - List.of(qh1, workers.stream() - .map(HttpWorker::config) - .map(HttpWorker.Config::queries) - .filter(qh1::equals) - .count())) - .forEach(list -> ((QueryHandler) list.get(0)).setTotalWorkerCount((int) (long) list.get(1))); SPARQLProtocolWorker.initHttpClient(workers.size()); var results = executeWorkers(workers); SPARQLProtocolWorker.closeHttpClient(); From 3be969e7426ce76e0dc1f2c0279e5388929841f8 Mon Sep 17 00:00:00 2001 From: Nick Molcanov <32801560+nck-mlcnv@users.noreply.github.com> Date: Tue, 1 Oct 2024 15:53:56 +0200 Subject: [PATCH 5/7] Add simple test --- .../selector/impl/LinearQuerySelectorTest.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/test/java/org/aksw/iguana/cc/query/selector/impl/LinearQuerySelectorTest.java b/src/test/java/org/aksw/iguana/cc/query/selector/impl/LinearQuerySelectorTest.java index ca508685..c52331c9 100644 --- a/src/test/java/org/aksw/iguana/cc/query/selector/impl/LinearQuerySelectorTest.java +++ b/src/test/java/org/aksw/iguana/cc/query/selector/impl/LinearQuerySelectorTest.java @@ -26,4 +26,18 @@ public void ThrowOnLinearQuerySelectorSizeZero() { final var size = 0; assertThrows(IllegalArgumentException.class, () -> new LinearQuerySelector(size)); } + + @Test + public void testStartingIndex() { + final var size = 5; + final var startIndex = 3; + final var linearQuerySelector = new LinearQuerySelector(size, startIndex); + // -1, because the next index hasn't been requested yet + assertEquals(startIndex - 1, linearQuerySelector.getCurrentIndex()); + for (int i = 0; i < 10; i++) { + int currentIndex = linearQuerySelector.getNextIndex(); + assertEquals((i + startIndex) % size, currentIndex); + assertEquals(currentIndex, linearQuerySelector.getCurrentIndex()); + } + } } \ No newline at end of file From 5bf2e65baac2fd17c95c0f689e196e57bab215f7 Mon Sep 17 00:00:00 2001 From: Nick Molcanov <32801560+nck-mlcnv@users.noreply.github.com> Date: Tue, 1 Oct 2024 15:54:24 +0200 Subject: [PATCH 6/7] Remove unused variable --- src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java b/src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java index 1338c92c..ad36af22 100644 --- a/src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java +++ b/src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java @@ -102,7 +102,7 @@ public Stresstest(String suiteID, long stresstestID, Config config, ResponseBody public void run() { if (!warmupWorkers.isEmpty()) { SPARQLProtocolWorker.initHttpClient(warmupWorkers.size()); - var warmupResults = executeWorkers(warmupWorkers); // warmup results will be dismissed + executeWorkers(warmupWorkers); // warmup results will be dismissed SPARQLProtocolWorker.closeHttpClient(); } SPARQLProtocolWorker.initHttpClient(workers.size()); From b42622d928f9ab02589c3c661f2ce81399c517f4 Mon Sep 17 00:00:00 2001 From: Nick Molcanov <32801560+nck-mlcnv@users.noreply.github.com> Date: Fri, 4 Oct 2024 11:56:34 +0200 Subject: [PATCH 7/7] Add comment back about LinearQuerySelector indexing --- .../aksw/iguana/cc/query/selector/impl/LinearQuerySelector.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/org/aksw/iguana/cc/query/selector/impl/LinearQuerySelector.java b/src/main/java/org/aksw/iguana/cc/query/selector/impl/LinearQuerySelector.java index ac3001ed..60c0ae0e 100644 --- a/src/main/java/org/aksw/iguana/cc/query/selector/impl/LinearQuerySelector.java +++ b/src/main/java/org/aksw/iguana/cc/query/selector/impl/LinearQuerySelector.java @@ -32,6 +32,7 @@ public int getNextIndex() { /** * Return the current index. This is the index of the last returned query. + * If no query was returned yet, the method will return -1. * * @return the current index */