diff --git a/python/docs/conf.py b/python/docs/conf.py index e58d97ae6a746..b00dce95d65b4 100644 --- a/python/docs/conf.py +++ b/python/docs/conf.py @@ -55,9 +55,9 @@ # built documents. # # The short X.Y version. -version = '1.2-SNAPSHOT' +version = '1.3-SNAPSHOT' # The full version, including alpha/beta/rc tags. -release = '1.2-SNAPSHOT' +release = '1.3-SNAPSHOT' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/python/docs/index.rst b/python/docs/index.rst index 703bef644de28..d150de9d5c502 100644 --- a/python/docs/index.rst +++ b/python/docs/index.rst @@ -14,6 +14,7 @@ Contents: pyspark pyspark.sql pyspark.streaming + pyspark.ml pyspark.mllib diff --git a/python/docs/pyspark.ml.rst b/python/docs/pyspark.ml.rst index 064fcd9c03204..f10d1339a9a8f 100644 --- a/python/docs/pyspark.ml.rst +++ b/python/docs/pyspark.ml.rst @@ -10,16 +10,6 @@ pyspark.ml module .. automodule:: pyspark.ml :members: :undoc-members: - :show-inheritance: - :inherited-members: - -pyspark.ml.param module ------------------------ - -.. automodule:: pyspark.ml.param - :members: - :undoc-members: - :show-inheritance: :inherited-members: pyspark.ml.feature module @@ -28,7 +18,6 @@ pyspark.ml.feature module .. automodule:: pyspark.ml.feature :members: :undoc-members: - :show-inheritance: :inherited-members: pyspark.ml.classification module @@ -37,5 +26,4 @@ pyspark.ml.classification module .. automodule:: pyspark.ml.classification :members: :undoc-members: - :show-inheritance: :inherited-members: diff --git a/python/docs/pyspark.rst b/python/docs/pyspark.rst index e81be3b6cb796..0df12c49ad033 100644 --- a/python/docs/pyspark.rst +++ b/python/docs/pyspark.rst @@ -9,6 +9,7 @@ Subpackages pyspark.sql pyspark.streaming + pyspark.ml pyspark.mllib Contents diff --git a/python/pyspark/ml/__init__.py b/python/pyspark/ml/__init__.py index 5b484f7392521..47fed80f42e13 100644 --- a/python/pyspark/ml/__init__.py +++ b/python/pyspark/ml/__init__.py @@ -15,310 +15,7 @@ # limitations under the License. # -from abc import ABCMeta, abstractmethod, abstractproperty +from pyspark.ml.param import * +from pyspark.ml.pipeline import * -from pyspark import SparkContext -from pyspark.sql import SchemaRDD, inherit_doc # TODO: move inherit_doc to Spark Core -from pyspark.ml.param import Param, Params -from pyspark.ml.util import Identifiable - -__all__ = ["Pipeline", "Transformer", "Estimator", "param", "feature", "classification"] - - -def _jvm(): - """ - Returns the JVM view associated with SparkContext. Must be called - after SparkContext is initialized. - """ - jvm = SparkContext._jvm - if jvm: - return jvm - else: - raise AttributeError("Cannot load _jvm from SparkContext. Is SparkContext initialized?") - - -@inherit_doc -class PipelineStage(Params): - """ - A stage in a pipeline, either an :py:class:`Estimator` or a - :py:class:`Transformer`. - """ - - __metaclass__ = ABCMeta - - def __init__(self): - super(PipelineStage, self).__init__() - - -@inherit_doc -class Estimator(PipelineStage): - """ - Abstract class for estimators that fit models to data. - """ - - __metaclass__ = ABCMeta - - def __init__(self): - super(Estimator, self).__init__() - - @abstractmethod - def fit(self, dataset, params={}): - """ - Fits a model to the input dataset with optional parameters. 
- - :param dataset: input dataset, which is an instance of - :py:class:`pyspark.sql.SchemaRDD` - :param params: an optional param map that overwrites embedded - params - :returns: fitted model - """ - raise NotImplementedError() - - -@inherit_doc -class Transformer(PipelineStage): - """ - Abstract class for transformers that transform one dataset into - another. - """ - - __metaclass__ = ABCMeta - - def __init__(self): - super(Transformer, self).__init__() - - @abstractmethod - def transform(self, dataset, params={}): - """ - Transforms the input dataset with optional parameters. - - :param dataset: input dataset, which is an instance of - :py:class:`pyspark.sql.SchemaRDD` - :param params: an optional param map that overwrites embedded - params - :returns: transformed dataset - """ - raise NotImplementedError() - - -@inherit_doc -class Model(Transformer): - """ - Abstract class for models fitted by :py:class:`Estimator`s. - """ - - ___metaclass__ = ABCMeta - - def __init__(self): - super(Model, self).__init__() - - -@inherit_doc -class Pipeline(Estimator): - """ - A simple pipeline, which acts as an estimator. A Pipeline consists - of a sequence of stages, each of which is either an - :py:class:`Estimator` or a :py:class:`Transformer`. When - :py:meth:`Pipeline.fit` is called, the stages are executed in - order. If a stage is an :py:class:`Estimator`, its - :py:meth:`Estimator.fit` method will be called on the input - dataset to fit a model. Then the model, which is a transformer, - will be used to transform the dataset as the input to the next - stage. If a stage is a :py:class:`Transformer`, its - :py:meth:`Transformer.transform` method will be called to produce - the dataset for the next stage. The fitted model from a - :py:class:`Pipeline` is an :py:class:`PipelineModel`, which - consists of fitted models and transformers, corresponding to the - pipeline stages. If there are no stages, the pipeline acts as an - identity transformer. - """ - - def __init__(self): - super(Pipeline, self).__init__() - #: Param for pipeline stages. - self.stages = Param(self, "stages", "pipeline stages") - - def setStages(self, value): - """ - Set pipeline stages. - :param value: a list of transformers or estimators - :return: the pipeline instance - """ - self.paramMap[self.stages] = value - return self - - def getStages(self): - """ - Get pipeline stages. - """ - if self.stages in self.paramMap: - return self.paramMap[self.stages] - - def fit(self, dataset, params={}): - paramMap = self._merge_params(params) - stages = paramMap[self.stages] - for stage in stages: - if not (isinstance(stage, Estimator) or isinstance(stage, Transformer)): - raise ValueError( - "Cannot recognize a pipeline stage of type %s." % type(stage).__name__) - indexOfLastEstimator = -1 - for i, stage in enumerate(stages): - if isinstance(stage, Estimator): - indexOfLastEstimator = i - transformers = [] - for i, stage in enumerate(stages): - if i <= indexOfLastEstimator: - if isinstance(stage, Transformer): - transformers.append(stage) - dataset = stage.transform(dataset, paramMap) - else: # must be an Estimator - model = stage.fit(dataset, paramMap) - transformers.append(model) - if i < indexOfLastEstimator: - dataset = model.transform(dataset, paramMap) - else: - transformers.append(stage) - return PipelineModel(transformers) - - -@inherit_doc -class PipelineModel(Model): - """ - Represents a compiled pipeline with transformers and fitted models. 
- """ - - def __init__(self, transformers): - super(PipelineModel, self).__init__() - self.transformers = transformers - - def transform(self, dataset, params={}): - paramMap = self._merge_params(params) - for t in self.transformers: - dataset = t.transform(dataset, paramMap) - return dataset - - -@inherit_doc -class JavaWrapper(Params): - """ - Utility class to help create wrapper classes from Java/Scala - implementations of pipeline components. - """ - - __metaclass__ = ABCMeta - - def __init__(self): - super(JavaWrapper, self).__init__() - - @abstractproperty - def _java_class(self): - """ - Fully-qualified class name of the wrapped Java component. - """ - raise NotImplementedError - - def _java_obj(self): - """ - Returns or creates a Java object. - """ - java_obj = _jvm() - for name in self._java_class.split("."): - java_obj = getattr(java_obj, name) - return java_obj() - - def _transfer_params_to_java(self, params, java_obj): - """ - Transforms the embedded params and additional params to the - input Java object. - :param params: additional params (overwriting embedded values) - :param java_obj: Java object to receive the params - """ - paramMap = self._merge_params(params) - for param in self.params: - if param in paramMap: - java_obj.set(param.name, paramMap[param]) - - def _empty_java_param_map(self): - """ - Returns an empty Java ParamMap reference. - """ - return _jvm().org.apache.spark.ml.param.ParamMap() - - def _create_java_param_map(self, params, java_obj): - paramMap = self._empty_java_param_map() - for param, value in params.items(): - if param.parent is self: - paramMap.put(java_obj.getParam(param.name), value) - return paramMap - - -@inherit_doc -class JavaEstimator(Estimator, JavaWrapper): - """ - Base class for :py:class:`Estimator`s that wrap Java/Scala - implementations. - """ - - __metaclass__ = ABCMeta - - def __init__(self): - super(JavaEstimator, self).__init__() - - @abstractmethod - def _create_model(self, java_model): - """ - Creates a model from the input Java model reference. - """ - raise NotImplementedError - - def _fit_java(self, dataset, params={}): - """ - Fits a Java model to the input dataset. - :param dataset: input dataset, which is an instance of - :py:class:`pyspark.sql.SchemaRDD` - :param params: additional params (overwriting embedded values) - :return: fitted Java model - """ - java_obj = self._java_obj() - self._transfer_params_to_java(params, java_obj) - return java_obj.fit(dataset._jschema_rdd, self._empty_java_param_map()) - - def fit(self, dataset, params={}): - java_model = self._fit_java(dataset, params) - return self._create_model(java_model) - - -@inherit_doc -class JavaTransformer(Transformer, JavaWrapper): - """ - Base class for :py:class:`Transformer`s that wrap Java/Scala - implementations. - """ - - __metaclass__ = ABCMeta - - def __init__(self): - super(JavaTransformer, self).__init__() - - def transform(self, dataset, params={}): - java_obj = self._java_obj() - self._transfer_params_to_java({}, java_obj) - java_param_map = self._create_java_param_map(params, java_obj) - return SchemaRDD(java_obj.transform(dataset._jschema_rdd, java_param_map), - dataset.sql_ctx) - - -@inherit_doc -class JavaModel(JavaTransformer): - """ - Base class for :py:class:`Model`s that wrap Java/Scala - implementations. 
- """ - - __metaclass__ = ABCMeta - - def __init__(self): - super(JavaTransformer, self).__init__() - - def _java_obj(self): - return self._java_model +__all__ = ["Param", "Params", "Transformer", "Estimator", "Pipeline"] diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 0384c39b5d5ab..6bd2aa8e47837 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -15,12 +15,15 @@ # limitations under the License. # -from pyspark.sql import inherit_doc -from pyspark.ml import JavaEstimator, JavaModel +from pyspark.ml.util import inherit_doc +from pyspark.ml.wrapper import JavaEstimator, JavaModel from pyspark.ml.param.shared import HasFeaturesCol, HasLabelCol, HasPredictionCol, HasMaxIter,\ HasRegParam +__all__ = ['LogisticRegression', 'LogisticRegressionModel'] + + @inherit_doc class LogisticRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredictionCol, HasMaxIter, HasRegParam): @@ -37,38 +40,23 @@ class LogisticRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredicti .setRegParam(0.01) >>> model = lr.fit(dataset) >>> test0 = sqlCtx.inferSchema(sc.parallelize([Row(features=Vectors.dense(-1.0))])) - >>> print model.transform(test0).first().prediction + >>> print model.transform(test0).head().prediction 0.0 >>> test1 = sqlCtx.inferSchema(sc.parallelize([Row(features=Vectors.sparse(1, [0], [1.0]))])) - >>> print model.transform(test1).first().prediction + >>> print model.transform(test1).head().prediction 1.0 """ - - def __init__(self): - super(LogisticRegression, self).__init__() - - @property - def _java_class(self): - return "org.apache.spark.ml.classification.LogisticRegression" + _java_class = "org.apache.spark.ml.classification.LogisticRegression" def _create_model(self, java_model): return LogisticRegressionModel(java_model) -@inherit_doc class LogisticRegressionModel(JavaModel): """ Model fitted by LogisticRegression. """ - def __init__(self, java_model): - super(LogisticRegressionModel, self).__init__() - self._java_model = java_model - - @property - def _java_class(self): - return "org.apache.spark.ml.classification.LogisticRegressionModel" - if __name__ == "__main__": import doctest diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py index 7e9746cdccac2..e088acd0ca82d 100644 --- a/python/pyspark/ml/feature.py +++ b/python/pyspark/ml/feature.py @@ -15,9 +15,11 @@ # limitations under the License. 
# -from pyspark.sql import inherit_doc -from pyspark.ml import JavaTransformer from pyspark.ml.param.shared import HasInputCol, HasOutputCol, HasNumFeatures +from pyspark.ml.util import inherit_doc +from pyspark.ml.wrapper import JavaTransformer + +__all__ = ['Tokenizer', 'HashingTF'] @inherit_doc @@ -31,18 +33,13 @@ class Tokenizer(JavaTransformer, HasInputCol, HasOutputCol): >>> tokenizer = Tokenizer() \ .setInputCol("text") \ .setOutputCol("words") - >>> print tokenizer.transform(dataset).first() + >>> print tokenizer.transform(dataset).head() Row(text=u'a b c', words=[u'a', u'b', u'c']) - >>> print tokenizer.transform(dataset, {tokenizer.outputCol: "tokens"}).first() + >>> print tokenizer.transform(dataset, {tokenizer.outputCol: "tokens"}).head() Row(text=u'a b c', tokens=[u'a', u'b', u'c']) """ - def __init__(self): - super(Tokenizer, self).__init__() - - @property - def _java_class(self): - return "org.apache.spark.ml.feature.Tokenizer" + _java_class = "org.apache.spark.ml.feature.Tokenizer" @inherit_doc @@ -57,19 +54,14 @@ class HashingTF(JavaTransformer, HasInputCol, HasOutputCol, HasNumFeatures): .setNumFeatures(10) \ .setInputCol("words") \ .setOutputCol("features") - >>> print hashingTF.transform(dataset).first().features + >>> print hashingTF.transform(dataset).head().features (10,[7,8,9],[1.0,1.0,1.0]) >>> params = {hashingTF.numFeatures: 5, hashingTF.outputCol: "vector"} - >>> print hashingTF.transform(dataset, params).first().vector + >>> print hashingTF.transform(dataset, params).head().vector (5,[2,3,4],[1.0,1.0,1.0]) """ - def __init__(self): - super(HashingTF, self).__init__() - - @property - def _java_class(self): - return "org.apache.spark.ml.feature.HashingTF" + _java_class = "org.apache.spark.ml.feature.HashingTF" if __name__ == "__main__": diff --git a/python/pyspark/ml/param/__init__.py b/python/pyspark/ml/param/__init__.py index 80c62c8cdb5d0..9d657acdd94f4 100644 --- a/python/pyspark/ml/param/__init__.py +++ b/python/pyspark/ml/param/__init__.py @@ -15,11 +15,24 @@ # limitations under the License. # +import uuid from abc import ABCMeta -from pyspark.ml.util import Identifiable +__all__ = ['Param', 'Params'] -__all__ = ["Param"] + +class Identifiable(object): + """ + Object with a unique ID. + """ + + def __init__(self): + #: A unique id for the object. The default implementation + #: concatenates the class name, "-", and 8 random hex chars. + self.uid = type(self).__name__ + "-" + uuid.uuid4().hex[:8] + + def __repr__(self): + return self.uid class Param(object): diff --git a/python/pyspark/ml/pipeline.py b/python/pyspark/ml/pipeline.py new file mode 100644 index 0000000000000..0c5ec86620a97 --- /dev/null +++ b/python/pyspark/ml/pipeline.py @@ -0,0 +1,166 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from abc import ABCMeta, abstractmethod + +from pyspark.ml.param import Param, Params + +__all__ = ['Estimator', 'Transformer', 'Pipeline', 'PipelineModel'] + + +def inherit_doc(cls): + for name, func in vars(cls).items(): + # only inherit docstring for public functions + if name.startswith("_"): + continue + if not func.__doc__: + for parent in cls.__bases__: + parent_func = getattr(parent, name, None) + if parent_func and getattr(parent_func, "__doc__", None): + func.__doc__ = parent_func.__doc__ + break + return cls + + +@inherit_doc +class Estimator(Params): + """ + Abstract class for estimators that fit models to data. + """ + + __metaclass__ = ABCMeta + + @abstractmethod + def fit(self, dataset, params={}): + """ + Fits a model to the input dataset with optional parameters. + + :param dataset: input dataset, which is an instance of + :py:class:`pyspark.sql.SchemaRDD` + :param params: an optional param map that overwrites embedded + params + :returns: fitted model + """ + raise NotImplementedError() + + +@inherit_doc +class Transformer(Params): + """ + Abstract class for transformers that transform one dataset into + another. + """ + + __metaclass__ = ABCMeta + + @abstractmethod + def transform(self, dataset, params={}): + """ + Transforms the input dataset with optional parameters. + + :param dataset: input dataset, which is an instance of + :py:class:`pyspark.sql.SchemaRDD` + :param params: an optional param map that overwrites embedded + params + :returns: transformed dataset + """ + raise NotImplementedError() + + +@inherit_doc +class Pipeline(Estimator): + """ + A simple pipeline, which acts as an estimator. A Pipeline consists + of a sequence of stages, each of which is either an + :py:class:`Estimator` or a :py:class:`Transformer`. When + :py:meth:`Pipeline.fit` is called, the stages are executed in + order. If a stage is an :py:class:`Estimator`, its + :py:meth:`Estimator.fit` method will be called on the input + dataset to fit a model. Then the model, which is a transformer, + will be used to transform the dataset as the input to the next + stage. If a stage is a :py:class:`Transformer`, its + :py:meth:`Transformer.transform` method will be called to produce + the dataset for the next stage. The fitted model from a + :py:class:`Pipeline` is an :py:class:`PipelineModel`, which + consists of fitted models and transformers, corresponding to the + pipeline stages. If there are no stages, the pipeline acts as an + identity transformer. + """ + + def __init__(self): + super(Pipeline, self).__init__() + #: Param for pipeline stages. + self.stages = Param(self, "stages", "pipeline stages") + + def setStages(self, value): + """ + Set pipeline stages. + :param value: a list of transformers or estimators + :return: the pipeline instance + """ + self.paramMap[self.stages] = value + return self + + def getStages(self): + """ + Get pipeline stages. + """ + if self.stages in self.paramMap: + return self.paramMap[self.stages] + + def fit(self, dataset, params={}): + paramMap = self._merge_params(params) + stages = paramMap[self.stages] + for stage in stages: + if not (isinstance(stage, Estimator) or isinstance(stage, Transformer)): + raise ValueError( + "Cannot recognize a pipeline stage of type %s." 
% type(stage).__name__) + indexOfLastEstimator = -1 + for i, stage in enumerate(stages): + if isinstance(stage, Estimator): + indexOfLastEstimator = i + transformers = [] + for i, stage in enumerate(stages): + if i <= indexOfLastEstimator: + if isinstance(stage, Transformer): + transformers.append(stage) + dataset = stage.transform(dataset, paramMap) + else: # must be an Estimator + model = stage.fit(dataset, paramMap) + transformers.append(model) + if i < indexOfLastEstimator: + dataset = model.transform(dataset, paramMap) + else: + transformers.append(stage) + return PipelineModel(transformers) + + +@inherit_doc +class PipelineModel(Transformer): + """ + Represents a compiled pipeline with transformers and fitted models. + """ + + def __init__(self, transformers): + super(PipelineModel, self).__init__() + self.transformers = transformers + + def transform(self, dataset, params={}): + paramMap = self._merge_params(params) + for t in self.transformers: + dataset = t.transform(dataset, paramMap) + return dataset diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 2f5f2d18d5d01..b627c2b4e930b 100644 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -31,12 +31,12 @@ import unittest from pyspark.tests import ReusedPySparkTestCase as PySparkTestCase -from pyspark.sql import SchemaRDD -from pyspark.ml import Transformer, Estimator, Model, Pipeline +from pyspark.sql import DataFrame from pyspark.ml.param import Param +from pyspark.ml.pipeline import Transformer, Estimator, Pipeline -class MockDataset(SchemaRDD): +class MockDataset(DataFrame): def __init__(self): self.index = 0 @@ -76,7 +76,7 @@ def fit(self, dataset, params={}): return model -class MockModel(MockTransformer, Model): +class MockModel(MockTransformer, Transformer): def __init__(self): super(MockModel, self).__init__() diff --git a/python/pyspark/ml/util.py b/python/pyspark/ml/util.py index 5d74088b0b13e..991330f78e983 100644 --- a/python/pyspark/ml/util.py +++ b/python/pyspark/ml/util.py @@ -15,21 +15,16 @@ # limitations under the License. # -import uuid - -class Identifiable(object): - """ - Object with a unique ID. - """ - - def __init__(self): - #: A unique id for the object. The default implementation - #: concatenates the class name, "-", and 8 random hex chars. - self.uid = type(self).__name__ + "-" + uuid.uuid4().hex[:8] - - def __str__(self): - return self.uid - - def __repr__(self): - return str(self) +def inherit_doc(cls): + for name, func in vars(cls).items(): + # only inherit docstring for public functions + if name.startswith("_"): + continue + if not func.__doc__: + for parent in cls.__bases__: + parent_func = getattr(parent, name, None) + if parent_func and getattr(parent_func, "__doc__", None): + func.__doc__ = parent_func.__doc__ + break + return cls diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py new file mode 100644 index 0000000000000..9e12ddc3d9b8f --- /dev/null +++ b/python/pyspark/ml/wrapper.py @@ -0,0 +1,149 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from abc import ABCMeta + +from pyspark import SparkContext +from pyspark.sql import DataFrame +from pyspark.ml.param import Params +from pyspark.ml.pipeline import Estimator, Transformer +from pyspark.ml.util import inherit_doc + + +def _jvm(): + """ + Returns the JVM view associated with SparkContext. Must be called + after SparkContext is initialized. + """ + jvm = SparkContext._jvm + if jvm: + return jvm + else: + raise AttributeError("Cannot load _jvm from SparkContext. Is SparkContext initialized?") + + +@inherit_doc +class JavaWrapper(Params): + """ + Utility class to help create wrapper classes from Java/Scala + implementations of pipeline components. + """ + + __metaclass__ = ABCMeta + + #: Fully-qualified class name of the wrapped Java component. + _java_class = None + + def _java_obj(self): + """ + Returns or creates a Java object. + """ + java_obj = _jvm() + for name in self._java_class.split("."): + java_obj = getattr(java_obj, name) + return java_obj() + + def _transfer_params_to_java(self, params, java_obj): + """ + Transforms the embedded params and additional params to the + input Java object. + :param params: additional params (overwriting embedded values) + :param java_obj: Java object to receive the params + """ + paramMap = self._merge_params(params) + for param in self.params: + if param in paramMap: + java_obj.set(param.name, paramMap[param]) + + def _empty_java_param_map(self): + """ + Returns an empty Java ParamMap reference. + """ + return _jvm().org.apache.spark.ml.param.ParamMap() + + def _create_java_param_map(self, params, java_obj): + paramMap = self._empty_java_param_map() + for param, value in params.items(): + if param.parent is self: + paramMap.put(java_obj.getParam(param.name), value) + return paramMap + + +@inherit_doc +class JavaEstimator(Estimator, JavaWrapper): + """ + Base class for :py:class:`Estimator`s that wrap Java/Scala + implementations. + """ + + __metaclass__ = ABCMeta + + def _create_model(self, java_model): + """ + Creates a model from the input Java model reference. + """ + return JavaModel(java_model) + + def _fit_java(self, dataset, params={}): + """ + Fits a Java model to the input dataset. + :param dataset: input dataset, which is an instance of + :py:class:`pyspark.sql.SchemaRDD` + :param params: additional params (overwriting embedded values) + :return: fitted Java model + """ + java_obj = self._java_obj() + self._transfer_params_to_java(params, java_obj) + return java_obj.fit(dataset._jdf, self._empty_java_param_map()) + + def fit(self, dataset, params={}): + java_model = self._fit_java(dataset, params) + return self._create_model(java_model) + + +@inherit_doc +class JavaTransformer(Transformer, JavaWrapper): + """ + Base class for :py:class:`Transformer`s that wrap Java/Scala + implementations. 
+ """ + + __metaclass__ = ABCMeta + + def transform(self, dataset, params={}): + java_obj = self._java_obj() + self._transfer_params_to_java({}, java_obj) + java_param_map = self._create_java_param_map(params, java_obj) + return DataFrame(java_obj.transform(dataset._jdf, java_param_map), + dataset.sql_ctx) + + +@inherit_doc +class JavaModel(JavaTransformer): + """ + Base class for :py:class:`Model`s that wrap Java/Scala + implementations. + """ + + __metaclass__ = ABCMeta + + def __init__(self, java_model): + super(JavaTransformer, self).__init__() + self._java_model = java_model + + def _java_obj(self): + return self._java_model diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index 7d7550c854b2f..c3a6938f56864 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -1794,20 +1794,6 @@ def __repr__(self): return "" % ", ".join(self) -def inherit_doc(cls): - for name, func in vars(cls).items(): - # only inherit docstring for public functions - if name.startswith("_"): - continue - if not func.__doc__: - for parent in cls.__bases__: - parent_func = getattr(parent, name, None) - if parent_func and getattr(parent_func, "__doc__", None): - func.__doc__ = parent_func.__doc__ - break - return cls - - class DataFrame(object): """A collection of rows that have the same columns. diff --git a/python/run-tests b/python/run-tests index 9ee19ed6e6b26..57e58c1341c62 100755 --- a/python/run-tests +++ b/python/run-tests @@ -81,6 +81,13 @@ function run_mllib_tests() { run_test "pyspark/mllib/tests.py" } +function run_ml_tests() { + echo "Run ml tests ..." + run_test "pyspark/ml/feature.py" + run_test "pyspark/ml/classification.py" + run_test "pyspark/ml/tests.py" +} + function run_streaming_tests() { echo "Run streaming tests ..." run_test "pyspark/streaming/util.py" @@ -102,6 +109,7 @@ $PYSPARK_PYTHON --version run_core_tests run_sql_tests run_mllib_tests +run_ml_tests run_streaming_tests # Try to test with PyPy