From f2cd00be350fdba3acfbfdf155701182d1c404fd Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 14 May 2015 10:25:18 -0700 Subject: [PATCH 1/9] [SQL][minor] rename apply for QueryPlanner A follow-up of https://github.com/apache/spark/pull/5624 Author: Wenchen Fan Closes #6142 from cloud-fan/tmp and squashes the following commits: 971a92b [Wenchen Fan] use plan instead of execute 24c5ffe [Wenchen Fan] rename apply --- .../org/apache/spark/sql/catalyst/planning/QueryPlanner.scala | 4 ++-- sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala index 51b5699affed5..73a21884a4710 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala @@ -51,9 +51,9 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] { * filled in automatically by the QueryPlanner using the other execution strategies that are * available. */ - protected def planLater(plan: LogicalPlan) = apply(plan).next() + protected def planLater(plan: LogicalPlan) = this.plan(plan).next() - def apply(plan: LogicalPlan): Iterator[PhysicalPlan] = { + def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = { // Obviously a lot to do here still... val iter = strategies.view.flatMap(_(plan)).toIterator assert(iter.hasNext, s"No plan for $plan") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 521f3dc821795..b33a700208014 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -1321,7 +1321,7 @@ class SQLContext(@transient val sparkContext: SparkContext) // TODO: Don't just pick the first one... lazy val sparkPlan: SparkPlan = { SparkPlan.currentContext.set(self) - planner(optimizedPlan).next() + planner.plan(optimizedPlan).next() } // executedPlan should not be used to initialize any SparkPlan. It should be // only used for execution. From 5d7d4f887d509e6d037d8fc5247d2e5f8a4563c9 Mon Sep 17 00:00:00 2001 From: ksonj Date: Thu, 14 May 2015 15:10:58 -0700 Subject: [PATCH 2/9] [SPARK-7278] [PySpark] DateType should find datetime.datetime acceptable DateType should not be restricted to `datetime.date` but accept `datetime.datetime` objects as well. Could someone with a little more insight verify this? 
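For illustration only (not part of the patch), a minimal PySpark sketch of what this change enables, assuming an existing `sqlContext`:

    import datetime
    from pyspark.sql.types import StructType, StructField, DateType

    # With this change a datetime.datetime value passes the DateType
    # acceptance check during schema verification, just like datetime.date.
    schema = StructType([StructField("day", DateType(), False)])
    df = sqlContext.createDataFrame([(datetime.datetime(2015, 5, 14, 10, 25),)], schema)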
Author: ksonj Closes #6057 from ksonj/dates and squashes the following commits: 68a158e [ksonj] DateType should find datetime.datetime acceptable too --- python/pyspark/sql/_types.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/_types.py b/python/pyspark/sql/_types.py index b96851a174d49..629c3a94513b8 100644 --- a/python/pyspark/sql/_types.py +++ b/python/pyspark/sql/_types.py @@ -930,7 +930,7 @@ def _infer_schema_type(obj, dataType): DecimalType: (decimal.Decimal,), StringType: (str, unicode), BinaryType: (bytearray,), - DateType: (datetime.date,), + DateType: (datetime.date, datetime.datetime), TimestampType: (datetime.datetime,), ArrayType: (list, tuple, array), MapType: (dict,), From 11a1a135d1fe892cd48a9116acc7554846aed84c Mon Sep 17 00:00:00 2001 From: tedyu Date: Thu, 14 May 2015 15:26:35 -0700 Subject: [PATCH 3/9] Make SPARK prefix a variable Author: tedyu Closes #6153 from ted-yu/master and squashes the following commits: 4e0bac5 [tedyu] Use JIRA_PROJECT_NAME as variable name ab982aa [tedyu] Make SPARK prefix a variable --- dev/github_jira_sync.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dev/github_jira_sync.py b/dev/github_jira_sync.py index 8051080117062..ff1e39664ee04 100755 --- a/dev/github_jira_sync.py +++ b/dev/github_jira_sync.py @@ -33,6 +33,7 @@ # User facing configs GITHUB_API_BASE = os.environ.get("GITHUB_API_BASE", "https://api.github.com/repos/apache/spark") +JIRA_PROJECT_NAME = os.environ.get("JIRA_PROJECT_NAME", "SPARK") JIRA_API_BASE = os.environ.get("JIRA_API_BASE", "https://issues.apache.org/jira") JIRA_USERNAME = os.environ.get("JIRA_USERNAME", "apachespark") JIRA_PASSWORD = os.environ.get("JIRA_PASSWORD", "XXX") @@ -68,7 +69,7 @@ def get_jira_prs(): page_json = get_json(page) for pull in page_json: - jiras = re.findall("SPARK-[0-9]{4,5}", pull['title']) + jiras = re.findall(JIRA_PROJECT_NAME + "-[0-9]{4,5}", pull['title']) for jira in jiras: result = result + [(jira, pull)] From 93dbb3ad83fd60444a38c3dc87a2053c667123af Mon Sep 17 00:00:00 2001 From: Rex Xiong Date: Thu, 14 May 2015 16:55:31 -0700 Subject: [PATCH 4/9] [SPARK-7598] [DEPLOY] Add aliveWorkers metrics in Master In Spark Standalone setup, when some workers are DEAD, they will stay in master worker list for a while. master.workers metrics for master is only showing the total number of workers, we need to monitor how many real ALIVE workers are there to ensure the cluster is healthy. 
Author: Rex Xiong Closes #6117 from twilightgod/add-aliveWorker-metrics and squashes the following commits: 6be69a5 [Rex Xiong] Fix comment for aliveWorkers metrics a882f39 [Rex Xiong] Fix style for aliveWorkers metrics 38ce955 [Rex Xiong] Add aliveWorkers metrics in Master --- .../scala/org/apache/spark/deploy/master/MasterSource.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala index 9c3f79f1244b7..66a9ff38678c6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala @@ -30,6 +30,11 @@ private[spark] class MasterSource(val master: Master) extends Source { override def getValue: Int = master.workers.size }) + // Gauge for alive worker numbers in cluster + metricRegistry.register(MetricRegistry.name("aliveWorkers"), new Gauge[Int]{ + override def getValue: Int = master.workers.filter(_.state == WorkerState.ALIVE).size + }) + // Gauge for application numbers in cluster metricRegistry.register(MetricRegistry.name("apps"), new Gauge[Int] { override def getValue: Int = master.apps.size From 57ed16cf9372c109e84bd51b728f2c82940949a7 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 14 May 2015 16:56:32 -0700 Subject: [PATCH 5/9] [SPARK-7643] [UI] use the correct size in RDDPage for storage info and partitions `dataDistribution` and `partitions` are `Option[Seq[_]]`. andrewor14 squito Author: Xiangrui Meng Closes #6157 from mengxr/SPARK-7643 and squashes the following commits: 99fe8a4 [Xiangrui Meng] use the correct size in RDDPage for storage info and partitions --- .../main/scala/org/apache/spark/ui/storage/RDDPage.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala index 05f94a7507f4f..fbce917a0824d 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala @@ -77,14 +77,17 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
-          <h4> Data Distribution on {rddStorageInfo.dataDistribution.size} Executors</h4>
+          <h4>
+            Data Distribution on {rddStorageInfo.dataDistribution.map(_.size).getOrElse(0)}
+            Executors
+          </h4>
           {workerTable}

-          <h4> {rddStorageInfo.partitions.size} Partitions</h4>
+          <h4> {rddStorageInfo.partitions.map(_.size).getOrElse(0)} Partitions</h4>
           {blockTable}
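(Context for the hunk above: `dataDistribution` and `partitions` are `Option[Seq[_]]`, so calling `.size` on them counts the `Option` itself, which is always 0 or 1, rather than the number of executors or partitions; mapping to the inner sequence's size and defaulting to 0 reports the real count.)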
; From 0a317c124c3a43089cdb8f079345c8f2842238cd Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 14 May 2015 16:57:33 -0700 Subject: [PATCH 6/9] [SPARK-7649] [STREAMING] [WEBUI] Use window.localStorage to store the status rather than the url Use window.localStorage to store the status rather than the url so that the url won't be changed. cc tdas Author: zsxwing Closes #6158 from zsxwing/SPARK-7649 and squashes the following commits: 3c56fef [zsxwing] Use window.localStorage to store the status rather than the url --- .../apache/spark/ui/static/streaming-page.js | 20 ++++--------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js index 22b186873e990..0fac658d57842 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js +++ b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js @@ -252,28 +252,16 @@ function drawHistogram(id, values, minY, maxY, unitY, batchInterval) { } $(function() { - function getParameterFromURL(param) - { - var parameters = window.location.search.substring(1); // Remove "?" - var keyValues = parameters.split('&'); - for (var i = 0; i < keyValues.length; i++) - { - var paramKeyValue = keyValues[i].split('='); - if (paramKeyValue[0] == param) - { - return paramKeyValue[1]; - } - } - } - - var status = getParameterFromURL("show-streams-detail") == "true"; + var status = window.localStorage && window.localStorage.getItem("show-streams-detail") == "true"; $("span.expand-input-rate").click(function() { status = !status; $("#inputs-table").toggle('collapsed'); // Toggle the class of the arrow between open and closed $(this).find('.expand-input-rate-arrow').toggleClass('arrow-open').toggleClass('arrow-closed'); - window.history.pushState('', document.title, window.location.pathname + '?show-streams-detail=' + status); + if (window.localStorage) { + window.localStorage.setItem("show-streams-detail", "" + status); + } }); if (status) { From b208f998b5800bdba4ce6651f172c26a8d7d351b Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 14 May 2015 16:58:36 -0700 Subject: [PATCH 7/9] [SPARK-7645] [STREAMING] [WEBUI] Show milliseconds in the UI if the batch interval < 1 second I also updated the summary of the Streaming page. 
![screen shot 2015-05-14 at 11 52 59 am](https://cloud.githubusercontent.com/assets/1000778/7640103/13cdf68e-fa36-11e4-84ec-e2a3954f4319.png) ![screen shot 2015-05-14 at 12 39 33 pm](https://cloud.githubusercontent.com/assets/1000778/7640151/4cc066ac-fa36-11e4-8494-2821d6a6f17c.png) Author: zsxwing Closes #6154 from zsxwing/SPARK-7645 and squashes the following commits: 5db6ca1 [zsxwing] Add UIUtils.formatBatchTime e4802df [zsxwing] Show milliseconds in the UI if the batch interval < 1 second --- .../apache/spark/ui/static/streaming-page.js | 11 +++- .../spark/streaming/ui/AllBatchesTable.scala | 14 +++-- .../apache/spark/streaming/ui/BatchPage.scala | 5 +- .../spark/streaming/ui/StreamingPage.scala | 10 ++-- .../apache/spark/streaming/ui/UIUtils.scala | 55 ++++++++++++++++++- .../spark/streaming/ui/UIUtilsSuite.scala | 11 ++++ 6 files changed, 94 insertions(+), 12 deletions(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js index 0fac658d57842..0ee6752b29e9a 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js +++ b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js @@ -98,7 +98,16 @@ function drawTimeline(id, data, minX, maxX, minY, maxY, unitY, batchInterval) { var x = d3.scale.linear().domain([minX, maxX]).range([0, width]); var y = d3.scale.linear().domain([minY, maxY]).range([height, 0]); - var xAxis = d3.svg.axis().scale(x).orient("bottom").tickFormat(function(d) { return timeFormat[d]; }); + var xAxis = d3.svg.axis().scale(x).orient("bottom").tickFormat(function(d) { + var formattedDate = timeFormat[d]; + var dotIndex = formattedDate.indexOf('.'); + if (dotIndex >= 0) { + // Remove milliseconds + return formattedDate.substring(0, dotIndex); + } else { + return formattedDate; + } + }); var formatYValue = d3.format(",.2f"); var yAxis = d3.svg.axis().scale(y).orient("left").ticks(5).tickFormat(formatYValue); diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala index 3619e129ad9cf..00cc47d6a3ca5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala @@ -17,11 +17,14 @@ package org.apache.spark.streaming.ui +import java.text.SimpleDateFormat +import java.util.Date + import scala.xml.Node import org.apache.spark.ui.{UIUtils => SparkUIUtils} -private[ui] abstract class BatchTableBase(tableId: String) { +private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long) { protected def columns: Seq[Node] = { Batch Time @@ -35,7 +38,7 @@ private[ui] abstract class BatchTableBase(tableId: String) { protected def baseRow(batch: BatchUIData): Seq[Node] = { val batchTime = batch.batchTime.milliseconds - val formattedBatchTime = SparkUIUtils.formatDate(batch.batchTime.milliseconds) + val formattedBatchTime = UIUtils.formatBatchTime(batchTime, batchInterval) val eventCount = batch.numRecords val schedulingDelay = batch.schedulingDelay val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-") @@ -79,7 +82,8 @@ private[ui] abstract class BatchTableBase(tableId: String) { private[ui] class ActiveBatchTable( runningBatches: Seq[BatchUIData], - waitingBatches: Seq[BatchUIData]) extends BatchTableBase("active-batches-table") { + waitingBatches: Seq[BatchUIData], + 
batchInterval: Long) extends BatchTableBase("active-batches-table", batchInterval) { override protected def columns: Seq[Node] = super.columns ++ Status @@ -99,8 +103,8 @@ private[ui] class ActiveBatchTable( } } -private[ui] class CompletedBatchTable(batches: Seq[BatchUIData]) - extends BatchTableBase("completed-batches-table") { +private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: Long) + extends BatchTableBase("completed-batches-table", batchInterval) { override protected def columns: Seq[Node] = super.columns ++ Total Delay diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala index 831f60e870f74..f75067669abe5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala @@ -17,6 +17,8 @@ package org.apache.spark.streaming.ui +import java.text.SimpleDateFormat +import java.util.Date import javax.servlet.http.HttpServletRequest import scala.xml.{NodeSeq, Node, Text} @@ -288,7 +290,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { val batchTime = Option(request.getParameter("id")).map(id => Time(id.toLong)).getOrElse { throw new IllegalArgumentException(s"Missing id parameter") } - val formattedBatchTime = SparkUIUtils.formatDate(batchTime.milliseconds) + val formattedBatchTime = + UIUtils.formatBatchTime(batchTime.milliseconds, streamingListener.batchDuration) val batchUIData = streamingListener.getBatchUIData(batchTime).getOrElse { throw new IllegalArgumentException(s"Batch $formattedBatchTime does not exist") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index efce8c58fb962..070564aa10633 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -186,6 +186,8 @@ private[ui] class StreamingPage(parent: StreamingTab) {SparkUIUtils.formatDate(startTime)} + ({listener.numTotalCompletedBatches} + completed batches, {listener.numTotalReceivedRecords} records)
} @@ -199,9 +201,9 @@ private[ui] class StreamingPage(parent: StreamingTab) * @param times all time values that will be used in the graphs. */ private def generateTimeMap(times: Seq[Long]): Seq[Node] = { - val dateFormat = new SimpleDateFormat("HH:mm:ss") val js = "var timeFormat = {};\n" + times.map { time => - val formattedTime = dateFormat.format(new Date(time)) + val formattedTime = + UIUtils.formatBatchTime(time, listener.batchDuration, showYYYYMMSS = false) s"timeFormat[$time] = '$formattedTime';" }.mkString("\n") @@ -472,14 +474,14 @@ private[ui] class StreamingPage(parent: StreamingTab) val activeBatchesContent = {

       Active Batches ({runningBatches.size + waitingBatches.size}) ++
-      new ActiveBatchTable(runningBatches, waitingBatches).toNodeSeq
+      new ActiveBatchTable(runningBatches, waitingBatches, listener.batchDuration).toNodeSeq
     }

     val completedBatchesContent = {
       Completed Batches (last {completedBatches.size} out of {listener.numTotalCompletedBatches})
++ - new CompletedBatchTable(completedBatches).toNodeSeq + new CompletedBatchTable(completedBatches, listener.batchDuration).toNodeSeq } activeBatchesContent ++ completedBatchesContent diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala index f153ee105a18e..86cfb1fa47370 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala @@ -17,6 +17,8 @@ package org.apache.spark.streaming.ui +import java.text.SimpleDateFormat +import java.util.TimeZone import java.util.concurrent.TimeUnit private[streaming] object UIUtils { @@ -62,7 +64,7 @@ private[streaming] object UIUtils { * Convert `milliseconds` to the specified `unit`. We cannot use `TimeUnit.convert` because it * will discard the fractional part. */ - def convertToTimeUnit(milliseconds: Long, unit: TimeUnit): Double = unit match { + def convertToTimeUnit(milliseconds: Long, unit: TimeUnit): Double = unit match { case TimeUnit.NANOSECONDS => milliseconds * 1000 * 1000 case TimeUnit.MICROSECONDS => milliseconds * 1000 case TimeUnit.MILLISECONDS => milliseconds @@ -71,4 +73,55 @@ private[streaming] object UIUtils { case TimeUnit.HOURS => milliseconds / 1000.0 / 60.0 / 60.0 case TimeUnit.DAYS => milliseconds / 1000.0 / 60.0 / 60.0 / 24.0 } + + // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use. + private val batchTimeFormat = new ThreadLocal[SimpleDateFormat]() { + override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") + } + + private val batchTimeFormatWithMilliseconds = new ThreadLocal[SimpleDateFormat]() { + override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS") + } + + /** + * If `batchInterval` is less than 1 second, format `batchTime` with milliseconds. Otherwise, + * format `batchTime` without milliseconds. + * + * @param batchTime the batch time to be formatted + * @param batchInterval the batch interval + * @param showYYYYMMSS if showing the `yyyy/MM/dd` part. 
If it's false, the return value wll be + * only `HH:mm:ss` or `HH:mm:ss.SSS` depending on `batchInterval` + * @param timezone only for test + */ + def formatBatchTime( + batchTime: Long, + batchInterval: Long, + showYYYYMMSS: Boolean = true, + timezone: TimeZone = null): String = { + val oldTimezones = + (batchTimeFormat.get.getTimeZone, batchTimeFormatWithMilliseconds.get.getTimeZone) + if (timezone != null) { + batchTimeFormat.get.setTimeZone(timezone) + batchTimeFormatWithMilliseconds.get.setTimeZone(timezone) + } + try { + val formattedBatchTime = + if (batchInterval < 1000) { + batchTimeFormatWithMilliseconds.get.format(batchTime) + } else { + // If batchInterval >= 1 second, don't show milliseconds + batchTimeFormat.get.format(batchTime) + } + if (showYYYYMMSS) { + formattedBatchTime + } else { + formattedBatchTime.substring(formattedBatchTime.indexOf(' ') + 1) + } + } finally { + if (timezone != null) { + batchTimeFormat.get.setTimeZone(oldTimezones._1) + batchTimeFormatWithMilliseconds.get.setTimeZone(oldTimezones._2) + } + } + } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala index 6df1a63ab2e37..e9ab917ab845c 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.streaming.ui +import java.util.TimeZone import java.util.concurrent.TimeUnit import org.scalatest.FunSuite @@ -64,4 +65,14 @@ class UIUtilsSuite extends FunSuite with Matchers{ val convertedTime = UIUtils.convertToTimeUnit(milliseconds, unit) convertedTime should be (expectedTime +- 1E-6) } + + test("formatBatchTime") { + val tzForTest = TimeZone.getTimeZone("America/Los_Angeles") + val batchTime = 1431637480452L // Thu May 14 14:04:40 PDT 2015 + assert("2015/05/14 14:04:40" === UIUtils.formatBatchTime(batchTime, 1000, timezone = tzForTest)) + assert("2015/05/14 14:04:40.452" === + UIUtils.formatBatchTime(batchTime, 999, timezone = tzForTest)) + assert("14:04:40" === UIUtils.formatBatchTime(batchTime, 1000, false, timezone = tzForTest)) + assert("14:04:40.452" === UIUtils.formatBatchTime(batchTime, 999, false, timezone = tzForTest)) + } } From 723853edab18d28515af22097b76e4e6574b228e Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 14 May 2015 18:13:58 -0700 Subject: [PATCH 8/9] [SPARK-7648] [MLLIB] Add weights and intercept to GLM wrappers in spark.ml Otherwise, users can only use `transform` on the models. 
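A minimal usage sketch of what the added properties expose (illustrative only; assumes a DataFrame `df` with "label" and "features" columns):

    from pyspark.ml.classification import LogisticRegression

    lr = LogisticRegression(maxIter=5, regParam=0.01)
    model = lr.fit(df)
    # Both properties are pulled from the underlying Scala model via _call_java.
    print(model.weights)    # DenseVector of coefficients
    print(model.intercept)  # float intercept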
brkyvz Author: Xiangrui Meng Closes #6156 from mengxr/SPARK-7647 and squashes the following commits: 1ae3d2d [Xiangrui Meng] add weights and intercept to LogisticRegression in Python f49eb46 [Xiangrui Meng] add weights and intercept to LinearRegressionModel --- python/pyspark/ml/classification.py | 18 ++++++++++++++++++ python/pyspark/ml/regression.py | 18 ++++++++++++++++++ python/pyspark/ml/wrapper.py | 8 +++++++- 3 files changed, 43 insertions(+), 1 deletion(-) diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 96d29058a3781..8c9a55e79abad 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -43,6 +43,10 @@ class LogisticRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredicti >>> test0 = sc.parallelize([Row(features=Vectors.dense(-1.0))]).toDF() >>> model.transform(test0).head().prediction 0.0 + >>> model.weights + DenseVector([5.5...]) + >>> model.intercept + -2.68... >>> test1 = sc.parallelize([Row(features=Vectors.sparse(1, [0], [1.0]))]).toDF() >>> model.transform(test1).head().prediction 1.0 @@ -148,6 +152,20 @@ class LogisticRegressionModel(JavaModel): Model fitted by LogisticRegression. """ + @property + def weights(self): + """ + Model weights. + """ + return self._call_java("weights") + + @property + def intercept(self): + """ + Model intercept. + """ + return self._call_java("intercept") + class TreeClassifierParams(object): """ diff --git a/python/pyspark/ml/regression.py b/python/pyspark/ml/regression.py index 0ab5c6c3d20c3..2803864ff4a17 100644 --- a/python/pyspark/ml/regression.py +++ b/python/pyspark/ml/regression.py @@ -51,6 +51,10 @@ class LinearRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPrediction >>> test0 = sqlContext.createDataFrame([(Vectors.dense(-1.0),)], ["features"]) >>> model.transform(test0).head().prediction -1.0 + >>> model.weights + DenseVector([1.0]) + >>> model.intercept + 0.0 >>> test1 = sqlContext.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"]) >>> model.transform(test1).head().prediction 1.0 @@ -117,6 +121,20 @@ class LinearRegressionModel(JavaModel): Model fitted by LinearRegression. """ + @property + def weights(self): + """ + Model weights. + """ + return self._call_java("weights") + + @property + def intercept(self): + """ + Model intercept. + """ + return self._call_java("intercept") + class TreeRegressorParams(object): """ diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py index f5ac2a398642a..dda6c6aba3049 100644 --- a/python/pyspark/ml/wrapper.py +++ b/python/pyspark/ml/wrapper.py @@ -21,7 +21,7 @@ from pyspark.sql import DataFrame from pyspark.ml.param import Params from pyspark.ml.pipeline import Estimator, Transformer, Evaluator, Model -from pyspark.mllib.common import inherit_doc +from pyspark.mllib.common import inherit_doc, _java2py, _py2java def _jvm(): @@ -149,6 +149,12 @@ def __init__(self, java_model): def _java_obj(self): return self._java_model + def _call_java(self, name, *args): + m = getattr(self._java_model, name) + sc = SparkContext._active_spark_context + java_args = [_py2java(sc, arg) for arg in args] + return _java2py(sc, m(*java_args)) + @inherit_doc class JavaEvaluator(Evaluator, JavaWrapper): From 48fc38f5844f6c12bf440f2990b6d7f1630fafac Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 14 May 2015 18:16:22 -0700 Subject: [PATCH 9/9] [SPARK-7619] [PYTHON] fix docstring signature Just realized that we need `\` at the end of the docstring. 
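A small standalone illustration (not part of the patch) of why the trailing backslash matters: inside a string literal, a backslash before the newline removes the line break, so a wrapped signature line in a docstring is rejoined into a single line when doc tools read `__doc__`.

    doc = """setParams(self, featuresCol="features", \
    labelCol="label")"""
    # The backslash escapes the newline, so no line break survives in the
    # docstring (the continuation line's leading spaces do remain).
    assert "\n" not in doc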
brkyvz Author: Xiangrui Meng Closes #6161 from mengxr/SPARK-7619 and squashes the following commits: e44495f [Xiangrui Meng] fix docstring signature --- python/docs/pyspark.ml.rst | 14 +++++------ python/pyspark/ml/classification.py | 39 ++++++++++++++--------------- python/pyspark/ml/feature.py | 8 +++--- python/pyspark/ml/recommendation.py | 8 +++--- python/pyspark/ml/regression.py | 38 +++++++++++++--------------- 5 files changed, 52 insertions(+), 55 deletions(-) diff --git a/python/docs/pyspark.ml.rst b/python/docs/pyspark.ml.rst index 8379b8fc8a1e1..518b8e774dd5f 100644 --- a/python/docs/pyspark.ml.rst +++ b/python/docs/pyspark.ml.rst @@ -1,8 +1,8 @@ pyspark.ml package -===================== +================== ML Pipeline APIs --------------- +---------------- .. automodule:: pyspark.ml :members: @@ -10,7 +10,7 @@ ML Pipeline APIs :inherited-members: pyspark.ml.param module -------------------------- +----------------------- .. automodule:: pyspark.ml.param :members: @@ -34,7 +34,7 @@ pyspark.ml.classification module :inherited-members: pyspark.ml.recommendation module -------------------------- +-------------------------------- .. automodule:: pyspark.ml.recommendation :members: @@ -42,7 +42,7 @@ pyspark.ml.recommendation module :inherited-members: pyspark.ml.regression module -------------------------- +---------------------------- .. automodule:: pyspark.ml.regression :members: @@ -50,7 +50,7 @@ pyspark.ml.regression module :inherited-members: pyspark.ml.tuning module --------------------------------- +------------------------ .. automodule:: pyspark.ml.tuning :members: @@ -58,7 +58,7 @@ pyspark.ml.tuning module :inherited-members: pyspark.ml.evaluation module --------------------------------- +---------------------------- .. automodule:: pyspark.ml.evaluation :members: diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 8c9a55e79abad..1411d3fd9c56e 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -71,7 +71,7 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred threshold=0.5, probabilityCol="probability"): """ __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ - maxIter=100, regParam=0.1, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, + maxIter=100, regParam=0.1, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, \ threshold=0.5, probabilityCol="probability") """ super(LogisticRegression, self).__init__() @@ -96,8 +96,8 @@ def setParams(self, featuresCol="features", labelCol="label", predictionCol="pre maxIter=100, regParam=0.1, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, threshold=0.5, probabilityCol="probability"): """ - setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", - maxIter=100, regParam=0.1, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, + setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ + maxIter=100, regParam=0.1, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, \ threshold=0.5, probabilityCol="probability") Sets params for logistic regression. 
""" @@ -220,7 +220,7 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="gini"): """ __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ - maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, + maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, \ maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="gini") """ super(DecisionTreeClassifier, self).__init__() @@ -242,9 +242,8 @@ def setParams(self, featuresCol="features", labelCol="label", predictionCol="pre impurity="gini"): """ setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ - maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, - maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, - impurity="gini") + maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, \ + maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="gini") Sets params for the DecisionTreeClassifier. """ kwargs = self.setParams._input_kwargs @@ -320,9 +319,9 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="gini", numTrees=20, featureSubsetStrategy="auto", seed=42): """ - __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", - maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, - maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="gini", + __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ + maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, \ + maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="gini", \ numTrees=20, featureSubsetStrategy="auto", seed=42) """ super(RandomForestClassifier, self).__init__() @@ -355,9 +354,9 @@ def setParams(self, featuresCol="features", labelCol="label", predictionCol="pre maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, seed=42, impurity="gini", numTrees=20, featureSubsetStrategy="auto"): """ - setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", - maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, - maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, seed=42, + setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ + maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, \ + maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, seed=42, \ impurity="gini", numTrees=20, featureSubsetStrategy="auto") Sets params for linear classification. 
""" @@ -471,10 +470,10 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, lossType="logistic", maxIter=20, stepSize=0.1): """ - __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", - maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, - maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, lossType="logistic", - maxIter=20, stepSize=0.1) + __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ + maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, \ + maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, \ + lossType="logistic", maxIter=20, stepSize=0.1) """ super(GBTClassifier, self).__init__() #: param for Loss function which GBT tries to minimize (case-insensitive). @@ -502,9 +501,9 @@ def setParams(self, featuresCol="features", labelCol="label", predictionCol="pre maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, lossType="logistic", maxIter=20, stepSize=0.1): """ - setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", - maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, - maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, + setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ + maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, \ + maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, \ lossType="logistic", maxIter=20, stepSize=0.1) Sets params for Gradient Boosted Tree Classification. """ diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py index 30e1fd4922d0a..58e22190c7c3c 100644 --- a/python/pyspark/ml/feature.py +++ b/python/pyspark/ml/feature.py @@ -481,7 +481,7 @@ class RegexTokenizer(JavaTransformer, HasInputCol, HasOutputCol): def __init__(self, minTokenLength=1, gaps=False, pattern="\\p{L}+|[^\\p{L}\\s]+", inputCol=None, outputCol=None): """ - __init__(self, minTokenLength=1, gaps=False, pattern="\\p{L}+|[^\\p{L}\\s]+", + __init__(self, minTokenLength=1, gaps=False, pattern="\\p{L}+|[^\\p{L}\\s]+", \ inputCol=None, outputCol=None) """ super(RegexTokenizer, self).__init__() @@ -496,7 +496,7 @@ def __init__(self, minTokenLength=1, gaps=False, pattern="\\p{L}+|[^\\p{L}\\s]+" def setParams(self, minTokenLength=1, gaps=False, pattern="\\p{L}+|[^\\p{L}\\s]+", inputCol=None, outputCol=None): """ - setParams(self, minTokenLength=1, gaps=False, pattern="\\p{L}+|[^\\p{L}\\s]+", + setParams(self, minTokenLength=1, gaps=False, pattern="\\p{L}+|[^\\p{L}\\s]+", \ inputCol="input", outputCol="output") Sets params for this RegexTokenizer. 
""" @@ -869,7 +869,7 @@ class Word2Vec(JavaEstimator, HasStepSize, HasMaxIter, HasSeed, HasInputCol, Has def __init__(self, vectorSize=100, minCount=5, numPartitions=1, stepSize=0.025, maxIter=1, seed=42, inputCol=None, outputCol=None): """ - __init__(self, vectorSize=100, minCount=5, numPartitions=1, stepSize=0.025, maxIter=1, + __init__(self, vectorSize=100, minCount=5, numPartitions=1, stepSize=0.025, maxIter=1, \ seed=42, inputCol=None, outputCol=None) """ super(Word2Vec, self).__init__() @@ -889,7 +889,7 @@ def __init__(self, vectorSize=100, minCount=5, numPartitions=1, stepSize=0.025, def setParams(self, vectorSize=100, minCount=5, numPartitions=1, stepSize=0.025, maxIter=1, seed=42, inputCol=None, outputCol=None): """ - setParams(self, minCount=5, numPartitions=1, stepSize=0.025, maxIter=1, seed=42, + setParams(self, minCount=5, numPartitions=1, stepSize=0.025, maxIter=1, seed=42, \ inputCol=None, outputCol=None) Sets params for this Word2Vec. """ diff --git a/python/pyspark/ml/recommendation.py b/python/pyspark/ml/recommendation.py index 4846b907e85ec..b2439cbd96522 100644 --- a/python/pyspark/ml/recommendation.py +++ b/python/pyspark/ml/recommendation.py @@ -92,8 +92,8 @@ def __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemB implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=0, ratingCol="rating", nonnegative=False, checkpointInterval=10): """ - __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, - implicitPrefs=false, alpha=1.0, userCol="user", itemCol="item", seed=0, + __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, \ + implicitPrefs=false, alpha=1.0, userCol="user", itemCol="item", seed=0, \ ratingCol="rating", nonnegative=false, checkpointInterval=10) """ super(ALS, self).__init__() @@ -118,8 +118,8 @@ def setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItem implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=0, ratingCol="rating", nonnegative=False, checkpointInterval=10): """ - setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, - implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=0, + setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, \ + implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=0, \ ratingCol="rating", nonnegative=False, checkpointInterval=10) Sets params for ALS. """ diff --git a/python/pyspark/ml/regression.py b/python/pyspark/ml/regression.py index 2803864ff4a17..ef77e19327188 100644 --- a/python/pyspark/ml/regression.py +++ b/python/pyspark/ml/regression.py @@ -33,8 +33,7 @@ class LinearRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPrediction Linear regression. The learning objective is to minimize the squared error, with regularization. - The specific squared error loss function used is: - L = 1/2n ||A weights - y||^2^ + The specific squared error loss function used is: L = 1/2n ||A weights - y||^2^ This support multiple types of regularization: - none (a.k.a. 
ordinary least squares) @@ -191,7 +190,7 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="variance"): """ __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ - maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, + maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, \ maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="variance") """ super(DecisionTreeRegressor, self).__init__() @@ -213,9 +212,8 @@ def setParams(self, featuresCol="features", labelCol="label", predictionCol="pre impurity="variance"): """ setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ - maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, - maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, - impurity="variance") + maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, \ + maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="variance") Sets params for the DecisionTreeRegressor. """ kwargs = self.setParams._input_kwargs @@ -286,10 +284,10 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="variance", numTrees=20, featureSubsetStrategy="auto", seed=42): """ - __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", - maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, - maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="variance", - numTrees=20, featureSubsetStrategy="auto", seed=42) + __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ + maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, \ + maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, \ + impurity="variance", numTrees=20, featureSubsetStrategy="auto", seed=42) """ super(RandomForestRegressor, self).__init__() #: param for Criterion used for information gain calculation (case-insensitive). @@ -321,9 +319,9 @@ def setParams(self, featuresCol="features", labelCol="label", predictionCol="pre maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, seed=42, impurity="variance", numTrees=20, featureSubsetStrategy="auto"): """ - setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", - maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, - maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, seed=42, + setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ + maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, \ + maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, seed=42, \ impurity="variance", numTrees=20, featureSubsetStrategy="auto") Sets params for linear regression. 
""" @@ -432,10 +430,10 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, lossType="squared", maxIter=20, stepSize=0.1): """ - __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", - maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, - maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, lossType="squared", - maxIter=20, stepSize=0.1) + __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ + maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, \ + maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, \ + lossType="squared", maxIter=20, stepSize=0.1) """ super(GBTRegressor, self).__init__() #: param for Loss function which GBT tries to minimize (case-insensitive). @@ -463,9 +461,9 @@ def setParams(self, featuresCol="features", labelCol="label", predictionCol="pre maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, lossType="squared", maxIter=20, stepSize=0.1): """ - setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", - maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, - maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, + setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ + maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, \ + maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, \ lossType="squared", maxIter=20, stepSize=0.1) Sets params for Gradient Boosted Tree Regression. """