Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow empty string to select a NoOp write transform #30

Merged
merged 2 commits into from
Jan 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,18 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import feast.ingestion.exceptions.ErrorsHandler;
import feast.ingestion.model.Specs;
import feast.ingestion.transform.FeatureIO.Write;
import feast.ingestion.transform.SplitFeatures.MultiOutputSplit;
import feast.ingestion.values.PFeatureRows;
import feast.specs.FeatureSpecProto.FeatureSpec;
import feast.specs.StorageSpecProto.StorageSpec;
import feast.storage.FeatureStore;
import feast.storage.noop.NoOpIO;
import feast.types.FeatureRowExtendedProto.Attempt;
import feast.types.FeatureRowExtendedProto.Error;
import feast.types.FeatureRowExtendedProto.FeatureRowExtended;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
Expand All @@ -35,29 +47,19 @@
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import feast.ingestion.exceptions.ErrorsHandler;
import feast.ingestion.model.Specs;
import feast.ingestion.transform.FeatureIO.Write;
import feast.ingestion.transform.SplitFeatures.MultiOutputSplit;
import feast.ingestion.values.PFeatureRows;
import feast.specs.FeatureSpecProto.FeatureSpec;
import feast.specs.StorageSpecProto.StorageSpec;
import feast.storage.FeatureStore;
import feast.storage.noop.NoOpIO;
import feast.types.FeatureRowExtendedProto.Attempt;
import feast.types.FeatureRowExtendedProto.Error;
import feast.types.FeatureRowExtendedProto.FeatureRowExtended;

@AllArgsConstructor
@Slf4j
public class SplitOutputByStore extends PTransform<PFeatureRows, PFeatureRows> {

private Collection<? extends FeatureStore> stores;
private SerializableFunction<FeatureSpec, String> selector;
private Specs specs;

@Override
public PFeatureRows expand(PFeatureRows input) {
Map<String, Write> transforms = getFeatureStoreTransforms();
transforms.put("", new NoOpIO.Write());
Set<String> keys = transforms.keySet();
Preconditions.checkArgument(transforms.size() > 0, "no write transforms found");

Expand Down Expand Up @@ -102,6 +104,7 @@ private Map<String, Write> getFeatureStoreTransforms() {

@AllArgsConstructor
public static class WriteTags extends PTransform<PCollectionTuple, PFeatureRows> {

private Map<TupleTag<FeatureRowExtended>, Write> transforms;
private TupleTag<FeatureRowExtended> mainTag;

Expand All @@ -120,9 +123,7 @@ public PFeatureRows expand(PCollectionTuple input) {
}

String message =
"FeatureRow with output tag.no matching storage, these feature's "
+ "specs may be specifying a store which was unknown when "
+ "ingestion started as they somehow passed validation. ";
"FeatureRows have no matching write transform, these rows should not have passed validation.";
PCollection<FeatureRowExtended> errors =
input.get(mainTag).apply(ParDo.of(new WithErrors(getName(), message)));

Expand All @@ -131,8 +132,11 @@ public PFeatureRows expand(PCollectionTuple input) {
}
}

/** Sets the last attempt error for all rows with a given exception */
/**
* Sets the last attempt error for all rows with a given exception
*/
public static class WithErrors extends DoFn<FeatureRowExtended, FeatureRowExtended> {

private Error error;

public WithErrors(Error error) {
Expand Down
11 changes: 6 additions & 5 deletions ingestion/src/test/java/feast/ingestion/ImportJobCSVTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,18 @@
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

@Slf4j
public class ImportJobCSVTest {

@Rule public TemporaryFolder folder = new TemporaryFolder();
@Rule
public TemporaryFolder folder = new TemporaryFolder();

@Rule public TestPipeline testPipeline = TestPipeline.create();
@Rule
public TestPipeline testPipeline = TestPipeline.create();

public ImportSpec initImportSpec(ImportSpec importSpec, String dataFile) throws IOException {
return importSpec.toBuilder().putOptions("path", dataFile).build();
Expand Down Expand Up @@ -123,7 +124,7 @@ public void testImportCSV() throws IOException {

PCollection<FeatureRowExtended> writtenToWarehouse =
PCollectionList.of(
WarehouseStoreService.get(MockWarehouseStore.class).getWrite().getInputs())
WarehouseStoreService.get(MockWarehouseStore.class).getWrite().getInputs())
.apply("flatten warehouse input", Flatten.pCollections());

PCollection<FeatureRowExtended> writtenToErrors =
Expand Down Expand Up @@ -188,7 +189,7 @@ public void testImportCSVUnknownServingStoreError() throws IOException {
+ " fields:\n"
+ " - name: id\n"
+ " - featureId: testEntity.none.redisInt32\n" // Redis is not available by
// default from the json specs
// default from the json specs
+ " - featureId: testEntity.none.testString\n"
+ "\n",
ImportSpec.getDefaultInstance());
Expand Down
Loading