Skip to content

Commit

Permalink
[HUDI-7095] Making perf enhancements to JSON serde (apache#10097)
Browse files Browse the repository at this point in the history
  • Loading branch information
nsivabalan committed Nov 24, 2023
1 parent 4765f3e commit 3961362
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public class TimelineServerBasedWriteMarkers extends WriteMarkers {
private final int timelineServerPort;
private final int timeoutSecs;
private static final TypeReference<Boolean> BOOLEAN_TYPE_REFERENCE = new TypeReference<Boolean>() {};
private static final TypeReference<Set<String>> STRING_TYPE_REFERENCE = new TypeReference<Set<String>>() {};
private static final TypeReference<Set<String>> SET_TYPE_REFERENCE = new TypeReference<Set<String>>() {};

public TimelineServerBasedWriteMarkers(HoodieTable table, String instantTime) {
this(table.getMetaClient().getBasePath(),
Expand Down Expand Up @@ -115,7 +115,7 @@ public Set<String> createdAndMergedDataPaths(HoodieEngineContext context, int pa
Map<String, String> paramsMap = Collections.singletonMap(MARKER_DIR_PATH_PARAM, markerDirPath.toString());
try {
Set<String> markerPaths = executeRequestToTimelineServer(
CREATE_AND_MERGE_MARKERS_URL, paramsMap, STRING_TYPE_REFERENCE, RequestMethod.GET);
CREATE_AND_MERGE_MARKERS_URL, paramsMap, SET_TYPE_REFERENCE, RequestMethod.GET);
return markerPaths.stream().map(WriteMarkers::stripMarkerSuffix).collect(Collectors.toSet());
} catch (IOException e) {
throw new HoodieRemoteException("Failed to get CREATE and MERGE data file paths in "
Expand All @@ -128,7 +128,7 @@ public Set<String> allMarkerFilePaths() {
Map<String, String> paramsMap = Collections.singletonMap(MARKER_DIR_PATH_PARAM, markerDirPath.toString());
try {
return executeRequestToTimelineServer(
ALL_MARKERS_URL, paramsMap, STRING_TYPE_REFERENCE, RequestMethod.GET);
ALL_MARKERS_URL, paramsMap, SET_TYPE_REFERENCE, RequestMethod.GET);
} catch (IOException e) {
throw new HoodieRemoteException("Failed to get all markers in " + markerDirPath.toString(), e);
}
Expand Down
6 changes: 6 additions & 0 deletions hudi-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@
<artifactId>jackson-module-afterburner</artifactId>
</dependency>


<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-afterburner</artifactId>
</dependency>

<!-- Avro -->
<dependency>
<groupId>org.apache.avro</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
*/
public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, Serializable {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().registerModule(new AfterburnerModule());

private static final String BASE_URL = "/v1/hoodie/view";
public static final String LATEST_PARTITION_SLICES_URL = String.format("%s/%s", BASE_URL, "slices/partition/latest/");
public static final String LATEST_PARTITION_SLICE_URL = String.format("%s/%s", BASE_URL, "slices/file/latest/");
Expand Down Expand Up @@ -113,7 +115,6 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,

public static final String PENDING_CLUSTERING_FILEGROUPS = String.format("%s/%s", BASE_URL, "clustering/pending/");


public static final String LAST_INSTANT = String.format("%s/%s", BASE_URL, "timeline/instant/last");
public static final String LAST_INSTANTS = String.format("%s/%s", BASE_URL, "timeline/instants/last");

Expand Down Expand Up @@ -147,7 +148,6 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
private static final TypeReference<List<BaseFileDTO>> BASE_FILE_DTOS_REFERENCE = new TypeReference<List<BaseFileDTO>>() {};
private static final TypeReference<Map<String, List<BaseFileDTO>>> BASE_FILE_MAP_REFERENCE = new TypeReference<Map<String, List<BaseFileDTO>>>() {};
private static final TypeReference<Map<String, List<FileSliceDTO>>> FILE_SLICE_MAP_REFERENCE = new TypeReference<Map<String, List<FileSliceDTO>>>() {};
private static final ObjectMapper MAPPER = new ObjectMapper().registerModule(new AfterburnerModule());

private final String serverHost;
private final int serverPort;
Expand Down Expand Up @@ -202,7 +202,7 @@ private <T> T executeRequest(String requestPath, Map<String, String> queryParame
LOG.info("Sending request : (" + url + ")");
Response response = retryHelper != null ? retryHelper.start(() -> get(timeoutMs, url, method)) : get(timeoutMs, url, method);
String content = response.returnContent().asString(Consts.UTF_8);
return MAPPER.readValue(content, reference);
return (T) OBJECT_MAPPER.readValue(content, reference);
}

private Map<String, String> getParamsWithPartitionPath(String partitionPath) {
Expand Down Expand Up @@ -363,7 +363,8 @@ public Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, Str
new String[] {MAX_INSTANT_PARAM, INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM},
new String[] {maxCommitTime, String.valueOf(includeFileSlicesInPendingCompaction)});
try {
List<FileSliceDTO> dataFiles = executeRequest(LATEST_SLICES_BEFORE_ON_INSTANT_URL, paramsMap, FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
List<FileSliceDTO> dataFiles = executeRequest(LATEST_SLICES_BEFORE_ON_INSTANT_URL, paramsMap,
FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
return dataFiles.stream().map(FileSliceDTO::toFileSlice);
} catch (IOException e) {
throw new HoodieRemoteException(e);
Expand Down
15 changes: 13 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,11 @@
<shadedPattern>org.apache.hudi.org.apache.hadoop.metrics2.util.MetricSampleQuantiles
</shadedPattern>
</relocation>
<relocation>
<pattern>com.fasterxml.jackson.module</pattern>
<shadedPattern>org.apache.hudi.com.fasterxml.jackson.module
</shadedPattern>
</relocation>
</relocations>
</configuration>
</plugin>
Expand Down Expand Up @@ -879,6 +884,12 @@
<version>${fasterxml.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-afterburner</artifactId>
<version>${fasterxml.jackson.databind.version}</version>
</dependency>

<!-- Glassfish -->
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
Expand Down Expand Up @@ -2180,7 +2191,7 @@
<avro.version>1.8.2</avro.version>
<antlr.version>4.7</antlr.version>
<fasterxml.version>2.6.7</fasterxml.version>
<fasterxml.jackson.databind.version>2.6.7.3</fasterxml.jackson.databind.version>
<fasterxml.jackson.databind.version>${fasterxml.version}</fasterxml.jackson.databind.version>
<fasterxml.jackson.module.scala.version>2.6.7.1</fasterxml.jackson.module.scala.version>
<fasterxml.jackson.dataformat.yaml.version>2.7.4</fasterxml.jackson.dataformat.yaml.version>
<skip.hudi-spark3.unit.tests>true</skip.hudi-spark3.unit.tests>
Expand Down Expand Up @@ -2212,7 +2223,7 @@
<avro.version>1.8.2</avro.version>
<antlr.version>4.7</antlr.version>
<fasterxml.version>2.6.7</fasterxml.version>
<fasterxml.jackson.databind.version>2.6.7.3</fasterxml.jackson.databind.version>
<fasterxml.jackson.databind.version>${fasterxml.version}</fasterxml.jackson.databind.version>
<fasterxml.jackson.module.scala.version>2.6.7.1</fasterxml.jackson.module.scala.version>
<fasterxml.jackson.dataformat.yaml.version>2.7.4</fasterxml.jackson.dataformat.yaml.version>
<skip.hudi-spark3.unit.tests>true</skip.hudi-spark3.unit.tests>
Expand Down

0 comments on commit 3961362

Please sign in to comment.