diff --git a/.gitignore b/.gitignore index 5be0a5d7..523a4d66 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +# ignore maven generated files +dependency-reduced-pom.xml + #ignore idea configuration .idea *.iml @@ -34,4 +37,4 @@ target hs_err_pid* # ignore Mac files -.DS_Store \ No newline at end of file +.DS_Store diff --git a/online-bulk-load/README.md b/online-bulk-load/README.md index 97ef471f..92346c85 100644 --- a/online-bulk-load/README.md +++ b/online-bulk-load/README.md @@ -22,7 +22,7 @@ mvn clean package -DskipTests -am -pl online-bulk-load ``` spark-submit \ --master local[*] \ ---jars /path/to/tikv-client-java-3.2.0-SNAPSHOT.jar \ +--jars /path/to/tikv-client-java-3.3.0-SNAPSHOT.jar \ --class org.tikv.bulkload.example.BulkLoadExample \ online-bulk-load/target/online-bulk-load-0.0.1-SNAPSHOT.jar \ diff --git a/pom.xml b/pom.xml index c70ee8aa..f0e33c2b 100644 --- a/pom.xml +++ b/pom.xml @@ -122,7 +122,7 @@ org.tikv tikv-client-java - 3.2.0-SNAPSHOT + 3.3.0-SNAPSHOT diff --git a/sst-data-source/README.md b/sst-data-source/README.md index 2e6e3f83..8227ca58 100644 --- a/sst-data-source/README.md +++ b/sst-data-source/README.md @@ -34,7 +34,7 @@ br backup raw \ ``` spark-submit \ --master local[*] \ ---jars /path/to/tikv-client-java-3.2.0-SNAPSHOT.jar \ +--jars /path/to/tikv-client-java-3.3.0-SNAPSHOT.jar \ --class org.tikv.datasources.sst.example.SSTDataSourceExample \ sst-data-source/target/sst-data-source-0.0.1-SNAPSHOT.jar \ hdfs:///path/to/sst/ diff --git a/sst-data-source/pom.xml b/sst-data-source/pom.xml index 2c6c2963..80f5be6a 100644 --- a/sst-data-source/pom.xml +++ b/sst-data-source/pom.xml @@ -27,6 +27,18 @@ jar Spark SST Data Source + + 6.22.1.1 + + + + + org.rocksdb + rocksdbjni + ${rocksdb.version} + + + @@ -36,6 +48,30 @@ + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + package + + shade + + + + + + + + org.rocksdb:rocksdbjni + + + + + + org.antipathy diff --git a/sst-data-source/src/main/java/org/tikv/datasources/br/BackupDecoder.java b/sst-data-source/src/main/java/org/tikv/datasources/br/BackupDecoder.java new file mode 100644 index 00000000..ab919927 --- /dev/null +++ b/sst-data-source/src/main/java/org/tikv/datasources/br/BackupDecoder.java @@ -0,0 +1,67 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.datasources.br; + +import java.io.Serializable; +import org.rocksdb.Options; +import org.rocksdb.ReadOptions; +import org.tikv.common.exception.SSTDecodeException; +import org.tikv.kvproto.Brpb; + +public class BackupDecoder implements Serializable { + private final Brpb.BackupMeta backupMeta; + private final boolean ttlEnabled; + private final KVDecoder kvDecoder; + + public BackupDecoder(Brpb.BackupMeta backupMeta) throws SSTDecodeException { + this.backupMeta = backupMeta; + this.ttlEnabled = false; + this.kvDecoder = initKVDecoder(); + } + + public BackupDecoder(Brpb.BackupMeta backupMeta, boolean ttlEnabled) throws SSTDecodeException { + this.backupMeta = backupMeta; + this.ttlEnabled = ttlEnabled; + this.kvDecoder = initKVDecoder(); + } + + private KVDecoder initKVDecoder() throws SSTDecodeException { + if (backupMeta.getIsRawKv()) { + if ("V1".equals(backupMeta.getApiVersion().name())) { + return new RawKVDecoderV1(ttlEnabled); + } else { + throw new SSTDecodeException( + "does not support decode APIVersion " + backupMeta.getApiVersion().name()); + } + } else { + throw new SSTDecodeException("TxnKV is not supported yet!"); + } + } + + public org.tikv.datasources.br.SSTDecoder decodeSST(String sstFilePath) { + return decodeSST(sstFilePath, new Options(), new ReadOptions()); + } + + public org.tikv.datasources.br.SSTDecoder decodeSST(String sstFilePath, Options options, ReadOptions readOptions) { + return new SSTDecoder(sstFilePath, kvDecoder, options, readOptions); + } + + public Brpb.BackupMeta getBackupMeta() { + return backupMeta; + } +} diff --git a/sst-data-source/src/main/java/org/tikv/datasources/br/BackupMetaDecoder.java b/sst-data-source/src/main/java/org/tikv/datasources/br/BackupMetaDecoder.java new file mode 100644 index 00000000..40ce363d --- /dev/null +++ b/sst-data-source/src/main/java/org/tikv/datasources/br/BackupMetaDecoder.java @@ -0,0 +1,40 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.datasources.br; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import org.tikv.kvproto.Brpb; + +public class BackupMetaDecoder { + private final Brpb.BackupMeta backupMeta; + + public BackupMetaDecoder(byte[] data) throws org.tikv.shade.com.google.protobuf.InvalidProtocolBufferException { + this.backupMeta = Brpb.BackupMeta.parseFrom(data); + } + + public Brpb.BackupMeta getBackupMeta() { + return backupMeta; + } + + public static BackupMetaDecoder parse(String backupMetaFilePath) throws IOException { + byte[] data = Files.readAllBytes(new File(backupMetaFilePath).toPath()); + return new BackupMetaDecoder(data); + } +} diff --git a/sst-data-source/src/main/java/org/tikv/datasources/br/KVDecoder.java b/sst-data-source/src/main/java/org/tikv/datasources/br/KVDecoder.java new file mode 100644 index 00000000..d3a29cbe --- /dev/null +++ b/sst-data-source/src/main/java/org/tikv/datasources/br/KVDecoder.java @@ -0,0 +1,27 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.datasources.br; + +import com.google.protobuf.ByteString; +import java.io.Serializable; + +public interface KVDecoder extends Serializable { + ByteString decodeKey(byte[] key); + + ByteString decodeValue(byte[] value); +} diff --git a/sst-data-source/src/main/java/org/tikv/datasources/br/RawKVDecoderV1.java b/sst-data-source/src/main/java/org/tikv/datasources/br/RawKVDecoderV1.java new file mode 100644 index 00000000..c1632a94 --- /dev/null +++ b/sst-data-source/src/main/java/org/tikv/datasources/br/RawKVDecoderV1.java @@ -0,0 +1,56 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.datasources.br; + +import com.google.protobuf.ByteString; +import java.util.Arrays; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RawKVDecoderV1 implements KVDecoder { + private static final Logger logger = LoggerFactory.getLogger(SSTIterator.class); + + private final boolean ttlEnabled; + + public RawKVDecoderV1(boolean ttlEnabled) { + this.ttlEnabled = ttlEnabled; + } + + @Override + public ByteString decodeKey(byte[] key) { + if (key == null || key.length == 0) { + logger.warn( + "skip Key-Value pair because key == null || key.length == 0, key = " + + Arrays.toString(key)); + return null; + } else if (key[0] != 'z') { + logger.warn("skip Key-Value pair because key[0] != 'z', key = " + Arrays.toString(key)); + return null; + } + return ByteString.copyFrom(key, 1, key.length - 1); + } + + @Override + public ByteString decodeValue(byte[] value) { + if (!ttlEnabled) { + return ByteString.copyFrom(value); + } else { + return ByteString.copyFrom(value).substring(0, value.length - 8); + } + } +} diff --git a/sst-data-source/src/main/java/org/tikv/datasources/br/SSTDecoder.java b/sst-data-source/src/main/java/org/tikv/datasources/br/SSTDecoder.java new file mode 100644 index 00000000..a675ae68 --- /dev/null +++ b/sst-data-source/src/main/java/org/tikv/datasources/br/SSTDecoder.java @@ -0,0 +1,93 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.datasources.br; + +import com.google.protobuf.ByteString; +import java.util.Iterator; +import org.rocksdb.Options; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDBException; +import org.rocksdb.SstFileReader; +import org.rocksdb.SstFileReaderIterator; +import org.tikv.common.util.Pair; + +public class SSTDecoder { + private final String filePath; + private final KVDecoder kvDecoder; + private final Options options; + private final ReadOptions readOptions; + + private SstFileReader sstFileReader; + private SstFileReaderIterator iterator; + + public SSTDecoder(String sstFilePath, KVDecoder kvDecoder) { + this.filePath = sstFilePath; + this.kvDecoder = kvDecoder; + this.options = new Options(); + this.readOptions = new ReadOptions(); + } + + public SSTDecoder( + String filePath, KVDecoder kvDecoder, Options options, ReadOptions readOptions) { + this.filePath = filePath; + this.kvDecoder = kvDecoder; + this.options = options; + this.readOptions = readOptions; + } + + public synchronized Iterator> getIterator() throws RocksDBException { + if (sstFileReader != null || iterator != null) { + throw new RocksDBException("File already opened!"); + } + + sstFileReader = new SstFileReader(new Options()); + sstFileReader.open(filePath); + iterator = sstFileReader.newIterator(new ReadOptions()); + return new SSTIterator(iterator, kvDecoder); + } + + public synchronized void close() { + try { + if (iterator != null) { + iterator.close(); + } + } finally { + iterator = null; + } + + try { + if (sstFileReader != null) { + sstFileReader.close(); + } + } finally { + sstFileReader = null; + } + } + + public String getFilePath() { + return filePath; + } + + public Options getOptions() { + return options; + } + + public ReadOptions getReadOptions() { + return readOptions; + } +} diff --git a/sst-data-source/src/main/java/org/tikv/datasources/br/SSTIterator.java b/sst-data-source/src/main/java/org/tikv/datasources/br/SSTIterator.java new file mode 100644 index 00000000..39f8027f --- /dev/null +++ b/sst-data-source/src/main/java/org/tikv/datasources/br/SSTIterator.java @@ -0,0 +1,64 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.datasources.br; + +import com.google.protobuf.ByteString; +import java.util.Iterator; +import org.rocksdb.SstFileReaderIterator; +import org.tikv.common.util.Pair; + +public class SSTIterator implements Iterator> { + private final SstFileReaderIterator iterator; + private final KVDecoder kvDecoder; + + private Pair nextPair; + + public SSTIterator(SstFileReaderIterator iterator, KVDecoder kvDecoder) { + this.iterator = iterator; + this.kvDecoder = kvDecoder; + this.iterator.seekToFirst(); + this.nextPair = processNext(); + } + + @Override + public boolean hasNext() { + return nextPair != null; + } + + @Override + public Pair next() { + Pair result = nextPair; + nextPair = processNext(); + return result; + } + + private Pair processNext() { + if (iterator.isValid()) { + ByteString key = kvDecoder.decodeKey(iterator.key()); + ByteString value = kvDecoder.decodeValue(iterator.value()); + iterator.next(); + if (key != null) { + return Pair.create(key, value); + } else { + return processNext(); + } + } else { + return null; + } + } +} diff --git a/sst-data-source/src/main/scala/org/tikv/datasources/sst/SSTPartitionReader.scala b/sst-data-source/src/main/scala/org/tikv/datasources/sst/SSTPartitionReader.scala index 65f28e89..7b78a12f 100644 --- a/sst-data-source/src/main/scala/org/tikv/datasources/sst/SSTPartitionReader.scala +++ b/sst-data-source/src/main/scala/org/tikv/datasources/sst/SSTPartitionReader.scala @@ -19,11 +19,12 @@ package org.tikv.datasources.sst import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.connector.read.PartitionReader -import org.tikv.br.{BackupDecoder, SSTDecoder} +import org.tikv.datasources.br +import org.tikv.datasources.br.SSTDecoder import java.io.File -class SSTPartitionReader(sstFilePath: String, backupDecoder: BackupDecoder) +class SSTPartitionReader(sstFilePath: String, backupDecoder: br.BackupDecoder) extends PartitionReader[InternalRow] { private val sstDecoder: SSTDecoder = backupDecoder.decodeSST(sstFilePath) private val iterator = sstDecoder.getIterator diff --git a/sst-data-source/src/main/scala/org/tikv/datasources/sst/SSTPartitionReaderFactory.scala b/sst-data-source/src/main/scala/org/tikv/datasources/sst/SSTPartitionReaderFactory.scala index 237c1f2b..eba40375 100644 --- a/sst-data-source/src/main/scala/org/tikv/datasources/sst/SSTPartitionReaderFactory.scala +++ b/sst-data-source/src/main/scala/org/tikv/datasources/sst/SSTPartitionReaderFactory.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.datasources.v2.{ } import org.apache.spark.util.SerializableConfiguration import org.slf4j.LoggerFactory -import org.tikv.br.BackupDecoder +import org.tikv.datasources.br.BackupDecoder import java.io.File diff --git a/sst-data-source/src/main/scala/org/tikv/datasources/sst/SSTScan.scala b/sst-data-source/src/main/scala/org/tikv/datasources/sst/SSTScan.scala index 32072dbf..3146de6d 100644 --- a/sst-data-source/src/main/scala/org/tikv/datasources/sst/SSTScan.scala +++ b/sst-data-source/src/main/scala/org/tikv/datasources/sst/SSTScan.scala @@ -27,7 +27,9 @@ import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.SerializableConfiguration import org.slf4j.LoggerFactory -import org.tikv.br.{BackupDecoder, BackupMetaDecoder} +import org.tikv.datasources.br.BackupDecoder +import org.tikv.datasources.br +import org.tikv.datasources.br.BackupMetaDecoder import java.io.File import scala.collection.convert.ImplicitConversions.`map AsScala` @@ -69,7 +71,7 @@ case class SSTScan( val backupMetaFilePath = downloadBackupMeta(path, hadoopConf) val backupMetaDecoder = BackupMetaDecoder.parse(backupMetaFilePath) val ttlEnabled = options.getBoolean(SSTDataSource.ENABLE_TTL, SSTDataSource.DEF_ENABLE_TTL) - val backupDecoder: BackupDecoder = + val backupDecoder: br.BackupDecoder = new BackupDecoder(backupMetaDecoder.getBackupMeta, ttlEnabled) val broadcastedBackupDecoder = sparkSession.sparkContext.broadcast(backupDecoder) SSTPartitionReaderFactory(broadcastedConf, broadcastedBackupDecoder)