[SPARK-17719][SPARK-17776][SQL] Unify and tie up options in a single place in JDBC datasource package

## What changes were proposed in this pull request?

This PR proposes to clean up the inconsistent use of `Map[String, String]`, `Properties` and `JDBCOptions` instances for options in the `execution/jdbc` package, and to make the connection properties exclude Spark-only options.

This PR includes the following changes (a simplified sketch of the resulting pattern follows the list):

- Unify the use of `Map[String, String]`, `Properties` and `JDBCOptions` in the `execution/jdbc` package into `JDBCOptions`.

- Move the `batchsize`, `fetchsize`, `driver` and `isolationLevel` options into the `JDBCOptions` instance.

- Document `batchsize` and `isolationLevel`, marking each option as read-only or write-only where applicable. This also fixes minor typos and adds more detailed explanations for some entries, such as `url`.

- Throw exceptions early by validating arguments up front (e.g., `fetchsize`) rather than at execution time.

- Exclude Spark-only options from the connection properties.
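The following is a minimal standalone sketch of the pattern described above; it is not the actual `JDBCOptions` class, and the class/option names and the demo values are made up for illustration:

```scala
import java.util.Properties
import scala.collection.mutable.ArrayBuffer

// Simplified stand-in for the pattern used by JDBCOptions: every Spark-only
// option name is registered once, so required options can be validated eagerly
// and Spark-only keys can be filtered out of the Properties that reach the driver.
object SparkOnlyOptions {
  private val names = ArrayBuffer.empty[String]
  private def newOption(name: String): String = { names += name; name }

  val URL: String = newOption("url")
  val TABLE: String = newOption("dbtable")
  val FETCH_SIZE: String = newOption("fetchsize")

  def registered: Set[String] = names.toSet
}

class UnifiedOptions(parameters: Map[String, String]) {
  import SparkOnlyOptions._

  // Fail fast: required options and numeric ranges are checked when the
  // options object is built, not when the query executes.
  require(parameters.contains(URL), s"Option '$URL' is required.")
  require(parameters.contains(TABLE), s"Option '$TABLE' is required.")

  val fetchSize: Int = {
    val size = parameters.getOrElse(FETCH_SIZE, "0").toInt
    require(size >= 0, s"Invalid value `$size` for parameter `$FETCH_SIZE`.")
    size
  }

  // Only non-Spark options (e.g. user/password) are handed to the JDBC driver.
  val asConnectionProperties: Properties = {
    val props = new Properties()
    parameters.filterKeys(k => !registered.contains(k))
      .foreach { case (k, v) => props.setProperty(k, v) }
    props
  }
}

object UnifiedOptionsDemo extends App {
  val opts = new UnifiedOptions(Map(
    "url" -> "jdbc:postgresql://localhost/test",
    "dbtable" -> "people",
    "fetchsize" -> "100",
    "user" -> "fred",
    "password" -> "secret"))

  println(opts.fetchSize)              // 100
  println(opts.asConnectionProperties) // only user and password remain
}
```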

## How was this patch tested?

Existing tests should cover this.

Author: hyukjinkwon <[email protected]>

Closes #15292 from HyukjinKwon/SPARK-17719.
HyukjinKwon authored and gatorsmile committed Oct 11, 2016
1 parent 19a5bae commit 0c0ad43
Showing 10 changed files with 182 additions and 137 deletions.
36 changes: 27 additions & 9 deletions docs/sql-programming-guide.md
@@ -1049,16 +1049,20 @@ bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.
{% endhighlight %}

Tables from the remote database can be loaded as a DataFrame or Spark SQL Temporary table using
the Data Sources API. The following options are supported:
the Data Sources API. Users can specify the JDBC connection properties in the data source options.
<code>user</code> and <code>password</code> are normally provided as connection properties for
logging into the data sources. In addition to the connection properties, Spark also supports
the following case-sensitive options:

<table class="table">
<tr><th>Property Name</th><th>Meaning</th></tr>
<tr>
<td><code>url</code></td>
<td>
The JDBC URL to connect to.
The JDBC URL to connect to. The source-specific connection properties may be specified in the URL. e.g., <code>jdbc:postgresql://localhost/test?user=fred&password=secret</code>
</td>
</tr>

<tr>
<td><code>dbtable</code></td>
<td>
@@ -1083,28 +1087,42 @@ the Data Sources API. The following options are supported:
<code>partitionColumn</code> must be a numeric column from the table in question. Notice
that <code>lowerBound</code> and <code>upperBound</code> are just used to decide the
partition stride, not for filtering the rows in table. So all rows in the table will be
partitioned and returned.
partitioned and returned. This option applies only to reading.
</td>
</tr>

<tr>
<td><code>fetchsize</code></td>
<td>
The JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers which default to low fetch size (eg. Oracle with 10 rows).
The JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers which default to a low fetch size (e.g., Oracle with 10 rows). This option applies only to reading.
</td>
</tr>

<tr>
<td><code>batchsize</code></td>
<td>
The JDBC batch size, which determines how many rows to insert per round trip. This can help performance on JDBC drivers. This option applies only to writing. It defaults to <code>1000</code>.
</td>
</tr>

<tr>
<td><code>isolationLevel</code></td>
<td>
The transaction isolation level, which applies to the current connection. It can be one of <code>NONE</code>, <code>READ_COMMITTED</code>, <code>READ_UNCOMMITTED</code>, <code>REPEATABLE_READ</code>, or <code>SERIALIZABLE</code>, corresponding to the standard transaction isolation levels defined by JDBC's Connection object, with a default of <code>READ_UNCOMMITTED</code>. This option applies only to writing. Please refer to the documentation in <code>java.sql.Connection</code>.
</td>
</tr>

<tr>
<td><code>truncate</code></td>
<td>
This is a JDBC writer related option. When <code>SaveMode.Overwrite</code> is enabled, this option causes Spark to truncate an existing table instead of dropping and recreating it. This can be more efficient, and prevents the table metadata (e.g. indices) from being removed. However, it will not work in some cases, such as when the new data has a different schema. It defaults to <code>false</code>.
This is a JDBC writer related option. When <code>SaveMode.Overwrite</code> is enabled, this option causes Spark to truncate an existing table instead of dropping and recreating it. This can be more efficient, and prevents the table metadata (e.g., indices) from being removed. However, it will not work in some cases, such as when the new data has a different schema. It defaults to <code>false</code>. This option applies only to writing.
</td>
</tr>

<tr>
<td><code>createTableOptions</code></td>
<td>
This is a JDBC writer related option. If specified, this option allows setting of database-specific table and partition options when creating a table. For example: <code>CREATE TABLE t (name string) ENGINE=InnoDB.</code>
This is a JDBC writer related option. If specified, this option allows setting of database-specific table and partition options when creating a table (e.g., <code>CREATE TABLE t (name string) ENGINE=InnoDB.</code>). This option applies only to writing.
</td>
</tr>
</table>
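For context, a hedged usage sketch of the options documented above, through the public DataFrameReader/DataFrameWriter API. It assumes a `spark` SparkSession is in scope (e.g. in `spark-shell`); the URL, table names, and credentials are placeholders:

```scala
// Reading: fetchsize (and the partitioning options) apply only to reads.
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/test") // placeholder URL
  .option("dbtable", "schema.tablename")             // placeholder table
  .option("user", "username")
  .option("password", "password")
  .option("fetchsize", "1000")
  .load()

// Writing: batchsize, isolationLevel, truncate and createTableOptions
// apply only to writes.
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/test")
  .option("dbtable", "schema.tablename_copy")
  .option("user", "username")
  .option("password", "password")
  .option("batchsize", "1000")
  .option("isolationLevel", "READ_COMMITTED")
  .option("truncate", "true")
  .mode("overwrite")
  .save()
```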
@@ -1328,7 +1346,7 @@ options.

- Dataset API and DataFrame API are unified. In Scala, `DataFrame` becomes a type alias for
`Dataset[Row]`, while Java API users must replace `DataFrame` with `Dataset<Row>`. Both the typed
transformations (e.g. `map`, `filter`, and `groupByKey`) and untyped transformations (e.g.
transformations (e.g., `map`, `filter`, and `groupByKey`) and untyped transformations (e.g.,
`select` and `groupBy`) are available on the Dataset class. Since compile-time type-safety in
Python and R is not a language feature, the concept of Dataset does not apply to these languages’
APIs. Instead, `DataFrame` remains the primary programing abstraction, which is analogous to the
@@ -1377,7 +1395,7 @@ options.
- Timestamps are now stored at a precision of 1us, rather than 1ns
- In the `sql` dialect, floating point numbers are now parsed as decimal. HiveQL parsing remains
unchanged.
- The canonical name of SQL/DataFrame functions are now lower case (e.g. sum vs SUM).
- The canonical name of SQL/DataFrame functions are now lower case (e.g., sum vs SUM).
- JSON data source will not automatically load new files that are created by other applications
(i.e. files that are not inserted to the dataset through Spark SQL).
For a JSON persistent table (i.e. the metadata of the table is stored in Hive Metastore),
@@ -1392,7 +1410,7 @@ options.

Based on user feedback, we created a new, more fluid API for reading data in (`SQLContext.read`)
and writing data out (`DataFrame.write`),
and deprecated the old APIs (e.g. `SQLContext.parquetFile`, `SQLContext.jsonFile`).
and deprecated the old APIs (e.g., `SQLContext.parquetFile`, `SQLContext.jsonFile`).

See the API docs for `SQLContext.read` (
<a href="api/scala/index.html#org.apache.spark.sql.SQLContext@read:DataFrameReader">Scala</a>,
sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -29,7 +29,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.execution.datasources.jdbc._
import org.apache.spark.sql.execution.datasources.json.InferSchema
import org.apache.spark.sql.types.StructType

@@ -231,13 +231,10 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
table: String,
parts: Array[Partition],
connectionProperties: Properties): DataFrame = {
val props = new Properties()
extraOptions.foreach { case (key, value) =>
props.put(key, value)
}
// connectionProperties should override settings in extraOptions
props.putAll(connectionProperties)
val relation = JDBCRelation(url, table, parts, props)(sparkSession)
// connectionProperties should override settings in extraOptions.
val params = extraOptions.toMap ++ connectionProperties.asScala.toMap
val options = new JDBCOptions(url, table, params)
val relation = JDBCRelation(parts, options)(sparkSession)
sparkSession.baseRelationToDataFrame(relation)
}
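The override semantics in the new code rely on Scala's right-biased `++` for maps. A tiny sketch of that assumption, with illustrative keys only:

```scala
val extraOptions = Map("fetchsize" -> "100", "user" -> "fred")
val connectionProperties = Map("user" -> "admin")

// ++ is right-biased for maps: keys in the right-hand operand win, so
// connectionProperties overrides any setting that also appears in extraOptions.
val params = extraOptions ++ connectionProperties
// params == Map("fetchsize" -> "100", "user" -> "admin")
```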

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -17,47 +17,127 @@

package org.apache.spark.sql.execution.datasources.jdbc

import java.sql.{Connection, DriverManager}
import java.util.Properties

import scala.collection.mutable.ArrayBuffer

/**
* Options for the JDBC data source.
*/
class JDBCOptions(
@transient private val parameters: Map[String, String])
extends Serializable {

import JDBCOptions._

def this(url: String, table: String, parameters: Map[String, String]) = {
this(parameters ++ Map(
JDBCOptions.JDBC_URL -> url,
JDBCOptions.JDBC_TABLE_NAME -> table))
}

val asConnectionProperties: Properties = {
val properties = new Properties()
// We should avoid passing the Spark-only options into the properties. See SPARK-17776.
parameters.filterKeys(!jdbcOptionNames.contains(_))
.foreach { case (k, v) => properties.setProperty(k, v) }
properties
}

// ------------------------------------------------------------
// Required parameters
// ------------------------------------------------------------
require(parameters.isDefinedAt("url"), "Option 'url' is required.")
require(parameters.isDefinedAt("dbtable"), "Option 'dbtable' is required.")
require(parameters.isDefinedAt(JDBC_URL), s"Option '$JDBC_URL' is required.")
require(parameters.isDefinedAt(JDBC_TABLE_NAME), s"Option '$JDBC_TABLE_NAME' is required.")
// a JDBC URL
val url = parameters("url")
val url = parameters(JDBC_URL)
// name of table
val table = parameters("dbtable")
val table = parameters(JDBC_TABLE_NAME)

// ------------------------------------------------------------
// Optional parameters
// ------------------------------------------------------------
val driverClass = {
val userSpecifiedDriverClass = parameters.get(JDBC_DRIVER_CLASS)
userSpecifiedDriverClass.foreach(DriverRegistry.register)

// Performing this part of the logic on the driver guards against the corner-case where the
// driver returned for a URL is different on the driver and executors due to classpath
// differences.
userSpecifiedDriverClass.getOrElse {
DriverManager.getDriver(url).getClass.getCanonicalName
}
}

// ------------------------------------------------------------
// Optional parameter list
// Optional parameters only for reading
// ------------------------------------------------------------
// the column used to partition
val partitionColumn = parameters.getOrElse("partitionColumn", null)
val partitionColumn = parameters.getOrElse(JDBC_PARTITION_COLUMN, null)
// the lower bound of partition column
val lowerBound = parameters.getOrElse("lowerBound", null)
val lowerBound = parameters.getOrElse(JDBC_LOWER_BOUND, null)
// the upper bound of the partition column
val upperBound = parameters.getOrElse("upperBound", null)
val upperBound = parameters.getOrElse(JDBC_UPPER_BOUND, null)
// the number of partitions
val numPartitions = parameters.getOrElse("numPartitions", null)

val numPartitions = parameters.getOrElse(JDBC_NUM_PARTITIONS, null)
require(partitionColumn == null ||
(lowerBound != null && upperBound != null && numPartitions != null),
"If 'partitionColumn' is specified then 'lowerBound', 'upperBound'," +
" and 'numPartitions' are required.")
s"If '$JDBC_PARTITION_COLUMN' is specified then '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND'," +
s" and '$JDBC_NUM_PARTITIONS' are required.")
val fetchSize = {
val size = parameters.getOrElse(JDBC_BATCH_FETCH_SIZE, "0").toInt
require(size >= 0,
s"Invalid value `${size.toString}` for parameter " +
s"`$JDBC_BATCH_FETCH_SIZE`. The minimum value is 0. When the value is 0, " +
"the JDBC driver ignores the value and does the estimates.")
size
}

// ------------------------------------------------------------
// The options for DataFrameWriter
// Optional parameters only for writing
// ------------------------------------------------------------
// whether to truncate the table in the JDBC database
val isTruncate = parameters.getOrElse("truncate", "false").toBoolean
val isTruncate = parameters.getOrElse(JDBC_TRUNCATE, "false").toBoolean
// the create table option, which can be table_options or partition_options.
// E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8"
// TODO: to reuse the existing partition parameters for those partition specific options
val createTableOptions = parameters.getOrElse("createTableOptions", "")
val createTableOptions = parameters.getOrElse(JDBC_CREATE_TABLE_OPTIONS, "")
val batchSize = {
val size = parameters.getOrElse(JDBC_BATCH_INSERT_SIZE, "1000").toInt
require(size >= 1,
s"Invalid value `${size.toString}` for parameter " +
s"`$JDBC_BATCH_INSERT_SIZE`. The minimum value is 1.")
size
}
val isolationLevel =
parameters.getOrElse(JDBC_TXN_ISOLATION_LEVEL, "READ_UNCOMMITTED") match {
case "NONE" => Connection.TRANSACTION_NONE
case "READ_UNCOMMITTED" => Connection.TRANSACTION_READ_UNCOMMITTED
case "READ_COMMITTED" => Connection.TRANSACTION_READ_COMMITTED
case "REPEATABLE_READ" => Connection.TRANSACTION_REPEATABLE_READ
case "SERIALIZABLE" => Connection.TRANSACTION_SERIALIZABLE
}
}

object JDBCOptions {
private val jdbcOptionNames = ArrayBuffer.empty[String]

private def newOption(name: String): String = {
jdbcOptionNames += name
name
}

val JDBC_URL = newOption("url")
val JDBC_TABLE_NAME = newOption("dbtable")
val JDBC_DRIVER_CLASS = newOption("driver")
val JDBC_PARTITION_COLUMN = newOption("partitionColumn")
val JDBC_LOWER_BOUND = newOption("lowerBound")
val JDBC_UPPER_BOUND = newOption("upperBound")
val JDBC_NUM_PARTITIONS = newOption("numPartitions")
val JDBC_BATCH_FETCH_SIZE = newOption("fetchsize")
val JDBC_TRUNCATE = newOption("truncate")
val JDBC_CREATE_TABLE_OPTIONS = newOption("createTableOptions")
val JDBC_BATCH_INSERT_SIZE = newOption("batchsize")
val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel")
}
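A rough usage sketch of the class above, REPL-style (e.g. pasted into `spark-shell`). This is a sketch only: it assumes a Spark build that still exposes this internal class and the H2 driver jar on the classpath, since driver resolution loads a real JDBC driver class; the URL and table name are made up:

```scala
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

// Spark-only options (driver, fetchsize, ...) are validated eagerly and are
// excluded from the Properties handed to the JDBC driver.
val options = new JDBCOptions(
  "jdbc:h2:mem:testdb",   // url
  "TEST.PEOPLE",          // dbtable
  Map("driver" -> "org.h2.Driver", "fetchsize" -> "100",
      "user" -> "sa", "password" -> ""))

println(options.fetchSize)              // 100
println(options.batchSize)              // 1000 (the default)
println(options.asConnectionProperties) // user/password only; no driver, fetchsize, url or dbtable

// Fail fast: an invalid value is rejected when the options are built, not at query time.
// new JDBCOptions("jdbc:h2:mem:testdb", "TEST.PEOPLE", Map("fetchsize" -> "-1"))
// => java.lang.IllegalArgumentException: requirement failed: Invalid value `-1` ...
```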
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.execution.datasources.jdbc

import java.sql.{Connection, Date, PreparedStatement, ResultSet, SQLException, Timestamp}
import java.util.Properties

import scala.util.control.NonFatal

@@ -46,17 +45,18 @@ object JDBCRDD extends Logging {
* Takes a (schema, table) specification and returns the table's Catalyst
* schema.
*
* @param url - The JDBC url to fetch information from.
* @param table - The table name of the desired table. This may also be a
* SQL query wrapped in parentheses.
* @param options - JDBC options that contains url, table and other information.
*
* @return A StructType giving the table's Catalyst schema.
* @throws SQLException if the table specification is garbage.
* @throws SQLException if the table contains an unsupported type.
*/
def resolveTable(url: String, table: String, properties: Properties): StructType = {
def resolveTable(options: JDBCOptions): StructType = {
val url = options.url
val table = options.table
val properties = options.asConnectionProperties
val dialect = JdbcDialects.get(url)
val conn: Connection = JdbcUtils.createConnectionFactory(url, properties)()
val conn: Connection = JdbcUtils.createConnectionFactory(options)()
try {
val statement = conn.prepareStatement(dialect.getSchemaQuery(table))
try {
@@ -143,43 +143,38 @@ object JDBCRDD extends Logging {
})
}



/**
* Build and return JDBCRDD from the given information.
*
* @param sc - Your SparkContext.
* @param schema - The Catalyst schema of the underlying database table.
* @param url - The JDBC url to connect to.
* @param fqTable - The fully-qualified table name (or paren'd SQL query) to use.
* @param requiredColumns - The names of the columns to SELECT.
* @param filters - The filters to include in all WHERE clauses.
* @param parts - An array of JDBCPartitions specifying partition ids and
* per-partition WHERE clauses.
* @param options - JDBC options that contains url, table and other information.
*
* @return An RDD representing "SELECT requiredColumns FROM fqTable".
*/
def scanTable(
sc: SparkContext,
schema: StructType,
url: String,
properties: Properties,
fqTable: String,
requiredColumns: Array[String],
filters: Array[Filter],
parts: Array[Partition]): RDD[InternalRow] = {
parts: Array[Partition],
options: JDBCOptions): RDD[InternalRow] = {
val url = options.url
val dialect = JdbcDialects.get(url)
val quotedColumns = requiredColumns.map(colName => dialect.quoteIdentifier(colName))
new JDBCRDD(
sc,
JdbcUtils.createConnectionFactory(url, properties),
JdbcUtils.createConnectionFactory(options),
pruneSchema(schema, requiredColumns),
fqTable,
quotedColumns,
filters,
parts,
url,
properties)
options)
}
}

@@ -192,12 +187,11 @@ private[jdbc] class JDBCRDD(
sc: SparkContext,
getConnection: () => Connection,
schema: StructType,
fqTable: String,
columns: Array[String],
filters: Array[Filter],
partitions: Array[Partition],
url: String,
properties: Properties)
options: JDBCOptions)
extends RDD[InternalRow](sc, Nil) {

/**
@@ -211,7 +205,7 @@ private[jdbc] class JDBCRDD(
private val columnList: String = {
val sb = new StringBuilder()
columns.foreach(x => sb.append(",").append(x))
if (sb.length == 0) "1" else sb.substring(1)
if (sb.isEmpty) "1" else sb.substring(1)
}

/**
@@ -286,23 +280,18 @@ private[jdbc] class JDBCRDD(
conn = getConnection()
val dialect = JdbcDialects.get(url)
import scala.collection.JavaConverters._
dialect.beforeFetch(conn, properties.asScala.toMap)
dialect.beforeFetch(conn, options.asConnectionProperties.asScala.toMap)

// H2's JDBC driver does not support the setSchema() method. We pass a
// fully-qualified table name in the SELECT statement. I don't know how to
// talk about a table in a completely portable way.

val myWhereClause = getWhereClause(part)

val sqlText = s"SELECT $columnList FROM $fqTable $myWhereClause"
val sqlText = s"SELECT $columnList FROM ${options.table} $myWhereClause"
stmt = conn.prepareStatement(sqlText,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
val fetchSize = properties.getProperty(JdbcUtils.JDBC_BATCH_FETCH_SIZE, "0").toInt
require(fetchSize >= 0,
s"Invalid value `${fetchSize.toString}` for parameter " +
s"`${JdbcUtils.JDBC_BATCH_FETCH_SIZE}`. The minimum value is 0. When the value is 0, " +
"the JDBC driver ignores the value and does the estimates.")
stmt.setFetchSize(fetchSize)
stmt.setFetchSize(options.fetchSize)
rs = stmt.executeQuery()
val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)

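For readers less familiar with the raw JDBC API used in `compute` above, a minimal standalone sketch of the same forward-only cursor and fetch-size pattern (REPL/script style; it assumes only the H2 driver on the classpath, and the in-memory URL and table are made up for illustration):

```scala
import java.sql.{DriverManager, ResultSet}

val conn = DriverManager.getConnection("jdbc:h2:mem:demo;DB_CLOSE_DELAY=-1")
try {
  val ddl = conn.createStatement()
  ddl.execute("CREATE TABLE people (id INT, name VARCHAR(32))")
  ddl.execute("INSERT INTO people VALUES (1, 'fred'), (2, 'wilma')")

  // Forward-only, read-only cursor; the fetch size is a hint telling the driver
  // how many rows to pull from the database per round trip.
  val stmt = conn.prepareStatement(
    "SELECT id, name FROM people",
    ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
  stmt.setFetchSize(100)
  val rs = stmt.executeQuery()
  while (rs.next()) {
    println(s"${rs.getInt(1)} -> ${rs.getString(2)}")
  }
} finally {
  conn.close()
}
```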
(Diffs for the remaining changed files are omitted.)
