Skip to content

Commit

Permalink
Support DataStream APIs (#416) (#419)
Browse files Browse the repository at this point in the history
* Add CreateDataStream, GetDataStream, DeleteDataStream, DataStreamsStats, and support specified data stream timestamp field in PutIndexTemplate.



* Add change log for data stream changes



* Add data stream into user guide



* add data stream to TOC of user guide



---------


(cherry picked from commit 876d379)

Signed-off-by: Tanqiu Liu <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 402c6b9 commit 04dc3ad
Show file tree
Hide file tree
Showing 19 changed files with 2,265 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Add support for mapping limit settings ([#382](https://github.com/opensearch-project/opensearch-java/pull/382))
- Add buffered lookahead for Jackson ([#338](https://github.com/opensearch-project/opensearch-java/pull/338))
- Add support for headers and sort parameters in cat requests ([#388](https://github.com/opensearch-project/opensearch-java/issues/388))
- Add support for data stream operations ([#416](https://github.com/opensearch-project/opensearch-java/pull/416))

### Dependencies
- Bumps `grgit-gradle` from 4.0.1 to 5.0.0
Expand Down
52 changes: 52 additions & 0 deletions USER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
- [Aggregations](#aggregations)
- [Delete the document](#delete-the-document)
- [Delete the index](#delete-the-index)
- [Data Stream API](#data-stream-api)
- [Create a data stream](#create-a-data-stream)
- [Get data stream](#get-data-stream)
- [Data stream stats](#data-stream-stats)
- [Delete data stream](#delete-data-stream-and-backing-indices)
- [Cat API](#cat-api)
- [Cat Indices](#cat-indices)
- [Cat Aliases](#cat-aliases)
Expand Down Expand Up @@ -203,6 +208,53 @@ DeleteIndexRequest deleteIndexRequest = new DeleteRequest.Builder().index(index)
DeleteIndexResponse deleteIndexResponse = client.indices().delete(deleteIndexRequest);
```

## Data Stream API

### Create a data stream
Before creating a data stream, you need to create an index template which configures a set of indices as a data stream.
A data stream must have a timestamp field. If not specified, OpenSearch uses `@timestamp` as the default timestamp field name.

The following sample code creates an index template for data stream with a custom timestamp field, and creates a data stream
which matches the name pattern specified in the index template.
```java
String dataStreamIndexTemplateName = "sample-data-stream-template";
String timestampFieldName = "my_timestamp_field";
String namePattern = "sample-data-stream-*";
String dataStreamName = "sample-data-stream-1";

// Create an index template which configures data stream
PutIndexTemplateRequest putIndexTemplateRequest = new PutIndexTemplateRequest.Builder()
.name(dataStreamIndexTemplateName)
.indexPatterns(namePattern)
.dataStream(new DataStream.Builder()
.timestampField(t -> t.name(timestampFieldName))
.build())
.build();
PutIndexTemplateResponse putIndexTemplateResponse = javaClient().indices().putIndexTemplate(putIndexTemplateRequest);

// Create a data stream
CreateDataStreamRequest createDataStreamRequest = new CreateDataStreamRequest.Builder().name(dataStreamName).build();
CreateDataStreamResponse createDataStreamResponse = javaClient().indices().createDataStream(createDataStreamRequest);
```

### Get data stream
```java
GetDataStreamRequest getDataStreamRequest = new GetDataStreamRequest.Builder().name(dataStreamName).build();
GetDataStreamResponse getDataStreamResponse = javaClient().indices().getDataStream(getDataStreamRequest);
```

### Data stream stats
```java
DataStreamsStatsRequest dataStreamsStatsRequest = new DataStreamsStatsRequest.Builder().name(dataStreamName).build();
DataStreamsStatsResponse dataStreamsStatsResponse = javaClient().indices().dataStreamsStats(dataStreamsStatsRequest);
```

### Delete data stream and backing indices
```java
DeleteDataStreamRequest deleteDataStreamRequest = new DeleteDataStreamRequest.Builder().name(dataStreamName).build();
DeleteDataStreamResponse deleteDataStreamResponse = javaClient().indices().deleteDataStream(deleteDataStreamRequest);
```

## Cat API

### Cat Indices
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client.opensearch.indices;

import org.opensearch.client.opensearch._types.ErrorResponse;
import org.opensearch.client.opensearch._types.RequestBase;
import org.opensearch.client.transport.Endpoint;
import org.opensearch.client.transport.endpoints.SimpleEndpoint;
import org.opensearch.client.util.ApiTypeHelper;
import org.opensearch.client.util.ObjectBuilder;
import org.opensearch.client.util.ObjectBuilderBase;

import java.util.function.Function;

// typedef: indices.create_data_stream.Request

/**
* Creates a data stream
*/
public class CreateDataStreamRequest extends RequestBase {

private final String name;

// ---------------------------------------------------------------------------------------------

private CreateDataStreamRequest(Builder builder) {
this.name = ApiTypeHelper.requireNonNull(builder.name, this, "name");
}

public static CreateDataStreamRequest of(Function<Builder, ObjectBuilder<CreateDataStreamRequest>> fn) {
return fn.apply(new Builder()).build();
}

/**
* Required - The name of the data stream
* <p>
* API name: {@code name}
*/
public final String name() {
return this.name;
}

// ---------------------------------------------------------------------------------------------

/**
* Builder for {@link CreateDataStreamRequest}.
*/
public static class Builder extends ObjectBuilderBase implements ObjectBuilder<CreateDataStreamRequest> {

private String name;

/**
* Required - The name of the data stream
* <p>
* API name: {@code name}
*/
public final Builder name(String name) {
this.name = name;
return this;
}

public CreateDataStreamRequest build() {
_checkSingleUse();

return new CreateDataStreamRequest(this);
}
}

// ---------------------------------------------------------------------------------------------

/**
* Endpoint "{@code indices.create_data_stream}".
*/
public static final Endpoint<CreateDataStreamRequest, CreateDataStreamResponse, ErrorResponse> _ENDPOINT = new SimpleEndpoint<>(
// Request method
request -> {
return "PUT";
},

// Request path
request -> {
final int _name = 1 << 0;
int propsSet = 0;
propsSet |= _name;

if (propsSet == (_name)) {
StringBuilder sbd = new StringBuilder();
sbd.append("/_data_stream");
sbd.append("/");
SimpleEndpoint.pathEncode(request.name, sbd);
return sbd.toString();
}
throw SimpleEndpoint.noPathTemplateFound("path");
},

// Request parameters
SimpleEndpoint.emptyMap(),

SimpleEndpoint.emptyMap(), false, CreateDataStreamResponse._DESERIALIZER);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client.opensearch.indices;

import org.opensearch.client.json.JsonpDeserializable;
import org.opensearch.client.json.JsonpDeserializer;
import org.opensearch.client.json.ObjectBuilderDeserializer;
import org.opensearch.client.json.ObjectDeserializer;
import org.opensearch.client.opensearch._types.AcknowledgedResponseBase;
import org.opensearch.client.util.ObjectBuilder;

import java.util.function.Function;

// typedef: indices.create_data_stream.Response

@JsonpDeserializable
public class CreateDataStreamResponse extends AcknowledgedResponseBase {

// ---------------------------------------------------------------------------------------------

private CreateDataStreamResponse(Builder builder) {
super(builder);
}

public static CreateDataStreamResponse of(Function<Builder, ObjectBuilder<CreateDataStreamResponse>> fn) {
return fn.apply(new Builder()).build();
}

// ---------------------------------------------------------------------------------------------

/**
* Builder for {@link CreateDataStreamResponse}
*/
public static class Builder extends AcknowledgedResponseBase.AbstractBuilder<Builder>
implements ObjectBuilder<CreateDataStreamResponse> {

@Override
protected Builder self() {
return this;
}

/**
* Builds a {@link CreateDataStreamResponse}.
*
* @throws NullPointerException
* if any required field is null.
*/
public CreateDataStreamResponse build() {
_checkSingleUse();

return new CreateDataStreamResponse(this);
}
}


// ---------------------------------------------------------------------------------------------

/**
* Json deserializer for {@link CreateDataStreamResponse}
*/
public static final JsonpDeserializer<CreateDataStreamResponse> _DESERIALIZER = ObjectBuilderDeserializer
.lazy(Builder::new, CreateDataStreamResponse::setupCreateDataStreamResponseDeserializer);

protected static void setupCreateDataStreamResponseDeserializer(ObjectDeserializer<CreateDataStreamResponse.Builder> op) {
AcknowledgedResponseBase.setupAcknowledgedResponseBaseDeserializer(op);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,15 @@ public class DataStream implements JsonpSerializable {
@Nullable
private final Boolean hidden;

@Nullable
private final DataStreamTimestampField timestampField;

// ---------------------------------------------------------------------------------------------

private DataStream(Builder builder) {

this.hidden = builder.hidden;
this.timestampField = builder.timestampField;

}

Expand All @@ -76,6 +80,14 @@ public final Boolean hidden() {
return this.hidden;
}

/**
* API name: {@code timestamp_field}
*/
@Nullable
public final DataStreamTimestampField timestampField() {
return this.timestampField;
}

/**
* Serialize this object to JSON.
*/
Expand All @@ -90,7 +102,11 @@ protected void serializeInternal(JsonGenerator generator, JsonpMapper mapper) {
if (this.hidden != null) {
generator.writeKey("hidden");
generator.write(this.hidden);
}

if (this.timestampField != null) {
generator.writeKey("timestamp_field");
this.timestampField.serialize(generator, mapper);
}

}
Expand All @@ -105,6 +121,9 @@ public static class Builder extends ObjectBuilderBase implements ObjectBuilder<D
@Nullable
private Boolean hidden;

@Nullable
private DataStreamTimestampField timestampField;

/**
* API name: {@code hidden}
*/
Expand All @@ -113,6 +132,21 @@ public final Builder hidden(@Nullable Boolean value) {
return this;
}

/**
* API name: {@code timestamp_field}
*/
public final Builder timestampField(@Nullable DataStreamTimestampField value) {
this.timestampField = value;
return this;
}

/**
* API name: {@code timestamp_field}
*/
public final Builder timestampField(Function<DataStreamTimestampField.Builder, ObjectBuilder<DataStreamTimestampField>> fn) {
return this.timestampField(fn.apply(new DataStreamTimestampField.Builder()).build());
}

/**
* Builds a {@link DataStream}.
*
Expand All @@ -137,6 +171,7 @@ public DataStream build() {
protected static void setupDataStreamDeserializer(ObjectDeserializer<DataStream.Builder> op) {

op.add(Builder::hidden, JsonpDeserializer.booleanDeserializer(), "hidden");
op.add(Builder::timestampField, DataStreamTimestampField._DESERIALIZER, "timestamp_field");

}

Expand Down
Loading

0 comments on commit 04dc3ad

Please sign in to comment.