From ba3e4d348c36f40d2bbf1ccff25b62ce1c972160 Mon Sep 17 00:00:00 2001 From: Tim Sell Date: Thu, 3 Jan 2019 15:03:40 +1100 Subject: [PATCH 1/2] allow empty string to select a NoOp write transform --- .../transform/SplitOutputByStore.java | 29 +- .../feast/ingestion/ImportJobCSVTest.java | 3 +- .../transform/SplitOutputByStoreTest.java | 281 ++++++++++++++++++ .../java/feast/storage/MockErrorsStore.java | 18 +- .../java/feast/storage/MockFeatureStore.java | 43 +++ .../java/feast/storage/MockServingStore.java | 18 +- .../feast/storage/MockWarehouseStore.java | 18 +- 7 files changed, 348 insertions(+), 62 deletions(-) create mode 100644 ingestion/src/test/java/feast/ingestion/transform/SplitOutputByStoreTest.java create mode 100644 ingestion/src/test/java/feast/storage/MockFeatureStore.java diff --git a/ingestion/src/main/java/feast/ingestion/transform/SplitOutputByStore.java b/ingestion/src/main/java/feast/ingestion/transform/SplitOutputByStore.java index af09d1e7c6..6851705e58 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/SplitOutputByStore.java +++ b/ingestion/src/main/java/feast/ingestion/transform/SplitOutputByStore.java @@ -19,6 +19,18 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import feast.ingestion.exceptions.ErrorsHandler; +import feast.ingestion.model.Specs; +import feast.ingestion.transform.FeatureIO.Write; +import feast.ingestion.transform.SplitFeatures.MultiOutputSplit; +import feast.ingestion.values.PFeatureRows; +import feast.specs.FeatureSpecProto.FeatureSpec; +import feast.specs.StorageSpecProto.StorageSpec; +import feast.storage.FeatureStore; +import feast.storage.noop.NoOpIO; +import feast.types.FeatureRowExtendedProto.Attempt; +import feast.types.FeatureRowExtendedProto.Error; +import feast.types.FeatureRowExtendedProto.FeatureRowExtended; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -35,18 +47,6 @@ import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; -import feast.ingestion.exceptions.ErrorsHandler; -import feast.ingestion.model.Specs; -import feast.ingestion.transform.FeatureIO.Write; -import feast.ingestion.transform.SplitFeatures.MultiOutputSplit; -import feast.ingestion.values.PFeatureRows; -import feast.specs.FeatureSpecProto.FeatureSpec; -import feast.specs.StorageSpecProto.StorageSpec; -import feast.storage.FeatureStore; -import feast.storage.noop.NoOpIO; -import feast.types.FeatureRowExtendedProto.Attempt; -import feast.types.FeatureRowExtendedProto.Error; -import feast.types.FeatureRowExtendedProto.FeatureRowExtended; @AllArgsConstructor @Slf4j @@ -58,6 +58,7 @@ public class SplitOutputByStore extends PTransform { @Override public PFeatureRows expand(PFeatureRows input) { Map transforms = getFeatureStoreTransforms(); + transforms.put("", new NoOpIO.Write()); Set keys = transforms.keySet(); Preconditions.checkArgument(transforms.size() > 0, "no write transforms found"); @@ -120,9 +121,7 @@ public PFeatureRows expand(PCollectionTuple input) { } String message = - "FeatureRow with output tag.no matching storage, these feature's " - + "specs may be specifying a store which was unknown when " - + "ingestion started as they somehow passed validation. 
"; + "FeatureRows have no matching write transform, these rows should not have passed validation."; PCollection errors = input.get(mainTag).apply(ParDo.of(new WithErrors(getName(), message))); diff --git a/ingestion/src/test/java/feast/ingestion/ImportJobCSVTest.java b/ingestion/src/test/java/feast/ingestion/ImportJobCSVTest.java index 2ddc57059e..761a4b232a 100644 --- a/ingestion/src/test/java/feast/ingestion/ImportJobCSVTest.java +++ b/ingestion/src/test/java/feast/ingestion/ImportJobCSVTest.java @@ -58,7 +58,6 @@ import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; -import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -188,7 +187,7 @@ public void testImportCSVUnknownServingStoreError() throws IOException { + " fields:\n" + " - name: id\n" + " - featureId: testEntity.none.redisInt32\n" // Redis is not available by - // default from the json specs + // default from the json specs + " - featureId: testEntity.none.testString\n" + "\n", ImportSpec.getDefaultInstance()); diff --git a/ingestion/src/test/java/feast/ingestion/transform/SplitOutputByStoreTest.java b/ingestion/src/test/java/feast/ingestion/transform/SplitOutputByStoreTest.java new file mode 100644 index 0000000000..d5a185d3d8 --- /dev/null +++ b/ingestion/src/test/java/feast/ingestion/transform/SplitOutputByStoreTest.java @@ -0,0 +1,281 @@ +package feast.ingestion.transform; + +import static junit.framework.TestCase.assertNull; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.protobuf.Timestamp; +import feast.ingestion.model.Features; +import feast.ingestion.model.Specs; +import feast.ingestion.model.Values; +import feast.ingestion.service.SpecService; +import feast.ingestion.values.PFeatureRows; +import feast.specs.EntitySpecProto.EntitySpec; +import feast.specs.FeatureSpecProto.FeatureSpec; +import feast.specs.ImportSpecProto.Field; +import feast.specs.ImportSpecProto.ImportSpec; +import feast.specs.ImportSpecProto.Schema; +import feast.specs.StorageSpecProto.StorageSpec; +import feast.storage.FeatureStore; +import feast.storage.MockFeatureStore; +import feast.storage.MockTransforms; +import feast.types.FeatureRowExtendedProto.FeatureRowExtended; +import feast.types.FeatureRowProto.FeatureRow; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.junit.Rule; +import org.junit.Test; + +public class SplitOutputByStoreTest { + @Rule public TestPipeline pipeline = TestPipeline.create(); + + @Test + public void testSplit() { + // Note we are stores on the group, instead of warehouse or serving store id. 
+ SerializableFunction selector = FeatureSpec::getGroup; + MockSpecService specService = new MockSpecService(); + specService.entitySpecs.put("e1", EntitySpec.getDefaultInstance()); + specService.featureSpecs.put( + "f1", FeatureSpec.newBuilder().setEntity("e1").setGroup("store1").build()); + specService.featureSpecs.put( + "f2", FeatureSpec.newBuilder().setEntity("e1").setGroup("store2").build()); + specService.storageSpecs.put( + "store1", StorageSpec.newBuilder().setId("store1").setType("type1").build()); + specService.storageSpecs.put( + "store2", StorageSpec.newBuilder().setId("store2").setType("type2").build()); + List stores = + Lists.newArrayList(new MockFeatureStore("type1"), new MockFeatureStore("type2")); + Specs specs = + Specs.of( + "jobname", + ImportSpec.newBuilder() + .addEntities("e1") + .setSchema( + Schema.newBuilder() + .addAllFields( + Lists.newArrayList( + Field.newBuilder().setFeatureId("f1").build(), + Field.newBuilder().setFeatureId("f2").build()))) + .build(), + specService); + assertNull(specs.getError()); + + SplitOutputByStore split = new SplitOutputByStore(stores, selector, specs); + + PCollection input = + pipeline + .apply( + Create.of( + FeatureRow.newBuilder() + .addFeatures(Features.of("f1", Values.ofInt32(1))) + .addFeatures(Features.of("f2", Values.ofInt32(2))) + .build())) + .apply(new ToFeatureRowExtended()); + PFeatureRows pfrows = PFeatureRows.of(input); + pfrows = pfrows.apply("do split", split); + + PAssert.that( + pfrows + .getErrors()).empty(); + PAssert.that( + pfrows + .getMain() + .apply( + MapElements.into(TypeDescriptor.of(FeatureRow.class)) + .via(FeatureRowExtended::getRow))) + .containsInAnyOrder( + Lists.newArrayList( + FeatureRow.newBuilder() + .addFeatures(Features.of("f1", Values.ofInt32(1))) + .setEventTimestamp(Timestamp.getDefaultInstance()) + .build(), + FeatureRow.newBuilder() + .addFeatures(Features.of("f2", Values.ofInt32(2))) + .setEventTimestamp(Timestamp.getDefaultInstance()) + .build())); + + MockTransforms.Write mockSpecService1 = ((MockFeatureStore) stores.get(0)).getWrite(); + MockTransforms.Write mockSpecService2 = ((MockFeatureStore) stores.get(1)).getWrite(); + + PCollection store1Output = + mockSpecService1 + .getInputs() + .get(0) + .apply("map store1 outputs", + MapElements.into(TypeDescriptor.of(FeatureRow.class)) + .via(FeatureRowExtended::getRow)); + + PCollection store2Output = + mockSpecService2 + .getInputs() + .get(0) + .apply("map store2 outputs", + MapElements.into(TypeDescriptor.of(FeatureRow.class)) + .via(FeatureRowExtended::getRow)); + + PAssert.that(store1Output) + .containsInAnyOrder( + Lists.newArrayList( + FeatureRow.newBuilder() + .addFeatures(Features.of("f1", Values.ofInt32(1))) + .setEventTimestamp(Timestamp.getDefaultInstance()) + .build())); + + PAssert.that(store2Output) + .containsInAnyOrder( + Lists.newArrayList( + FeatureRow.newBuilder() + .addFeatures(Features.of("f2", Values.ofInt32(2))) + .setEventTimestamp(Timestamp.getDefaultInstance()) + .build())); + pipeline.run(); + } + + @Test + public void testSplitWhereFeature2HasNoStoreId() { + // Note we are stores on the group, instead of warehouse or serving store id. 
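+    // Here f2 is given no group, so the selector resolves it to the empty string; the
+    // main-code change in this patch maps "" to a NoOpIO.Write, which is why the test
+    // below only asserts on store1's output and expects the errors collection to stay empty.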
+ SerializableFunction selector = FeatureSpec::getGroup; + MockSpecService specService = new MockSpecService(); + specService.entitySpecs.put("e1", EntitySpec.getDefaultInstance()); + specService.featureSpecs.put( + "f1", FeatureSpec.newBuilder().setEntity("e1").setGroup("store1").build()); + specService.featureSpecs.put( + "f2", FeatureSpec.newBuilder().setEntity("e1").build()); + specService.storageSpecs.put( + "store1", StorageSpec.newBuilder().setId("store1").setType("type1").build()); + specService.storageSpecs.put( + "store2", StorageSpec.newBuilder().setId("store2").setType("type2").build()); + List stores = + Lists.newArrayList(new MockFeatureStore("type1"), new MockFeatureStore("type2")); + Specs specs = + Specs.of( + "jobname", + ImportSpec.newBuilder() + .addEntities("e1") + .setSchema( + Schema.newBuilder() + .addAllFields( + Lists.newArrayList( + Field.newBuilder().setFeatureId("f1").build(), + Field.newBuilder().setFeatureId("f2").build()))) + .build(), + specService); + assertNull(specs.getError()); + + SplitOutputByStore split = new SplitOutputByStore(stores, selector, specs); + + PCollection input = + pipeline + .apply( + Create.of( + FeatureRow.newBuilder() + .addFeatures(Features.of("f1", Values.ofInt32(1))) + .addFeatures(Features.of("f2", Values.ofInt32(2))) + .build())) + .apply(new ToFeatureRowExtended()); + PFeatureRows pfrows = PFeatureRows.of(input); + pfrows = pfrows.apply("do split", split); + + + PAssert.that( + pfrows + .getErrors()).empty(); + PAssert.that( + pfrows + .getMain() + .apply( + MapElements.into(TypeDescriptor.of(FeatureRow.class)) + .via(FeatureRowExtended::getRow))) + .containsInAnyOrder( + Lists.newArrayList( + FeatureRow.newBuilder() + .addFeatures(Features.of("f1", Values.ofInt32(1))) + .setEventTimestamp(Timestamp.getDefaultInstance()) + .build(), + FeatureRow.newBuilder() + .addFeatures(Features.of("f2", Values.ofInt32(2))) + .setEventTimestamp(Timestamp.getDefaultInstance()) + .build())); + + MockTransforms.Write mockSpecService1 = ((MockFeatureStore) stores.get(0)).getWrite(); + MockTransforms.Write mockSpecService2 = ((MockFeatureStore) stores.get(1)).getWrite(); + + PCollection store1Output = + mockSpecService1 + .getInputs() + .get(0) + .apply("map store1 outputs", + MapElements.into(TypeDescriptor.of(FeatureRow.class)) + .via(FeatureRowExtended::getRow)); + + PCollection store2Output = + mockSpecService2 + .getInputs() + .get(0) + .apply("map store2 outputs", + MapElements.into(TypeDescriptor.of(FeatureRow.class)) + .via(FeatureRowExtended::getRow)); + + PAssert.that(store1Output) + .containsInAnyOrder( + Lists.newArrayList( + FeatureRow.newBuilder() + .addFeatures(Features.of("f1", Values.ofInt32(1))) + .setEventTimestamp(Timestamp.getDefaultInstance()) + .build())); + + pipeline.run(); + } + + public static class MockSpecService implements SpecService { + public Map entitySpecs = new HashMap<>(); + public Map featureSpecs = new HashMap<>(); + public Map storageSpecs = new HashMap<>(); + + @Override + public Map getEntitySpecs(Iterable entityIds) { + Set entityIdsSet = Sets.newHashSet(entityIds); + return Maps.newHashMap( + Maps.filterEntries(entitySpecs, (entry) -> entityIdsSet.contains(entry.getKey()))); + } + + @Override + public Map getAllEntitySpecs() { + return entitySpecs; + } + + @Override + public Map getFeatureSpecs(Iterable featureIds) { + Set featureIdsSet = Sets.newHashSet(featureIds); + return Maps.newHashMap( + Maps.filterEntries(featureSpecs, (entry) -> featureIdsSet.contains(entry.getKey()))); + } + + @Override + 
public Map getAllFeatureSpecs() { + return featureSpecs; + } + + @Override + public Map getStorageSpecs(Iterable storageIds) { + Set storageIdsSet = Sets.newHashSet(storageIds); + return Maps.newHashMap( + Maps.filterEntries(storageSpecs, (entry) -> storageIdsSet.contains(entry.getKey()))); + } + + @Override + public Map getAllStorageSpecs() { + return storageSpecs; + } + } +} diff --git a/ingestion/src/test/java/feast/storage/MockErrorsStore.java b/ingestion/src/test/java/feast/storage/MockErrorsStore.java index 1523940f55..9abaf33208 100644 --- a/ingestion/src/test/java/feast/storage/MockErrorsStore.java +++ b/ingestion/src/test/java/feast/storage/MockErrorsStore.java @@ -18,24 +18,12 @@ package feast.storage; import com.google.auto.service.AutoService; -import lombok.Getter; -import feast.ingestion.model.Specs; -import feast.specs.StorageSpecProto.StorageSpec; @AutoService(ErrorsStore.class) -public class MockErrorsStore implements ErrorsStore { +public class MockErrorsStore extends MockFeatureStore implements ErrorsStore { public static final String MOCK_ERRORS_STORE_TYPE = "MOCK_ERRORS_STORE"; - @Getter private MockTransforms.Write write; - - @Override - public MockTransforms.Write create(StorageSpec storageSpec, Specs specs) { - write = new MockTransforms.Write(storageSpec); - return write; - } - - @Override - public String getType() { - return MOCK_ERRORS_STORE_TYPE; + public MockErrorsStore() { + super(MOCK_ERRORS_STORE_TYPE); } } diff --git a/ingestion/src/test/java/feast/storage/MockFeatureStore.java b/ingestion/src/test/java/feast/storage/MockFeatureStore.java new file mode 100644 index 0000000000..2e957d4de2 --- /dev/null +++ b/ingestion/src/test/java/feast/storage/MockFeatureStore.java @@ -0,0 +1,43 @@ +/* + * Copyright 2018 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package feast.storage; + +import feast.ingestion.model.Specs; +import feast.specs.StorageSpecProto.StorageSpec; +import lombok.Getter; + +public class MockFeatureStore implements FeatureStore { + + private final String type; + @Getter private MockTransforms.Write write; + + public MockFeatureStore(String type) { + this.type = type; + } + + @Override + public MockTransforms.Write create(StorageSpec storageSpec, Specs specs) { + write = new MockTransforms.Write(storageSpec); + return write; + } + + @Override + public String getType() { + return type; + } +} diff --git a/ingestion/src/test/java/feast/storage/MockServingStore.java b/ingestion/src/test/java/feast/storage/MockServingStore.java index c73494c17b..e52ca046e7 100644 --- a/ingestion/src/test/java/feast/storage/MockServingStore.java +++ b/ingestion/src/test/java/feast/storage/MockServingStore.java @@ -18,24 +18,12 @@ package feast.storage; import com.google.auto.service.AutoService; -import lombok.Getter; -import feast.ingestion.model.Specs; -import feast.specs.StorageSpecProto.StorageSpec; @AutoService(ServingStore.class) -public class MockServingStore implements ServingStore { +public class MockServingStore extends MockFeatureStore implements ServingStore { public static final String MOCK_SERVING_STORE_TYPE = "MOCK_SERVING_STORE"; - @Getter private MockTransforms.Write write; - - @Override - public MockTransforms.Write create(StorageSpec storageSpec, Specs specs) { - write = new MockTransforms.Write(storageSpec); - return write; - } - - @Override - public String getType() { - return MOCK_SERVING_STORE_TYPE; + public MockServingStore() { + super(MOCK_SERVING_STORE_TYPE); } } diff --git a/ingestion/src/test/java/feast/storage/MockWarehouseStore.java b/ingestion/src/test/java/feast/storage/MockWarehouseStore.java index 126e6dfe1f..452a2729df 100644 --- a/ingestion/src/test/java/feast/storage/MockWarehouseStore.java +++ b/ingestion/src/test/java/feast/storage/MockWarehouseStore.java @@ -18,24 +18,12 @@ package feast.storage; import com.google.auto.service.AutoService; -import lombok.Getter; -import feast.ingestion.model.Specs; -import feast.specs.StorageSpecProto.StorageSpec; @AutoService(WarehouseStore.class) -public class MockWarehouseStore implements WarehouseStore { +public class MockWarehouseStore extends MockFeatureStore implements WarehouseStore { public static final String MOCK_WAREHOUSE_STORE_TYPE = "MOCK_WAREHOUSE_STORE"; - @Getter private MockTransforms.Write write; - - @Override - public MockTransforms.Write create(StorageSpec storageSpec, Specs specs) { - write = new MockTransforms.Write(storageSpec); - return write; - } - - @Override - public String getType() { - return MOCK_WAREHOUSE_STORE_TYPE; + public MockWarehouseStore() { + super(MOCK_WAREHOUSE_STORE_TYPE); } } From 21e59fa624d38c957f1cd1662b4c267eb9dad142 Mon Sep 17 00:00:00 2001 From: Tim Sell Date: Mon, 7 Jan 2019 09:36:11 +1100 Subject: [PATCH 2/2] reformat to google styles --- .../ingestion/transform/SplitOutputByStore.java | 7 ++++++- .../java/feast/ingestion/ImportJobCSVTest.java | 8 +++++--- .../transform/SplitOutputByStoreTest.java | 16 +++++++++------- 3 files changed, 20 insertions(+), 11 deletions(-) diff --git a/ingestion/src/main/java/feast/ingestion/transform/SplitOutputByStore.java b/ingestion/src/main/java/feast/ingestion/transform/SplitOutputByStore.java index 6851705e58..ecf587c4fd 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/SplitOutputByStore.java +++ 
b/ingestion/src/main/java/feast/ingestion/transform/SplitOutputByStore.java @@ -51,6 +51,7 @@ @AllArgsConstructor @Slf4j public class SplitOutputByStore extends PTransform { + private Collection stores; private SerializableFunction selector; private Specs specs; @@ -103,6 +104,7 @@ private Map getFeatureStoreTransforms() { @AllArgsConstructor public static class WriteTags extends PTransform { + private Map, Write> transforms; private TupleTag mainTag; @@ -130,8 +132,11 @@ public PFeatureRows expand(PCollectionTuple input) { } } - /** Sets the last attempt error for all rows with a given exception */ + /** + * Sets the last attempt error for all rows with a given exception + */ public static class WithErrors extends DoFn { + private Error error; public WithErrors(Error error) { diff --git a/ingestion/src/test/java/feast/ingestion/ImportJobCSVTest.java b/ingestion/src/test/java/feast/ingestion/ImportJobCSVTest.java index 761a4b232a..47011f5d9a 100644 --- a/ingestion/src/test/java/feast/ingestion/ImportJobCSVTest.java +++ b/ingestion/src/test/java/feast/ingestion/ImportJobCSVTest.java @@ -65,9 +65,11 @@ @Slf4j public class ImportJobCSVTest { - @Rule public TemporaryFolder folder = new TemporaryFolder(); + @Rule + public TemporaryFolder folder = new TemporaryFolder(); - @Rule public TestPipeline testPipeline = TestPipeline.create(); + @Rule + public TestPipeline testPipeline = TestPipeline.create(); public ImportSpec initImportSpec(ImportSpec importSpec, String dataFile) throws IOException { return importSpec.toBuilder().putOptions("path", dataFile).build(); @@ -122,7 +124,7 @@ public void testImportCSV() throws IOException { PCollection writtenToWarehouse = PCollectionList.of( - WarehouseStoreService.get(MockWarehouseStore.class).getWrite().getInputs()) + WarehouseStoreService.get(MockWarehouseStore.class).getWrite().getInputs()) .apply("flatten warehouse input", Flatten.pCollections()); PCollection writtenToErrors = diff --git a/ingestion/src/test/java/feast/ingestion/transform/SplitOutputByStoreTest.java b/ingestion/src/test/java/feast/ingestion/transform/SplitOutputByStoreTest.java index d5a185d3d8..80ed711533 100644 --- a/ingestion/src/test/java/feast/ingestion/transform/SplitOutputByStoreTest.java +++ b/ingestion/src/test/java/feast/ingestion/transform/SplitOutputByStoreTest.java @@ -37,7 +37,9 @@ import org.junit.Test; public class SplitOutputByStoreTest { - @Rule public TestPipeline pipeline = TestPipeline.create(); + + @Rule + public TestPipeline pipeline = TestPipeline.create(); @Test public void testSplit() { @@ -88,11 +90,11 @@ public void testSplit() { pfrows .getErrors()).empty(); PAssert.that( - pfrows - .getMain() - .apply( - MapElements.into(TypeDescriptor.of(FeatureRow.class)) - .via(FeatureRowExtended::getRow))) + pfrows + .getMain() + .apply( + MapElements.into(TypeDescriptor.of(FeatureRow.class)) + .via(FeatureRowExtended::getRow))) .containsInAnyOrder( Lists.newArrayList( FeatureRow.newBuilder() @@ -186,7 +188,6 @@ public void testSplitWhereFeature2HasNoStoreId() { PFeatureRows pfrows = PFeatureRows.of(input); pfrows = pfrows.apply("do split", split); - PAssert.that( pfrows .getErrors()).empty(); @@ -238,6 +239,7 @@ public void testSplitWhereFeature2HasNoStoreId() { } public static class MockSpecService implements SpecService { + public Map entitySpecs = new HashMap<>(); public Map featureSpecs = new HashMap<>(); public Map storageSpecs = new HashMap<>();
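For reference, the behaviour the first commit introduces -- an empty store id selecting a no-op write instead of surfacing an error -- can be sketched outside of Beam. The snippet below is a simplified illustration in plain Java; the Writer interface, NoOpFallbackSketch class, and route helper are hypothetical stand-ins for Feast's FeatureStore/FeatureIO.Write machinery, not code from the patch.

import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

public class NoOpFallbackSketch {

  // Hypothetical stand-in for a store-specific write transform (e.g. MockTransforms.Write).
  interface Writer extends Consumer<String> {
  }

  static void route(Map<String, Writer> writers, String storeId, String row) {
    // Unknown ids fall back to the empty-key writer, mirroring transforms.put("", new NoOpIO.Write()).
    writers.getOrDefault(storeId, writers.get("")).accept(row);
  }

  public static void main(String[] args) {
    Map<String, Writer> writers = new HashMap<>();
    writers.put("store1", row -> System.out.println("store1 <- " + row));
    writers.put("store2", row -> System.out.println("store2 <- " + row));
    writers.put("", row -> { /* no-op: rows with no store id are deliberately dropped */ });

    route(writers, "store1", "f1=1"); // written to store1
    route(writers, "", "f2=2");       // silently discarded rather than routed to errors
  }
}

The routing mirrors the updated intent of WriteTags: a row that reaches the splitter with no matching store id is handled by the empty-key no-op writer, while the reworded error message is reserved for rows that should never have passed validation.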