Skip to content

Commit

Permalink
Remove jitter and move index setting from DatasourceFacade to Datasou…
Browse files Browse the repository at this point in the history
…rceExtension (#319)

Signed-off-by: Heemin Kim <[email protected]>
  • Loading branch information
heemin32 authored May 24, 2023
1 parent 534f7be commit 4b8fd80
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

Expand All @@ -40,7 +38,6 @@
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentFactory;
Expand All @@ -62,9 +59,6 @@
@Log4j2
public class DatasourceFacade {
private static final Integer MAX_SIZE = 1000;
private static final Tuple<String, Integer> INDEX_SETTING_NUM_OF_SHARDS = new Tuple<>("index.number_of_shards", 1);
private static final Tuple<String, String> INDEX_SETTING_AUTO_EXPAND_REPLICAS = new Tuple<>("index.auto_expand_replicas", "0-all");
private static final Tuple<String, Boolean> INDEX_SETTING_HIDDEN = new Tuple<>("index.hidden", true);
private final Client client;
private final ClusterService clusterService;
private final ClusterSettings clusterSettings;
Expand All @@ -76,22 +70,17 @@ public DatasourceFacade(final Client client, final ClusterService clusterService
}

/**
* Create a datasource index of single shard with auto expand replicas to all nodes
* Create datasource index
*
* We want the index to expand to all replica so that datasource query request can be executed locally
* for faster ingestion time.
* @param stepListener setp listener
*/
public void createIndexIfNotExists(final StepListener<Void> stepListener) {
if (clusterService.state().metadata().hasIndex(DatasourceExtension.JOB_INDEX_NAME) == true) {
stepListener.onResponse(null);
return;
}
final Map<String, Object> indexSettings = new HashMap<>();
indexSettings.put(INDEX_SETTING_NUM_OF_SHARDS.v1(), INDEX_SETTING_NUM_OF_SHARDS.v2());
indexSettings.put(INDEX_SETTING_AUTO_EXPAND_REPLICAS.v1(), INDEX_SETTING_AUTO_EXPAND_REPLICAS.v2());
indexSettings.put(INDEX_SETTING_HIDDEN.v1(), INDEX_SETTING_HIDDEN.v2());
final CreateIndexRequest createIndexRequest = new CreateIndexRequest(DatasourceExtension.JOB_INDEX_NAME).mapping(getIndexMapping())
.settings(indexSettings);
.settings(DatasourceExtension.INDEX_SETTING);
StashedThreadContext.run(client, () -> client.admin().indices().create(createIndexRequest, new ActionListener<>() {
@Override
public void onResponse(final CreateIndexResponse createIndexResponse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ public class Datasource implements Writeable, ScheduledJobParameter {
* Prefix of indices having Ip2Geo data
*/
public static final String IP2GEO_DATA_INDEX_NAME_PREFIX = ".ip2geo-data";
private static final long MAX_JITTER_IN_MINUTES = 5;
private static final long ONE_DAY_IN_HOURS = 24;
private static final long ONE_HOUR_IN_MINUTES = 60;

/**
* Default fields for job scheduling
Expand Down Expand Up @@ -285,21 +282,6 @@ public Long getLockDurationSeconds() {
return Ip2GeoLockService.LOCK_DURATION_IN_SECONDS;
}

/**
* Jitter in scheduling a task
*
* We want a job to be delayed randomly with range of (0, 5) minutes for the
* next execution time.
*
* @see ScheduledJobParameter#getJitter()
*
* @return the jitter
*/
@Override
public Double getJitter() {
return MAX_JITTER_IN_MINUTES / ((double) schedule.getInterval() * ONE_DAY_IN_HOURS * ONE_HOUR_IN_MINUTES);
}

/**
* Enable auto update of GeoIP data
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.geospatial.ip2geo.jobscheduler;

import java.util.Map;

import org.opensearch.jobscheduler.spi.JobSchedulerExtension;
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
Expand All @@ -21,6 +23,20 @@ public class DatasourceExtension implements JobSchedulerExtension {
* Job index name for a datasource
*/
public static final String JOB_INDEX_NAME = ".scheduler_geospatial_ip2geo_datasource";
/**
* Job index setting
*
* We want it to be single shard so that job can be run only in a single node by job scheduler.
* We want it to expand to all replicas so that querying to this index can be done locally to reduce latency.
*/
public static final Map<String, Object> INDEX_SETTING = Map.of(
"index.number_of_shards",
1,
"index.auto_expand_replicas",
"0-all",
"index.hidden",
true
);

@Override
public String getJobType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import java.util.Arrays;
import java.util.Locale;

import org.opensearch.common.Randomness;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.geospatial.GeospatialTestHelper;
import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase;
Expand Down Expand Up @@ -77,14 +76,6 @@ public void testGetIndexNameFor() {
);
}

public void testGetJitter() {
Datasource datasource = new Datasource();
datasource.setSchedule(new IntervalSchedule(Instant.now(), Randomness.get().ints(1, 31).findFirst().getAsInt(), ChronoUnit.DAYS));
long intervalInMinutes = datasource.getSchedule().getInterval() * 60l * 24l;
double sixMinutes = 6;
assertTrue(datasource.getJitter() * intervalInMinutes <= sixMinutes);
}

public void testIsExpired() {
Datasource datasource = new Datasource();
// never expire if validForInDays is null
Expand Down

0 comments on commit 4b8fd80

Please sign in to comment.