diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 47c7261df7150..5aeea5a3f765e 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -493,19 +493,6 @@ jobs: steps: - name: Checkout Spark repository uses: actions/checkout@v2 - - name: Cache TPC-DS generated data - id: cache-tpcds-sf-1 - uses: actions/cache@v2 - with: - path: ./tpcds-sf-1 - key: tpcds-556111e35d400f56cb0625dc16e9063d54628320 - - name: Checkout TPC-DS (SF=1) generated data repository - if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true' - uses: actions/checkout@v2 - with: - repository: maropu/spark-tpcds-sf-1 - ref: 556111e35d400f56cb0625dc16e9063d54628320 - path: ./tpcds-sf-1 - name: Cache Scala, SBT and Maven uses: actions/cache@v2 with: @@ -528,6 +515,24 @@ jobs: uses: actions/setup-java@v1 with: java-version: 8 + - name: Cache TPC-DS generated data + id: cache-tpcds-sf-1 + uses: actions/cache@v2 + with: + path: ./tpcds-sf-1 + key: tpcds-${{ hashFiles('sql/core/src/test/scala/org/apache/spark/sql/TPCDSSchema.scala') }} + - name: Checkout tpcds-kit repository + if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true' + uses: actions/checkout@v2 + with: + repository: maropu/spark-tpcds-datagen + path: ./tpcds-kit + - name: Build tpcds-kit + if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true' + run: cd tpcds-kit/thirdparty/tpcds-kit/tools && make OS=LINUX + - name: Generate TPC-DS (SF=1) table data + if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true' + run: build/sbt "sql/test:runMain org.apache.spark.sql.GenTPCDSData --dsdgenDir `pwd`/tpcds-kit/thirdparty/tpcds-kit/tools --location `pwd`/tpcds-sf-1 --scaleFactor 1 --numPartitions 1 --overwrite" - name: Run TPC-DS queries run: | SPARK_TPCDS_DATA=`pwd`/tpcds-sf-1 build/sbt "sql/testOnly org.apache.spark.sql.TPCDSQueryTestSuite" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala new file mode 100644 index 0000000000000..aff67d7f3202c --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.util.concurrent.LinkedBlockingQueue + +import scala.collection.immutable.Stream +import scala.sys.process._ +import scala.util.Try + +import org.apache.spark.SparkContext +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.functions.{col, rpad} +import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType} + +// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf + +/** + * Using ProcessBuilder.lineStream produces a stream, that uses + * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE. + * + * This causes OOM if the consumer cannot keep up with the producer. 
+ * + * See scala.sys.process.ProcessBuilderImpl.lineStream + */ +object BlockingLineStream { + + // See scala.sys.process.Streamed + private final class BlockingStreamed[T]( + val process: T => Unit, + val done: Int => Unit, + val stream: () => Stream[T]) + + // See scala.sys.process.Streamed + private object BlockingStreamed { + // scala.sys.process.Streamed uses default of Integer.MAX_VALUE, + // which causes OOMs if the consumer cannot keep up with producer. + val maxQueueSize = 65536 + + def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = { + val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize) + + def next(): Stream[T] = q.take match { + case Left(0) => Stream.empty + case Left(code) => + if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty + case Right(s) => Stream.cons(s, next()) + } + + new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next()) + } + } + + // See scala.sys.process.ProcessImpl.Spawn + private object Spawn { + def apply(f: => Unit): Thread = apply(f, daemon = false) + def apply(f: => Unit, daemon: Boolean): Thread = { + val thread = new Thread() { override def run() = { f } } + thread.setDaemon(daemon) + thread.start() + thread + } + } + + def apply(command: Seq[String]): Stream[String] = { + val streamed = BlockingStreamed[String](true) + val process = command.run(BasicIO(false, streamed.process, None)) + Spawn(streamed.done(process.exitValue())) + streamed.stream() + } +} + +class Dsdgen(dsdgenDir: String) extends Serializable { + private val dsdgen = s"$dsdgenDir/dsdgen" + + def generate( + sparkContext: SparkContext, + tableName: String, + partitions: Int, + scaleFactor: Int): RDD[String] = { + val generatedData = { + sparkContext.parallelize(1 to partitions, partitions).flatMap { i => + val localToolsDir = if (new java.io.File(dsdgen).exists) { + dsdgenDir + } else if (new java.io.File(s"/$dsdgen").exists) { + s"/$dsdgenDir" + } else { + throw new 
IllegalStateException( + s"Could not find dsdgen at $dsdgen or /$dsdgen. Run install") + } + + // NOTE: RNGSEED is the RNG seed used by the data generator. Right now, it is fixed to + // 19620718 that is used to generate `https://github.com/maropu/spark-tpcds-sf-1`. + val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else "" + val commands = Seq( + "bash", "-c", + s"cd $localToolsDir && ./dsdgen -table $tableName -filter Y -scale $scaleFactor " + + s"-RNGSEED 19620718 $parallel") + BlockingLineStream(commands) + } + } + + generatedData.setName(s"$tableName, sf=$scaleFactor, strings") + generatedData + } +} + +class TPCDSTables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) + extends TPCDSSchema with Logging with Serializable { + + private val dataGenerator = new Dsdgen(dsdgenDir) + + private def tables: Seq[Table] = tableColumns.map { case (tableName, schemaString) => + val partitionColumns = tablePartitionColumns.getOrElse(tableName, Nil) + .map(_.stripPrefix("`").stripSuffix("`")) + Table(tableName, partitionColumns, StructType.fromDDL(schemaString)) + }.toSeq + + private case class Table(name: String, partitionColumns: Seq[String], schema: StructType) { + def nonPartitioned: Table = { + Table(name, Nil, schema) + } + + private def df(numPartition: Int) = { + val generatedData = dataGenerator.generate( + sqlContext.sparkContext, name, numPartition, scaleFactor) + val rows = generatedData.mapPartitions { iter => + iter.map { l => + val values = l.split("\\|", -1).dropRight(1).map { v => + if (v.equals("")) { + // If the string value is an empty string, we turn it to a null + null + } else { + v + } + } + Row.fromSeq(values) + } + } + + val stringData = + sqlContext.createDataFrame( + rows, + StructType(schema.fields.map(f => StructField(f.name, StringType)))) + + val convertedData = { + val columns = schema.fields.map { f => + val c = f.dataType match { + // Needs right-padding for char types + case CharType(n) => 
rpad(Column(f.name), n, " ") + // Don't need a cast for varchar types + case _: VarcharType => col(f.name) + case _ => col(f.name).cast(f.dataType) + } + c.as(f.name) + } + stringData.select(columns: _*) + } + + convertedData + } + + def genData( + location: String, + format: String, + overwrite: Boolean, + clusterByPartitionColumns: Boolean, + filterOutNullPartitionValues: Boolean, + numPartitions: Int): Unit = { + val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Ignore + + val data = df(numPartitions) + val tempTableName = s"${name}_text" + data.createOrReplaceTempView(tempTableName) + + val writer = if (partitionColumns.nonEmpty) { + if (clusterByPartitionColumns) { + val columnString = data.schema.fields.map { field => + field.name + }.mkString(",") + val partitionColumnString = partitionColumns.mkString(",") + val predicates = if (filterOutNullPartitionValues) { + partitionColumns.map(col => s"$col IS NOT NULL").mkString("WHERE ", " AND ", "") + } else { + "" + } + + val query = + s""" + |SELECT + | $columnString + |FROM + | $tempTableName + |$predicates + |DISTRIBUTE BY + | $partitionColumnString + """.stripMargin + val grouped = sqlContext.sql(query) + logInfo(s"Pre-clustering with partitioning columns with query $query.") + grouped.write + } else { + data.write + } + } else { + // treat non-partitioned tables as "one partition" that we want to coalesce + if (clusterByPartitionColumns) { + // in case data has more than maxRecordsPerFile, split into multiple writers to improve + // datagen speed files will be truncated to maxRecordsPerFile value, so the final + // result will be the same. 
+ val numRows = data.count + val maxRecordPerFile = Try { + sqlContext.getConf("spark.sql.files.maxRecordsPerFile").toInt + }.getOrElse(0) + + if (maxRecordPerFile > 0 && numRows > maxRecordPerFile) { + val numFiles = (numRows.toDouble/maxRecordPerFile).ceil.toInt + logInfo(s"Coalescing into $numFiles files") + data.coalesce(numFiles).write + } else { + data.coalesce(1).write + } + } else { + data.write + } + } + writer.format(format).mode(mode) + if (partitionColumns.nonEmpty) { + writer.partitionBy(partitionColumns: _*) + } + logInfo(s"Generating table $name in database to $location with save mode $mode.") + writer.save(location) + sqlContext.dropTempTable(tempTableName) + } + } + + def genData( + location: String, + format: String, + overwrite: Boolean, + partitionTables: Boolean, + clusterByPartitionColumns: Boolean, + filterOutNullPartitionValues: Boolean, + tableFilter: String = "", + numPartitions: Int = 100): Unit = { + var tablesToBeGenerated = if (partitionTables) { + tables + } else { + tables.map(_.nonPartitioned) + } + + if (!tableFilter.isEmpty) { + tablesToBeGenerated = tablesToBeGenerated.filter(_.name == tableFilter) + if (tablesToBeGenerated.isEmpty) { + throw new RuntimeException("Bad table name filter: " + tableFilter) + } + } + + tablesToBeGenerated.foreach { table => + val tableLocation = s"$location/${table.name}" + table.genData(tableLocation, format, overwrite, clusterByPartitionColumns, + filterOutNullPartitionValues, numPartitions) + } + } +} + +class GenTPCDSDataConfig(args: Array[String]) { + var master: String = "local[*]" + var dsdgenDir: String = null + var location: String = null + var scaleFactor: Int = 1 + var format: String = "parquet" + var overwrite: Boolean = false + var partitionTables: Boolean = false + var clusterByPartitionColumns: Boolean = false + var filterOutNullPartitionValues: Boolean = false + var tableFilter: String = "" + var numPartitions: Int = 100 + + parseArgs(args.toList) + + private def parseArgs(inputArgs: 
List[String]): Unit = { + var args = inputArgs + + while (args.nonEmpty) { + args match { + case "--master" :: value :: tail => + master = value + args = tail + + case "--dsdgenDir" :: value :: tail => + dsdgenDir = value + args = tail + + case "--location" :: value :: tail => + location = value + args = tail + + case "--scaleFactor" :: value :: tail => + scaleFactor = toPositiveIntValue("Scale factor", value) + args = tail + + case "--format" :: value :: tail => + format = value + args = tail + + case "--overwrite" :: tail => + overwrite = true + args = tail + + case "--partitionTables" :: tail => + partitionTables = true + args = tail + + case "--clusterByPartitionColumns" :: tail => + clusterByPartitionColumns = true + args = tail + + case "--filterOutNullPartitionValues" :: tail => + filterOutNullPartitionValues = true + args = tail + + case "--tableFilter" :: value :: tail => + tableFilter = value + args = tail + + case "--numPartitions" :: value :: tail => + numPartitions = toPositiveIntValue("Number of partitions", value) + args = tail + + case "--help" :: tail => + printUsageAndExit(0) + + case _ => + // scalastyle:off println + System.err.println("Unknown/unsupported param " + args) + // scalastyle:on println + printUsageAndExit(1) + } + } + + checkRequiredArguments() + } + + private def printUsageAndExit(exitCode: Int): Unit = { + // scalastyle:off + System.err.println(""" + |build/sbt "test:runMain [Options]" + |Options: + | --master the Spark master to use, default to local[*] + | --dsdgenDir location of dsdgen + | --location root directory of location to generate data in + | --scaleFactor size of the dataset to generate (in GB) + | --format generated data format, Parquet, ORC ... 
+ | --overwrite whether to overwrite the data that is already there + | --partitionTables whether to create the partitioned fact tables + | --clusterByPartitionColumns whether to shuffle to get partitions coalesced into single files + | --filterOutNullPartitionValues whether to filter out the partition with NULL key value + | --tableFilter comma-separated list of table names to generate (e.g., store_sales,store_returns), + | all the tables are generated by default + | --numPartitions how many dsdgen partitions to run - number of input tasks + """.stripMargin) + // scalastyle:on + System.exit(exitCode) + } + + private def toPositiveIntValue(name: String, v: String): Int = { + if (Try(v.toInt).getOrElse(-1) <= 0) { + // scalastyle:off println + System.err.println(s"$name must be a positive number") + // scalastyle:on println + printUsageAndExit(-1) + } + v.toInt + } + + private def checkRequiredArguments(): Unit = { + if (dsdgenDir == null) { + // scalastyle:off println + System.err.println("Must specify a dsdgen path") + // scalastyle:on println + printUsageAndExit(-1) + } + if (location == null) { + // scalastyle:off println + System.err.println("Must specify an output location") + // scalastyle:on println + printUsageAndExit(-1) + } + } +} + +/** + * This class generates TPCDS table data by using tpcds-kit: + * - https://github.com/databricks/tpcds-kit + * + * To run this: + * {{{ + * build/sbt "sql/test:runMain org.apache.spark.sql.GenTPCDSData --dsdgenDir /path/to/tpcds-kit/tools --location /path/to/tpcds-sf-1 --scaleFactor 1" + * }}} + */ +object GenTPCDSData { + + def main(args: Array[String]): Unit = { + val config = new GenTPCDSDataConfig(args) + + val spark = SparkSession + .builder() + .appName(getClass.getName) + .master(config.master) + .getOrCreate() + + val tables = new TPCDSTables( + spark.sqlContext, + dsdgenDir = config.dsdgenDir, + scaleFactor = config.scaleFactor) + + tables.genData( + location = config.location, + format = config.format, + overwrite = config.overwrite, + partitionTables = config.partitionTables, + 
clusterByPartitionColumns = config.clusterByPartitionColumns, + filterOutNullPartitionValues = config.filterOutNullPartitionValues, + tableFilter = config.tableFilter, + numPartitions = config.numPartitions) + + spark.stop() + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TPCDSBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/TPCDSBase.scala index e0ed9c8ff5412..3eef3a2279a13 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TPCDSBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TPCDSBase.scala @@ -21,32 +21,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession - -/** - * Base trait for TPC-DS related tests. - * - * Datatype mapping for TPC-DS and Spark SQL, fully matching schemas defined in `tpcds.sql` of the - * official tpcds toolkit - * see more at: - * http://www.tpc.org/tpc_documents_current_versions/pdf/tpc-ds_v2.9.0.pdf - * - * |---------------|---------------| - * | TPC-DS | Spark SQL | - * |---------------|---------------| - * | Identifier | INT | - * |---------------|---------------| - * | Integer | INT | - * |---------------|---------------| - * | Decimal(d, f) | Decimal(d, f) | - * |---------------|---------------| - * | Char(N) | Char(N) | - * |---------------|---------------| - * | Varchar(N) | Varchar(N) | - * |---------------|---------------| - * | Date | Date | - * |---------------|---------------| - */ -trait TPCDSBase extends SharedSparkSession { +trait TPCDSBase extends SharedSparkSession with TPCDSSchema { // The TPCDS queries below are based on v1.4 val tpcdsQueries = Seq( @@ -73,516 +48,6 @@ trait TPCDSBase extends SharedSparkSession { "q3", "q7", "q10", "q19", "q27", "q34", "q42", "q43", "q46", "q52", "q53", "q55", "q59", "q63", "q65", "q68", "q73", "q79", "q89", "q98", "ss_max") - protected val tableColumns: Map[String, String] = Map( - "store_sales" -> - """ - |`ss_sold_date_sk` INT, - 
|`ss_sold_time_sk` INT, - |`ss_item_sk` INT, - |`ss_customer_sk` INT, - |`ss_cdemo_sk` INT, - |`ss_hdemo_sk` INT, - |`ss_addr_sk` INT, - |`ss_store_sk` INT, - |`ss_promo_sk` INT, - |`ss_ticket_number` INT, - |`ss_quantity` INT, - |`ss_wholesale_cost` DECIMAL(7,2), - |`ss_list_price` DECIMAL(7,2), - |`ss_sales_price` DECIMAL(7,2), - |`ss_ext_discount_amt` DECIMAL(7,2), - |`ss_ext_sales_price` DECIMAL(7,2), - |`ss_ext_wholesale_cost` DECIMAL(7,2), - |`ss_ext_list_price` DECIMAL(7,2), - |`ss_ext_tax` DECIMAL(7,2), - |`ss_coupon_amt` DECIMAL(7,2), - |`ss_net_paid` DECIMAL(7,2), - |`ss_net_paid_inc_tax` DECIMAL(7,2), - |`ss_net_profit` DECIMAL(7,2) - """.stripMargin, - "store_returns" -> - """ - |`sr_returned_date_sk` INT, - |`sr_return_time_sk` INT, - |`sr_item_sk` INT, - |`sr_customer_sk` INT, - |`sr_cdemo_sk` INT, - |`sr_hdemo_sk` INT, - |`sr_addr_sk` INT, - |`sr_store_sk` INT, - |`sr_reason_sk` INT, - |`sr_ticket_number` INT, - |`sr_return_quantity` INT, - |`sr_return_amt` DECIMAL(7,2), - |`sr_return_tax` DECIMAL(7,2), - |`sr_return_amt_inc_tax` DECIMAL(7,2), - |`sr_fee` DECIMAL(7,2), - |`sr_return_ship_cost` DECIMAL(7,2), - |`sr_refunded_cash` DECIMAL(7,2), - |`sr_reversed_charge` DECIMAL(7,2), - |`sr_store_credit` DECIMAL(7,2), - |`sr_net_loss` DECIMAL(7,2) - """.stripMargin, - "catalog_sales" -> - """ - |`cs_sold_date_sk` INT, - |`cs_sold_time_sk` INT, - |`cs_ship_date_sk` INT, - |`cs_bill_customer_sk` INT, - |`cs_bill_cdemo_sk` INT, - |`cs_bill_hdemo_sk` INT, - |`cs_bill_addr_sk` INT, - |`cs_ship_customer_sk` INT, - |`cs_ship_cdemo_sk` INT, - |`cs_ship_hdemo_sk` INT, - |`cs_ship_addr_sk` INT, - |`cs_call_center_sk` INT, - |`cs_catalog_page_sk` INT, - |`cs_ship_mode_sk` INT, - |`cs_warehouse_sk` INT, - |`cs_item_sk` INT, - |`cs_promo_sk` INT, - |`cs_order_number` INT, - |`cs_quantity` INT, - |`cs_wholesale_cost` DECIMAL(7,2), - |`cs_list_price` DECIMAL(7,2), - |`cs_sales_price` DECIMAL(7,2), - |`cs_ext_discount_amt` DECIMAL(7,2), - |`cs_ext_sales_price` 
DECIMAL(7,2), - |`cs_ext_wholesale_cost` DECIMAL(7,2), - |`cs_ext_list_price` DECIMAL(7,2), - |`cs_ext_tax` DECIMAL(7,2), - |`cs_coupon_amt` DECIMAL(7,2), - |`cs_ext_ship_cost` DECIMAL(7,2), - |`cs_net_paid` DECIMAL(7,2), - |`cs_net_paid_inc_tax` DECIMAL(7,2), - |`cs_net_paid_inc_ship` DECIMAL(7,2), - |`cs_net_paid_inc_ship_tax` DECIMAL(7,2), - |`cs_net_profit` DECIMAL(7,2) - """.stripMargin, - "catalog_returns" -> - """ - |`cr_returned_date_sk` INT, - |`cr_returned_time_sk` INT, - |`cr_item_sk` INT, - |`cr_refunded_customer_sk` INT, - |`cr_refunded_cdemo_sk` INT, - |`cr_refunded_hdemo_sk` INT, - |`cr_refunded_addr_sk` INT, - |`cr_returning_customer_sk` INT, - |`cr_returning_cdemo_sk` INT, - |`cr_returning_hdemo_sk` INT, - |`cr_returning_addr_sk` INT, - |`cr_call_center_sk` INT, - |`cr_catalog_page_sk` INT, - |`cr_ship_mode_sk` INT, - |`cr_warehouse_sk` INT, - |`cr_reason_sk` INT,`cr_order_number` INT, - |`cr_return_quantity` INT, - |`cr_return_amount` DECIMAL(7,2), - |`cr_return_tax` DECIMAL(7,2), - |`cr_return_amt_inc_tax` DECIMAL(7,2), - |`cr_fee` DECIMAL(7,2), - |`cr_return_ship_cost` DECIMAL(7,2), - |`cr_refunded_cash` DECIMAL(7,2), - |`cr_reversed_charge` DECIMAL(7,2), - |`cr_store_credit` DECIMAL(7,2), - |`cr_net_loss` DECIMAL(7,2) - """.stripMargin, - "web_sales" -> - """ - |`ws_sold_date_sk` INT, - |`ws_sold_time_sk` INT, - |`ws_ship_date_sk` INT, - |`ws_item_sk` INT, - |`ws_bill_customer_sk` INT, - |`ws_bill_cdemo_sk` INT, - |`ws_bill_hdemo_sk` INT, - |`ws_bill_addr_sk` INT, - |`ws_ship_customer_sk` INT, - |`ws_ship_cdemo_sk` INT, - |`ws_ship_hdemo_sk` INT, - |`ws_ship_addr_sk` INT, - |`ws_web_page_sk` INT, - |`ws_web_site_sk` INT, - |`ws_ship_mode_sk` INT, - |`ws_warehouse_sk` INT, - |`ws_promo_sk` INT, - |`ws_order_number` INT, - |`ws_quantity` INT, - |`ws_wholesale_cost` DECIMAL(7,2), - |`ws_list_price` DECIMAL(7,2), - |`ws_sales_price` DECIMAL(7,2), - |`ws_ext_discount_amt` DECIMAL(7,2), - |`ws_ext_sales_price` DECIMAL(7,2), - |`ws_ext_wholesale_cost` 
DECIMAL(7,2), - |`ws_ext_list_price` DECIMAL(7,2), - |`ws_ext_tax` DECIMAL(7,2), - |`ws_coupon_amt` DECIMAL(7,2), - |`ws_ext_ship_cost` DECIMAL(7,2), - |`ws_net_paid` DECIMAL(7,2), - |`ws_net_paid_inc_tax` DECIMAL(7,2), - |`ws_net_paid_inc_ship` DECIMAL(7,2), - |`ws_net_paid_inc_ship_tax` DECIMAL(7,2), - |`ws_net_profit` DECIMAL(7,2) - """.stripMargin, - "web_returns" -> - """ - |`wr_returned_date_sk` INT, - |`wr_returned_time_sk` INT, - |`wr_item_sk` INT, - |`wr_refunded_customer_sk` INT, - |`wr_refunded_cdemo_sk` INT, - |`wr_refunded_hdemo_sk` INT, - |`wr_refunded_addr_sk` INT, - |`wr_returning_customer_sk` INT, - |`wr_returning_cdemo_sk` INT, - |`wr_returning_hdemo_sk` INT, - |`wr_returning_addr_sk` INT, - |`wr_web_page_sk` INT, - |`wr_reason_sk` INT, - |`wr_order_number` INT, - |`wr_return_quantity` INT, - |`wr_return_amt` DECIMAL(7,2), - |`wr_return_tax` DECIMAL(7,2), - |`wr_return_amt_inc_tax` DECIMAL(7,2), - |`wr_fee` DECIMAL(7,2), - |`wr_return_ship_cost` DECIMAL(7,2), - |`wr_refunded_cash` DECIMAL(7,2), - |`wr_reversed_charge` DECIMAL(7,2), - |`wr_account_credit` DECIMAL(7,2), - |`wr_net_loss` DECIMAL(7,2) - """.stripMargin, - "inventory" -> - """ - |`inv_date_sk` INT, - |`inv_item_sk` INT, - |`inv_warehouse_sk` INT, - |`inv_quantity_on_hand` INT - """.stripMargin, - "store" -> - """ - |`s_store_sk` INT, - |`s_store_id` CHAR(16), - |`s_rec_start_date` DATE, - |`s_rec_end_date` DATE, - |`s_closed_date_sk` INT, - |`s_store_name` VARCHAR(50), - |`s_number_employees` INT, - |`s_floor_space` INT, - |`s_hours` CHAR(20), - |`s_manager` VARCHAR(40), - |`s_market_id` INT, - |`s_geography_class` VARCHAR(100), - |`s_market_desc` VARCHAR(100), - |`s_market_manager` VARCHAR(40), - |`s_division_id` INT, - |`s_division_name` VARCHAR(50), - |`s_company_id` INT, - |`s_company_name` VARCHAR(50), - |`s_street_number` VARCHAR(10), - |`s_street_name` VARCHAR(60), - |`s_street_type` CHAR(15), - |`s_suite_number` CHAR(10), - |`s_city` VARCHAR(60), - |`s_county` VARCHAR(30), - 
|`s_state` CHAR(2), - |`s_zip` CHAR(10), - |`s_country` VARCHAR(20), - |`s_gmt_offset` DECIMAL(5,2), - |`s_tax_percentage` DECIMAL(5,2) - """.stripMargin, - "call_center" -> - """ - |`cc_call_center_sk` INT, - |`cc_call_center_id` CHAR(16), - |`cc_rec_start_date` DATE, - |`cc_rec_end_date` DATE, - |`cc_closed_date_sk` INT, - |`cc_open_date_sk` INT, - |`cc_name` VARCHAR(50), - |`cc_class` VARCHAR(50), - |`cc_employees` INT, - |`cc_sq_ft` INT, - |`cc_hours` CHAR(20), - |`cc_manager` VARCHAR(40), - |`cc_mkt_id` INT, - |`cc_mkt_class` CHAR(50), - |`cc_mkt_desc` VARCHAR(100), - |`cc_market_manager` VARCHAR(40), - |`cc_division` INT, - |`cc_division_name` VARCHAR(50), - |`cc_company` INT, - |`cc_company_name` CHAR(50), - |`cc_street_number` CHAR(10), - |`cc_street_name` VARCHAR(60), - |`cc_street_type` CHAR(15), - |`cc_suite_number` CHAR(10), - |`cc_city` VARCHAR(60), - |`cc_county` VARCHAR(30), - |`cc_state` CHAR(2), - |`cc_zip` CHAR(10), - |`cc_country` VARCHAR(20), - |`cc_gmt_offset` DECIMAL(5,2), - |`cc_tax_percentage` DECIMAL(5,2) - """.stripMargin, - "catalog_page" -> - """ - |`cp_catalog_page_sk` INT, - |`cp_catalog_page_id` CHAR(16), - |`cp_start_date_sk` INT, - |`cp_end_date_sk` INT, - |`cp_department` VARCHAR(50), - |`cp_catalog_number` INT, - |`cp_catalog_page_number` INT, - |`cp_description` VARCHAR(100), - |`cp_type` VARCHAR(100) - """.stripMargin, - "web_site" -> - """ - |`web_site_sk` INT, - |`web_site_id` CHAR(16), - |`web_rec_start_date` DATE, - |`web_rec_end_date` DATE, - |`web_name` VARCHAR(50), - |`web_open_date_sk` INT, - |`web_close_date_sk` INT, - |`web_class` VARCHAR(50), - |`web_manager` VARCHAR(40), - |`web_mkt_id` INT, - |`web_mkt_class` VARCHAR(50), - |`web_mkt_desc` VARCHAR(100), - |`web_market_manager` VARCHAR(40), - |`web_company_id` INT, - |`web_company_name` CHAR(50), - |`web_street_number` CHAR(10), - |`web_street_name` VARCHAR(60), - |`web_street_type` CHAR(15), - |`web_suite_number` CHAR(10), - |`web_city` VARCHAR(60), - |`web_county` 
VARCHAR(30), - |`web_state` CHAR(2), - |`web_zip` CHAR(10), - |`web_country` VARCHAR(20), - |`web_gmt_offset` DECIMAL(5,2), - |`web_tax_percentage` DECIMAL(5,2) - """.stripMargin, - "web_page" -> - """ - |`wp_web_page_sk` INT, - |`wp_web_page_id` CHAR(16), - |`wp_rec_start_date` DATE, - |`wp_rec_end_date` DATE, - |`wp_creation_date_sk` INT, - |`wp_access_date_sk` INT, - |`wp_autogen_flag` CHAR(1), - |`wp_customer_sk` INT, - |`wp_url` VARCHAR(100), - |`wp_type` CHAR(50), - |`wp_char_count` INT, - |`wp_link_count` INT, - |`wp_image_count` INT, - |`wp_max_ad_count` INT - """.stripMargin, - "warehouse" -> - """ - |`w_warehouse_sk` INT, - |`w_warehouse_id` CHAR(16), - |`w_warehouse_name` VARCHAR(20), - |`w_warehouse_sq_ft` INT, - |`w_street_number` CHAR(10), - |`w_street_name` VARCHAR(20), - |`w_street_type` CHAR(15), - |`w_suite_number` CHAR(10), - |`w_city` VARCHAR(60), - |`w_county` VARCHAR(30), - |`w_state` CHAR(2), - |`w_zip` CHAR(10), - |`w_country` VARCHAR(20), - |`w_gmt_offset` DECIMAL(5,2) - """.stripMargin, - "customer" -> - """ - |`c_customer_sk` INT, - |`c_customer_id` CHAR(16), - |`c_current_cdemo_sk` INT, - |`c_current_hdemo_sk` INT, - |`c_current_addr_sk` INT, - |`c_first_shipto_date_sk` INT, - |`c_first_sales_date_sk` INT, - |`c_salutation` CHAR(10), - |`c_first_name` CHAR(20), - |`c_last_name` CHAR(30), - |`c_preferred_cust_flag` CHAR(1), - |`c_birth_day` INT, - |`c_birth_month` INT, - |`c_birth_year` INT, - |`c_birth_country` VARCHAR(20), - |`c_login` CHAR(13), - |`c_email_address` CHAR(50), - |`c_last_review_date` INT - """.stripMargin, - "customer_address" -> - """ - |`ca_address_sk` INT, - |`ca_address_id` CHAR(16), - |`ca_street_number` CHAR(10), - |`ca_street_name` VARCHAR(60), - |`ca_street_type` CHAR(15), - |`ca_suite_number` CHAR(10), - |`ca_city` VARCHAR(60), - |`ca_county` VARCHAR(30), - |`ca_state` CHAR(2), - |`ca_zip` CHAR(10), - |`ca_country` VARCHAR(20), - |`ca_gmt_offset` DECIMAL(5,2), - |`ca_location_type` CHAR(20) - """.stripMargin, - 
"customer_demographics" -> - """ - |`cd_demo_sk` INT, - |`cd_gender` CHAR(1), - |`cd_marital_status` CHAR(1), - |`cd_education_status` CHAR(20), - |`cd_purchase_estimate` INT, - |`cd_credit_rating` CHAR(10), - |`cd_dep_count` INT, - |`cd_dep_employed_count` INT, - |`cd_dep_college_count` INT - """.stripMargin, - "date_dim" -> - """ - |`d_date_sk` INT, - |`d_date_id` CHAR(16), - |`d_date` DATE, - |`d_month_seq` INT, - |`d_week_seq` INT, - |`d_quarter_seq` INT, - |`d_year` INT, - |`d_dow` INT, - |`d_moy` INT, - |`d_dom` INT, - |`d_qoy` INT, - |`d_fy_year` INT, - |`d_fy_quarter_seq` INT, - |`d_fy_week_seq` INT, - |`d_day_name` CHAR(9), - |`d_quarter_name` CHAR(6), - |`d_holiday` CHAR(1), - |`d_weekend` CHAR(1), - |`d_following_holiday` CHAR(1), - |`d_first_dom` INT, - |`d_last_dom` INT, - |`d_same_day_ly` INT, - |`d_same_day_lq` INT, - |`d_current_day` CHAR(1), - |`d_current_week` CHAR(1), - |`d_current_month` CHAR(1), - |`d_current_quarter` CHAR(1), - |`d_current_year` CHAR(1) - """.stripMargin, - "household_demographics" -> - """ - |`hd_demo_sk` INT, - |`hd_income_band_sk` INT, - |`hd_buy_potential` CHAR(15), - |`hd_dep_count` INT, - |`hd_vehicle_count` INT - """.stripMargin, - "item" -> - """ - |`i_item_sk` INT, - |`i_item_id` CHAR(16), - |`i_rec_start_date` DATE, - |`i_rec_end_date` DATE, - |`i_item_desc` VARCHAR(200), - |`i_current_price` DECIMAL(7,2), - |`i_wholesale_cost` DECIMAL(7,2), - |`i_brand_id` INT, - |`i_brand` CHAR(50), - |`i_class_id` INT, - |`i_class` CHAR(50), - |`i_category_id` INT, - |`i_category` CHAR(50), - |`i_manufact_id` INT, - |`i_manufact` CHAR(50), - |`i_size` CHAR(20), - |`i_formulation` CHAR(20), - |`i_color` CHAR(20), - |`i_units` CHAR(10), - |`i_container` CHAR(10), - |`i_manager_id` INT, - |`i_product_name` CHAR(50) - """.stripMargin, - "income_band" -> - """ - |`ib_income_band_sk` INT, - |`ib_lower_bound` INT, - |`ib_upper_bound` INT - """.stripMargin, - "promotion" -> - """ - |`p_promo_sk` INT, - |`p_promo_id` CHAR(16), - 
|`p_start_date_sk` INT, - |`p_end_date_sk` INT, - |`p_item_sk` INT, - |`p_cost` DECIMAL(15,2), - |`p_response_target` INT, - |`p_promo_name` CHAR(50), - |`p_channel_dmail` CHAR(1), - |`p_channel_email` CHAR(1), - |`p_channel_catalog` CHAR(1), - |`p_channel_tv` CHAR(1), - |`p_channel_radio` CHAR(1), - |`p_channel_press` CHAR(1), - |`p_channel_event` CHAR(1), - |`p_channel_demo` CHAR(1), - |`p_channel_details` VARCHAR(100), - |`p_purpose` CHAR(15), - |`p_discount_active` CHAR(1) - """.stripMargin, - "reason" -> - """ - |`r_reason_sk` INT, - |`r_reason_id` CHAR(16), - |`r_reason_desc` CHAR(100) - """.stripMargin, - "ship_mode" -> - """ - |`sm_ship_mode_sk` INT, - |`sm_ship_mode_id` CHAR(16), - |`sm_type` CHAR(30), - |`sm_code` CHAR(10), - |`sm_carrier` CHAR(20), - |`sm_contract` CHAR(20) - """.stripMargin, - "time_dim" -> - """ - |`t_time_sk` INT, - |`t_time_id` CHAR(16), - |`t_time` INT, - |`t_hour` INT, - |`t_minute` INT, - |`t_second` INT, - |`t_am_pm` CHAR(2), - |`t_shift` CHAR(20), - |`t_sub_shift` CHAR(20), - |`t_meal_time` CHAR(20) - """.stripMargin - ) - - // The partition column is consistent with the databricks/spark-sql-perf project. 
- private val tablePartitionColumns = Map( - "catalog_sales" -> Seq("`cs_sold_date_sk`"), - "catalog_returns" -> Seq("`cr_returned_date_sk`"), - "inventory" -> Seq("`inv_date_sk`"), - "store_sales" -> Seq("`ss_sold_date_sk`"), - "store_returns" -> Seq("`sr_returned_date_sk`"), - "web_sales" -> Seq("`ws_sold_date_sk`"), - "web_returns" -> Seq("`wr_returned_date_sk`") - ) - protected def partitionedByClause(tableName: String): String = { tablePartitionColumns.get(tableName) match { case Some(cols) if cols.nonEmpty => s"PARTITIONED BY (${cols.mkString(", ")})" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TPCDSSchema.scala b/sql/core/src/test/scala/org/apache/spark/sql/TPCDSSchema.scala new file mode 100644 index 0000000000000..7b2ed8d28274d --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/TPCDSSchema.scala @@ -0,0 +1,555 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +/** + * Base trait for TPC-DS related tests. 
+ * + * Datatype mapping for TPC-DS and Spark SQL, fully matching schemas defined in `tpcds.sql` of the + * official tpcds toolkit + * see more at: + * http://www.tpc.org/tpc_documents_current_versions/pdf/tpc-ds_v2.9.0.pdf + * + * |---------------|---------------| + * | TPC-DS | Spark SQL | + * |---------------|---------------| + * | Identifier | INT | + * |---------------|---------------| + * | Integer | INT | + * |---------------|---------------| + * | Decimal(d, f) | Decimal(d, f) | + * |---------------|---------------| + * | Char(N) | Char(N) | + * |---------------|---------------| + * | Varchar(N) | Varchar(N) | + * |---------------|---------------| + * | Date | Date | + * |---------------|---------------| + */ +trait TPCDSSchema { + + protected val tableColumns: Map[String, String] = Map( + "store_sales" -> + """ + |`ss_sold_date_sk` INT, + |`ss_sold_time_sk` INT, + |`ss_item_sk` INT, + |`ss_customer_sk` INT, + |`ss_cdemo_sk` INT, + |`ss_hdemo_sk` INT, + |`ss_addr_sk` INT, + |`ss_store_sk` INT, + |`ss_promo_sk` INT, + |`ss_ticket_number` INT, + |`ss_quantity` INT, + |`ss_wholesale_cost` DECIMAL(7,2), + |`ss_list_price` DECIMAL(7,2), + |`ss_sales_price` DECIMAL(7,2), + |`ss_ext_discount_amt` DECIMAL(7,2), + |`ss_ext_sales_price` DECIMAL(7,2), + |`ss_ext_wholesale_cost` DECIMAL(7,2), + |`ss_ext_list_price` DECIMAL(7,2), + |`ss_ext_tax` DECIMAL(7,2), + |`ss_coupon_amt` DECIMAL(7,2), + |`ss_net_paid` DECIMAL(7,2), + |`ss_net_paid_inc_tax` DECIMAL(7,2), + |`ss_net_profit` DECIMAL(7,2) + """.stripMargin, + "store_returns" -> + """ + |`sr_returned_date_sk` INT, + |`sr_return_time_sk` INT, + |`sr_item_sk` INT, + |`sr_customer_sk` INT, + |`sr_cdemo_sk` INT, + |`sr_hdemo_sk` INT, + |`sr_addr_sk` INT, + |`sr_store_sk` INT, + |`sr_reason_sk` INT, + |`sr_ticket_number` INT, + |`sr_return_quantity` INT, + |`sr_return_amt` DECIMAL(7,2), + |`sr_return_tax` DECIMAL(7,2), + |`sr_return_amt_inc_tax` DECIMAL(7,2), + |`sr_fee` DECIMAL(7,2), + |`sr_return_ship_cost` DECIMAL(7,2), 
+ |`sr_refunded_cash` DECIMAL(7,2), + |`sr_reversed_charge` DECIMAL(7,2), + |`sr_store_credit` DECIMAL(7,2), + |`sr_net_loss` DECIMAL(7,2) + """.stripMargin, + "catalog_sales" -> + """ + |`cs_sold_date_sk` INT, + |`cs_sold_time_sk` INT, + |`cs_ship_date_sk` INT, + |`cs_bill_customer_sk` INT, + |`cs_bill_cdemo_sk` INT, + |`cs_bill_hdemo_sk` INT, + |`cs_bill_addr_sk` INT, + |`cs_ship_customer_sk` INT, + |`cs_ship_cdemo_sk` INT, + |`cs_ship_hdemo_sk` INT, + |`cs_ship_addr_sk` INT, + |`cs_call_center_sk` INT, + |`cs_catalog_page_sk` INT, + |`cs_ship_mode_sk` INT, + |`cs_warehouse_sk` INT, + |`cs_item_sk` INT, + |`cs_promo_sk` INT, + |`cs_order_number` INT, + |`cs_quantity` INT, + |`cs_wholesale_cost` DECIMAL(7,2), + |`cs_list_price` DECIMAL(7,2), + |`cs_sales_price` DECIMAL(7,2), + |`cs_ext_discount_amt` DECIMAL(7,2), + |`cs_ext_sales_price` DECIMAL(7,2), + |`cs_ext_wholesale_cost` DECIMAL(7,2), + |`cs_ext_list_price` DECIMAL(7,2), + |`cs_ext_tax` DECIMAL(7,2), + |`cs_coupon_amt` DECIMAL(7,2), + |`cs_ext_ship_cost` DECIMAL(7,2), + |`cs_net_paid` DECIMAL(7,2), + |`cs_net_paid_inc_tax` DECIMAL(7,2), + |`cs_net_paid_inc_ship` DECIMAL(7,2), + |`cs_net_paid_inc_ship_tax` DECIMAL(7,2), + |`cs_net_profit` DECIMAL(7,2) + """.stripMargin, + "catalog_returns" -> + """ + |`cr_returned_date_sk` INT, + |`cr_returned_time_sk` INT, + |`cr_item_sk` INT, + |`cr_refunded_customer_sk` INT, + |`cr_refunded_cdemo_sk` INT, + |`cr_refunded_hdemo_sk` INT, + |`cr_refunded_addr_sk` INT, + |`cr_returning_customer_sk` INT, + |`cr_returning_cdemo_sk` INT, + |`cr_returning_hdemo_sk` INT, + |`cr_returning_addr_sk` INT, + |`cr_call_center_sk` INT, + |`cr_catalog_page_sk` INT, + |`cr_ship_mode_sk` INT, + |`cr_warehouse_sk` INT, + |`cr_reason_sk` INT,`cr_order_number` INT, + |`cr_return_quantity` INT, + |`cr_return_amount` DECIMAL(7,2), + |`cr_return_tax` DECIMAL(7,2), + |`cr_return_amt_inc_tax` DECIMAL(7,2), + |`cr_fee` DECIMAL(7,2), + |`cr_return_ship_cost` DECIMAL(7,2), + |`cr_refunded_cash` 
DECIMAL(7,2), + |`cr_reversed_charge` DECIMAL(7,2), + |`cr_store_credit` DECIMAL(7,2), + |`cr_net_loss` DECIMAL(7,2) + """.stripMargin, + "web_sales" -> + """ + |`ws_sold_date_sk` INT, + |`ws_sold_time_sk` INT, + |`ws_ship_date_sk` INT, + |`ws_item_sk` INT, + |`ws_bill_customer_sk` INT, + |`ws_bill_cdemo_sk` INT, + |`ws_bill_hdemo_sk` INT, + |`ws_bill_addr_sk` INT, + |`ws_ship_customer_sk` INT, + |`ws_ship_cdemo_sk` INT, + |`ws_ship_hdemo_sk` INT, + |`ws_ship_addr_sk` INT, + |`ws_web_page_sk` INT, + |`ws_web_site_sk` INT, + |`ws_ship_mode_sk` INT, + |`ws_warehouse_sk` INT, + |`ws_promo_sk` INT, + |`ws_order_number` INT, + |`ws_quantity` INT, + |`ws_wholesale_cost` DECIMAL(7,2), + |`ws_list_price` DECIMAL(7,2), + |`ws_sales_price` DECIMAL(7,2), + |`ws_ext_discount_amt` DECIMAL(7,2), + |`ws_ext_sales_price` DECIMAL(7,2), + |`ws_ext_wholesale_cost` DECIMAL(7,2), + |`ws_ext_list_price` DECIMAL(7,2), + |`ws_ext_tax` DECIMAL(7,2), + |`ws_coupon_amt` DECIMAL(7,2), + |`ws_ext_ship_cost` DECIMAL(7,2), + |`ws_net_paid` DECIMAL(7,2), + |`ws_net_paid_inc_tax` DECIMAL(7,2), + |`ws_net_paid_inc_ship` DECIMAL(7,2), + |`ws_net_paid_inc_ship_tax` DECIMAL(7,2), + |`ws_net_profit` DECIMAL(7,2) + """.stripMargin, + "web_returns" -> + """ + |`wr_returned_date_sk` INT, + |`wr_returned_time_sk` INT, + |`wr_item_sk` INT, + |`wr_refunded_customer_sk` INT, + |`wr_refunded_cdemo_sk` INT, + |`wr_refunded_hdemo_sk` INT, + |`wr_refunded_addr_sk` INT, + |`wr_returning_customer_sk` INT, + |`wr_returning_cdemo_sk` INT, + |`wr_returning_hdemo_sk` INT, + |`wr_returning_addr_sk` INT, + |`wr_web_page_sk` INT, + |`wr_reason_sk` INT, + |`wr_order_number` INT, + |`wr_return_quantity` INT, + |`wr_return_amt` DECIMAL(7,2), + |`wr_return_tax` DECIMAL(7,2), + |`wr_return_amt_inc_tax` DECIMAL(7,2), + |`wr_fee` DECIMAL(7,2), + |`wr_return_ship_cost` DECIMAL(7,2), + |`wr_refunded_cash` DECIMAL(7,2), + |`wr_reversed_charge` DECIMAL(7,2), + |`wr_account_credit` DECIMAL(7,2), + |`wr_net_loss` DECIMAL(7,2) + 
""".stripMargin, + "inventory" -> + """ + |`inv_date_sk` INT, + |`inv_item_sk` INT, + |`inv_warehouse_sk` INT, + |`inv_quantity_on_hand` INT + """.stripMargin, + "store" -> + """ + |`s_store_sk` INT, + |`s_store_id` CHAR(16), + |`s_rec_start_date` DATE, + |`s_rec_end_date` DATE, + |`s_closed_date_sk` INT, + |`s_store_name` VARCHAR(50), + |`s_number_employees` INT, + |`s_floor_space` INT, + |`s_hours` CHAR(20), + |`s_manager` VARCHAR(40), + |`s_market_id` INT, + |`s_geography_class` VARCHAR(100), + |`s_market_desc` VARCHAR(100), + |`s_market_manager` VARCHAR(40), + |`s_division_id` INT, + |`s_division_name` VARCHAR(50), + |`s_company_id` INT, + |`s_company_name` VARCHAR(50), + |`s_street_number` VARCHAR(10), + |`s_street_name` VARCHAR(60), + |`s_street_type` CHAR(15), + |`s_suite_number` CHAR(10), + |`s_city` VARCHAR(60), + |`s_county` VARCHAR(30), + |`s_state` CHAR(2), + |`s_zip` CHAR(10), + |`s_country` VARCHAR(20), + |`s_gmt_offset` DECIMAL(5,2), + |`s_tax_percentage` DECIMAL(5,2) + """.stripMargin, + "call_center" -> + """ + |`cc_call_center_sk` INT, + |`cc_call_center_id` CHAR(16), + |`cc_rec_start_date` DATE, + |`cc_rec_end_date` DATE, + |`cc_closed_date_sk` INT, + |`cc_open_date_sk` INT, + |`cc_name` VARCHAR(50), + |`cc_class` VARCHAR(50), + |`cc_employees` INT, + |`cc_sq_ft` INT, + |`cc_hours` CHAR(20), + |`cc_manager` VARCHAR(40), + |`cc_mkt_id` INT, + |`cc_mkt_class` CHAR(50), + |`cc_mkt_desc` VARCHAR(100), + |`cc_market_manager` VARCHAR(40), + |`cc_division` INT, + |`cc_division_name` VARCHAR(50), + |`cc_company` INT, + |`cc_company_name` CHAR(50), + |`cc_street_number` CHAR(10), + |`cc_street_name` VARCHAR(60), + |`cc_street_type` CHAR(15), + |`cc_suite_number` CHAR(10), + |`cc_city` VARCHAR(60), + |`cc_county` VARCHAR(30), + |`cc_state` CHAR(2), + |`cc_zip` CHAR(10), + |`cc_country` VARCHAR(20), + |`cc_gmt_offset` DECIMAL(5,2), + |`cc_tax_percentage` DECIMAL(5,2) + """.stripMargin, + "catalog_page" -> + """ + |`cp_catalog_page_sk` INT, + 
|`cp_catalog_page_id` CHAR(16), + |`cp_start_date_sk` INT, + |`cp_end_date_sk` INT, + |`cp_department` VARCHAR(50), + |`cp_catalog_number` INT, + |`cp_catalog_page_number` INT, + |`cp_description` VARCHAR(100), + |`cp_type` VARCHAR(100) + """.stripMargin, + "web_site" -> + """ + |`web_site_sk` INT, + |`web_site_id` CHAR(16), + |`web_rec_start_date` DATE, + |`web_rec_end_date` DATE, + |`web_name` VARCHAR(50), + |`web_open_date_sk` INT, + |`web_close_date_sk` INT, + |`web_class` VARCHAR(50), + |`web_manager` VARCHAR(40), + |`web_mkt_id` INT, + |`web_mkt_class` VARCHAR(50), + |`web_mkt_desc` VARCHAR(100), + |`web_market_manager` VARCHAR(40), + |`web_company_id` INT, + |`web_company_name` CHAR(50), + |`web_street_number` CHAR(10), + |`web_street_name` VARCHAR(60), + |`web_street_type` CHAR(15), + |`web_suite_number` CHAR(10), + |`web_city` VARCHAR(60), + |`web_county` VARCHAR(30), + |`web_state` CHAR(2), + |`web_zip` CHAR(10), + |`web_country` VARCHAR(20), + |`web_gmt_offset` DECIMAL(5,2), + |`web_tax_percentage` DECIMAL(5,2) + """.stripMargin, + "web_page" -> + """ + |`wp_web_page_sk` INT, + |`wp_web_page_id` CHAR(16), + |`wp_rec_start_date` DATE, + |`wp_rec_end_date` DATE, + |`wp_creation_date_sk` INT, + |`wp_access_date_sk` INT, + |`wp_autogen_flag` CHAR(1), + |`wp_customer_sk` INT, + |`wp_url` VARCHAR(100), + |`wp_type` CHAR(50), + |`wp_char_count` INT, + |`wp_link_count` INT, + |`wp_image_count` INT, + |`wp_max_ad_count` INT + """.stripMargin, + "warehouse" -> + """ + |`w_warehouse_sk` INT, + |`w_warehouse_id` CHAR(16), + |`w_warehouse_name` VARCHAR(20), + |`w_warehouse_sq_ft` INT, + |`w_street_number` CHAR(10), + |`w_street_name` VARCHAR(20), + |`w_street_type` CHAR(15), + |`w_suite_number` CHAR(10), + |`w_city` VARCHAR(60), + |`w_county` VARCHAR(30), + |`w_state` CHAR(2), + |`w_zip` CHAR(10), + |`w_country` VARCHAR(20), + |`w_gmt_offset` DECIMAL(5,2) + """.stripMargin, + "customer" -> + """ + |`c_customer_sk` INT, + |`c_customer_id` CHAR(16), + 
|`c_current_cdemo_sk` INT, + |`c_current_hdemo_sk` INT, + |`c_current_addr_sk` INT, + |`c_first_shipto_date_sk` INT, + |`c_first_sales_date_sk` INT, + |`c_salutation` CHAR(10), + |`c_first_name` CHAR(20), + |`c_last_name` CHAR(30), + |`c_preferred_cust_flag` CHAR(1), + |`c_birth_day` INT, + |`c_birth_month` INT, + |`c_birth_year` INT, + |`c_birth_country` VARCHAR(20), + |`c_login` CHAR(13), + |`c_email_address` CHAR(50), + |`c_last_review_date` INT + """.stripMargin, + "customer_address" -> + """ + |`ca_address_sk` INT, + |`ca_address_id` CHAR(16), + |`ca_street_number` CHAR(10), + |`ca_street_name` VARCHAR(60), + |`ca_street_type` CHAR(15), + |`ca_suite_number` CHAR(10), + |`ca_city` VARCHAR(60), + |`ca_county` VARCHAR(30), + |`ca_state` CHAR(2), + |`ca_zip` CHAR(10), + |`ca_country` VARCHAR(20), + |`ca_gmt_offset` DECIMAL(5,2), + |`ca_location_type` CHAR(20) + """.stripMargin, + "customer_demographics" -> + """ + |`cd_demo_sk` INT, + |`cd_gender` CHAR(1), + |`cd_marital_status` CHAR(1), + |`cd_education_status` CHAR(20), + |`cd_purchase_estimate` INT, + |`cd_credit_rating` CHAR(10), + |`cd_dep_count` INT, + |`cd_dep_employed_count` INT, + |`cd_dep_college_count` INT + """.stripMargin, + "date_dim" -> + """ + |`d_date_sk` INT, + |`d_date_id` CHAR(16), + |`d_date` DATE, + |`d_month_seq` INT, + |`d_week_seq` INT, + |`d_quarter_seq` INT, + |`d_year` INT, + |`d_dow` INT, + |`d_moy` INT, + |`d_dom` INT, + |`d_qoy` INT, + |`d_fy_year` INT, + |`d_fy_quarter_seq` INT, + |`d_fy_week_seq` INT, + |`d_day_name` CHAR(9), + |`d_quarter_name` CHAR(6), + |`d_holiday` CHAR(1), + |`d_weekend` CHAR(1), + |`d_following_holiday` CHAR(1), + |`d_first_dom` INT, + |`d_last_dom` INT, + |`d_same_day_ly` INT, + |`d_same_day_lq` INT, + |`d_current_day` CHAR(1), + |`d_current_week` CHAR(1), + |`d_current_month` CHAR(1), + |`d_current_quarter` CHAR(1), + |`d_current_year` CHAR(1) + """.stripMargin, + "household_demographics" -> + """ + |`hd_demo_sk` INT, + |`hd_income_band_sk` INT, + 
|`hd_buy_potential` CHAR(15), + |`hd_dep_count` INT, + |`hd_vehicle_count` INT + """.stripMargin, + "item" -> + """ + |`i_item_sk` INT, + |`i_item_id` CHAR(16), + |`i_rec_start_date` DATE, + |`i_rec_end_date` DATE, + |`i_item_desc` VARCHAR(200), + |`i_current_price` DECIMAL(7,2), + |`i_wholesale_cost` DECIMAL(7,2), + |`i_brand_id` INT, + |`i_brand` CHAR(50), + |`i_class_id` INT, + |`i_class` CHAR(50), + |`i_category_id` INT, + |`i_category` CHAR(50), + |`i_manufact_id` INT, + |`i_manufact` CHAR(50), + |`i_size` CHAR(20), + |`i_formulation` CHAR(20), + |`i_color` CHAR(20), + |`i_units` CHAR(10), + |`i_container` CHAR(10), + |`i_manager_id` INT, + |`i_product_name` CHAR(50) + """.stripMargin, + "income_band" -> + """ + |`ib_income_band_sk` INT, + |`ib_lower_bound` INT, + |`ib_upper_bound` INT + """.stripMargin, + "promotion" -> + """ + |`p_promo_sk` INT, + |`p_promo_id` CHAR(16), + |`p_start_date_sk` INT, + |`p_end_date_sk` INT, + |`p_item_sk` INT, + |`p_cost` DECIMAL(15,2), + |`p_response_target` INT, + |`p_promo_name` CHAR(50), + |`p_channel_dmail` CHAR(1), + |`p_channel_email` CHAR(1), + |`p_channel_catalog` CHAR(1), + |`p_channel_tv` CHAR(1), + |`p_channel_radio` CHAR(1), + |`p_channel_press` CHAR(1), + |`p_channel_event` CHAR(1), + |`p_channel_demo` CHAR(1), + |`p_channel_details` VARCHAR(100), + |`p_purpose` CHAR(15), + |`p_discount_active` CHAR(1) + """.stripMargin, + "reason" -> + """ + |`r_reason_sk` INT, + |`r_reason_id` CHAR(16), + |`r_reason_desc` CHAR(100) + """.stripMargin, + "ship_mode" -> + """ + |`sm_ship_mode_sk` INT, + |`sm_ship_mode_id` CHAR(16), + |`sm_type` CHAR(30), + |`sm_code` CHAR(10), + |`sm_carrier` CHAR(20), + |`sm_contract` CHAR(20) + """.stripMargin, + "time_dim" -> + """ + |`t_time_sk` INT, + |`t_time_id` CHAR(16), + |`t_time` INT, + |`t_hour` INT, + |`t_minute` INT, + |`t_second` INT, + |`t_am_pm` CHAR(2), + |`t_shift` CHAR(20), + |`t_sub_shift` CHAR(20), + |`t_meal_time` CHAR(20) + """.stripMargin + ) + + // The partition column is 
consistent with the databricks/spark-sql-perf project. + protected val tablePartitionColumns = Map( + "catalog_sales" -> Seq("`cs_sold_date_sk`"), + "catalog_returns" -> Seq("`cr_returned_date_sk`"), + "inventory" -> Seq("`inv_date_sk`"), + "store_sales" -> Seq("`ss_sold_date_sk`"), + "store_returns" -> Seq("`sr_returned_date_sk`"), + "web_sales" -> Seq("`ws_sold_date_sk`"), + "web_returns" -> Seq("`wr_returned_date_sk`") + ) +}