Skip to content

Commit

Permalink
Add support for ccr follow info api to HLRC.
Browse files Browse the repository at this point in the history
This API was introduces after elastic#33824 was closed.
  • Loading branch information
martijnvg committed Feb 19, 2019
1 parent a54d1c6 commit cae7220
Show file tree
Hide file tree
Showing 11 changed files with 597 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.elasticsearch.client.ccr.CcrStatsRequest;
import org.elasticsearch.client.ccr.CcrStatsResponse;
import org.elasticsearch.client.ccr.DeleteAutoFollowPatternRequest;
import org.elasticsearch.client.ccr.FollowInfoRequest;
import org.elasticsearch.client.ccr.FollowInfoResponse;
import org.elasticsearch.client.ccr.FollowStatsRequest;
import org.elasticsearch.client.ccr.FollowStatsResponse;
import org.elasticsearch.client.ccr.GetAutoFollowPatternRequest;
Expand Down Expand Up @@ -452,4 +454,47 @@ public void getFollowStatsAsync(FollowStatsRequest request,
);
}

/**
* Gets follow info for specific indices.
*
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ccr-get-follow-info.html">
* the docs</a> for more.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public FollowInfoResponse getFollowInfo(FollowInfoRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(
request,
CcrRequestConverters::getFollowInfo,
options,
FollowInfoResponse::fromXContent,
Collections.emptySet()
);
}

/**
* Asynchronously gets follow info for specific indices.
*
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ccr-get-follow-info.html">
* the docs</a> for more.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
*/
public void getFollowInfoAsync(FollowInfoRequest request,
RequestOptions options,
ActionListener<FollowInfoResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(
request,
CcrRequestConverters::getFollowInfo,
options,
FollowInfoResponse::fromXContent,
listener,
Collections.emptySet()
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.http.client.methods.HttpPut;
import org.elasticsearch.client.ccr.CcrStatsRequest;
import org.elasticsearch.client.ccr.DeleteAutoFollowPatternRequest;
import org.elasticsearch.client.ccr.FollowInfoRequest;
import org.elasticsearch.client.ccr.FollowStatsRequest;
import org.elasticsearch.client.ccr.GetAutoFollowPatternRequest;
import org.elasticsearch.client.ccr.PauseFollowRequest;
Expand Down Expand Up @@ -119,4 +120,12 @@ static Request getFollowStats(FollowStatsRequest followStatsRequest) {
return new Request(HttpGet.METHOD_NAME, endpoint);
}

static Request getFollowInfo(FollowInfoRequest followInfoRequest) {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPart(followInfoRequest.getFollowerIndex())
.addPathPartAsIs("_ccr", "info")
.build();
return new Request(HttpGet.METHOD_NAME, endpoint);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Objects;
Expand All @@ -41,6 +43,44 @@ public class FollowConfig {
static final ParseField MAX_RETRY_DELAY_FIELD = new ParseField("max_retry_delay");
static final ParseField READ_POLL_TIMEOUT = new ParseField("read_poll_timeout");

private static final ObjectParser<FollowConfig, Void> PARSER = new ObjectParser<>(
"follow_config",
true,
FollowConfig::new);

static {
PARSER.declareInt(FollowConfig::setMaxReadRequestOperationCount, MAX_READ_REQUEST_OPERATION_COUNT);
PARSER.declareInt(FollowConfig::setMaxOutstandingReadRequests, MAX_OUTSTANDING_READ_REQUESTS);
PARSER.declareField(
FollowConfig::setMaxReadRequestSize,
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_READ_REQUEST_SIZE.getPreferredName()),
MAX_READ_REQUEST_SIZE,
ObjectParser.ValueType.STRING);
PARSER.declareInt(FollowConfig::setMaxWriteRequestOperationCount, MAX_WRITE_REQUEST_OPERATION_COUNT);
PARSER.declareField(
FollowConfig::setMaxWriteRequestSize,
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_REQUEST_SIZE.getPreferredName()),
MAX_WRITE_REQUEST_SIZE,
ObjectParser.ValueType.STRING);
PARSER.declareInt(FollowConfig::setMaxOutstandingWriteRequests, MAX_OUTSTANDING_WRITE_REQUESTS);
PARSER.declareInt(FollowConfig::setMaxWriteBufferCount, MAX_WRITE_BUFFER_COUNT);
PARSER.declareField(
FollowConfig::setMaxWriteBufferSize,
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_BUFFER_SIZE.getPreferredName()),
MAX_WRITE_BUFFER_SIZE,
ObjectParser.ValueType.STRING);
PARSER.declareField(FollowConfig::setMaxRetryDelay,
(p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY_FIELD.getPreferredName()),
MAX_RETRY_DELAY_FIELD, ObjectParser.ValueType.STRING);
PARSER.declareField(FollowConfig::setReadPollTimeout,
(p, c) -> TimeValue.parseTimeValue(p.text(), READ_POLL_TIMEOUT.getPreferredName()),
READ_POLL_TIMEOUT, ObjectParser.ValueType.STRING);
}

static FollowConfig fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}

private Integer maxReadRequestOperationCount;
private Integer maxOutstandingReadRequests;
private ByteSizeValue maxReadRequestSize;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.client.ccr;

import org.elasticsearch.client.Validatable;

import java.util.Objects;

public final class FollowInfoRequest implements Validatable {

private final String followerIndex;

public FollowInfoRequest(String followerIndex) {
this.followerIndex = Objects.requireNonNull(followerIndex);
}

public String getFollowerIndex() {
return followerIndex;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.client.ccr;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;

import java.util.List;
import java.util.Objects;

public final class FollowInfoResponse {

static final ParseField FOLLOWER_INDICES_FIELD = new ParseField("follower_indices");

private static final ConstructingObjectParser<FollowInfoResponse, Void> PARSER = new ConstructingObjectParser<>(
"indices",
true,
args -> {
@SuppressWarnings("unchecked")
List<FollowerInfo> infos = (List<FollowerInfo>) args[0];
return new FollowInfoResponse(infos);
});

static {
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), FollowerInfo.PARSER, FOLLOWER_INDICES_FIELD);
}


public static FollowInfoResponse fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}

private final List<FollowerInfo> infos;

FollowInfoResponse(List<FollowerInfo> infos) {
this.infos = infos;
}

public List<FollowerInfo> getInfos() {
return infos;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FollowInfoResponse that = (FollowInfoResponse) o;
return infos.equals(that.infos);
}

@Override
public int hashCode() {
return Objects.hash(infos);
}

public static final class FollowerInfo {

static final ParseField FOLLOWER_INDEX_FIELD = new ParseField("follower_index");
static final ParseField REMOTE_CLUSTER_FIELD = new ParseField("remote_cluster");
static final ParseField LEADER_INDEX_FIELD = new ParseField("leader_index");
static final ParseField STATUS_FIELD = new ParseField("status");
static final ParseField PARAMETERS_FIELD = new ParseField("parameters");

private static final ConstructingObjectParser<FollowerInfo, Void> PARSER = new ConstructingObjectParser<>(
"follower_info",
true,
args -> {
return new FollowerInfo((String) args[0], (String) args[1], (String) args[2],
Status.fromString((String) args[3]), (FollowConfig) args[4]);
});

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), FOLLOWER_INDEX_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), REMOTE_CLUSTER_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_INDEX_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), STATUS_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> FollowConfig.fromXContent(p), PARAMETERS_FIELD);
}

private final String followerIndex;
private final String remoteCluster;
private final String leaderIndex;
private final Status status;
private final FollowConfig parameters;

FollowerInfo(String followerIndex, String remoteCluster, String leaderIndex, Status status,
FollowConfig parameters) {
this.followerIndex = followerIndex;
this.remoteCluster = remoteCluster;
this.leaderIndex = leaderIndex;
this.status = status;
this.parameters = parameters;
}

public String getFollowerIndex() {
return followerIndex;
}

public String getRemoteCluster() {
return remoteCluster;
}

public String getLeaderIndex() {
return leaderIndex;
}

public Status getStatus() {
return status;
}

public FollowConfig getParameters() {
return parameters;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FollowerInfo that = (FollowerInfo) o;
return Objects.equals(followerIndex, that.followerIndex) &&
Objects.equals(remoteCluster, that.remoteCluster) &&
Objects.equals(leaderIndex, that.leaderIndex) &&
status == that.status &&
Objects.equals(parameters, that.parameters);
}

@Override
public int hashCode() {
return Objects.hash(followerIndex, remoteCluster, leaderIndex, status, parameters);
}

}

public enum Status {

ACTIVE("active"),
PAUSED("paused");

private final String name;

Status(String name) {
this.name = name;
}

public String getName() {
return name;
}

public static Status fromString(String value) {
switch (value) {
case "active":
return Status.ACTIVE;
case "paused":
return Status.PAUSED;
default:
throw new IllegalArgumentException("unexpected status value [" + value + "]");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.elasticsearch.client.ccr.CcrStatsRequest;
import org.elasticsearch.client.ccr.CcrStatsResponse;
import org.elasticsearch.client.ccr.DeleteAutoFollowPatternRequest;
import org.elasticsearch.client.ccr.FollowInfoRequest;
import org.elasticsearch.client.ccr.FollowInfoResponse;
import org.elasticsearch.client.ccr.FollowStatsRequest;
import org.elasticsearch.client.ccr.FollowStatsResponse;
import org.elasticsearch.client.ccr.GetAutoFollowPatternRequest;
Expand Down Expand Up @@ -113,6 +115,15 @@ public void testIndexFollowing() throws Exception {

try {
assertBusy(() -> {
FollowInfoRequest followInfoRequest = new FollowInfoRequest("follower");
FollowInfoResponse followInfoResponse =
execute(followInfoRequest, ccrClient::getFollowInfo, ccrClient::getFollowInfoAsync);
assertThat(followInfoResponse.getInfos().size(), equalTo(1));
assertThat(followInfoResponse.getInfos().get(0).getFollowerIndex(), equalTo("follower"));
assertThat(followInfoResponse.getInfos().get(0).getLeaderIndex(), equalTo("leader"));
assertThat(followInfoResponse.getInfos().get(0).getRemoteCluster(), equalTo("local_cluster"));
assertThat(followInfoResponse.getInfos().get(0).getStatus(), equalTo(FollowInfoResponse.Status.ACTIVE));

FollowStatsRequest followStatsRequest = new FollowStatsRequest("follower");
FollowStatsResponse followStatsResponse =
execute(followStatsRequest, ccrClient::getFollowStats, ccrClient::getFollowStatsAsync);
Expand Down Expand Up @@ -170,6 +181,17 @@ public void testIndexFollowing() throws Exception {
pauseFollowResponse = execute(pauseFollowRequest, ccrClient::pauseFollow, ccrClient::pauseFollowAsync);
assertThat(pauseFollowResponse.isAcknowledged(), is(true));

assertBusy(() -> {
FollowInfoRequest followInfoRequest = new FollowInfoRequest("follower");
FollowInfoResponse followInfoResponse =
execute(followInfoRequest, ccrClient::getFollowInfo, ccrClient::getFollowInfoAsync);
assertThat(followInfoResponse.getInfos().size(), equalTo(1));
assertThat(followInfoResponse.getInfos().get(0).getFollowerIndex(), equalTo("follower"));
assertThat(followInfoResponse.getInfos().get(0).getLeaderIndex(), equalTo("leader"));
assertThat(followInfoResponse.getInfos().get(0).getRemoteCluster(), equalTo("local_cluster"));
assertThat(followInfoResponse.getInfos().get(0).getStatus(), equalTo(FollowInfoResponse.Status.PAUSED));
});

// Need to close index prior to unfollowing it:
CloseIndexRequest closeIndexRequest = new CloseIndexRequest("follower");
org.elasticsearch.action.support.master.AcknowledgedResponse closeIndexReponse =
Expand Down
Loading

0 comments on commit cae7220

Please sign in to comment.