From e75050e6eefa486146e71dc283e5e8fec8b876f2 Mon Sep 17 00:00:00 2001
From: Chen Zhiling
Date: Sun, 29 Mar 2020 12:19:43 +0800
Subject: [PATCH] Change pipeline to use DeadletterSink API (#586)

---
 .../main/java/feast/ingestion/ImportJob.java  | 29 ++++++-------------
 .../write/BigQueryDeadletterSink.java         |  7 ++++-
 2 files changed, 15 insertions(+), 21 deletions(-)

diff --git a/ingestion/src/main/java/feast/ingestion/ImportJob.java b/ingestion/src/main/java/feast/ingestion/ImportJob.java
index 5b0bf88924..c51ea1f944 100644
--- a/ingestion/src/main/java/feast/ingestion/ImportJob.java
+++ b/ingestion/src/main/java/feast/ingestion/ImportJob.java
@@ -29,14 +29,14 @@
 import feast.ingestion.options.StringListStreamConverter;
 import feast.ingestion.transform.ReadFromSource;
 import feast.ingestion.transform.ValidateFeatureRows;
-import feast.ingestion.transform.WriteFailedElementToBigQuery;
 import feast.ingestion.transform.metrics.WriteFailureMetricsTransform;
 import feast.ingestion.transform.metrics.WriteSuccessMetricsTransform;
-import feast.ingestion.utils.ResourceUtil;
 import feast.ingestion.utils.SpecUtil;
+import feast.storage.api.write.DeadletterSink;
 import feast.storage.api.write.FailedElement;
 import feast.storage.api.write.FeatureSink;
 import feast.storage.api.write.WriteResult;
+import feast.storage.connectors.bigquery.write.BigQueryDeadletterSink;
 import feast.types.FeatureRowProto.FeatureRow;
 import java.io.IOException;
 import java.util.HashMap;
@@ -141,32 +141,21 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOExcepti
 
     // Step 4. Write FailedElements to a dead letter table in BigQuery.
     if (options.getDeadLetterTableSpec() != null) {
+      // TODO: make deadletter destination type configurable
+      DeadletterSink deadletterSink =
+          new BigQueryDeadletterSink(options.getDeadLetterTableSpec());
+
       convertedFeatureRows
           .get(DEADLETTER_OUT)
-          .apply(
-              "WriteFailedElements_ReadFromSource",
-              WriteFailedElementToBigQuery.newBuilder()
-                  .setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson())
-                  .setTableSpec(options.getDeadLetterTableSpec())
-                  .build());
+          .apply("WriteFailedElements_ReadFromSource", deadletterSink.write());
 
       validatedRows
           .get(DEADLETTER_OUT)
-          .apply(
-              "WriteFailedElements_ValidateRows",
-              WriteFailedElementToBigQuery.newBuilder()
-                  .setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson())
-                  .setTableSpec(options.getDeadLetterTableSpec())
-                  .build());
+          .apply("WriteFailedElements_ValidateRows", deadletterSink.write());
 
       writeFeatureRows
           .getFailedInserts()
-          .apply(
-              "WriteFailedElements_WriteFeatureRowToStore",
-              WriteFailedElementToBigQuery.newBuilder()
-                  .setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson())
-                  .setTableSpec(options.getDeadLetterTableSpec())
-                  .build());
+          .apply("WriteFailedElements_WriteFeatureRowToStore", deadletterSink.write());
     }
 
     // Step 5. Write metrics to a metrics sink.
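Note: the DeadletterSink interface itself is not touched by this patch, so its
exact definition does not appear above. From the call sites (deadletterSink.write()
handed to .apply() on PCollections of FailedElement, and the BigQuery
implementation below returning PDone), it plausibly reduces to the following
sketch; treat the shape as an assumption, not the committed definition:

    package feast.storage.api.write;

    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PDone;

    // Assumed shape: a pluggable sink for failed rows. write() returns the
    // composite Beam transform that persists FailedElements, so ImportJob
    // stays agnostic of the destination (see the TODO about configurability).
    public interface DeadletterSink {
      PTransform<PCollection<FailedElement>, PDone> write();
    }

The payoff is visible in the hunk above: three six-line
WriteFailedElementToBigQuery builder blocks collapse into one shared sink
instance, and a non-BigQuery deadletter destination later only needs another
DeadletterSink implementation.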
diff --git a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/write/BigQueryDeadletterSink.java b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/write/BigQueryDeadletterSink.java
index 91e3c2fd31..0b88c93dc2 100644
--- a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/write/BigQueryDeadletterSink.java
+++ b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/write/BigQueryDeadletterSink.java
@@ -17,6 +17,7 @@
 package feast.storage.connectors.bigquery.write;
 
 import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TimePartitioning;
 import com.google.auto.value.AutoValue;
 import com.google.common.io.Resources;
 import feast.storage.api.write.DeadletterSink;
@@ -36,6 +37,7 @@ public class BigQueryDeadletterSink implements DeadletterSink {
 
   private static final String DEADLETTER_SCHEMA_FILE_PATH = "schemas/deadletter_table_schema.json";
   private static final Logger log = org.slf4j.LoggerFactory.getLogger(BigQueryDeadletterSink.class);
+  private static final String TIMESTAMP_COLUMN = "timestamp";
 
   private final String tableSpec;
   private String jsonSchema;
@@ -97,6 +99,8 @@ public abstract static class Builder {
 
     @Override
     public PDone expand(PCollection<FailedElement> input) {
+      TimePartitioning partition = new TimePartitioning().setType("DAY");
+      partition.setField(TIMESTAMP_COLUMN);
       input
           .apply("FailedElementToTableRow", ParDo.of(new FailedElementToTableRowFn()))
           .apply(
@@ -104,6 +108,7 @@ public PDone expand(PCollection<FailedElement> input) {
               BigQueryIO.writeTableRows()
                   .to(getTableSpec())
                   .withJsonSchema(getJsonSchema())
+                  .withTimePartitioning(partition)
                   .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                   .withWriteDisposition(WriteDisposition.WRITE_APPEND));
       return PDone.in(input.getPipeline());
@@ -116,7 +121,7 @@ public void processElement(ProcessContext context) {
       final FailedElement element = context.element();
       final TableRow tableRow =
           new TableRow()
-              .set("timestamp", element.getTimestamp().toString())
+              .set(TIMESTAMP_COLUMN, element.getTimestamp().toString())
               .set("job_name", element.getJobName())
               .set("transform_name", element.getTransformName())
              .set("payload", element.getPayload())
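Note on the partitioning change: the sink now creates the deadletter table
day-partitioned on the timestamp column (CREATE_IF_NEEDED applies the
TimePartitioning spec when the table is first created), so queries over a
given failure window scan only the relevant daily partitions. The same model
class also supports an optional partition expiry, which this patch does not
set; a hedged sketch of how a retention policy could be layered on, assuming
a 30-day window were acceptable:

    // Sketch only: setExpirationMs belongs to the same
    // com.google.api.services.bigquery.model.TimePartitioning class but is
    // not used in this patch; the 30-day figure is an assumption.
    TimePartitioning partition =
        new TimePartitioning()
            .setType("DAY")
            .setField(TIMESTAMP_COLUMN)
            .setExpirationMs(30L * 24 * 60 * 60 * 1000);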