Skip to content

Commit

Permalink
Pipe: add compression level config for connector ZSTD compressor (apa…
Browse files Browse the repository at this point in the history
…che#12630)

Co-authored-by: Steve Yurong Su <[email protected]>
  • Loading branch information
2 people authored and chrisdutz committed Jun 7, 2024
1 parent 7f4769d commit 3fb5818
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.env.MultiEnvFactory;
Expand All @@ -36,11 +38,17 @@
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.fail;

@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2AutoCreateSchema.class})
public class IoTDBPipeConnectorCompressionIT extends AbstractPipeDualAutoIT {
Expand Down Expand Up @@ -179,4 +187,122 @@ private void doTest(
Collections.singleton("8,"));
}
}

/**
 * Verifies that the {@code connector.compressor.zstd.level} parameter is validated when a pipe is
 * created.
 *
 * <p>Five pipes are created against the receiver's first data node. Pipes p1 to p3 use the levels
 * 3, 22 and -131072 and must be created successfully; p4 and p5 use -131073 and 23 and must be
 * rejected with the "Zstd compression level should be in the range" error. Afterwards exactly
 * three pipes must be listed on the sender and three timeseries must eventually be counted on the
 * receiver.
 *
 * <p>If the preparatory inserts cannot be executed on the sender, the method returns without
 * asserting anything.
 */
@Test
public void testZstdCompressorLevel() throws Exception {
  final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

  final String receiverIp = receiverDataNode.getIp();
  final int receiverPort = receiverDataNode.getPort();

  try (final SyncConfigNodeIServiceClient client =
      (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
    if (!TestUtils.tryExecuteNonQueriesWithRetry(
        senderEnv,
        Arrays.asList(
            "insert into root.db.d1(time, s1) values (1, 1)",
            "insert into root.db.d1(time, s2) values (1, 1)",
            "insert into root.db.d1(time, s3) values (1, 1)",
            "insert into root.db.d1(time, s4) values (1, 1)",
            "insert into root.db.d1(time, s5) values (1, 1)",
            "flush"))) {
      return;
    }

    // Create 5 pipes with different zstd compression levels, p4 and p5 should fail.
    assertZstdPipeCreated("p1", "s1", "3", receiverIp, receiverPort);
    assertZstdPipeCreated("p2", "s2", "22", receiverIp, receiverPort);
    assertZstdPipeCreated("p3", "s3", "-131072", receiverIp, receiverPort);

    assertZstdPipeRejected("p4", "s4", "-131073", receiverIp, receiverPort);
    assertZstdPipeRejected("p5", "s5", "23", receiverIp, receiverPort);

    // Only the three accepted pipes may exist on the sender.
    final List<TShowPipeInfo> showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
    Assert.assertEquals(3, showPipeResult.size());

    TestUtils.assertDataEventuallyOnEnv(
        receiverEnv, "count timeseries", "count(timeseries),", Collections.singleton("3,"));
  }
}

/**
 * Issues a {@code create pipe} statement on the sender that extracts {@code
 * root.db.d1.<measurement>} and ships it to the given receiver with the "zstd, zstd" compressor
 * chain and the given zstd level.
 *
 * @param pipeName name of the pipe to create
 * @param measurement measurement under {@code root.db.d1} used as the extractor pattern
 * @param zstdLevel value passed as {@code connector.compressor.zstd.level}
 * @param receiverIp receiver data node ip
 * @param receiverPort receiver data node port
 * @throws SQLException if the connection cannot be obtained or the statement is rejected
 */
private void createZstdPipe(
    final String pipeName,
    final String measurement,
    final String zstdLevel,
    final String receiverIp,
    final int receiverPort)
    throws SQLException {
  try (final Connection connection = senderEnv.getConnection();
      final Statement statement = connection.createStatement()) {
    statement.execute(
        String.format(
            "create pipe %s"
                + " with extractor ('extractor.pattern'='root.db.d1.%s')"
                + " with connector ("
                + "'connector.ip'='%s',"
                + "'connector.port'='%s',"
                + "'connector.compressor'='zstd, zstd',"
                + "'connector.compressor.zstd.level'='%s')",
            pipeName, measurement, receiverIp, receiverPort, zstdLevel));
  }
}

/** Creates the pipe and fails the test, keeping the SQL error as the cause, if it is rejected. */
private void assertZstdPipeCreated(
    final String pipeName,
    final String measurement,
    final String zstdLevel,
    final String receiverIp,
    final int receiverPort) {
  try {
    createZstdPipe(pipeName, measurement, zstdLevel, receiverIp, receiverPort);
  } catch (final SQLException e) {
    // Keep the original exception as the cause so the stack trace ends up in the test report.
    throw new AssertionError(
        "Failed to create pipe " + pipeName + " with zstd level " + zstdLevel + ": "
            + e.getMessage(),
        e);
  }
}

/** Expects pipe creation to be rejected with the zstd level range error message. */
private void assertZstdPipeRejected(
    final String pipeName,
    final String measurement,
    final String zstdLevel,
    final String receiverIp,
    final int receiverPort) {
  try {
    createZstdPipe(pipeName, measurement, zstdLevel, receiverIp, receiverPort);
  } catch (final SQLException e) {
    // Make sure the error message in IoTDBConnector.java is returned
    Assert.assertTrue(e.getMessage().contains("Zstd compression level should be in the range"));
    return;
  }
  fail("Pipe " + pipeName + " should have been rejected for zstd level " + zstdLevel);
}
}
4 changes: 4 additions & 0 deletions iotdb-core/node-commons/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-jexl3</artifactId>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.pipe.config.PipeConfig;

import com.github.luben.zstd.Zstd;

import java.io.File;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -177,6 +179,14 @@ public class PipeConnectorConstant {
CONNECTOR_COMPRESSOR_ZSTD,
CONNECTOR_COMPRESSOR_LZMA2)));

// Parameter keys under which the ZSTD compression level is configured; one key uses the
// "connector." prefix and the other the "sink." prefix.
public static final String CONNECTOR_COMPRESSOR_ZSTD_LEVEL_KEY =
"connector.compressor.zstd.level";
public static final String SINK_COMPRESSOR_ZSTD_LEVEL_KEY = "sink.compressor.zstd.level";
// Default level, obtained from the zstd-jni library instead of being hard-coded here.
public static final int CONNECTOR_COMPRESSOR_ZSTD_LEVEL_DEFAULT_VALUE =
Zstd.defaultCompressionLevel();
// Lower and upper bounds of the level, as reported by the zstd-jni library.
// NOTE(review): all three values are computed by calling into Zstd when this constants class is
// initialized — confirm that is acceptable everywhere the class gets loaded.
public static final int CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MIN_VALUE = Zstd.minCompressionLevel();
public static final int CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MAX_VALUE = Zstd.maxCompressionLevel();

public static final String CONNECTOR_RATE_LIMIT_KEY = "connector.rate-limit-bytes-per-second";
public static final String SINK_RATE_LIMIT_KEY = "sink.rate-limit-bytes-per-second";
public static final double CONNECTOR_RATE_LIMIT_DEFAULT_VALUE = -1;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.commons.pipe.connector.compressor;

/**
 * Immutable value holder describing which pipe compressor is requested.
 *
 * <p>It carries the compressor name together with a ZSTD compression level. Neither value is
 * validated or interpreted here; they are stored as given and handed back by the getters.
 */
public class PipeCompressorConfig {

  /** Compressor name exactly as passed to the constructor (may be {@code null}). */
  private final String name;

  /** ZSTD compression level exactly as passed to the constructor. */
  private final int zstdCompressionLevel;

  /**
   * Creates a config from the given values without any checks.
   *
   * @param name the compressor name
   * @param zstdCompressionLevel the ZSTD compression level
   */
  public PipeCompressorConfig(final String name, final int zstdCompressionLevel) {
    this.zstdCompressionLevel = zstdCompressionLevel;
    this.name = name;
  }

  /**
   * @return the compressor name supplied at construction time
   */
  public String getName() {
    return this.name;
  }

  /**
   * @return the ZSTD compression level supplied at construction time
   */
  public int getZstdCompressionLevel() {
    return this.zstdCompressionLevel;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,50 +19,86 @@

package org.apache.iotdb.commons.pipe.connector.compressor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_GZIP;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_LZ4;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_LZMA2;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_SNAPPY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_ZSTD;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_ZSTD_LEVEL_DEFAULT_VALUE;

public class PipeCompressorFactory {

// NOTE(review): this span is taken from a rendered diff, so pre-change and post-change lines
// appear together. The HashMap declaration directly below looks like the removed version of the
// final ConcurrentHashMap declaration that follows — confirm against the commit.
private static Map<String, PipeCompressor> COMPRESSOR_NAME_TO_INSTANCE = new HashMap<>();
private static final Logger LOGGER = LoggerFactory.getLogger(PipeCompressorFactory.class);

// Registry of compressor instances keyed by compressor name. It is a ConcurrentHashMap because
// getCompressor adds per-level ZSTD entries to it via computeIfAbsent.
private static final Map<String, PipeCompressor> COMPRESSOR_NAME_TO_INSTANCE =
new ConcurrentHashMap<>();

// Registers one instance per supported compressor name.
static {
COMPRESSOR_NAME_TO_INSTANCE.put(CONNECTOR_COMPRESSOR_SNAPPY, new PipeSnappyCompressor());
COMPRESSOR_NAME_TO_INSTANCE.put(CONNECTOR_COMPRESSOR_GZIP, new PipeGZIPCompressor());
COMPRESSOR_NAME_TO_INSTANCE.put(CONNECTOR_COMPRESSOR_LZ4, new PipeLZ4Compressor());
// NOTE(review): the no-arg PipeZSTDCompressor registration appears to be the removed line; the
// registration after it passes the default ZSTD level explicitly.
COMPRESSOR_NAME_TO_INSTANCE.put(CONNECTOR_COMPRESSOR_ZSTD, new PipeZSTDCompressor());
COMPRESSOR_NAME_TO_INSTANCE.put(
CONNECTOR_COMPRESSOR_ZSTD,
new PipeZSTDCompressor(CONNECTOR_COMPRESSOR_ZSTD_LEVEL_DEFAULT_VALUE));
COMPRESSOR_NAME_TO_INSTANCE.put(CONNECTOR_COMPRESSOR_LZMA2, new PipeLZMA2Compressor());
// NOTE(review): re-assigning the map to an unmodifiable view cannot coexist with the final
// declaration above, so this is presumably a removed line — confirm.
COMPRESSOR_NAME_TO_INSTANCE = Collections.unmodifiableMap(COMPRESSOR_NAME_TO_INSTANCE);
}

// Looks up the PipeCompressor to use for a request.
// NOTE(review): this span is taken from a rendered diff. The String-based header and its body
// lines appear interleaved with the PipeCompressorConfig-based replacement — confirm which lines
// are live before editing.
public static PipeCompressor getCompressor(String name) {
final PipeCompressor compressor = COMPRESSOR_NAME_TO_INSTANCE.get(name);
if (compressor == null) {
throw new UnsupportedOperationException("PipeCompressor not found for name: " + name);
public static PipeCompressor getCompressor(PipeCompressorConfig config) {
// A missing config or a config without a compressor name is rejected up front with
// IllegalArgumentException.
if (config == null) {
throw new IllegalArgumentException("PipeCompressorConfig is null");
}
return compressor;
if (config.getName() == null) {
throw new IllegalArgumentException("PipeCompressorConfig.getName() is null");
}

final String compressorName = config.getName();

// For ZSTD compressor, we need to consider the compression level
// Instances are cached per level under the key "<zstd name>_<level>" in the same map that holds
// the name-keyed instances; computeIfAbsent creates and logs a new one only when that key is
// not present yet.
if (compressorName.equals(CONNECTOR_COMPRESSOR_ZSTD)) {
final int zstdCompressionLevel = config.getZstdCompressionLevel();
return COMPRESSOR_NAME_TO_INSTANCE.computeIfAbsent(
CONNECTOR_COMPRESSOR_ZSTD + "_" + zstdCompressionLevel,
key -> {
LOGGER.info("Create new PipeZSTDCompressor with level: {}", zstdCompressionLevel);
return new PipeZSTDCompressor(zstdCompressionLevel);
});
}

// For other compressors, we can directly get the instance by name
final PipeCompressor compressor = COMPRESSOR_NAME_TO_INSTANCE.get(compressorName);
if (compressor != null) {
return compressor;
}

// No instance is registered under this name.
throw new UnsupportedOperationException("PipeCompressor not found for name: " + compressorName);
}

// Registry of compressor instances keyed by the byte index of their PipeCompressionType.
private static Map<Byte, PipeCompressor> COMPRESSOR_INDEX_TO_INSTANCE = new HashMap<>();

// NOTE(review): this span is taken from a rendered diff. For every put(...) below, the argument
// line that constructs a new compressor appears to be the removed version, and the two lines
// after it (type index plus a COMPRESSOR_NAME_TO_INSTANCE lookup) the added version — confirm.
// In the lookup form each index shares the instance registered under the compressor's name, so
// this initializer depends on the name map having been populated by an earlier static block.
static {
COMPRESSOR_INDEX_TO_INSTANCE.put(
PipeCompressor.PipeCompressionType.SNAPPY.getIndex(), new PipeSnappyCompressor());
PipeCompressor.PipeCompressionType.SNAPPY.getIndex(),
COMPRESSOR_NAME_TO_INSTANCE.get(CONNECTOR_COMPRESSOR_SNAPPY));
COMPRESSOR_INDEX_TO_INSTANCE.put(
PipeCompressor.PipeCompressionType.GZIP.getIndex(), new PipeGZIPCompressor());
PipeCompressor.PipeCompressionType.GZIP.getIndex(),
COMPRESSOR_NAME_TO_INSTANCE.get(CONNECTOR_COMPRESSOR_GZIP));
COMPRESSOR_INDEX_TO_INSTANCE.put(
PipeCompressor.PipeCompressionType.LZ4.getIndex(), new PipeLZ4Compressor());
PipeCompressor.PipeCompressionType.LZ4.getIndex(),
COMPRESSOR_NAME_TO_INSTANCE.get(CONNECTOR_COMPRESSOR_LZ4));
COMPRESSOR_INDEX_TO_INSTANCE.put(
PipeCompressor.PipeCompressionType.ZSTD.getIndex(), new PipeZSTDCompressor());
PipeCompressor.PipeCompressionType.ZSTD.getIndex(),
COMPRESSOR_NAME_TO_INSTANCE.get(CONNECTOR_COMPRESSOR_ZSTD));
COMPRESSOR_INDEX_TO_INSTANCE.put(
PipeCompressor.PipeCompressionType.LZMA2.getIndex(), new PipeLZMA2Compressor());
PipeCompressor.PipeCompressionType.LZMA2.getIndex(),
COMPRESSOR_NAME_TO_INSTANCE.get(CONNECTOR_COMPRESSOR_LZMA2));
// Freeze the index map once it is fully populated.
COMPRESSOR_INDEX_TO_INSTANCE = Collections.unmodifiableMap(COMPRESSOR_INDEX_TO_INSTANCE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,26 @@

package org.apache.iotdb.commons.pipe.connector.compressor;

import org.apache.tsfile.compress.ICompressor;
import org.apache.tsfile.compress.IUnCompressor;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import com.github.luben.zstd.Zstd;

import java.io.IOException;

// PipeCompressor implementation for the ZSTD compression type.
// NOTE(review): this span is taken from a rendered diff, so removed and added lines appear
// together: the static ICompressor/IUnCompressor fields, the no-arg constructor header and the
// COMPRESSOR/DECOMPRESSOR calls look like the removed version, while the compressionLevel field,
// the int-argument constructor and the direct Zstd calls look like the added one — confirm.
public class PipeZSTDCompressor extends PipeCompressor {

private static final ICompressor COMPRESSOR = ICompressor.getCompressor(CompressionType.ZSTD);
private static final IUnCompressor DECOMPRESSOR =
IUnCompressor.getUnCompressor(CompressionType.ZSTD);
// Level passed to Zstd.compress for every call on this instance.
private final int compressionLevel;

public PipeZSTDCompressor() {
// Stores the level as given; no range check is performed in this class.
public PipeZSTDCompressor(int compressionLevel) {
super(PipeCompressionType.ZSTD);
this.compressionLevel = compressionLevel;
}

// Compresses the whole input array at the configured level.
@Override
public byte[] compress(byte[] data) throws IOException {
return COMPRESSOR.compress(data);
return Zstd.compress(data, compressionLevel);
}

// Decompresses the input, sizing the output from the value Zstd.decompressedSize reports for it.
// NOTE(review): that long value is narrowed to int without any check — confirm what happens for
// input whose decompressed size is not reported or exceeds the int range.
@Override
public byte[] decompress(byte[] byteArray) throws IOException {
return DECOMPRESSOR.uncompress(byteArray);
public byte[] decompress(byte[] byteArray) {
return Zstd.decompress(byteArray, (int) Zstd.decompressedSize(byteArray, 0, byteArray.length));
}
}
Loading

0 comments on commit 3fb5818

Please sign in to comment.