
Support for spark datasource options in S3EventsHoodieIncrSource
harsh1231 committed Mar 25, 2022
1 parent e271eef commit f653eff
Showing 1 changed file with 15 additions and 7 deletions.
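With this change, a DeltaStreamer user can pass options through to the Spark DataFrameReader that loads the S3 files by setting the HoodieIncrSource.Config.SPARK_DATASOURCE_OPTIONS property to a JSON map of option names to values. A hypothetical properties entry might look like the following; the literal key shown is an assumption, the authoritative name is whatever the SPARK_DATASOURCE_OPTIONS constant resolves to:

hoodie.deltastreamer.source.hoodieincr.spark.datasource.options={"header":"true","mergeSchema":"true"}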
@@ -26,14 +26,16 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
@@ -55,14 +57,13 @@
import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT;
import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT;
import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SPARK_DATASOURCE_OPTIONS;

/**
* This source uses the S3 events metadata from the hoodie table generated by {@link S3EventsSource}.
*/
public class S3EventsHoodieIncrSource extends HoodieIncrSource {

private static final Logger LOG = LogManager.getLogger(S3EventsHoodieIncrSource.class);

static class Config {
// control whether we do an existence check for files before consuming them
static final String ENABLE_EXISTS_CHECK = "hoodie.deltastreamer.source.s3incr.check.file.exists";
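For example, the existence check above would be enabled with a properties entry such as the following (the value is illustrative; the key is taken verbatim from the constant):

hoodie.deltastreamer.source.s3incr.check.file.exists=true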
@@ -130,7 +131,7 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
.filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
queryTypeAndInstantEndpts.getRight().getLeft()));
}

if (source.isEmpty()) {
return Pair.of(Option.empty(), queryTypeAndInstantEndpts.getRight().getRight());
}
@@ -146,7 +147,7 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
filter = filter + " and s3.object.key not like '%" + props.getString(Config.S3_IGNORE_KEY_SUBSTRING) + "%'";
}
// add file format filtering by default
filter = filter + " and s3.object.key like '%" + fileFormat + "%'";

String s3FS = props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase();
String s3Prefix = s3FS + "://";
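To illustrate the filter these appends produce: with fileFormat = "parquet" and an ignore substring of "tmp" (both illustrative values), the clauses built above amount to " and s3.object.key not like '%tmp%' and s3.object.key like '%parquet%'", applied on top of the base filter constructed in the elided lines.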
@@ -182,8 +183,15 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
DataFrameReader dataFrameReader = sparkSession.read().format(fileFormat);
Map<String, String> arguments = new HashMap<>();
if (!StringUtils.isNullOrEmpty(props.getString(HoodieIncrSource.Config.SPARK_DATASOURCE_OPTIONS, null))) {
// Parse the user-supplied JSON map of Spark datasource options with Jackson.
final ObjectMapper mapper = new ObjectMapper();
Map<String, String> sparkOptionsMap = null;
try {
sparkOptionsMap = (Map<String, String>) mapper.readValue(props.getString(SPARK_DATASOURCE_OPTIONS), Map.class);
} catch (IOException e) {
throw new HoodieException(String.format("Failed to parse sparkOptions: %s", props.getString(SPARK_DATASOURCE_OPTIONS)), e);
}
LOG.info(String.format("sparkOptions loaded: %s", sparkOptionsMap));
dataFrameReader = dataFrameReader.options(sparkOptionsMap);
}
dataset = Option.of(dataFrameReader.load(cloudFiles.toArray(new String[0])));
}
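As a reference for the new parsing path, here is a minimal standalone sketch, assuming only jackson-databind on the classpath; the class name and option values are illustrative and not part of the commit:

import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.util.Map;

public class SparkOptionsJsonDemo {
  public static void main(String[] args) throws IOException {
    // Example JSON string such as a user might set for SPARK_DATASOURCE_OPTIONS (illustrative options).
    String json = "{\"header\":\"true\",\"inferSchema\":\"true\"}";
    ObjectMapper mapper = new ObjectMapper();
    // Same unchecked Map.class deserialization that the commit uses.
    @SuppressWarnings("unchecked")
    Map<String, String> options = (Map<String, String>) mapper.readValue(json, Map.class);
    // In the source, these entries are handed to DataFrameReader.options(...).
    options.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}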
