Skip to content

Commit

Permalink
Mergemaster0620 (#12779)
Browse files Browse the repository at this point in the history
* [IOTDB-6336] Add max retry time duration and whether to retry for unknown errors configurations

* Optimize error message when creating unsupported data type (#12650)

Optimize error message when creating unsupported data type

* Pipe: fix uninitialized config when validate PipeHistoricalDataRegionTsFileExtractor parameters & Subscription: avoid poll messages from unsubscribed topics (#12648)

* Pipe: fix uninitialized config when validate PipeHistoricalDataRegionTsFileExtractor parameters

* Subscription: avoid poll messages from unsubscribed topics

* Fix region migration's timeout judgement #12639

* fix compaction recover skip negative time partition (#12637)

* Fix Concurrency Instability When IoTConsensus LogDispatcher Exits #12655

* JDBC: Fix IndexOutOfBoundsException when calling getBytes (#12662)

* JDBC: Fix ArrayOutOfBoundException when calling getBytes

* Update JDBCCharsetExample.java

* [IOTDB-6335] Redundant rows when using GROUP BY TIME with LIMIT

* Pipe: Enhance error handling logic in pipe async connector's handlers (#12669)

* Print log when read resource error (#12666)

* Load: Make load SQL option VERIFY can only be set to true (#12670)

* Pipe: Improved hybrid algorithm and avoid the caculations of tsfiles in connector pending queue tsfiles to block the forwarding of realtime requests (#12668)

* Migrate to Develocity Maven Extension (#12658)

* Migrate to Develocity Maven Extension
* Update to Common Custom User Data Maven Extension 2.0

* Update import schema logic implementation and optimize -help display information (#12643)

* add import-schema.sh/bat and export-schema.sh/bat in cli/tools

* fix import schema

* fix import schema

---------

Co-authored-by: 2b3c511 <[email protected]>

* Subscription: fix topic now timestamp precision (#12663)

* [IOTDB-6337] Refine the count calculation in RegionScan framework

* Bump version of master branch to 1.3.3-SNAPSHOT (#12672)

* Pipe: Reduce warn log when PipeConfigRegionExtractorMetrics does not exist (#12673)

* [IOTDB-6061] Fix the instability failure caused by initServer in IoTConsensus UT not binding to the corresponding port (#12674)

* fix concurrent

* fix concurrent

* Load: Make load SQL option VERIFY can only be set to true (A follow-up fix for #12670) (#12676)

* Pipe: add compression level config for connector ZSTD compressor (#12630)

Co-authored-by: Steve Yurong Su <[email protected]>

* [PY-client] Unify setup.py and requirement.txt (#12601)

* Pipe IT: Ignore failed cases caused by cluster restart failure (#12678)

* update import and export data -help description (#12677)

Co-authored-by: 2b3c511 <[email protected]>

* Pipe: Fix potential NPE when lastEvent == null in PipeConnectorSubtask (#12680)

* Fix wrong calculation for maxBytesCanReserve for IdentitySinkOperator

* Change floating point encoding method in IT

* Throw `IndexOutOfBound` exception in row implementation.

* Thread safely SeriesPartitionTable (#12679)

* finish

* use concurrentskiplist

* bug fix

---------

Co-authored-by: OneSizeFitQuorum <[email protected]>

* Pipe: Refactor some features for user access (#12686)

* Set timestamp precision same as time column (#12681)

* done

* done

* rollback change

* fix IT

* fix IT

* Set configuration on node (#12626)

Co-authored-by: Haonan <[email protected]>

* Fix cli report error when ignoreTimestamp is true (#12691)

* done

* done

* rollback change

* fix IT

* fix IT

* done

* invoke ci

* done

* invoke ci

* Load: Add check for BufferUnderflowException (#12690)

Co-authored-by: xz m <[email protected]>

* remove excess stack

* Fix new UDTF execution framework dead loop error.

* Perfect methods of IAnalysis to adapt the write process of table model

* Load: Support storing piecenode in multiple folders during 2nd phase (#12675)

Co-authored-by: Steve Yurong Su <[email protected]>

* Pipe: add decompressed length in RPC compression payload to avoid potential OOM on receiver (#12701)

* Load: Update loadTsFileDirs after all newLoadTsFileDirs are generated to avoid undefined behavior (#12712)

* Pipe: Fixed NPE in schema pattern parsing & Improved connector subtask retry logic and logs & Fixed the executor used by consensus pipe (#12704)

* Procedure: Fix ProcedureInfo on CN leader may decrease its procId (#12711)

* Fix missing of linux memory metrics (#12713)

* fix missing memory metric

* remove the print of type

* update import and export tsfile description (#12684)

Co-authored-by: 2b3c511 <[email protected]>

* Subscription: skip on setup and cluster failure when running subscription restart IT & fix some bugs in SubscriptionExecutorServiceManager (#12710)

* Update pom.xml to fix security issue (#12705)

Co-authored-by: Christofer Dutz <[email protected]>

* Rename IoTV2Consensus to IoTConsensusV2 (#12715)

Signed-off-by: OneSizeFitQuorum <[email protected]>

* Fix NumberFormatException when upgrade from old version #12719

* Replenish effective mode in iotdb-system.properties (#12706)

* load-tsfile script: disable redirection & load: handle exceptions using the Analysis objects instead of throwing exceptions & pipe: handle SYSTEM_READ_ONLY correctly on receiver side (#12716)

* Subscription: improve parsing logic when using JAVA SDK client & refactor subscription IT & intro `getSubscribedTopicNames` API (#12721)

* Pipe: Fix schema events can not report & Fix delete data events in data regions may fail to mark at schema metrics (#12722)

* Pipe: Support `"source.history.loose-range" = "path"` in iotdb-source  (#12651)

Co-authored-by: Steve Yurong Su <[email protected]>

* [IOTDB-6338] Fix wrong query result while using some value filter with LIMIT/OFFSET

* Pipe: fix threads of IoTDB-Pipe-Processor-Executor-Pool stucked by PipeTsFileInsertionEvent#waitForTsFileClose (#12727)

* [ISSUE 12499] Reject query with massive time span in limited memroy

* Fix/include thrift lib and headers in cpp client (#12734)

* fix: Hopefully fixed the build of the cpp-example on windows systems

* fix: Increase the timeout even more to finally make the build succeed.

* fix: Fix the build.

* chore: Fixed the assembly descriptor of the cpp-client to include the thrift libs and header files.

* Pipe / Load / Subscription: Support new TsDataTypes (STRING / BLOB / TIMESTAMP / DATE) (#12665)

* Optimize the log printing when multiple error occur (#12732)

* Update packege hierachy in Client-cpp readme (#12736)

* Support agg(*) query in templated align by device situation

* fix config node use IoTDBDescriptor (#12730)

* PipeConsensus: Delete WAL after dropping database & fix receiver directory recovery (#12738)

* fix wal deletion & create receiver dir

* update consensus protocol class

* merge judge

* modify directory path

* modify directory path

* Add instructions of compiling client-cpp with VS2019. (#12739)

* Subscription: adapt pipe completion signal for automatically drop subscription (#12724)

* Verify connection && Refactor async connection framework (#12667)

* PipeConsensus: add metrics and fix some bugs for pipeConsensus (#12723)

* Decrease TTL Deletion in compaction modification cache (#12687)

* decrease TTL Deletion in compaction modification cache

* Update iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java

Co-authored-by: Jiang Tian <[email protected]>

* modify MultiTsFileDeviceIterator

* fix spotless

---------

Co-authored-by: Jiang Tian <[email protected]>

* Stabilize verify connection IT  (#12745)

* done

* more !

* Fix compaction npe when set modified status (#12729)

* fix compaction npe when set modified status

* check empty page

* Subscription IT: execute jstack for potential stuck test cases (#12728)

* Pipe: Smoothed the rate in pipe's remaining time calculations (#12699)

* Pipe: Fix PipeDataRegionEventCounter not work in PipeRealtimeDataRegionExtractor (#12747)

* Fix python CI (#12754)

* Support show device ttl on path pattern (#12709)

* support show ttl on pathPattern

* fix UT

* modify details

* Support WAL Compression (#12476)

* enable wal compression

remove metrics in mem table flush task, cache hash code in partial path, use gzip to compress wal

batch update metrics

* fix bug

* fix compilation problem

* remove useless code

* recover some code

* support compression type in WAL Compress Header

* support multi version WAL

* edit configuration item

* add log for WAL size

* temp for debug

* fix bug

* remove useless log

* remove one configuration

* use compression rate to update wal disk usage

* fix ut

* fix test

* set default to uncompress

* fix wal ut

* optimize calculating of wal size

* close wal file when the origin size of wal buffer is larger than threshold

* add the size of magic string

* may be fix the bug

* fix with comment

* edit with review

* fix test

* add test for wal compression

* add hot reload

* clean the code to make it more readable

* reuse the byte buffer if possible

* Indicate the encoding of String

* Edit according to comment

* spotless

* Optimize the devicePath generation efficiency of PlanNode when deserializing (#12749)

* Pipe: Support "source.realtime.loose-range" = "path" in iotdb-source (#12751)

* Support active schema query (#12718)

* Check duplicate in start-node and stop-node scripts (#12756)

* check duplicate key in start node script

* modify stop node scripts

* use \n as line separator

* fix config node send showConfiguration to config node classCastException

* [IOTDB-6339] Optimize the time slice control of SeriesScanOperator and AlignedSeriesScanOperator

* Optimize node startup process & Normalize system.properties management (#12752)

* [IOTDB-6340] Clear Coordinator.queryExecutionMap while encountering MemoryNotEnough Exception

* [IOTDB-6342] Clear FragmentInstanceManager.dataNodeQueryContextMap while encountering MemoryNotEnough Exception (#12762)

* Refactor node registration check (#12766)

* done

* done

* Fix overlap between tsfile is not correctly marked (#12748)

* file fileOverlap is not correctly marked

* fix repair compaction may generate too large chunk

* Subscription: support topic loose range for path and time  (#12760)

* Add clean logic for FragmentInstance in case that callback is not added. (#12768)

* SystemPropertiesHandler use Files::move instead of File::renameTo (#12770)

* Files.move

* ?

* ?

* ?

* Pipe: Filter devices by pattern before reading device metadata from TsFile (#12765)

Currently we read the metadata of all devices and measurements when constructing a TsFileInsertionDataContainer. This is both time-consuming and memory-wasting if only a few devices match the pattern. This PR filters the devices by pattern before reading the metadata of devices and measurements, saving memory and I/O cost when there are many unmatched devices.

Note: this only works when TsFile metadata are not cached (due to high memory usage of TsFile metadata cache), because cached metadata can not be filtered so that pipes with arbitrary patterns can use it.

---------

Co-authored-by: Steve Yurong Su <[email protected]>

* Pipe: Adjusted some loggers of metrics to avoid unnecessary warns & Include "lastEvent" into pipe's event count metrics & Removed the "userConflict" judgment to data sync failure caused by METADATA_ERROR (#12758)

* Synchronize takeSnapshotAsync on RaftGroupID (#12767)

* SystemPropertiesHandler close reader and writer #12772

* Pipe Consensus: Fix events in connector are not in order because of parameter `'realtime-first' = 'true'` (#12773)

* Pipe: Avoid event being continuously rate-limited even its pipe is dropped (#12753)

Co-authored-by: Steve Yurong Su <[email protected]>

* Load: Add some load metrics of time cost, write point and disk throughput (#12735)

Co-authored-by: Steve Yurong Su <[email protected]>

---------

Signed-off-by: OneSizeFitQuorum <[email protected]>
Co-authored-by: Chen YZ <[email protected]>
Co-authored-by: V_Galaxy <[email protected]>
Co-authored-by: Li Yu Heng <[email protected]>
Co-authored-by: shuwenwei <[email protected]>
Co-authored-by: Xiangpeng Hu <[email protected]>
Co-authored-by: Steve Yurong Su <[email protected]>
Co-authored-by: YangCaiyin <[email protected]>
Co-authored-by: Haonan <[email protected]>
Co-authored-by: Caideyipi <[email protected]>
Co-authored-by: Clay Johnson <[email protected]>
Co-authored-by: Summer <[email protected]>
Co-authored-by: 2b3c511 <[email protected]>
Co-authored-by: Zikun Ma <[email protected]>
Co-authored-by: Liao Lanyu <[email protected]>
Co-authored-by: Zhihao Shen <[email protected]>
Co-authored-by: Yongzao <[email protected]>
Co-authored-by: OneSizeFitQuorum <[email protected]>
Co-authored-by: ppppoooo <[email protected]>
Co-authored-by: xz m <[email protected]>
Co-authored-by: CritasWang <[email protected]>
Co-authored-by: Beyyes <[email protected]>
Co-authored-by: Itami Sho <[email protected]>
Co-authored-by: ZhangHongYin <[email protected]>
Co-authored-by: Christofer Dutz <[email protected]>
Co-authored-by: liuminghui233 <[email protected]>
Co-authored-by: Jiang Tian <[email protected]>
Co-authored-by: yschengzi <[email protected]>
Co-authored-by: Peng Junzhi <[email protected]>
Co-authored-by: 周沛辰 <[email protected]>
Co-authored-by: Liu Xuxin <[email protected]>
Co-authored-by: William Song <[email protected]>
  • Loading branch information
1 parent 625269b commit 0343fc2
Show file tree
Hide file tree
Showing 518 changed files with 15,742 additions and 6,248 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,26 +44,28 @@ public class CountPointProcessor implements PipeProcessor {
private PartialPath aggregateSeries;

@Override
public void validate(PipeParameterValidator validator) {
public void validate(final PipeParameterValidator validator) {
validator.validateRequiredAttribute(AGGREGATE_SERIES_KEY);
}

@Override
public void customize(PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration)
public void customize(
final PipeParameters parameters, final PipeProcessorRuntimeConfiguration configuration)
throws Exception {
this.aggregateSeries = new PartialPath(parameters.getString(AGGREGATE_SERIES_KEY));
}

@Override
public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector) {
public void process(
final TabletInsertionEvent tabletInsertionEvent, final EventCollector eventCollector) {
tabletInsertionEvent.processTablet(
(tablet, rowCollector) -> writePointCount.addAndGet(tablet.rowSize));
}

@Override
public void process(Event event, EventCollector eventCollector) throws Exception {
public void process(final Event event, final EventCollector eventCollector) throws Exception {
if (event instanceof PipeHeartbeatEvent) {
Tablet tablet =
final Tablet tablet =
new Tablet(
aggregateSeries.getIDeviceID().toString(),
Collections.singletonList(
Expand All @@ -73,7 +75,7 @@ public void process(Event event, EventCollector eventCollector) throws Exception
tablet.addTimestamp(0, System.currentTimeMillis());
tablet.addValue(aggregateSeries.getMeasurement(), 0, writePointCount.get());
eventCollector.collect(
new PipeRawTabletInsertionEvent(tablet, false, null, null, null, false));
new PipeRawTabletInsertionEvent(tablet, false, null, 0, null, null, false));
}
}

Expand Down
1 change: 0 additions & 1 deletion example/schema/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.3.13</version>
</dependency>
<dependency>
<groupId>org.apache.tsfile</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,23 @@
import org.apache.iotdb.rpc.subscription.config.TopicConstant;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.subscription.SubscriptionSession;
import org.apache.iotdb.session.subscription.consumer.AckStrategy;
import org.apache.iotdb.session.subscription.consumer.ConsumeResult;
import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
import org.apache.iotdb.session.subscription.consumer.SubscriptionPushConsumer;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
import org.apache.iotdb.session.subscription.payload.SubscriptionTsFileHandler;

import org.apache.tsfile.read.TsFileReader;
import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.read.expression.QueryExpression;
import org.apache.tsfile.read.query.dataset.QueryDataSet;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
Expand All @@ -46,11 +55,14 @@ public class SubscriptionSessionExample {
private static final int PORT = 6667;

private static final String TOPIC_1 = "topic1";
private static final String TOPIC_2 = "`topic2`";
private static final String TOPIC_2 = "`'topic2'`";
private static final String TOPIC_3 = "`\"topic3\"`";
private static final String TOPIC_4 = "`\"top \\.i.c4\"`";

public static final long SLEEP_NS = 1_000_000_000L;
public static final long POLL_TIMEOUT_MS = 10_000L;
private static final long SLEEP_NS = 1_000_000_000L;
private static final long POLL_TIMEOUT_MS = 10_000L;
private static final int MAX_RETRY_TIMES = 3;
private static final int PARALLELISM = 8;

private static void prepareData() throws Exception {
// Open session
Expand All @@ -66,13 +78,13 @@ private static void prepareData() throws Exception {

// Insert some historical data
final long currentTime = System.currentTimeMillis();
for (int i = 0; i < 10000; ++i) {
for (int i = 0; i < 100; ++i) {
session.executeNonQueryStatement(
String.format("insert into root.db.d1(time, s1, s2) values (%s, 1, 2)", i));
session.executeNonQueryStatement(
String.format("insert into root.db.d2(time, s3, s4) values (%s, 3, 4)", currentTime + i));
String.format("insert into root.db.d2(time, s1, s2) values (%s, 3, 4)", currentTime + i));
session.executeNonQueryStatement(
String.format("insert into root.sg.d3(time, s5) values (%s, 5)", currentTime + 2 * i));
String.format("insert into root.sg.d3(time, s1) values (%s, 5)", currentTime + 2 * i));
}
session.executeNonQueryStatement("flush");

Expand Down Expand Up @@ -104,11 +116,16 @@ private static void dataQuery() throws Exception {
session = null;
}

private static void subscriptionExample1() throws Exception {
/** single pull consumer subscribe topic with path and time range */
private static void dataSubscription1() throws Exception {
// Create topics
try (final SubscriptionSession subscriptionSession = new SubscriptionSession(HOST, PORT)) {
subscriptionSession.open();
subscriptionSession.createTopic(TOPIC_1);
final Properties config = new Properties();
config.put(TopicConstant.PATH_KEY, "root.db.d1.s1");
config.put(TopicConstant.START_TIME_KEY, 25);
config.put(TopicConstant.END_TIME_KEY, 75);
subscriptionSession.createTopic(TOPIC_1, config);
}

int retryCount = 0;
Expand Down Expand Up @@ -151,7 +168,8 @@ private static void subscriptionExample1() throws Exception {
consumer1.close();
}

private static void subscriptionExample2() throws Exception {
/** multi pull consumer subscribe topic with tsfile format */
private static void dataSubscription2() throws Exception {
try (final SubscriptionSession subscriptionSession = new SubscriptionSession(HOST, PORT)) {
subscriptionSession.open();
final Properties config = new Properties();
Expand All @@ -160,7 +178,7 @@ private static void subscriptionExample2() throws Exception {
}

final List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 8; ++i) {
for (int i = 0; i < PARALLELISM; ++i) {
final int idx = i;
final Thread thread =
new Thread(
Expand All @@ -187,7 +205,16 @@ private static void subscriptionExample2() throws Exception {
}
for (final SubscriptionMessage message : messages) {
try (final TsFileReader reader = message.getTsFileHandler().openReader()) {
// do something...
final QueryDataSet dataSet =
reader.query(
QueryExpression.create(
Arrays.asList(
new Path("root.db.d2", "s2", true),
new Path("root.db.d3", "s1", true)),
null));
while (dataSet.hasNext()) {
System.out.println(dataSet.next());
}
}
}
consumer2.commitSync(messages);
Expand All @@ -206,10 +233,109 @@ private static void subscriptionExample2() throws Exception {
}
}

/** multi push consumer subscribe topic with tsfile format and query mode */
private static void dataSubscription3() throws Exception {
try (final SubscriptionSession subscriptionSession = new SubscriptionSession(HOST, PORT)) {
subscriptionSession.open();
final Properties config = new Properties();
config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
config.put(TopicConstant.MODE_KEY, TopicConstant.MODE_QUERY_VALUE);
subscriptionSession.createTopic(TOPIC_3, config);
}

final List<Thread> threads = new ArrayList<>();
for (int i = 0; i < PARALLELISM; ++i) {
final int idx = i;
final Thread thread =
new Thread(
() -> {
// Subscription: builder-style ctor
try (final SubscriptionPushConsumer consumer3 =
new SubscriptionPushConsumer.Builder()
.consumerId("c" + idx)
.consumerGroupId("cg3")
.ackStrategy(AckStrategy.AFTER_CONSUME)
.consumeListener(
message -> {
// do something for SubscriptionTsFileHandler
System.out.println(
message.getTsFileHandler().getFile().getAbsolutePath());
return ConsumeResult.SUCCESS;
})
.buildPushConsumer()) {
consumer3.open();
consumer3.subscribe(TOPIC_3);
while (consumer3.hasMoreData()) {
LockSupport.parkNanos(SLEEP_NS); // wait some time
}
}
});
thread.start();
threads.add(thread);
}

for (final Thread thread : threads) {
thread.join();
}
}

/** multi pull consumer subscribe topic with tsfile format and query mode */
private static void dataSubscription4() throws Exception {
try (final SubscriptionSession subscriptionSession = new SubscriptionSession(HOST, PORT)) {
subscriptionSession.open();
final Properties config = new Properties();
config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
config.put(TopicConstant.MODE_KEY, TopicConstant.MODE_QUERY_VALUE);
subscriptionSession.createTopic(TOPIC_4, config);
}

final List<Thread> threads = new ArrayList<>();
for (int i = 0; i < PARALLELISM; ++i) {
final int idx = i;
final Thread thread =
new Thread(
() -> {
// Subscription: builder-style ctor
try (final SubscriptionPullConsumer consumer4 =
new SubscriptionPullConsumer.Builder()
.consumerId("c" + idx)
.consumerGroupId("cg4")
.autoCommit(true)
.fileSaveFsync(true)
.buildPullConsumer()) {
consumer4.open();
consumer4.subscribe(TOPIC_4);
while (true) {
LockSupport.parkNanos(SLEEP_NS); // wait some time
if (!consumer4.hasMoreData()) {
break;
}
for (final SubscriptionMessage message : consumer4.poll(POLL_TIMEOUT_MS)) {
final SubscriptionTsFileHandler handler = message.getTsFileHandler();
handler.moveFile(
Paths.get(System.getProperty("user.dir"), "new-subscription-dir")
.resolve(handler.getPath().getFileName()));
}
}
} catch (final IOException e) {
throw new RuntimeException(e);
}
});
thread.start();
threads.add(thread);
}

for (final Thread thread : threads) {
thread.join();
}
}

public static void main(final String[] args) throws Exception {
prepareData();
dataQuery();
subscriptionExample1();
subscriptionExample2();
// dataQuery();
// dataSubscription1();
// dataSubscription2();
// dataSubscription3();
dataSubscription4();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,6 @@ public class ClusterConstant {
public static final String COMMON_PROPERTIES_FILE = "iotdb-common.properties";
public static final String IOTDB_SYSTEM_PROPERTIES_FILE = "iotdb-system.properties";

public static final String SYSTEM_PROPERTIES_FILE = "system.properties";
public static final String CONFIG_NODE_SYSTEM_PROPERTIES_FILE = "confignode-system.properties";

// Properties' keys
// Common
public static final String CONFIG_NODE_CONSENSUS_PROTOCOL_CLASS =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import java.util.stream.IntStream;

import static org.apache.iotdb.consensus.ConsensusFactory.FAST_IOT_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.IOTV2_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS_V2;
import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.REAL_PIPE_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
Expand Down Expand Up @@ -224,7 +224,7 @@ public static String fromConsensusFullNameToAbbr(String consensus) {
return IOT_CONSENSUS_STR;
case REAL_PIPE_CONSENSUS:
return PIPE_CONSENSUS_STR;
case IOTV2_CONSENSUS:
case IOT_CONSENSUS_V2:
return STREAM_CONSENSUS_STR;
case FAST_IOT_CONSENSUS:
return BATCH_CONSENSUS_STR;
Expand All @@ -244,7 +244,7 @@ public static String fromConsensusAbbrToFullName(String consensus) {
case PIPE_CONSENSUS_STR:
return REAL_PIPE_CONSENSUS;
case STREAM_CONSENSUS_STR:
return IOTV2_CONSENSUS;
return IOT_CONSENSUS_V2;
case BATCH_CONSENSUS_STR:
return FAST_IOT_CONSENSUS;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,32 @@ public CommonConfig setCnConnectionTimeoutMs(int connectionTimeoutMs) {
return this;
}

@Override
public CommonConfig setPipeHeartbeatIntervalSecondsForCollectingPipeMeta(
int pipeHeartbeatIntervalSecondsForCollectingPipeMeta) {
setProperty(
"pipe_heartbeat_interval_seconds_for_collecting_pipe_meta",
String.valueOf(pipeHeartbeatIntervalSecondsForCollectingPipeMeta));
return this;
}

@Override
public CommonConfig setPipeMetaSyncerInitialSyncDelayMinutes(
long pipeMetaSyncerInitialSyncDelayMinutes) {
setProperty(
"pipe_meta_syncer_initial_sync_delay_minutes",
String.valueOf(pipeMetaSyncerInitialSyncDelayMinutes));
return this;
}

@Override
public CommonConfig setPipeMetaSyncerSyncIntervalMinutes(long pipeMetaSyncerSyncIntervalMinutes) {
setProperty(
"pipe_meta_syncer_sync_interval_minutes",
String.valueOf(pipeMetaSyncerSyncIntervalMinutes));
return this;
}

// For part of the log directory
public String getClusterConfigStr() {
return fromConsensusFullNameToAbbr(properties.getProperty(CONFIG_NODE_CONSENSUS_PROTOCOL_CLASS))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,4 +445,29 @@ public CommonConfig setCnConnectionTimeoutMs(int connectionTimeoutMs) {
cnConfig.setCnConnectionTimeoutMs(connectionTimeoutMs);
return this;
}

@Override
public CommonConfig setPipeHeartbeatIntervalSecondsForCollectingPipeMeta(
int pipeHeartbeatIntervalSecondsForCollectingPipeMeta) {
dnConfig.setPipeHeartbeatIntervalSecondsForCollectingPipeMeta(
pipeHeartbeatIntervalSecondsForCollectingPipeMeta);
cnConfig.setPipeHeartbeatIntervalSecondsForCollectingPipeMeta(
pipeHeartbeatIntervalSecondsForCollectingPipeMeta);
return this;
}

@Override
public CommonConfig setPipeMetaSyncerInitialSyncDelayMinutes(
long pipeMetaSyncerInitialSyncDelayMinutes) {
dnConfig.setPipeMetaSyncerInitialSyncDelayMinutes(pipeMetaSyncerInitialSyncDelayMinutes);
cnConfig.setPipeMetaSyncerInitialSyncDelayMinutes(pipeMetaSyncerInitialSyncDelayMinutes);
return this;
}

@Override
public CommonConfig setPipeMetaSyncerSyncIntervalMinutes(long pipeMetaSyncerSyncIntervalMinutes) {
dnConfig.setPipeMetaSyncerSyncIntervalMinutes(pipeMetaSyncerSyncIntervalMinutes);
cnConfig.setPipeMetaSyncerSyncIntervalMinutes(pipeMetaSyncerSyncIntervalMinutes);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -640,10 +640,10 @@ public void setTestMethodName(String testMethodName) {
@Override
public void dumpTestJVMSnapshot() {
for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) {
configNodeWrapper.dumpJVMSnapshot(testMethodName);
configNodeWrapper.executeJstack(testMethodName);
}
for (DataNodeWrapper dataNodeWrapper : dataNodeWrapperList) {
dataNodeWrapper.dumpJVMSnapshot(testMethodName);
dataNodeWrapper.executeJstack(testMethodName);
}
}

Expand Down
Loading

0 comments on commit 0343fc2

Please sign in to comment.