Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Only lookup storage specs that we actually need #52

Merged
merged 4 commits into from
Jan 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions ingestion/src/main/java/feast/ingestion/ImportJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package feast.ingestion;

import com.google.api.services.bigquery.model.TableRow;
import com.google.common.collect.Lists;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
Expand All @@ -42,6 +43,7 @@
import feast.types.FeatureRowExtendedProto.FeatureRowExtended;
import feast.types.FeatureRowProto.FeatureRow;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.runners.dataflow.DataflowPipelineJob;
Expand All @@ -53,13 +55,15 @@
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sample;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.commons.codec.digest.DigestUtils;
import org.joda.time.DateTime;
Expand All @@ -68,6 +72,7 @@

@Slf4j
public class ImportJob {

private static Random random = new Random(System.currentTimeMillis());

private final Pipeline pipeline;
Expand Down Expand Up @@ -164,17 +169,23 @@ public void expand() {
"Round event timestamps to granularity",
ParDo.of(new RoundEventTimestampsDoFn())),
pFeatureRows.getErrors());

if (!dryRun) {
List<PCollection<FeatureRowExtended>> errors = Lists.newArrayList();
pFeatureRows = pFeatureRows.apply("Write to Serving Stores", servingStoreTransform);
pFeatureRows.getErrors().apply("Write serving errors", errorsStoreTransform);
errors.add(pFeatureRows.getErrors());
pFeatureRows = PFeatureRows.of(pFeatureRows.getMain());

log.info(
"A sample of any 2 rows from each of MAIN, RETRIES and ERRORS will logged for convenience");
logNRows(pFeatureRows, "Output sample", 2);

PFeatureRows.of(pFeatureRows.getMain())
.apply("Write to Warehouse Stores", warehouseStoreTransform);
pFeatureRows.getErrors().apply("Write warehouse errors", errorsStoreTransform);
errors.add(pFeatureRows.getErrors());

PCollectionList.of(errors).apply("flatten errors", Flatten.pCollections())
.apply("Write serving errors", errorsStoreTransform);
}
}

Expand Down
53 changes: 39 additions & 14 deletions ingestion/src/main/java/feast/ingestion/model/Specs.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package feast.ingestion.model;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import feast.ingestion.service.SpecService;
import feast.specs.EntitySpecProto.EntitySpec;
import feast.specs.FeatureSpecProto.FeatureSpec;
Expand All @@ -31,10 +32,15 @@
import java.util.Map.Entry;
import lombok.Builder;
import lombok.Getter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;

@Builder
@Getter
@Slf4j
@ToString
public class Specs implements Serializable {

private String jobName;
private ImportSpec importSpec;
private Map<String, EntitySpec> entitySpecs;
Expand All @@ -57,14 +63,23 @@ public static Specs of(String jobName, ImportSpec importSpec, SpecService specSe
specsBuilder.featureSpecs(specService.getFeatureSpecs(featureIds));

List<String> entityNames = importSpec.getEntitiesList();
List<String> storageIds = Lists.newArrayList();
for (FeatureSpec featureSpec : specsBuilder.featureSpecs.values()) {
Preconditions.checkArgument(
entityNames.contains(featureSpec.getEntity()),
"Feature has entity not listed in import spec featureSpec=" + featureSpec.toString());
String servingId = featureSpec.getDataStores().getServing().getId();
if (!servingId.isEmpty()) {
storageIds.add(servingId);
}
String warehouseId = featureSpec.getDataStores().getWarehouse().getId();
if (!warehouseId.isEmpty()) {
storageIds.add(warehouseId);
}
}
specsBuilder.entitySpecs(specService.getEntitySpecs(entityNames));

specsBuilder.storageSpecs(specService.getAllStorageSpecs());
specsBuilder.storageSpecs(specService.getStorageSpecs(storageIds));

return specsBuilder.build();
} catch (RuntimeException e) {
Expand All @@ -79,13 +94,19 @@ public void validate() {

// Sanity checks that our maps are built correctly
for (Entry<String, FeatureSpec> entry : featureSpecs.entrySet()) {
Preconditions.checkArgument(entry.getKey().equals(entry.getValue().getId()));
Preconditions.checkArgument(entry.getKey().equals(entry.getValue().getId()),
String.format("Feature id does not match spec %s!=%s", entry.getKey(),
entry.getValue().getId()));
}
for (Entry<String, EntitySpec> entry : entitySpecs.entrySet()) {
Preconditions.checkArgument(entry.getKey().equals(entry.getValue().getName()));
Preconditions.checkArgument(entry.getKey().equals(entry.getValue().getName()),
String.format("Entity name does not match spec %s!=%s", entry.getKey(),
entry.getValue().getName()));
}
for (Entry<String, StorageSpec> entry : storageSpecs.entrySet()) {
Preconditions.checkArgument(entry.getKey().equals(entry.getValue().getId()));
Preconditions.checkArgument(entry.getKey().equals(entry.getValue().getId()),
String.format("Storage id does not match spec %s!=%s", entry.getKey(),
entry.getValue().getId()));
}

for (FeatureSpec featureSpec : featureSpecs.values()) {
Expand All @@ -96,17 +117,21 @@ public void validate() {
"Feature %s references unknown entity %s",
featureSpec.getId(), featureSpec.getEntity()));
// Check that feature has a matching serving store
Preconditions.checkArgument(
storageSpecs.containsKey(featureSpec.getDataStores().getServing().getId()),
String.format(
"Feature %s references unknown serving store %s",
featureSpec.getId(), featureSpec.getDataStores().getServing().getId()));
if (!featureSpec.getDataStores().getServing().getId().isEmpty()) {
Preconditions.checkArgument(
storageSpecs.containsKey(featureSpec.getDataStores().getServing().getId()),
String.format(
"Feature %s references unknown serving store %s",
featureSpec.getId(), featureSpec.getDataStores().getServing().getId()));
}
// Check that feature has a matching warehouse store
Preconditions.checkArgument(
storageSpecs.containsKey(featureSpec.getDataStores().getWarehouse().getId()),
String.format(
"Feature %s references unknown warehouse store %s",
featureSpec.getId(), featureSpec.getDataStores().getWarehouse().getId()));
if (!featureSpec.getDataStores().getWarehouse().getId().isEmpty()) {
Preconditions.checkArgument(
storageSpecs.containsKey(featureSpec.getDataStores().getWarehouse().getId()),
String.format(
"Feature %s references unknown warehouse store %s",
featureSpec.getId(), featureSpec.getDataStores().getWarehouse().getId()));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.util.JsonFormat;
import feast.ingestion.exceptions.SpecNotFound;
import feast.ingestion.util.PathUtil;
import feast.specs.EntitySpecProto.EntitySpec;
import feast.specs.FeatureSpecProto.FeatureSpec;
import feast.specs.StorageSpecProto.StorageSpec;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -35,11 +40,6 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import feast.ingestion.exceptions.SpecNotFound;
import feast.ingestion.util.PathUtil;
import feast.specs.EntitySpecProto.EntitySpec;
import feast.specs.FeatureSpecProto.FeatureSpec;
import feast.specs.StorageSpecProto.StorageSpec;

@AllArgsConstructor
public class FileSpecService implements SpecService {
Expand Down Expand Up @@ -112,31 +112,16 @@ public Map<String, EntitySpec> getEntitySpecs(Iterable<String> entityIds) {
return getSpecs(EntitySpec.getDefaultInstance(), ENTITY_SPEC, entityIds);
}

@Override
public Map<String, EntitySpec> getAllEntitySpecs() {
return getAllSpecs(EntitySpec.getDefaultInstance(), ENTITY_SPEC, EntitySpec::getName);
}

/**
 * Looks up {@link FeatureSpec}s for the given feature ids.
 *
 * @param featureIds ids of the features to resolve
 * @return map of {@link FeatureSpec} keyed by feature id
 */
@Override
public Map<String, FeatureSpec> getFeatureSpecs(Iterable<String> featureIds) {
// Delegates to the shared file-backed lookup; presumably reads one spec file
// per id under the FEATURE_SPEC path — confirm against getSpecs' implementation.
return getSpecs(FeatureSpec.getDefaultInstance(), FEATURE_SPEC, featureIds);
}

@Override
public Map<String, FeatureSpec> getAllFeatureSpecs() {
return getAllSpecs(FeatureSpec.getDefaultInstance(), FEATURE_SPEC, FeatureSpec::getId);
}

/**
 * Looks up {@link StorageSpec}s for the given storage ids.
 *
 * @param storageIds ids of the storage specs to resolve
 * @return map of {@link StorageSpec} keyed by storage id
 */
@Override
public Map<String, StorageSpec> getStorageSpecs(Iterable<String> storageIds) {
// Delegates to the shared file-backed lookup; presumably reads one spec file
// per id under the STORAGE_SPEC path — confirm against getSpecs' implementation.
return getSpecs(StorageSpec.getDefaultInstance(), STORAGE_SPEC, storageIds);
}

@Override
public Map<String, StorageSpec> getAllStorageSpecs() {
return getAllSpecs(StorageSpec.getDefaultInstance(), STORAGE_SPEC, StorageSpec::getId);
}

private <T extends Message> void putSpecs(
String type, Function<T, String> keyFunc, Iterable<T> specs) {
for (T spec : specs) {
Expand Down Expand Up @@ -169,6 +154,7 @@ public void putStorageSpecs(Iterable<StorageSpec> storageSpecs) {

@AllArgsConstructor
public static class Builder implements SpecService.Builder {

private String basePath;

@Override
Expand Down
21 changes: 0 additions & 21 deletions ingestion/src/main/java/feast/ingestion/service/SpecService.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,6 @@ public interface SpecService {
*/
Map<String, EntitySpec> getEntitySpecs(Iterable<String> entityIds);

/**
* Get all {@link EntitySpec} from Core API.
*
* @return map of {@link EntitySpec}, where the key is the entity name.
*/
Map<String, EntitySpec> getAllEntitySpecs();

/**
* Get a map of {@link FeatureSpec} from Core API, given a collection of featureId.
*
Expand All @@ -54,13 +47,6 @@ public interface SpecService {
*/
Map<String, FeatureSpec> getFeatureSpecs(Iterable<String> featureIds);

/**
* Get all {@link FeatureSpec} available in Core API.
*
* @return map of {@link FeatureSpec}, where the key is feature id.
*/
Map<String, FeatureSpec> getAllFeatureSpecs();

/**
* Get map of {@link StorageSpec} from Core API, given a collection of storageId.
*
Expand All @@ -70,13 +56,6 @@ public interface SpecService {
*/
Map<String, StorageSpec> getStorageSpecs(Iterable<String> storageIds);

/**
* Get all {@link StorageSpec} from Core API.
*
* @return map of {@link StorageSpec}, where the key is storage id.
*/
Map<String, StorageSpec> getAllStorageSpecs();

interface Builder extends Serializable {
SpecService build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,72 +17,66 @@

package feast.ingestion.transform;

import static com.google.common.base.Preconditions.checkArgument;
import static feast.ingestion.util.JsonUtil.convertJsonStringToMap;

import com.google.inject.Inject;
import feast.ingestion.model.Specs;
import feast.ingestion.options.ImportJobOptions;
import feast.ingestion.transform.FeatureIO.Write;
import feast.ingestion.transform.fn.LoggerDoFn;
import feast.specs.StorageSpecProto.StorageSpec;
import feast.storage.ErrorsStore;
import feast.storage.noop.NoOpIO;
import feast.types.FeatureRowExtendedProto.FeatureRowExtended;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.slf4j.event.Level;
import org.apache.hadoop.hbase.util.Strings;

@Slf4j
public class ErrorsStoreTransform extends FeatureIO.Write {

public static final String ERRORS_STORE_STDERR = "stderr";
public static final String ERRORS_STORE_STDOUT = "stdout";
public static final String ERRORS_STORE_JSON = "file.json";

private String errorsStoreType;
private StorageSpec errorsStoreSpec;
private ErrorsStore errorsStore;
private Specs specs;
private List<ErrorsStore> errorsStores;

@Inject
public ErrorsStoreTransform(
ImportJobOptions options, Specs specs, List<ErrorsStore> errorsStores) {
this.specs = specs;
this.errorsStores = errorsStores;
this.errorsStoreType = options.getErrorsStoreType();

for (ErrorsStore errorsStore : errorsStores) {
if (errorsStore.getType().equals(errorsStoreType)) {
this.errorsStore = errorsStore;
}
if (!Strings.isEmpty(errorsStoreType)) {
this.errorsStoreSpec =
StorageSpec.newBuilder()
.setType(errorsStoreType)
.putAllOptions(convertJsonStringToMap(options.getErrorsStoreOptions()))
.build();
}

this.errorsStoreSpec =
StorageSpec.newBuilder()
.setType(errorsStoreType)
.putAllOptions(convertJsonStringToMap(options.getErrorsStoreOptions()))
.build();
}

@Override
public PDone expand(PCollection<FeatureRowExtended> input) {
switch (errorsStoreType) {
case ERRORS_STORE_STDOUT:
input.apply("Log errors to STDOUT", ParDo.of(new LoggerDoFn(Level.INFO)));
break;
case ERRORS_STORE_STDERR:
input.apply("Log errors to STDERR", ParDo.of(new LoggerDoFn(Level.ERROR)));
break;
default:
if (errorsStore == null) {
log.warn("No valid errors store specified, errors will be discarded");
return input.apply(new NoOpIO.Write());
}
Write write = errorsStore.create(this.errorsStoreSpec, specs);
return input.apply(write);
Write write;
if (Strings.isEmpty(errorsStoreType)) {
write = new NoOpIO.Write();
} else {
write = getErrorStore().create(this.errorsStoreSpec, specs);
}
input.apply("errors to " + String.valueOf(errorsStoreType), write);
return PDone.in(input.getPipeline());
}

/**
 * Resolves the configured errors store implementation by its type name.
 *
 * <p>Scans the injected {@link ErrorsStore} implementations for one whose
 * {@code getType()} matches the configured {@code errorsStoreType}.
 *
 * @return the matching {@link ErrorsStore}
 * @throws IllegalArgumentException if no errors store type was configured, or no
 *     registered implementation matches the configured type
 */
ErrorsStore getErrorStore() {
  checkArgument(!errorsStoreType.isEmpty(), "Errors store type not provided");
  for (ErrorsStore errorsStore : errorsStores) {
    if (errorsStore.getType().equals(errorsStoreType)) {
      return errorsStore;
    }
  }
  // Include the offending type in the message so misconfiguration is easy to diagnose.
  throw new IllegalArgumentException("Errors store type not found: " + errorsStoreType);
}
}
Loading