remove minCount and window from constructor
change model to use float instead of double
mengxr committed Aug 4, 2014
1 parent e93e726 commit 384c771
Showing 1 changed file with 63 additions and 67 deletions.
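
The user-visible API change, sketched as a hedged example (the `sentences` RDD and the parameter values are illustrative, not part of the commit):

    // Before this commit, window and minCount were constructor arguments:
    //   new Word2Vec(size, startingAlpha, window, minCount, parallelism, numIterations)
    // After it, both are fixed private vals (5), so callers pass only four:
    val w2v = new Word2Vec(100, 0.025, 1, 1)       // size, startingAlpha, parallelism, numIterations
    val model: Word2VecModel = w2v.fit(sentences)  // sentences: RDD[Seq[String]]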
mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala — 130 changes: 63 additions & 67 deletions
@@ -31,6 +31,7 @@ import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.HashPartitioner
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.mllib.rdd.RDDFunctions._
+
 /**
  * Entry in vocabulary
  */
@@ -61,18 +62,15 @@ private case class VocabWord(
  * Distributed Representations of Words and Phrases and their Compositionality.
  * @param size vector dimension
  * @param startingAlpha initial learning rate
- * @param window context words from [-window, window]
- * @param minCount minimum frequncy to consider a vocabulary word
- * @param parallelisum number of partitions to run Word2Vec
+ * @param parallelism number of partitions to run Word2Vec (using a small number for accuracy)
+ * @param numIterations number of iterations to run, should be smaller than or equal to parallelism
  */
 @Experimental
 class Word2Vec(
     val size: Int,
     val startingAlpha: Double,
-    val window: Int,
-    val minCount: Int,
-    val parallelism:Int = 1,
-    val numIterations:Int = 1)
+    val parallelism: Int = 1,
+    val numIterations: Int = 1)
   extends Serializable with Logging {
 
   private val EXP_TABLE_SIZE = 1000
@@ -81,7 +79,13 @@ class Word2Vec(
   private val MAX_SENTENCE_LENGTH = 1000
   private val layer1Size = size
   private val modelPartitionNum = 100
 
+  /** context words from [-window, window] */
+  private val window = 5
+
+  /** minimum frequency to consider a vocabulary word */
+  private val minCount = 5
+
   private var trainWordsCount = 0
   private var vocabSize = 0
   private var vocab: Array[VocabWord] = null
@@ -99,7 +103,7 @@ class Word2Vec(
         0))
       .filter(_.cn >= minCount)
       .collect()
-      .sortWith((a, b)=> a.cn > b.cn)
+      .sortWith((a, b) => a.cn > b.cn)
 
     vocabSize = vocab.length
     var a = 0
@@ -111,16 +115,12 @@ class Word2Vec(
     logInfo("trainWordsCount = " + trainWordsCount)
   }
 
-  private def learnVocabPerPartition(words:RDD[String]) {
-
-  }
-
-  private def createExpTable(): Array[Double] = {
-    val expTable = new Array[Double](EXP_TABLE_SIZE)
+  private def createExpTable(): Array[Float] = {
+    val expTable = new Array[Float](EXP_TABLE_SIZE)
     var i = 0
     while (i < EXP_TABLE_SIZE) {
       val tmp = math.exp((2.0 * i / EXP_TABLE_SIZE - 1.0) * MAX_EXP)
-      expTable(i) = tmp / (tmp + 1)
+      expTable(i) = (tmp / (tmp + 1.0)).toFloat
       i += 1
     }
     expTable
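
createExpTable precomputes the sigmoid e^x / (e^x + 1) at EXP_TABLE_SIZE evenly spaced points over (-MAX_EXP, MAX_EXP), so the hot training loop replaces math.exp with an array lookup. A self-contained sketch of build plus lookup, reusing EXP_TABLE_SIZE = 1000 from above and the index arithmetic that appears later in this diff; MAX_EXP = 6 is the usual word2vec value but is elided from this excerpt, so treat it as an assumption:

    val MAX_EXP = 6            // assumed; not shown in this diff excerpt
    val EXP_TABLE_SIZE = 1000
    val expTable: Array[Float] = Array.tabulate(EXP_TABLE_SIZE) { i =>
      val tmp = math.exp((2.0 * i / EXP_TABLE_SIZE - 1.0) * MAX_EXP)
      (tmp / (tmp + 1.0)).toFloat
    }
    // Valid only for f in (-MAX_EXP, MAX_EXP), which the caller checks first.
    def sigmoidLookup(f: Float): Float =
      expTable(((f + MAX_EXP) * (EXP_TABLE_SIZE / MAX_EXP / 2.0)).toInt)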
@@ -209,7 +209,7 @@ class Word2Vec(
    * @return a Word2VecModel
    */
 
-  def fit[S <: Iterable[String]](dataset:RDD[S]): Word2VecModel = {
+  def fit[S <: Iterable[String]](dataset: RDD[S]): Word2VecModel = {
 
     val words = dataset.flatMap(x => x)
 
@@ -223,39 +223,37 @@ class Word2Vec(
     val bcVocab = sc.broadcast(vocab)
     val bcVocabHash = sc.broadcast(vocabHash)
 
-    val sentences: RDD[Array[Int]] = words.mapPartitions {
-      iter => { new Iterator[Array[Int]] {
-          def hasNext = iter.hasNext
-
-          def next = {
-            var sentence = new ArrayBuffer[Int]
-            var sentenceLength = 0
-            while (iter.hasNext && sentenceLength < MAX_SENTENCE_LENGTH) {
-              val word = bcVocabHash.value.get(iter.next)
-              word match {
-                case Some(w) => {
-                  sentence += w
-                  sentenceLength += 1
-                }
-                case None =>
-              }
-            }
-            sentence.toArray
-          }
-        }
-      }
-    }
+    val sentences: RDD[Array[Int]] = words.mapPartitions { iter =>
+      new Iterator[Array[Int]] {
+        def hasNext: Boolean = iter.hasNext
+
+        def next(): Array[Int] = {
+          var sentence = new ArrayBuffer[Int]
+          var sentenceLength = 0
+          while (iter.hasNext && sentenceLength < MAX_SENTENCE_LENGTH) {
+            val word = bcVocabHash.value.get(iter.next())
+            word match {
+              case Some(w) =>
+                sentence += w
+                sentenceLength += 1
+              case None =>
+            }
+          }
+          sentence.toArray
+        }
+      }
+    }
 
     val newSentences = sentences.repartition(parallelism).cache()
-    var syn0Global
-      = Array.fill[Double](vocabSize * layer1Size)((Random.nextDouble - 0.5) / layer1Size)
-    var syn1Global = new Array[Double](vocabSize * layer1Size)
+    var syn0Global =
+      Array.fill[Float](vocabSize * layer1Size)((Random.nextFloat() - 0.5f) / layer1Size)
+    var syn1Global = new Array[Float](vocabSize * layer1Size)
 
     for(iter <- 1 to numIterations) {
       val (aggSyn0, aggSyn1, _, _) =
-        // TODO: broadcast temp instead of serializing it directly
+      // TODO: broadcast temp instead of serializing it directly
         // or initialize the model in each executor
-        newSentences.treeAggregate((syn0Global.clone(), syn1Global.clone(), 0, 0))(
+        newSentences.treeAggregate((syn0Global, syn1Global, 0, 0))(
         seqOp = (c, v) => (c, v) match {
           case ((syn0, syn1, lastWordCount, wordCount), sentence) =>
             var lwc = lastWordCount
@@ -280,23 +278,23 @@ class Word2Vec(
                   if (c >= 0 && c < sentence.size) {
                     val lastWord = sentence(c)
                     val l1 = lastWord * layer1Size
-                    val neu1e = new Array[Double](layer1Size)
+                    val neu1e = new Array[Float](layer1Size)
                     // Hierarchical softmax
                     var d = 0
                     while (d < bcVocab.value(word).codeLen) {
                       val l2 = bcVocab.value(word).point(d) * layer1Size
                       // Propagate hidden -> output
-                      var f = blas.ddot(layer1Size, syn0, l1, 1, syn1, l2, 1)
+                      var f = blas.sdot(layer1Size, syn0, l1, 1, syn1, l2, 1)
                       if (f > -MAX_EXP && f < MAX_EXP) {
                         val ind = ((f + MAX_EXP) * (EXP_TABLE_SIZE / MAX_EXP / 2.0)).toInt
                         f = expTable.value(ind)
-                        val g = (1 - bcVocab.value(word).code(d) - f) * alpha
-                        blas.daxpy(layer1Size, g, syn1, l2, 1, neu1e, 0, 1)
-                        blas.daxpy(layer1Size, g, syn0, l1, 1, syn1, l2, 1)
+                        val g = ((1 - bcVocab.value(word).code(d) - f) * alpha).toFloat
+                        blas.saxpy(layer1Size, g, syn1, l2, 1, neu1e, 0, 1)
+                        blas.saxpy(layer1Size, g, syn0, l1, 1, syn1, l2, 1)
                       }
                       d += 1
                     }
-                    blas.daxpy(layer1Size, 1.0, neu1e, 0, 1, syn0, l1, 1)
+                    blas.saxpy(layer1Size, 1.0f, neu1e, 0, 1, syn0, l1, 1)
                   }
                 }
                 a += 1
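
With the weights now Array[Float], every level-1 BLAS call switches from the double routines (ddot, daxpy, dscal, dnrm2) to their single-precision counterparts (sdot, saxpy, sscal, snrm2), including the offset-taking variants used above. A minimal sketch of those calls, assuming the file's existing netlib-java binding (import com.github.fommil.netlib.BLAS.{getInstance => blas}); the array and values are illustrative:

    val syn = Array(1f, 2f, 3f, 4f, 5f, 6f)      // two 3-dim vectors packed into one array
    val f = blas.sdot(3, syn, 0, 1, syn, 3, 1)   // dot of syn[0..2] and syn[3..5] = 32.0f
    blas.saxpy(3, 0.5f, syn, 0, 1, syn, 3, 1)    // syn[3..5] += 0.5f * syn[0..2]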
@@ -308,24 +306,24 @@ class Word2Vec(
         combOp = (c1, c2) => (c1, c2) match {
           case ((syn0_1, syn1_1, lwc_1, wc_1), (syn0_2, syn1_2, lwc_2, wc_2)) =>
             val n = syn0_1.length
-            val weight1 = 1.0 * wc_1 / (wc_1 + wc_2)
-            val weight2 = 1.0 * wc_2 / (wc_1 + wc_2)
-            blas.dscal(n, weight1, syn0_1, 1)
-            blas.dscal(n, weight1, syn1_1, 1)
-            blas.daxpy(n, weight2, syn0_2, 1, syn0_1, 1)
-            blas.daxpy(n, weight2, syn1_2, 1, syn1_1, 1)
+            val weight1 = 1.0f * wc_1 / (wc_1 + wc_2)
+            val weight2 = 1.0f * wc_2 / (wc_1 + wc_2)
+            blas.sscal(n, weight1, syn0_1, 1)
+            blas.sscal(n, weight1, syn1_1, 1)
+            blas.saxpy(n, weight2, syn0_2, 1, syn0_1, 1)
+            blas.saxpy(n, weight2, syn1_2, 1, syn1_1, 1)
             (syn0_1, syn1_1, lwc_1 + lwc_2, wc_1 + wc_2)
         })
       syn0Global = aggSyn0
       syn1Global = aggSyn1
     }
     newSentences.unpersist()
 
-    val wordMap = new Array[(String, Array[Double])](vocabSize)
+    val wordMap = new Array[(String, Array[Float])](vocabSize)
     var i = 0
     while (i < vocabSize) {
       val word = bcVocab.value(i).word
-      val vector = new Array[Double](layer1Size)
+      val vector = new Array[Float](layer1Size)
       Array.copy(syn0Global, i * layer1Size, vector, 0, layer1Size)
       wordMap(i) = (word, vector)
       i += 1
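
The combOp merges two partial models by weighted average, weighting each side by the number of words it processed, so partitions that trained on more data contribute more. A toy sketch of the same sscal/saxpy arithmetic, using the same blas binding as above (arrays and counts are illustrative):

    val m1 = Array(1.0f, 1.0f); val wc1 = 30     // partial model 1 and its word count
    val m2 = Array(3.0f, 3.0f); val wc2 = 10     // partial model 2 and its word count
    val w1 = 1.0f * wc1 / (wc1 + wc2)            // 0.75f
    val w2 = 1.0f * wc2 / (wc1 + wc2)            // 0.25f
    blas.sscal(2, w1, m1, 1)                     // m1 := w1 * m1
    blas.saxpy(2, w2, m2, 1, m1, 1)              // m1 := w2 * m2 + m1 = (1.5f, 1.5f)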
@@ -341,15 +339,15 @@ class Word2Vec(
 /**
  * Word2Vec model
  */
-class Word2VecModel (private val model:RDD[(String, Array[Double])]) extends Serializable {
+class Word2VecModel (private val model:RDD[(String, Array[Float])]) extends Serializable {
 
-  private def cosineSimilarity(v1: Array[Double], v2: Array[Double]): Double = {
+  private def cosineSimilarity(v1: Array[Float], v2: Array[Float]): Double = {
     require(v1.length == v2.length, "Vectors should have the same length")
     val n = v1.length
-    val norm1 = blas.dnrm2(n, v1, 1)
-    val norm2 = blas.dnrm2(n, v2, 1)
+    val norm1 = blas.snrm2(n, v1, 1)
+    val norm2 = blas.snrm2(n, v2, 1)
     if (norm1 == 0 || norm2 == 0) return 0.0
-    blas.ddot(n, v1, 1, v2,1) / norm1 / norm2
+    blas.sdot(n, v1, 1, v2,1) / norm1 / norm2
   }
 
   /**
@@ -362,7 +360,7 @@ class Word2VecModel (private val model:RDD[(String, Array[Double])]) extends Serializable {
     if (result.isEmpty) {
       throw new IllegalStateException(s"${word} not in vocabulary")
     }
-    else Vectors.dense(result(0))
+    else Vectors.dense(result(0).map(_.toDouble))
   }
 
   /**
@@ -394,7 +392,7 @@ class Word2VecModel (private val model:RDD[(String, Array[Double])]) extends Serializable {
   def findSynonyms(vector: Vector, num: Int): Array[(String, Double)] = {
     require(num > 0, "Number of similar words should > 0")
     val topK = model.map { case(w, vec) =>
-      (cosineSimilarity(vector.toArray, vec), w) }
+      (cosineSimilarity(vector.toArray.map(_.toFloat), vec), w) }
       .sortByKey(ascending = false)
       .take(num + 1)
       .map(_.swap)
@@ -410,18 +408,16 @@ object Word2Vec{
    * @param input RDD of words
    * @param size vector dimension
    * @param startingAlpha initial learning rate
-   * @param window context words from [-window, window]
-   * @param minCount minimum frequncy to consider a vocabulary word
-   * @return Word2Vec model
-   */
+   * @param parallelism number of partitions to run Word2Vec (using a small number for accuracy)
+   * @param numIterations number of iterations, should be smaller than or equal to parallelism
+   * @return Word2Vec model
+   */
   def train[S <: Iterable[String]](
       input: RDD[S],
      size: Int,
      startingAlpha: Double,
-      window: Int,
-      minCount: Int,
      parallelism: Int = 1,
      numIterations:Int = 1): Word2VecModel = {
-    new Word2Vec(size,startingAlpha, window, minCount, parallelism, numIterations).fit[S](input)
+    new Word2Vec(size,startingAlpha, parallelism, numIterations).fit[S](input)
   }
 }
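
End-to-end usage of the updated train entry point, as a sketch: the file path, parameter values, and the transform call (whose body appears in a hunk above, though the method name is elided from this excerpt) are assumptions, not part of the commit; findSynonyms(vector, num) is shown in the diff:

    import org.apache.spark.mllib.feature.Word2Vec

    val input = sc.textFile("data/corpus.txt").map(_.split(" ").toSeq)
    val model = Word2Vec.train(input, size = 100, startingAlpha = 0.025)
    val vec = model.transform("spark")            // Vector for one vocabulary word (assumed accessor name)
    val nearest = model.findSynonyms(vec, 5)      // Array[(String, Double)]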
