
Commit

Readability improvements and comments
Feynman Liang committed Jul 28, 2015
1 parent 1235cfc commit c2caa5c
Showing 1 changed file with 34 additions and 30 deletions.
mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala (64 changes: 34 additions & 30 deletions)
@@ -103,45 +103,49 @@ class PrefixSpan private (
     // Convert min support to a min number of transactions for this dataset
     val minCount = if (minSupport == 0) 0L else math.ceil(sequences.count() * minSupport).toLong

-    val itemCounts = sequences
+    // Frequent items -> number of occurrences, all items here satisfy the `minSupport` threshold
+    val freqItemCounts = sequences
       .flatMap(seq => seq.distinct.map(item => (item, 1L)))
       .reduceByKey(_ + _)
       .filter(_._2 >= minCount)
-    var allPatternAndCounts = itemCounts.map(x => (List(x._1), x._2))

-    val prefixSuffixPairs = {
-      val frequentItems = itemCounts.map(_._1).collect()
-      val candidates = sequences.map { p =>
-        p.filter (frequentItems.contains(_) )
-      }
-      candidates.flatMap { x =>
-        frequentItems.map { y =>
-          val sub = LocalPrefixSpan.getSuffix(y, x)
-          (List(y), sub)
-        }.filter(_._2.nonEmpty)
+    // Pairs of (length 1 prefix, suffix consisting of frequent items)
+    val itemSuffixPairs = {
+      val freqItems = freqItemCounts.keys.collect().toSet
+      sequences.flatMap { seq =>
+        freqItems.flatMap { item =>
+          val candidateSuffix = LocalPrefixSpan.getSuffix(item, seq.filter(freqItems.contains(_)))
+          candidateSuffix match {
+            case suffix if !suffix.isEmpty => Some((List(item), suffix))
+            case _ => None
+          }
+        }
       }
     }
-    var (smallPrefixSuffixPairs, largePrefixSuffixPairs) = partitionByProjDBSize(prefixSuffixPairs)

-    while (largePrefixSuffixPairs.count() != 0) {
+    // Accumulator for the computed results to be returned
+    var resultsAccumulator = freqItemCounts.map(x => (List(x._1), x._2))
+
+    // Remaining work to be locally and distributively processed respectively
+    var (pairsForLocal, pairsForDistributed) = partitionByProjDBSize(itemSuffixPairs)
+
+    // Continue processing until no pairs for distributed processing remain (i.e. all prefixes have
+    // projected database sizes <= `maxLocalProjDBSize`)
+    while (pairsForDistributed.count() != 0) {
       val (nextPatternAndCounts, nextPrefixSuffixPairs) =
-        getPatternCountsAndPrefixSuffixPairs(minCount, largePrefixSuffixPairs)
-      largePrefixSuffixPairs.unpersist()
+        getPatternCountsAndPrefixSuffixPairs(minCount, pairsForDistributed)
+      pairsForDistributed.unpersist()
       val (smallerPairsPart, largerPairsPart) = partitionByProjDBSize(nextPrefixSuffixPairs)
-      largePrefixSuffixPairs = largerPairsPart
-      largePrefixSuffixPairs.persist(StorageLevel.MEMORY_AND_DISK)
-      smallPrefixSuffixPairs ++= smallerPairsPart
-      allPatternAndCounts ++= nextPatternAndCounts
+      pairsForDistributed = largerPairsPart
+      pairsForDistributed.persist(StorageLevel.MEMORY_AND_DISK)
+      pairsForLocal ++= smallerPairsPart
+      resultsAccumulator ++= nextPatternAndCounts
     }

-    if (smallPrefixSuffixPairs.count() > 0) {
-      val projectedDatabase = smallPrefixSuffixPairs
-        // TODO aggregateByKey
-        .groupByKey()
-      val nextPatternAndCounts = getPatternsInLocal(minCount, projectedDatabase)
-      allPatternAndCounts ++= nextPatternAndCounts
-    }
-    allPatternAndCounts.map { case (pattern, count) => (pattern.toArray, count) }
+    // Process the small projected databases locally
+    resultsAccumulator ++= getPatternsInLocal(minCount, pairsForLocal.groupByKey())
+
+    resultsAccumulator.map { case (pattern, count) => (pattern.toArray, count) }
   }
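A note for readers following the new projection step: with, say, 1,000 input sequences and minSupport = 0.05, minCount becomes ceil(1000 * 0.05) = 50, so only items occurring in at least 50 sequences survive the filter above. The block below is a minimal plain-Scala sketch (collections instead of RDDs) of what `itemSuffixPairs` computes; the `getSuffix` helper here is an assumption about `LocalPrefixSpan.getSuffix`'s behaviour (everything after the first occurrence of the item, empty if the item is absent), not the library implementation.

// Plain-Scala sketch of the length-1 prefix projection performed by `itemSuffixPairs`.
// ASSUMPTION: getSuffix(item, seq) returns everything after the first occurrence of
// `item` (empty if absent); the real LocalPrefixSpan.getSuffix may differ in detail.
object ProjectionSketch {
  def getSuffix(item: Int, sequence: Array[Int]): Array[Int] = {
    val i = sequence.indexOf(item)
    if (i == -1) Array.empty[Int] else sequence.drop(i + 1)
  }

  def main(args: Array[String]): Unit = {
    val sequences = Seq(Array(1, 2, 3), Array(1, 3, 2), Array(2, 4))
    val freqItems = Set(1, 2, 3) // items already known to satisfy minCount

    // (length-1 prefix, suffix restricted to frequent items), dropping empty suffixes
    val itemSuffixPairs = sequences.flatMap { seq =>
      freqItems.flatMap { item =>
        val suffix = getSuffix(item, seq.filter(freqItems.contains(_)))
        if (suffix.nonEmpty) Some((List(item), suffix)) else None
      }
    }

    itemSuffixPairs.foreach { case (prefix, suffix) =>
      println(s"prefix=$prefix suffix=${suffix.mkString("[", ",", "]")}")
    }
  }
}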


@@ -177,8 +181,8 @@ class PrefixSpan private (
    */
   private def getPatternCountsAndPrefixSuffixPairs(
       minCount: Long,
-      prefixSuffixPairs: RDD[(List[Int], Array[Int])]):
-    (RDD[(List[Int], Long)], RDD[(List[Int], Array[Int])]) = {
+      prefixSuffixPairs: RDD[(List[Int], Array[Int])])
+    : (RDD[(List[Int], Long)], RDD[(List[Int], Array[Int])]) = {
     val prefixAndFrequentItemAndCounts = prefixSuffixPairs
       .flatMap { case (prefix, suffix) => suffix.distinct.map(y => ((prefix, y), 1L)) }
       .reduceByKey(_ + _)
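To make the new control flow easier to see in isolation, here is a hedged, self-contained Scala sketch of the split-and-iterate pattern the rewritten run() follows: partition (prefix, suffix) pairs by projected database size, keep extending the large ones distributively while accumulating results, and finish the small ones locally. Everything here is illustrative: extendPrefixes, minePatternsLocally, the threshold value, and the toy counting logic are stand-ins for getPatternCountsAndPrefixSuffixPairs, getPatternsInLocal, and maxLocalProjDBSize, not the Spark implementation.

// Self-contained sketch of the run() driver pattern: split work by projected-DB size,
// loop on the large part, finish the small part locally. Collections stand in for RDDs.
object DriverLoopSketch {
  type Prefix = List[Int]
  type Suffix = Array[Int]

  // Hypothetical threshold playing the role of `maxLocalProjDBSize`
  val maxLocalProjDBSize = 4L

  // Split pairs into (small enough to finish locally, still too large)
  def partitionByProjDBSize(
      pairs: Seq[(Prefix, Suffix)]): (Seq[(Prefix, Suffix)], Seq[(Prefix, Suffix)]) = {
    val sizes = pairs.groupBy(_._1).map { case (p, ps) => (p, ps.map(_._2.length.toLong).sum) }
    pairs.partition { case (p, _) => sizes(p) <= maxLocalProjDBSize }
  }

  // Stub for getPatternCountsAndPrefixSuffixPairs: emits one (pattern, count) per current
  // prefix and naively extends each prefix by its first suffix item; the real counting
  // and re-projection logic is omitted.
  def extendPrefixes(pairs: Seq[(Prefix, Suffix)]): (Seq[(Prefix, Long)], Seq[(Prefix, Suffix)]) = {
    val counts = pairs.groupBy(_._1).map { case (p, ps) => (p, ps.size.toLong) }.toSeq
    val next = pairs.collect { case (p, s) if s.length > 1 => (s.head :: p, s.tail) }
    (counts, next)
  }

  // Stub for getPatternsInLocal: counts the remaining suffixes per prefix
  def minePatternsLocally(pairs: Seq[(Prefix, Suffix)]): Seq[(Prefix, Long)] =
    pairs.groupBy(_._1).map { case (p, ps) => (p, ps.size.toLong) }.toSeq

  def main(args: Array[String]): Unit = {
    val itemSuffixPairs: Seq[(Prefix, Suffix)] =
      Seq((List(1), Array(2, 3)), (List(2), Array(3)), (List(3), Array(1, 2, 3, 1, 2)))

    var resultsAccumulator = Seq.empty[(Prefix, Long)]
    var (pairsForLocal, pairsForDistributed) = partitionByProjDBSize(itemSuffixPairs)

    while (pairsForDistributed.nonEmpty) {
      val (nextPatternAndCounts, nextPrefixSuffixPairs) = extendPrefixes(pairsForDistributed)
      val (smallerPairsPart, largerPairsPart) = partitionByProjDBSize(nextPrefixSuffixPairs)
      pairsForDistributed = largerPairsPart
      pairsForLocal ++= smallerPairsPart
      resultsAccumulator ++= nextPatternAndCounts
    }

    resultsAccumulator ++= minePatternsLocally(pairsForLocal)
    resultsAccumulator.foreach { case (pattern, count) =>
      // prefixes were built by prepending, so reverse for display
      println(s"${pattern.reverse.mkString(",")} -> $count")
    }
  }
}

The intent, as the comments in the diff describe, is that a prefix leaves the distributed loop as soon as its projected database is small enough (<= maxLocalProjDBSize) to be mined locally, so only the still-large branches keep paying for distributed passes.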
