Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipe: Fixed NPE in schema pattern parsing & Improved connector subtask retry logic and logs & Fixed the executor used by consensus pipe #12704

Merged
merged 5 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ public class PipeConfigNodeSubtaskExecutor extends PipeSubtaskExecutor {
private static final int THREAD_NUM = 1;

private PipeConfigNodeSubtaskExecutor() {
super(THREAD_NUM, ThreadName.PIPE_CONFIGNODE_EXECUTOR_POOL);
super(THREAD_NUM, ThreadName.PIPE_CONFIGNODE_EXECUTOR_POOL, true);
}

/**
* @param ignored Used to distinguish this constructor from the default constructor.
*/
@TestOnly
public PipeConfigNodeSubtaskExecutor(Object ignored) {
super(THREAD_NUM, ThreadName.PIPE_CONFIGNODE_EXECUTOR_POOL);
public PipeConfigNodeSubtaskExecutor(final Object ignored) {
super(THREAD_NUM, ThreadName.PIPE_CONFIGNODE_EXECUTOR_POOL, true);
}

private static class PipeSchemaSubtaskExecutorHolder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.execution.executor.PipeSubtaskExecutor;
import org.apache.iotdb.db.pipe.execution.PipeConnectorSubtaskExecutor;

public class PipeConsensusSubtaskExecutor extends PipeSubtaskExecutor {
public class PipeConsensusSubtaskExecutor extends PipeConnectorSubtaskExecutor {

public PipeConsensusSubtaskExecutor() {
super(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ public class PipeConnectorSubtaskExecutor extends PipeSubtaskExecutor {
public PipeConnectorSubtaskExecutor() {
super(
PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum(),
ThreadName.PIPE_CONNECTOR_EXECUTOR_POOL);
ThreadName.PIPE_CONNECTOR_EXECUTOR_POOL,
true);
}

public PipeConnectorSubtaskExecutor(int corePoolSize, ThreadName threadName) {
super(corePoolSize, threadName);
public PipeConnectorSubtaskExecutor(final int corePoolSize, final ThreadName threadName) {
super(corePoolSize, threadName, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class PipeProcessorSubtaskExecutor extends PipeSubtaskExecutor {
public PipeProcessorSubtaskExecutor() {
super(
PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum(),
ThreadName.PIPE_PROCESSOR_EXECUTOR_POOL);
ThreadName.PIPE_PROCESSOR_EXECUTOR_POOL,
false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class PipeDataNodeTaskBuilder {
CONNECTOR_EXECUTOR_MAP.put(
PipeType.SUBSCRIPTION, PipeSubtaskExecutorManager.getInstance().getSubscriptionExecutor());
CONNECTOR_EXECUTOR_MAP.put(
PipeType.CONSENSUS, PipeSubtaskExecutorManager.getInstance().getConnectorExecutor());
PipeType.CONSENSUS, PipeSubtaskExecutorManager.getInstance().getConsensusExecutor());
}

protected final Map<String, String> systemParameters = new HashMap<>();
Expand Down Expand Up @@ -89,7 +89,7 @@ public PipeDataNodeTask build() {
blendUserAndSystemParameters(pipeStaticMeta.getConnectorParameters()),
regionId,
CONNECTOR_EXECUTOR_MAP.get(pipeType));
} else { // user pipe
} else { // user pipe or consensus pipe
connectorStage =
new PipeTaskConnectorStage(
pipeStaticMeta.getPipeName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class WrappedThreadPoolExecutor extends ThreadPoolExecutor
implements WrappedThreadPoolExecutorMBean {
private static final Logger logger = LoggerFactory.getLogger(WrappedThreadPoolExecutor.class);
private final String mbeanName;
private volatile boolean disableErrorLog = false;

public WrappedThreadPoolExecutor(
int corePoolSize,
Expand Down Expand Up @@ -125,8 +126,12 @@ protected void afterExecute(Runnable r, Throwable t) {
Thread.currentThread().interrupt();
}
}
if (t != null) {
if (t != null && !disableErrorLog) {
logger.error("Exception in thread pool {}", mbeanName, t);
}
}

public void disableErrorLog() {
disableErrorLog = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
import org.apache.iotdb.commons.pipe.execution.scheduler.PipeSubtaskScheduler;
import org.apache.iotdb.commons.pipe.task.subtask.PipeSubtask;
import org.apache.iotdb.commons.utils.TestOnly;
Expand All @@ -41,17 +42,22 @@ public abstract class PipeSubtaskExecutor {
private static final ExecutorService subtaskCallbackListeningExecutor =
IoTDBThreadPoolFactory.newSingleThreadExecutor(
ThreadName.PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL.getName());
private final ListeningExecutorService subtaskWorkerThreadPoolExecutor;
protected final ListeningExecutorService subtaskWorkerThreadPoolExecutor;

private final Map<String, PipeSubtask> registeredIdSubtaskMapper;

private final int corePoolSize;
private int runningSubtaskNumber;

protected PipeSubtaskExecutor(int corePoolSize, ThreadName threadName) {
subtaskWorkerThreadPoolExecutor =
MoreExecutors.listeningDecorator(
IoTDBThreadPoolFactory.newFixedThreadPool(corePoolSize, threadName.getName()));
protected PipeSubtaskExecutor(
final int corePoolSize, final ThreadName threadName, final boolean disableLogInThreadPool) {
final WrappedThreadPoolExecutor executor =
(WrappedThreadPoolExecutor)
IoTDBThreadPoolFactory.newFixedThreadPool(corePoolSize, threadName.getName());
if (disableLogInThreadPool) {
executor.disableErrorLog();
}
subtaskWorkerThreadPoolExecutor = MoreExecutors.listeningDecorator(executor);

registeredIdSubtaskMapper = new ConcurrentHashMap<>();

Expand All @@ -61,7 +67,7 @@ protected PipeSubtaskExecutor(int corePoolSize, ThreadName threadName) {

/////////////////////// Subtask management ///////////////////////

public final synchronized void register(PipeSubtask subtask) {
public final synchronized void register(final PipeSubtask subtask) {
if (registeredIdSubtaskMapper.containsKey(subtask.getTaskID())) {
LOGGER.warn("The subtask {} is already registered.", subtask.getTaskID());
return;
Expand All @@ -74,7 +80,7 @@ public final synchronized void register(PipeSubtask subtask) {
new PipeSubtaskScheduler(this));
}

public final synchronized void start(String subTaskID) {
public final synchronized void start(final String subTaskID) {
if (!registeredIdSubtaskMapper.containsKey(subTaskID)) {
LOGGER.warn("The subtask {} is not registered.", subTaskID);
return;
Expand All @@ -93,7 +99,7 @@ public final synchronized void start(String subTaskID) {
}
}

public final synchronized void stop(String subTaskID) {
public final synchronized void stop(final String subTaskID) {
if (!registeredIdSubtaskMapper.containsKey(subTaskID)) {
LOGGER.warn("The subtask {} is not registered.", subTaskID);
return;
Expand All @@ -104,7 +110,7 @@ public final synchronized void stop(String subTaskID) {
}
}

public final synchronized void deregister(String subTaskID) {
public final synchronized void deregister(final String subTaskID) {
stop(subTaskID);

final PipeSubtask subtask = registeredIdSubtaskMapper.remove(subTaskID);
Expand All @@ -113,14 +119,14 @@ public final synchronized void deregister(String subTaskID) {
try {
subtask.close();
LOGGER.info("The subtask {} is closed successfully.", subTaskID);
} catch (Exception e) {
} catch (final Exception e) {
LOGGER.error("Failed to close the subtask {}.", subTaskID, e);
}
}
}

@TestOnly
public final boolean isRegistered(String subTaskID) {
public final boolean isRegistered(final String subTaskID) {
return registeredIdSubtaskMapper.containsKey(subTaskID);
}

Expand All @@ -137,7 +143,7 @@ public final synchronized void shutdown() {
}

// stop all subtasks before shutting down the executor
for (PipeSubtask subtask : registeredIdSubtaskMapper.values()) {
for (final PipeSubtask subtask : registeredIdSubtaskMapper.values()) {
subtask.disallowSubmittingSelf();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ public IoTDBPipePattern(final String pattern) {

public static <T> List<T> applyIndexesOnList(
final int[] filteredIndexes, final List<T> originalList) {
return Arrays.stream(filteredIndexes).mapToObj(originalList::get).collect(Collectors.toList());
return Objects.nonNull(originalList)
? Arrays.stream(filteredIndexes).mapToObj(originalList::get).collect(Collectors.toList())
: null;
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.execution.scheduler.PipeSubtaskScheduler;
import org.apache.iotdb.commons.pipe.task.DecoratingLock;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;

Expand All @@ -43,49 +42,37 @@ public abstract class PipeAbstractConnectorSubtask extends PipeReportableSubtask
protected PipeConnector outputPipeConnector;

// For thread pool to execute callbacks
protected final DecoratingLock callbackDecoratingLock = new DecoratingLock();
protected ExecutorService subtaskCallbackListeningExecutor;

// For controlling subtask submitting, making sure that
// a subtask is submitted to only one thread at a time
protected volatile boolean isSubmitted = false;

protected PipeAbstractConnectorSubtask(
String taskID, long creationTime, PipeConnector outputPipeConnector) {
final String taskID, final long creationTime, final PipeConnector outputPipeConnector) {
super(taskID, creationTime);
this.outputPipeConnector = outputPipeConnector;
}

@Override
public void bindExecutors(
ListeningExecutorService subtaskWorkerThreadPoolExecutor,
ExecutorService subtaskCallbackListeningExecutor,
PipeSubtaskScheduler subtaskScheduler) {
final ListeningExecutorService subtaskWorkerThreadPoolExecutor,
final ExecutorService subtaskCallbackListeningExecutor,
final PipeSubtaskScheduler subtaskScheduler) {
this.subtaskWorkerThreadPoolExecutor = subtaskWorkerThreadPoolExecutor;
this.subtaskCallbackListeningExecutor = subtaskCallbackListeningExecutor;
this.subtaskScheduler = subtaskScheduler;
}

@Override
public Boolean call() throws Exception {
final boolean hasAtLeastOneEventProcessed = super.call();

// Wait for the callable to be decorated by Futures.addCallback in the executorService
// to make sure that the callback can be submitted again on success or failure.
callbackDecoratingLock.waitForDecorated();

return hasAtLeastOneEventProcessed;
}

@Override
public synchronized void onSuccess(Boolean hasAtLeastOneEventProcessed) {
public synchronized void onSuccess(final Boolean hasAtLeastOneEventProcessed) {
isSubmitted = false;

super.onSuccess(hasAtLeastOneEventProcessed);
}

@Override
public synchronized void onFailure(Throwable throwable) {
public synchronized void onFailure(final Throwable throwable) {
isSubmitted = false;

if (isClosed.get()) {
Expand Down Expand Up @@ -116,7 +103,7 @@ public synchronized void onFailure(Throwable throwable) {
/**
* @return {@code true} if the {@link PipeSubtask} should be stopped, {@code false} otherwise
*/
private boolean onPipeConnectionException(Throwable throwable) {
private boolean onPipeConnectionException(final Throwable throwable) {
LOGGER.warn(
"PipeConnectionException occurred, {} retries to handshake with the target system.",
outputPipeConnector.getClass().getName(),
Expand All @@ -130,7 +117,7 @@ private boolean onPipeConnectionException(Throwable throwable) {
"{} handshakes with the target system successfully.",
outputPipeConnector.getClass().getName());
break;
} catch (Exception e) {
} catch (final Exception e) {
retry++;
LOGGER.warn(
"{} failed to handshake with the target system for {} times, "
Expand All @@ -141,7 +128,7 @@ private boolean onPipeConnectionException(Throwable throwable) {
e);
try {
Thread.sleep(retry * PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
} catch (InterruptedException interruptedException) {
} catch (final InterruptedException interruptedException) {
LOGGER.info(
"Interrupted while sleeping, will retry to handshake with the target system.",
interruptedException);
Expand Down Expand Up @@ -195,13 +182,8 @@ public synchronized void submitSelf() {
return;
}

callbackDecoratingLock.markAsDecorating();
try {
final ListenableFuture<Boolean> nextFuture = subtaskWorkerThreadPoolExecutor.submit(this);
Futures.addCallback(nextFuture, this, subtaskCallbackListeningExecutor);
isSubmitted = true;
} finally {
callbackDecoratingLock.markAsDecorated();
}
final ListenableFuture<Boolean> nextFuture = subtaskWorkerThreadPoolExecutor.submit(this);
Futures.addCallback(nextFuture, this, subtaskCallbackListeningExecutor);
isSubmitted = true;
}
}
Loading
Loading