Skip to content

Commit

Permalink
Depth first projections
Browse files Browse the repository at this point in the history
  • Loading branch information
Feynman Liang committed Jul 13, 2015
1 parent 70b93e3 commit 2e00cba
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ package org.apache.spark.mllib.fpm
import org.apache.spark.Logging
import org.apache.spark.annotation.Experimental

import scala.collection.mutable.ArrayBuffer

/**
*
* :: Experimental ::
Expand All @@ -36,80 +34,71 @@ private[fpm] object LocalPrefixSpan extends Logging with Serializable {
* @param minCount minimum count
* @param maxPatternLength maximum pattern length
* @param prefix prefix
* @param projectedDatabase the projected database
* @param database the projected database
* @return a set of sequential pattern pairs,
* the key of pair is sequential pattern (a list of items),
* the value of pair is the pattern's count.
*/
def run(
minCount: Long,
maxPatternLength: Int,
prefix: ArrayBuffer[Int],
projectedDatabase: Array[Array[Int]]): Iterator[(Array[Int], Long)] = {
val frequentPrefixAndCounts = getFreqItemAndCounts(minCount, projectedDatabase)
val frequentPatternAndCounts = frequentPrefixAndCounts
.map(x => ((prefix :+ x._1).toArray, x._2))
val prefixProjectedDatabases = getPatternAndProjectedDatabase(
prefix, frequentPrefixAndCounts.map(_._1), projectedDatabase)
prefix: List[Int],
database: Iterable[Array[Int]]): Iterator[(Array[Int], Long)] = {

if (database.isEmpty) return Iterator.empty

val frequentItemAndCounts = getFreqItemAndCounts(minCount, database)
val frequentItems = frequentItemAndCounts.map(_._1)
val frequentPatternAndCounts = frequentItemAndCounts
.map { case (item, count) => ((item :: prefix).reverse.toArray, count) }

if (prefixProjectedDatabases.nonEmpty && prefix.length + 1 < maxPatternLength) {
frequentPatternAndCounts.iterator ++ prefixProjectedDatabases.flatMap {
case (nextPrefix, projDB) => run(minCount, maxPatternLength, nextPrefix, projDB)
val filteredProjectedDatabase = database.map(x => x.filter(frequentItems.contains(_)))

if (prefix.length + 1 < maxPatternLength) {
frequentPatternAndCounts ++ frequentItems.flatMap { item =>
val nextProjected = project(filteredProjectedDatabase, item)
run(minCount, maxPatternLength, item :: prefix, nextProjected)
}
} else {
frequentPatternAndCounts.iterator
frequentPatternAndCounts
}
}

/**
* calculate suffix sequence following a prefix in a sequence
* @param prefix prefix
* @param sequence sequence
* Calculate suffix sequence immediately after the first occurrence of an item.
* @param item item to get suffix after
* @param sequence sequence to extract suffix from
* @return suffix sequence
*/
def getSuffix(prefix: Int, sequence: Array[Int]): Array[Int] = {
val index = sequence.indexOf(prefix)
def getSuffix(item: Int, sequence: Array[Int]): Array[Int] =
// Locate the first occurrence of `item`; everything strictly after it is the suffix.
sequence.indexOf(item) match {
// `item` does not occur in `sequence`: there is no suffix to return.
case -1 => Array()
// Skip up to and including the first occurrence.
case firstHit => sequence.drop(firstHit + 1)
}

def project(database: Iterable[Array[Int]], prefix: Int): Iterable[Array[Int]] = {
// For each sequence keep what follows the first occurrence of `prefix`,
// and discard sequences whose resulting suffix is empty.
for {
candidateSeq <- database
suffix = getSuffix(prefix, candidateSeq)
if suffix.nonEmpty
} yield suffix
}

/**
* Generates frequent items by filtering the input data using minimal count level.
* @param minCount the absolute minimum count
* @param sequences sequences data
* @return array of item and count pair
* @param minCount the minimum count for an item to be frequent
* @param database database of sequences
* @return item and count pairs
*/
private def getFreqItemAndCounts(
minCount: Long,
sequences: Array[Array[Int]]): Array[(Int, Long)] = {
sequences.flatMap(_.distinct)
database: Iterable[Array[Int]]): Iterator[(Int, Long)] = {
database.flatMap(_.distinct)
.foldRight(Map[Int, Long]().withDefaultValue(0L)) { case (item, ctr) =>
ctr + (item -> (ctr(item) + 1))
}
.filter(_._2 >= minCount)
.toArray
}

/**
* Get the frequent prefixes' projected database.
* @param prefix the frequent prefixes' prefix
* @param frequentPrefixes frequent next prefixes
* @param projDB projected database for given prefix
* @return extensions of prefix by one item and corresponding projected databases
*/
private def getPatternAndProjectedDatabase(
prefix: ArrayBuffer[Int],
frequentPrefixes: Array[Int],
projDB: Array[Array[Int]]): Array[(ArrayBuffer[Int], Array[Array[Int]])] = {
val filteredProjectedDatabase = projDB.map(x => x.filter(frequentPrefixes.contains(_)))
frequentPrefixes.map { nextItem =>
val nextProjDB = filteredProjectedDatabase
.map(candidateSeq => getSuffix(nextItem, candidateSeq))
.filter(_.nonEmpty)
(prefix :+ nextItem, nextProjDB)
}.filter(x => x._2.nonEmpty)
.iterator
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ class PrefixSpan private (
minCount: Long,
data: RDD[(Array[Int], Array[Array[Int]])]): RDD[(Array[Int], Long)] = {
data.flatMap { case (prefix, projDB) =>
LocalPrefixSpan.run(minCount, maxPatternLength, prefix.to[ArrayBuffer], projDB)
LocalPrefixSpan.run(minCount, maxPatternLength, prefix.toList, projDB)
}
}
}

0 comments on commit 2e00cba

Please sign in to comment.