Skip to content

Commit

Permalink
Refactored exception handling
Browse files Browse the repository at this point in the history
Signed-off-by: Heemin Kim <[email protected]>
  • Loading branch information
heemin32 committed Apr 24, 2023
1 parent d361460 commit d98f20f
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.opensearch.OpenSearchException;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.DocWriteRequest;
Expand Down Expand Up @@ -123,17 +122,16 @@ public void onResponse(final IndexResponse indexResponse) {
@Override
public void onFailure(final Exception e) {
if (e instanceof VersionConflictEngineException) {
log.info("Datasource already exists {}", request.getDatasourceName(), e);
listener.onFailure(new ResourceAlreadyExistsException("Datasource already exists"));
listener.onFailure(
new ResourceAlreadyExistsException("datasource [{}] already exists", request.getDatasourceName())
);
} else {
log.error("Failed to create a datasource {}", request.getDatasourceName(), e);
listener.onFailure(new OpenSearchException("Failed to create a datasource"));
listener.onFailure(e);
}
}
});
} catch (Exception e) {
log.error("Error occurred while creating datasource {}", request.getDatasourceName(), e);
listener.onFailure(new OpenSearchException("Failed to create a datasource"));
listener.onFailure(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.geospatial.ip2geo.common;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.CharBuffer;
Expand All @@ -19,7 +20,6 @@
import lombok.Getter;
import lombok.Setter;

import org.opensearch.OpenSearchException;
import org.opensearch.SpecialPermission;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.xcontent.json.JsonXContent;
Expand Down Expand Up @@ -112,10 +112,9 @@ public static class Builder {
*
* @param url url to downloads a manifest file
* @return DatasourceManifest representing the manifest file
* @throws Exception exception
*/
@SuppressForbidden(reason = "Need to connect to http endpoint to read manifest file")
public static DatasourceManifest build(final URL url) throws Exception {
public static DatasourceManifest build(final URL url) {
SpecialPermission.check();
return AccessController.doPrivileged((PrivilegedAction<DatasourceManifest>) () -> {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream()))) {
Expand All @@ -128,8 +127,8 @@ public static DatasourceManifest build(final URL url) throws Exception {
charBuffer.toString()
);
return PARSER.parse(parser, null);
} catch (Exception e) {
throw new OpenSearchException("Failed to build manifest", e);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.opensearch.SpecialPermission;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
Expand Down Expand Up @@ -70,14 +69,13 @@ public class GeoIpDataHelper {
* @param client client
* @param indexName index name
* @param timeout timeout
* @throws IOException io exception
*/
public static void createIndexIfNotExists(
final ClusterService clusterService,
final Client client,
final String indexName,
final TimeValue timeout
) throws IOException {
) {
if (clusterService.state().metadata().hasIndex(indexName) == true) {
log.info("Index {} already exist", indexName);
return;
Expand All @@ -86,7 +84,7 @@ public static void createIndexIfNotExists(
indexSettings.put(INDEX_SETTING_NUM_OF_SHARDS.v1(), INDEX_SETTING_NUM_OF_SHARDS.v2());
indexSettings.put(INDEX_SETTING_AUTO_EXPAND_REPLICAS.v1(), INDEX_SETTING_AUTO_EXPAND_REPLICAS.v2());
final CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings(indexSettings).mapping(getIndexMapping());
CreateIndexResponse response = client.admin().indices().create(createIndexRequest).actionGet(timeout);
client.admin().indices().create(createIndexRequest).actionGet(timeout);
}

/**
Expand All @@ -111,8 +109,8 @@ private static String getIndexMapping() {
return reader.lines().collect(Collectors.joining());
}
}
} catch (Exception e) {
throw new IllegalArgumentException("Ip2Geo datasource mapping cannot be read correctly.");
} catch (IOException e) {
throw new RuntimeException(e);
}
}

Expand Down Expand Up @@ -140,8 +138,11 @@ public static CSVParser getDatabaseReader(final DatasourceManifest manifest) {
} catch (IOException e) {
throw new RuntimeException(e);
}
log.error("ZIP file {} does not have database file {}", manifest.getUrl(), manifest.getDbName());
throw new RuntimeException("ZIP file does not have database file");
throw new OpenSearchException(
"database file [{}] does not exist in the zip file [{}]",
manifest.getDbName(),
manifest.getUrl()
);
});
}

Expand Down Expand Up @@ -332,25 +333,20 @@ public static void putGeoData(
) {
BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
while (iterator.hasNext()) {
try {
CSVRecord record = iterator.next();
String document = createDocument(fields, record.values());
IndexRequest request = Requests.indexRequest(indexName).source(document, XContentType.JSON);
bulkRequest.add(request);
if (!iterator.hasNext() || bulkRequest.requests().size() == bulkSize) {
BulkResponse response = client.bulk(bulkRequest).actionGet(timeout);
if (response.hasFailures()) {
log.error(
"Error occurred while ingesting GeoIP data in {} with an error {}",
indexName,
response.buildFailureMessage()
);
throw new OpenSearchException("Error occurred while ingesting GeoIP data");
}
bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
CSVRecord record = iterator.next();
String document = createDocument(fields, record.values());
IndexRequest request = Requests.indexRequest(indexName).source(document, XContentType.JSON);
bulkRequest.add(request);
if (!iterator.hasNext() || bulkRequest.requests().size() == bulkSize) {
BulkResponse response = client.bulk(bulkRequest).actionGet(timeout);
if (response.hasFailures()) {
throw new OpenSearchException(
"error occurred while ingesting GeoIP data in {} with an error {}",
indexName,
response.buildFailureMessage()
);
}
} catch (Exception e) {
throw e;
bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
}
}
client.admin().indices().prepareRefresh(indexName).execute().actionGet(timeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.client.Client;
Expand Down Expand Up @@ -95,13 +96,13 @@ public void initialize(final ClusterService clusterService, final ThreadPool thr
@Override
public void runJob(final ScheduledJobParameter jobParameter, final JobExecutionContext context) {
if (initialized == false) {
throw new AssertionError("This instance is not initialized");
throw new AssertionError("this instance is not initialized");
}

log.info("Update job started for a datasource[{}]", jobParameter.getName());
if (jobParameter instanceof Datasource == false) {
throw new IllegalStateException(
"Job parameter is not instance of DatasourceUpdateJobParameter, type: " + jobParameter.getClass().getCanonicalName()
"job parameter is not instance of DatasourceUpdateJobParameter, type: " + jobParameter.getClass().getCanonicalName()
);
}

Expand All @@ -125,28 +126,29 @@ private Runnable updateDatasourceRunner(final ScheduledJobParameter jobParameter
if (lock == null) {
return;
}
Datasource datasource = DatasourceHelper.getDatasource(client, jobParameter.getName(), timeout);
if (datasource == null) {
log.info("Datasource[{}] is already deleted", jobParameter.getName());
}
try {
deleteUnusedIndices(datasource);
updateDatasource(datasource);
deleteUnusedIndices(datasource);
} catch (Exception e) {
log.error("Failed to update datasource for {}", datasource.getId(), e);
datasource.getUpdateStats().setLastFailedAt(Instant.now());
DatasourceHelper.updateDatasource(client, datasource, timeout);
Datasource datasource = DatasourceHelper.getDatasource(client, jobParameter.getName(), timeout);
if (datasource == null) {
log.info("Datasource[{}] is already deleted", jobParameter.getName());
return;
}

try {
deleteUnusedIndices(datasource);
updateDatasource(datasource);
deleteUnusedIndices(datasource);
} catch (Exception e) {
log.error("Failed to update datasource for {}", datasource.getId(), e);
datasource.getUpdateStats().setLastFailedAt(Instant.now());
DatasourceHelper.updateDatasource(client, datasource, timeout);
}
} finally {
lockService.release(
lock,
ActionListener.wrap(
released -> { log.info("Released lock for job {}", datasource.getId()); },
exception -> { throw new IllegalStateException("Failed to release lock."); }
)
ActionListener.wrap(released -> {}, exception -> { log.error("Failed to release lock [{}]", lock); })
);
}
}, exception -> { throw new IllegalStateException("Failed to acquire lock."); }));
}, exception -> { log.error("Failed to acquire lock for job [{}]", jobParameter.getName()); }));
}
};

Expand Down Expand Up @@ -180,10 +182,10 @@ private void deleteUnusedIndices(final Datasource parameter) {
.isAcknowledged()) {
deletedIndices.add(index);
} else {
log.error("Failed to delete an index {}", index);
log.error("Failed to delete an index [{}]", index);
}
} catch (Exception e) {
log.error("Failed to delete an index {}", index, e);
log.error("Failed to delete an index [{}]", index, e);
}
}
if (!deletedIndices.isEmpty()) {
Expand Down Expand Up @@ -230,15 +232,21 @@ private void updateDatasource(final Datasource jobParameter) throws Exception {
Iterator<CSVRecord> iter = reader.iterator();
fields = iter.next().values();
if (!jobParameter.getDatabase().getFields().equals(Arrays.asList(fields))) {
log.error("The previous fields and new fields does not match.");
log.error("Previous: {}, New: {}", jobParameter.getDatabase().getFields().toString(), Arrays.asList(fields).toString());
throw new IllegalStateException("Fields does not match between old and new");
throw new OpenSearchException(
"fields does not match between old [{}] and new [{}]",
jobParameter.getDatabase().getFields().toString(),
Arrays.asList(fields).toString()
);
}
GeoIpDataHelper.putGeoData(client, indexName, fields, iter, indexingBulkSize, timeout);
}

Instant endTime = Instant.now();
jobParameter.setDatabase(manifest, fields);
jobParameter.getDatabase().setProvider(manifest.getProvider());
jobParameter.getDatabase().setMd5Hash(manifest.getMd5Hash());
jobParameter.getDatabase().setUpdatedAt(Instant.ofEpochMilli(manifest.getUpdatedAt()));
jobParameter.getDatabase().setValidForInDays(manifest.getValidForInDays());
jobParameter.getDatabase().setFields(Arrays.asList(fields));
jobParameter.getUpdateStats().setLastSucceededAt(endTime);
jobParameter.getUpdateStats().setLastProcessingTimeInMillis(endTime.toEpochMilli() - startTime.toEpochMilli());
DatasourceHelper.updateDatasource(client, jobParameter, timeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

import lombok.extern.log4j.Log4j2;

import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -173,8 +172,7 @@ private void executeInternal(
@Override
public void onResponse(final Datasource datasource) {
if (datasource == null) {
log.error("Datasource[{}] does not exist", datasourceName);
handler.accept(null, new IllegalStateException("Datasource does not exist"));
handler.accept(null, new IllegalStateException("datasource does not exist"));
return;
}

Expand All @@ -196,16 +194,14 @@ public void onResponse(final Map<String, Object> stringObjectMap) {

@Override
public void onFailure(final Exception e) {
log.error("Error while retrieving geo data from datasource[{}] for a given ip[{}]", datasourceName, ip, e);
handler.accept(null, new OpenSearchException("Failed to geo data"));
handler.accept(null, e);
}
});
}

@Override
public void onFailure(final Exception e) {
log.error("Failed to get datasource[{}]", datasourceName, e);
handler.accept(null, new OpenSearchException("Failed to get datasource[{}]", datasourceName));
handler.accept(null, e);
}
});
}
Expand Down Expand Up @@ -235,8 +231,7 @@ private void executeInternal(
@Override
public void onResponse(final Datasource datasource) {
if (datasource == null) {
log.error("Datasource[{}] does not exist", datasourceName);
handler.accept(null, new IllegalStateException("Datasource does not exist"));
handler.accept(null, new IllegalStateException("datasource does not exist"));
return;
}

Expand Down Expand Up @@ -291,17 +286,15 @@ public void onResponse(final Object obj) {

@Override
public void onFailure(final Exception e) {
log.error("Error while retrieving geo data from datasource[{}] for a given ip[{}]", datasourceName, ipList, e);
handler.accept(null, new OpenSearchException("Failed to geo data"));
handler.accept(null, e);
}
}
);
}

@Override
public void onFailure(final Exception e) {
log.error("Failed to get datasource[{}]", datasourceName, e);
handler.accept(null, new OpenSearchException("Failed to get datasource[{}]", datasourceName));
handler.accept(null, e);
}
});
}
Expand Down

0 comments on commit d98f20f

Please sign in to comment.