Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make linearly executing workers start at different positions #286

Merged
merged 7 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ public record QueryStreamWrapper(int index, boolean cached, Supplier<InputStream
final protected QueryList queryList;

private int workerCount = 0; // give every worker inside the same worker config an offset seed
private int totalWorkerCount = 0;

final protected int hashCode;

Expand Down Expand Up @@ -186,6 +187,10 @@ public QueryHandler(Config config) throws IOException {
this.hashCode = queryList.hashCode();
}

public void setTotalWorkerCount(int workers) {
this.totalWorkerCount = workers;
}

private QueryList initializeTemplateQueryHandler(QuerySource templateSource) throws IOException {
QuerySource querySource = templateSource;
final var originalPath = templateSource.getPath();
Expand Down Expand Up @@ -238,7 +243,7 @@ private QuerySource createQuerySource(Path path) throws IOException {

public QuerySelector getQuerySelectorInstance() {
switch (config.order()) {
case LINEAR -> { return new LinearQuerySelector(queryList.size()); }
case LINEAR -> { return new LinearQuerySelector(queryList.size(), totalWorkerCount != 0 ? (queryList.size() * workerCount++) / totalWorkerCount : 0); }
nck-mlcnv marked this conversation as resolved.
Show resolved Hide resolved
case RANDOM -> { return new RandomQuerySelector(queryList.size(), config.seed() + workerCount++); }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,30 @@ public class LinearQuerySelector extends QuerySelector {

public LinearQuerySelector(int size) {
super(size);
index = -1;
index = 0;
}

public LinearQuerySelector(int size, int startIndex) {
super(size);
index = startIndex;
}

@Override
public int getNextIndex() {
index++;
if (index >= this.size) {
index = 0;
}
return index;
return index++;
}

/**
* Return the current index. This is the index of the last returned query. If no query was returned yet, it returns
* -1.
* @return
* Return the current index. This is the index of the last returned query.
* If no query was returned yet, the method will return -1.
*
* @return the current index
*/
@Override
public int getCurrentIndex() {
return index;
return index - 1;
}
}
22 changes: 20 additions & 2 deletions src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import org.aksw.iguana.cc.metrics.Metric;
import org.aksw.iguana.cc.query.handler.QueryHandler;
import org.aksw.iguana.cc.storage.Storage;
import org.aksw.iguana.cc.tasks.Task;
import org.aksw.iguana.cc.worker.HttpWorker;
Expand Down Expand Up @@ -44,6 +45,15 @@ public Stresstest(String suiteID, long stresstestID, Config config, ResponseBody

// initialize workers
if (config.warmupWorkers() != null) {
// initialize query handlers
// count the number of workers for each query handler
final var queryHandlers = config.warmupWorkers.stream().map(HttpWorker.Config::queries).distinct().toList();
queryHandlers.stream().map(qh1 ->
List.of(qh1, config.warmupWorkers.stream()
.map(HttpWorker.Config::queries)
.filter(qh1::equals)
.count()))
.forEach(list -> ((QueryHandler) list.get(0)).setTotalWorkerCount((int) (long) list.get(1)));
long workerId = 0;
for (HttpWorker.Config workerConfig : config.warmupWorkers()) {
for (int i = 0; i < workerConfig.number(); i++) {
Expand All @@ -54,6 +64,15 @@ public Stresstest(String suiteID, long stresstestID, Config config, ResponseBody
}

for (HttpWorker.Config workerConfig : config.workers()) {
// initialize query handlers
// count the number of workers for each query handler
final var queryHandlers = config.workers.stream().map(HttpWorker.Config::queries).distinct().toList();
queryHandlers.stream().map(qh1 ->
List.of(qh1, config.workers.stream()
.filter(w -> w.queries().equals(qh1))
.mapToInt(HttpWorker.Config::number)
.sum()))
.forEach(list -> ((QueryHandler) list.get(0)).setTotalWorkerCount((int) list.get(1)));
long workerId = 0;
for (int i = 0; i < workerConfig.number(); i++) {
var responseBodyProcessor = (workerConfig.parseResults()) ? responseBodyProcessorInstances.getProcessor(workerConfig.acceptHeader()) : null;
Expand Down Expand Up @@ -83,10 +102,9 @@ public Stresstest(String suiteID, long stresstestID, Config config, ResponseBody
public void run() {
if (!warmupWorkers.isEmpty()) {
SPARQLProtocolWorker.initHttpClient(warmupWorkers.size());
var warmupResults = executeWorkers(warmupWorkers); // warmup results will be dismissed
executeWorkers(warmupWorkers); // warmup results will be dismissed
SPARQLProtocolWorker.closeHttpClient();
}

SPARQLProtocolWorker.initHttpClient(workers.size());
var results = executeWorkers(workers);
SPARQLProtocolWorker.closeHttpClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ private HttpExecutionResult executeHttpRequest(Duration timeout) {
try {
request = requestFactory.buildHttpRequest(queryHandle);
} catch (IOException | URISyntaxException e) {
return createFailedResultBeforeRequest(config.queries().getQuerySelectorInstance().getCurrentIndex(), e);
return createFailedResultBeforeRequest(querySelector.getCurrentIndex(), e);
}

// execute the request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,18 @@ public void ThrowOnLinearQuerySelectorSizeZero() {
final var size = 0;
assertThrows(IllegalArgumentException.class, () -> new LinearQuerySelector(size));
}

@Test
public void testStartingIndex() {
final var size = 5;
final var startIndex = 3;
final var linearQuerySelector = new LinearQuerySelector(size, startIndex);
// -1, because the next index hasn't been requested yet
assertEquals(startIndex - 1, linearQuerySelector.getCurrentIndex());
for (int i = 0; i < 10; i++) {
int currentIndex = linearQuerySelector.getNextIndex();
assertEquals((i + startIndex) % size, currentIndex);
assertEquals(currentIndex, linearQuerySelector.getCurrentIndex());
}
}
}
Loading