Skip to content

Commit

Permalink
Implement get datasource api
Browse files Browse the repository at this point in the history
Signed-off-by: Heemin Kim <[email protected]>
  • Loading branch information
heemin32 committed May 3, 2023
1 parent 74c6903 commit 0b7d612
Show file tree
Hide file tree
Showing 16 changed files with 968 additions and 47 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.geospatial.ip2geo.action;

import org.opensearch.action.ActionType;

/**
* Ip2Geo datasource get action
*/
public class GetDatasourceAction extends ActionType<GetDatasourceResponse> {
/**
* Get datasource action instance
*/
public static final GetDatasourceAction INSTANCE = new GetDatasourceAction();
/**
* Name of a get datasource action
*/
public static final String NAME = "cluster:admin/geospatial/datasource/get";

private GetDatasourceAction() {
super(NAME, GetDatasourceResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.geospatial.ip2geo.action;

import java.io.IOException;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.log4j.Log4j2;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;

/**
* Ip2Geo datasource get request
*/
@Getter
@Setter
@Log4j2
@EqualsAndHashCode
public class GetDatasourceRequest extends ActionRequest {
/**
* @param names the datasource names
* @return the datasource names
*/
private String[] names;

/**
* Constructs a new get datasource request with a list of datasources.
*
* If the list of datasources is empty or it contains a single element "_all", all registered datasources
* are returned.
*
* @param names list of datasource names
*/
public GetDatasourceRequest(final String[] names) {
this.names = names;
}

/**
* Constructor with stream input
* @param in the stream input
* @throws IOException IOException
*/
public GetDatasourceRequest(final StreamInput in) throws IOException {
super(in);
this.names = in.readStringArray();
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(names);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.geospatial.ip2geo.action;

import java.io.IOException;
import java.time.Instant;
import java.util.List;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.log4j.Log4j2;

import org.opensearch.action.ActionResponse;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;

/**
* Ip2Geo datasource get request
*/
@Getter
@Setter
@Log4j2
@EqualsAndHashCode
public class GetDatasourceResponse extends ActionResponse implements ToXContentObject {
private static final String FIELD_NAME_DATASOURCES = "datasources";
private static final String FIELD_NAME_NAME = "name";
private static final String FIELD_NAME_STATE = "state";
private static final String FIELD_NAME_ENDPOINT = "endpoint";
private static final String FIELD_NAME_UPDATE_INTERVAL = "update_interval_in_days";
private static final String FIELD_NAME_NEXT_UPDATE_AT = "next_update_at_in_epoch_millis";
private static final String FIELD_NAME_NEXT_UPDATE_AT_READABLE = "next_update_at";
private static final String FIELD_NAME_DATABASE = "database";
private static final String FIELD_NAME_UPDATE_STATS = "update_stats";
private List<Datasource> datasources;

/**
* Default constructor
*
* @param datasources List of datasources
*/
public GetDatasourceResponse(final List<Datasource> datasources) {
this.datasources = datasources;
}

/**
* Constructor with StreamInput
*
* @param in the stream input
*/
public GetDatasourceResponse(final StreamInput in) throws IOException {
datasources = in.readList(Datasource::new);
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
out.writeList(datasources);
}

@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject();
builder.startArray(FIELD_NAME_DATASOURCES);
for (Datasource datasource : datasources) {
builder.startObject();
builder.field(FIELD_NAME_NAME, datasource.getName());
builder.field(FIELD_NAME_STATE, datasource.getState());
builder.field(FIELD_NAME_ENDPOINT, datasource.getEndpoint());
builder.field(FIELD_NAME_UPDATE_INTERVAL, datasource.getSchedule().getInterval());
builder.timeField(
FIELD_NAME_NEXT_UPDATE_AT,
FIELD_NAME_NEXT_UPDATE_AT_READABLE,
datasource.getSchedule().getNextExecutionTime(Instant.now()).toEpochMilli()
);
builder.field(FIELD_NAME_DATABASE, datasource.getDatabase());
builder.field(FIELD_NAME_UPDATE_STATS, datasource.getUpdateStats());
builder.endObject();
}
builder.endArray();
builder.endObject();
return builder;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.geospatial.ip2geo.action;

import java.util.List;

import lombok.extern.log4j.Log4j2;

import org.opensearch.action.ActionListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.common.inject.Inject;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

/**
* Transport action to get datasource
*/
@Log4j2
public class GetDatasourceTransportAction extends HandledTransportAction<GetDatasourceRequest, GetDatasourceResponse> {
private final DatasourceFacade datasourceFacade;

/**
* Default constructor
* @param transportService the transport service
* @param actionFilters the action filters
* @param datasourceFacade the datasource facade
*/
@Inject
public GetDatasourceTransportAction(
final TransportService transportService,
final ActionFilters actionFilters,
final DatasourceFacade datasourceFacade
) {
super(GetDatasourceAction.NAME, transportService, actionFilters, GetDatasourceRequest::new);
this.datasourceFacade = datasourceFacade;
}

@Override
protected void doExecute(final Task task, final GetDatasourceRequest request, final ActionListener<GetDatasourceResponse> listener) {
if (request.getNames().length == 0 || (request.getNames().length == 1 && "_all".equals(request.getNames()[0]))) {
// We don't expect too many data sources. Therefore, querying all data sources without pagination should be fine.
datasourceFacade.getAllDatasources(new ActionListener<>() {
@Override
public void onResponse(final List<Datasource> datasources) {
listener.onResponse(new GetDatasourceResponse(datasources));
}

@Override
public void onFailure(final Exception e) {
listener.onFailure(e);
}
});
} else {
datasourceFacade.getDatasources(request.getNames(), new ActionListener<>() {
@Override
public void onResponse(final List<Datasource> datasources) {
listener.onResponse(new GetDatasourceResponse(datasources));
}

@Override
public void onFailure(final Exception e) {
listener.onFailure(e);
}
});
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.geospatial.ip2geo.action;

import static org.opensearch.geospatial.shared.URLBuilder.URL_DELIMITER;
import static org.opensearch.geospatial.shared.URLBuilder.getPluginURLPrefix;
import static org.opensearch.rest.RestRequest.Method.GET;

import java.util.List;

import org.opensearch.client.node.NodeClient;
import org.opensearch.common.Strings;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;

/**
* Rest handler for Ip2Geo datasource get request
*/
public class RestGetDatasourceHandler extends BaseRestHandler {
private static final String ACTION_NAME = "ip2geo_datasource_get";

@Override
public String getName() {
return ACTION_NAME;
}

@Override
protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) {
final String[] names = request.paramAsStringArray("name", Strings.EMPTY_ARRAY);
final GetDatasourceRequest getDatasourceRequest = new GetDatasourceRequest(names);

return channel -> client.executeLocally(GetDatasourceAction.INSTANCE, getDatasourceRequest, new RestToXContentListener<>(channel));
}

@Override
public List<Route> routes() {
String path1 = String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource");
String path2 = String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource/{name}");
return List.of(new Route(GET, path1), new Route(GET, path2));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
*
*/
public class RestPutDatasourceHandler extends BaseRestHandler {
private static final String ACTION_NAME = "ip2geo_datasource";
private static final String ACTION_NAME = "ip2geo_datasource_put";
private final ClusterSettings clusterSettings;

public RestPutDatasourceHandler(final ClusterSettings clusterSettings) {
Expand All @@ -53,7 +53,7 @@ public String getName() {

@Override
protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
final PutDatasourceRequest putDatasourceRequest = new PutDatasourceRequest(request.param("id"));
final PutDatasourceRequest putDatasourceRequest = new PutDatasourceRequest(request.param("name"));
if (request.hasContentOrSourceParam()) {
try (XContentParser parser = request.contentOrSourceParamParser()) {
PutDatasourceRequest.PARSER.parse(parser, putDatasourceRequest, null);
Expand All @@ -70,7 +70,7 @@ protected RestChannelConsumer prepareRequest(final RestRequest request, final No

@Override
public List<Route> routes() {
String path = String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource/{id}");
String path = String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource/{name}");
return List.of(new Route(PUT, path));
}
}
Loading

0 comments on commit 0b7d612

Please sign in to comment.