Skip to content

Commit

Permalink
[HUDI-4971] Remove direct use of kryo from SerDeUtils (apache#7014)
Browse files Browse the repository at this point in the history

Co-authored-by: Alexey Kudinkin <[email protected]>
  • Loading branch information
xushiyan and Alexey Kudinkin committed Dec 14, 2022
1 parent 8da013e commit c4b3b12
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.orc.TypeDescription;

import static org.apache.avro.JsonProperties.NULL_VALUE;
import static org.apache.hudi.common.util.BinaryUtil.toBytes;

/**
* Methods including addToVector, addUnionValue, createOrcSchema are originally from
Expand Down Expand Up @@ -221,8 +222,7 @@ public static void addToVector(TypeDescription type, ColumnVector colVector, Sch
binaryBytes = ((GenericData.Fixed)value).bytes();
} else if (value instanceof ByteBuffer) {
final ByteBuffer byteBuffer = (ByteBuffer) value;
binaryBytes = new byte[byteBuffer.remaining()];
byteBuffer.get(binaryBytes);
binaryBytes = toBytes(byteBuffer);
} else if (value instanceof byte[]) {
binaryBytes = (byte[]) value;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hudi.common.util;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.zip.CRC32;

Expand Down Expand Up @@ -117,6 +118,15 @@ public static byte updatePos(byte a, int apos, byte b, int bpos) {
return (byte) (a ^ (1 << (7 - apos)));
}

/**
* Copies {@link ByteBuffer} into allocated {@code byte[]} array
*/
public static byte[] toBytes(ByteBuffer buffer) {
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
return bytes;
}

public static byte[] toBytes(int val) {
byte[] b = new byte[4];
for (int i = 3; i > 0; i--) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.hudi.common.util.BinaryUtil.toBytes;

/**
* Utility functions for ORC files.
*/
Expand Down Expand Up @@ -238,8 +240,7 @@ public Schema readAvroSchema(Configuration conf, Path orcFilePath) {
try (Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf))) {
if (reader.hasMetadataValue("orc.avro.schema")) {
ByteBuffer metadataValue = reader.getMetadataValue("orc.avro.schema");
byte[] bytes = new byte[metadataValue.remaining()];
metadataValue.get(bytes);
byte[] bytes = toBytes(metadataValue);
return new Schema.Parser().parse(new String(bytes));
} else {
TypeDescription orcSchema = reader.getSchema();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.hudi.common.data.HoodieData
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig
import org.apache.hudi.common.util.BinaryUtil.toBytes
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.collection
import org.apache.hudi.common.util.hash.ColumnIndexID
Expand Down Expand Up @@ -469,10 +470,7 @@ object ColumnStatsIndexSupport {
}
case BinaryType =>
value match {
case b: ByteBuffer =>
val bytes = new Array[Byte](b.remaining)
b.get(bytes)
bytes
case b: ByteBuffer => toBytes(b)
case other => other
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,39 +17,28 @@

package org.apache.spark.sql.hudi

import java.io.ByteArrayOutputStream

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.hudi.common.util.BinaryUtil
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.serializer.{KryoSerializer, SerializerInstance}

import java.nio.ByteBuffer


object SerDeUtils {

private val kryoLocal = new ThreadLocal[Kryo] {
private val SERIALIZER_THREAD_LOCAL = new ThreadLocal[SerializerInstance] {

override protected def initialValue: Kryo = {
val serializer = new KryoSerializer(new SparkConf(true))
serializer.newKryo()
override protected def initialValue: SerializerInstance = {
new KryoSerializer(new SparkConf(true)).newInstance()
}
}

def toBytes(o: Any): Array[Byte] = {
val outputStream = new ByteArrayOutputStream(4096 * 5)
val output = new Output(outputStream)
try {
kryoLocal.get.writeClassAndObject(output, o)
output.flush()
} finally {
output.clear()
output.close()
}
outputStream.toByteArray
val buf = SERIALIZER_THREAD_LOCAL.get.serialize(o)
BinaryUtil.toBytes(buf)
}

def toObject(bytes: Array[Byte]): Any = {
val input = new Input(bytes)
kryoLocal.get.readClassAndObject(input)
SERIALIZER_THREAD_LOCAL.get.deserialize(ByteBuffer.wrap(bytes))
}
}

0 comments on commit c4b3b12

Please sign in to comment.