Add general storage API and refactor existing store implementations (#567)

* Add storage interfaces, basic file structure (#529)

* Add storage interfaces, basic file structure

* Apply spotless, add comments

* Move parseResponse and isEmpty to response object

* Make changes to the write interface to be more Beam-like

* Pass feature specs to the retriever

* Pass feature specs to online retriever

* Add FeatureSetRequest

* Add mistakenly removed TestUtil

* Add mistakenly removed TestUtil

* Add BigQuery storage (#546)

* Add Redis storage implementation (#547)

* Add Redis storage

* Remove staleness check; can be checked at the service level

* Remove staleness related tests

* Add dependencies to top level pom

* Clean up code

* Change serving and ingestion to use storage API (#553)

* Change serving and ingestion to use storage API

* Remove extra exclusion clause

* Storage refactor API and docstring tweaks (#569)

* API and docstring tweaks

* Fix javadoc linting errors

* Apply spotless

* Fix javadoc formatting

* Drop result from HistoricalRetrievalResult constructors

* Change pipeline to use DeadletterSink API (#586)

* Add better code docs to storage refactor (#601)

* Add better code documentation, make GetFeastServingInfo independent of retriever

* Make getStagingLocation a method of the historical retriever

* Apply spotless

* Clean up dependencies, remove exclusions at serving (#607)

* Clean up OnlineServingService code (#605)

* Clean up OnlineServingService code to be more readable

* Revert Metrics

* Rename storage API packages to nouns
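
The sub-commits above introduce a writer-side storage API that the ingestion pipeline programs against instead of store-specific transforms. Below is a minimal sketch of what those interfaces could look like, inferred from the call sites in the ImportJob diff further down (featureSink.prepareWrite(...), featureSink.writer(), deadletterSink.write(), and WriteResult's getSuccessfulInserts()/getFailedInserts()); the file names, generics, and return types shown here are assumptions, not the authoritative definitions.

// FeatureSink.java (sketch) -- one implementation per store type, e.g. Redis or BigQuery.
package feast.storage.api.writer;

import feast.core.FeatureSetProto.FeatureSet;
import feast.types.FeatureRowProto.FeatureRow;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;

public interface FeatureSink {
  // Verify the store configuration and create any tables/keys the feature set needs;
  // called once per subscribed feature set before the pipeline starts.
  void prepareWrite(FeatureSet featureSet);

  // Beam transform that writes FeatureRows to the store and reports per-row outcomes.
  PTransform<PCollection<FeatureRow>, WriteResult> writer();
}

// WriteResult.java (sketch) -- output of FeatureSink.writer(), splitting rows that were
// written from rows that failed, so metrics and dead-lettering can branch on the outcome.
public interface WriteResult {
  PCollection<FeatureRow> getSuccessfulInserts();
  PCollection<FailedElement> getFailedInserts();
}

// DeadletterSink.java (sketch) -- destination for FailedElements from any pipeline stage;
// BigQueryDeadletterSink is the implementation wired up in ImportJob.
public interface DeadletterSink {
  PTransform<PCollection<FailedElement>, PDone> write();
}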
Chen Zhiling authored Apr 7, 2020
1 parent 9139fe3 commit e4f8fe9
Showing 71 changed files with 3,711 additions and 2,711 deletions.
18 changes: 18 additions & 0 deletions ingestion/pom.xml
@@ -101,6 +101,24 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>dev.feast</groupId>
<artifactId>feast-storage-api</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>dev.feast</groupId>
<artifactId>feast-storage-connector-redis</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>dev.feast</groupId>
<artifactId>feast-storage-connector-bigquery</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value-annotations</artifactId>
78 changes: 39 additions & 39 deletions ingestion/src/main/java/feast/ingestion/ImportJob.java
@@ -17,23 +17,26 @@
package feast.ingestion;

import static feast.ingestion.utils.SpecUtil.getFeatureSetReference;
import static feast.ingestion.utils.StoreUtil.getFeatureSink;

import com.google.protobuf.InvalidProtocolBufferException;
import feast.core.FeatureSetProto.FeatureSet;
import feast.core.FeatureSetProto.FeatureSetSpec;
import feast.core.SourceProto.Source;
import feast.core.StoreProto.Store;
import feast.ingestion.options.BZip2Decompressor;
import feast.ingestion.options.ImportOptions;
import feast.ingestion.options.StringListStreamConverter;
import feast.ingestion.transform.ReadFromSource;
import feast.ingestion.transform.ValidateFeatureRows;
import feast.ingestion.transform.WriteFailedElementToBigQuery;
import feast.ingestion.transform.WriteToStore;
import feast.ingestion.transform.metrics.WriteMetricsTransform;
import feast.ingestion.utils.ResourceUtil;
import feast.ingestion.transform.metrics.WriteFailureMetricsTransform;
import feast.ingestion.transform.metrics.WriteSuccessMetricsTransform;
import feast.ingestion.utils.SpecUtil;
import feast.ingestion.utils.StoreUtil;
import feast.ingestion.values.FailedElement;
import feast.storage.api.writer.DeadletterSink;
import feast.storage.api.writer.FailedElement;
import feast.storage.api.writer.FeatureSink;
import feast.storage.api.writer.WriteResult;
import feast.storage.connectors.bigquery.writer.BigQueryDeadletterSink;
import feast.types.FeatureRowProto.FeatureRow;
import java.io.IOException;
import java.util.HashMap;
@@ -93,17 +96,24 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOException
SpecUtil.getSubscribedFeatureSets(store.getSubscriptionsList(), featureSets);

// Generate tags by key
Map<String, FeatureSet> featureSetsByKey = new HashMap<>();
Map<String, FeatureSetSpec> featureSetSpecsByKey = new HashMap<>();
subscribedFeatureSets.stream()
.forEach(
fs -> {
String ref = getFeatureSetReference(fs);
featureSetsByKey.put(ref, fs);
String ref = getFeatureSetReference(fs.getSpec());
featureSetSpecsByKey.put(ref, fs.getSpec());
});

FeatureSink featureSink = getFeatureSink(store, featureSetSpecsByKey);

// TODO: make the source part of the job initialisation options
Source source = subscribedFeatureSets.get(0).getSpec().getSource();

for (FeatureSet featureSet : subscribedFeatureSets) {
// Ensure Store has valid configuration and Feast can access it.
featureSink.prepareWrite(featureSet);
}

// Step 1. Read messages from Feast Source as FeatureRow.
PCollectionTuple convertedFeatureRows =
pipeline.apply(
@@ -114,58 +124,48 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOException
.setFailureTag(DEADLETTER_OUT)
.build());

for (FeatureSet featureSet : subscribedFeatureSets) {
// Ensure Store has valid configuration and Feast can access it.
StoreUtil.setupStore(store, featureSet);
}

// Step 2. Validate incoming FeatureRows
PCollectionTuple validatedRows =
convertedFeatureRows
.get(FEATURE_ROW_OUT)
.apply(
ValidateFeatureRows.newBuilder()
.setFeatureSets(featureSetsByKey)
.setFeatureSetSpecs(featureSetSpecsByKey)
.setSuccessTag(FEATURE_ROW_OUT)
.setFailureTag(DEADLETTER_OUT)
.build());

// Step 3. Write FeatureRow to the corresponding Store.
validatedRows
.get(FEATURE_ROW_OUT)
.apply(
"WriteFeatureRowToStore",
WriteToStore.newBuilder().setFeatureSets(featureSetsByKey).setStore(store).build());
WriteResult writeFeatureRows =
validatedRows.get(FEATURE_ROW_OUT).apply("WriteFeatureRowToStore", featureSink.writer());

// Step 4. Write FailedElements to a dead letter table in BigQuery.
if (options.getDeadLetterTableSpec() != null) {
// TODO: make deadletter destination type configurable
DeadletterSink deadletterSink =
new BigQueryDeadletterSink(options.getDeadLetterTableSpec());

convertedFeatureRows
.get(DEADLETTER_OUT)
.apply(
"WriteFailedElements_ReadFromSource",
WriteFailedElementToBigQuery.newBuilder()
.setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson())
.setTableSpec(options.getDeadLetterTableSpec())
.build());
.apply("WriteFailedElements_ReadFromSource", deadletterSink.write());

validatedRows
.get(DEADLETTER_OUT)
.apply(
"WriteFailedElements_ValidateRows",
WriteFailedElementToBigQuery.newBuilder()
.setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson())
.setTableSpec(options.getDeadLetterTableSpec())
.build());
.apply("WriteFailedElements_ValidateRows", deadletterSink.write());

writeFeatureRows
.getFailedInserts()
.apply("WriteFailedElements_WriteFeatureRowToStore", deadletterSink.write());
}

// Step 5. Write metrics to a metrics sink.
validatedRows.apply(
"WriteMetrics",
WriteMetricsTransform.newBuilder()
.setStoreName(store.getName())
.setSuccessTag(FEATURE_ROW_OUT)
.setFailureTag(DEADLETTER_OUT)
.build());
writeFeatureRows
.getSuccessfulInserts()
.apply("WriteSuccessMetrics", WriteSuccessMetricsTransform.create(store.getName()));

writeFeatureRows
.getFailedInserts()
.apply("WriteFailureMetrics", WriteFailureMetricsTransform.create(store.getName()));
}

return pipeline.run();
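
Since the hunks above interleave removed and added lines, the refactored write path is easier to read in one piece. The following is a condensed sketch of the new Step 3–5 wiring, assembled from the added lines above; surrounding setup and option handling are omitted.

// Resolve the sink for the configured store and verify access before running the pipeline.
FeatureSink featureSink = getFeatureSink(store, featureSetSpecsByKey);
for (FeatureSet featureSet : subscribedFeatureSets) {
  featureSink.prepareWrite(featureSet);
}

// Step 3. The store-specific WriteToStore transform is replaced by the generic writer.
WriteResult writeFeatureRows =
    validatedRows.get(FEATURE_ROW_OUT).apply("WriteFeatureRowToStore", featureSink.writer());

// Step 4. All failure branches, including failed inserts, share one DeadletterSink.
if (options.getDeadLetterTableSpec() != null) {
  DeadletterSink deadletterSink = new BigQueryDeadletterSink(options.getDeadLetterTableSpec());
  convertedFeatureRows.get(DEADLETTER_OUT)
      .apply("WriteFailedElements_ReadFromSource", deadletterSink.write());
  validatedRows.get(DEADLETTER_OUT)
      .apply("WriteFailedElements_ValidateRows", deadletterSink.write());
  writeFeatureRows.getFailedInserts()
      .apply("WriteFailedElements_WriteFeatureRowToStore", deadletterSink.write());
}

// Step 5. Metrics now branch on the writer's result instead of a combined metrics transform.
writeFeatureRows.getSuccessfulInserts()
    .apply("WriteSuccessMetrics", WriteSuccessMetricsTransform.create(store.getName()));
writeFeatureRows.getFailedInserts()
    .apply("WriteFailureMetrics", WriteFailureMetricsTransform.create(store.getName()));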
@@ -21,7 +21,7 @@
import feast.core.SourceProto.Source;
import feast.core.SourceProto.SourceType;
import feast.ingestion.transform.fn.KafkaRecordToFeatureRowDoFn;
import feast.ingestion.values.FailedElement;
import feast.storage.api.writer.FailedElement;
import feast.types.FeatureRowProto.FeatureRow;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.PTransform;
@@ -19,8 +19,8 @@
import com.google.auto.value.AutoValue;
import feast.core.FeatureSetProto;
import feast.ingestion.transform.fn.ValidateFeatureRowDoFn;
import feast.ingestion.values.FailedElement;
import feast.ingestion.values.FeatureSet;
import feast.storage.api.writer.FailedElement;
import feast.types.FeatureRowProto.FeatureRow;
import java.util.Map;
import java.util.stream.Collectors;
@@ -36,7 +36,7 @@
public abstract class ValidateFeatureRows
extends PTransform<PCollection<FeatureRow>, PCollectionTuple> {

public abstract Map<String, FeatureSetProto.FeatureSet> getFeatureSets();
public abstract Map<String, FeatureSetProto.FeatureSetSpec> getFeatureSetSpecs();

public abstract TupleTag<FeatureRow> getSuccessTag();

@@ -49,7 +49,8 @@ public static Builder newBuilder() {
@AutoValue.Builder
public abstract static class Builder {

public abstract Builder setFeatureSets(Map<String, FeatureSetProto.FeatureSet> featureSets);
public abstract Builder setFeatureSetSpecs(
Map<String, FeatureSetProto.FeatureSetSpec> featureSets);

public abstract Builder setSuccessTag(TupleTag<FeatureRow> successTag);

@@ -62,7 +63,7 @@ public abstract static class Builder {
public PCollectionTuple expand(PCollection<FeatureRow> input) {

Map<String, FeatureSet> featureSets =
getFeatureSets().entrySet().stream()
getFeatureSetSpecs().entrySet().stream()
.map(e -> Pair.of(e.getKey(), new FeatureSet(e.getValue())))
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight));

@@ -18,7 +18,7 @@

import com.google.api.services.bigquery.model.TableRow;
import com.google.auto.value.AutoValue;
import feast.ingestion.values.FailedElement;
import feast.storage.api.writer.FailedElement;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
168 changes: 0 additions & 168 deletions ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java

This file was deleted.

@@ -18,8 +18,7 @@

import com.google.auto.value.AutoValue;
import com.google.protobuf.InvalidProtocolBufferException;
import feast.ingestion.transform.ReadFromSource.Builder;
import feast.ingestion.values.FailedElement;
import feast.storage.api.writer.FailedElement;
import feast.types.FeatureRowProto.FeatureRow;
import java.util.Base64;
import org.apache.beam.sdk.io.kafka.KafkaRecord;