Create table in BigQuery if it doesn't exist when a new FeatureSetSpec arrives at IngestionJob #815

Merged 3 commits on Jun 22, 2020

Changes from all commits
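Summary of the change: when a new FeatureSetSpec reaches the ingestion job, the BigQuery sink now looks up the destination table and creates it if it is missing, instead of assuming it already exists. A minimal sketch of that check-then-create pattern with the google-cloud-bigquery client (the project, dataset, and table names here are hypothetical):

```java
import com.google.cloud.bigquery.*;

public class CreateTableIfAbsentSketch {
  public static void main(String[] args) {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    TableId tableId = TableId.of("my-project", "feast_dataset", "my_feature_set");

    // getTable returns null when the table does not exist yet
    if (bigquery.getTable(tableId) == null) {
      Schema schema = Schema.of(Field.of("event_timestamp", StandardSQLTypeName.TIMESTAMP));
      bigquery.create(TableInfo.of(tableId, StandardTableDefinition.of(schema)));
    }
  }
}
```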
core/src/main/java/feast/core/job/JobUpdateTask.java (2 changes: 1 addition, 1 deletion)
@@ -238,7 +238,7 @@ String createJobId(Source source, String storeName) {
String.format(
"%s-%d-to-%s-%s",
source.getTypeString(), Objects.hashCode(source.getConfig()), storeName, dateSuffix);
-    return jobId.replaceAll("_store", "-");
+    return jobId.replaceAll("_store", "-").toLowerCase();
}

private void logAudit(Action action, Job job, String detail, Object... args) {
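The only change in this file appends `.toLowerCase()` to the generated job id. A small self-contained illustration with hypothetical inputs (the real `dateSuffix` format is not shown in this diff); the motivation is assumed to be runners such as Dataflow that only accept lowercase job names:

```java
public class JobIdSketch {
  public static void main(String[] args) {
    // Hypothetical source type, config hash, store name, and date suffix.
    String jobId = String.format("%s-%d-to-%s-%s", "KAFKA", 12345, "bigquery_store", "20200622");
    // "_store" is replaced by "-" and the whole id is lowercased.
    System.out.println(jobId.replaceAll("_store", "-").toLowerCase());
    // prints: kafka-12345-to-bigquery--20200622
  }
}
```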
@@ -42,6 +42,14 @@
import org.apache.beam.sdk.values.KV;
import org.slf4j.Logger;

+/**
+ * Converts {@link feast.proto.core.FeatureSetProto.FeatureSetSpec} into a BigQuery schema and
+ * serializes it into the JSON-like {@link TableSchema} format. Fetches the existing schema to
+ * merge its fields with the new ones.
+ *
+ * <p>As a side effect, this operation may create the BigQuery table (if it doesn't exist) to make
+ * bootstrapping faster.
+ */
public class FeatureSetSpecToTableSchema
extends DoFn<KV<String, FeatureSetProto.FeatureSetSpec>, KV<String, TableSchema>> {
private BigQuery bqService;
@@ -51,6 +59,12 @@ public class FeatureSetSpecToTableSchema
private static final Logger log =
org.slf4j.LoggerFactory.getLogger(FeatureSetSpecToTableSchema.class);

+  // Reserved columns
+  public static final String EVENT_TIMESTAMP_COLUMN = "event_timestamp";
+  public static final String CREATED_TIMESTAMP_COLUMN = "created_timestamp";
+  public static final String INGESTION_ID_COLUMN = "ingestion_id";
+  public static final String JOB_ID_COLUMN = "job_id";
+
// Column description for reserved fields
public static final String BIGQUERY_EVENT_TIMESTAMP_FIELD_DESCRIPTION =
"Event time for the FeatureRow";
@@ -76,19 +90,44 @@ public void processElement(
@Element KV<String, FeatureSetProto.FeatureSetSpec> element,
OutputReceiver<KV<String, TableSchema>> output,
ProcessContext context) {
-    Schema schema = createSchemaFromSpec(element.getValue(), element.getKey());
-    output.output(KV.of(element.getKey(), serializeSchema(schema)));
+    String specKey = element.getKey();
+
+    Table existingTable = getExistingTable(specKey);
+    Schema schema = createSchemaFromSpec(element.getValue(), specKey, existingTable);
+
+    if (existingTable == null) {
+      createTable(specKey, schema);
+    }
+
+    output.output(KV.of(specKey, serializeSchema(schema)));
}

-  private Table getExistingTable(String specKey) {
+  private TableId generateTableId(String specKey) {
TableDestination tableDestination = BigQuerySinkHelpers.getTableDestination(dataset, specKey);
TableReference tableReference = BigQueryHelpers.parseTableSpec(tableDestination.getTableSpec());
-    TableId tableId =
-        TableId.of(
-            tableReference.getProjectId(),
-            tableReference.getDatasetId(),
-            tableReference.getTableId());
-    return bqService.getTable(tableId);
+    return TableId.of(
+        tableReference.getProjectId(), tableReference.getDatasetId(), tableReference.getTableId());
}

+  private Table getExistingTable(String specKey) {
+    return bqService.getTable(generateTableId(specKey));
+  }
+
+  private void createTable(String specKey, Schema schema) {
+    TimePartitioning timePartitioning =
+        TimePartitioning.newBuilder(TimePartitioning.Type.DAY)
+            .setField(EVENT_TIMESTAMP_COLUMN)
+            .build();
+
+    StandardTableDefinition tableDefinition =
+        StandardTableDefinition.newBuilder()
+            .setTimePartitioning(timePartitioning)
+            .setSchema(schema)
+            .build();
+
+    TableInfo tableInfo = TableInfo.of(generateTableId(specKey), tableDefinition);
+
+    bqService.create(tableInfo);
+  }
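createTable partitions the new table by day on event_timestamp, so queries that filter on event time only scan the relevant partitions. A hedged sketch (table identifiers hypothetical) of inspecting the partitioning of a table created this way, using the same client types:

```java
import com.google.cloud.bigquery.*;

public class PartitioningCheckSketch {
  public static void main(String[] args) {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    Table table = bigquery.getTable(TableId.of("my-project", "feast_dataset", "my_feature_set"));

    // For a table created by the createTable method above, the definition is a
    // StandardTableDefinition with DAY time partitioning on event_timestamp.
    StandardTableDefinition definition = table.getDefinition();
    TimePartitioning partitioning = definition.getTimePartitioning();
    System.out.println(partitioning.getType());  // DAY
    System.out.println(partitioning.getField()); // event_timestamp
  }
}
```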

/**
@@ -98,12 +137,14 @@ private Table getExistingTable(String specKey) {
*
* @param spec FeatureSet spec that this table is for
* @param specKey String for retrieving existing table
+   * @param existingTable table fetched from BQ; fields from the existing table are merged with
+   *     the new schema
* @return {@link Schema} containing all tombstoned and active fields.
*/
-  private Schema createSchemaFromSpec(FeatureSetProto.FeatureSetSpec spec, String specKey) {
+  private Schema createSchemaFromSpec(
+      FeatureSetProto.FeatureSetSpec spec, String specKey, Table existingTable) {
List<Field> fields = new ArrayList<>();
log.info("Table {} will have the following fields:", specKey);
-    Table existingTable = getExistingTable(specKey);

for (FeatureSetProto.EntitySpec entitySpec : spec.getEntitiesList()) {
Field.Builder builder =
@@ -133,14 +174,14 @@ private Schema createSchemaFromSpec(FeatureSetProto.FeatureSetSpec spec, String
Map<String, Pair<StandardSQLTypeName, String>>
reservedFieldNameToPairOfStandardSQLTypeAndDescription =
ImmutableMap.of(
"event_timestamp",
EVENT_TIMESTAMP_COLUMN,
Pair.of(StandardSQLTypeName.TIMESTAMP, BIGQUERY_EVENT_TIMESTAMP_FIELD_DESCRIPTION),
"created_timestamp",
CREATED_TIMESTAMP_COLUMN,
Pair.of(
StandardSQLTypeName.TIMESTAMP, BIGQUERY_CREATED_TIMESTAMP_FIELD_DESCRIPTION),
"ingestion_id",
INGESTION_ID_COLUMN,
Pair.of(StandardSQLTypeName.STRING, BIGQUERY_INGESTION_ID_FIELD_DESCRIPTION),
"job_id",
JOB_ID_COLUMN,
Pair.of(StandardSQLTypeName.STRING, BIGQUERY_JOB_ID_FIELD_DESCRIPTION));
for (Map.Entry<String, Pair<StandardSQLTypeName, String>> entry :
reservedFieldNameToPairOfStandardSQLTypeAndDescription.entrySet()) {
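The loop body that consumes this map is collapsed in the diff. Below is a hedged sketch, not the actual code, of what it presumably does with these entries and of the merge described in the method's javadoc (it reuses the `fields`, `existingTable`, and map names from the diff, and assumes `org.apache.commons.lang3.tuple.Pair`, `java.util.Set`, and `java.util.stream.Collectors` are available):

```java
// Turn each reserved entry into a BigQuery field with its description.
for (Map.Entry<String, Pair<StandardSQLTypeName, String>> entry :
    reservedFieldNameToPairOfStandardSQLTypeAndDescription.entrySet()) {
  fields.add(
      Field.newBuilder(entry.getKey(), entry.getValue().getLeft())
          .setDescription(entry.getValue().getRight())
          .build());
}

// Merge step sketched from the javadoc: columns that already exist in BigQuery
// but are no longer in the spec ("tombstoned" fields) are kept, so the
// generated schema never drops columns.
Set<String> newFieldNames =
    fields.stream().map(Field::getName).collect(Collectors.toSet());
if (existingTable != null && existingTable.getDefinition().getSchema() != null) {
  for (Field existingField : existingTable.getDefinition().getSchema().getFields()) {
    if (!newFieldNames.contains(existingField.getName())) {
      fields.add(existingField);
    }
  }
}
return Schema.of(fields.toArray(new Field[0]));
```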