From e1ada4580a2293353e6d3a7835d73ebdd535d27f Mon Sep 17 00:00:00 2001 From: Heemin Kim Date: Wed, 7 Jun 2023 15:14:55 -0700 Subject: [PATCH] Removed parameter and settings (#332) * Removed first_only parameter * Removed max_concurrency and batch_size setting first_only parameter was added as current geoip processor has it. However, the parameter have no benefit for ip2geo processor as we don't do a sequantial search for array data but use multi search. max_concurrency and batch_size setting is removed as these are only reveal internal implementation and could be a future blocker to improve performance later. Signed-off-by: Heemin Kim --- .../ip2geo/common/GeoIpDataFacade.java | 27 ++---- .../ip2geo/common/Ip2GeoSettings.java | 33 +------ .../jobscheduler/DatasourceUpdateService.java | 9 +- .../ip2geo/processor/Ip2GeoProcessor.java | 49 +++-------- .../geospatial/ip2geo/Ip2GeoTestCase.java | 1 - .../ip2geo/common/GeoIpDataFacadeTests.java | 39 ++------ .../DatasourceUpdateServiceTests.java | 3 - .../processor/Ip2GeoProcessorTests.java | 88 +------------------ 8 files changed, 33 insertions(+), 216 deletions(-) diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacade.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacade.java index 11e499a0..10b2dbda 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacade.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacade.java @@ -68,6 +68,7 @@ */ @Log4j2 public class GeoIpDataFacade { + public static final int BUNDLE_SIZE = 128; private static final String IP_RANGE_FIELD_NAME = "_cidr"; private static final String DATA_FIELD_NAME = "_data"; private static final Map INDEX_SETTING_TO_CREATE = Map.of( @@ -279,28 +280,19 @@ public void onFailure(final Exception e) { * * @param indexName the index name * @param ipIterator the iterator of ip addresses - * @param maxBundleSize number of ip address to pass in multi search - * @param maxConcurrentSearches the max concurrent search requests - * @param firstOnly return only the first matching result if true * @param geoIpData collected geo data * @param actionListener the action listener */ public void getGeoIpData( final String indexName, final Iterator ipIterator, - final Integer maxBundleSize, - final Integer maxConcurrentSearches, - final boolean firstOnly, final Map> geoIpData, final ActionListener>> actionListener ) { MultiSearchRequestBuilder mRequestBuilder = client.prepareMultiSearch(); - if (maxConcurrentSearches != 0) { - mRequestBuilder.setMaxConcurrentSearchRequests(maxConcurrentSearches); - } - List ipsToSearch = new ArrayList<>(maxBundleSize); - while (ipIterator.hasNext() && ipsToSearch.size() < maxBundleSize) { + List ipsToSearch = new ArrayList<>(BUNDLE_SIZE); + while (ipIterator.hasNext() && ipsToSearch.size() < BUNDLE_SIZE) { String ip = ipIterator.next(); if (geoIpData.get(ip) == null) { mRequestBuilder.add( @@ -340,13 +332,8 @@ public void onResponse(final MultiSearchResponse items) { ).v2().get(DATA_FIELD_NAME); geoIpData.put(ipsToSearch.get(i), data); - - if (firstOnly) { - actionListener.onResponse(geoIpData); - return; - } } - getGeoIpData(indexName, ipIterator, maxBundleSize, maxConcurrentSearches, firstOnly, geoIpData, actionListener); + getGeoIpData(indexName, ipIterator, geoIpData, actionListener); } @Override @@ -362,20 +349,18 @@ public void onFailure(final Exception e) { * @param indexName Index name to puts the GeoIP data * @param fields Field name matching with data in CSVRecord in order * @param iterator GeoIP data to insert - * @param bulkSize Bulk size of data to process * @param renewLock Runnable to renew lock */ public void putGeoIpData( @NonNull final String indexName, @NonNull final String[] fields, @NonNull final Iterator iterator, - final int bulkSize, @NonNull final Runnable renewLock ) throws IOException { TimeValue timeout = clusterSettings.get(Ip2GeoSettings.TIMEOUT); final BulkRequest bulkRequest = new BulkRequest(); Queue requests = new LinkedList<>(); - for (int i = 0; i < bulkSize; i++) { + for (int i = 0; i < BUNDLE_SIZE; i++) { requests.add(Requests.indexRequest(indexName)); } while (iterator.hasNext()) { @@ -385,7 +370,7 @@ public void putGeoIpData( indexRequest.source(document); indexRequest.id(record.get(0)); bulkRequest.add(indexRequest); - if (iterator.hasNext() == false || bulkRequest.requests().size() == bulkSize) { + if (iterator.hasNext() == false || bulkRequest.requests().size() == BUNDLE_SIZE) { BulkResponse response = StashedThreadContext.run(client, () -> client.bulk(bulkRequest).actionGet(timeout)); if (response.hasFailures()) { throw new OpenSearchException( diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoSettings.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoSettings.java index b236e95c..12d06b50 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoSettings.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoSettings.java @@ -51,30 +51,6 @@ public class Ip2GeoSettings { Setting.Property.Dynamic ); - /** - * Bulk size for indexing GeoIP data - */ - public static final Setting INDEXING_BULK_SIZE = Setting.intSetting( - "plugins.geospatial.ip2geo.datasource.indexing_bulk_size", - 10000, - 1, - Setting.Property.NodeScope, - Setting.Property.Dynamic - ); - - /** - * Multi search bundle size for GeoIP data - * - * Multi search is used only when a field contains a list of ip addresses. - */ - public static final Setting MAX_BUNDLE_SIZE = Setting.intSetting( - "plugins.geospatial.ip2geo.processor.max_bundle_size", - 100, - 1, - Setting.Property.NodeScope, - Setting.Property.Dynamic - ); - /** * Multi search max concurrent searches * @@ -96,14 +72,7 @@ public class Ip2GeoSettings { * @return a list of all settings for Ip2Geo feature */ public static final List> settings() { - return List.of( - DATASOURCE_ENDPOINT, - DATASOURCE_UPDATE_INTERVAL, - TIMEOUT, - INDEXING_BULK_SIZE, - MAX_BUNDLE_SIZE, - MAX_CONCURRENT_SEARCHES - ); + return List.of(DATASOURCE_ENDPOINT, DATASOURCE_UPDATE_INTERVAL, TIMEOUT, MAX_CONCURRENT_SEARCHES); } /** diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java index 0e6e16d1..9dc94570 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java @@ -25,7 +25,6 @@ import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; import org.opensearch.geospatial.ip2geo.common.DatasourceState; import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade; -import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings; import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; @Log4j2 @@ -83,13 +82,7 @@ public void updateOrCreateGeoIpData(final Datasource datasource, final Runnable datasource.getDatabase().getFields().toString() ); } - geoIpDataFacade.putGeoIpData( - indexName, - header, - reader.iterator(), - clusterSettings.get(Ip2GeoSettings.INDEXING_BULK_SIZE), - renewLock - ); + geoIpDataFacade.putGeoIpData(indexName, header, reader.iterator(), renewLock); } Instant endTime = Instant.now(); diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java b/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java index fcb32b56..5ce80a9a 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java @@ -29,7 +29,6 @@ import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; import org.opensearch.geospatial.ip2geo.common.DatasourceState; import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade; -import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; import org.opensearch.ingest.AbstractProcessor; import org.opensearch.ingest.IngestDocument; @@ -49,7 +48,6 @@ public final class Ip2GeoProcessor extends AbstractProcessor { public static final String CONFIG_DATASOURCE = "datasource"; public static final String CONFIG_PROPERTIES = "properties"; public static final String CONFIG_IGNORE_MISSING = "ignore_missing"; - public static final String CONFIG_FIRST_ONLY = "first_only"; private final String field; private final String targetField; @@ -60,7 +58,6 @@ public final class Ip2GeoProcessor extends AbstractProcessor { private final String datasourceName; private final Set properties; private final boolean ignoreMissing; - private final boolean firstOnly; private final ClusterSettings clusterSettings; private final DatasourceFacade datasourceFacade; private final GeoIpDataFacade geoIpDataFacade; @@ -79,7 +76,6 @@ public final class Ip2GeoProcessor extends AbstractProcessor { * @param datasourceName the datasourceName * @param properties the properties * @param ignoreMissing true if documents with a missing value for the field should be ignored - * @param firstOnly true if only first result should be returned in case of array * @param clusterSettings the cluster settings * @param datasourceFacade the datasource facade * @param geoIpDataFacade the geoip data facade @@ -92,7 +88,6 @@ public Ip2GeoProcessor( final String datasourceName, final Set properties, final boolean ignoreMissing, - final boolean firstOnly, final ClusterSettings clusterSettings, final DatasourceFacade datasourceFacade, final GeoIpDataFacade geoIpDataFacade @@ -103,7 +98,6 @@ public Ip2GeoProcessor( this.datasourceName = datasourceName; this.properties = properties; this.ignoreMissing = ignoreMissing; - this.firstOnly = firstOnly; this.clusterSettings = clusterSettings; this.datasourceFacade = datasourceFacade; this.geoIpDataFacade = geoIpDataFacade; @@ -252,9 +246,6 @@ public void onResponse(final Datasource datasource) { geoIpDataFacade.getGeoIpData( indexName, ipList.iterator(), - clusterSettings.get(Ip2GeoSettings.MAX_BUNDLE_SIZE), - clusterSettings.get(Ip2GeoSettings.MAX_CONCURRENT_SEARCHES), - firstOnly, data, listenerToAppendDataToDocument(data, ipList, ingestDocument, handler) ); @@ -277,33 +268,21 @@ protected ActionListener>> listenerToAppendDataT return new ActionListener<>() { @Override public void onResponse(final Map> response) { - if (firstOnly) { - for (String ipAddr : ipList) { - Map geoData = data.get(ipAddr); - // GeoData for ipAddr won't be null - if (geoData.isEmpty() == false) { - ingestDocument.setFieldValue(targetField, filteredGeoData(geoData, ipAddr)); - handler.accept(ingestDocument, null); - return; - } - } - } else { - boolean match = false; - List> geoDataList = new ArrayList<>(ipList.size()); - for (String ipAddr : ipList) { - Map geoData = data.get(ipAddr); - // GeoData for ipAddr won't be null - geoDataList.add(geoData.isEmpty() ? null : filteredGeoData(geoData, ipAddr)); - if (geoData.isEmpty() == false) { - match = true; - } - } - if (match) { - ingestDocument.setFieldValue(targetField, geoDataList); - handler.accept(ingestDocument, null); - return; + boolean match = false; + List> geoDataList = new ArrayList<>(ipList.size()); + for (String ipAddr : ipList) { + Map geoData = data.get(ipAddr); + // GeoData for ipAddr won't be null + geoDataList.add(geoData.isEmpty() ? null : filteredGeoData(geoData, ipAddr)); + if (geoData.isEmpty() == false) { + match = true; } } + if (match) { + ingestDocument.setFieldValue(targetField, geoDataList); + handler.accept(ingestDocument, null); + return; + } handler.accept(ingestDocument, null); } @@ -363,7 +342,6 @@ public Ip2GeoProcessor create( String datasourceName = readStringProperty(TYPE, processorTag, config, CONFIG_DATASOURCE); List propertyNames = readOptionalList(TYPE, processorTag, config, CONFIG_PROPERTIES); boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, CONFIG_IGNORE_MISSING, false); - boolean firstOnly = readBooleanProperty(TYPE, processorTag, config, CONFIG_FIRST_ONLY, true); // Skip validation for the call by cluster applier service if (Thread.currentThread().getName().contains(CLUSTER_UPDATE_THREAD_NAME) == false) { @@ -378,7 +356,6 @@ public Ip2GeoProcessor create( datasourceName, propertyNames == null ? null : new HashSet<>(propertyNames), ignoreMissing, - firstOnly, ingestService.getClusterService().getClusterSettings(), datasourceFacade, geoIpDataFacade diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java index fe3783cc..84609bae 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java @@ -258,7 +258,6 @@ protected Ip2GeoProcessor randomIp2GeoProcessor(String datasourceName) { datasourceName, properties, true, - true, clusterSettings, datasourceFacade, geoIpDataFacade diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacadeTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacadeTests.java index a5ca25fd..74d5f189 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacadeTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacadeTests.java @@ -11,6 +11,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade.BUNDLE_SIZE; import static org.opensearch.geospatial.ip2geo.jobscheduler.Datasource.IP2GEO_DATA_INDEX_NAME_PREFIX; import java.io.File; @@ -193,7 +194,7 @@ public void testPutGeoIpData_whenValidInput_thenSucceed() { verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { if (actionRequest instanceof BulkRequest) { BulkRequest request = (BulkRequest) actionRequest; - assertEquals(1, request.numberOfActions()); + assertEquals(2, request.numberOfActions()); BulkResponse response = mock(BulkResponse.class); when(response.hasFailures()).thenReturn(false); return response; @@ -224,7 +225,7 @@ public void testPutGeoIpData_whenValidInput_thenSucceed() { try (CSVParser csvParser = CSVParser.parse(sampleIp2GeoFile(), StandardCharsets.UTF_8, CSVFormat.RFC4180)) { Iterator iterator = csvParser.iterator(); String[] fields = iterator.next().values(); - verifyingGeoIpDataFacade.putGeoIpData(index, fields, iterator, 1, renewLock); + verifyingGeoIpDataFacade.putGeoIpData(index, fields, iterator, renewLock); verify(renewLock, times(2)).run(); } } @@ -261,44 +262,30 @@ public void testGetSingleGeoIpData() { assertEquals("seattle", captor.getValue().get("city")); } - public void testGetMultipleGeoIpDataNoSearchRequired() { + public void testGetGeoIpData_whenAllDataIsGathered_thenNoMoreSearch() { String indexName = GeospatialTestHelper.randomLowerCaseString(); String ip1 = randomIpAddress(); String ip2 = randomIpAddress(); Iterator ipIterator = Arrays.asList(ip1, ip2).iterator(); - int maxBundleSize = 1; - int maxConcurrentSearches = 1; - boolean firstOnly = true; Map> geoData = new HashMap<>(); geoData.put(ip1, Map.of("city", "Seattle")); geoData.put(ip2, Map.of("city", "Hawaii")); ActionListener>> actionListener = mock(ActionListener.class); // Run - verifyingGeoIpDataFacade.getGeoIpData( - indexName, - ipIterator, - maxBundleSize, - maxConcurrentSearches, - firstOnly, - geoData, - actionListener - ); + verifyingGeoIpDataFacade.getGeoIpData(indexName, ipIterator, geoData, actionListener); // Verify verify(actionListener).onResponse(geoData); } - public void testGetMultipleGeoIpData() { + public void testGetGeoIpData_whenCalled_thenGetGeoIpData() { String indexName = GeospatialTestHelper.randomLowerCaseString(); int dataSize = Randomness.get().nextInt(10) + 1; List ips = new ArrayList<>(); for (int i = 0; i < dataSize; i++) { ips.add(randomIpAddress()); } - int maxBundleSize = Randomness.get().nextInt(11) + 1; - int maxConcurrentSearches = 1; - boolean firstOnly = false; Map> geoData = new HashMap<>(); ActionListener>> actionListener = mock(ActionListener.class); @@ -306,8 +293,6 @@ public void testGetMultipleGeoIpData() { verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { assert actionRequest instanceof MultiSearchRequest; MultiSearchRequest request = (MultiSearchRequest) actionRequest; - assertEquals(maxConcurrentSearches, request.maxConcurrentSearchRequests()); - assertTrue(request.requests().size() == maxBundleSize || request.requests().size() == dataSize % maxBundleSize); for (SearchRequest searchRequest : request.requests()) { assertEquals("_local", searchRequest.preference()); assertEquals(1, searchRequest.source().size()); @@ -341,18 +326,10 @@ public void testGetMultipleGeoIpData() { }); // Run - verifyingGeoIpDataFacade.getGeoIpData( - indexName, - ips.iterator(), - maxBundleSize, - maxConcurrentSearches, - firstOnly, - geoData, - actionListener - ); + verifyingGeoIpDataFacade.getGeoIpData(indexName, ips.iterator(), geoData, actionListener); // Verify - verify(verifyingClient, times((dataSize + maxBundleSize - 1) / maxBundleSize)).execute( + verify(verifyingClient, times((dataSize + BUNDLE_SIZE - 1) / BUNDLE_SIZE)).execute( any(ActionType.class), any(ActionRequest.class), any(ActionListener.class) diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java index 7ebab275..8dcfbd7a 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java @@ -6,7 +6,6 @@ package org.opensearch.geospatial.ip2geo.jobscheduler; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.mock; @@ -90,7 +89,6 @@ public void testUpdateOrCreateGeoIpData_whenExpired_thenUpdate() { eq(datasource.currentIndexName()), isA(String[].class), any(Iterator.class), - anyInt(), any(Runnable.class) ); } @@ -167,7 +165,6 @@ public void testUpdateOrCreateGeoIpData_whenValidInput_thenSucceed() { eq(datasource.currentIndexName()), isA(String[].class), any(Iterator.class), - anyInt(), any(Runnable.class) ); } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorTests.java index e83fc764..2a7ce296 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorTests.java @@ -6,8 +6,6 @@ package org.opensearch.geospatial.ip2geo.processor; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; @@ -188,80 +186,9 @@ public void testExecuteNotImplemented() throws Exception { assertTrue(e.getMessage().contains("Not implemented")); } - public void testGenerateDataToAppendWithFirstOnlyOption() throws Exception { - String datasourceName = GeospatialTestHelper.randomLowerCaseString(); - Ip2GeoProcessor processor = createProcessor( - datasourceName, - Map.of("first_only", true, "properties", Arrays.asList(SUPPORTED_FIELDS.get(0))) - ); - List ips = new ArrayList<>(); - Map> data = new HashMap<>(); - for (int i = 0; i < 3; i++) { - String ip = randomIpAddress(); - ips.add(ip); - Map geoData = new HashMap<>(); - for (String field : SUPPORTED_FIELDS) { - geoData.put(field, GeospatialTestHelper.randomLowerCaseString()); - } - data.put(ip, i == 0 ? Collections.emptyMap() : geoData); - } - IngestDocument document = new IngestDocument(new HashMap<>(), new HashMap<>()); - BiConsumer handler = mock(BiConsumer.class); - - // Run - processor.listenerToAppendDataToDocument(data, ips, document, handler).onResponse(data); - - // Verify - verify(handler).accept(document, null); - assertEquals(1, document.getFieldValue(DEFAULT_TARGET_FIELD, Map.class).size()); - assertEquals( - data.get(ips.get(1)).get(SUPPORTED_FIELDS.get(0)), - document.getFieldValue(DEFAULT_TARGET_FIELD, Map.class).get(SUPPORTED_FIELDS.get(0)) - ); - assertNull(document.getFieldValue(DEFAULT_TARGET_FIELD, Map.class).get(SUPPORTED_FIELDS.get(1))); - } - - public void testGenerateDataToAppendWithOutFirstOnlyOption() throws Exception { - String datasourceName = GeospatialTestHelper.randomLowerCaseString(); - Ip2GeoProcessor processor = createProcessor( - datasourceName, - Map.of("first_only", false, "properties", Arrays.asList(SUPPORTED_FIELDS.get(0))) - ); - List ips = new ArrayList<>(); - Map> data = new HashMap<>(); - for (int i = 0; i < 3; i++) { - String ip = randomIpAddress(); - ips.add(ip); - Map geoData = new HashMap<>(); - for (String field : SUPPORTED_FIELDS) { - geoData.put(field, GeospatialTestHelper.randomLowerCaseString()); - } - data.put(ip, i == 0 ? Collections.emptyMap() : geoData); - } - IngestDocument document = new IngestDocument(new HashMap<>(), new HashMap<>()); - BiConsumer handler = mock(BiConsumer.class); - - // Run - processor.listenerToAppendDataToDocument(data, ips, document, handler).onResponse(data); - - // Verify - verify(handler).accept(document, null); - assertEquals(ips.size(), document.getFieldValue(DEFAULT_TARGET_FIELD, List.class).size()); - for (int i = 0; i < ips.size(); i++) { - if (data.get(ips.get(i)).isEmpty()) { - assertNull(document.getFieldValue(DEFAULT_TARGET_FIELD, List.class).get(i)); - } else { - Map documentData = (Map) document.getFieldValue(DEFAULT_TARGET_FIELD, List.class).get(i); - assertEquals(1, documentData.size()); - assertEquals(data.get(ips.get(i)).get(SUPPORTED_FIELDS.get(0)), documentData.get(SUPPORTED_FIELDS.get(0))); - assertNull(documentData.get(SUPPORTED_FIELDS.get(1))); - } - } - } - public void testGenerateDataToAppendWithNoData() throws Exception { String datasourceName = GeospatialTestHelper.randomLowerCaseString(); - Ip2GeoProcessor processor = createProcessor(datasourceName, Map.of("first_only", Randomness.get().nextInt() % 2 == 0)); + Ip2GeoProcessor processor = createProcessor(datasourceName, Collections.emptyMap()); List ips = new ArrayList<>(); Map> data = new HashMap<>(); for (int i = 0; i < 3; i++) { @@ -291,7 +218,8 @@ public void testExecuteInternalNonStringIp() throws Exception { assertTrue(e.getMessage().contains("should only contain strings")); } - public void testExecuteInternal() throws Exception { + @SneakyThrows + public void testExecuteInternal_whenCalled_thenGetDatasourceIsCalled() { String datasourceName = GeospatialTestHelper.randomLowerCaseString(); Ip2GeoProcessor processor = createProcessor(datasourceName, Collections.emptyMap()); List ips = Arrays.asList(randomIpAddress(), randomIpAddress()); @@ -309,15 +237,7 @@ public void testExecuteInternal() throws Exception { when(datasource.isExpired()).thenReturn(false); when(datasource.currentIndexName()).thenReturn(GeospatialTestHelper.randomLowerCaseString()); captor.getValue().onResponse(datasource); - verify(geoIpDataFacade).getGeoIpData( - anyString(), - any(Iterator.class), - anyInt(), - anyInt(), - anyBoolean(), - anyMap(), - any(ActionListener.class) - ); + verify(geoIpDataFacade).getGeoIpData(anyString(), any(Iterator.class), anyMap(), any(ActionListener.class)); } @SneakyThrows