-
Notifications
You must be signed in to change notification settings - Fork 34
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add restoring event listener #328
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -354,7 +354,15 @@ public void disable() { | |
* @return Current index name of a datasource | ||
*/ | ||
public String currentIndexName() { | ||
return isExpired() ? null : indexNameFor(database.updatedAt.toEpochMilli()); | ||
if (isExpired()) { | ||
return null; | ||
} | ||
|
||
if (database.updatedAt == null) { | ||
return null; | ||
} | ||
|
||
return indexNameFor(database.updatedAt.toEpochMilli()); | ||
} | ||
|
||
/** | ||
|
@@ -371,6 +379,14 @@ private String indexNameFor(final long suffix) { | |
return String.format(Locale.ROOT, "%s.%s.%d", IP2GEO_DATA_INDEX_NAME_PREFIX, name, suffix); | ||
} | ||
|
||
/** | ||
* Reset database so that it can be updated in next run regardless there is new update or not | ||
*/ | ||
public void resetDatabase() { | ||
database.setUpdatedAt(null); | ||
database.setSha256Hash(null); | ||
} | ||
|
||
/** | ||
* Checks if datasource is expired or not | ||
* | ||
|
@@ -537,7 +553,7 @@ public Database(final StreamInput in) throws IOException { | |
public void writeTo(final StreamOutput out) throws IOException { | ||
out.writeOptionalString(provider); | ||
out.writeOptionalString(sha256Hash); | ||
out.writeOptionalVLong(updatedAt.toEpochMilli()); | ||
out.writeOptionalVLong(updatedAt == null ? null : updatedAt.toEpochMilli()); | ||
out.writeOptionalVLong(validForInDays); | ||
out.writeOptionalStringCollection(fields); | ||
} | ||
|
@@ -640,10 +656,10 @@ public UpdateStats(final StreamInput in) throws IOException { | |
|
||
@Override | ||
public void writeTo(final StreamOutput out) throws IOException { | ||
out.writeOptionalVLong(lastSucceededAt.toEpochMilli()); | ||
out.writeOptionalVLong(lastSucceededAt == null ? null : lastSucceededAt.toEpochMilli()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same as for previous call to write optional, I think null value should be already handled There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same for previous one. I call |
||
out.writeOptionalVLong(lastProcessingTimeInMillis); | ||
out.writeOptionalVLong(lastFailedAt.toEpochMilli()); | ||
out.writeOptionalVLong(lastSkippedAt.toEpochMilli()); | ||
out.writeOptionalVLong(lastFailedAt == null ? null : lastFailedAt.toEpochMilli()); | ||
out.writeOptionalVLong(lastSkippedAt == null ? null : lastSkippedAt.toEpochMilli()); | ||
} | ||
|
||
@Override | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.geospatial.ip2geo.listener; | ||
|
||
import java.io.IOException; | ||
import java.time.Instant; | ||
import java.time.temporal.ChronoUnit; | ||
import java.util.List; | ||
|
||
import lombok.AllArgsConstructor; | ||
import lombok.extern.log4j.Log4j2; | ||
|
||
import org.opensearch.action.ActionListener; | ||
import org.opensearch.action.bulk.BulkResponse; | ||
import org.opensearch.cluster.ClusterChangedEvent; | ||
import org.opensearch.cluster.ClusterStateListener; | ||
import org.opensearch.cluster.RestoreInProgress; | ||
import org.opensearch.cluster.node.DiscoveryNode; | ||
import org.opensearch.cluster.service.ClusterService; | ||
import org.opensearch.common.component.AbstractLifecycleComponent; | ||
import org.opensearch.common.inject.Inject; | ||
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; | ||
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; | ||
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension; | ||
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceTask; | ||
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; | ||
import org.opensearch.threadpool.ThreadPool; | ||
|
||
@Log4j2 | ||
@AllArgsConstructor(onConstructor = @__(@Inject)) | ||
public class Ip2GeoListener extends AbstractLifecycleComponent implements ClusterStateListener { | ||
private static final int SCHEDULE_IN_MIN = 15; | ||
private static final int DELAY_IN_MILLIS = 10000; | ||
private final ClusterService clusterService; | ||
private final ThreadPool threadPool; | ||
private final DatasourceFacade datasourceFacade; | ||
|
||
@Override | ||
public void clusterChanged(final ClusterChangedEvent event) { | ||
if (event.localNodeClusterManager() == false) { | ||
return; | ||
} | ||
|
||
for (RestoreInProgress.Entry entry : event.state().custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY)) { | ||
if (RestoreInProgress.State.SUCCESS.equals(entry.state()) == false) { | ||
continue; | ||
} | ||
|
||
if (entry.indices().stream().anyMatch(index -> DatasourceExtension.JOB_INDEX_NAME.equals(index)) == false) { | ||
continue; | ||
} | ||
|
||
threadPool.generic().submit(() -> forceUpdateGeoIpData()); | ||
} | ||
} | ||
|
||
private void forceUpdateGeoIpData() { | ||
datasourceFacade.getAllDatasources(new ActionListener<>() { | ||
@Override | ||
public void onResponse(final List<Datasource> datasources) { | ||
datasources.stream().forEach(Ip2GeoListener.this::scheduleForceUpdate); | ||
datasourceFacade.updateDatasource(datasources, new ActionListener<>() { | ||
@Override | ||
public void onResponse(final BulkResponse bulkItemResponses) { | ||
log.info("Datasources are updated for cleanup"); | ||
} | ||
|
||
@Override | ||
public void onFailure(final Exception e) { | ||
log.error("Failed to update datasource for cleanup after restoring", e); | ||
} | ||
}); | ||
} | ||
|
||
@Override | ||
public void onFailure(final Exception e) { | ||
log.error("Failed to get datasource after restoring", e); | ||
} | ||
}); | ||
} | ||
|
||
/** | ||
* Give a delay so that job scheduler can schedule the job right after the delay. Otherwise, it schedules | ||
* the job after specified update interval. | ||
*/ | ||
private void scheduleForceUpdate(Datasource datasource) { | ||
IntervalSchedule schedule = new IntervalSchedule(Instant.now(), SCHEDULE_IN_MIN, ChronoUnit.MINUTES, DELAY_IN_MILLIS); | ||
datasource.resetDatabase(); | ||
datasource.setSystemSchedule(schedule); | ||
datasource.setTask(DatasourceTask.ALL); | ||
} | ||
|
||
@Override | ||
protected void doStart() { | ||
if (DiscoveryNode.isClusterManagerNode(clusterService.getSettings())) { | ||
clusterService.addListener(this); | ||
} | ||
} | ||
|
||
@Override | ||
protected void doStop() { | ||
clusterService.removeListener(this); | ||
} | ||
|
||
@Override | ||
protected void doClose() throws IOException { | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we do same as on stop here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think |
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isn't the check for null already encapsulated into method if it does "optional"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here I call
.toEpochMillis()
method and will will throw NPE if I don't do null check.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh I see, that makes sense