Skip to content

Commit

Permalink
Dataflow runner options: disk type & streaming engine (#906)
Browse files Browse the repository at this point in the history
* diskType & streamingEngine

* edit infra docs
  • Loading branch information
Oleksii Moskalenko authored Aug 1, 2020
1 parent f3ab22c commit 294faa5
Show file tree
Hide file tree
Showing 9 changed files with 37 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class DataflowRunnerConfig extends RunnerConfig {
public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) {
this.project = runnerConfigOptions.getProject();
this.region = runnerConfigOptions.getRegion();
this.zone = runnerConfigOptions.getZone();
this.workerZone = runnerConfigOptions.getWorkerZone();
this.serviceAccount = runnerConfigOptions.getServiceAccount();
this.network = runnerConfigOptions.getNetwork();
this.subnetwork = runnerConfigOptions.getSubnetwork();
Expand All @@ -44,6 +44,8 @@ public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) {
this.deadLetterTableSpec = runnerConfigOptions.getDeadLetterTableSpec();
this.diskSizeGb = runnerConfigOptions.getDiskSizeGb();
this.labels = runnerConfigOptions.getLabelsMap();
this.enableStreamingEngine = runnerConfigOptions.getEnableStreamingEngine();
this.workerDiskType = runnerConfigOptions.getWorkerDiskType();
validate();
}

Expand All @@ -54,7 +56,7 @@ public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) {
@NotBlank public String region;

/* GCP availability zone for operations. */
@NotBlank public String zone;
@NotBlank public String workerZone;

/* Run the job as a specific service account, instead of the default GCE robot. */
public String serviceAccount;
Expand Down Expand Up @@ -91,6 +93,12 @@ public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) {

public Map<String, String> labels;

/* If true, the job will run on Dataflow Streaming Engine instead of worker VMs. */
public Boolean enableStreamingEngine;

/* Type of persistent disk to be used by workers. */
public String workerDiskType;

/** Validates Dataflow runner configuration options */
public void validate() {
ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ feast:
options:
project: my_gcp_project
region: asia-east1
zone: asia-east1-a
workerZone: asia-east1-a
tempLocation: gs://bucket/tempLocation
network: default
subnetwork: regions/asia-east1/subnetworks/mysubnetwork
maxNumWorkers: 1
enableStreamingEngine: false
workerDiskType: compute.googleapis.com/projects/asia-east1-a/diskTypes/pd-ssd
autoscalingAlgorithm: THROUGHPUT_BASED
usePublicIps: false
workerMachineType: n1-standard-1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void setUp() {
Builder optionsBuilder = DataflowRunnerConfigOptions.newBuilder();
optionsBuilder.setProject("project");
optionsBuilder.setRegion("region");
optionsBuilder.setZone("zone");
optionsBuilder.setWorkerZone("zone");
optionsBuilder.setTempLocation("tempLocation");
optionsBuilder.setNetwork("network");
optionsBuilder.setSubnetwork("subnetwork");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ public void shouldConvertToPipelineArgs() throws IllegalAccessException {
DataflowRunnerConfigOptions.newBuilder()
.setProject("my-project")
.setRegion("asia-east1")
.setZone("asia-east1-a")
.setWorkerZone("asia-east1-a")
.setEnableStreamingEngine(true)
.setWorkerDiskType("pd-ssd")
.setTempLocation("gs://bucket/tempLocation")
.setNetwork("default")
.setSubnetwork("regions/asia-east1/subnetworks/mysubnetwork")
Expand All @@ -52,7 +54,7 @@ public void shouldConvertToPipelineArgs() throws IllegalAccessException {
Arrays.asList(
"--project=my-project",
"--region=asia-east1",
"--zone=asia-east1-a",
"--workerZone=asia-east1-a",
"--tempLocation=gs://bucket/tempLocation",
"--network=default",
"--subnetwork=regions/asia-east1/subnetworks/mysubnetwork",
Expand All @@ -62,7 +64,9 @@ public void shouldConvertToPipelineArgs() throws IllegalAccessException {
"--workerMachineType=n1-standard-1",
"--deadLetterTableSpec=project_id:dataset_id.table_id",
"--diskSizeGb=100",
"--labels={\"key\":\"value\"}")
"--labels={\"key\":\"value\"}",
"--enableStreamingEngine=true",
"--workerDiskType=pd-ssd")
.toArray(String[]::new);
assertThat(args.size(), equalTo(expectedArgs.length));
assertThat(args, containsInAnyOrder(expectedArgs));
Expand All @@ -74,7 +78,7 @@ public void shouldIgnoreOptionalArguments() throws IllegalAccessException {
DataflowRunnerConfigOptions.newBuilder()
.setProject("my-project")
.setRegion("asia-east1")
.setZone("asia-east1-a")
.setWorkerZone("asia-east1-a")
.setTempLocation("gs://bucket/tempLocation")
.setNetwork("default")
.setSubnetwork("regions/asia-east1/subnetworks/mysubnetwork")
Expand All @@ -90,15 +94,16 @@ public void shouldIgnoreOptionalArguments() throws IllegalAccessException {
Arrays.asList(
"--project=my-project",
"--region=asia-east1",
"--zone=asia-east1-a",
"--workerZone=asia-east1-a",
"--tempLocation=gs://bucket/tempLocation",
"--network=default",
"--subnetwork=regions/asia-east1/subnetworks/mysubnetwork",
"--maxNumWorkers=1",
"--autoscalingAlgorithm=THROUGHPUT_BASED",
"--usePublicIps=false",
"--workerMachineType=n1-standard-1",
"--labels={}")
"--labels={}",
"--enableStreamingEngine=false")
.toArray(String[]::new);
assertThat(args.size(), equalTo(expectedArgs.length));
assertThat(args, containsInAnyOrder(expectedArgs));
Expand Down
2 changes: 1 addition & 1 deletion infra/charts/feast/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ feast-core:
options:
project: <google_project_id>
region: <dataflow_regional_endpoint e.g. asia-east1>
zone: <google_zone e.g. asia-east1-a>
workerZone: <google_zone e.g. asia-east1-a>
tempLocation: <gcs_path_for_temp_files e.g. gs://bucket/tempLocation>
network: <google_cloud_network_name>
subnetwork: <google_cloud_subnetwork_path e.g. regions/asia-east1/subnetworks/mysubnetwork>
Expand Down
2 changes: 1 addition & 1 deletion infra/charts/feast/README.md.gotmpl
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ feast-core:
options:
project: <google_project_id>
region: <dataflow_regional_endpoint e.g. asia-east1>
zone: <google_zone e.g. asia-east1-a>
workerZone: <google_zone e.g. asia-east1-a>
tempLocation: <gcs_path_for_temp_files e.g. gs://bucket/tempLocation>
network: <google_cloud_network_name>
subnetwork: <google_cloud_subnetwork_path e.g. regions/asia-east1/subnetworks/mysubnetwork>
Expand Down
4 changes: 3 additions & 1 deletion infra/charts/feast/values-dataflow-runner.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ feast-core:
options:
project: <google_project_id>
region: <dataflow_regional_endpoint e.g. asia-east1>
zone: <google_zone e.g. asia-east1-a>
workerZone: <google_zone e.g. asia-east1-a>
tempLocation: <gcs_path_for_temp_files e.g. gs://bucket/tempLocation>
network: <google_cloud_network_name>
subnetwork: <google_cloud_subnetwork_path e.g. regions/asia-east1/subnetworks/mysubnetwork>
enableStreamingEngine: false
workerDiskType: <disk_type e.g. compute.googleapis.com/projects/asia-east1-a/diskTypes/pd-ssd>
maxNumWorkers: 1
autoscalingAlgorithm: THROUGHPUT_BASED
usePublicIps: false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ feast-core:
options:
project: $GCLOUD_PROJECT
region: $GCLOUD_REGION
zone: $GCLOUD_REGION-a
workerZone: $GCLOUD_REGION-a
tempLocation: gs://$TEMP_BUCKET/tempLocation
network: $GCLOUD_NETWORK
subnetwork: regions/$GCLOUD_REGION/subnetworks/$GCLOUD_SUBNET
Expand Down
7 changes: 6 additions & 1 deletion protos/feast/core/Runner.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ message DataflowRunnerConfigOptions {
string region = 2;

/* GCP availability zone for operations. */
string zone = 3;
string workerZone = 3;

/* Run the job as a specific service account, instead of the default GCE robot. */
string serviceAccount = 4;
Expand Down Expand Up @@ -81,4 +81,9 @@ message DataflowRunnerConfigOptions {
/* Disk size to use on each remote Compute Engine worker instance */
int32 diskSizeGb = 14;

/* Run the job on Dataflow Streaming Engine instead of creating worker VMs. */
bool enableStreamingEngine = 15;

/* Type of persistent disk to be used by workers */
string workerDiskType = 16;
}

0 comments on commit 294faa5

Please sign in to comment.