From 9a17e2ab734385a0894d52c7c285e836b16def34 Mon Sep 17 00:00:00 2001 From: pyalex Date: Mon, 22 Jun 2020 11:35:58 +0300 Subject: [PATCH 1/3] create bq table if doesnt exists --- .../writer/FeatureSetSpecToTableSchema.java | 55 +++++++++++++++---- 1 file changed, 44 insertions(+), 11 deletions(-) diff --git a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/FeatureSetSpecToTableSchema.java b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/FeatureSetSpecToTableSchema.java index f57a9a8902..fb4b2031f6 100644 --- a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/FeatureSetSpecToTableSchema.java +++ b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/FeatureSetSpecToTableSchema.java @@ -42,6 +42,14 @@ import org.apache.beam.sdk.values.KV; import org.slf4j.Logger; +/** + * Converts {@link feast.proto.core.FeatureSetProto.FeatureSetSpec} into BigQuery schema Serializes + * it into json-like format {@link TableSchema} Fetches existing schema to merge existing fields + * with new ones. + * + *

As a side effect this Operation may create bq table (if it doesn't exist) to make + * bootstrapping faster + */ public class FeatureSetSpecToTableSchema extends DoFn, KV> { private BigQuery bqService; @@ -76,19 +84,42 @@ public void processElement( @Element KV element, OutputReceiver> output, ProcessContext context) { - Schema schema = createSchemaFromSpec(element.getValue(), element.getKey()); - output.output(KV.of(element.getKey(), serializeSchema(schema))); + String specKey = element.getKey(); + + Table existingTable = getExistingTable(specKey); + Schema schema = createSchemaFromSpec(element.getValue(), specKey, existingTable); + + if (existingTable == null) { + createTable(specKey, schema); + } + + output.output(KV.of(specKey, serializeSchema(schema))); } - private Table getExistingTable(String specKey) { + private TableId generateTableId(String specKey) { TableDestination tableDestination = BigQuerySinkHelpers.getTableDestination(dataset, specKey); TableReference tableReference = BigQueryHelpers.parseTableSpec(tableDestination.getTableSpec()); - TableId tableId = - TableId.of( - tableReference.getProjectId(), - tableReference.getDatasetId(), - tableReference.getTableId()); - return bqService.getTable(tableId); + return TableId.of( + tableReference.getProjectId(), tableReference.getDatasetId(), tableReference.getTableId()); + } + + private Table getExistingTable(String specKey) { + return bqService.getTable(generateTableId(specKey)); + } + + private void createTable(String specKey, Schema schema) { + TimePartitioning timePartitioning = + TimePartitioning.newBuilder(TimePartitioning.Type.DAY).setField("event_timestamp").build(); + + StandardTableDefinition tableDefinition = + StandardTableDefinition.newBuilder() + .setTimePartitioning(timePartitioning) + .setSchema(schema) + .build(); + + TableInfo tableInfo = TableInfo.of(generateTableId(specKey), tableDefinition); + + bqService.create(tableInfo); } /** @@ -98,12 +129,14 @@ private Table getExistingTable(String specKey) { * * @param spec FeatureSet spec that this table is for * @param specKey String for retrieving existing table + * @param existingTable Table fetched from BQ. Fields from existing table used to merge with new + * schema * @return {@link Schema} containing all tombstoned and active fields. */ - private Schema createSchemaFromSpec(FeatureSetProto.FeatureSetSpec spec, String specKey) { + private Schema createSchemaFromSpec( + FeatureSetProto.FeatureSetSpec spec, String specKey, Table existingTable) { List fields = new ArrayList<>(); log.info("Table {} will have the following fields:", specKey); - Table existingTable = getExistingTable(specKey); for (FeatureSetProto.EntitySpec entitySpec : spec.getEntitiesList()) { Field.Builder builder = From 2e04b58d0e568104a351f663e3e87722ce6c7ab1 Mon Sep 17 00:00:00 2001 From: pyalex Date: Mon, 22 Jun 2020 11:39:51 +0300 Subject: [PATCH 2/3] dots --- .../bigquery/writer/FeatureSetSpecToTableSchema.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/FeatureSetSpecToTableSchema.java b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/FeatureSetSpecToTableSchema.java index fb4b2031f6..ff44cbfe19 100644 --- a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/FeatureSetSpecToTableSchema.java +++ b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/FeatureSetSpecToTableSchema.java @@ -43,8 +43,8 @@ import org.slf4j.Logger; /** - * Converts {@link feast.proto.core.FeatureSetProto.FeatureSetSpec} into BigQuery schema Serializes - * it into json-like format {@link TableSchema} Fetches existing schema to merge existing fields + * Converts {@link feast.proto.core.FeatureSetProto.FeatureSetSpec} into BigQuery schema. Serializes + * it into json-like format {@link TableSchema}. Fetches existing schema to merge existing fields * with new ones. * *

As a side effect this Operation may create bq table (if it doesn't exist) to make From a97593d01a259bc1979aae9fefaa64dfd95a15eb Mon Sep 17 00:00:00 2001 From: pyalex Date: Mon, 22 Jun 2020 12:22:14 +0300 Subject: [PATCH 3/3] constants & unified job name --- .../java/feast/core/job/JobUpdateTask.java | 2 +- .../writer/FeatureSetSpecToTableSchema.java | 18 +++++++++++++----- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/feast/core/job/JobUpdateTask.java b/core/src/main/java/feast/core/job/JobUpdateTask.java index b5ab782634..a6a29f7d63 100644 --- a/core/src/main/java/feast/core/job/JobUpdateTask.java +++ b/core/src/main/java/feast/core/job/JobUpdateTask.java @@ -238,7 +238,7 @@ String createJobId(Source source, String storeName) { String.format( "%s-%d-to-%s-%s", source.getTypeString(), Objects.hashCode(source.getConfig()), storeName, dateSuffix); - return jobId.replaceAll("_store", "-"); + return jobId.replaceAll("_store", "-").toLowerCase(); } private void logAudit(Action action, Job job, String detail, Object... args) { diff --git a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/FeatureSetSpecToTableSchema.java b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/FeatureSetSpecToTableSchema.java index ff44cbfe19..7c531a8121 100644 --- a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/FeatureSetSpecToTableSchema.java +++ b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/FeatureSetSpecToTableSchema.java @@ -59,6 +59,12 @@ public class FeatureSetSpecToTableSchema private static final Logger log = org.slf4j.LoggerFactory.getLogger(FeatureSetSpecToTableSchema.class); + // Reserved columns + public static final String EVENT_TIMESTAMP_COLUMN = "event_timestamp"; + public static final String CREATED_TIMESTAMP_COLUMN = "created_timestamp"; + public static final String INGESTION_ID_COLUMN = "ingestion_id"; + public static final String JOB_ID_COLUMN = "job_id"; + // Column description for reserved fields public static final String BIGQUERY_EVENT_TIMESTAMP_FIELD_DESCRIPTION = "Event time for the FeatureRow"; @@ -109,7 +115,9 @@ private Table getExistingTable(String specKey) { private void createTable(String specKey, Schema schema) { TimePartitioning timePartitioning = - TimePartitioning.newBuilder(TimePartitioning.Type.DAY).setField("event_timestamp").build(); + TimePartitioning.newBuilder(TimePartitioning.Type.DAY) + .setField(EVENT_TIMESTAMP_COLUMN) + .build(); StandardTableDefinition tableDefinition = StandardTableDefinition.newBuilder() @@ -166,14 +174,14 @@ private Schema createSchemaFromSpec( Map> reservedFieldNameToPairOfStandardSQLTypeAndDescription = ImmutableMap.of( - "event_timestamp", + EVENT_TIMESTAMP_COLUMN, Pair.of(StandardSQLTypeName.TIMESTAMP, BIGQUERY_EVENT_TIMESTAMP_FIELD_DESCRIPTION), - "created_timestamp", + CREATED_TIMESTAMP_COLUMN, Pair.of( StandardSQLTypeName.TIMESTAMP, BIGQUERY_CREATED_TIMESTAMP_FIELD_DESCRIPTION), - "ingestion_id", + INGESTION_ID_COLUMN, Pair.of(StandardSQLTypeName.STRING, BIGQUERY_INGESTION_ID_FIELD_DESCRIPTION), - "job_id", + JOB_ID_COLUMN, Pair.of(StandardSQLTypeName.STRING, BIGQUERY_JOB_ID_FIELD_DESCRIPTION)); for (Map.Entry> entry : reservedFieldNameToPairOfStandardSQLTypeAndDescription.entrySet()) {