Skip to content

Commit

Permalink
Fix incorrect BigQuery schema creation from FeatureSetSpec (#340)
Browse files Browse the repository at this point in the history
* Add test and fix for BigQuery schema creation from FeatureSetSpec

* Add entity fields in the createBigQueryTableDefinition test

* Apply spotless plugin for consistent formatting
  • Loading branch information
davidheryanto authored and feast-ci-bot committed Nov 29, 2019
1 parent 1ef1b71 commit 3b4dcaa
Show file tree
Hide file tree
Showing 2 changed files with 192 additions and 8 deletions.
22 changes: 15 additions & 7 deletions ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,19 @@
* need to manage any schemas. This class will not be used in that case.
*/
public class StoreUtil {

private static final Map<ValueType.Enum, StandardSQLTypeName> VALUE_TYPE_TO_STANDARD_SQL_TYPE =
new HashMap<>();
private static final Logger log = org.slf4j.LoggerFactory.getLogger(StoreUtil.class);

// Column description for reserved fields
public static final String BIGQUERY_EVENT_TIMESTAMP_FIELD_DESCRIPTION =
"Event time for the FeatureRow";
public static final String BIGQUERY_CREATED_TIMESTAMP_FIELD_DESCRIPTION =
"Processing time of the FeatureRow ingestion in Feast\"";
public static final String BIGQUERY_JOB_ID_FIELD_DESCRIPTION =
"Feast import job ID for the FeatureRow";

// Refer to protos/feast/core/Store.proto for the mapping definition.
static {
VALUE_TYPE_TO_STANDARD_SQL_TYPE.put(Enum.BYTES, StandardSQLTypeName.BYTES);
Expand Down Expand Up @@ -110,15 +119,15 @@ public static void setupStore(Store store, FeatureSetSpec featureSetSpec) {
}

@SuppressWarnings("DuplicatedCode")
private static TableDefinition createBigQueryTableDefinition(FeatureSetSpec featureSetSpec) {
public static TableDefinition createBigQueryTableDefinition(FeatureSetSpec featureSetSpec) {
List<Field> fields = new ArrayList<>();
log.info("Table will have the following fields:");

for (EntitySpec entitySpec : featureSetSpec.getEntitiesList()) {
Builder builder =
Field.newBuilder(
entitySpec.getName(), VALUE_TYPE_TO_STANDARD_SQL_TYPE.get(entitySpec.getValueType()));
if (entitySpec.getValueTypeValue() >= 7 && entitySpec.getValueTypeValue() <= 17) {
if (entitySpec.getValueType().name().toLowerCase().endsWith("_list")) {
builder.setMode(Mode.REPEATED);
}
Field field = builder.build();
Expand All @@ -130,7 +139,7 @@ private static TableDefinition createBigQueryTableDefinition(FeatureSetSpec feat
Field.newBuilder(
featureSpec.getName(),
VALUE_TYPE_TO_STANDARD_SQL_TYPE.get(featureSpec.getValueType()));
if (featureSpec.getValueTypeValue() >= 7 && featureSpec.getValueTypeValue() <= 17) {
if (featureSpec.getValueType().name().toLowerCase().endsWith("_list")) {
builder.setMode(Mode.REPEATED);
}
Field field = builder.build();
Expand All @@ -143,13 +152,12 @@ private static TableDefinition createBigQueryTableDefinition(FeatureSetSpec feat
reservedFieldNameToPairOfStandardSQLTypeAndDescription =
ImmutableMap.of(
"event_timestamp",
Pair.of(StandardSQLTypeName.TIMESTAMP, "Event time for the FeatureRow"),
Pair.of(StandardSQLTypeName.TIMESTAMP, BIGQUERY_EVENT_TIMESTAMP_FIELD_DESCRIPTION),
"created_timestamp",
Pair.of(
StandardSQLTypeName.TIMESTAMP,
"Processing time of the FeatureRow ingestion in Feast"),
StandardSQLTypeName.TIMESTAMP, BIGQUERY_CREATED_TIMESTAMP_FIELD_DESCRIPTION),
"job_id",
Pair.of(StandardSQLTypeName.STRING, "Feast import job ID for the FeatureRow"));
Pair.of(StandardSQLTypeName.STRING, BIGQUERY_JOB_ID_FIELD_DESCRIPTION));
for (Map.Entry<String, Pair<StandardSQLTypeName, String>> entry :
reservedFieldNameToPairOfStandardSQLTypeAndDescription.entrySet()) {
Field field =
Expand Down
178 changes: 177 additions & 1 deletion ingestion/src/test/java/feast/ingestion/util/StoreUtilTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,39 @@
*/
package feast.ingestion.util;

import static feast.types.ValueProto.ValueType.Enum.BOOL;
import static feast.types.ValueProto.ValueType.Enum.BOOL_LIST;
import static feast.types.ValueProto.ValueType.Enum.BYTES;
import static feast.types.ValueProto.ValueType.Enum.BYTES_LIST;
import static feast.types.ValueProto.ValueType.Enum.DOUBLE;
import static feast.types.ValueProto.ValueType.Enum.DOUBLE_LIST;
import static feast.types.ValueProto.ValueType.Enum.FLOAT;
import static feast.types.ValueProto.ValueType.Enum.FLOAT_LIST;
import static feast.types.ValueProto.ValueType.Enum.INT32;
import static feast.types.ValueProto.ValueType.Enum.INT32_LIST;
import static feast.types.ValueProto.ValueType.Enum.INT64;
import static feast.types.ValueProto.ValueType.Enum.INT64_LIST;
import static feast.types.ValueProto.ValueType.Enum.STRING;
import static feast.types.ValueProto.ValueType.Enum.STRING_LIST;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Field.Mode;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import feast.core.FeatureSetProto.EntitySpec;
import feast.core.FeatureSetProto.FeatureSetSpec;
import feast.core.FeatureSetProto.FeatureSpec;
import feast.ingestion.utils.StoreUtil;
import java.util.Arrays;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class StoreUtilTest {

@Test
public void setupBigQuery_shouldCreateTable_givenFeatureSetSpec() {
public void setupBigQuery_shouldCreateTable_givenValidFeatureSetSpec() {
FeatureSetSpec featureSetSpec =
FeatureSetSpec.newBuilder()
.setName("feature_set_1")
Expand All @@ -41,4 +60,161 @@ public void setupBigQuery_shouldCreateTable_givenFeatureSetSpec() {
BigQuery mockedBigquery = Mockito.mock(BigQuery.class);
StoreUtil.setupBigQuery(featureSetSpec, "project-1", "dataset_1", mockedBigquery);
}

@Test
public void createBigQueryTableDefinition_shouldCreateCorrectSchema_givenValidFeatureSetSpec() {
FeatureSetSpec input =
FeatureSetSpec.newBuilder()
.addAllEntities(
Arrays.asList(
EntitySpec.newBuilder().setName("bytes_entity").setValueType(BYTES).build(),
EntitySpec.newBuilder().setName("string_entity").setValueType(STRING).build(),
EntitySpec.newBuilder().setName("int32_entity").setValueType(INT32).build(),
EntitySpec.newBuilder().setName("int64_entity").setValueType(INT64).build(),
EntitySpec.newBuilder().setName("double_entity").setValueType(DOUBLE).build(),
EntitySpec.newBuilder().setName("float_entity").setValueType(FLOAT).build(),
EntitySpec.newBuilder().setName("bool_entity").setValueType(BOOL).build(),
EntitySpec.newBuilder()
.setName("bytes_list_entity")
.setValueType(BYTES_LIST)
.build(),
EntitySpec.newBuilder()
.setName("string_list_entity")
.setValueType(STRING_LIST)
.build(),
EntitySpec.newBuilder()
.setName("int32_list_entity")
.setValueType(INT32_LIST)
.build(),
EntitySpec.newBuilder()
.setName("int64_list_entity")
.setValueType(INT64_LIST)
.build(),
EntitySpec.newBuilder()
.setName("double_list_entity")
.setValueType(DOUBLE_LIST)
.build(),
EntitySpec.newBuilder()
.setName("float_list_entity")
.setValueType(FLOAT_LIST)
.build(),
EntitySpec.newBuilder()
.setName("bool_list_entity")
.setValueType(BOOL_LIST)
.build()))
.addAllFeatures(
Arrays.asList(
FeatureSpec.newBuilder().setName("bytes_feature").setValueType(BYTES).build(),
FeatureSpec.newBuilder().setName("string_feature").setValueType(STRING).build(),
FeatureSpec.newBuilder().setName("int32_feature").setValueType(INT32).build(),
FeatureSpec.newBuilder().setName("int64_feature").setValueType(INT64).build(),
FeatureSpec.newBuilder().setName("double_feature").setValueType(DOUBLE).build(),
FeatureSpec.newBuilder().setName("float_feature").setValueType(FLOAT).build(),
FeatureSpec.newBuilder().setName("bool_feature").setValueType(BOOL).build(),
FeatureSpec.newBuilder()
.setName("bytes_list_feature")
.setValueType(BYTES_LIST)
.build(),
FeatureSpec.newBuilder()
.setName("string_list_feature")
.setValueType(STRING_LIST)
.build(),
FeatureSpec.newBuilder()
.setName("int32_list_feature")
.setValueType(INT32_LIST)
.build(),
FeatureSpec.newBuilder()
.setName("int64_list_feature")
.setValueType(INT64_LIST)
.build(),
FeatureSpec.newBuilder()
.setName("double_list_feature")
.setValueType(DOUBLE_LIST)
.build(),
FeatureSpec.newBuilder()
.setName("float_list_feature")
.setValueType(FLOAT_LIST)
.build(),
FeatureSpec.newBuilder()
.setName("bool_list_feature")
.setValueType(BOOL_LIST)
.build()))
.build();

Schema actual = StoreUtil.createBigQueryTableDefinition(input).getSchema();

Schema expected =
Schema.of(
Arrays.asList(
// Fields from entity
Field.newBuilder("bytes_entity", StandardSQLTypeName.BYTES).build(),
Field.newBuilder("string_entity", StandardSQLTypeName.STRING).build(),
Field.newBuilder("int32_entity", StandardSQLTypeName.INT64).build(),
Field.newBuilder("int64_entity", StandardSQLTypeName.INT64).build(),
Field.newBuilder("double_entity", StandardSQLTypeName.FLOAT64).build(),
Field.newBuilder("float_entity", StandardSQLTypeName.FLOAT64).build(),
Field.newBuilder("bool_entity", StandardSQLTypeName.BOOL).build(),
Field.newBuilder("bytes_list_entity", StandardSQLTypeName.BYTES)
.setMode(Mode.REPEATED)
.build(),
Field.newBuilder("string_list_entity", StandardSQLTypeName.STRING)
.setMode(Mode.REPEATED)
.build(),
Field.newBuilder("int32_list_entity", StandardSQLTypeName.INT64)
.setMode(Mode.REPEATED)
.build(),
Field.newBuilder("int64_list_entity", StandardSQLTypeName.INT64)
.setMode(Mode.REPEATED)
.build(),
Field.newBuilder("double_list_entity", StandardSQLTypeName.FLOAT64)
.setMode(Mode.REPEATED)
.build(),
Field.newBuilder("float_list_entity", StandardSQLTypeName.FLOAT64)
.setMode(Mode.REPEATED)
.build(),
Field.newBuilder("bool_list_entity", StandardSQLTypeName.BOOL)
.setMode(Mode.REPEATED)
.build(),
// Fields from feature
Field.newBuilder("bytes_feature", StandardSQLTypeName.BYTES).build(),
Field.newBuilder("string_feature", StandardSQLTypeName.STRING).build(),
Field.newBuilder("int32_feature", StandardSQLTypeName.INT64).build(),
Field.newBuilder("int64_feature", StandardSQLTypeName.INT64).build(),
Field.newBuilder("double_feature", StandardSQLTypeName.FLOAT64).build(),
Field.newBuilder("float_feature", StandardSQLTypeName.FLOAT64).build(),
Field.newBuilder("bool_feature", StandardSQLTypeName.BOOL).build(),
Field.newBuilder("bytes_list_feature", StandardSQLTypeName.BYTES)
.setMode(Mode.REPEATED)
.build(),
Field.newBuilder("string_list_feature", StandardSQLTypeName.STRING)
.setMode(Mode.REPEATED)
.build(),
Field.newBuilder("int32_list_feature", StandardSQLTypeName.INT64)
.setMode(Mode.REPEATED)
.build(),
Field.newBuilder("int64_list_feature", StandardSQLTypeName.INT64)
.setMode(Mode.REPEATED)
.build(),
Field.newBuilder("double_list_feature", StandardSQLTypeName.FLOAT64)
.setMode(Mode.REPEATED)
.build(),
Field.newBuilder("float_list_feature", StandardSQLTypeName.FLOAT64)
.setMode(Mode.REPEATED)
.build(),
Field.newBuilder("bool_list_feature", StandardSQLTypeName.BOOL)
.setMode(Mode.REPEATED)
.build(),
// Reserved fields
Field.newBuilder("event_timestamp", StandardSQLTypeName.TIMESTAMP)
.setDescription(StoreUtil.BIGQUERY_EVENT_TIMESTAMP_FIELD_DESCRIPTION)
.build(),
Field.newBuilder("created_timestamp", StandardSQLTypeName.TIMESTAMP)
.setDescription(StoreUtil.BIGQUERY_CREATED_TIMESTAMP_FIELD_DESCRIPTION)
.build(),
Field.newBuilder("job_id", StandardSQLTypeName.STRING)
.setDescription(StoreUtil.BIGQUERY_JOB_ID_FIELD_DESCRIPTION)
.build()));

Assert.assertEquals(expected, actual);
}
}

0 comments on commit 3b4dcaa

Please sign in to comment.