Skip to content

Commit

Permalink
[ISSUE #7093] Avoid dispatch tasks too much cause dispatch task failed (
Browse files Browse the repository at this point in the history
#7094)

* Avoid dispatch tasks too much cause dispatch task failed

* set schedule task async
  • Loading branch information
lizhimins authored Aug 1, 2023
1 parent 8bcc948 commit a1bf49d
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private void initScheduleTask() {
TieredStoreExecutor.commonScheduledExecutor.scheduleWithFixedDelay(() ->
tieredFlatFileManager.deepCopyFlatFileToList().forEach(flatFile -> {
if (!flatFile.getCompositeFlatFileLock().isLocked()) {
dispatchFlatFile(flatFile);
dispatchFlatFileAsync(flatFile);
}
}), 30, 10, TimeUnit.SECONDS);
}
Expand Down Expand Up @@ -180,10 +180,6 @@ public void dispatch(DispatchRequest request) {
message.release();
flatFile.getCompositeFlatFileLock().unlock();
}
} else {
if (!flatFile.getCompositeFlatFileLock().isLocked()) {
this.dispatchFlatFileAsync(flatFile);
}
}
}

Expand All @@ -199,6 +195,11 @@ public void dispatchFlatFileAsync(CompositeQueueFlatFile flatFile) {
}

public void dispatchFlatFileAsync(CompositeQueueFlatFile flatFile, Consumer<Long> consumer) {
// Avoid dispatch tasks too much
if (TieredStoreExecutor.dispatchThreadPoolQueue.size() >
TieredStoreExecutor.QUEUE_CAPACITY * 0.75) {
return;
}
TieredStoreExecutor.dispatchExecutor.execute(() -> {
try {
dispatchFlatFile(flatFile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

public class TieredStoreExecutor {

private static final int QUEUE_CAPACITY = 10000;
public static final int QUEUE_CAPACITY = 10000;

// Visible for monitor
public static BlockingQueue<Runnable> dispatchThreadPoolQueue;
Expand Down

0 comments on commit a1bf49d

Please sign in to comment.