Skip to content

Commit

Permalink
Pipe: Avoid event being continuously rate-limited even if its pipe is dropped (#12753)
Browse files Browse the repository at this point in the history

Co-authored-by: Steve Yurong Su <[email protected]>
  • Loading branch information
2 people authored and OneSizeFitsQuorum committed Jun 27, 2024
1 parent bd76aea commit beb4a9c
Show file tree
Hide file tree
Showing 74 changed files with 483 additions and 313 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ private void doTransfer(
throws PipeException, IOException {
if (!send(
pipeConfigRegionWritePlanEvent.getPipeName(),
pipeConfigRegionWritePlanEvent.getCreationTime(),
socket,
PipeTransferConfigPlanReq.toTPipeTransferBytes(
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan()))) {
Expand Down Expand Up @@ -189,17 +190,19 @@ private void doTransfer(
final AirGapSocket socket, final PipeConfigRegionSnapshotEvent pipeConfigRegionSnapshotEvent)
throws PipeException, IOException {
final String pipeName = pipeConfigRegionSnapshotEvent.getPipeName();
final long creationTime = pipeConfigRegionSnapshotEvent.getCreationTime();
final File snapshot = pipeConfigRegionSnapshotEvent.getSnapshotFile();
final File templateFile = pipeConfigRegionSnapshotEvent.getTemplateFile();

// 1. Transfer snapshotFile, and template file if exists
transferFilePieces(pipeName, snapshot, socket, true);
transferFilePieces(pipeName, creationTime, snapshot, socket, true);
if (Objects.nonNull(templateFile)) {
transferFilePieces(pipeName, templateFile, socket, true);
transferFilePieces(pipeName, creationTime, templateFile, socket, true);
}
// 2. Transfer file seal signal, which means the snapshots are transferred completely
if (!send(
pipeName,
creationTime,
socket,
PipeTransferConfigSnapshotSealReq.toTPipeTransferBytes(
// The pattern is surely Non-null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ private void doTransfer(final PipeConfigRegionWritePlanEvent pipeConfigRegionWri
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan()));
rateLimitIfNeeded(
pipeConfigRegionWritePlanEvent.getPipeName(),
pipeConfigRegionWritePlanEvent.getCreationTime(),
clientAndStatus.getLeft().getEndPoint(),
req.getBody().length);
resp = clientAndStatus.getLeft().pipeTransfer(req);
Expand Down Expand Up @@ -181,14 +182,15 @@ private void doTransferWrapper(final PipeConfigRegionSnapshotEvent pipeConfigReg
private void doTransfer(final PipeConfigRegionSnapshotEvent snapshotEvent)
throws PipeException, IOException {
final String pipeName = snapshotEvent.getPipeName();
final long creationTime = snapshotEvent.getCreationTime();
final File snapshotFile = snapshotEvent.getSnapshotFile();
final File templateFile = snapshotEvent.getTemplateFile();
final Pair<IoTDBSyncClient, Boolean> clientAndStatus = clientManager.getClient();

// 1. Transfer snapshotFile, and template File if exists
transferFilePieces(pipeName, snapshotFile, clientAndStatus, true);
transferFilePieces(pipeName, creationTime, snapshotFile, clientAndStatus, true);
if (Objects.nonNull(templateFile)) {
transferFilePieces(pipeName, templateFile, clientAndStatus, true);
transferFilePieces(pipeName, creationTime, templateFile, clientAndStatus, true);
}
// 2. Transfer file seal signal, which means the snapshots are transferred completely
final TPipeTransferResp resp;
Expand All @@ -206,6 +208,7 @@ private void doTransfer(final PipeConfigRegionSnapshotEvent snapshotEvent)
snapshotEvent.toSealTypeString()));
rateLimitIfNeeded(
snapshotEvent.getPipeName(),
snapshotEvent.getCreationTime(),
clientAndStatus.getLeft().getEndPoint(),
req.getBody().length);
resp = clientAndStatus.getLeft().pipeTransfer(req);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.statemachine.dataregion.DataRegionStateMachine;
import org.apache.iotdb.db.consensus.statemachine.dataregion.IoTConsensusDataRegionStateMachine;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.consensus.ConsensusPipeDataNodeDispatcher;
import org.apache.iotdb.db.pipe.consensus.ConsensusPipeDataNodeRuntimeAgentGuardian;
import org.apache.iotdb.db.pipe.consensus.ProgressIndexDataNodeManager;
Expand Down Expand Up @@ -80,7 +80,7 @@ private static class DataRegionConsensusImplHolder {
// Make sure both statics are initialized.
static {
reinitializeStatics();
PipeAgent.receiver().pipeConsensus().initConsensusInRuntime();
PipeDataNodeAgent.receiver().pipeConsensus().initConsensusInRuntime();
}

private static void reinitializeStatics() {
Expand Down Expand Up @@ -164,8 +164,9 @@ private static ConsensusConfig buildConsensusConfig() {
// name
.setConsensusPipeDispatcher(new ConsensusPipeDataNodeDispatcher())
.setConsensusPipeGuardian(new ConsensusPipeDataNodeRuntimeAgentGuardian())
.setConsensusPipeSelector(() -> PipeAgent.task().getAllConsensusPipe())
.setConsensusPipeReceiver(PipeAgent.receiver().pipeConsensus())
.setConsensusPipeSelector(
() -> PipeDataNodeAgent.task().getAllConsensusPipe())
.setConsensusPipeReceiver(PipeDataNodeAgent.receiver().pipeConsensus())
.setProgressIndexManager(new ProgressIndexDataNodeManager())
.setConsensusPipeGuardJobIntervalInSeconds(300) // TODO: move to config
.build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
import org.apache.iotdb.db.exception.metadata.MeasurementAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.template.TemplateIsInUseException;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.extractor.schemaregion.SchemaRegionListeningQueue;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
Expand Down Expand Up @@ -541,7 +541,7 @@ public TSStatus visitPipeEnrichedNonWritePlanNode(
public TSStatus visitPipeOperateSchemaQueueNode(
final PipeOperateSchemaQueueNode node, final ISchemaRegion schemaRegion) {
final SchemaRegionId id = schemaRegion.getSchemaRegionId();
final SchemaRegionListeningQueue queue = PipeAgent.runtime().schemaListener(id);
final SchemaRegionListeningQueue queue = PipeDataNodeAgent.runtime().schemaListener(id);
if (node.isOpen() && !queue.isOpened()) {
logger.info("Opened pipe listening queue on schema region {}", id);
queue.open();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.iotdb.consensus.ratis.utils.Utils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.statemachine.BaseStateMachine;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.extractor.schemaregion.SchemaRegionListeningQueue;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
Expand Down Expand Up @@ -64,7 +64,7 @@ public void start() {
@Override
public void stop() {
// Stop leader related service for schema pipe
PipeAgent.runtime().notifySchemaLeaderUnavailable(schemaRegion.getSchemaRegionId());
PipeDataNodeAgent.runtime().notifySchemaLeaderUnavailable(schemaRegion.getSchemaRegionId());
}

@Override
Expand All @@ -79,7 +79,7 @@ public void notifyLeaderChanged(ConsensusGroupId groupId, int newLeaderId) {
newLeaderId);

// Shutdown leader related service for schema pipe
PipeAgent.runtime().notifySchemaLeaderUnavailable(schemaRegion.getSchemaRegionId());
PipeDataNodeAgent.runtime().notifySchemaLeaderUnavailable(schemaRegion.getSchemaRegionId());

logger.info(
"Current node [nodeId: {}] is no longer the schema region leader [regionId: {}], "
Expand All @@ -97,7 +97,7 @@ public void notifyLeaderReady() {
schemaRegion.getSchemaRegionId());

// Activate leader related service for schema pipe
PipeAgent.runtime().notifySchemaLeaderReady(schemaRegion.getSchemaRegionId());
PipeDataNodeAgent.runtime().notifySchemaLeaderReady(schemaRegion.getSchemaRegionId());

logger.info(
"Current node [nodeId: {}] as schema region leader [regionId: {}] is ready to work",
Expand All @@ -113,7 +113,7 @@ public boolean isReadOnly() {
@Override
public boolean takeSnapshot(File snapshotDir) {
if (schemaRegion.createSnapshot(snapshotDir)
&& PipeAgent.runtime()
&& PipeDataNodeAgent.runtime()
.schemaListener(schemaRegion.getSchemaRegionId())
.createSnapshot(snapshotDir)) {
listen2Snapshot4PipeListener(true);
Expand All @@ -125,7 +125,7 @@ public boolean takeSnapshot(File snapshotDir) {
@Override
public void loadSnapshot(File latestSnapshotRootDir) {
schemaRegion.loadSnapshot(latestSnapshotRootDir);
PipeAgent.runtime()
PipeDataNodeAgent.runtime()
.schemaListener(schemaRegion.getSchemaRegionId())
.loadSnapshot(latestSnapshotRootDir);
// We recompute the snapshot for pipe listener when loading snapshot
Expand All @@ -141,7 +141,7 @@ public void listen2Snapshot4PipeListener(boolean isTmp) {
.toString(),
isTmp);
SchemaRegionListeningQueue listener =
PipeAgent.runtime().schemaListener(schemaRegion.getSchemaRegionId());
PipeDataNodeAgent.runtime().schemaListener(schemaRegion.getSchemaRegionId());
if (Objects.isNull(snapshotPaths) || Objects.isNull(snapshotPaths.getLeft())) {
if (listener.isOpened()) {
logger.warn(
Expand All @@ -164,7 +164,7 @@ public TSStatus write(IConsensusRequest request) {
try {
TSStatus result = ((PlanNode) request).accept(new SchemaExecutionVisitor(), schemaRegion);
if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
PipeAgent.runtime()
PipeDataNodeAgent.runtime()
.schemaListener(schemaRegion.getSchemaRegionId())
.tryListenToNode((PlanNode) request);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,25 @@
import org.apache.iotdb.db.pipe.agent.task.PipeDataNodeTaskAgent;
import org.apache.iotdb.db.service.DataNode;

/** {@link PipeAgent} is the entry point of the pipe module in {@link DataNode}. */
public class PipeAgent {
/** {@link PipeDataNodeAgent} is the entry point of the pipe module in {@link DataNode}. */
public class PipeDataNodeAgent {

private final PipeDataNodePluginAgent pipeDataNodePluginAgent;
private final PipeDataNodeTaskAgent pipeDataNodeTaskAgent;
private final PipeDataNodeRuntimeAgent pipeDataNodeRuntimeAgent;
private final PipeDataNodeReceiverAgent pipeDataNodeReceiverAgent;

/** Private constructor to prevent users from creating a new instance. */
private PipeAgent() {
private PipeDataNodeAgent() {
pipeDataNodePluginAgent = new PipeDataNodePluginAgent();
pipeDataNodeTaskAgent = new PipeDataNodeTaskAgent();
pipeDataNodeRuntimeAgent = new PipeDataNodeRuntimeAgent();
pipeDataNodeReceiverAgent = new PipeDataNodeReceiverAgent();
}

/** The singleton holder of {@link PipeAgent}. */
private static class PipeAgentHolder {
private static final PipeAgent HANDLE = new PipeAgent();
/** The singleton holder of {@link PipeDataNodeAgent}. */
private static class PipeDataNodeAgentHolder {
private static final PipeDataNodeAgent HANDLE = new PipeDataNodeAgent();
}

/**
Expand All @@ -52,7 +52,7 @@ private static class PipeAgentHolder {
* @return the singleton instance of {@link PipeDataNodeTaskAgent}
*/
public static PipeDataNodeTaskAgent task() {
return PipeAgentHolder.HANDLE.pipeDataNodeTaskAgent;
return PipeDataNodeAgentHolder.HANDLE.pipeDataNodeTaskAgent;
}

/**
Expand All @@ -61,7 +61,7 @@ public static PipeDataNodeTaskAgent task() {
* @return the singleton instance of {@link PipeDataNodePluginAgent}
*/
public static PipeDataNodePluginAgent plugin() {
return PipeAgentHolder.HANDLE.pipeDataNodePluginAgent;
return PipeDataNodeAgentHolder.HANDLE.pipeDataNodePluginAgent;
}

/**
Expand All @@ -70,7 +70,7 @@ public static PipeDataNodePluginAgent plugin() {
* @return the singleton instance of {@link PipeDataNodeRuntimeAgent}
*/
public static PipeDataNodeRuntimeAgent runtime() {
return PipeAgentHolder.HANDLE.pipeDataNodeRuntimeAgent;
return PipeDataNodeAgentHolder.HANDLE.pipeDataNodeRuntimeAgent;
}

/**
Expand All @@ -79,6 +79,6 @@ public static PipeDataNodeRuntimeAgent runtime() {
* @return the singleton instance of {@link PipeDataNodeReceiverAgent}
*/
public static PipeDataNodeReceiverAgent receiver() {
return PipeAgentHolder.HANDLE.pipeDataNodeReceiverAgent;
return PipeDataNodeAgentHolder.HANDLE.pipeDataNodeReceiverAgent;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
Expand Down Expand Up @@ -88,7 +88,7 @@ public static synchronized void launchPipePluginAgent(
if (meta.isBuiltin()) {
continue;
}
PipeAgent.plugin().doRegister(meta);
PipeDataNodeAgent.plugin().doRegister(meta);
}
} catch (Exception e) {
throw new StartupException(e);
Expand Down Expand Up @@ -159,7 +159,7 @@ public static synchronized void launchPipeTaskAgent() {
throw new StartupException("Failed to get pipe task meta from config node.");
}

PipeAgent.task()
PipeDataNodeAgent.task()
.handlePipeMetaChanges(
getAllPipeInfoResp.getAllPipeInfo().stream()
.map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.extractor.schemaregion.SchemaRegionListeningQueue;
import org.apache.iotdb.db.pipe.progress.SimpleProgressIndexAssigner;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner;
Expand Down Expand Up @@ -70,7 +70,7 @@ public synchronized void preparePipeResources(
PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner.clean();

// Clean receiver file dir
PipeAgent.receiver().cleanPipeReceiverDirs();
PipeDataNodeAgent.receiver().cleanPipeReceiverDirs();

PipeAgentLauncher.launchPipePluginAgent(resourcesInformationHolder);
simpleProgressIndexAssigner.start();
Expand All @@ -83,7 +83,7 @@ public synchronized void start() throws StartupException {

registerPeriodicalJob(
"PipeTaskAgent#restartAllStuckPipes",
PipeAgent.task()::restartAllStuckPipes,
PipeDataNodeAgent.task()::restartAllStuckPipes,
PipeConfig.getInstance().getPipeStuckRestartIntervalSeconds());
pipePeriodicalJobExecutor.start();

Expand All @@ -98,7 +98,7 @@ public synchronized void stop() {
isShutdown.set(true);

pipePeriodicalJobExecutor.stop();
PipeAgent.task().dropAllPipeTasks();
PipeDataNodeAgent.task().dropAllPipeTasks();
}

public boolean isShutdown() {
Expand Down Expand Up @@ -207,7 +207,7 @@ public void report(PipeTaskMeta pipeTaskMeta, PipeRuntimeException pipeRuntimeEx
// Quick stop all pipes locally if critical exception occurs,
// no need to wait for the next heartbeat cycle.
if (pipeRuntimeException instanceof PipeRuntimeCriticalException) {
PipeAgent.task().stopAllPipesWithCriticalException();
PipeDataNodeAgent.task().stopAllPipesWithCriticalException();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener;
Expand Down Expand Up @@ -96,7 +96,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {

@Override
protected boolean isShutdown() {
return PipeAgent.runtime().isShutdown();
return PipeDataNodeAgent.runtime().isShutdown();
}

@Override
Expand Down Expand Up @@ -204,7 +204,7 @@ private Set<Integer> clearSchemaRegionListeningQueueIfNecessary(

schemaRegionId2ListeningQueueNewFirstIndex.forEach(
(schemaRegionId, listeningQueueNewFirstIndex) ->
PipeAgent.runtime()
PipeDataNodeAgent.runtime()
.schemaListener(new SchemaRegionId(schemaRegionId))
.removeBefore(listeningQueueNewFirstIndex));

Expand All @@ -218,12 +218,12 @@ private void closeSchemaRegionListeningQueueIfNecessary(
return;
}

PipeAgent.runtime()
PipeDataNodeAgent.runtime()
.listeningSchemaRegionIds()
.forEach(
schemaRegionId -> {
if (!validSchemaRegionIds.contains(schemaRegionId.getId())
&& PipeAgent.runtime().isSchemaLeaderReady(schemaRegionId)) {
&& PipeDataNodeAgent.runtime().isSchemaLeaderReady(schemaRegionId)) {
try {
SchemaRegionConsensusImpl.getInstance()
.write(
Expand Down Expand Up @@ -297,7 +297,7 @@ public void collectPipeMetaList(final TDataNodeHeartbeatResp resp) throws TExcep

private void collectPipeMetaListInternal(final TDataNodeHeartbeatResp resp) throws TException {
// Do nothing if data node is removing or removed, or request does not need pipe meta list
if (PipeAgent.runtime().isShutdown()) {
if (PipeDataNodeAgent.runtime().isShutdown()) {
return;
}

Expand Down Expand Up @@ -375,7 +375,7 @@ private void collectPipeMetaListInternal(final TDataNodeHeartbeatResp resp) thro
protected void collectPipeMetaListInternal(
final TPipeHeartbeatReq req, final TPipeHeartbeatResp resp) throws TException {
// Do nothing if data node is removing or removed, or request does not need pipe meta list
if (PipeAgent.runtime().isShutdown()) {
if (PipeDataNodeAgent.runtime().isShutdown()) {
return;
}
LOGGER.info("Received pipe heartbeat request {} from config node.", req.heartbeatId);
Expand Down Expand Up @@ -631,11 +631,6 @@ public Set<Integer> getPipeTaskRegionIdSet(final String pipeName, final long cre

///////////////////////// Pipe Consensus /////////////////////////

public long getPipeCreationTime(final String pipeName) {
final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
return pipeMeta == null ? 0 : pipeMeta.getStaticMeta().getCreationTime();
}

public ProgressIndex getPipeTaskProgressIndex(final String pipeName, final int consensusGroupId) {
if (!tryReadLockWithTimeOut(10)) {
throw new PipeException(
Expand Down
Loading

0 comments on commit beb4a9c

Please sign in to comment.