Merge branch 'master' into fg_reader_implement_bootstrap
Jonathan Vexler committed Nov 27, 2023
2 parents 38b2603 + 4c3a1db commit c22d1db
Showing 250 changed files with 3,399 additions and 7,915 deletions.
41 changes: 23 additions & 18 deletions .github/workflows/bot.yml
@@ -98,7 +98,7 @@ jobs:
           SCALA_PROFILE: ${{ matrix.scalaProfile }}
           SPARK_PROFILE: ${{ matrix.sparkProfile }}
         run:
-          mvn clean install -T 2 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DskipTests=true $MVN_ARGS -am -pl "hudi-examples/hudi-examples-spark,hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES"
+          mvn clean install -T 2 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DskipTests=true $MVN_ARGS -am -pl "hudi-examples/hudi-examples-spark,$SPARK_COMMON_MODULES,$SPARK_MODULES"
       - name: Quickstart Test
         env:
           SCALA_PROFILE: ${{ matrix.scalaProfile }}
@@ -112,7 +112,7 @@ jobs:
           SPARK_MODULES: ${{ matrix.sparkModules }}
         if: ${{ !endsWith(env.SPARK_PROFILE, '3.2') }} # skip test spark 3.2 as it's covered by Azure CI
         run:
-          mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -pl "hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
+          mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -pl "$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
       - name: FT - Spark
         env:
           SCALA_PROFILE: ${{ matrix.scalaProfile }}
@@ -221,7 +221,6 @@ jobs:
     strategy:
       matrix:
         include:
-          - flinkProfile: "flink1.13"
           - flinkProfile: "flink1.14"
           - flinkProfile: "flink1.15"
           - flinkProfile: "flink1.16"
@@ -300,28 +299,22 @@ jobs:
           - flinkProfile: 'flink1.18'
             sparkProfile: 'spark3.4'
             sparkRuntime: 'spark3.4.0'
-          - flinkProfile: 'flink1.18'
-            sparkProfile: 'spark3.3'
-            sparkRuntime: 'spark3.3.2'
           - flinkProfile: 'flink1.17'
             sparkProfile: 'spark3.3'
             sparkRuntime: 'spark3.3.2'
           - flinkProfile: 'flink1.16'
             sparkProfile: 'spark3.3'
             sparkRuntime: 'spark3.3.2'
           - flinkProfile: 'flink1.15'
             sparkProfile: 'spark3.3'
             sparkRuntime: 'spark3.3.1'
-          - flinkProfile: 'flink1.14'
+          - flinkProfile: 'flink1.15'
             sparkProfile: 'spark3.2'
             sparkRuntime: 'spark3.2.3'
-          - flinkProfile: 'flink1.13'
+          - flinkProfile: 'flink1.14'
             sparkProfile: 'spark3.1'
             sparkRuntime: 'spark3.1.3'
-          - flinkProfile: 'flink1.14'
-            sparkProfile: 'spark3.0'
-            sparkRuntime: 'spark3.0.2'
-          - flinkProfile: 'flink1.13'
+          - flinkProfile: 'flink1.14'
             sparkProfile: 'spark2.4'
             sparkRuntime: 'spark2.4.8'
     steps:
@@ -381,22 +374,34 @@ jobs:
     strategy:
       matrix:
         include:
-          - flinkProfile: 'flink1.16'
+          - flinkProfile: 'flink1.18'
             sparkProfile: 'spark3'
             sparkRuntime: 'spark3.5.0'
+          - flinkProfile: 'flink1.18'
+            sparkProfile: 'spark3.5'
+            sparkRuntime: 'spark3.5.0'
+          - flinkProfile: 'flink1.18'
+            sparkProfile: 'spark3.4'
+            sparkRuntime: 'spark3.4.0'
+          - flinkProfile: 'flink1.17'
+            sparkProfile: 'spark3.3'
+            sparkRuntime: 'spark3.3.2'
-          - flinkProfile: 'flink1.15'
+          - flinkProfile: 'flink1.16'
             sparkProfile: 'spark3.3'
             sparkRuntime: 'spark3.3.1'
-          - flinkProfile: 'flink1.14'
+          - flinkProfile: 'flink1.15'
             sparkProfile: 'spark3.2'
             sparkRuntime: 'spark3.2.3'
-          - flinkProfile: 'flink1.13'
+          - flinkProfile: 'flink1.14'
             sparkProfile: 'spark3.1'
             sparkRuntime: 'spark3.1.3'
-          - flinkProfile: 'flink1.13'
+          - flinkProfile: 'flink1.14'
             sparkProfile: 'spark3.0'
             sparkRuntime: 'spark3.0.2'
+          - flinkProfile: 'flink1.14'
+            sparkProfile: 'spark'
+            sparkRuntime: 'spark2.4.8'
-          - flinkProfile: 'flink1.13'
+          - flinkProfile: 'flink1.14'
             sparkProfile: 'spark2.4'
             sparkRuntime: 'spark2.4.8'
     steps:
16 changes: 6 additions & 10 deletions README.md
@@ -31,7 +31,7 @@ analytical datasets on DFS (Cloud stores, HDFS or any Hadoop FileSystem compatib
 [![Maven Central](https://maven-badges.herokuapp.com/maven-central/org.apache.hudi/hudi/badge.svg)](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.hudi%22)
 ![GitHub commit activity](https://img.shields.io/github/commit-activity/m/apache/hudi)
 [![Join on Slack](https://img.shields.io/badge/slack-%23hudi-72eff8?logo=slack&color=48c628&label=Join%20on%20Slack)](https://join.slack.com/t/apache-hudi/shared_invite/zt-1e94d3xro-JvlNO1kSeIHJBTVfLPlI5w)
-![Twitter Follow](https://img.shields.io/twitter/follow/ApacheHudi)
+[![Twitter Follow](https://img.shields.io/twitter/follow/ApacheHudi)](https://twitter.com/apachehudi)

## Features

@@ -66,8 +66,8 @@ git clone https://github.com/apache/hudi.git && cd hudi
 mvn clean package -DskipTests
 # Start command
-spark-3.2.3-bin-hadoop3.2/bin/spark-shell \
-  --jars `ls packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-*.*.*-SNAPSHOT.jar` \
+spark-3.5.0-bin-hadoop3/bin/spark-shell \
+  --jars `ls packaging/hudi-spark-bundle/target/hudi-spark3.5-bundle_2.12-*.*.*-SNAPSHOT.jar` \
   --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
   --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
   --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
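Editor's note: the three `--conf` settings in the hunk above can equally be applied programmatically. A minimal Java sketch — the builder calls are standard Spark API, while the app name and `local[*]` master are illustrative, and the Hudi bundle built above is assumed to be on the classpath:

```java
import org.apache.spark.sql.SparkSession;

public class HudiSessionSketch {
  public static void main(String[] args) {
    // Mirrors the --conf flags passed to spark-shell in the snippet above
    SparkSession spark = SparkSession.builder()
        .appName("hudi-quickstart")   // illustrative name
        .master("local[*]")           // illustrative master
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
        .getOrCreate();

    spark.sql("SHOW TABLES").show();  // sanity check that the session is up
    spark.stop();
  }
}
```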
@@ -85,7 +85,7 @@ mvn clean javadoc:aggregate -Pjavadocs
 ### Build with different Spark versions
 
 The default Spark 2.x version supported is 2.4.4. The default Spark 3.x version, corresponding to `spark3` profile is
-3.4.0. The default Scala version is 2.12. Refer to the table below for building with different Spark and Scala versions.
+3.5.0. The default Scala version is 2.12. Refer to the table below for building with different Spark and Scala versions.
 
 | Maven build options | Expected Spark bundle jar name | Notes |
 |:--------------------------|:---------------------------------------------|:-------------------------------------------------|
@@ -96,9 +96,10 @@ The default Spark 2.x version supported is 2.4.4. The default Spark 3.x version,
 | `-Dspark3.2` | hudi-spark3.2-bundle_2.12 | For Spark 3.2.x and Scala 2.12 (same as default) |
 | `-Dspark3.3` | hudi-spark3.3-bundle_2.12 | For Spark 3.3.x and Scala 2.12 |
 | `-Dspark3.4` | hudi-spark3.4-bundle_2.12 | For Spark 3.4.x and Scala 2.12 |
+| `-Dspark3.5` | hudi-spark3.5-bundle_2.12 | For Spark 3.5.x and Scala 2.12 |
 | `-Dspark2 -Dscala-2.11` | hudi-spark-bundle_2.11 (legacy bundle name) | For Spark 2.4.4 and Scala 2.11 |
 | `-Dspark2 -Dscala-2.12` | hudi-spark-bundle_2.12 (legacy bundle name) | For Spark 2.4.4 and Scala 2.12 |
-| `-Dspark3` | hudi-spark3-bundle_2.12 (legacy bundle name) | For Spark 3.4.x and Scala 2.12 |
+| `-Dspark3` | hudi-spark3-bundle_2.12 (legacy bundle name) | For Spark 3.5.x and Scala 2.12 |

For example,
```
@@ -131,8 +132,6 @@ Refer to the table below for building with different Flink and Scala versions.
 | `-Dflink1.15` | hudi-flink1.15-bundle | For Flink 1.15 |
 | `-Dflink1.14` | hudi-flink1.14-bundle | For Flink 1.14 and Scala 2.12 |
 | `-Dflink1.14 -Dscala-2.11` | hudi-flink1.14-bundle | For Flink 1.14 and Scala 2.11 |
-| `-Dflink1.13` | hudi-flink1.13-bundle | For Flink 1.13 and Scala 2.12 |
-| `-Dflink1.13 -Dscala-2.11` | hudi-flink1.13-bundle | For Flink 1.13 and Scala 2.11 |

For example,
```
@@ -141,9 +140,6 @@ mvn clean package -DskipTests -Dflink1.15
 # Build against Flink 1.14.x and Scala 2.11
 mvn clean package -DskipTests -Dflink1.14 -Dscala-2.11
-# Build against Flink 1.13.x and Scala 2.12
-mvn clean package -DskipTests -Dflink1.13
```

## Running Tests
9 changes: 4 additions & 5 deletions azure-pipelines-20230430.yml
@@ -32,7 +32,6 @@ parameters:
       - 'hudi-common'
       - 'hudi-flink-datasource'
       - 'hudi-flink-datasource/hudi-flink'
-      - 'hudi-flink-datasource/hudi-flink1.13.x'
       - 'hudi-flink-datasource/hudi-flink1.14.x'
       - 'hudi-flink-datasource/hudi-flink1.15.x'
       - 'hudi-flink-datasource/hudi-flink1.16.x'
@@ -42,6 +41,7 @@ parameters:
     type: object
     default:
       - 'hudi-client/hudi-spark-client'
+      - 'hudi-spark-datasource/hudi-spark'
   - name: job3UTModules
     type: object
     default:
@@ -65,7 +65,6 @@ parameters:
       - '!hudi-examples/hudi-examples-spark'
       - '!hudi-flink-datasource'
       - '!hudi-flink-datasource/hudi-flink'
-      - '!hudi-flink-datasource/hudi-flink1.13.x'
       - '!hudi-flink-datasource/hudi-flink1.14.x'
       - '!hudi-flink-datasource/hudi-flink1.15.x'
       - '!hudi-flink-datasource/hudi-flink1.16.x'
@@ -89,12 +88,12 @@ parameters:
       - '!hudi-examples/hudi-examples-spark'
       - '!hudi-flink-datasource'
       - '!hudi-flink-datasource/hudi-flink'
-      - '!hudi-flink-datasource/hudi-flink1.13.x'
       - '!hudi-flink-datasource/hudi-flink1.14.x'
       - '!hudi-flink-datasource/hudi-flink1.15.x'
       - '!hudi-flink-datasource/hudi-flink1.16.x'
       - '!hudi-flink-datasource/hudi-flink1.17.x'
       - '!hudi-flink-datasource/hudi-flink1.18.x'
+      - '!hudi-spark-datasource/hudi-spark'

variables:
BUILD_PROFILES: '-Dscala-2.12 -Dspark3.2 -Dflink1.18'
@@ -144,7 +143,7 @@ stages:
           grep "testcase" */target/surefire-reports/*.xml */*/target/surefire-reports/*.xml | awk -F'"' ' { print $6,$4,$2 } ' | sort -nr | head -n 100
         displayName: Top 100 long-running testcases
   - job: UT_FT_2
-    displayName: FT client/spark-client
+    displayName: FT client/spark-client & hudi-spark-datasource/hudi-spark
     timeoutInMinutes: '150'
     steps:
       - task: Maven@4
@@ -156,7 +155,7 @@
           publishJUnitResults: false
           jdkVersionOption: '1.8'
       - task: Maven@4
-        displayName: FT client/spark-client
+        displayName: FT client/spark-client & hudi-spark-datasource/hudi-spark
         inputs:
           mavenPomFile: 'pom.xml'
           goals: 'test'
(file path not captured in this view — a Java test class in package org.apache.hudi.aws)

@@ -18,9 +18,10 @@
 
 package org.apache.hudi.aws;
 
-import org.apache.hudi.config.HoodieAWSConfig;
 import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.config.HoodieAWSConfig;
 
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
 
@@ -40,6 +41,7 @@ public void testGetAWSCredentials() {
     assertEquals("random-session-token", credentials.sessionToken());
   }
 
+  @Disabled("HUDI-7114")
   @Test
   public void testGetAWSCredentialsWithInvalidAssumeRole() {
     // This test is to ensure that the AWS credentials provider factory fallbacks to default credentials
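Editor's note: `@Disabled` in the hunk above is plain JUnit 5 — the test is reported as skipped rather than deleted, and the reason string (here the JIRA key HUDI-7114) records why. A self-contained sketch of the pattern; the class, method names, and ticket id below are illustrative, not from the Hudi code:

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

class DisabledTestSketch {

  @Test
  void runsNormally() {
    assertEquals(4, 2 + 2);
  }

  @Disabled("TRACKING-123: disabled until the upstream fix lands")
  @Test
  void skippedButStillVisibleInReports() {
    // JUnit 5 marks this test as skipped and surfaces the reason string above
  }
}
```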
(file path not captured in this view)

@@ -120,7 +120,7 @@ private synchronized void stopEmbeddedServerView(boolean resetViewStorageConfig)
     if (timelineServer.isPresent() && shouldStopTimelineServer) {
       // Stop only if owner
       LOG.info("Stopping Timeline service !!");
-      timelineServer.get().stop();
+      timelineServer.get().stopForBasePath(basePath);
     }
 
     timelineServer = Option.empty();
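Editor's note: the switch from `stop()` to `stopForBasePath(basePath)` fits the timeline-server reuse work visible further down in `EmbeddedTimelineServerHelper` — when one embedded server is shared across writers, a closing client should release only its own table's state instead of stopping the server for everyone. A rough sketch of that idea, with hypothetical names rather than Hudi's actual implementation:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: a shared server that tracks registered table base paths and only
// shuts down completely when the last one detaches.
class SharedTimelineServerSketch {
  private final Set<String> registeredBasePaths = ConcurrentHashMap.newKeySet();

  void register(String basePath) {
    registeredBasePaths.add(basePath);
  }

  // Analogous in spirit to stopForBasePath(basePath): detach one table only.
  synchronized void stopForBasePath(String basePath) {
    registeredBasePaths.remove(basePath);
    // per-base-path caches and views would be released here
    if (registeredBasePaths.isEmpty()) {
      shutdown(); // last writer out actually stops the server
    }
  }

  private void shutdown() {
    System.out.println("timeline server stopped");
  }
}
```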
(file path not captured in this view)

@@ -57,6 +57,7 @@
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieLogCompactException;
 import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -783,9 +784,14 @@ protected void archive(HoodieTable table) {
       return;
     }
     try {
+      final Timer.Context timerContext = metrics.getArchiveCtx();
       // We cannot have unbounded commit files. Archive commits if we have to archive
       HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config, table);
-      archiver.archiveIfRequired(context, true);
+      int instantsToArchive = archiver.archiveIfRequired(context, true);
+      if (timerContext != null) {
+        long durationMs = metrics.getDurationInMs(timerContext.stop());
+        this.metrics.updateArchiveMetrics(durationMs, instantsToArchive);
+      }
     } catch (IOException ioe) {
       throw new HoodieIOException("Failed to archive", ioe);
     }
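Editor's note: the timing code above follows the usual Dropwizard-metrics idiom that Hudi's `HoodieMetrics` wraps — `Timer#time()` returns a `Timer.Context`, and `Context#stop()` yields the elapsed nanoseconds. A standalone sketch against `com.codahale.metrics` directly; the metric name and the work being timed are illustrative:

```java
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;

public class ArchiveTimerSketch {
  public static void main(String[] args) {
    MetricRegistry registry = new MetricRegistry();
    Timer archiveTimer = registry.timer("archive.duration"); // illustrative name

    Timer.Context timerContext = archiveTimer.time(); // like metrics.getArchiveCtx()
    doArchiveWork();
    long elapsedNanos = timerContext.stop();          // stop() returns elapsed nanos
    long durationMs = TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
    System.out.println("archive took " + durationMs + " ms");
  }

  private static void doArchiveWork() {
    // stand-in for archiver.archiveIfRequired(context, true)
  }
}
```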
@@ -1083,6 +1089,9 @@ public void rollbackFailedBootstrap() {
       table.rollbackBootstrap(context, createNewInstantTime());
       LOG.info("Finished rolling back pending bootstrap");
     }
+
+    // if bootstrap failed, lets delete metadata and restart from scratch
+    HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), context);
   }
 
   /**
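Editor's note: the added `deleteMetadataTable` call makes a failed bootstrap restartable from a clean slate — after the pending bootstrap is rolled back, a half-built metadata table is wiped so a retry cannot see stale state. A sketch of that rollback-then-wipe shape, with all names hypothetical:

```java
// Sketch of the "roll back, wipe derived state, retry from scratch" flow used
// by rollbackFailedBootstrap above; the interface and names are illustrative.
class BootstrapRetrySketch {

  interface TableContext {
    boolean hasPendingBootstrap();
    void rollbackPendingBootstrap();
    void deleteMetadataTable(); // derived state that may be half-built
    void runBootstrap();
  }

  void bootstrapWithRetry(TableContext ctx) {
    if (ctx.hasPendingBootstrap()) {
      ctx.rollbackPendingBootstrap();
      // The metadata table is derived from the (now rolled-back) bootstrap,
      // so delete it too; otherwise the retry could read stale entries.
      ctx.deleteMetadataTable();
    }
    ctx.runBootstrap(); // safe: no partial state survives from the failed run
  }
}
```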
(file path not captured in this view — the class below is EmbeddedTimelineServerHelper)

@@ -23,66 +23,42 @@
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 
 /**
  * Helper class to instantiate embedded timeline service.
  */
 public class EmbeddedTimelineServerHelper {
 
-  private static final Logger LOG = LoggerFactory.getLogger(EmbeddedTimelineService.class);
-
-  private static Option<EmbeddedTimelineService> TIMELINE_SERVER = Option.empty();
-
   /**
    * Instantiate Embedded Timeline Server.
    * @param context Hoodie Engine Context
    * @param config Hoodie Write Config
    * @return TimelineServer if configured to run
    * @throws IOException
    */
-  public static synchronized Option<EmbeddedTimelineService> createEmbeddedTimelineService(
+  public static Option<EmbeddedTimelineService> createEmbeddedTimelineService(
       HoodieEngineContext context, HoodieWriteConfig config) throws IOException {
-    if (config.isEmbeddedTimelineServerReuseEnabled()) {
-      if (!TIMELINE_SERVER.isPresent() || !TIMELINE_SERVER.get().canReuseFor(config.getBasePath())) {
-        TIMELINE_SERVER = Option.of(startTimelineService(context, config));
-      } else {
-        updateWriteConfigWithTimelineServer(TIMELINE_SERVER.get(), config);
-      }
-      return TIMELINE_SERVER;
-    }
     if (config.isEmbeddedTimelineServerEnabled()) {
-      return Option.of(startTimelineService(context, config));
+      Option<String> hostAddr = context.getProperty(EngineProperty.EMBEDDED_SERVER_HOST);
+      EmbeddedTimelineService timelineService = EmbeddedTimelineService.getOrStartEmbeddedTimelineService(context, hostAddr.orElse(null), config);
+      updateWriteConfigWithTimelineServer(timelineService, config);
+      return Option.of(timelineService);
     } else {
       return Option.empty();
     }
   }
 
-  private static EmbeddedTimelineService startTimelineService(
-      HoodieEngineContext context, HoodieWriteConfig config) throws IOException {
-    // Run Embedded Timeline Server
-    LOG.info("Starting Timeline service !!");
-    Option<String> hostAddr = context.getProperty(EngineProperty.EMBEDDED_SERVER_HOST);
-    EmbeddedTimelineService timelineService = new EmbeddedTimelineService(
-        context, hostAddr.orElse(null), config);
-    timelineService.startServer();
-    updateWriteConfigWithTimelineServer(timelineService, config);
-    return timelineService;
-  }
-
   /**
    * Adjusts hoodie write config with timeline server settings.
    * @param timelineServer Embedded Timeline Server
    * @param config Hoodie Write Config
    */
   public static void updateWriteConfigWithTimelineServer(EmbeddedTimelineService timelineServer,
-                                                         HoodieWriteConfig config) {
+                                                         HoodieWriteConfig config) {
     // Allow executor to find this newly instantiated timeline service
     if (config.isEmbeddedTimelineServerEnabled()) {
       config.setViewStorageConfig(timelineServer.getRemoteFileSystemViewConfig());
     }
   }
-}
+}
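Editor's note: the net effect of this refactor is that the helper no longer keeps its own static `TIMELINE_SERVER` singleton — the get-or-start logic moves into `EmbeddedTimelineService.getOrStartEmbeddedTimelineService(...)`, where an existing server can be handed out to later writers. A generic sketch of such a get-or-start factory; the class below is hypothetical, not Hudi's actual `EmbeddedTimelineService`:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of a "get or start" factory: one running instance per key, reused by
// later callers instead of each caller booting a private server.
class GetOrStartServerSketch {
  private static final Map<String, GetOrStartServerSketch> RUNNING = new ConcurrentHashMap<>();

  private final String hostAddr;

  private GetOrStartServerSketch(String hostAddr) {
    this.hostAddr = hostAddr;
    // the real thing would bind and start the server here
  }

  static GetOrStartServerSketch getOrStart(String hostAddr) {
    // computeIfAbsent gives atomic start-once semantics, replacing the
    // synchronized method and static Option field the old helper used.
    return RUNNING.computeIfAbsent(hostAddr, GetOrStartServerSketch::new);
  }

  void stopForBasePath(String basePath) {
    // release state for one table; remove from RUNNING when nothing remains
  }
}
```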
