Skip to content

Commit

Permalink
Revert "[HUDI-4741] hotfix to avoid partial failover cause restored s…
Browse files Browse the repository at this point in the history
…ubtask timeout (apache#6796)" (apache#7090)

This reverts commit e222693.
  • Loading branch information
danny0405 authored and Alexey Kudinkin committed Dec 14, 2022
1 parent d17fb10 commit 199399d
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.apache.hudi.util.StreamerUtil.initTableIfNotExists;
Expand Down Expand Up @@ -153,12 +152,6 @@ public class StreamWriteOperatorCoordinator
*/
private CkpMetadata ckpMetadata;

/**
* Counter for the failed tasks, a number within the range (0, task_num) means
* a partial failover.
*/
private transient AtomicInteger failedCnt;

/**
* Constructs a StreamingSinkOperatorCoordinator.
*
Expand Down Expand Up @@ -301,17 +294,6 @@ public void subtaskFailed(int i, @Nullable Throwable throwable) {
// reset the event
this.eventBuffer[i] = null;
LOG.warn("Reset the event for task [" + i + "]", throwable);

// based on the fact: the #subtaskFailed in invoked before all the failed tasks scheduling,
// when a sub-task event is received, we can decide whether it recovers from a partial or complete failover,
// then to reuse the current instant(PARTIAL) or start a new one(COMPLETE).

// reset the ckp metadata for either partial or complete failover
if (this.failedCnt.get() == 0) {
this.ckpMetadata.reset();
}
// inc the failed tasks counter
this.failedCnt.incrementAndGet();
}

@Override
Expand Down Expand Up @@ -365,14 +347,6 @@ private static CkpMetadata initCkpMetadata(HoodieTableMetaClient metaClient) thr

private void reset() {
this.eventBuffer = new WriteMetadataEvent[this.parallelism];
this.failedCnt = new AtomicInteger(0);
}

/**
* Checks whether it is a PARTIAL failover.
*/
private boolean isPartialFailover() {
return this.failedCnt.get() > 0 && this.failedCnt.get() < this.parallelism;
}

/**
Expand Down Expand Up @@ -436,16 +410,6 @@ private void handleBootstrapEvent(WriteMetadataEvent event) {
if (Arrays.stream(eventBuffer).allMatch(evt -> evt != null && evt.isBootstrap())) {
// start to initialize the instant.
initInstant(event.getInstantTime());
} else if (isPartialFailover()) {
// if the bootstrap event comes from a partial failover,
// decrement the failed tasks by one.

// if all the failed task bootstrap events are received, send a start instant
// to the ckp metadata and unblock the data flushing.
if (this.failedCnt.decrementAndGet() <= 0) {
this.ckpMetadata.startInstant(this.instant);
this.failedCnt.set(0);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;

Expand All @@ -62,7 +61,7 @@ public class CkpMetadata implements Serializable {

private static final Logger LOG = LoggerFactory.getLogger(CkpMetadata.class);

private static final int MAX_RETAIN_CKP_NUM = 3;
protected static final int MAX_RETAIN_CKP_NUM = 3;

// the ckp metadata directory
private static final String CKP_META = "ckp_meta";
Expand Down Expand Up @@ -100,65 +99,37 @@ public void bootstrap() throws IOException {
fs.mkdirs(path);
}

/**
* Resets the message bus, would clean all the messages.
*
* <p>This expects to be called by the driver.
*/
public void reset() {
Iterator<String> itr = this.instantCache.iterator();
while (itr.hasNext()) {
cleanInstant(itr.next(), true);
itr.remove();
}
}

public void startInstant(String instant) {
Path path = fullPath(CkpMessage.getFileName(instant, CkpMessage.State.INFLIGHT));
try {
fs.createNewFile(path);
} catch (IOException e) {
throw new HoodieException("Exception while adding checkpoint start metadata for instant: " + instant, e);
}
// cache the instant
cache(instant);
// cleaning
clean();
clean(instant);
}

private void cache(String newInstant) {
private void clean(String newInstant) {
if (this.instantCache == null) {
this.instantCache = new ArrayList<>();
}
this.instantCache.add(newInstant);
}

private void clean() {
if (instantCache.size() > MAX_RETAIN_CKP_NUM) {
boolean success = cleanInstant(instantCache.get(0), false);
if (success) {
instantCache.remove(0);
}
}
}

private boolean cleanInstant(String instant, boolean throwsT) {
boolean success = true;
for (String fileName : CkpMessage.getAllFileNames(instant)) {
Path path = fullPath(fileName);
try {
fs.delete(path, false);
} catch (IOException ex) {
success = false;
final String errMsg = "Exception while cleaning the checkpoint meta file: " + path;
if (throwsT) {
throw new HoodieException(errMsg, ex);
} else {
LOG.warn(errMsg, ex);
final String instant = instantCache.get(0);
boolean[] error = new boolean[1];
CkpMessage.getAllFileNames(instant).stream().map(this::fullPath).forEach(path -> {
try {
fs.delete(path, false);
} catch (IOException e) {
error[0] = true;
LOG.warn("Exception while cleaning the checkpoint meta file: " + path);
}
});
if (!error[0]) {
instantCache.remove(0);
}
}
return success;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.meta.CkpMetadata;
import org.apache.hudi.sink.utils.MockCoordinatorExecutor;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.util.StreamerUtil;
Expand Down Expand Up @@ -165,14 +164,6 @@ public void testCheckpointCompleteWithPartialEvents() {
assertThat("Commits the instant with partial events anyway", lastCompleted, is(instant));
}

@Test
public void testSubTaskFailed() {
coordinator.subtaskFailed(0, null);
assertNull(coordinator.getEventBuffer()[0], "The write meta event should be cleaned");
CkpMetadata ckpMetadata = CkpMetadata.getInstance(TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()));
assertNull(ckpMetadata.lastPendingInstant(), "The pending instant should be cleaned");
}

@Test
public void testHiveSyncInvoked() throws Exception {
// reset
Expand Down

0 comments on commit 199399d

Please sign in to comment.