Skip to content

Commit

Permalink
feat: add TO_BYTES/FROM_BYTES functions for bytes/string conversions (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
spena authored Jul 21, 2021
1 parent eddac72 commit cea0989
Show file tree
Hide file tree
Showing 20 changed files with 1,595 additions and 0 deletions.
22 changes: 22 additions & 0 deletions docs/developer-guide/ksqldb-reference/scalar-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,17 @@ multiple elements, like those containing wildcards, aren't supported.

`CREATE STREAM LOGS (LOG STRUCT<CLOUD STRING, APP STRING, INSTANCE INT>, ...) WITH (VALUE_FORMAT='JSON', ...)`

### `FROM_BYTES`

Since: 0.21.0

```sql
FROM_BYTES(bytes, encoding)
```

Converts a BYTES column to a STRING in the specified encoding type.
Supported encoding types are: `hex`, `utf8`, `ascii`, and `base64`.

### `INITCAP`

Since: 0.6.0
Expand Down Expand Up @@ -1046,6 +1057,17 @@ the string.
For example, `SUBSTRING("stream", 1, 4)`
returns "stre".

### `TO_BYTES`

Since: 0.21.0

```sql
TO_BYTES(string, encoding)
```

Converts a STRING column in the specified encoding type to a BYTES column.
Supported encoding types are: `hex`, `utf8`, `ascii`, and `base64`.

### `TRIM`

Since: -
Expand Down
136 changes: 136 additions & 0 deletions ksqldb-common/src/main/java/io/confluent/ksql/util/BytesUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License; you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.util;

import com.google.common.collect.ImmutableMap;
import com.google.common.io.BaseEncoding;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import java.util.function.Function;

/**
 * Static helpers for converting between raw bytes and their {@link String} form in one
 * of the supported encodings ({@code hex}, {@code utf8}, {@code ascii}, {@code base64}),
 * and for extracting a {@code byte[]} from a {@link ByteBuffer}.
 */
public final class BytesUtils {
  // The MIME flavour of Base64 is used on both sides, so the encoder and decoder agree
  // on how long payloads are laid out.
  private static final Base64.Encoder BASE64_ENCODER = Base64.getMimeEncoder();
  private static final Base64.Decoder BASE64_DECODER = Base64.getMimeDecoder();

  /**
   * The encodings understood by {@link #encode(byte[], String)} and
   * {@link #decode(String, String)}.
   */
  enum Encoding {
    HEX,
    UTF8,
    ASCII,
    BASE64;

    /**
     * Resolves an encoding from its case-insensitive name.
     *
     * @param value the encoding name, e.g. {@code "hex"} or {@code "BASE64"}
     * @return the matching encoding
     * @throws IllegalArgumentException if {@code value} is {@code null} or does not name
     *     a supported encoding
     */
    public static Encoding from(final String value) {
      // Compare literal-first so that a null name falls through to the descriptive
      // error below instead of failing with a bare NullPointerException.
      if ("hex".equalsIgnoreCase(value)) {
        return HEX;
      } else if ("utf8".equalsIgnoreCase(value)) {
        return UTF8;
      } else if ("ascii".equalsIgnoreCase(value)) {
        return ASCII;
      } else if ("base64".equalsIgnoreCase(value)) {
        return BASE64;
      }

      throw new IllegalArgumentException("Unknown encoding type '" + value + "'. "
          + "Supported formats are 'hex', 'utf8', 'ascii', and 'base64'.");
    }
  }

  private BytesUtils() {
  }

  // bytes -> String, keyed by encoding
  private static final Map<Encoding, Function<byte[], String>> ENCODERS = ImmutableMap.of(
      Encoding.HEX, BytesUtils::hexEncoding,
      Encoding.UTF8, BytesUtils::utf8Encoding,
      Encoding.ASCII, BytesUtils::asciiEncoding,
      Encoding.BASE64, BytesUtils::base64Encoding
  );

  // String -> bytes, keyed by encoding
  private static final Map<Encoding, Function<String, byte[]>> DECODERS = ImmutableMap.of(
      Encoding.HEX, BytesUtils::hexDecoding,
      Encoding.UTF8, BytesUtils::utf8Decoding,
      Encoding.ASCII, BytesUtils::asciiDecoding,
      Encoding.BASE64, BytesUtils::base64Decoding
  );

  /**
   * Renders {@code value} as a string using the named encoding.
   *
   * @param value the bytes to render
   * @param encoding the case-insensitive encoding name
   * @return the string form of {@code value}
   * @throws IllegalArgumentException if {@code encoding} is not a supported encoding
   */
  public static String encode(final byte[] value, final String encoding) {
    final Function<byte[], String> encoder = ENCODERS.get(Encoding.from(encoding));
    // Defensive: only reachable if an Encoding constant is added without an encoder.
    if (encoder == null) {
      throw new IllegalStateException(String.format("Unknown encoding type '%s'. "
          + "Supported formats are 'hex', 'utf8', 'ascii', and 'base64'.", encoding));
    }

    return encoder.apply(value);
  }

  /**
   * Parses {@code value}, written in the named encoding, back into bytes.
   *
   * @param value the string to parse
   * @param encoding the case-insensitive encoding name
   * @return the bytes represented by {@code value}
   * @throws IllegalArgumentException if {@code encoding} is not a supported encoding
   */
  public static byte[] decode(final String value, final String encoding) {
    final Function<String, byte[]> decoder = DECODERS.get(Encoding.from(encoding));
    // Defensive: only reachable if an Encoding constant is added without a decoder.
    if (decoder == null) {
      throw new IllegalStateException(String.format("Unknown encoding type '%s'. "
          + "Supported formats are 'hex', 'utf8', 'ascii', and 'base64'.", encoding));
    }

    return decoder.apply(value);
  }

  /**
   * Returns the bytes held by {@code buffer}.
   *
   * <p>When the buffer exposes an accessible backing array, that array itself is returned
   * (not a copy). Otherwise the buffer's full contents, from index 0 up to its capacity,
   * are copied into a new array. The position, limit and mark of {@code buffer} are never
   * modified, so the method may be called repeatedly on the same buffer.
   *
   * @param buffer the buffer to read, may be {@code null}
   * @return the buffer's bytes, or {@code null} if {@code buffer} is {@code null}
   */
  public static byte[] getByteArray(final ByteBuffer buffer) {
    if (buffer == null) {
      return null;
    }

    if (buffer.hasArray()) {
      return buffer.array();
    }

    // ByteBuffer.array() throws if the buffer is read-only (Protobuf usually returns
    // read-only buffers) or has no backing array, so copy the contents out instead.
    // Read through a rewound duplicate: reading the caller's buffer directly would
    // advance its position and make the next call underflow.
    final ByteBuffer view = buffer.duplicate();
    view.clear();
    final byte[] contents = new byte[view.capacity()];
    view.get(contents);
    return contents;
  }

  // Base-16 text for the given bytes.
  private static String hexEncoding(final byte[] value) {
    return BaseEncoding.base16().encode(value);
  }

  // Bytes for the given base-16 text.
  private static byte[] hexDecoding(final String value) {
    return BaseEncoding.base16().decode(value);
  }

  // Interprets the bytes as UTF-8 text.
  private static String utf8Encoding(final byte[] value) {
    return new String(value, StandardCharsets.UTF_8);
  }

  // UTF-8 bytes of the given text.
  private static byte[] utf8Decoding(final String value) {
    return value.getBytes(StandardCharsets.UTF_8);
  }

  // Interprets the bytes as US-ASCII text.
  private static String asciiEncoding(final byte[] value) {
    return new String(value, StandardCharsets.US_ASCII);
  }

  // US-ASCII bytes of the given text.
  private static byte[] asciiDecoding(final String value) {
    return value.getBytes(StandardCharsets.US_ASCII);
  }

  // Base64 (MIME) text for the given bytes.
  private static String base64Encoding(final byte[] value) {
    return BASE64_ENCODER.encodeToString(value);
  }

  // Bytes for the given Base64 (MIME) text.
  private static byte[] base64Decoding(final String value) {
    return BASE64_DECODER.decode(value);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.util;

import org.junit.Test;

import java.nio.ByteBuffer;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

/**
 * Unit tests for {@link BytesUtils#getByteArray(ByteBuffer)}.
 */
public class BytesUtilTest {
  @Test
  public void shouldReturnByteArrayOnReadOnlyByteBuffer() {
    // Given: a read-only view over a single-byte array
    final byte[] contents = {5};
    final ByteBuffer readOnly = ByteBuffer.wrap(contents).asReadOnlyBuffer();

    // When
    final byte[] result = BytesUtils.getByteArray(readOnly);

    // Then
    assertThat(result, is(new byte[]{5}));
  }

  @Test
  public void shouldReturnByteArrayOnWritableByteBuffer() {
    // Given: a writable buffer wrapping a single-byte array
    final byte[] contents = {5};
    final ByteBuffer writable = ByteBuffer.wrap(contents);

    // When
    final byte[] result = BytesUtils.getByteArray(writable);

    // Then
    assertThat(result, is(new byte[]{5}));
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License; you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.string;

import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.util.BytesUtils;
import io.confluent.ksql.util.KsqlConstants;

import java.nio.ByteBuffer;

/**
 * UDF {@code from_bytes}: renders a BYTES value as a STRING in a caller-chosen encoding.
 */
@UdfDescription(
    name = "from_bytes",
    category = FunctionCategory.STRING,
    description = "Converts a BYTES value to STRING in the specified encoding. "
        + "The accepted encoders are 'hex', 'utf8', 'ascii', and 'base64'.",
    author = KsqlConstants.CONFLUENT_AUTHOR
)
public class FromBytes {
  /**
   * Converts {@code value} to a string using {@code encoding}.
   *
   * @param value the bytes to convert, may be {@code null}
   * @param encoding the case-insensitive encoding name, may be {@code null}
   * @return the encoded string, or {@code null} if either argument is {@code null}
   * @throws IllegalArgumentException if {@code encoding} is not a supported encoding
   */
  @Udf(description = "Converts a BYTES value to STRING in the specified encoding. "
      + "The accepted encoders are 'hex', 'utf8', 'ascii', and 'base64'.")
  public String fromBytes(final ByteBuffer value, final String encoding) {
    // A null in either argument yields null rather than an exception from the
    // encoding lookup.
    if (value == null || encoding == null) {
      return null;
    }

    return BytesUtils.encode(BytesUtils.getByteArray(value), encoding);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License; you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.string;

import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.util.BytesUtils;
import io.confluent.ksql.util.KsqlConstants;

import java.nio.ByteBuffer;

/**
 * UDF {@code to_bytes}: parses a STRING written in a caller-chosen encoding into BYTES.
 */
@UdfDescription(
    name = "to_bytes",
    category = FunctionCategory.STRING,
    description = "Converts a STRING value in the specified encoding to BYTES. "
        + "The accepted encoders are 'hex', 'utf8', 'ascii', and 'base64'.",
    author = KsqlConstants.CONFLUENT_AUTHOR
)
public class ToBytes {
  /**
   * Converts {@code value}, written in {@code encoding}, to bytes.
   *
   * @param value the string to convert, may be {@code null}
   * @param encoding the case-insensitive encoding name, may be {@code null}
   * @return a buffer wrapping the decoded bytes, or {@code null} if either argument is
   *     {@code null}
   * @throws IllegalArgumentException if {@code encoding} is not a supported encoding
   */
  @Udf(description = "Converts a STRING value in the specified encoding to BYTES. "
      + "The accepted encoders are 'hex', 'utf8', 'ascii', and 'base64'.")
  public ByteBuffer toBytes(final String value, final String encoding) {
    // A null in either argument yields null rather than an exception from the
    // encoding lookup.
    if (value == null || encoding == null) {
      return null;
    }

    return ByteBuffer.wrap(BytesUtils.decode(value, encoding));
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License; you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.string;

import org.junit.Before;
import org.junit.Test;

import java.nio.ByteBuffer;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThrows;

/**
 * Unit tests for the {@link FromBytes} UDF: one case per supported encoding, plus null
 * input and unknown-encoding handling.
 */
public class FromBytesTest {
  private FromBytes udf;

  @Before
  public void setUp() {
    udf = new FromBytes();
  }

  @Test
  public void shouldConvertBytesToHexString() {
    assertThat(udf.fromBytes(ByteBuffer.wrap(new byte[]{33}), "hex"),
        is("21"));
    assertThat(udf.fromBytes(ByteBuffer.wrap(new byte[]{}), "hex"),
        is(""));
  }

  @Test
  public void shouldConvertBytesToUtf8String() {
    assertThat(udf.fromBytes(ByteBuffer.wrap(new byte[]{33}), "utf8"),
        is("!"));
    assertThat(udf.fromBytes(ByteBuffer.wrap(new byte[]{}), "utf8"),
        is(""));
  }

  // Named for the direction actually exercised here: bytes -> string.
  @Test
  public void shouldConvertBytesToAsciiString() {
    assertThat(udf.fromBytes(ByteBuffer.wrap(new byte[]{33}), "ascii"),
        is("!"));
    assertThat(udf.fromBytes(ByteBuffer.wrap(new byte[]{}), "ascii"),
        is(""));
  }

  // Named for the direction actually exercised here: bytes -> string.
  @Test
  public void shouldConvertBytesToBase64String() {
    assertThat(udf.fromBytes(ByteBuffer.wrap(new byte[]{33}), "base64"),
        is("IQ=="));
    assertThat(udf.fromBytes(ByteBuffer.wrap(new byte[]{}), "base64"),
        is(""));
  }

  @Test
  public void shouldReturnNullOnNullBytes() {
    assertThat(udf.fromBytes(null, "base64"), nullValue());
  }

  @Test
  public void shouldThrowOnUnknownEncodingType() {
    final Exception e = assertThrows(IllegalArgumentException.class,
        () -> udf.fromBytes(ByteBuffer.wrap(new byte[]{0, 1, 2}), "base5000"));

    assertThat(e.getMessage(), containsString("Unknown encoding type 'base5000'"));
  }
}
Loading

0 comments on commit cea0989

Please sign in to comment.