Skip to content

Commit

Permalink
Change pipeline to use DeadletterSink API (#586)
Browse files: browse the repository at this point in the history
  • Loading branch information
Chen Zhiling committed Apr 3, 2020
1 parent 5644bfe commit e75050e
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 21 deletions.
29 changes: 9 additions & 20 deletions ingestion/src/main/java/feast/ingestion/ImportJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@
import feast.ingestion.options.StringListStreamConverter;
import feast.ingestion.transform.ReadFromSource;
import feast.ingestion.transform.ValidateFeatureRows;
import feast.ingestion.transform.WriteFailedElementToBigQuery;
import feast.ingestion.transform.metrics.WriteFailureMetricsTransform;
import feast.ingestion.transform.metrics.WriteSuccessMetricsTransform;
import feast.ingestion.utils.ResourceUtil;
import feast.ingestion.utils.SpecUtil;
import feast.storage.api.write.DeadletterSink;
import feast.storage.api.write.FailedElement;
import feast.storage.api.write.FeatureSink;
import feast.storage.api.write.WriteResult;
import feast.storage.connectors.bigquery.write.BigQueryDeadletterSink;
import feast.types.FeatureRowProto.FeatureRow;
import java.io.IOException;
import java.util.HashMap;
Expand Down Expand Up @@ -141,32 +141,21 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOExcepti

// Step 4. Write FailedElements to a dead letter table in BigQuery.
if (options.getDeadLetterTableSpec() != null) {
// TODO: make deadletter destination type configurable
DeadletterSink deadletterSink =
new BigQueryDeadletterSink(options.getDeadLetterTableSpec());

convertedFeatureRows
.get(DEADLETTER_OUT)
.apply(
"WriteFailedElements_ReadFromSource",
WriteFailedElementToBigQuery.newBuilder()
.setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson())
.setTableSpec(options.getDeadLetterTableSpec())
.build());
.apply("WriteFailedElements_ReadFromSource", deadletterSink.write());

validatedRows
.get(DEADLETTER_OUT)
.apply(
"WriteFailedElements_ValidateRows",
WriteFailedElementToBigQuery.newBuilder()
.setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson())
.setTableSpec(options.getDeadLetterTableSpec())
.build());
.apply("WriteFailedElements_ValidateRows", deadletterSink.write());

writeFeatureRows
.getFailedInserts()
.apply(
"WriteFailedElements_WriteFeatureRowToStore",
WriteFailedElementToBigQuery.newBuilder()
.setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson())
.setTableSpec(options.getDeadLetterTableSpec())
.build());
.apply("WriteFailedElements_WriteFeatureRowToStore", deadletterSink.write());
}

// Step 5. Write metrics to a metrics sink.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package feast.storage.connectors.bigquery.write;

import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TimePartitioning;
import com.google.auto.value.AutoValue;
import com.google.common.io.Resources;
import feast.storage.api.write.DeadletterSink;
Expand All @@ -36,6 +37,7 @@ public class BigQueryDeadletterSink implements DeadletterSink {

private static final String DEADLETTER_SCHEMA_FILE_PATH = "schemas/deadletter_table_schema.json";
private static final Logger log = org.slf4j.LoggerFactory.getLogger(BigQueryDeadletterSink.class);
private static final String TIMESTAMP_COLUMN = "timestamp";

private final String tableSpec;
private String jsonSchema;
Expand Down Expand Up @@ -97,13 +99,16 @@ public abstract static class Builder {

@Override
public PDone expand(PCollection<FailedElement> input) {
TimePartitioning partition = new TimePartitioning().setType("DAY");
partition.setField(TIMESTAMP_COLUMN);
input
.apply("FailedElementToTableRow", ParDo.of(new FailedElementToTableRowFn()))
.apply(
"WriteFailedElementsToBigQuery",
BigQueryIO.writeTableRows()
.to(getTableSpec())
.withJsonSchema(getJsonSchema())
.withTimePartitioning(partition)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
return PDone.in(input.getPipeline());
Expand All @@ -116,7 +121,7 @@ public void processElement(ProcessContext context) {
final FailedElement element = context.element();
final TableRow tableRow =
new TableRow()
.set("timestamp", element.getTimestamp().toString())
.set(TIMESTAMP_COLUMN, element.getTimestamp().toString())
.set("job_name", element.getJobName())
.set("transform_name", element.getTransformName())
.set("payload", element.getPayload())
Expand Down

0 comments on commit e75050e

Please sign in to comment.