Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: add DATE/TIME to connect integration test #7732

Merged
merged 6 commits into from
Jun 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public void shouldReturnError() {
}

@Test
public void shouldReadTimestampsFromConnect() {
public void shouldReadTimeTypesFromConnect() {
// Given:
create("mock-source", ImmutableMap.<String, String> builder()
.put("connector.class", "org.apache.kafka.connect.tools.VerifiableSourceConnector")
Expand All @@ -266,18 +266,22 @@ public void shouldReadTimestampsFromConnect() {
.put("topic.creation.default.partitions", "1")
.build());

final long start = System.nanoTime();
RestResponse<KsqlEntityList> response;
do {
response = ksqlRestClient.makeKsqlRequest("CREATE STREAM FOO (PAYLOAD TIMESTAMP) WITH (KAFKA_TOPIC='foo', VALUE_FORMAT='DELIMITED');");
} while(!response.isSuccessful() && System.nanoTime() - start < TIMEOUT_NS);
makeKsqlRequest("CREATE STREAM TIMESTAMP_STREAM (PAYLOAD TIMESTAMP) WITH (KAFKA_TOPIC='foo', VALUE_FORMAT='DELIMITED');");
makeKsqlRequest("CREATE STREAM TIME_STREAM (PAYLOAD TIME) WITH (KAFKA_TOPIC='foo', VALUE_FORMAT='DELIMITED');");
makeKsqlRequest("CREATE STREAM DATE_STREAM (PAYLOAD DATE) WITH (KAFKA_TOPIC='foo', VALUE_FORMAT='DELIMITED');");

// When:
final RestResponse<List<StreamedRow>> queryFoo = ksqlRestClient.makeQueryRequest("SELECT * FROM FOO EMIT CHANGES LIMIT 1;", 1L);
final RestResponse<List<StreamedRow>> queryTimestamp = ksqlRestClient.makeQueryRequest("SELECT * FROM TIMESTAMP_STREAM EMIT CHANGES LIMIT 1;", 1L);
final RestResponse<List<StreamedRow>> queryTime = ksqlRestClient.makeQueryRequest("SELECT * FROM TIME_STREAM EMIT CHANGES LIMIT 1;", 1L);
final RestResponse<List<StreamedRow>> queryDate = ksqlRestClient.makeQueryRequest("SELECT * FROM DATE_STREAM EMIT CHANGES LIMIT 1;", 1L);

// Then:
assertThat("successfully queried FOO", queryFoo.isSuccessful());
assertThat(queryFoo.getResponse().get(1).getRow().get().getColumns().get(0), is("1970-01-01T00:00:00.000"));
assertThat("successfully queried TIMESTAMP_STREAM", queryTimestamp.isSuccessful());
assertThat("successfully queried TIME_STREAM", queryTime.isSuccessful());
assertThat("successfully queried DATE_STREAM", queryDate.isSuccessful());
assertThat(queryTimestamp.getResponse().get(1).getRow().get().getColumns().get(0), is("1970-01-01T00:00:00.000"));
assertThat(queryTime.getResponse().get(1).getRow().get().getColumns().get(0), is("00:00"));
assertThat(queryDate.getResponse().get(1).getRow().get().getColumns().get(0), is("1970-01-01"));
}

@Test
Expand Down Expand Up @@ -330,4 +334,11 @@ private void create(final String name, final Map<String, String> properties, Con
LOG.info("Got response from Connect: {}", response);
}

private void makeKsqlRequest(final String request) {
final long start = System.nanoTime();
RestResponse<KsqlEntityList> response;
do {
response = ksqlRestClient.makeKsqlRequest(request);
} while(!response.isSuccessful() && System.nanoTime() - start < TIMEOUT_NS);
}
}
6 changes: 6 additions & 0 deletions ksqldb-rest-model/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@
<version>${io.confluent.ksql.version}</version>
</dependency>

<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksqldb-engine-common</artifactId>
Copy link
Contributor Author

@jzaralim jzaralim Jun 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed because SqlTimeTypes is in this package. It's a very lightweight package and only has one dependency on ksqldb-common though.

<version>${io.confluent.ksql.version}</version>
</dependency>

<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,29 @@

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.datatype.guava.GuavaModule;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.confluent.ksql.json.KsqlTypesSerializationModule;
import io.confluent.ksql.json.StructSerializationModule;
import io.confluent.ksql.schema.ksql.SqlTimeTypes;
import io.confluent.ksql.util.KsqlConstants;
import java.io.IOException;
import java.sql.Date;
import java.sql.Time;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

/**
* Mapper used by the Rest Api.
*/
// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling
public enum ApiJsonMapper {
// CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling

INSTANCE;

Expand All @@ -46,9 +55,29 @@ public enum ApiJsonMapper {
.enable(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN)
.setDateFormat(new SimpleDateFormat(KsqlConstants.DATE_TIME_PATTERN))
.setTimeZone(TimeZone.getTimeZone("Z"))
.setNodeFactory(JsonNodeFactory.withExactBigDecimals(true));
.setNodeFactory(JsonNodeFactory.withExactBigDecimals(true))
.registerModule(new SimpleModule()
.addSerializer(Time.class, new TimeSerializer())
.addSerializer(Date.class, new DateSerializer())
);

public ObjectMapper get() {
return mapper;
}

public static class TimeSerializer extends JsonSerializer<Time> {
@Override
public void serialize(final Time time, final JsonGenerator jsonGenerator,
final SerializerProvider serializerProvider) throws IOException {
jsonGenerator.writeString(SqlTimeTypes.formatTime(time));
}
}

public static class DateSerializer extends JsonSerializer<Date> {
@Override
public void serialize(final Date date, final JsonGenerator jsonGenerator,
final SerializerProvider serializerProvider) throws IOException {
jsonGenerator.writeString(SqlTimeTypes.formatDate(date));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.DecimalNode;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import org.junit.Test;

Expand Down Expand Up @@ -79,4 +81,22 @@ public void shouldFormatTimestampsToISO8601() throws Exception {
// Then:
assertThat(result, is("\"1999-11-30T11:00:00.000\""));
}

@Test
public void shouldFormatTime() throws Exception {
// When:
final String result = OBJECT_MAPPER.writeValueAsString(new Time(10000));

// Then:
assertThat(result, is("\"00:00:10\""));
}

@Test
public void shouldFormatDate() throws Exception {
// When:
final String result = OBJECT_MAPPER.writeValueAsString(new Date(864000000));

// Then:
assertThat(result, is("\"1970-01-11\""));
}
}