Skip to content

Commit

Permalink
feat: support BYTES column types in Java client API (#7823)
Browse files Browse the repository at this point in the history
  • Loading branch information
spena authored Jul 21, 2021
1 parent cea0989 commit df5964e
Show file tree
Hide file tree
Showing 14 changed files with 183 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
*/
public interface ColumnType {

enum Type { STRING, INTEGER, BIGINT, DOUBLE, BOOLEAN, DECIMAL, ARRAY, MAP, STRUCT }
enum Type { STRING, INTEGER, BIGINT, DOUBLE, BOOLEAN, DECIMAL, BYTES, ARRAY, MAP, STRUCT }

/**
* Returns the type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,18 @@ public BigDecimal getDecimal(final int pos) {
return new BigDecimal(getValue(pos).toString());
}

/**
* Returns the value at a specified index as a byte array.
*
* @param pos the index
* @return the value
* @throws IndexOutOfBoundsException if the index is invalid
* @throws IllegalArgumentException if the array value is not a Base64 encoded string
*/
public byte[] getBytes(final int pos) {
return delegate.getBinary(pos);
}

/**
* Returns the value at a specified index as a {@code KsqlArray}.
*
Expand Down Expand Up @@ -288,6 +300,17 @@ public KsqlArray add(final BigDecimal value) {
return this;
}

/**
* Appends the specified value to the end of the array.
*
* @param value the value to append
* @return a reference to this
*/
public KsqlArray add(final byte[] value) {
delegate.add(value);
return this;
}

/**
* Appends the specified value to the end of the array.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,19 @@ public BigDecimal getDecimal(final String key) {
return new BigDecimal(getValue(key).toString());
}

/**
* Returns the value associated with the specified key as a byte array. Returns null if
* the key is not present.
*
* @param key the key
* @return the value
* @throws ClassCastException if the value is not a {@code String}
* @throws IllegalArgumentException if the column value is not a base64 encoded string
*/
public byte[] getBytes(final String key) {
return delegate.getBinary(key);
}

/**
* Returns the value associated with the specified key as a {@link KsqlArray}. Returns null if the
* key is not present.
Expand Down Expand Up @@ -306,6 +319,11 @@ public KsqlObject put(final String key, final BigDecimal value) {
return this;
}

public KsqlObject put(final String key, final byte[] value) {
delegate.put(key, value);
return this;
}

/**
* Adds an entry for the specified key and value to the map.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,28 @@ public interface Row {
*/
BigDecimal getDecimal(String columnName);

/**
* Returns the value for a particular column of the {@code Row} as a byte array.
*
* @param columnIndex index of column (1-indexed)
* @return column value
* @throws ClassCastException if the column value is not a {@code String}
* @throws IllegalArgumentException if the column value is not a base64 encoded string
* @throws IndexOutOfBoundsException if the index is invalid
*/
byte[] getBytes(int columnIndex);

/**
* Returns the value for a particular column of the {@code Row} as byte array.
*
* @param columnName name of column
* @return column value
* @throws ClassCastException if the column value is not a {@code String}
* @throws IllegalArgumentException if the column name is invalid or the column value is not
* a base64 encoded string
*/
byte[] getBytes(String columnName);

/**
* Returns the value for a particular column of the {@code Row} as a {@link KsqlObject}.
* Useful for {@code MAP} and {@code STRUCT} column types.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,16 @@ public BigDecimal getDecimal(final String columnName) {
return getDecimal(indexFromName(columnName));
}

@Override
public byte[] getBytes(final int columnIndex) {
return values.getBytes(columnIndex - 1);
}

@Override
public byte[] getBytes(final String columnName) {
return getBytes(indexFromName(columnName));
}

@Override
public KsqlObject getKsqlObject(final int columnIndex) {
return values.getKsqlObject(columnIndex - 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1777,6 +1777,7 @@ private static void verifyRowWithIndex(final Row row, final int index) {
assertThat(row.getLong("f_long"), is(((long) index) * index));
assertThat(row.getDouble("f_double"), is(index + 0.1111));
assertThat(row.getDecimal("f_decimal"), is(BigDecimal.valueOf(index + 0.1)));
assertThat(row.getBytes("f_bytes"), is(new byte[]{0, 1, 2, 3, 4, 5}));
final KsqlArray arrayVal = row.getKsqlArray("f_array");
assertThat(arrayVal, is(new KsqlArray().add("s" + index).add("t" + index)));
assertThat(arrayVal.getString(0), is("s" + index));
Expand Down Expand Up @@ -1810,10 +1811,11 @@ private static void verifyRowWithIndex(final Row row, final int index) {
assertThat(values.getLong(3), is(row.getLong("f_long")));
assertThat(values.getDouble(4), is(row.getDouble("f_double")));
assertThat(values.getDecimal(5), is(row.getDecimal("f_decimal")));
assertThat(values.getKsqlArray(6), is(row.getKsqlArray("f_array")));
assertThat(values.getKsqlObject(7), is(row.getKsqlObject("f_map")));
assertThat(values.getKsqlObject(8), is(row.getKsqlObject("f_struct")));
assertThat(values.getValue(9), is(nullValue()));
assertThat(values.getBytes(6), is(row.getBytes("f_bytes")));
assertThat(values.getKsqlArray(7), is(row.getKsqlArray("f_array")));
assertThat(values.getKsqlObject(8), is(row.getKsqlObject("f_map")));
assertThat(values.getKsqlObject(9), is(row.getKsqlObject("f_struct")));
assertThat(values.getValue(10), is(nullValue()));
assertThat(values.toJsonString(), is((new JsonArray(values.getList())).toString()));
assertThat(values.toString(), is(values.toJsonString()));

Expand All @@ -1828,6 +1830,7 @@ private static void verifyRowWithIndex(final Row row, final int index) {
assertThat(obj.getLong("f_long"), is(row.getLong("f_long")));
assertThat(obj.getDouble("f_double"), is(row.getDouble("f_double")));
assertThat(obj.getDecimal("f_decimal"), is(row.getDecimal("f_decimal")));
assertThat(obj.getBytes("f_bytes"), is(row.getBytes("f_bytes")));
assertThat(obj.getKsqlArray("f_array"), is(row.getKsqlArray("f_array")));
assertThat(obj.getKsqlObject("f_map"), is(row.getKsqlObject("f_map")));
assertThat(obj.getKsqlObject("f_struct"), is(row.getKsqlObject("f_struct")));
Expand Down Expand Up @@ -1860,6 +1863,7 @@ private static List<KsqlObject> generateInsertRows() {
.put("f_long", i * i)
.put("f_double", i + 0.1111)
.put("f_decimal", new BigDecimal(i + 0.1))
.put("f_bytes", new byte[]{0, 1, 2, 3, 4, 5})
.put("f_array", new KsqlArray().add("s" + i).add("t" + i))
.put("f_map", new KsqlObject().put("k" + i, "v" + i))
.put("f_struct", new KsqlObject().put("F1", "v" + i).put("F2", i))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import io.confluent.ksql.api.client.KsqlArray;
import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.api.client.util.RowUtil;
import io.netty.buffer.ByteBuf;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import org.junit.Before;
Expand All @@ -38,9 +40,11 @@
public class RowImplTest {

private static final List<String> COLUMN_NAMES =
ImmutableList.of("f_str", "f_int", "f_long", "f_double", "f_bool", "f_decimal", "f_array", "f_map", "f_struct", "f_null");
ImmutableList.of("f_str", "f_int", "f_long", "f_double", "f_bool", "f_decimal", "f_bytes",
"f_array", "f_map", "f_struct", "f_null");
private static final List<ColumnType> COLUMN_TYPES = RowUtil.columnTypesFromStrings(
ImmutableList.of("STRING", "INTEGER", "BIGINT", "DOUBLE", "BOOLEAN", "DECIMAL", "ARRAY", "MAP", "STRUCT", "INTEGER"));
ImmutableList.of("STRING", "INTEGER", "BIGINT", "DOUBLE", "BOOLEAN", "DECIMAL", "BYTES",
"ARRAY", "MAP", "STRUCT", "INTEGER"));
private static final Map<String, Integer> COLUMN_NAME_TO_INDEX = RowUtil.valueToIndexMap(COLUMN_NAMES);
private static final JsonArray VALUES = new JsonArray()
.add("foo")
Expand All @@ -49,6 +53,7 @@ public class RowImplTest {
.add(34.43)
.add(false)
.add(12.21) // server endpoint returns decimals as doubles
.add(new byte[]{0, 1, 2})
.add(new JsonArray("[\"e1\",\"e2\"]"))
.add(new JsonObject("{\"k1\":\"v1\",\"k2\":\"v2\"}"))
.add(new JsonObject("{\"f1\":\"baz\",\"f2\":12}"))
Expand All @@ -69,10 +74,15 @@ public void shouldOneIndexColumnNames() {
assertThat(row.getValue(4), is(34.43));
assertThat(row.getValue(5), is(false));
assertThat(row.getValue(6), is(12.21));
assertThat(row.getValue(7), is(new JsonArray("[\"e1\",\"e2\"]")));
assertThat(row.getValue(8), is(new JsonObject("{\"k1\":\"v1\",\"k2\":\"v2\"}")));
assertThat(row.getValue(9), is(new JsonObject("{\"f1\":\"baz\",\"f2\":12}")));
assertThat(row.getValue(10), is(nullValue()));

// Base64 encoded byte. The getValue() does not know if the returned value is a String or
// byte (same as JsonArray), so it cannot be decoded to bytes. Only getBytes() will do the
// decoding. In this test, we only test that getValue() returns the encoded string.
assertThat(row.getValue(7), is("AAEC"));
assertThat(row.getValue(8), is(new JsonArray("[\"e1\",\"e2\"]")));
assertThat(row.getValue(9), is(new JsonObject("{\"k1\":\"v1\",\"k2\":\"v2\"}")));
assertThat(row.getValue(10), is(new JsonObject("{\"f1\":\"baz\",\"f2\":12}")));
assertThat(row.getValue(11), is(nullValue()));
}

@Test
Expand Down Expand Up @@ -105,6 +115,11 @@ public void shouldGetBoolean() {
public void shouldGetDecimal() {
assertThat(row.getDecimal("f_decimal"), is(new BigDecimal("12.21")));
}
@Test
public void shouldGetBytes() {
assertThat(row.getBytes("f_bytes"), is(new byte[]{0, 1, 2}));
}


@Test
public void shouldGetKsqlArray() {
Expand Down Expand Up @@ -156,6 +171,7 @@ public void shouldGetAsObject() {
assertThat(obj.getDouble("f_double"), is(34.43));
assertThat(obj.getBoolean("f_bool"), is(false));
assertThat(obj.getDecimal("f_decimal"), is(new BigDecimal("12.21")));
assertThat(obj.getBytes("f_bytes"), is(new byte[]{0, 1, 2}));
assertThat(obj.getKsqlArray("f_array"), is(new KsqlArray(ImmutableList.of("e1", "e2"))));
assertThat(obj.getKsqlObject("f_map"), is(new KsqlObject(ImmutableMap.of("k1", "v1", "k2", "v2"))));
assertThat(obj.getKsqlObject("f_struct"), is(new KsqlObject(ImmutableMap.of("f1", "baz", "f2", 12))));
Expand Down
Loading

0 comments on commit df5964e

Please sign in to comment.