From d19ef558433b7b062b40332ea38ac869fb7eb0d5 Mon Sep 17 00:00:00 2001
From: Yuhao Yang
Date: Thu, 2 Apr 2015 19:28:15 +0800
Subject: [PATCH] change OnlineLDA to class

---
 .../spark/mllib/clustering/OnlineLDA.scala | 103 ++++++++++++------
 1 file changed, 72 insertions(+), 31 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/OnlineLDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/OnlineLDA.scala
index 13891bba6573e..72c550144db0a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/OnlineLDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/OnlineLDA.scala
@@ -24,7 +24,6 @@ import org.apache.spark.annotation.Experimental
 import org.apache.spark.mllib.linalg._
 import org.apache.spark.rdd.RDD
 
-
 /**
  * :: Experimental ::
  * Latent Dirichlet Allocation (LDA), a topic model designed for text documents.
@@ -37,7 +36,58 @@ import org.apache.spark.rdd.RDD
  * Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
  */
 @Experimental
-object OnlineLDA{
+class OnlineLDA(
+    private var k: Int,
+    private var numIterations: Int,
+    private var miniBatchFraction: Double,
+    private var tau_0: Double,
+    private var kappa: Double) {
+
+  def this() = this(k = 10, numIterations = 100, miniBatchFraction = 0.01,
+    tau_0 = 1024, kappa = 0.5)
+
+  /**
+   * Set the number of topics to infer, i.e., the number of soft cluster centers.
+   * (default = 10)
+   */
+  def setK(k: Int): this.type = {
+    require(k > 0, s"OnlineLDA k (number of clusters) must be > 0, but was set to $k")
+    this.k = k
+    this
+  }
+
+  /**
+   * Set the number of online VB iterations to run. (default = 100)
+   */
+  def setNumIterations(iters: Int): this.type = {
+    require(iters > 0, s"OnlineLDA numIterations must be > 0, but was set to $iters")
+    this.numIterations = iters
+    this
+  }
+
+  /**
+   * Set the fraction of the corpus sampled for each mini-batch. (default = 0.01)
+   */
+  def setMiniBatchFraction(fraction: Double): this.type = {
+    require(fraction > 0 && fraction <= 1,
+      s"OnlineLDA miniBatchFraction must be in (0, 1], but was set to $fraction")
+    this.miniBatchFraction = fraction
+    this
+  }
+
+  /**
+   * Set tau_0, a (positive) learning parameter that downweights early iterations.
+   * (default = 1024)
+   */
+  def setTau_0(t: Double): this.type = {
+    require(t > 0, s"OnlineLDA tau_0 must be > 0, but was set to $t")
+    this.tau_0 = t
+    this
+  }
+
+  /**
+   * Set kappa, the exponential decay rate for the learning rate. Values in (0.5, 1.0]
+   * guarantee asymptotic convergence. (default = 0.5)
+   */
+  def setKappa(kappa: Double): this.type = {
+    require(kappa >= 0, s"OnlineLDA kappa must be nonnegative, but was set to $kappa")
+    this.kappa = kappa
+    this
+  }
+
   /**
    * Learns an LDA model from the given data set, using online variational Bayes (VB) algorithm.
@@ -49,33 +99,18 @@ object OnlineLDA{
    *
    * @param documents RDD of documents, which are term (word) count vectors paired with IDs.
    *                  The term count vectors are "bags of words" with a fixed-size vocabulary
    *                  (where the vocabulary size is the length of the vector).
    *                  Document IDs must be unique and >= 0.
-   * @param k Number of topics to infer.
-   * @param batchNumber Number of batches to split input corpus. For each batch, recommendation
-   *                    size is [4, 16384]. -1 for automatic batchNumber.
   * @return Inferred LDA model
    */
-  def run(documents: RDD[(Long, Vector)], k: Int, batchNumber: Int = -1): LDAModel = {
-    require(batchNumber > 0 || batchNumber == -1,
-      s"batchNumber must be greater or -1, but was set to $batchNumber")
-    require(k > 0, s"LDA k (number of clusters) must be > 0, but was set to $k")
-
+  def run(documents: RDD[(Long, Vector)]): LDAModel = {
     val vocabSize = documents.first._2.size
     val D = documents.count().toInt // total documents count
-    val batchSize =
-      if (batchNumber == -1) { // auto mode
-        if (D / 100 > 16384) 16384
-        else if (D / 100 < 4) 4
-        else D / 100
-      }
-      else {
-        D / batchNumber
-      }
-
-    val onlineLDA = new OnlineLDAOptimizer(k, D, vocabSize)
-    val actualBatchNumber = Math.ceil(D.toDouble / batchSize).toInt
-    for(i <- 1 to actualBatchNumber){
-      val batch = documents.sample(true, batchSize.toDouble / D)
-      onlineLDA.submitMiniBatch(batch)
+    val onlineLDA = new OnlineLDAOptimizer(k, D, vocabSize, tau_0, kappa)
+
+    // Split the corpus once into ceil(1 / miniBatchFraction) random mini-batches
+    // and cycle through them, rather than re-splitting the RDD on every iteration.
+    val batchWeights = Array.fill(math.ceil(1.0 / miniBatchFraction).toInt)(miniBatchFraction)
+    val splits = documents.randomSplit(batchWeights)
+    for (i <- 0 until numIterations) {
+      val index = i % splits.length
+      onlineLDA.submitMiniBatch(splits(index))
     }
     onlineLDA.getTopicDistribution()
   }
@@ -93,10 +128,12 @@ object OnlineLDA{
  * Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
  */
 @Experimental
-class OnlineLDAOptimizer (
+private[clustering] class OnlineLDAOptimizer(
     private var k: Int,
     private var D: Int,
-    private val vocabSize:Int) extends Serializable {
+    private val vocabSize: Int,
+    private val tau_0: Double,
+    private val kappa: Double) extends Serializable {
 
   // Initialize the variational distribution q(beta|lambda)
   private var lambda = getGammaMatrix(k, vocabSize) // K * V
@@ -115,7 +152,11 @@ class OnlineLDAOptimizer (
    *                  Document IDs must be unique and >= 0.
    * @return Inferred LDA model
    */
-  def submitMiniBatch(documents: RDD[(Long, Vector)]): Unit = {
+  private[clustering] def submitMiniBatch(documents: RDD[(Long, Vector)]): Unit = {
+    // Skip empty mini-batches, which randomSplit may produce for small corpora.
+    if (documents.isEmpty()) {
+      return
+    }
+
     var stat = BDM.zeros[Double](k, vocabSize)
     stat = documents.treeAggregate(stat)(gradient, _ += _)
     update(stat, i, documents.count().toInt)
@@ -125,13 +166,13 @@ class OnlineLDAOptimizer (
   /**
    * get the topic-term distribution
    */
-  def getTopicDistribution(): LDAModel ={
+  private[clustering] def getTopicDistribution(): LDAModel = {
     new LocalLDAModel(Matrices.fromBreeze(lambda).transpose)
   }
 
   private def update(raw: BDM[Double], iter:Int, batchSize: Int): Unit ={
-    // weight of the mini-batch. 1024 helps down weights early iterations
-    val weight = math.pow(1024 + iter, -0.5)
+    // Weight of this mini-batch: rho_t = (tau_0 + t)^(-kappa), which downweights
+    // early iterations and decays as training proceeds.
+    val weight = math.pow(tau_0 + iter, -kappa)
 
     // This step finishes computing the sufficient statistics for the M step
     val stat = raw :* expElogbeta
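
A minimal usage sketch of the builder-style API this patch introduces, assuming a
live SparkContext `sc` and a hypothetical whitespace-separated term-count file;
only methods added in this patch are called:

    import org.apache.spark.mllib.linalg.Vectors

    // Each input line is a space-separated list of term counts over a fixed
    // vocabulary. (Hypothetical path, for illustration only.)
    val corpus = sc.textFile("data/corpus.txt")
      .map(line => Vectors.dense(line.split(' ').map(_.toDouble)))
      .zipWithIndex()
      .map { case (counts, docId) => (docId, counts) }

    val model = new OnlineLDA()
      .setK(20)                   // number of topics
      .setNumIterations(50)       // online VB iterations
      .setMiniBatchFraction(0.05) // ~5% of the corpus per mini-batch
      .run(corpus)

    println(s"Learned ${model.k} topics over ${model.vocabSize} terms")

Any RDD[(Long, Vector)] with unique, nonnegative document IDs works in place of
the file-based corpus.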
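The update step follows Hoffman et al.'s learning-rate schedule
rho_t = (tau_0 + t)^(-kappa). A small standalone sketch (plain Scala, no Spark
required) of how the default tau_0 = 1024 and kappa = 0.5 shape that decay:

    // rho_t = (tau_0 + t)^(-kappa): each later mini-batch gets a smaller weight.
    val tau_0 = 1024.0
    val kappa = 0.5
    for (t <- Seq(0, 10, 100, 1000, 10000)) {
      val rho = math.pow(tau_0 + t, -kappa)
      println(f"t = $t%5d  rho_t = $rho%.6f")
    }

With the defaults, the first mini-batch gets weight 1024^(-0.5) = 1/32, so no
single early batch can dominate lambda; a larger kappa makes the decay faster.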