Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add restoring event listener #328

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
import org.opensearch.action.StepListener;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.get.MultiGetItemResponse;
import org.opensearch.action.get.MultiGetResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.IndicesOptions;
Expand Down Expand Up @@ -133,6 +136,33 @@ public IndexResponse updateDatasource(final Datasource datasource) {
});
}

/**
 * Update datasources in an index {@code DatasourceExtension.JOB_INDEX_NAME}
 *
 * <p>Every datasource gets its last-update time stamped with the same current
 * instant, and all writes are sent to the cluster in a single bulk request.
 *
 * @param datasources the datasources to persist
 * @param listener action listener notified with the bulk response or failure
 */
public void updateDatasource(final List<Datasource> datasources, final ActionListener<BulkResponse> listener) {
    final BulkRequest bulkRequest = new BulkRequest();
    // Use one timestamp for the whole batch so all documents in the bulk
    // request carry a consistent lastUpdateTime. A plain loop is used instead
    // of a stream: mutating elements inside an intermediate map() operation is
    // a side effect that stream pipelines are supposed to avoid.
    final Instant now = Instant.now();
    for (Datasource datasource : datasources) {
        datasource.setLastUpdateTime(now);
        bulkRequest.add(toIndexRequest(datasource));
    }
    StashedThreadContext.run(client, () -> client.bulk(bulkRequest, listener));
}

/**
 * Convert a datasource into an index request targeting
 * {@code DatasourceExtension.JOB_INDEX_NAME}, keyed by the datasource name.
 *
 * @param datasource the datasource to serialize
 * @return an index request containing the datasource as JSON
 * @throws RuntimeException if XContent serialization fails (wraps the IOException)
 */
private IndexRequest toIndexRequest(Datasource datasource) {
    try {
        IndexRequest indexRequest = new IndexRequest();
        indexRequest.index(DatasourceExtension.JOB_INDEX_NAME);
        indexRequest.id(datasource.getName());
        indexRequest.opType(DocWriteRequest.OpType.INDEX);
        indexRequest.source(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS));
        return indexRequest;
    } catch (IOException e) {
        // Include which datasource failed so the wrapped exception is actionable;
        // the original cause is preserved for the full stack trace.
        throw new RuntimeException("Failed to serialize datasource [" + datasource.getName() + "] to an index request", e);
    }
}

/**
* Put datasource in an index {@code DatasourceExtension.JOB_INDEX_NAME}
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,15 @@ public void disable() {
* @return Current index name of a datasource
*/
public String currentIndexName() {
    // There is no usable index when the datasource has expired, or when the
    // database has never been updated (e.g. right after a reset) and thus has
    // no updatedAt timestamp to derive an index name from. isExpired() is
    // checked first, matching the original short-circuit order.
    if (isExpired() || database.updatedAt == null) {
        return null;
    }
    return indexNameFor(database.updatedAt.toEpochMilli());
}

/**
Expand All @@ -371,6 +379,14 @@ private String indexNameFor(final long suffix) {
return String.format(Locale.ROOT, "%s.%s.%d", IP2GEO_DATA_INDEX_NAME_PREFIX, name, suffix);
}

/**
 * Reset the database metadata so that it will be refreshed in the next run
 * regardless of whether the upstream source has a new update or not.
 *
 * <p>Clearing the update timestamp and the SHA-256 hash makes the next update
 * cycle treat the local data as stale and re-download it.
 */
public void resetDatabase() {
    database.setUpdatedAt(null);
    database.setSha256Hash(null);
}

/**
* Checks if datasource is expired or not
*
Expand Down Expand Up @@ -537,7 +553,7 @@ public Database(final StreamInput in) throws IOException {
public void writeTo(final StreamOutput out) throws IOException {
out.writeOptionalString(provider);
out.writeOptionalString(sha256Hash);
out.writeOptionalVLong(updatedAt.toEpochMilli());
out.writeOptionalVLong(updatedAt == null ? null : updatedAt.toEpochMilli());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't the check for null already encapsulated into method if it does "optional"?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I call the .toEpochMilli() method, and it will throw an NPE if I don't do a null check.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh I see, that makes sense

out.writeOptionalVLong(validForInDays);
out.writeOptionalStringCollection(fields);
}
Expand Down Expand Up @@ -640,10 +656,10 @@ public UpdateStats(final StreamInput in) throws IOException {

@Override
public void writeTo(final StreamOutput out) throws IOException {
out.writeOptionalVLong(lastSucceededAt.toEpochMilli());
out.writeOptionalVLong(lastSucceededAt == null ? null : lastSucceededAt.toEpochMilli());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as for previous call to write optional, I think null value should be already handled

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the previous one. I call the .toEpochMilli() method.

out.writeOptionalVLong(lastProcessingTimeInMillis);
out.writeOptionalVLong(lastFailedAt.toEpochMilli());
out.writeOptionalVLong(lastSkippedAt.toEpochMilli());
out.writeOptionalVLong(lastFailedAt == null ? null : lastFailedAt.toEpochMilli());
out.writeOptionalVLong(lastSkippedAt == null ? null : lastSkippedAt.toEpochMilli());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo.listener;

import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;

import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;

import org.opensearch.action.ActionListener;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.cluster.RestoreInProgress;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.component.AbstractLifecycleComponent;
import org.opensearch.common.inject.Inject;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceTask;
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
import org.opensearch.threadpool.ThreadPool;

@Log4j2
@AllArgsConstructor(onConstructor = @__(@Inject))
public class Ip2GeoListener extends AbstractLifecycleComponent implements ClusterStateListener {
private static final int SCHEDULE_IN_MIN = 15;
private static final int DELAY_IN_MILLIS = 10000;
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final DatasourceFacade datasourceFacade;

@Override
public void clusterChanged(final ClusterChangedEvent event) {
    // Only the elected cluster manager node reacts to restore completion.
    if (event.localNodeClusterManager() == false) {
        return;
    }

    RestoreInProgress restores = event.state().custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY);
    for (RestoreInProgress.Entry entry : restores) {
        boolean restoreSucceeded = RestoreInProgress.State.SUCCESS.equals(entry.state());
        boolean jobIndexRestored = entry.indices().stream().anyMatch(DatasourceExtension.JOB_INDEX_NAME::equals);
        // Kick off a forced GeoIP refresh once the job index has been
        // successfully restored from a snapshot.
        if (restoreSucceeded && jobIndexRestored) {
            threadPool.generic().submit(this::forceUpdateGeoIpData);
        }
    }
}

/**
 * Fetch all datasources, mark each one for a forced update, and persist the
 * modified datasources back to the job index in one bulk write.
 */
private void forceUpdateGeoIpData() {
    datasourceFacade.getAllDatasources(new ActionListener<>() {
        @Override
        public void onResponse(final List<Datasource> datasources) {
            for (Datasource datasource : datasources) {
                scheduleForceUpdate(datasource);
            }
            datasourceFacade.updateDatasource(datasources, new ActionListener<>() {
                @Override
                public void onResponse(final BulkResponse bulkItemResponses) {
                    log.info("Datasources are updated for cleanup");
                }

                @Override
                public void onFailure(final Exception e) {
                    log.error("Failed to update datasource for cleanup after restoring", e);
                }
            });
        }

        @Override
        public void onFailure(final Exception e) {
            log.error("Failed to get datasource after restoring", e);
        }
    });
}

/**
 * Give a delay so that job scheduler can schedule the job right after the delay. Otherwise, it schedules
 * the job after specified update interval.
 */
private void scheduleForceUpdate(Datasource datasource) {
    // Wipe the database metadata and request a full task so the next run
    // re-downloads the data, then install a schedule whose initial delay
    // triggers that run almost immediately.
    datasource.resetDatabase();
    datasource.setTask(DatasourceTask.ALL);
    datasource.setSystemSchedule(new IntervalSchedule(Instant.now(), SCHEDULE_IN_MIN, ChronoUnit.MINUTES, DELAY_IN_MILLIS));
}

@Override
protected void doStart() {
    // Only cluster-manager-eligible nodes need to observe cluster state.
    if (DiscoveryNode.isClusterManagerNode(clusterService.getSettings()) == false) {
        return;
    }
    clusterService.addListener(this);
}

@Override
protected void doStop() {
    // Safe to call even if the listener was never registered in doStart().
    clusterService.removeListener(this);
}

@Override
protected void doClose() throws IOException {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we do same as on stop here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think doStop is called always before doClose(). Removing listener in doClose() will have no effect.
Saw the same pattern in OpenSearch core.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.common.component.LifecycleComponent;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
Expand Down Expand Up @@ -55,6 +56,7 @@
import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceRunner;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService;
import org.opensearch.geospatial.ip2geo.listener.Ip2GeoListener;
import org.opensearch.geospatial.ip2geo.processor.Ip2GeoProcessor;
import org.opensearch.geospatial.processor.FeatureProcessor;
import org.opensearch.geospatial.rest.action.upload.geojson.RestUploadGeoJSONAction;
Expand Down Expand Up @@ -107,6 +109,11 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
.immutableMap();
}

/**
 * Lifecycle components for Guice to instantiate; registers {@code Ip2GeoListener}
 * so it is started with the node and can observe cluster state changes.
 */
@Override
public Collection<Class<? extends LifecycleComponent>> getGuiceServiceClasses() {
    return List.of(Ip2GeoListener.class);
}

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
List<ExecutorBuilder<?>> executorBuilders = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.action.StepListener;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
Expand Down Expand Up @@ -311,6 +312,27 @@ public void testGetAllDatasources_whenValidInput_thenSucceed() {
assertEquals(datasources, captor.getValue());
}

// Verifies that the bulk update variant of updateDatasource builds one INDEX
// request per datasource, targeting the job index with the datasource name as
// the document id and the serialized datasource as the source.
public void testUpdateDatasource_whenValidInput_thenUpdate() {
    List<Datasource> datasources = Arrays.asList(randomDatasource(), randomDatasource());

    verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> {
        // Verify
        assertTrue(actionRequest instanceof BulkRequest);
        BulkRequest bulkRequest = (BulkRequest) actionRequest;
        assertEquals(2, bulkRequest.requests().size());
        for (int i = 0; i < bulkRequest.requests().size(); i++) {
            IndexRequest request = (IndexRequest) bulkRequest.requests().get(i);
            assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.index());
            assertEquals(datasources.get(i).getName(), request.id());
            assertEquals(DocWriteRequest.OpType.INDEX, request.opType());
            // Endpoint is a proxy check that the datasource body was serialized.
            assertTrue(request.source().utf8ToString().contains(datasources.get(i).getEndpoint()));
        }
        return null;
    });

    datasourceFacade.updateDatasource(datasources, mock(ActionListener.class));
}

private SearchHits getMockedSearchHits(List<Datasource> datasources) {
SearchHit[] searchHitArray = datasources.stream().map(this::toBytesReference).map(this::toSearchHit).toArray(SearchHit[]::new);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,21 @@ public void testCurrentIndexName_whenExpired_thenReturnNull() {
assertNull(datasource.currentIndexName());
}

// A non-expired datasource whose database has no update timestamp must not
// resolve to any index name.
public void testCurrentIndexName_whenDatabaseUpdateDateIsNull_thenReturnNull() {
    Datasource datasource = new Datasource();
    datasource.setName(GeospatialTestHelper.randomLowerCaseString());
    datasource.getDatabase().setProvider("provider");
    datasource.getDatabase().setSha256Hash("sha256Hash");
    datasource.getDatabase().setValidForInDays(1L);
    datasource.getDatabase().setUpdatedAt(null);
    datasource.getDatabase().setFields(new ArrayList<>());
    datasource.getUpdateStats().setLastSucceededAt(Instant.now());

    assertFalse(datasource.isExpired());
    assertNull(datasource.currentIndexName());
}

public void testGetIndexNameFor() {
long updatedAt = randomPositiveLong();
DatasourceManifest manifest = mock(DatasourceManifest.class);
Expand All @@ -92,6 +107,19 @@ public void testGetIndexNameFor() {
);
}

// resetDatabase() must clear both the hash and the update timestamp so the
// next scheduled run treats the local data as stale.
public void testResetDatabase_whenCalled_thenNullifySomeFields() {
    Datasource datasource = randomDatasource();
    // Precondition: the random datasource starts with both fields populated.
    assertNotNull(datasource.getDatabase().getSha256Hash());
    assertNotNull(datasource.getDatabase().getUpdatedAt());

    // Run
    datasource.resetDatabase();

    // Verify both fields were nulled out.
    assertNull(datasource.getDatabase().getSha256Hash());
    assertNull(datasource.getDatabase().getUpdatedAt());
}

public void testIsExpired_whenCalled_thenExpectedValue() {
Datasource datasource = new Datasource();
// never expire if validForInDays is null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,35 @@ public void testUpdateOrCreateGeoIpData_whenHashValueIsSame_thenSkipUpdate() {
verify(datasourceFacade).updateDatasource(datasource);
}

@SneakyThrows
// Even when the datasource previously matched the manifest (same updatedAt and
// sha256), resetDatabase() must force a re-download: the update path should
// end with GeoIP data being written to the current index.
public void testUpdateOrCreateGeoIpData_whenExpired_thenUpdate() {
    File manifestFile = new File(this.getClass().getClassLoader().getResource("ip2geo/manifest.json").getFile());
    DatasourceManifest manifest = DatasourceManifest.Builder.build(manifestFile.toURI().toURL());

    File sampleFile = new File(this.getClass().getClassLoader().getResource("ip2geo/sample_valid.csv").getFile());
    when(geoIpDataFacade.getDatabaseReader(any())).thenReturn(CSVParser.parse(sampleFile, StandardCharsets.UTF_8, CSVFormat.RFC4180));

    Datasource datasource = new Datasource();
    datasource.setState(DatasourceState.AVAILABLE);
    // Start in the "already up to date" state, then reset so the update is forced.
    datasource.getDatabase().setUpdatedAt(Instant.ofEpochMilli(manifest.getUpdatedAt()));
    datasource.getDatabase().setSha256Hash(manifest.getSha256Hash());
    datasource.getDatabase().setValidForInDays(1l);
    datasource.setEndpoint(manifestFile.toURI().toURL().toExternalForm());
    datasource.resetDatabase();

    // Run
    datasourceUpdateService.updateOrCreateGeoIpData(datasource, mock(Runnable.class));

    // Verify
    verify(geoIpDataFacade).putGeoIpData(
        eq(datasource.currentIndexName()),
        isA(String[].class),
        any(Iterator.class),
        anyInt(),
        any(Runnable.class)
    );
}

@SneakyThrows
public void testUpdateOrCreateGeoIpData_whenInvalidData_thenThrowException() {
File manifestFile = new File(this.getClass().getClassLoader().getResource("ip2geo/manifest.json").getFile());
Expand Down
Loading