Skip to content

Commit

Permalink
Set BigQuery table timepartition inside get table function (feast-dev…
Browse files Browse the repository at this point in the history
…#333)

* Fix BigQuery write setting timepartition outside of table reference

* Give core a bit more time to start up

* Rename GetTableReference to correctly reflect return type

* Increase kafka sleep time
  • Loading branch information
Chen Zhiling committed Nov 28, 2019
1 parent d635315 commit ac6c18e
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 4 deletions.
4 changes: 2 additions & 2 deletions .prow/scripts/test-end-to-end.sh
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ nohup /tmp/kafka/bin/zookeeper-server-start.sh /tmp/kafka/config/zookeeper.prope
sleep 5
tail -n10 /var/log/zookeeper.log
nohup /tmp/kafka/bin/kafka-server-start.sh /tmp/kafka/config/server.properties &> /var/log/kafka.log 2>&1 &
sleep 5
sleep 10
tail -n10 /var/log/kafka.log

echo "
Expand Down Expand Up @@ -124,7 +124,7 @@ EOF
nohup java -jar core/target/feast-core-0.3.2-SNAPSHOT.jar \
--spring.config.location=file:///tmp/core.application.yml \
&> /var/log/feast-core.log &
sleep 20
sleep 30
tail -n10 /var/log/feast-core.log

echo "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import feast.ingestion.utils.ResourceUtil;
import feast.ingestion.values.FailedElement;
import feast.store.serving.bigquery.FeatureRowToTableRow;
import feast.store.serving.bigquery.GetTableDestination;
import feast.store.serving.redis.FeatureRowToRedisMutationDoFn;
import feast.store.serving.redis.RedisCustomIO;
import feast.types.FeatureRowProto.FeatureRow;
Expand All @@ -40,12 +41,10 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.ValueInSingleWindow;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package feast.store.serving.bigquery;

import com.google.api.services.bigquery.model.TimePartitioning;
import feast.types.FeatureRowProto.FeatureRow;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.ValueInSingleWindow;

/**
 * Maps a windowed {@link FeatureRow} to the BigQuery {@link TableDestination} it should be
 * written to.
 *
 * <p>The destination table spec is built as {@code <projectId>:<datasetId>.<a>_v<b>}, where
 * {@code a} and {@code b} are the first two colon-separated segments of the row's feature set
 * reference (presumably the feature set name and version; confirm against the producer of
 * {@code FeatureRow.getFeatureSet()}). Every destination carries a {@code DAY} time partitioning
 * on the column returned by {@link FeatureRowToTableRow#getEventTimestampColumn()}.
 */
public class GetTableDestination implements
    SerializableFunction<ValueInSingleWindow<FeatureRow>, TableDestination> {

  private final String projectId;
  private final String datasetId;

  /**
   * @param projectId project id used as the first component of the table spec
   * @param datasetId dataset id used as the second component of the table spec
   */
  public GetTableDestination(String projectId, String datasetId) {
    this.projectId = projectId;
    this.datasetId = datasetId;
  }

  /**
   * Builds the table destination for a single feature row.
   *
   * @param input the windowed feature row whose feature set reference selects the table
   * @return a destination with table spec, description and time partitioning set
   * @throws IllegalArgumentException if the feature set reference does not contain at least two
   *     colon-separated segments
   */
  @Override
  public TableDestination apply(ValueInSingleWindow<FeatureRow> input) {
    String featureSetRef = input.getValue().getFeatureSet();
    String[] split = featureSetRef.split(":");

    // Fail with a message that names the offending reference instead of letting split[1]
    // raise a bare ArrayIndexOutOfBoundsException.
    if (split.length < 2) {
      throw new IllegalArgumentException(
          String.format(
              "Feature set reference '%s' must contain at least two ':'-separated segments",
              featureSetRef));
    }

    // The partitioning is attached to the destination itself so that it travels with the
    // table reference handed back to the caller.
    TimePartitioning timePartitioning =
        new TimePartitioning()
            .setType("DAY")
            .setField(FeatureRowToTableRow.getEventTimestampColumn());

    String tableSpec =
        String.format("%s:%s.%s_v%s", projectId, datasetId, split[0], split[1]);
    String tableDescription = String.format("Feast table for %s", featureSetRef);

    return new TableDestination(tableSpec, tableDescription, timePartitioning);
  }
}

0 comments on commit ac6c18e

Please sign in to comment.