diff --git a/ingestion/src/main/java/feast/ingestion/transform/SplitOutputByStore.java b/ingestion/src/main/java/feast/ingestion/transform/SplitOutputByStore.java
index af09d1e7c6..ecf587c4fd 100644
--- a/ingestion/src/main/java/feast/ingestion/transform/SplitOutputByStore.java
+++ b/ingestion/src/main/java/feast/ingestion/transform/SplitOutputByStore.java
@@ -19,6 +19,18 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import feast.ingestion.exceptions.ErrorsHandler;
+import feast.ingestion.model.Specs;
+import feast.ingestion.transform.FeatureIO.Write;
+import feast.ingestion.transform.SplitFeatures.MultiOutputSplit;
+import feast.ingestion.values.PFeatureRows;
+import feast.specs.FeatureSpecProto.FeatureSpec;
+import feast.specs.StorageSpecProto.StorageSpec;
+import feast.storage.FeatureStore;
+import feast.storage.noop.NoOpIO;
+import feast.types.FeatureRowExtendedProto.Attempt;
+import feast.types.FeatureRowExtendedProto.Error;
+import feast.types.FeatureRowExtendedProto.FeatureRowExtended;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -35,22 +47,11 @@
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TupleTag;
-import feast.ingestion.exceptions.ErrorsHandler;
-import feast.ingestion.model.Specs;
-import feast.ingestion.transform.FeatureIO.Write;
-import feast.ingestion.transform.SplitFeatures.MultiOutputSplit;
-import feast.ingestion.values.PFeatureRows;
-import feast.specs.FeatureSpecProto.FeatureSpec;
-import feast.specs.StorageSpecProto.StorageSpec;
-import feast.storage.FeatureStore;
-import feast.storage.noop.NoOpIO;
-import feast.types.FeatureRowExtendedProto.Attempt;
-import feast.types.FeatureRowExtendedProto.Error;
-import feast.types.FeatureRowExtendedProto.FeatureRowExtended;
 
 @AllArgsConstructor
 @Slf4j
 public class SplitOutputByStore extends PTransform<PFeatureRows, PFeatureRows> {
+
   private Collection<? extends FeatureStore> stores;
   private SerializableFunction<FeatureSpec, String> selector;
   private Specs specs;
@@ -58,6 +59,7 @@ public class SplitOutputByStore extends PTransform<PFeatureRows, PFeatureRows> {
   @Override
   public PFeatureRows expand(PFeatureRows input) {
     Map<String, Write> transforms = getFeatureStoreTransforms();
+    transforms.put("", new NoOpIO.Write());
     Set<String> keys = transforms.keySet();
     Preconditions.checkArgument(transforms.size() > 0, "no write transforms found");
@@ -102,6 +104,7 @@ private Map<String, Write> getFeatureStoreTransforms() {
   @AllArgsConstructor
   public static class WriteTags extends PTransform<PCollectionTuple, PFeatureRows> {
+
     private Map<TupleTag<FeatureRowExtended>, Write> transforms;
     private TupleTag<FeatureRowExtended> mainTag;
@@ -120,9 +123,7 @@ public PFeatureRows expand(PCollectionTuple input) {
       }
 
       String message =
-          "FeatureRow with output tag.no matching storage, these feature's "
-              + "specs may be specifying a store which was unknown when "
-              + "ingestion started as they somehow passed validation. ";
+          "FeatureRows have no matching write transform, these rows should not have passed validation.";
 
       PCollection<FeatureRowExtended> errors =
           input.get(mainTag).apply(ParDo.of(new WithErrors(getName(), message)));
@@ -131,8 +132,11 @@ public PFeatureRows expand(PCollectionTuple input) {
     }
   }
 
-  /** Sets the last attempt error for all rows with a given exception */
+  /**
+   * Sets the last attempt error for all rows with a given exception
+   */
   public static class WithErrors extends DoFn<FeatureRowExtended, FeatureRowExtended> {
+
     private Error error;
 
     public WithErrors(Error error) {
diff --git a/ingestion/src/test/java/feast/ingestion/ImportJobCSVTest.java b/ingestion/src/test/java/feast/ingestion/ImportJobCSVTest.java
index 2ddc57059e..47011f5d9a 100644
--- a/ingestion/src/test/java/feast/ingestion/ImportJobCSVTest.java
+++ b/ingestion/src/test/java/feast/ingestion/ImportJobCSVTest.java
@@ -58,7 +58,6 @@
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
-import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -66,9 +65,11 @@
 @Slf4j
 public class ImportJobCSVTest {
 
-  @Rule public TemporaryFolder folder = new TemporaryFolder();
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
 
-  @Rule public TestPipeline testPipeline = TestPipeline.create();
+  @Rule
+  public TestPipeline testPipeline = TestPipeline.create();
 
   public ImportSpec initImportSpec(ImportSpec importSpec, String dataFile) throws IOException {
     return importSpec.toBuilder().putOptions("path", dataFile).build();
@@ -123,7 +124,7 @@ public void testImportCSV() throws IOException {
 
     PCollection<FeatureRowExtended> writtenToWarehouse =
         PCollectionList.of(
-                WarehouseStoreService.get(MockWarehouseStore.class).getWrite().getInputs())
+            WarehouseStoreService.get(MockWarehouseStore.class).getWrite().getInputs())
             .apply("flatten warehouse input", Flatten.pCollections());
 
     PCollection<FeatureRowExtended> writtenToErrors =
@@ -188,7 +189,7 @@ public void testImportCSVUnknownServingStoreError() throws IOException {
             + "    fields:\n"
            + "      - name: id\n"
             + "      - featureId: testEntity.none.redisInt32\n" // Redis is not available by
-            // default from the json specs
+                                                                // default from the json specs
             + "      - featureId: testEntity.none.testString\n"
             + "\n",
         ImportSpec.getDefaultInstance());
diff --git a/ingestion/src/test/java/feast/ingestion/transform/SplitOutputByStoreTest.java b/ingestion/src/test/java/feast/ingestion/transform/SplitOutputByStoreTest.java
new file mode 100644
index 0000000000..80ed711533
--- /dev/null
+++ b/ingestion/src/test/java/feast/ingestion/transform/SplitOutputByStoreTest.java
@@ -0,0 +1,283 @@
+package feast.ingestion.transform;
+
+import static junit.framework.TestCase.assertNull;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.protobuf.Timestamp;
+import feast.ingestion.model.Features;
+import feast.ingestion.model.Specs;
+import feast.ingestion.model.Values;
+import feast.ingestion.service.SpecService;
+import feast.ingestion.values.PFeatureRows;
+import feast.specs.EntitySpecProto.EntitySpec;
+import feast.specs.FeatureSpecProto.FeatureSpec;
+import feast.specs.ImportSpecProto.Field;
+import feast.specs.ImportSpecProto.ImportSpec;
+import feast.specs.ImportSpecProto.Schema;
+import feast.specs.StorageSpecProto.StorageSpec;
+import feast.storage.FeatureStore;
+import feast.storage.MockFeatureStore;
+import feast.storage.MockTransforms;
+import feast.types.FeatureRowExtendedProto.FeatureRowExtended;
+import feast.types.FeatureRowProto.FeatureRow;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class SplitOutputByStoreTest {
+
+  @Rule
+  public TestPipeline pipeline = TestPipeline.create();
+
+  @Test
+  public void testSplit() {
+    // Note we key stores on the group, instead of the warehouse or serving store id.
+    SerializableFunction<FeatureSpec, String> selector = FeatureSpec::getGroup;
+    MockSpecService specService = new MockSpecService();
+    specService.entitySpecs.put("e1", EntitySpec.getDefaultInstance());
+    specService.featureSpecs.put(
+        "f1", FeatureSpec.newBuilder().setEntity("e1").setGroup("store1").build());
+    specService.featureSpecs.put(
+        "f2", FeatureSpec.newBuilder().setEntity("e1").setGroup("store2").build());
+    specService.storageSpecs.put(
+        "store1", StorageSpec.newBuilder().setId("store1").setType("type1").build());
+    specService.storageSpecs.put(
+        "store2", StorageSpec.newBuilder().setId("store2").setType("type2").build());
+    List<FeatureStore> stores =
+        Lists.newArrayList(new MockFeatureStore("type1"), new MockFeatureStore("type2"));
+    Specs specs =
+        Specs.of(
+            "jobname",
+            ImportSpec.newBuilder()
+                .addEntities("e1")
+                .setSchema(
+                    Schema.newBuilder()
+                        .addAllFields(
+                            Lists.newArrayList(
+                                Field.newBuilder().setFeatureId("f1").build(),
+                                Field.newBuilder().setFeatureId("f2").build())))
+                .build(),
+            specService);
+    assertNull(specs.getError());
+
+    SplitOutputByStore split = new SplitOutputByStore(stores, selector, specs);
+
+    PCollection<FeatureRowExtended> input =
+        pipeline
+            .apply(
+                Create.of(
+                    FeatureRow.newBuilder()
+                        .addFeatures(Features.of("f1", Values.ofInt32(1)))
+                        .addFeatures(Features.of("f2", Values.ofInt32(2)))
+                        .build()))
+            .apply(new ToFeatureRowExtended());
+    PFeatureRows pfrows = PFeatureRows.of(input);
+    pfrows = pfrows.apply("do split", split);
+
+    PAssert.that(
+        pfrows
+            .getErrors()).empty();
+    PAssert.that(
+            pfrows
+                .getMain()
+                .apply(
+                    MapElements.into(TypeDescriptor.of(FeatureRow.class))
+                        .via(FeatureRowExtended::getRow)))
+        .containsInAnyOrder(
+            Lists.newArrayList(
+                FeatureRow.newBuilder()
+                    .addFeatures(Features.of("f1", Values.ofInt32(1)))
+                    .setEventTimestamp(Timestamp.getDefaultInstance())
+                    .build(),
+                FeatureRow.newBuilder()
+                    .addFeatures(Features.of("f2", Values.ofInt32(2)))
+                    .setEventTimestamp(Timestamp.getDefaultInstance())
+                    .build()));
+
+    MockTransforms.Write mockSpecService1 = ((MockFeatureStore) stores.get(0)).getWrite();
+    MockTransforms.Write mockSpecService2 = ((MockFeatureStore) stores.get(1)).getWrite();
+
+    PCollection<FeatureRow> store1Output =
+        mockSpecService1
+            .getInputs()
+            .get(0)
+            .apply("map store1 outputs",
+                MapElements.into(TypeDescriptor.of(FeatureRow.class))
+                    .via(FeatureRowExtended::getRow));
+
+    PCollection<FeatureRow> store2Output =
+        mockSpecService2
+            .getInputs()
+            .get(0)
+            .apply("map store2 outputs",
+                MapElements.into(TypeDescriptor.of(FeatureRow.class))
+                    .via(FeatureRowExtended::getRow));
+
+    PAssert.that(store1Output)
+        .containsInAnyOrder(
+            Lists.newArrayList(
+                FeatureRow.newBuilder()
+                    .addFeatures(Features.of("f1", Values.ofInt32(1)))
+                    .setEventTimestamp(Timestamp.getDefaultInstance())
+                    .build()));
+
+    PAssert.that(store2Output)
+        .containsInAnyOrder(
+            Lists.newArrayList(
+                FeatureRow.newBuilder()
+                    .addFeatures(Features.of("f2", Values.ofInt32(2)))
+                    .setEventTimestamp(Timestamp.getDefaultInstance())
+                    .build()));
+    pipeline.run();
+  }
+
+  @Test
+  public void testSplitWhereFeature2HasNoStoreId() {
+    // Note we key stores on the group, instead of the warehouse or serving store id.
+    SerializableFunction<FeatureSpec, String> selector = FeatureSpec::getGroup;
+    MockSpecService specService = new MockSpecService();
+    specService.entitySpecs.put("e1", EntitySpec.getDefaultInstance());
+    specService.featureSpecs.put(
+        "f1", FeatureSpec.newBuilder().setEntity("e1").setGroup("store1").build());
+    specService.featureSpecs.put(
+        "f2", FeatureSpec.newBuilder().setEntity("e1").build());
+    specService.storageSpecs.put(
+        "store1", StorageSpec.newBuilder().setId("store1").setType("type1").build());
+    specService.storageSpecs.put(
+        "store2", StorageSpec.newBuilder().setId("store2").setType("type2").build());
+    List<FeatureStore> stores =
+        Lists.newArrayList(new MockFeatureStore("type1"), new MockFeatureStore("type2"));
+    Specs specs =
+        Specs.of(
+            "jobname",
+            ImportSpec.newBuilder()
+                .addEntities("e1")
+                .setSchema(
+                    Schema.newBuilder()
+                        .addAllFields(
+                            Lists.newArrayList(
+                                Field.newBuilder().setFeatureId("f1").build(),
+                                Field.newBuilder().setFeatureId("f2").build())))
+                .build(),
+            specService);
+    assertNull(specs.getError());
+
+    SplitOutputByStore split = new SplitOutputByStore(stores, selector, specs);
+
+    PCollection<FeatureRowExtended> input =
+        pipeline
+            .apply(
+                Create.of(
+                    FeatureRow.newBuilder()
+                        .addFeatures(Features.of("f1", Values.ofInt32(1)))
+                        .addFeatures(Features.of("f2", Values.ofInt32(2)))
+                        .build()))
+            .apply(new ToFeatureRowExtended());
+    PFeatureRows pfrows = PFeatureRows.of(input);
+    pfrows = pfrows.apply("do split", split);
+
+    PAssert.that(
+        pfrows
+            .getErrors()).empty();
+    PAssert.that(
+            pfrows
+                .getMain()
+                .apply(
+                    MapElements.into(TypeDescriptor.of(FeatureRow.class))
+                        .via(FeatureRowExtended::getRow)))
+        .containsInAnyOrder(
+            Lists.newArrayList(
+                FeatureRow.newBuilder()
+                    .addFeatures(Features.of("f1", Values.ofInt32(1)))
+                    .setEventTimestamp(Timestamp.getDefaultInstance())
+                    .build(),
+                FeatureRow.newBuilder()
+                    .addFeatures(Features.of("f2", Values.ofInt32(2)))
+                    .setEventTimestamp(Timestamp.getDefaultInstance())
+                    .build()));
+
+    MockTransforms.Write mockSpecService1 = ((MockFeatureStore) stores.get(0)).getWrite();
+    MockTransforms.Write mockSpecService2 = ((MockFeatureStore) stores.get(1)).getWrite();
+
+    PCollection<FeatureRow> store1Output =
+        mockSpecService1
+            .getInputs()
+            .get(0)
+            .apply("map store1 outputs",
+                MapElements.into(TypeDescriptor.of(FeatureRow.class))
+                    .via(FeatureRowExtended::getRow));
+
+    PCollection<FeatureRow> store2Output =
+        mockSpecService2
+            .getInputs()
+            .get(0)
+            .apply("map store2 outputs",
+                MapElements.into(TypeDescriptor.of(FeatureRow.class))
+                    .via(FeatureRowExtended::getRow));
+
+    PAssert.that(store1Output)
+        .containsInAnyOrder(
+            Lists.newArrayList(
+                FeatureRow.newBuilder()
+                    .addFeatures(Features.of("f1", Values.ofInt32(1)))
+                    .setEventTimestamp(Timestamp.getDefaultInstance())
+                    .build()));
+
+    pipeline.run();
+  }
+
+  public static class MockSpecService implements SpecService {
+
+    public Map<String, EntitySpec> entitySpecs = new HashMap<>();
+    public Map<String, FeatureSpec> featureSpecs = new HashMap<>();
+    public Map<String, StorageSpec> storageSpecs = new HashMap<>();
+
+    @Override
+    public Map<String, EntitySpec> getEntitySpecs(Iterable<String> entityIds) {
+      Set<String> entityIdsSet = Sets.newHashSet(entityIds);
+      return Maps.newHashMap(
+          Maps.filterEntries(entitySpecs, (entry) -> entityIdsSet.contains(entry.getKey())));
+    }
+
+    @Override
+    public Map<String, EntitySpec> getAllEntitySpecs() {
+      return entitySpecs;
+    }
+
+    @Override
+    public Map<String, FeatureSpec> getFeatureSpecs(Iterable<String> featureIds) {
+      Set<String> featureIdsSet = Sets.newHashSet(featureIds);
+      return Maps.newHashMap(
+          Maps.filterEntries(featureSpecs, (entry) -> featureIdsSet.contains(entry.getKey())));
+    }
+
+    @Override
+    public Map<String, FeatureSpec> getAllFeatureSpecs() {
+      return featureSpecs;
+    }
+
+    @Override
+    public Map<String, StorageSpec> getStorageSpecs(Iterable<String> storageIds) {
+      Set<String> storageIdsSet = Sets.newHashSet(storageIds);
+      return Maps.newHashMap(
+          Maps.filterEntries(storageSpecs, (entry) -> storageIdsSet.contains(entry.getKey())));
+    }
+
+    @Override
+    public Map<String, StorageSpec> getAllStorageSpecs() {
+      return storageSpecs;
+    }
+  }
+}
diff --git a/ingestion/src/test/java/feast/storage/MockErrorsStore.java b/ingestion/src/test/java/feast/storage/MockErrorsStore.java
index 1523940f55..9abaf33208 100644
--- a/ingestion/src/test/java/feast/storage/MockErrorsStore.java
+++ b/ingestion/src/test/java/feast/storage/MockErrorsStore.java
@@ -18,24 +18,12 @@
 package feast.storage;
 
 import com.google.auto.service.AutoService;
-import lombok.Getter;
-import feast.ingestion.model.Specs;
-import feast.specs.StorageSpecProto.StorageSpec;
 
 @AutoService(ErrorsStore.class)
-public class MockErrorsStore implements ErrorsStore {
+public class MockErrorsStore extends MockFeatureStore implements ErrorsStore {
   public static final String MOCK_ERRORS_STORE_TYPE = "MOCK_ERRORS_STORE";
 
-  @Getter private MockTransforms.Write write;
-
-  @Override
-  public MockTransforms.Write create(StorageSpec storageSpec, Specs specs) {
-    write = new MockTransforms.Write(storageSpec);
-    return write;
-  }
-
-  @Override
-  public String getType() {
-    return MOCK_ERRORS_STORE_TYPE;
+  public MockErrorsStore() {
+    super(MOCK_ERRORS_STORE_TYPE);
   }
 }
diff --git a/ingestion/src/test/java/feast/storage/MockFeatureStore.java b/ingestion/src/test/java/feast/storage/MockFeatureStore.java
new file mode 100644
index 0000000000..2e957d4de2
--- /dev/null
+++ b/ingestion/src/test/java/feast/storage/MockFeatureStore.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2018 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package feast.storage;
+
+import feast.ingestion.model.Specs;
+import feast.specs.StorageSpecProto.StorageSpec;
+import lombok.Getter;
+
+public class MockFeatureStore implements FeatureStore {
+
+  private final String type;
+  @Getter private MockTransforms.Write write;
+
+  public MockFeatureStore(String type) {
+    this.type = type;
+  }
+
+  @Override
+  public MockTransforms.Write create(StorageSpec storageSpec, Specs specs) {
+    write = new MockTransforms.Write(storageSpec);
+    return write;
+  }
+
+  @Override
+  public String getType() {
+    return type;
+  }
+}
diff --git a/ingestion/src/test/java/feast/storage/MockServingStore.java b/ingestion/src/test/java/feast/storage/MockServingStore.java
index c73494c17b..e52ca046e7 100644
--- a/ingestion/src/test/java/feast/storage/MockServingStore.java
+++ b/ingestion/src/test/java/feast/storage/MockServingStore.java
@@ -18,24 +18,12 @@
 package feast.storage;
 
 import com.google.auto.service.AutoService;
-import lombok.Getter;
-import feast.ingestion.model.Specs;
-import feast.specs.StorageSpecProto.StorageSpec;
 
 @AutoService(ServingStore.class)
-public class MockServingStore implements ServingStore {
+public class MockServingStore extends MockFeatureStore implements ServingStore {
   public static final String MOCK_SERVING_STORE_TYPE = "MOCK_SERVING_STORE";
 
-  @Getter private MockTransforms.Write write;
-
-  @Override
-  public MockTransforms.Write create(StorageSpec storageSpec, Specs specs) {
-    write = new MockTransforms.Write(storageSpec);
-    return write;
-  }
-
-  @Override
-  public String getType() {
-    return MOCK_SERVING_STORE_TYPE;
+  public MockServingStore() {
+    super(MOCK_SERVING_STORE_TYPE);
   }
 }
diff --git a/ingestion/src/test/java/feast/storage/MockWarehouseStore.java b/ingestion/src/test/java/feast/storage/MockWarehouseStore.java
index 126e6dfe1f..452a2729df 100644
--- a/ingestion/src/test/java/feast/storage/MockWarehouseStore.java
+++ b/ingestion/src/test/java/feast/storage/MockWarehouseStore.java
@@ -18,24 +18,12 @@
 package feast.storage;
 
 import com.google.auto.service.AutoService;
-import lombok.Getter;
-import feast.ingestion.model.Specs;
-import feast.specs.StorageSpecProto.StorageSpec;
 
 @AutoService(WarehouseStore.class)
-public class MockWarehouseStore implements WarehouseStore {
+public class MockWarehouseStore extends MockFeatureStore implements WarehouseStore {
   public static final String MOCK_WAREHOUSE_STORE_TYPE = "MOCK_WAREHOUSE_STORE";
 
-  @Getter private MockTransforms.Write write;
-
-  @Override
-  public MockTransforms.Write create(StorageSpec storageSpec, Specs specs) {
-    write = new MockTransforms.Write(storageSpec);
-    return write;
-  }
-
-  @Override
-  public String getType() {
-    return MOCK_WAREHOUSE_STORE_TYPE;
+  public MockWarehouseStore() {
+    super(MOCK_WAREHOUSE_STORE_TYPE);
   }
 }