bigquery table - add json and parquet options (GoogleCloudPlatform#8296)
* add json and parquet option to bq table

* fmt

* fmt
DrFaust92 authored and DanielRieske committed Aug 2, 2023
1 parent 30e7b92 commit 9cf0d79
Showing 3 changed files with 278 additions and 3 deletions.
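For orientation, here is a minimal sketch of how the two blocks introduced by this commit could be used once the change ships. All resource, dataset, bucket, and table names below are hypothetical, and the referenced GCS objects are assumed to already exist.

```hcl
# Hypothetical names throughout; the GCS objects are assumed to already exist.
resource "google_bigquery_dataset" "example" {
  dataset_id = "example_dataset"
}

resource "google_bigquery_table" "parquet_example" {
  deletion_protection = false
  dataset_id          = google_bigquery_dataset.example.dataset_id
  table_id            = "parquet_table"

  external_data_configuration {
    autodetect    = true
    source_format = "PARQUET"
    source_uris   = ["gs://example-bucket/parquet/*"]

    parquet_options {
      enum_as_string        = true # read Parquet ENUM as STRING instead of BYTES
      enable_list_inference = true # use schema inference for Parquet LIST
    }
  }
}

resource "google_bigquery_table" "json_example" {
  deletion_protection = false
  dataset_id          = google_bigquery_dataset.example.dataset_id
  table_id            = "json_table"

  external_data_configuration {
    autodetect    = true
    source_format = "NEWLINE_DELIMITED_JSON"
    source_uris   = ["gs://example-bucket/json/*"]

    json_options {
      encoding = "UTF-16BE" # defaults to UTF-8 when omitted
    }
  }
}
```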
@@ -538,6 +538,45 @@ func ResourceBigQueryTable() *schema.Resource {
},
},
},
// jsonOptions: [Optional] Additional properties to set if sourceFormat is set to JSON.
"json_options": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Description: `Additional properties to set if sourceFormat is set to JSON.`,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"encoding": {
Type: schema.TypeString,
Optional: true,
Default: "UTF-8",
ValidateFunc: validation.StringInSlice([]string{"UTF-8", "UTF-16BE", "UTF-16LE", "UTF-32BE", "UTF-32LE"}, false),
Description: `The character encoding of the data. The supported values are UTF-8, UTF-16BE, UTF-16LE, UTF-32BE, and UTF-32LE. The default value is UTF-8.`,
},
},
},
},

"parquet_options": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Description: `Additional properties to set if sourceFormat is set to PARQUET.`,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"enum_as_string": {
Type: schema.TypeBool,
Optional: true,
Description: `Indicates whether to infer Parquet ENUM logical type as STRING instead of BYTES by default.`,
},
"enable_list_inference": {
Type: schema.TypeBool,
Optional: true,
Description: `Indicates whether to use schema inference specifically for Parquet LIST logical type.`,
},
},
},
},
// GoogleSheetsOptions: [Optional] Additional options if sourceFormat is set to GOOGLE_SHEETS.
"google_sheets_options": {
Type: schema.TypeList,
@@ -1370,9 +1409,13 @@ func expandExternalDataConfiguration(cfg interface{}) (*bigquery.ExternalDataCon
if v, ok := raw["compression"]; ok {
edc.Compression = v.(string)
}

if v, ok := raw["csv_options"]; ok {
edc.CsvOptions = expandCsvOptions(v)
}
if v, ok := raw["json_options"]; ok {
edc.JsonOptions = expandJsonOptions(v)
}
if v, ok := raw["google_sheets_options"]; ok {
edc.GoogleSheetsOptions = expandGoogleSheetsOptions(v)
}
@@ -1382,6 +1425,10 @@ func expandExternalDataConfiguration(cfg interface{}) (*bigquery.ExternalDataCon
if v, ok := raw["avro_options"]; ok {
edc.AvroOptions = expandAvroOptions(v)
}
if v, ok := raw["parquet_options"]; ok {
edc.ParquetOptions = expandParquetOptions(v)
}

if v, ok := raw["ignore_unknown_values"]; ok {
edc.IgnoreUnknownValues = v.(bool)
}
@@ -1441,6 +1488,14 @@ func flattenExternalDataConfiguration(edc *bigquery.ExternalDataConfiguration) (
result["avro_options"] = flattenAvroOptions(edc.AvroOptions)
}

if edc.ParquetOptions != nil {
result["parquet_options"] = flattenParquetOptions(edc.ParquetOptions)
}

if edc.JsonOptions != nil {
result["json_options"] = flattenJsonOptions(edc.JsonOptions)
}

if edc.IgnoreUnknownValues {
result["ignore_unknown_values"] = edc.IgnoreUnknownValues
}
@@ -1638,6 +1693,64 @@ func flattenAvroOptions(opts *bigquery.AvroOptions) []map[string]interface{} {
return []map[string]interface{}{result}
}

func expandParquetOptions(configured interface{}) *bigquery.ParquetOptions {
if len(configured.([]interface{})) == 0 {
return nil
}

raw := configured.([]interface{})[0].(map[string]interface{})
opts := &bigquery.ParquetOptions{}

if v, ok := raw["enum_as_string"]; ok {
opts.EnumAsString = v.(bool)
}

if v, ok := raw["enable_list_inference"]; ok {
opts.EnableListInference = v.(bool)
}

return opts
}

func flattenParquetOptions(opts *bigquery.ParquetOptions) []map[string]interface{} {
result := map[string]interface{}{}

if opts.EnumAsString {
result["enum_as_string"] = opts.EnumAsString
}

if opts.EnableListInference {
result["enable_list_inference"] = opts.EnableListInference
}

return []map[string]interface{}{result}
}

func expandJsonOptions(configured interface{}) *bigquery.JsonOptions {
if len(configured.([]interface{})) == 0 {
return nil
}

raw := configured.([]interface{})[0].(map[string]interface{})
opts := &bigquery.JsonOptions{}

if v, ok := raw["encoding"]; ok {
opts.Encoding = v.(string)
}

return opts
}

func flattenJsonOptions(opts *bigquery.JsonOptions) []map[string]interface{} {
result := map[string]interface{}{}

if opts.Encoding != "" {
result["encoding"] = opts.Encoding
}

return []map[string]interface{}{result}
}

func expandSchema(raw interface{}) (*bigquery.TableSchema, error) {
var fields []*bigquery.TableFieldSchema

148 changes: 148 additions & 0 deletions mmv1/third_party/terraform/tests/resource_bigquery_table_test.go
@@ -246,6 +246,34 @@ func TestAccBigQueryTable_AvroPartitioning(t *testing.T) {
})
}

func TestAccBigQueryExternalDataTable_json(t *testing.T) {
t.Parallel()
bucketName := testBucketName(t)
resourceName := "google_bigquery_table.test"
datasetID := fmt.Sprintf("tf_test_%s", acctest.RandString(t, 10))
tableID := fmt.Sprintf("tf_test_%s", acctest.RandString(t, 10))

acctest.VcrTest(t, resource.TestCase{
PreCheck: func() { acctest.AccTestPreCheck(t) },
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories(t),
CheckDestroy: testAccCheckBigQueryTableDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccBigQueryTableJson(datasetID, tableID, bucketName, "UTF-8"),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"external_data_configuration.0.schema", "deletion_protection"},
},
{
Config: testAccBigQueryTableJson(datasetID, tableID, bucketName, "UTF-16BE"),
},
},
})
}

func TestAccBigQueryTable_RangePartitioning(t *testing.T) {
t.Parallel()
resourceName := "google_bigquery_table.test"
@@ -480,6 +508,30 @@ func TestAccBigQueryExternalDataTable_parquet(t *testing.T) {
})
}

func TestAccBigQueryExternalDataTable_parquetOptions(t *testing.T) {
t.Parallel()

bucketName := testBucketName(t)
objectName := fmt.Sprintf("tf_test_%s.gz.parquet", acctest.RandString(t, 10))

datasetID := fmt.Sprintf("tf_test_%s", acctest.RandString(t, 10))
tableID := fmt.Sprintf("tf_test_%s", acctest.RandString(t, 10))

acctest.VcrTest(t, resource.TestCase{
PreCheck: func() { acctest.AccTestPreCheck(t) },
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories(t),
CheckDestroy: testAccCheckBigQueryTableDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccBigQueryTableFromGCSParquetOptions(datasetID, tableID, bucketName, objectName, true, true),
},
{
Config: testAccBigQueryTableFromGCSParquetOptions(datasetID, tableID, bucketName, objectName, false, false),
},
},
})
}

func TestAccBigQueryExternalDataTable_objectTable(t *testing.T) {
t.Parallel()

@@ -1581,6 +1633,46 @@ resource "google_bigquery_table" "test" {
`, datasetID, bucketName, objectName, tableID)
}

func testAccBigQueryTableFromGCSParquetOptions(datasetID, tableID, bucketName, objectName string, enum, list bool) string {
return fmt.Sprintf(`
resource "google_bigquery_dataset" "test" {
dataset_id = "%s"
}
resource "google_storage_bucket" "test" {
name = "%s"
location = "US"
force_destroy = true
}
resource "google_storage_bucket_object" "test" {
name = "%s"
source = "./test-fixtures/bigquerytable/test.parquet.gzip"
bucket = google_storage_bucket.test.name
}
resource "google_bigquery_table" "test" {
deletion_protection = false
table_id = "%s"
dataset_id = google_bigquery_dataset.test.dataset_id
external_data_configuration {
autodetect = false
source_format = "PARQUET"
reference_file_schema_uri = "gs://${google_storage_bucket.test.name}/${google_storage_bucket_object.test.name}"
parquet_options {
enum_as_string = "%t"
enable_list_inference = "%t"
}
source_uris = [
"gs://${google_storage_bucket.test.name}/*",
]
}
}
`, datasetID, bucketName, objectName, tableID, enum, list)
}

func testAccBigQueryTableFromGCSObjectTable(connectionID, datasetID, tableID, bucketName, objectName string) string {
return fmt.Sprintf(`
resource "google_bigquery_connection" "test" {
@@ -1797,6 +1889,62 @@ resource "google_bigquery_table" "test" {
`, datasetID, bucketName, objectName, content, connectionID, projectID, tableID, schema)
}

func testAccBigQueryTableJson(datasetID, tableID, bucketName, encoding string) string {
return fmt.Sprintf(`
resource "google_storage_bucket" "test" {
name = "%s"
location = "US"
force_destroy = true
}
resource "google_storage_bucket_object" "test" {
name = "key1=20200330/data.json"
content = "{\"name\":\"test\", \"last_modification\":\"2020-04-01\"}"
bucket = google_storage_bucket.test.name
}
resource "google_bigquery_dataset" "test" {
dataset_id = "%s"
}
resource "google_bigquery_table" "test" {
deletion_protection = false
table_id = "%s"
dataset_id = google_bigquery_dataset.test.dataset_id
external_data_configuration {
source_format = "NEWLINE_DELIMITED_JSON"
autodetect = false
source_uris= ["gs://${google_storage_bucket.test.name}/*"]
json_options {
encoding = "%s"
}
hive_partitioning_options {
mode = "CUSTOM"
source_uri_prefix = "gs://${google_storage_bucket.test.name}/{key1:STRING}"
require_partition_filter = true
}
schema = <<EOH
[
{
"name": "name",
"type": "STRING"
},
{
"name": "last_modification",
"type": "DATE"
}
]
EOH
}
depends_on = ["google_storage_bucket_object.test"]
}
`, bucketName, datasetID, tableID, encoding)
}

func testAccBigQueryTableFromGCSWithSchema(datasetID, tableID, bucketName, objectName, content, schema string) string {
return fmt.Sprintf(`
resource "google_bigquery_dataset" "test" {
@@ -160,6 +160,12 @@ in Terraform state, a `terraform destroy` or `terraform apply` that would delete
* `csv_options` (Optional) - Additional properties to set if
`source_format` is set to "CSV". Structure is [documented below](#nested_csv_options).

* `json_options` (Optional) - Additional properties to set if
`source_format` is set to "JSON". Structure is [documented below](#nested_json_options).

* `parquet_options` (Optional) - Additional properties to set if
`source_format` is set to "PARQUET". Structure is [documented below](#nested_parquet_options).

* `google_sheets_options` (Optional) - Additional options if
`source_format` is set to "GOOGLE_SHEETS". Structure is
[documented below](#nested_google_sheets_options).
@@ -172,7 +178,6 @@ in Terraform state, a `terraform destroy` or `terraform apply` that would delete
* `avro_options` (Optional) - Additional options if `source_format` is set to
"AVRO". Structure is [documented below](#nested_avro_options).


* `ignore_unknown_values` (Optional) - Indicates if BigQuery should
allow extra values that are not represented in the table schema.
If true, the extra values are ignored. If false, records with
@@ -234,6 +239,10 @@ in Terraform state, a `terraform destroy` or `terraform apply` that would delete
* `skip_leading_rows` (Optional) - The number of rows at the top of a CSV
file that BigQuery will skip when reading the data.

<a name="nested_json_options"></a>The `json_options` block supports:

* `encoding` (Optional) - The character encoding of the data. The supported values are UTF-8, UTF-16BE, UTF-16LE, UTF-32BE, and UTF-32LE. The default value is UTF-8.
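
For illustration only (the bucket name is hypothetical), `json_options` nests inside `external_data_configuration` alongside the matching `source_format`:

```hcl
external_data_configuration {
  autodetect    = true
  source_format = "NEWLINE_DELIMITED_JSON"
  source_uris   = ["gs://example-bucket/*"]

  json_options {
    encoding = "UTF-16BE"
  }
}
```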

<a name="nested_google_sheets_options"></a>The `google_sheets_options` block supports:

* `range` (Optional) - Range of a sheet to query from. Only used when
@@ -255,7 +264,7 @@ in Terraform state, a `terraform destroy` or `terraform apply` that would delete
partitioning on an unsupported format will lead to an error.
Currently supported formats are: JSON, CSV, ORC, Avro and Parquet.
* CUSTOM: when set to `CUSTOM`, you must encode the partition key schema within the `source_uri_prefix` by setting `source_uri_prefix` to `gs://bucket/path_to_table/{key1:TYPE1}/{key2:TYPE2}/{key3:TYPE3}`.

* `require_partition_filter` - (Optional) If set to true, queries over this table
require a partition filter that can be used for partition elimination to be
specified.
Expand All @@ -274,7 +283,12 @@ in Terraform state, a `terraform destroy` or `terraform apply` that would delete
* `use_avro_logical_types` (Optional) - If is set to true, indicates whether
to interpret logical types as the corresponding BigQuery data type
(for example, TIMESTAMP), instead of using the raw type (for example, INTEGER).


<a name="nested_parquet_options"></a>The `parquet_options` block supports:

* `enum_as_string` (Optional) - Indicates whether to infer Parquet ENUM logical type as STRING instead of BYTES by default.

* `enable_list_inference` (Optional) - Indicates whether to use schema inference specifically for Parquet LIST logical type.
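
A corresponding minimal fragment (again with a hypothetical bucket name) for Parquet sources:

```hcl
external_data_configuration {
  autodetect    = true
  source_format = "PARQUET"
  source_uris   = ["gs://example-bucket/*"]

  parquet_options {
    enum_as_string        = true
    enable_list_inference = true
  }
}
```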

<a name="nested_time_partitioning"></a>The `time_partitioning` block supports:
