diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java index 166b025a..6fb5a71d 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java @@ -26,11 +26,14 @@ import org.opensearch.action.StepListener; import org.opensearch.action.admin.indices.create.CreateIndexRequest; import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; import org.opensearch.action.delete.DeleteResponse; import org.opensearch.action.get.GetRequest; import org.opensearch.action.get.GetResponse; import org.opensearch.action.get.MultiGetItemResponse; import org.opensearch.action.get.MultiGetResponse; +import org.opensearch.action.index.IndexRequest; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.support.IndicesOptions; @@ -133,6 +136,33 @@ public IndexResponse updateDatasource(final Datasource datasource) { }); } + /** + * Update datasources in an index {@code DatasourceExtension.JOB_INDEX_NAME} + * @param datasources the datasources + * @param listener action listener + */ + public void updateDatasource(final List datasources, final ActionListener listener) { + BulkRequest bulkRequest = new BulkRequest(); + datasources.stream().map(datasource -> { + datasource.setLastUpdateTime(Instant.now()); + return datasource; + }).map(this::toIndexRequest).forEach(indexRequest -> bulkRequest.add(indexRequest)); + StashedThreadContext.run(client, () -> client.bulk(bulkRequest, listener)); + } + + private IndexRequest toIndexRequest(Datasource datasource) { + try { + IndexRequest indexRequest = new IndexRequest(); + indexRequest.index(DatasourceExtension.JOB_INDEX_NAME); + indexRequest.id(datasource.getName()); + indexRequest.opType(DocWriteRequest.OpType.INDEX); + indexRequest.source(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)); + return indexRequest; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + /** * Put datasource in an index {@code DatasourceExtension.JOB_INDEX_NAME} * diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java index 5f1e5a4d..0f884c32 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java @@ -354,7 +354,15 @@ public void disable() { * @return Current index name of a datasource */ public String currentIndexName() { - return isExpired() ? null : indexNameFor(database.updatedAt.toEpochMilli()); + if (isExpired()) { + return null; + } + + if (database.updatedAt == null) { + return null; + } + + return indexNameFor(database.updatedAt.toEpochMilli()); } /** @@ -371,6 +379,14 @@ private String indexNameFor(final long suffix) { return String.format(Locale.ROOT, "%s.%s.%d", IP2GEO_DATA_INDEX_NAME_PREFIX, name, suffix); } + /** + * Reset database so that it can be updated in next run regardless there is new update or not + */ + public void resetDatabase() { + database.setUpdatedAt(null); + database.setSha256Hash(null); + } + /** * Checks if datasource is expired or not * @@ -537,7 +553,7 @@ public Database(final StreamInput in) throws IOException { public void writeTo(final StreamOutput out) throws IOException { out.writeOptionalString(provider); out.writeOptionalString(sha256Hash); - out.writeOptionalVLong(updatedAt.toEpochMilli()); + out.writeOptionalVLong(updatedAt == null ? null : updatedAt.toEpochMilli()); out.writeOptionalVLong(validForInDays); out.writeOptionalStringCollection(fields); } @@ -640,10 +656,10 @@ public UpdateStats(final StreamInput in) throws IOException { @Override public void writeTo(final StreamOutput out) throws IOException { - out.writeOptionalVLong(lastSucceededAt.toEpochMilli()); + out.writeOptionalVLong(lastSucceededAt == null ? null : lastSucceededAt.toEpochMilli()); out.writeOptionalVLong(lastProcessingTimeInMillis); - out.writeOptionalVLong(lastFailedAt.toEpochMilli()); - out.writeOptionalVLong(lastSkippedAt.toEpochMilli()); + out.writeOptionalVLong(lastFailedAt == null ? null : lastFailedAt.toEpochMilli()); + out.writeOptionalVLong(lastSkippedAt == null ? null : lastSkippedAt.toEpochMilli()); } @Override diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/listener/Ip2GeoListener.java b/src/main/java/org/opensearch/geospatial/ip2geo/listener/Ip2GeoListener.java new file mode 100644 index 00000000..e58ab4b9 --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/listener/Ip2GeoListener.java @@ -0,0 +1,112 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.ip2geo.listener; + +import java.io.IOException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.List; + +import lombok.AllArgsConstructor; +import lombok.extern.log4j.Log4j2; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterStateListener; +import org.opensearch.cluster.RestoreInProgress; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.component.AbstractLifecycleComponent; +import org.opensearch.common.inject.Inject; +import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; +import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; +import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension; +import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceTask; +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; +import org.opensearch.threadpool.ThreadPool; + +@Log4j2 +@AllArgsConstructor(onConstructor = @__(@Inject)) +public class Ip2GeoListener extends AbstractLifecycleComponent implements ClusterStateListener { + private static final int SCHEDULE_IN_MIN = 15; + private static final int DELAY_IN_MILLIS = 10000; + private final ClusterService clusterService; + private final ThreadPool threadPool; + private final DatasourceFacade datasourceFacade; + + @Override + public void clusterChanged(final ClusterChangedEvent event) { + if (event.localNodeClusterManager() == false) { + return; + } + + for (RestoreInProgress.Entry entry : event.state().custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY)) { + if (RestoreInProgress.State.SUCCESS.equals(entry.state()) == false) { + continue; + } + + if (entry.indices().stream().anyMatch(index -> DatasourceExtension.JOB_INDEX_NAME.equals(index)) == false) { + continue; + } + + threadPool.generic().submit(() -> forceUpdateGeoIpData()); + } + } + + private void forceUpdateGeoIpData() { + datasourceFacade.getAllDatasources(new ActionListener<>() { + @Override + public void onResponse(final List datasources) { + datasources.stream().forEach(Ip2GeoListener.this::scheduleForceUpdate); + datasourceFacade.updateDatasource(datasources, new ActionListener<>() { + @Override + public void onResponse(final BulkResponse bulkItemResponses) { + log.info("Datasources are updated for cleanup"); + } + + @Override + public void onFailure(final Exception e) { + log.error("Failed to update datasource for cleanup after restoring", e); + } + }); + } + + @Override + public void onFailure(final Exception e) { + log.error("Failed to get datasource after restoring", e); + } + }); + } + + /** + * Give a delay so that job scheduler can schedule the job right after the delay. Otherwise, it schedules + * the job after specified update interval. + */ + private void scheduleForceUpdate(Datasource datasource) { + IntervalSchedule schedule = new IntervalSchedule(Instant.now(), SCHEDULE_IN_MIN, ChronoUnit.MINUTES, DELAY_IN_MILLIS); + datasource.resetDatabase(); + datasource.setSystemSchedule(schedule); + datasource.setTask(DatasourceTask.ALL); + } + + @Override + protected void doStart() { + if (DiscoveryNode.isClusterManagerNode(clusterService.getSettings())) { + clusterService.addListener(this); + } + } + + @Override + protected void doStop() { + clusterService.removeListener(this); + } + + @Override + protected void doClose() throws IOException { + + } +} diff --git a/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java b/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java index f3cdc3a7..e395e843 100644 --- a/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java +++ b/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java @@ -20,6 +20,7 @@ import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.collect.MapBuilder; +import org.opensearch.common.component.LifecycleComponent; import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.IndexScopedSettings; @@ -55,6 +56,7 @@ import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings; import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceRunner; import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService; +import org.opensearch.geospatial.ip2geo.listener.Ip2GeoListener; import org.opensearch.geospatial.ip2geo.processor.Ip2GeoProcessor; import org.opensearch.geospatial.processor.FeatureProcessor; import org.opensearch.geospatial.rest.action.upload.geojson.RestUploadGeoJSONAction; @@ -107,6 +109,11 @@ public Map getProcessors(Processor.Parameters paramet .immutableMap(); } + @Override + public Collection> getGuiceServiceClasses() { + return List.of(Ip2GeoListener.class); + } + @Override public List> getExecutorBuilders(Settings settings) { List> executorBuilders = new ArrayList<>(); diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacadeTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacadeTests.java index d2ca28bb..2a29afb1 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacadeTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacadeTests.java @@ -27,6 +27,7 @@ import org.opensearch.action.StepListener; import org.opensearch.action.admin.indices.create.CreateIndexRequest; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.delete.DeleteRequest; import org.opensearch.action.delete.DeleteResponse; import org.opensearch.action.get.GetRequest; @@ -311,6 +312,27 @@ public void testGetAllDatasources_whenValidInput_thenSucceed() { assertEquals(datasources, captor.getValue()); } + public void testUpdateDatasource_whenValidInput_thenUpdate() { + List datasources = Arrays.asList(randomDatasource(), randomDatasource()); + + verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { + // Verify + assertTrue(actionRequest instanceof BulkRequest); + BulkRequest bulkRequest = (BulkRequest) actionRequest; + assertEquals(2, bulkRequest.requests().size()); + for (int i = 0; i < bulkRequest.requests().size(); i++) { + IndexRequest request = (IndexRequest) bulkRequest.requests().get(i); + assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.index()); + assertEquals(datasources.get(i).getName(), request.id()); + assertEquals(DocWriteRequest.OpType.INDEX, request.opType()); + assertTrue(request.source().utf8ToString().contains(datasources.get(i).getEndpoint())); + } + return null; + }); + + datasourceFacade.updateDatasource(datasources, mock(ActionListener.class)); + } + private SearchHits getMockedSearchHits(List datasources) { SearchHit[] searchHitArray = datasources.stream().map(this::toBytesReference).map(this::toSearchHit).toArray(SearchHit[]::new); diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java index 23981524..1f2210eb 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java @@ -78,6 +78,21 @@ public void testCurrentIndexName_whenExpired_thenReturnNull() { assertNull(datasource.currentIndexName()); } + public void testCurrentIndexName_whenDatabaseUpdateDateIsNull_thenReturnNull() { + String id = GeospatialTestHelper.randomLowerCaseString(); + Datasource datasource = new Datasource(); + datasource.setName(id); + datasource.getDatabase().setProvider("provider"); + datasource.getDatabase().setSha256Hash("sha256Hash"); + datasource.getDatabase().setUpdatedAt(null); + datasource.getDatabase().setValidForInDays(1l); + datasource.getUpdateStats().setLastSucceededAt(Instant.now()); + datasource.getDatabase().setFields(new ArrayList<>()); + + assertFalse(datasource.isExpired()); + assertNull(datasource.currentIndexName()); + } + public void testGetIndexNameFor() { long updatedAt = randomPositiveLong(); DatasourceManifest manifest = mock(DatasourceManifest.class); @@ -92,6 +107,19 @@ public void testGetIndexNameFor() { ); } + public void testResetDatabase_whenCalled_thenNullifySomeFields() { + Datasource datasource = randomDatasource(); + assertNotNull(datasource.getDatabase().getSha256Hash()); + assertNotNull(datasource.getDatabase().getUpdatedAt()); + + // Run + datasource.resetDatabase(); + + // Verify + assertNull(datasource.getDatabase().getSha256Hash()); + assertNull(datasource.getDatabase().getUpdatedAt()); + } + public void testIsExpired_whenCalled_thenExpectedValue() { Datasource datasource = new Datasource(); // never expire if validForInDays is null diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java index d8fbc3e0..7ebab275 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java @@ -66,6 +66,35 @@ public void testUpdateOrCreateGeoIpData_whenHashValueIsSame_thenSkipUpdate() { verify(datasourceFacade).updateDatasource(datasource); } + @SneakyThrows + public void testUpdateOrCreateGeoIpData_whenExpired_thenUpdate() { + File manifestFile = new File(this.getClass().getClassLoader().getResource("ip2geo/manifest.json").getFile()); + DatasourceManifest manifest = DatasourceManifest.Builder.build(manifestFile.toURI().toURL()); + + File sampleFile = new File(this.getClass().getClassLoader().getResource("ip2geo/sample_valid.csv").getFile()); + when(geoIpDataFacade.getDatabaseReader(any())).thenReturn(CSVParser.parse(sampleFile, StandardCharsets.UTF_8, CSVFormat.RFC4180)); + + Datasource datasource = new Datasource(); + datasource.setState(DatasourceState.AVAILABLE); + datasource.getDatabase().setUpdatedAt(Instant.ofEpochMilli(manifest.getUpdatedAt())); + datasource.getDatabase().setSha256Hash(manifest.getSha256Hash()); + datasource.getDatabase().setValidForInDays(1l); + datasource.setEndpoint(manifestFile.toURI().toURL().toExternalForm()); + datasource.resetDatabase(); + + // Run + datasourceUpdateService.updateOrCreateGeoIpData(datasource, mock(Runnable.class)); + + // Verify + verify(geoIpDataFacade).putGeoIpData( + eq(datasource.currentIndexName()), + isA(String[].class), + any(Iterator.class), + anyInt(), + any(Runnable.class) + ); + } + @SneakyThrows public void testUpdateOrCreateGeoIpData_whenInvalidData_thenThrowException() { File manifestFile = new File(this.getClass().getClassLoader().getResource("ip2geo/manifest.json").getFile()); diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/listener/Ip2GeoListenerTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/listener/Ip2GeoListenerTests.java new file mode 100644 index 00000000..ff2cd3e3 --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/listener/Ip2GeoListenerTests.java @@ -0,0 +1,173 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.ip2geo.listener; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.time.Instant; +import java.util.Arrays; +import java.util.List; + +import org.junit.Before; +import org.mockito.ArgumentCaptor; +import org.opensearch.action.ActionListener; +import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.RestoreInProgress; +import org.opensearch.common.settings.Settings; +import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; +import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; +import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension; +import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceTask; +import org.opensearch.snapshots.Snapshot; +import org.opensearch.snapshots.SnapshotId; + +public class Ip2GeoListenerTests extends Ip2GeoTestCase { + private Ip2GeoListener ip2GeoListener; + + @Before + public void init() { + ip2GeoListener = new Ip2GeoListener(clusterService, threadPool, datasourceFacade); + } + + public void testDoStart_whenClusterManagerNode_thenAddListener() { + Settings settings = Settings.builder().put("node.roles", "cluster_manager").build(); + when(clusterService.getSettings()).thenReturn(settings); + + // Run + ip2GeoListener.doStart(); + + // Verify + verify(clusterService).addListener(ip2GeoListener); + } + + public void testDoStart_whenNotClusterManagerNode_thenDoNotAddListener() { + Settings settings = Settings.builder().put("node.roles", "data").build(); + when(clusterService.getSettings()).thenReturn(settings); + + // Run + ip2GeoListener.doStart(); + + // Verify + verify(clusterService, never()).addListener(ip2GeoListener); + } + + public void testDoStop_whenCalled_thenRemoveListener() { + // Run + ip2GeoListener.doStop(); + + // Verify + verify(clusterService).removeListener(ip2GeoListener); + } + + public void testClusterChanged_whenNotClusterManagerNode_thenDoNothing() { + ClusterChangedEvent event = mock(ClusterChangedEvent.class); + when(event.localNodeClusterManager()).thenReturn(false); + + // Run + ip2GeoListener.clusterChanged(event); + + // Verify + verify(threadPool, never()).generic(); + } + + public void testClusterChanged_whenNotComplete_thenDoNothing() { + SnapshotId snapshotId = new SnapshotId(GeospatialTestHelper.randomLowerCaseString(), GeospatialTestHelper.randomLowerCaseString()); + Snapshot snapshot = new Snapshot(GeospatialTestHelper.randomLowerCaseString(), snapshotId); + RestoreInProgress.Entry entry = new RestoreInProgress.Entry( + GeospatialTestHelper.randomLowerCaseString(), + snapshot, + RestoreInProgress.State.STARTED, + Arrays.asList(DatasourceExtension.JOB_INDEX_NAME), + null + ); + RestoreInProgress restoreInProgress = new RestoreInProgress.Builder().add(entry).build(); + ClusterState clusterState = mock(ClusterState.class); + when(clusterState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY)).thenReturn(restoreInProgress); + ClusterChangedEvent event = mock(ClusterChangedEvent.class); + when(event.localNodeClusterManager()).thenReturn(true); + when(event.state()).thenReturn(clusterState); + + // Run + ip2GeoListener.clusterChanged(event); + + // Verify + verify(threadPool, never()).generic(); + } + + public void testClusterChanged_whenNotDatasourceIndex_thenDoNothing() { + SnapshotId snapshotId = new SnapshotId(GeospatialTestHelper.randomLowerCaseString(), GeospatialTestHelper.randomLowerCaseString()); + Snapshot snapshot = new Snapshot(GeospatialTestHelper.randomLowerCaseString(), snapshotId); + RestoreInProgress.Entry entry = new RestoreInProgress.Entry( + GeospatialTestHelper.randomLowerCaseString(), + snapshot, + RestoreInProgress.State.FAILURE, + Arrays.asList(GeospatialTestHelper.randomLowerCaseString()), + null + ); + RestoreInProgress restoreInProgress = new RestoreInProgress.Builder().add(entry).build(); + ClusterState clusterState = mock(ClusterState.class); + when(clusterState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY)).thenReturn(restoreInProgress); + ClusterChangedEvent event = mock(ClusterChangedEvent.class); + when(event.localNodeClusterManager()).thenReturn(true); + when(event.state()).thenReturn(clusterState); + + // Run + ip2GeoListener.clusterChanged(event); + + // Verify + verify(threadPool, never()).generic(); + } + + public void testClusterChanged_whenDatasourceIndexIsRestored_thenUpdate() { + SnapshotId snapshotId = new SnapshotId(GeospatialTestHelper.randomLowerCaseString(), GeospatialTestHelper.randomLowerCaseString()); + Snapshot snapshot = new Snapshot(GeospatialTestHelper.randomLowerCaseString(), snapshotId); + RestoreInProgress.Entry entry = new RestoreInProgress.Entry( + GeospatialTestHelper.randomLowerCaseString(), + snapshot, + RestoreInProgress.State.SUCCESS, + Arrays.asList(DatasourceExtension.JOB_INDEX_NAME), + null + ); + RestoreInProgress restoreInProgress = new RestoreInProgress.Builder().add(entry).build(); + ClusterState clusterState = mock(ClusterState.class); + when(clusterState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY)).thenReturn(restoreInProgress); + ClusterChangedEvent event = mock(ClusterChangedEvent.class); + when(event.localNodeClusterManager()).thenReturn(true); + when(event.state()).thenReturn(clusterState); + + // Run + ip2GeoListener.clusterChanged(event); + + // Verify + verify(threadPool).generic(); + ArgumentCaptor>> captor = ArgumentCaptor.forClass(ActionListener.class); + verify(datasourceFacade).getAllDatasources(captor.capture()); + + // Run + List datasources = Arrays.asList(randomDatasource(), randomDatasource()); + datasources.stream().forEach(datasource -> { datasource.setTask(DatasourceTask.DELETE_UNUSED_INDICES); }); + + captor.getValue().onResponse(datasources); + + // Verify + datasources.stream().forEach(datasource -> { + assertEquals(DatasourceTask.ALL, datasource.getTask()); + assertNull(datasource.getDatabase().getUpdatedAt()); + assertNull(datasource.getDatabase().getSha256Hash()); + assertTrue(datasource.getSystemSchedule().getNextExecutionTime(Instant.now()).isAfter(Instant.now())); + assertTrue(datasource.getSystemSchedule().getNextExecutionTime(Instant.now()).isBefore(Instant.now().plusSeconds(60))); + }); + verify(datasourceFacade).updateDatasource(eq(datasources), any()); + } + +} diff --git a/src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java b/src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java index 596bd3f7..430268fb 100644 --- a/src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java +++ b/src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java @@ -25,6 +25,7 @@ import org.opensearch.client.Client; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.component.LifecycleComponent; import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -42,6 +43,7 @@ import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService; import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings; import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService; +import org.opensearch.geospatial.ip2geo.listener.Ip2GeoListener; import org.opensearch.geospatial.processor.FeatureProcessor; import org.opensearch.geospatial.rest.action.upload.geojson.RestUploadGeoJSONAction; import org.opensearch.geospatial.stats.upload.RestUploadStatsAction; @@ -159,6 +161,12 @@ public void testCreateComponents() { assertEquals(SUPPORTED_COMPONENTS, registeredComponents); } + public void testGetGuiceServiceClasses() { + GeospatialPlugin plugin = new GeospatialPlugin(); + Collection> classes = List.of(Ip2GeoListener.class); + assertEquals(classes, plugin.getGuiceServiceClasses()); + } + public void testIsAnIngestPlugin() { GeospatialPlugin plugin = new GeospatialPlugin(); assertTrue(plugin instanceof IngestPlugin);