Skip to content

Commit

Permalink
[to #519] move br component from tikv java client to this repo (tikv#73)
Browse files Browse the repository at this point in the history
  • Loading branch information
marsishandsome authored Mar 23, 2022
1 parent 9afacac commit 57ff45e
Show file tree
Hide file tree
Showing 14 changed files with 398 additions and 9 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# ignore maven generated files
dependency-reduced-pom.xml

#ignore idea configuration
.idea
*.iml
Expand Down Expand Up @@ -34,4 +37,4 @@ target
hs_err_pid*

# ignore Mac files
.DS_Store
.DS_Store
2 changes: 1 addition & 1 deletion online-bulk-load/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ mvn clean package -DskipTests -am -pl online-bulk-load
```
spark-submit \
--master local[*] \
--jars /path/to/tikv-client-java-3.2.0-SNAPSHOT.jar \
--jars /path/to/tikv-client-java-3.3.0-SNAPSHOT.jar \
--class org.tikv.bulkload.example.BulkLoadExample \
online-bulk-load/target/online-bulk-load-0.0.1-SNAPSHOT.jar \
<pdaddr> <key_prefix> <data_size> <partition_nums>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@
<dependency>
<groupId>org.tikv</groupId>
<artifactId>tikv-client-java</artifactId>
<version>3.2.0-SNAPSHOT</version>
<version>3.3.0-SNAPSHOT</version>
</dependency>

<dependency>
Expand Down
2 changes: 1 addition & 1 deletion sst-data-source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ br backup raw \
```
spark-submit \
--master local[*] \
--jars /path/to/tikv-client-java-3.2.0-SNAPSHOT.jar \
--jars /path/to/tikv-client-java-3.3.0-SNAPSHOT.jar \
--class org.tikv.datasources.sst.example.SSTDataSourceExample \
sst-data-source/target/sst-data-source-0.0.1-SNAPSHOT.jar \
hdfs:///path/to/sst/
Expand Down
36 changes: 36 additions & 0 deletions sst-data-source/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@
<packaging>jar</packaging>
<name>Spark SST Data Source</name>

<properties>
<rocksdb.version>6.22.1.1</rocksdb.version>
</properties>

<dependencies>
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version>${rocksdb.version}</version>
</dependency>
</dependencies>

<build>
<extensions>
<extension>
Expand All @@ -36,6 +48,30 @@
</extension>
</extensions>
<plugins>
<!--- Needs to shade Protobuf 3 since other projects might use other version -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
<artifactSet>
<includes>
<include>org.rocksdb:rocksdbjni</include>
</includes>
</artifactSet>
</configuration>
</execution>
</executions>
</plugin>
<!-- Scala Format Plug-in -->
<plugin>
<groupId>org.antipathy</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2022 TiKV Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.tikv.datasources.br;

import java.io.Serializable;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.tikv.common.exception.SSTDecodeException;
import org.tikv.kvproto.Brpb;

public class BackupDecoder implements Serializable {
private final Brpb.BackupMeta backupMeta;
private final boolean ttlEnabled;
private final KVDecoder kvDecoder;

public BackupDecoder(Brpb.BackupMeta backupMeta) throws SSTDecodeException {
this.backupMeta = backupMeta;
this.ttlEnabled = false;
this.kvDecoder = initKVDecoder();
}

public BackupDecoder(Brpb.BackupMeta backupMeta, boolean ttlEnabled) throws SSTDecodeException {
this.backupMeta = backupMeta;
this.ttlEnabled = ttlEnabled;
this.kvDecoder = initKVDecoder();
}

private KVDecoder initKVDecoder() throws SSTDecodeException {
if (backupMeta.getIsRawKv()) {
if ("V1".equals(backupMeta.getApiVersion().name())) {
return new RawKVDecoderV1(ttlEnabled);
} else {
throw new SSTDecodeException(
"does not support decode APIVersion " + backupMeta.getApiVersion().name());
}
} else {
throw new SSTDecodeException("TxnKV is not supported yet!");
}
}

public org.tikv.datasources.br.SSTDecoder decodeSST(String sstFilePath) {
return decodeSST(sstFilePath, new Options(), new ReadOptions());
}

public org.tikv.datasources.br.SSTDecoder decodeSST(String sstFilePath, Options options, ReadOptions readOptions) {
return new SSTDecoder(sstFilePath, kvDecoder, options, readOptions);
}

public Brpb.BackupMeta getBackupMeta() {
return backupMeta;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2022 TiKV Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.tikv.datasources.br;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import org.tikv.kvproto.Brpb;

public class BackupMetaDecoder {
private final Brpb.BackupMeta backupMeta;

public BackupMetaDecoder(byte[] data) throws org.tikv.shade.com.google.protobuf.InvalidProtocolBufferException {
this.backupMeta = Brpb.BackupMeta.parseFrom(data);
}

public Brpb.BackupMeta getBackupMeta() {
return backupMeta;
}

public static BackupMetaDecoder parse(String backupMetaFilePath) throws IOException {
byte[] data = Files.readAllBytes(new File(backupMetaFilePath).toPath());
return new BackupMetaDecoder(data);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2022 TiKV Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.tikv.datasources.br;

import com.google.protobuf.ByteString;
import java.io.Serializable;

public interface KVDecoder extends Serializable {
ByteString decodeKey(byte[] key);

ByteString decodeValue(byte[] value);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2022 TiKV Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.tikv.datasources.br;

import com.google.protobuf.ByteString;
import java.util.Arrays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RawKVDecoderV1 implements KVDecoder {
private static final Logger logger = LoggerFactory.getLogger(SSTIterator.class);

private final boolean ttlEnabled;

public RawKVDecoderV1(boolean ttlEnabled) {
this.ttlEnabled = ttlEnabled;
}

@Override
public ByteString decodeKey(byte[] key) {
if (key == null || key.length == 0) {
logger.warn(
"skip Key-Value pair because key == null || key.length == 0, key = "
+ Arrays.toString(key));
return null;
} else if (key[0] != 'z') {
logger.warn("skip Key-Value pair because key[0] != 'z', key = " + Arrays.toString(key));
return null;
}
return ByteString.copyFrom(key, 1, key.length - 1);
}

@Override
public ByteString decodeValue(byte[] value) {
if (!ttlEnabled) {
return ByteString.copyFrom(value);
} else {
return ByteString.copyFrom(value).substring(0, value.length - 8);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2022 TiKV Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.tikv.datasources.br;

import com.google.protobuf.ByteString;
import java.util.Iterator;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileReader;
import org.rocksdb.SstFileReaderIterator;
import org.tikv.common.util.Pair;

public class SSTDecoder {
private final String filePath;
private final KVDecoder kvDecoder;
private final Options options;
private final ReadOptions readOptions;

private SstFileReader sstFileReader;
private SstFileReaderIterator iterator;

public SSTDecoder(String sstFilePath, KVDecoder kvDecoder) {
this.filePath = sstFilePath;
this.kvDecoder = kvDecoder;
this.options = new Options();
this.readOptions = new ReadOptions();
}

public SSTDecoder(
String filePath, KVDecoder kvDecoder, Options options, ReadOptions readOptions) {
this.filePath = filePath;
this.kvDecoder = kvDecoder;
this.options = options;
this.readOptions = readOptions;
}

public synchronized Iterator<Pair<ByteString, ByteString>> getIterator() throws RocksDBException {
if (sstFileReader != null || iterator != null) {
throw new RocksDBException("File already opened!");
}

sstFileReader = new SstFileReader(new Options());
sstFileReader.open(filePath);
iterator = sstFileReader.newIterator(new ReadOptions());
return new SSTIterator(iterator, kvDecoder);
}

public synchronized void close() {
try {
if (iterator != null) {
iterator.close();
}
} finally {
iterator = null;
}

try {
if (sstFileReader != null) {
sstFileReader.close();
}
} finally {
sstFileReader = null;
}
}

public String getFilePath() {
return filePath;
}

public Options getOptions() {
return options;
}

public ReadOptions getReadOptions() {
return readOptions;
}
}
Loading

0 comments on commit 57ff45e

Please sign in to comment.