Removed parameter and settings (#332)
* Removed first_only parameter
* Removed max_concurrency and batch_size settings

The first_only parameter was added because the existing geoip processor supports it.
However, it provides no benefit for the ip2geo processor: we do not search array data sequentially, but resolve all addresses with a single multi search.
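For a field containing a list of IP addresses, the processor resolves the whole batch with one multi search and appends a result per address, so returning only the first match would just drop data. A minimal sketch of the call shape after this change, using names that appear in this diff (the surrounding variables are illustrative, not standalone code):

```java
// Sketch only: resolve geo data for every IP in the field via batched multi search.
// There is no first_only flag; the listener builds one entry per IP (null when
// unmatched) and sets the list on the target field when at least one IP matched.
Map<String, Map<String, Object>> data = new HashMap<>();
geoIpDataFacade.getGeoIpData(
    indexName,              // ip2geo datasource index
    ipList.iterator(),      // all IPs taken from the document field
    data,                   // accumulator filled across recursive batches
    listenerToAppendDataToDocument(data, ipList, ingestDocument, handler)
);
```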

The max_concurrency and batch_size settings are removed because they only expose internal implementation details and could become a blocker for future performance improvements.
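Batching is now governed by an internal constant rather than cluster settings. A rough sketch of what callers see after this change, based on the signatures in this diff (BUNDLE_SIZE and the trimmed putGeoIpData/getGeoIpData methods); it is illustrative rather than the exact implementation:

```java
// In GeoIpDataFacade (per this diff): an internal batch size replaces the removed
// indexing_bulk_size and max_bundle_size settings.
public static final int BUNDLE_SIZE = 128;

// Call sites after the change (illustrative):
// - indexing no longer takes a bulkSize argument; bulk requests flush every BUNDLE_SIZE records
geoIpDataFacade.putGeoIpData(indexName, header, reader.iterator(), renewLock);
// - lookup no longer takes maxBundleSize, maxConcurrentSearches, or firstOnly
geoIpDataFacade.getGeoIpData(indexName, ipList.iterator(), data, actionListener);
```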

Signed-off-by: Heemin Kim <[email protected]>
heemin32 authored Jun 7, 2023
1 parent 3d4d755 commit e1ada45
Showing 8 changed files with 33 additions and 216 deletions.
@@ -68,6 +68,7 @@
*/
@Log4j2
public class GeoIpDataFacade {
public static final int BUNDLE_SIZE = 128;
private static final String IP_RANGE_FIELD_NAME = "_cidr";
private static final String DATA_FIELD_NAME = "_data";
private static final Map<String, Object> INDEX_SETTING_TO_CREATE = Map.of(
@@ -279,28 +280,19 @@ public void onFailure(final Exception e) {
*
* @param indexName the index name
* @param ipIterator the iterator of ip addresses
* @param maxBundleSize number of ip address to pass in multi search
* @param maxConcurrentSearches the max concurrent search requests
* @param firstOnly return only the first matching result if true
* @param geoIpData collected geo data
* @param actionListener the action listener
*/
public void getGeoIpData(
final String indexName,
final Iterator<String> ipIterator,
final Integer maxBundleSize,
final Integer maxConcurrentSearches,
final boolean firstOnly,
final Map<String, Map<String, Object>> geoIpData,
final ActionListener<Map<String, Map<String, Object>>> actionListener
) {
MultiSearchRequestBuilder mRequestBuilder = client.prepareMultiSearch();
if (maxConcurrentSearches != 0) {
mRequestBuilder.setMaxConcurrentSearchRequests(maxConcurrentSearches);
}

List<String> ipsToSearch = new ArrayList<>(maxBundleSize);
while (ipIterator.hasNext() && ipsToSearch.size() < maxBundleSize) {
List<String> ipsToSearch = new ArrayList<>(BUNDLE_SIZE);
while (ipIterator.hasNext() && ipsToSearch.size() < BUNDLE_SIZE) {
String ip = ipIterator.next();
if (geoIpData.get(ip) == null) {
mRequestBuilder.add(
@@ -340,13 +332,8 @@ public void onResponse(final MultiSearchResponse items) {
).v2().get(DATA_FIELD_NAME);

geoIpData.put(ipsToSearch.get(i), data);

if (firstOnly) {
actionListener.onResponse(geoIpData);
return;
}
}
getGeoIpData(indexName, ipIterator, maxBundleSize, maxConcurrentSearches, firstOnly, geoIpData, actionListener);
getGeoIpData(indexName, ipIterator, geoIpData, actionListener);
}

@Override
@@ -362,20 +349,18 @@ public void onFailure(final Exception e) {
* @param indexName Index name to puts the GeoIP data
* @param fields Field name matching with data in CSVRecord in order
* @param iterator GeoIP data to insert
* @param bulkSize Bulk size of data to process
* @param renewLock Runnable to renew lock
*/
public void putGeoIpData(
@NonNull final String indexName,
@NonNull final String[] fields,
@NonNull final Iterator<CSVRecord> iterator,
final int bulkSize,
@NonNull final Runnable renewLock
) throws IOException {
TimeValue timeout = clusterSettings.get(Ip2GeoSettings.TIMEOUT);
final BulkRequest bulkRequest = new BulkRequest();
Queue<DocWriteRequest> requests = new LinkedList<>();
for (int i = 0; i < bulkSize; i++) {
for (int i = 0; i < BUNDLE_SIZE; i++) {
requests.add(Requests.indexRequest(indexName));
}
while (iterator.hasNext()) {
@@ -385,7 +370,7 @@ public void putGeoIpData(
indexRequest.source(document);
indexRequest.id(record.get(0));
bulkRequest.add(indexRequest);
if (iterator.hasNext() == false || bulkRequest.requests().size() == bulkSize) {
if (iterator.hasNext() == false || bulkRequest.requests().size() == BUNDLE_SIZE) {
BulkResponse response = StashedThreadContext.run(client, () -> client.bulk(bulkRequest).actionGet(timeout));
if (response.hasFailures()) {
throw new OpenSearchException(
@@ -51,30 +51,6 @@ public class Ip2GeoSettings {
Setting.Property.Dynamic
);

/**
* Bulk size for indexing GeoIP data
*/
public static final Setting<Integer> INDEXING_BULK_SIZE = Setting.intSetting(
"plugins.geospatial.ip2geo.datasource.indexing_bulk_size",
10000,
1,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
* Multi search bundle size for GeoIP data
*
* Multi search is used only when a field contains a list of ip addresses.
*/
public static final Setting<Integer> MAX_BUNDLE_SIZE = Setting.intSetting(
"plugins.geospatial.ip2geo.processor.max_bundle_size",
100,
1,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
* Multi search max concurrent searches
*
@@ -96,14 +72,7 @@
* @return a list of all settings for Ip2Geo feature
*/
public static final List<Setting<?>> settings() {
return List.of(
DATASOURCE_ENDPOINT,
DATASOURCE_UPDATE_INTERVAL,
TIMEOUT,
INDEXING_BULK_SIZE,
MAX_BUNDLE_SIZE,
MAX_CONCURRENT_SEARCHES
);
return List.of(DATASOURCE_ENDPOINT, DATASOURCE_UPDATE_INTERVAL, TIMEOUT, MAX_CONCURRENT_SEARCHES);
}

/**
@@ -25,7 +25,6 @@
import org.opensearch.geospatial.ip2geo.common.DatasourceManifest;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings;
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;

@Log4j2
@@ -83,13 +82,7 @@ public void updateOrCreateGeoIpData(final Datasource datasource, final Runnable
datasource.getDatabase().getFields().toString()
);
}
geoIpDataFacade.putGeoIpData(
indexName,
header,
reader.iterator(),
clusterSettings.get(Ip2GeoSettings.INDEXING_BULK_SIZE),
renewLock
);
geoIpDataFacade.putGeoIpData(indexName, header, reader.iterator(), renewLock);
}

Instant endTime = Instant.now();
@@ -29,7 +29,6 @@
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.ingest.AbstractProcessor;
import org.opensearch.ingest.IngestDocument;
@@ -49,7 +48,6 @@ public final class Ip2GeoProcessor extends AbstractProcessor {
public static final String CONFIG_DATASOURCE = "datasource";
public static final String CONFIG_PROPERTIES = "properties";
public static final String CONFIG_IGNORE_MISSING = "ignore_missing";
public static final String CONFIG_FIRST_ONLY = "first_only";

private final String field;
private final String targetField;
@@ -60,7 +58,6 @@ public final class Ip2GeoProcessor extends AbstractProcessor {
private final String datasourceName;
private final Set<String> properties;
private final boolean ignoreMissing;
private final boolean firstOnly;
private final ClusterSettings clusterSettings;
private final DatasourceFacade datasourceFacade;
private final GeoIpDataFacade geoIpDataFacade;
@@ -79,7 +76,6 @@ public final class Ip2GeoProcessor extends AbstractProcessor {
* @param datasourceName the datasourceName
* @param properties the properties
* @param ignoreMissing true if documents with a missing value for the field should be ignored
* @param firstOnly true if only first result should be returned in case of array
* @param clusterSettings the cluster settings
* @param datasourceFacade the datasource facade
* @param geoIpDataFacade the geoip data facade
@@ -92,7 +88,6 @@ public Ip2GeoProcessor(
final String datasourceName,
final Set<String> properties,
final boolean ignoreMissing,
final boolean firstOnly,
final ClusterSettings clusterSettings,
final DatasourceFacade datasourceFacade,
final GeoIpDataFacade geoIpDataFacade
@@ -103,7 +98,6 @@ public Ip2GeoProcessor(
this.datasourceName = datasourceName;
this.properties = properties;
this.ignoreMissing = ignoreMissing;
this.firstOnly = firstOnly;
this.clusterSettings = clusterSettings;
this.datasourceFacade = datasourceFacade;
this.geoIpDataFacade = geoIpDataFacade;
@@ -252,9 +246,6 @@ public void onResponse(final Datasource datasource) {
geoIpDataFacade.getGeoIpData(
indexName,
ipList.iterator(),
clusterSettings.get(Ip2GeoSettings.MAX_BUNDLE_SIZE),
clusterSettings.get(Ip2GeoSettings.MAX_CONCURRENT_SEARCHES),
firstOnly,
data,
listenerToAppendDataToDocument(data, ipList, ingestDocument, handler)
);
@@ -277,33 +268,21 @@ protected ActionListener<Map<String, Map<String, Object>>> listenerToAppendDataT
return new ActionListener<>() {
@Override
public void onResponse(final Map<String, Map<String, Object>> response) {
if (firstOnly) {
for (String ipAddr : ipList) {
Map<String, Object> geoData = data.get(ipAddr);
// GeoData for ipAddr won't be null
if (geoData.isEmpty() == false) {
ingestDocument.setFieldValue(targetField, filteredGeoData(geoData, ipAddr));
handler.accept(ingestDocument, null);
return;
}
}
} else {
boolean match = false;
List<Map<String, Object>> geoDataList = new ArrayList<>(ipList.size());
for (String ipAddr : ipList) {
Map<String, Object> geoData = data.get(ipAddr);
// GeoData for ipAddr won't be null
geoDataList.add(geoData.isEmpty() ? null : filteredGeoData(geoData, ipAddr));
if (geoData.isEmpty() == false) {
match = true;
}
}
if (match) {
ingestDocument.setFieldValue(targetField, geoDataList);
handler.accept(ingestDocument, null);
return;
boolean match = false;
List<Map<String, Object>> geoDataList = new ArrayList<>(ipList.size());
for (String ipAddr : ipList) {
Map<String, Object> geoData = data.get(ipAddr);
// GeoData for ipAddr won't be null
geoDataList.add(geoData.isEmpty() ? null : filteredGeoData(geoData, ipAddr));
if (geoData.isEmpty() == false) {
match = true;
}
}
if (match) {
ingestDocument.setFieldValue(targetField, geoDataList);
handler.accept(ingestDocument, null);
return;
}
handler.accept(ingestDocument, null);
}

@@ -363,7 +342,6 @@ public Ip2GeoProcessor create(
String datasourceName = readStringProperty(TYPE, processorTag, config, CONFIG_DATASOURCE);
List<String> propertyNames = readOptionalList(TYPE, processorTag, config, CONFIG_PROPERTIES);
boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, CONFIG_IGNORE_MISSING, false);
boolean firstOnly = readBooleanProperty(TYPE, processorTag, config, CONFIG_FIRST_ONLY, true);

// Skip validation for the call by cluster applier service
if (Thread.currentThread().getName().contains(CLUSTER_UPDATE_THREAD_NAME) == false) {
@@ -378,7 +356,6 @@ public Ip2GeoProcessor create(
datasourceName,
propertyNames == null ? null : new HashSet<>(propertyNames),
ignoreMissing,
firstOnly,
ingestService.getClusterService().getClusterSettings(),
datasourceFacade,
geoIpDataFacade
@@ -258,7 +258,6 @@ protected Ip2GeoProcessor randomIp2GeoProcessor(String datasourceName) {
datasourceName,
properties,
true,
true,
clusterSettings,
datasourceFacade,
geoIpDataFacade
@@ -11,6 +11,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade.BUNDLE_SIZE;
import static org.opensearch.geospatial.ip2geo.jobscheduler.Datasource.IP2GEO_DATA_INDEX_NAME_PREFIX;

import java.io.File;
Expand Down Expand Up @@ -193,7 +194,7 @@ public void testPutGeoIpData_whenValidInput_thenSucceed() {
verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> {
if (actionRequest instanceof BulkRequest) {
BulkRequest request = (BulkRequest) actionRequest;
assertEquals(1, request.numberOfActions());
assertEquals(2, request.numberOfActions());
BulkResponse response = mock(BulkResponse.class);
when(response.hasFailures()).thenReturn(false);
return response;
@@ -224,7 +225,7 @@ public void testPutGeoIpData_whenValidInput_thenSucceed() {
try (CSVParser csvParser = CSVParser.parse(sampleIp2GeoFile(), StandardCharsets.UTF_8, CSVFormat.RFC4180)) {
Iterator<CSVRecord> iterator = csvParser.iterator();
String[] fields = iterator.next().values();
verifyingGeoIpDataFacade.putGeoIpData(index, fields, iterator, 1, renewLock);
verifyingGeoIpDataFacade.putGeoIpData(index, fields, iterator, renewLock);
verify(renewLock, times(2)).run();
}
}
Expand Down Expand Up @@ -261,53 +262,37 @@ public void testGetSingleGeoIpData() {
assertEquals("seattle", captor.getValue().get("city"));
}

public void testGetMultipleGeoIpDataNoSearchRequired() {
public void testGetGeoIpData_whenAllDataIsGathered_thenNoMoreSearch() {
String indexName = GeospatialTestHelper.randomLowerCaseString();
String ip1 = randomIpAddress();
String ip2 = randomIpAddress();
Iterator<String> ipIterator = Arrays.asList(ip1, ip2).iterator();
int maxBundleSize = 1;
int maxConcurrentSearches = 1;
boolean firstOnly = true;
Map<String, Map<String, Object>> geoData = new HashMap<>();
geoData.put(ip1, Map.of("city", "Seattle"));
geoData.put(ip2, Map.of("city", "Hawaii"));
ActionListener<Map<String, Map<String, Object>>> actionListener = mock(ActionListener.class);

// Run
verifyingGeoIpDataFacade.getGeoIpData(
indexName,
ipIterator,
maxBundleSize,
maxConcurrentSearches,
firstOnly,
geoData,
actionListener
);
verifyingGeoIpDataFacade.getGeoIpData(indexName, ipIterator, geoData, actionListener);

// Verify
verify(actionListener).onResponse(geoData);
}

public void testGetMultipleGeoIpData() {
public void testGetGeoIpData_whenCalled_thenGetGeoIpData() {
String indexName = GeospatialTestHelper.randomLowerCaseString();
int dataSize = Randomness.get().nextInt(10) + 1;
List<String> ips = new ArrayList<>();
for (int i = 0; i < dataSize; i++) {
ips.add(randomIpAddress());
}
int maxBundleSize = Randomness.get().nextInt(11) + 1;
int maxConcurrentSearches = 1;
boolean firstOnly = false;
Map<String, Map<String, Object>> geoData = new HashMap<>();
ActionListener<Map<String, Map<String, Object>>> actionListener = mock(ActionListener.class);

List<String> cities = new ArrayList<>();
verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> {
assert actionRequest instanceof MultiSearchRequest;
MultiSearchRequest request = (MultiSearchRequest) actionRequest;
assertEquals(maxConcurrentSearches, request.maxConcurrentSearchRequests());
assertTrue(request.requests().size() == maxBundleSize || request.requests().size() == dataSize % maxBundleSize);
for (SearchRequest searchRequest : request.requests()) {
assertEquals("_local", searchRequest.preference());
assertEquals(1, searchRequest.source().size());
@@ -341,18 +326,10 @@ public void testGetMultipleGeoIpData() {
});

// Run
verifyingGeoIpDataFacade.getGeoIpData(
indexName,
ips.iterator(),
maxBundleSize,
maxConcurrentSearches,
firstOnly,
geoData,
actionListener
);
verifyingGeoIpDataFacade.getGeoIpData(indexName, ips.iterator(), geoData, actionListener);

// Verify
verify(verifyingClient, times((dataSize + maxBundleSize - 1) / maxBundleSize)).execute(
verify(verifyingClient, times((dataSize + BUNDLE_SIZE - 1) / BUNDLE_SIZE)).execute(
any(ActionType.class),
any(ActionRequest.class),
any(ActionListener.class)