Skip to content

Commit

Permalink
[HUDI-4517] If no marker type file, fallback to timeline based marker (
Browse files Browse the repository at this point in the history
…#6266)

- If the MARKERS.type file is not present, the logic assumes that direct markers are stored, which causes read failures in certain cases even when timeline-server-based markers are enabled. This PR handles the failure by falling back to timeline-server-based markers in such cases.
  • Loading branch information
codope authored Aug 7, 2022
1 parent 98f0166 commit 95d7489
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@

import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.util.MarkerUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieTable;

import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -36,10 +37,19 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.hudi.common.table.marker.MarkerType.DIRECT;
import static org.apache.hudi.common.table.marker.MarkerType.TIMELINE_SERVER_BASED;
import static org.apache.hudi.common.util.MarkerUtils.MARKER_TYPE_FILENAME;
import static org.apache.hudi.common.util.MarkerUtils.readMarkerType;
import static org.apache.hudi.common.util.MarkerUtils.readTimelineServerBasedMarkersFromFileSystem;

/**
* A utility class for marker-based rollback.
*/
public class MarkerBasedRollbackUtils {

private static final Logger LOG = LogManager.getLogger(MarkerBasedRollbackUtils.class);

/**
* Gets all marker paths.
*
Expand All @@ -54,25 +64,35 @@ public static List<String> getAllMarkerPaths(HoodieTable table, HoodieEngineCont
String instant, int parallelism) throws IOException {
String markerDir = table.getMetaClient().getMarkerFolderPath(instant);
FileSystem fileSystem = table.getMetaClient().getFs();
Option<MarkerType> markerTypeOption = MarkerUtils.readMarkerType(fileSystem, markerDir);
Option<MarkerType> markerTypeOption = readMarkerType(fileSystem, markerDir);

// If there is no marker type file "MARKERS.type", we assume "DIRECT" markers are used
// If there is no marker type file "MARKERS.type", first assume "DIRECT" markers are used.
// If not, then fallback to "TIMELINE_SERVER_BASED" markers.
if (!markerTypeOption.isPresent()) {
WriteMarkers writeMarkers = WriteMarkersFactory.get(MarkerType.DIRECT, table, instant);
return new ArrayList<>(writeMarkers.allMarkerFilePaths());
WriteMarkers writeMarkers = WriteMarkersFactory.get(DIRECT, table, instant);
try {
return new ArrayList<>(writeMarkers.allMarkerFilePaths());
} catch (IOException | IllegalArgumentException e) {
LOG.warn(String.format("%s not present and %s marker failed with error: %s. So, falling back to %s marker",
MARKER_TYPE_FILENAME, DIRECT, e.getMessage(), TIMELINE_SERVER_BASED));
return getTimelineServerBasedMarkers(context, parallelism, markerDir, fileSystem);
}
}

switch (markerTypeOption.get()) {
case TIMELINE_SERVER_BASED:
// Reads all markers written by the timeline server
Map<String, Set<String>> markersMap =
MarkerUtils.readTimelineServerBasedMarkersFromFileSystem(
markerDir, fileSystem, context, parallelism);
return markersMap.values().stream().flatMap(Collection::stream)
.collect(Collectors.toCollection(ArrayList::new));
return getTimelineServerBasedMarkers(context, parallelism, markerDir, fileSystem);
default:
throw new HoodieException(
"The marker type \"" + markerTypeOption.get().name() + "\" is not supported.");
}
}

/**
 * Reads the markers stored in the timeline-server-based layout under the given marker directory
 * and flattens them into a single list.
 *
 * @param context     engine context forwarded to the marker reader
 * @param parallelism parallelism forwarded to the marker reader
 * @param markerDir   marker directory to read from
 * @param fileSystem  file system that holds the marker directory
 * @return a mutable {@link ArrayList} containing every marker found across all entries of the map
 *         returned by the reader; empty when the reader returns no entries
 */
private static List<String> getTimelineServerBasedMarkers(HoodieEngineContext context, int parallelism, String markerDir, FileSystem fileSystem) {
  Map<String, Set<String>> markersMap = readTimelineServerBasedMarkersFromFileSystem(markerDir, fileSystem, context, parallelism);
  // Collect into an explicit ArrayList: Collectors.toList() makes no guarantee about the type or
  // mutability of the returned list, while the DIRECT marker path hands callers a plain ArrayList.
  return markersMap.values().stream()
      .flatMap(Collection::stream)
      .collect(Collectors.toCollection(ArrayList::new));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.rollback.BaseRollbackHelper;
import org.apache.hudi.table.action.rollback.MarkerBasedRollbackStrategy;
import org.apache.hudi.table.marker.DirectWriteMarkers;
import org.apache.hudi.testutils.HoodieClientTestBase;

import org.apache.hadoop.fs.FileStatus;
Expand All @@ -48,12 +49,16 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

@Tag("functional")
public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
Expand Down Expand Up @@ -204,4 +209,21 @@ private List<HoodieRollbackStat> testRun(boolean useFileListingMetadata, HoodieW
rollbackRequests);
}

/**
 * Checks that marker-based rollback of the inflight commit "001" produces exactly one rollback
 * request when a single marker file has been registered for that commit.
 *
 * NOTE(review): the test name says the DIRECT marker read fails and the timeline-server fallback
 * is taken, but the stubbed {@code DirectWriteMarkers} mock below is never handed to the table or
 * to the rollback strategy inside this method, so the stub does not appear to reach the code under
 * test. Confirm whether the fallback path is really exercised here.
 */
@Test
public void testMarkerBasedRollbackFallbackToTimelineServerWhenDirectMarkerFails() throws Exception {
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
// Set up requested commit "000" and take the file id reported for partition "partA".
String f0 = testTable.addRequestedCommit("000")
.getFileIdsWithBaseFilesInPartitions("partA").get("partA");
// Register one APPEND marker for that file id under commit "001".
testTable.forCommit("001")
.withMarkerFile("partA", f0, IOType.APPEND);

HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context, metaClient);

// Stub a DirectWriteMarkers whose listing call throws, to imitate a failing DIRECT marker read.
// NOTE(review): this mock is a local that nothing below references again — verify it is wired in.
DirectWriteMarkers writeMarkers = mock(DirectWriteMarkers.class);
// NOTE(review): no annotated mock fields are visible in this view; confirm this call is needed.
initMocks(this);
when(writeMarkers.allMarkerFilePaths()).thenThrow(new IOException("Markers.type file not present"));
// Build the strategy with instant time "002" and request the rollback plan for inflight commit "001".
MarkerBasedRollbackStrategy rollbackStrategy = new MarkerBasedRollbackStrategy(hoodieTable, context, getConfig(), "002");
List<HoodieRollbackRequest> rollbackRequests = rollbackStrategy.getRollbackRequests(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001"));
// Exactly one request is expected, matching the single marker registered above.
assertEquals(1, rollbackRequests.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public class MarkerUtils {
* @return marker file name
*/
public static String stripMarkerFolderPrefix(String fullMarkerPath, String basePath, String instantTime) {
ValidationUtils.checkArgument(fullMarkerPath.contains(HoodieTableMetaClient.MARKER_EXTN));
ValidationUtils.checkArgument(fullMarkerPath.contains(HoodieTableMetaClient.MARKER_EXTN),
String.format("Using DIRECT markers but marker path does not contain extension: %s", HoodieTableMetaClient.MARKER_EXTN));
String markerRootPath = Path.getPathWithoutSchemeAndAuthority(
new Path(String.format("%s/%s/%s", basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime))).toString();
return stripMarkerFolderPrefix(fullMarkerPath, markerRootPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ private void syncMarkersFromFileSystem() {
private void writeMarkerTypeToFile() {
Path dirPath = new Path(markerDirPath);
try {
if (!fileSystem.exists(dirPath)) {
if (!fileSystem.exists(dirPath) || !MarkerUtils.doesMarkerTypeFileExist(fileSystem, markerDirPath)) {
// There is no existing marker directory, create a new directory and write marker type
fileSystem.mkdirs(dirPath);
MarkerUtils.writeMarkerTypeToFile(MarkerType.TIMELINE_SERVER_BASED, fileSystem, markerDirPath);
Expand Down

0 comments on commit 95d7489

Please sign in to comment.