From 46eea43d25f18c58f2008e8fb083c6417a243ad6 Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Wed, 31 Dec 2014 11:33:59 -0800
Subject: [PATCH] a pipeline in python

---
 .../ml/simple_text_classification_pipeline.py |  33 +++++++
 .../org/apache/spark/ml/param/params.scala    |   7 ++
 python/pyspark/ml/__init__.py                 |  40 +++++++++
 python/pyspark/ml/classification.py           |  49 +++++------
 python/pyspark/ml/feature.py                  |  85 +++++++++++++++++++
 python/pyspark/ml/param.py                    |   6 --
 python/pyspark/ml/test.py                     |  15 ++++
 7 files changed, 200 insertions(+), 35 deletions(-)
 create mode 100644 examples/src/main/python/ml/simple_text_classification_pipeline.py
 create mode 100644 python/pyspark/ml/feature.py
 create mode 100644 python/pyspark/ml/test.py

diff --git a/examples/src/main/python/ml/simple_text_classification_pipeline.py b/examples/src/main/python/ml/simple_text_classification_pipeline.py
new file mode 100644
index 0000000000000..f5558eaebe005
--- /dev/null
+++ b/examples/src/main/python/ml/simple_text_classification_pipeline.py
@@ -0,0 +1,33 @@
+from pyspark import SparkContext
+from pyspark.sql import SQLContext, Row
+from pyspark.ml import Pipeline
+from pyspark.ml.feature import HashingTF, Tokenizer
+from pyspark.ml.classification import LogisticRegression
+
+if __name__ == "__main__":
+    sc = SparkContext(appName="SimpleTextClassificationPipeline")
+    sqlCtx = SQLContext(sc)
+    training = sqlCtx.inferSchema(
+        sc.parallelize([(0L, "a b c d e spark", 1.0), (1L, "b d", 0.0), (2L, "spark f g h", 1.0), (3L, "hadoop mapreduce", 0.0)]) \
+            .map(lambda x: Row(id=x[0], text=x[1], label=x[2])))
+
+    tokenizer = Tokenizer() \
+        .setInputCol("text") \
+        .setOutputCol("words")
+    hashingTF = HashingTF() \
+        .setInputCol(tokenizer.getOutputCol()) \
+        .setOutputCol("features")
+    lr = LogisticRegression() \
+        .setMaxIter(10) \
+        .setRegParam(0.01)
+    pipeline = Pipeline() \
+        .setStages([tokenizer, hashingTF, lr])
+
+    model = pipeline.fit(training)
+
+    test = sqlCtx.inferSchema(
+        sc.parallelize([(4L, "spark i j k"), (5L, "l m n"), (6L, "mapreduce spark"), (7L, "apache hadoop")]) \
+            .map(lambda x: Row(id=x[0], text=x[1])))
+
+    for row in model.transform(test).collect():
+        print row
diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala
index 04f9cfb1bfc2f..33f7a3900a98e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala
@@ -164,6 +164,13 @@ trait Params extends Identifiable with Serializable {
     this
   }
 
+  /**
+   * Sets a parameter (by name) in the embedded param map.
+   */
+  private[ml] def set(param: String, value: Any): this.type = {
+    set(getParam(param), value)
+  }
+
   /**
    * Gets the value of a parameter in the embedded param map.
    */
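
Note: the new set(param: String, value: Any) overload exists so that a caller holding only a parameter's string name, rather than the Param object itself, can still set a value; the name is resolved through getParam on the Scala side. The sketch below shows one way the Python wrappers later in this patch could lean on it over Py4J. It is illustrative only: _set_java_param is a hypothetical helper, and it assumes an active SparkContext so that _jvm() (defined in pyspark/ml/__init__.py) resolves.

    from pyspark.ml import _jvm

    def _set_java_param(java_obj, name, value):
        # The Scala side resolves the Param from its string name via getParam,
        # so the Python side never needs to construct a Param of its own.
        java_obj.set(name, value)
        return java_obj

    # Example usage, assuming a SparkContext already exists:
    # lr = _jvm().org.apache.spark.ml.classification.LogisticRegression()
    # _set_java_param(lr, "maxIter", 10)
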
diff --git a/python/pyspark/ml/__init__.py b/python/pyspark/ml/__init__.py
index 622acb96782e8..f5d3557a354d4 100644
--- a/python/pyspark/ml/__init__.py
+++ b/python/pyspark/ml/__init__.py
@@ -1,6 +1,9 @@
 import inspect
 
 from pyspark import SparkContext
+from pyspark.ml.param import Param
+
+__all__ = ["Pipeline"]
 
 # An implementation of PEP3102 for Python 2.
 _keyword_only_secret = 70861589
@@ -20,3 +23,40 @@ def _assert_keyword_only_args():
 
 def _jvm():
     return SparkContext._jvm
+
+
+class Pipeline(object):
+
+    def __init__(self):
+        self.stages = Param(self, "stages", "pipeline stages")
+        self.paramMap = {}
+
+    def setStages(self, value):
+        self.paramMap[self.stages] = value
+        return self
+
+    def getStages(self):
+        if self.stages in self.paramMap:
+            return self.paramMap[self.stages]
+
+    def fit(self, dataset):
+        transformers = []
+        for stage in self.getStages():
+            if hasattr(stage, "transform"):
+                transformers.append(stage)
+                dataset = stage.transform(dataset)
+            elif hasattr(stage, "fit"):
+                model = stage.fit(dataset)
+                transformers.append(model)
+                dataset = model.transform(dataset)
+        return PipelineModel(transformers)
+
+
+class PipelineModel(object):
+
+    def __init__(self, transformers):
+        self.transformers = transformers
+
+    def transform(self, dataset):
+        for t in self.transformers:
+            dataset = t.transform(dataset)
+        return dataset
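
Note: Pipeline.fit above distinguishes transformers from estimators purely by duck typing: a stage exposing transform is applied directly, while a stage exposing only fit is fitted and its resulting model transforms the running dataset. Nothing in that logic requires a SchemaRDD, so it can be exercised with plain Python objects. A minimal sketch with hypothetical stand-in stages (UpperCaser, PrefixEstimator, and PrefixModel are not part of the patch):

    from pyspark.ml import Pipeline

    class UpperCaser(object):
        # transformer: only has transform()
        def transform(self, dataset):
            return [s.upper() for s in dataset]

    class PrefixModel(object):
        # model produced by the estimator below
        def __init__(self, prefix):
            self.prefix = prefix

        def transform(self, dataset):
            return [self.prefix + s for s in dataset]

    class PrefixEstimator(object):
        # estimator: only has fit(), which returns a model
        def fit(self, dataset):
            return PrefixModel(dataset[0][0] if dataset else "")

    if __name__ == "__main__":
        pipeline = Pipeline().setStages([UpperCaser(), PrefixEstimator()])
        model = pipeline.fit(["spark", "hadoop"])   # the dataset is just a list here
        print model.transform(["mapreduce"])        # ['SMAPREDUCE']
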
diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py
index 9fe81e864855f..13bbe69628df1 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -1,5 +1,5 @@
 from pyspark.sql import SchemaRDD
-from pyspark.ml import _keyword_only_secret, _assert_keyword_only_args, _jvm
+from pyspark.ml import _jvm
 from pyspark.ml.param import Param
 
 
@@ -8,45 +8,39 @@ class LogisticRegression(object):
     Logistic regression.
     """
 
-    _java_class = "org.apache.spark.ml.classification.LogisticRegression"
+    # _java_class = "org.apache.spark.ml.classification.LogisticRegression"
 
     def __init__(self):
         self._java_obj = _jvm().org.apache.spark.ml.classification.LogisticRegression()
-        self.paramMap = {}
         self.maxIter = Param(self, "maxIter", "max number of iterations", 100)
         self.regParam = Param(self, "regParam", "regularization constant", 0.1)
+        self.featuresCol = Param(self, "featuresCol", "features column name", "features")
 
-    def set(self, _keyword_only=_keyword_only_secret,
-            maxIter=None, regParam=None):
-        _assert_keyword_only_args()
-        if maxIter is not None:
-            self.paramMap[self.maxIter] = maxIter
-        if regParam is not None:
-            self.paramMap[self.regParam] = regParam
-        return self
-
-    # cannot chained
     def setMaxIter(self, value):
-        self.paramMap[self.maxIter] = value
+        self._java_obj.setMaxIter(value)
         return self
 
+    def getMaxIter(self):
+        return self._java_obj.getMaxIter()
+
     def setRegParam(self, value):
-        self.paramMap[self.regParam] = value
+        self._java_obj.setRegParam(value)
        return self
 
-    def getMaxIter(self):
-        if self.maxIter in self.paramMap:
-            return self.paramMap[self.maxIter]
-        else:
-            return self.maxIter.defaultValue
-
     def getRegParam(self):
-        if self.regParam in self.paramMap:
-            return self.paramMap[self.regParam]
-        else:
-            return self.regParam.defaultValue
+        return self._java_obj.getRegParam()
+
+    def setFeaturesCol(self, value):
+        self._java_obj.setFeaturesCol(value)
+        return self
 
-    def fit(self, dataset):
+    def getFeaturesCol(self):
+        return self._java_obj.getFeaturesCol()
+
+    def fit(self, dataset, params=None):
+        """
+        Fits a dataset with optional parameters.
+        """
         java_model = self._java_obj.fit(dataset._jschema_rdd,
                                         _jvm().org.apache.spark.ml.param.ParamMap())
         return LogisticRegressionModel(java_model)
@@ -62,6 +56,3 @@ def __init__(self, _java_model):
     def transform(self, dataset):
         return SchemaRDD(self._java_model.transform(dataset._jschema_rdd,
                                                     _jvm().org.apache.spark.ml.param.ParamMap()), dataset.sql_ctx)
-lr = LogisticRegression()
-
-lr.set(maxIter=10, regParam=0.1)
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
new file mode 100644
index 0000000000000..23923204b60a9
--- /dev/null
+++ b/python/pyspark/ml/feature.py
@@ -0,0 +1,85 @@
+from pyspark.sql import SchemaRDD, ArrayType, StringType
+from pyspark.ml import _jvm
+from pyspark.ml.param import Param
+
+
+class Tokenizer(object):
+
+    def __init__(self):
+        self.inputCol = Param(self, "inputCol", "input column name", None)
+        self.outputCol = Param(self, "outputCol", "output column name", None)
+        self.paramMap = {}
+
+    def setInputCol(self, value):
+        self.paramMap[self.inputCol] = value
+        return self
+
+    def getInputCol(self):
+        if self.inputCol in self.paramMap:
+            return self.paramMap[self.inputCol]
+
+    def setOutputCol(self, value):
+        self.paramMap[self.outputCol] = value
+        return self
+
+    def getOutputCol(self):
+        if self.outputCol in self.paramMap:
+            return self.paramMap[self.outputCol]
+
+    def transform(self, dataset, params={}):
+        sqlCtx = dataset.sql_ctx
+        if isinstance(params, dict):
+            paramMap = self.paramMap.copy()
+            paramMap.update(params)
+            inputCol = paramMap[self.inputCol]
+            outputCol = paramMap[self.outputCol]
+            # TODO: make names unique
+            sqlCtx.registerFunction("tokenize", lambda text: text.split(),
+                                    ArrayType(StringType(), False))
+            dataset.registerTempTable("dataset")
+            return sqlCtx.sql("SELECT *, tokenize(%s) AS %s FROM dataset" % (inputCol, outputCol))
+        elif isinstance(params, list):
+            return [self.transform(dataset, paramMap) for paramMap in params]
+        else:
+            raise ValueError("The input params must be either a dict or a list.")
+
+
+class HashingTF(object):
+
+    def __init__(self):
+        self._java_obj = _jvm().org.apache.spark.ml.feature.HashingTF()
+        self.numFeatures = Param(self, "numFeatures", "number of features", 1 << 18)
+        self.inputCol = Param(self, "inputCol", "input column name")
+        self.outputCol = Param(self, "outputCol", "output column name")
+
+    def setNumFeatures(self, value):
+        self._java_obj.setNumFeatures(value)
+        return self
+
+    def getNumFeatures(self):
+        return self._java_obj.getNumFeatures()
+
+    def setInputCol(self, value):
+        self._java_obj.setInputCol(value)
+        return self
+
+    def getInputCol(self):
+        return self._java_obj.getInputCol()
+
+    def setOutputCol(self, value):
+        self._java_obj.setOutputCol(value)
+        return self
+
+    def getOutputCol(self):
+        return self._java_obj.getOutputCol()
+
+    def transform(self, dataset, paramMap={}):
+        if isinstance(paramMap, dict):
+            javaParamMap = _jvm().org.apache.spark.ml.param.ParamMap()
+            for k, v in paramMap.items():
+                param = self._java_obj.getParam(k.name)
+                javaParamMap.put(param, v)
+            return SchemaRDD(self._java_obj.transform(dataset._jschema_rdd, javaParamMap),
+                             dataset.sql_ctx)
+        else:
+            raise ValueError("paramMap must be a dict.")
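
Note: Tokenizer.transform above accepts either a single dict of per-call parameters, which is merged on top of the values set through setInputCol/setOutputCol, or a list of such dicts, which produces one transformed dataset per map. The merge-and-dispatch logic can be shown on its own without a SQLContext; merge_params below is a hypothetical stand-alone version that uses plain string keys where the patch uses Param objects:

    def merge_params(instance_map, params):
        if isinstance(params, dict):
            merged = instance_map.copy()
            merged.update(params)   # per-call values win over instance values
            return merged
        elif isinstance(params, list):
            # a list of param maps fans out into one result per map
            return [merge_params(instance_map, p) for p in params]
        else:
            raise ValueError("The input params must be either a dict or a list.")

    if __name__ == "__main__":
        instance_map = {"inputCol": "text", "outputCol": "words"}
        print merge_params(instance_map, {"outputCol": "tokens"})
        # inputCol stays "text", outputCol is overridden to "tokens"
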
diff --git a/python/pyspark/ml/param.py b/python/pyspark/ml/param.py
index be85675233cde..181a158cb94c8 100644
--- a/python/pyspark/ml/param.py
+++ b/python/pyspark/ml/param.py
@@ -14,9 +14,3 @@ def __str__(self):
 
     def __repr_(self):
         return self.parent + "_" + self.name
-
-
-class Params(object):
-    """
-    Components that take parameters.
-    """
diff --git a/python/pyspark/ml/test.py b/python/pyspark/ml/test.py
new file mode 100644
index 0000000000000..aad7483488ad7
--- /dev/null
+++ b/python/pyspark/ml/test.py
@@ -0,0 +1,15 @@
+import subprocess
+
+def funcA(dataset, **kwargs):
+    """
+    funcA
+    :param dataset:
+    :param kwargs:
+
+    :return:
+    """
+    pass
+
+
+dataset = []
+funcA(dataset, )
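
Note: across the Python files in this patch, parameter values live in a paramMap dict keyed by Param objects, and getters fall back to a Param's defaultValue when nothing has been set (compare the original getMaxIter/getRegParam bodies removed from classification.py). A self-contained sketch of that convention; SimpleParam and HasRegParam are hypothetical stand-ins, not part of pyspark.ml:

    class SimpleParam(object):
        # stand-in for pyspark.ml.param.Param: identified by parent and name
        def __init__(self, parent, name, doc, defaultValue=None):
            self.parent = parent
            self.name = name
            self.doc = doc
            self.defaultValue = defaultValue

        def __repr__(self):
            return self.parent + "_" + self.name

    class HasRegParam(object):
        def __init__(self):
            self.regParam = SimpleParam("lr", "regParam", "regularization constant", 0.1)
            self.paramMap = {}

        def setRegParam(self, value):
            self.paramMap[self.regParam] = value
            return self

        def getRegParam(self):
            # fall back to the default when the caller has not set a value
            if self.regParam in self.paramMap:
                return self.paramMap[self.regParam]
            return self.regParam.defaultValue

    if __name__ == "__main__":
        lr = HasRegParam()
        print lr.getRegParam()                    # 0.1 (default)
        print lr.setRegParam(0.01).getRegParam()  # 0.01
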