Skip to content

Commit

Permalink
Implement get datasource api
Browse files Browse the repository at this point in the history
Signed-off-by: Heemin Kim <[email protected]>
  • Loading branch information
heemin32 committed May 4, 2023
1 parent 76790bd commit 5c6bc1a
Show file tree
Hide file tree
Showing 19 changed files with 923 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.geospatial.ip2geo.action;

import org.opensearch.action.ActionType;

/**
* Ip2Geo datasource get action
*/
public class GetDatasourceAction extends ActionType<GetDatasourceResponse> {
/**
* Get datasource action instance
*/
public static final GetDatasourceAction INSTANCE = new GetDatasourceAction();
/**
* Name of a get datasource action
*/
public static final String NAME = "cluster:admin/geospatial/datasource/get";

private GetDatasourceAction() {
super(NAME, GetDatasourceResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.geospatial.ip2geo.action;

import java.io.IOException;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.log4j.Log4j2;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;

/**
* Ip2Geo datasource get request
*/
@Getter
@Setter
@Log4j2
@EqualsAndHashCode
public class GetDatasourceRequest extends ActionRequest {
/**
* @param names the datasource names
* @return the datasource names
*/
private String[] names;

/**
* Constructs a new get datasource request with a list of datasources.
*
* If the list of datasources is empty or it contains a single element "_all", all registered datasources
* are returned.
*
* @param names list of datasource names
*/
public GetDatasourceRequest(final String[] names) {
this.names = names;
}

/**
* Constructor with stream input
* @param in the stream input
* @throws IOException IOException
*/
public GetDatasourceRequest(final StreamInput in) throws IOException {
super(in);
this.names = in.readStringArray();
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(names);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.geospatial.ip2geo.action;

import java.io.IOException;
import java.time.Instant;
import java.util.List;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.log4j.Log4j2;

import org.opensearch.action.ActionResponse;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.core.ParseField;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;

/**
* Ip2Geo datasource get request
*/
@Getter
@Setter
@Log4j2
@EqualsAndHashCode
public class GetDatasourceResponse extends ActionResponse implements ToXContentObject {
private static final ParseField FIELD_NAME_DATASOURCES = new ParseField("datasources");
private static final ParseField FIELD_NAME_NAME = new ParseField("name");
private static final ParseField FIELD_NAME_STATE = new ParseField("state");
private static final ParseField FIELD_NAME_ENDPOINT = new ParseField("endpoint");
private static final ParseField FIELD_NAME_UPDATE_INTERVAL = new ParseField("update_interval_in_days");
private static final ParseField FIELD_NAME_NEXT_UPDATE_AT = new ParseField("next_update_at_in_epoch_millis");
private static final ParseField FIELD_NAME_NEXT_UPDATE_AT_READABLE = new ParseField("next_update_at");
private static final ParseField FIELD_NAME_DATABASE = new ParseField("database");
private static final ParseField FIELD_NAME_UPDATE_STATS = new ParseField("update_stats");
private List<Datasource> datasources;

/**
* Default constructor
*
* @param datasources List of datasources
*/
public GetDatasourceResponse(final List<Datasource> datasources) {
this.datasources = datasources;
}

/**
* Constructor with StreamInput
*
* @param in the stream input
*/
public GetDatasourceResponse(final StreamInput in) throws IOException {
datasources = in.readList(Datasource::new);
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
out.writeList(datasources);
}

@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject();
builder.startArray(FIELD_NAME_DATASOURCES.getPreferredName());
for (Datasource datasource : datasources) {
builder.startObject();
builder.field(FIELD_NAME_NAME.getPreferredName(), datasource.getName());
builder.field(FIELD_NAME_STATE.getPreferredName(), datasource.getState());
builder.field(FIELD_NAME_ENDPOINT.getPreferredName(), datasource.getEndpoint());
builder.field(FIELD_NAME_UPDATE_INTERVAL.getPreferredName(), datasource.getSchedule().getInterval());
builder.timeField(
FIELD_NAME_NEXT_UPDATE_AT.getPreferredName(),
FIELD_NAME_NEXT_UPDATE_AT_READABLE.getPreferredName(),
datasource.getSchedule().getNextExecutionTime(Instant.now()).toEpochMilli()
);
builder.field(FIELD_NAME_DATABASE.getPreferredName(), datasource.getDatabase());
builder.field(FIELD_NAME_UPDATE_STATS.getPreferredName(), datasource.getUpdateStats());
builder.endObject();
}
builder.endArray();
builder.endObject();
return builder;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.geospatial.ip2geo.action;

import java.util.List;

import lombok.extern.log4j.Log4j2;

import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.common.inject.Inject;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

/**
* Transport action to get datasource
*/
@Log4j2
public class GetDatasourceTransportAction extends HandledTransportAction<GetDatasourceRequest, GetDatasourceResponse> {
private final DatasourceFacade datasourceFacade;

/**
* Default constructor
* @param transportService the transport service
* @param actionFilters the action filters
* @param datasourceFacade the datasource facade
*/
@Inject
public GetDatasourceTransportAction(
final TransportService transportService,
final ActionFilters actionFilters,
final DatasourceFacade datasourceFacade
) {
super(GetDatasourceAction.NAME, transportService, actionFilters, GetDatasourceRequest::new);
this.datasourceFacade = datasourceFacade;
}

@Override
protected void doExecute(final Task task, final GetDatasourceRequest request, final ActionListener<GetDatasourceResponse> listener) {
if (shouldGetAllDatasource(request)) {
// We don't expect too many data sources. Therefore, querying all data sources without pagination should be fine.
datasourceFacade.getAllDatasources(newActionListener(listener));
} else {
datasourceFacade.getDatasources(request.getNames(), newActionListener(listener));
}
}

private boolean shouldGetAllDatasource(final GetDatasourceRequest request) {
if (request.getNames() == null) {
throw new OpenSearchException("names in a request should not be null");
}

return request.getNames().length == 0 || (request.getNames().length == 1 && "_all".equals(request.getNames()[0]));
}

private ActionListener<List<Datasource>> newActionListener(final ActionListener<GetDatasourceResponse> listener) {
return new ActionListener<>() {
@Override
public void onResponse(final List<Datasource> datasources) {
listener.onResponse(new GetDatasourceResponse(datasources));
}

@Override
public void onFailure(final Exception e) {
listener.onFailure(e);
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.geospatial.ip2geo.action;

import static org.opensearch.geospatial.shared.URLBuilder.URL_DELIMITER;
import static org.opensearch.geospatial.shared.URLBuilder.getPluginURLPrefix;
import static org.opensearch.rest.RestRequest.Method.GET;

import java.util.List;

import org.opensearch.client.node.NodeClient;
import org.opensearch.common.Strings;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;

/**
* Rest handler for Ip2Geo datasource get request
*/
public class RestGetDatasourceHandler extends BaseRestHandler {
private static final String ACTION_NAME = "ip2geo_datasource_get";

@Override
public String getName() {
return ACTION_NAME;
}

@Override
protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) {
final String[] names = request.paramAsStringArray("name", Strings.EMPTY_ARRAY);
final GetDatasourceRequest getDatasourceRequest = new GetDatasourceRequest(names);

return channel -> client.executeLocally(GetDatasourceAction.INSTANCE, getDatasourceRequest, new RestToXContentListener<>(channel));
}

@Override
public List<Route> routes() {
return List.of(
new Route(GET, String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource")),
new Route(GET, String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource/{name}"))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
*
*/
public class RestPutDatasourceHandler extends BaseRestHandler {
private static final String ACTION_NAME = "ip2geo_datasource";
private static final String ACTION_NAME = "ip2geo_datasource_put";
private final ClusterSettings clusterSettings;

public RestPutDatasourceHandler(final ClusterSettings clusterSettings) {
Expand All @@ -53,7 +53,7 @@ public String getName() {

@Override
protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
final PutDatasourceRequest putDatasourceRequest = new PutDatasourceRequest(request.param("id"));
final PutDatasourceRequest putDatasourceRequest = new PutDatasourceRequest(request.param("name"));
if (request.hasContentOrSourceParam()) {
try (XContentParser parser = request.contentOrSourceParamParser()) {
PutDatasourceRequest.PARSER.parse(parser, putDatasourceRequest, null);
Expand All @@ -70,7 +70,7 @@ protected RestChannelConsumer prepareRequest(final RestRequest request, final No

@Override
public List<Route> routes() {
String path = String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource/{id}");
String path = String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource/{name}");
return List.of(new Route(PUT, path));
}
}
Loading

0 comments on commit 5c6bc1a

Please sign in to comment.