From 860c90292ab26b2eb387497bd291dd31bad8bb0a Mon Sep 17 00:00:00 2001 From: Heemin Kim Date: Wed, 7 Jun 2023 19:13:03 -0700 Subject: [PATCH] Add a field in datasource for current index name (#333) Signed-off-by: Heemin Kim --- .../ip2geo/action/GetDatasourceResponse.java | 2 +- .../ip2geo/action/PutDatasourceRequest.java | 6 +- .../action/UpdateDatasourceRequest.java | 4 +- .../ip2geo/jobscheduler/Datasource.java | 46 ++++++++------ .../jobscheduler/DatasourceUpdateService.java | 19 +++--- .../geospatial/ip2geo/Ip2GeoTestCase.java | 1 + .../ip2geo/jobscheduler/DatasourceTests.java | 61 ++++++++----------- .../DatasourceUpdateServiceTests.java | 1 + 8 files changed, 71 insertions(+), 69 deletions(-) diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceResponse.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceResponse.java index d0dc2dcc..c2e3cb0c 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceResponse.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceResponse.java @@ -26,7 +26,7 @@ */ @Getter @Setter -@EqualsAndHashCode +@EqualsAndHashCode(callSuper = false) public class GetDatasourceResponse extends ActionResponse implements ToXContentObject { private static final ParseField FIELD_NAME_DATASOURCES = new ParseField("datasources"); private static final ParseField FIELD_NAME_NAME = new ParseField("name"); diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequest.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequest.java index 3426008b..9f2335f9 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequest.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequest.java @@ -18,8 +18,8 @@ import lombok.extern.log4j.Log4j2; import org.opensearch.OpenSearchException; +import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionRequestValidationException; -import org.opensearch.action.support.master.AcknowledgedRequest; import org.opensearch.common.Strings; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; @@ -34,8 +34,8 @@ @Getter @Setter @Log4j2 -@EqualsAndHashCode -public class PutDatasourceRequest extends AcknowledgedRequest { +@EqualsAndHashCode(callSuper = false) +public class PutDatasourceRequest extends ActionRequest { private static final int MAX_DATASOURCE_NAME_BYTES = 255; public static final ParseField ENDPOINT_FIELD = new ParseField("endpoint"); public static final ParseField UPDATE_INTERVAL_IN_DAYS_FIELD = new ParseField("update_interval_in_days"); diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceRequest.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceRequest.java index 41a7fd90..45f0132e 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceRequest.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceRequest.java @@ -16,8 +16,8 @@ import lombok.Setter; import lombok.extern.log4j.Log4j2; +import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionRequestValidationException; -import org.opensearch.action.support.master.AcknowledgedRequest; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.unit.TimeValue; @@ -32,7 +32,7 @@ @Setter @Log4j2 @EqualsAndHashCode(callSuper = false) -public class UpdateDatasourceRequest extends AcknowledgedRequest { +public class UpdateDatasourceRequest extends ActionRequest { public static final ParseField ENDPOINT_FIELD = new ParseField("endpoint"); public static final ParseField UPDATE_INTERVAL_IN_DAYS_FIELD = new ParseField("update_interval_in_days"); private static final int MAX_DATASOURCE_NAME_BYTES = 255; diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java index 0f884c32..dd63d746 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java @@ -89,6 +89,7 @@ public class Datasource implements Writeable, ScheduledJobParameter { */ private static final ParseField ENDPOINT_FIELD = new ParseField("endpoint"); private static final ParseField STATE_FIELD = new ParseField("state"); + private static final ParseField CURRENT_INDEX_FIELD = new ParseField("current_index"); private static final ParseField INDICES_FIELD = new ParseField("indices"); private static final ParseField DATABASE_FIELD = new ParseField("database"); private static final ParseField UPDATE_STATS_FIELD = new ParseField("update_stats"); @@ -150,8 +151,14 @@ public class Datasource implements Writeable, ScheduledJobParameter { */ private DatasourceState state; /** - * @param indices A list of indices having GeoIP data - * @return A list of indices having GeoIP data + * @param currentIndex the current index name having GeoIP data + * @return the current index name having GeoIP data + */ + @Getter(AccessLevel.NONE) + private String currentIndex; + /** + * @param indices A list of indices having GeoIP data including currentIndex + * @return A list of indices having GeoIP data including currentIndex */ private List indices; /** @@ -181,9 +188,10 @@ public class Datasource implements Writeable, ScheduledJobParameter { DatasourceTask task = DatasourceTask.valueOf((String) args[6]); String endpoint = (String) args[7]; DatasourceState state = DatasourceState.valueOf((String) args[8]); - List indices = (List) args[9]; - Database database = (Database) args[10]; - UpdateStats updateStats = (UpdateStats) args[11]; + String currentIndex = (String) args[9]; + List indices = (List) args[10]; + Database database = (Database) args[11]; + UpdateStats updateStats = (UpdateStats) args[12]; Datasource parameter = new Datasource( name, lastUpdateTime, @@ -194,6 +202,7 @@ public class Datasource implements Writeable, ScheduledJobParameter { task, endpoint, state, + currentIndex, indices, database, updateStats @@ -212,6 +221,7 @@ public class Datasource implements Writeable, ScheduledJobParameter { PARSER.declareString(ConstructingObjectParser.constructorArg(), TASK_FIELD); PARSER.declareString(ConstructingObjectParser.constructorArg(), ENDPOINT_FIELD); PARSER.declareString(ConstructingObjectParser.constructorArg(), STATE_FIELD); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), CURRENT_INDEX_FIELD); PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), INDICES_FIELD); PARSER.declareObject(ConstructingObjectParser.constructorArg(), Database.PARSER, DATABASE_FIELD); PARSER.declareObject(ConstructingObjectParser.constructorArg(), UpdateStats.PARSER, UPDATE_STATS_FIELD); @@ -233,6 +243,7 @@ public Datasource(final String name, final IntervalSchedule schedule, final Stri DatasourceTask.ALL, endpoint, DatasourceState.CREATING, + null, new ArrayList<>(), new Database(), new UpdateStats() @@ -249,6 +260,7 @@ public Datasource(final StreamInput in) throws IOException { task = DatasourceTask.valueOf(in.readString()); endpoint = in.readString(); state = DatasourceState.valueOf(in.readString()); + currentIndex = in.readOptionalString(); indices = in.readStringList(); database = new Database(in); updateStats = new UpdateStats(in); @@ -265,6 +277,7 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeString(task.name()); out.writeString(endpoint); out.writeString(state.name()); + out.writeOptionalString(currentIndex); out.writeStringCollection(indices); database.writeTo(out); updateStats.writeTo(out); @@ -292,6 +305,9 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa builder.field(TASK_FIELD.getPreferredName(), task.name()); builder.field(ENDPOINT_FIELD.getPreferredName(), endpoint); builder.field(STATE_FIELD.getPreferredName(), state.name()); + if (currentIndex != null) { + builder.field(CURRENT_INDEX_FIELD.getPreferredName(), currentIndex); + } builder.field(INDICES_FIELD.getPreferredName(), indices); builder.field(DATABASE_FIELD.getPreferredName(), database); builder.field(UPDATE_STATS_FIELD.getPreferredName(), updateStats); @@ -358,25 +374,17 @@ public String currentIndexName() { return null; } - if (database.updatedAt == null) { - return null; - } - - return indexNameFor(database.updatedAt.toEpochMilli()); + return currentIndex; } /** - * Index name for a given manifest + * Index name for a datasource with given suffix * - * @param manifest manifest - * @return Index name for a given manifest + * @param suffix the suffix of a index name + * @return index name for a datasource with given suffix */ - public String indexNameFor(final DatasourceManifest manifest) { - return indexNameFor(manifest.getUpdatedAt()); - } - - private String indexNameFor(final long suffix) { - return String.format(Locale.ROOT, "%s.%s.%d", IP2GEO_DATA_INDEX_NAME_PREFIX, name, suffix); + public String newIndexName(final String suffix) { + return String.format(Locale.ROOT, "%s.%s.%s", IP2GEO_DATA_INDEX_NAME_PREFIX, name, suffix); } /** diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java index 9dc94570..579f5bbe 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java @@ -12,6 +12,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.UUID; import java.util.stream.Collectors; import lombok.extern.log4j.Log4j2; @@ -68,7 +69,7 @@ public void updateOrCreateGeoIpData(final Datasource datasource, final Runnable } Instant startTime = Instant.now(); - String indexName = setupIndex(manifest, datasource); + String indexName = setupIndex(datasource); String[] header; List fieldsToStore; try (CSVParser reader = geoIpDataFacade.getDatabaseReader(manifest)) { @@ -86,7 +87,7 @@ public void updateOrCreateGeoIpData(final Datasource datasource, final Runnable } Instant endTime = Instant.now(); - updateDatasourceAsSucceeded(datasource, manifest, fieldsToStore, startTime, endTime); + updateDatasourceAsSucceeded(indexName, datasource, manifest, fieldsToStore, startTime, endTime); } /** @@ -199,16 +200,16 @@ private CSVRecord validateHeader(CSVRecord header) { * * @param manifest the manifest * @param datasource the datasource - * @return - * @throws IOException */ private void updateDatasourceAsSucceeded( + final String newIndexName, final Datasource datasource, final DatasourceManifest manifest, final List fields, final Instant startTime, final Instant endTime - ) throws IOException { + ) { + datasource.setCurrentIndex(newIndexName); datasource.setDatabase(manifest, fields); datasource.getUpdateStats().setLastSucceededAt(endTime); datasource.getUpdateStats().setLastProcessingTimeInMillis(endTime.toEpochMilli() - startTime.toEpochMilli()); @@ -225,13 +226,11 @@ private void updateDatasourceAsSucceeded( /*** * Setup index to add a new geoip data * - * @param manifest the manifest * @param datasource the datasource - * @return - * @throws IOException + * @return new index name */ - private String setupIndex(final DatasourceManifest manifest, final Datasource datasource) throws IOException { - String indexName = datasource.indexNameFor(manifest); + private String setupIndex(final Datasource datasource) { + String indexName = datasource.newIndexName(UUID.randomUUID().toString()); datasource.getIndices().add(indexName); datasourceFacade.updateDatasource(datasource); geoIpDataFacade.createIndexIfNotExists(indexName); diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java index 84609bae..f8b40232 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java @@ -208,6 +208,7 @@ protected Datasource randomDatasource(final Instant updateStartTime) { datasource.setSystemSchedule(datasource.getUserSchedule()); datasource.setTask(randomTask()); datasource.setState(randomState()); + datasource.setCurrentIndex(GeospatialTestHelper.randomLowerCaseString()); datasource.setIndices(Arrays.asList(GeospatialTestHelper.randomLowerCaseString(), GeospatialTestHelper.randomLowerCaseString())); datasource.setEndpoint(String.format(Locale.ROOT, "https://%s.com/manifest.json", GeospatialTestHelper.randomLowerCaseString())); datasource.getDatabase() diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java index 1f2210eb..aaa0d29b 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java @@ -5,8 +5,6 @@ package org.opensearch.geospatial.ip2geo.jobscheduler; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import static org.opensearch.geospatial.ip2geo.jobscheduler.Datasource.IP2GEO_DATA_INDEX_NAME_PREFIX; import java.time.Instant; @@ -15,20 +13,23 @@ import java.util.Arrays; import java.util.Locale; +import lombok.SneakyThrows; + import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.geospatial.GeospatialTestHelper; import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; -import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; public class DatasourceTests extends Ip2GeoTestCase { - public void testParser() throws Exception { + @SneakyThrows + public void testParser_whenAllValueIsFilled_thenSucceed() { String id = GeospatialTestHelper.randomLowerCaseString(); IntervalSchedule schedule = new IntervalSchedule(Instant.now().truncatedTo(ChronoUnit.MILLIS), 1, ChronoUnit.DAYS); String endpoint = GeospatialTestHelper.randomLowerCaseString(); Datasource datasource = new Datasource(id, schedule, endpoint); datasource.enable(); + datasource.setCurrentIndex(GeospatialTestHelper.randomLowerCaseString()); datasource.getDatabase().setFields(Arrays.asList("field1", "field2")); datasource.getDatabase().setProvider("test_provider"); datasource.getDatabase().setUpdatedAt(Instant.now().truncatedTo(ChronoUnit.MILLIS)); @@ -46,20 +47,31 @@ public void testParser() throws Exception { assertTrue(datasource.equals(anotherDatasource)); } + @SneakyThrows + public void testParser_whenNullForOptionalFields_thenSucceed() { + String id = GeospatialTestHelper.randomLowerCaseString(); + IntervalSchedule schedule = new IntervalSchedule(Instant.now().truncatedTo(ChronoUnit.MILLIS), 1, ChronoUnit.DAYS); + String endpoint = GeospatialTestHelper.randomLowerCaseString(); + Datasource datasource = new Datasource(id, schedule, endpoint); + Datasource anotherDatasource = Datasource.PARSER.parse( + createParser(datasource.toXContent(XContentFactory.jsonBuilder(), null)), + null + ); + assertTrue(datasource.equals(anotherDatasource)); + } + public void testCurrentIndexName_whenNotExpired_thenReturnName() { String id = GeospatialTestHelper.randomLowerCaseString(); Instant now = Instant.now(); Datasource datasource = new Datasource(); datasource.setName(id); + datasource.setCurrentIndex(datasource.newIndexName(GeospatialTestHelper.randomLowerCaseString())); datasource.getDatabase().setProvider("provider"); datasource.getDatabase().setSha256Hash("sha256Hash"); datasource.getDatabase().setUpdatedAt(now); datasource.getDatabase().setFields(new ArrayList<>()); - assertEquals( - String.format(Locale.ROOT, "%s.%s.%d", IP2GEO_DATA_INDEX_NAME_PREFIX, id, now.toEpochMilli()), - datasource.currentIndexName() - ); + assertNotNull(datasource.currentIndexName()); } public void testCurrentIndexName_whenExpired_thenReturnNull() { @@ -67,6 +79,7 @@ public void testCurrentIndexName_whenExpired_thenReturnNull() { Instant now = Instant.now(); Datasource datasource = new Datasource(); datasource.setName(id); + datasource.setCurrentIndex(datasource.newIndexName(GeospatialTestHelper.randomLowerCaseString())); datasource.getDatabase().setProvider("provider"); datasource.getDatabase().setSha256Hash("sha256Hash"); datasource.getDatabase().setUpdatedAt(now); @@ -78,33 +91,13 @@ public void testCurrentIndexName_whenExpired_thenReturnNull() { assertNull(datasource.currentIndexName()); } - public void testCurrentIndexName_whenDatabaseUpdateDateIsNull_thenReturnNull() { - String id = GeospatialTestHelper.randomLowerCaseString(); - Datasource datasource = new Datasource(); - datasource.setName(id); - datasource.getDatabase().setProvider("provider"); - datasource.getDatabase().setSha256Hash("sha256Hash"); - datasource.getDatabase().setUpdatedAt(null); - datasource.getDatabase().setValidForInDays(1l); - datasource.getUpdateStats().setLastSucceededAt(Instant.now()); - datasource.getDatabase().setFields(new ArrayList<>()); - - assertFalse(datasource.isExpired()); - assertNull(datasource.currentIndexName()); - } - - public void testGetIndexNameFor() { - long updatedAt = randomPositiveLong(); - DatasourceManifest manifest = mock(DatasourceManifest.class); - when(manifest.getUpdatedAt()).thenReturn(updatedAt); - - String id = GeospatialTestHelper.randomLowerCaseString(); + @SneakyThrows + public void testNewIndexName_whenCalled_thenReturnedExpectedValue() { + String name = GeospatialTestHelper.randomLowerCaseString(); + String suffix = GeospatialTestHelper.randomLowerCaseString(); Datasource datasource = new Datasource(); - datasource.setName(id); - assertEquals( - String.format(Locale.ROOT, "%s.%s.%d", IP2GEO_DATA_INDEX_NAME_PREFIX, id, updatedAt), - datasource.indexNameFor(manifest) - ); + datasource.setName(name); + assertEquals(String.format(Locale.ROOT, "%s.%s.%s", IP2GEO_DATA_INDEX_NAME_PREFIX, name, suffix), datasource.newIndexName(suffix)); } public void testResetDatabase_whenCalled_thenNullifySomeFields() { diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java index 8dcfbd7a..f650fc98 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java @@ -190,6 +190,7 @@ public void testDeleteUnusedIndices_whenValidInput_thenSucceed() { String lingeringIndex = indexPrefix + now.minusMillis(2).toEpochMilli(); Datasource datasource = new Datasource(); datasource.setName(datasourceName); + datasource.setCurrentIndex(currentIndex); datasource.getIndices().add(currentIndex); datasource.getIndices().add(oldIndex); datasource.getIndices().add(lingeringIndex);