-
Notifications
You must be signed in to change notification settings - Fork 14
/
UnpackedPhantomMetaDagWalker.scala
163 lines (146 loc) · 7.3 KB
/
UnpackedPhantomMetaDagWalker.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
package ducttape.hyperdag.walker
import collection._
import ducttape.hyperdag.PhantomHyperDag
import ducttape.hyperdag.UnpackedVertex
import ducttape.util.MultiSet
import ducttape.hyperdag.PackedVertex
import grizzled.slf4j.Logging
import ducttape.hyperdag.meta.PhantomMetaHyperDag
import ducttape.hyperdag.meta.UnpackedChainedMetaVertex
import ducttape.hyperdag.meta.UnpackedMetaVertex
import ducttape.hyperdag.HyperEdge
import scala.annotation.tailrec
/**
* The main job of this walker is to skip phantom vertices while remembering which phantom
* vertices were skipped. The skipped phantom vertices are then flattened into a
* [[ducttape.hyperdag.meta.UnpackedChainedMetaVertex]].
*
* See [[ducttape.hyperdag.meta.PhantomMetaHyperDag]] for definitions of terms and generic types.
* See [[ducttape.hyperdag.walker.UnpackedDagWalker]] for definitions of generic types D,F
*/
class UnpackedPhantomMetaDagWalker[V,M,H,E,D,F](
val dag: PhantomMetaHyperDag[V,M,H,E],
munger: RealizationMunger[Option[V],H,E,D,F],
vertexFilter: MetaVertexFilter[Option[V],H,E,D] = new DefaultMetaVertexFilter[Option[V],H,E,D],
toD: H => D = new DefaultToD[H],
traversal: Traversal = Arbitrary,
observer: UnpackedVertex[Option[V],H,E,D] => Unit = (v: UnpackedVertex[Option[V],H,E,D]) => { ; } )
(implicit ordering: Ordering[D])
extends Walker[UnpackedChainedMetaVertex[V,H,E,D]] with Logging {
/**
 * Builds a fresh walker configured with the same dag, munger, vertex filter, toD
 * function, traversal and observer as this one. The copy gets its own delegate and
 * its own (initially empty) phantom bookkeeping, so it begins the traversal anew.
 *
 * @return a new, independent walker over the same dag
 */
def duplicate(): UnpackedPhantomMetaDagWalker[V,M,H,E,D,F] =
  new UnpackedPhantomMetaDagWalker[V,M,H,E,D,F](
    dag, munger, vertexFilter, toD, traversal, observer)(ordering)
/**
 * Adapts the user-supplied vertexFilter for use by the delegate walker, whose vertices
 * carry an Option[V]: a phantom vertex (packed value None) is always accepted, and only
 * a real vertex (packed value Some) is handed to vertexFilter for a decision.
 */
object MetaVertexFilterAdapter extends MetaVertexFilter[Option[V],H,E,D] {
  override def apply(v: UnpackedMetaVertex[Option[V],H,E,D]) = {
    val isPhantom = v.packed.value.isEmpty
    if (isPhantom) true else vertexFilter(v)
  }
}
// The underlying meta-dag walker over dag.delegate. It still hands back phantom vertices
// (packed value None); takeSkippingPhantoms records and skips those. MetaVertexFilterAdapter
// is passed instead of vertexFilter so the user's filter is only consulted for real vertices.
val delegate = new UnpackedMetaDagWalker[Option[V],M,H,E,D,F](
dag.delegate, munger, MetaVertexFilterAdapter, toD, traversal, observer)
// We must be able to recover the phantom antecedents of non-phantom vertices
// so that we can properly populate their state maps.
// Unfortunately, this kills space complexity, since we must hold on to these until
// all members of a packed vertex have been unpacked.
// TODO: Right now, we don't detect when all members of a packed vertex have
// been unpacked so that we can reclaim space. Could we refcount them?
//
// Key: (packed vertex, realization sorted with `ordering`); value: the unpacked vertex.
// Every read and write in this class happens inside unpackedMap.synchronized.
private val unpackedMap = new mutable.HashMap[(PackedVertex[Option[V]],Seq[D]), UnpackedMetaVertex[Option[V],H,E,D]]
/**
 * Returns the sole element of a sequence that must contain exactly one element.
 *
 * @param seq the sequence expected to hold a single element
 * @return that single element
 * @throws RuntimeException if seq is empty or holds more than one element
 */
private def getOnly[A](seq: Seq[A]): A = {
  // lengthCompare avoids walking the whole sequence just to learn it is too long
  val hasExactlyOne = seq.lengthCompare(1) == 0
  if (hasExactlyOne) {
    seq.head
  } else {
    throw new RuntimeException("phantom chains can only have single parents; " +
      "expected exactly one element in: " + seq)
  }
}
/**
 * Walks backwards from v through any phantom vertices and returns one (edge, realization)
 * pair for every vertex at which the walk stops.
 *
 *  - v is real (packed value is Some): the walk stops at once and the result is the
 *    single pair (edge, parentReal).
 *  - v is a phantom with no parents (a leaf/terminal phantom): same single pair.
 *  - v is a phantom with parents: each parent is looked up in unpackedMap under
 *    (packed parent, sorted parent realization) and followed recursively, using the
 *    component edge that leads to it and the parent's own realization.
 *
 * Locking: this is only ever called from takeSkippingPhantoms, inside its
 * unpackedMap.synchronized block, so the lock on unpackedMap is already held.
 * Note: cannot be @tailrec, since the recursive call sits inside a flatMap.
 *
 * @param v          the unpacked vertex to start from
 * @param edge       the edge by which v was reached
 * @param parentReal the realization by which v was reached
 * @return the (edge, realization) pairs at which the phantom chains end
 */
private def followPhantomChain(v: UnpackedMetaVertex[Option[V],H,E,D], edge: E, parentReal: Seq[D])
  : Seq[(E, Seq[D])] = {
  trace("Follow phantom chain at " + v)

  // result used whenever the walk stops at v itself
  def stopHere: Seq[(E, Seq[D])] = Seq( (edge, parentReal) )

  if (v.packed.value.isDefined) {
    stopHere
  } else {
    val edgesWithReals = v.edges.zip(v.parentRealizations)
    trace("Zip: " + edgesWithReals)

    // use active hyperedges to find parents
    val parents: Seq[(E,UnpackedMetaVertex[Option[V],H,E,D])] =
      edgesWithReals.flatMap { case (hyperedge, realsPerSource) =>
        import ducttape.util.Collections._
        zip3(dag.delegate.sources(hyperedge), hyperedge.e, realsPerSource).map {
          case (source, componentEdge, sourceReal) =>
            // source: PackedVertex[Option[V]]
            trace("Resolving hyperedge %s: parent is %s".format(hyperedge, source))
            val resolved: UnpackedMetaVertex[Option[V],H,E,D] =
              unpackedMap( (source, sourceReal.sorted(ordering)) )
            (componentEdge, resolved)
        }
      }
    trace("Parents of %s are: %s".format(v, parents))

    if (parents.isEmpty) {
      stopHere // this is a leaf/terminal phantom vertex
    } else {
      parents.flatMap { case (parentEdge, parentVertex) =>
        followPhantomChain(parentVertex, parentEdge, parentVertex.realization)
      }
    }
  }
}
/**
 * Pulls vertices from the delegate until a real (non-phantom) vertex turns up.
 *
 * Phantom vertices (packed value None) are never returned to the caller: each one is
 * recorded in unpackedMap under (packed vertex, sorted realization), immediately
 * completed on the delegate, and the loop continues. A real vertex is converted to an
 * UnpackedChainedMetaVertex by chainThroughPhantoms.
 *
 * @return the next real vertex with its phantom chains flattened, or None once the
 *         delegate has nothing more to offer
 */
@tailrec
private def takeSkippingPhantoms(): Option[UnpackedChainedMetaVertex[V,H,E,D]] = delegate.take() match {
  case None => None
  case Some(umv) =>
    if (umv.packed.value.isDefined) {
      Some(chainThroughPhantoms(umv))
    } else {
      // phantom: save for later
      debug("Phantom skipping: " + umv)
      unpackedMap.synchronized {
        unpackedMap += (umv.packed, umv.realization.sorted(ordering)) -> umv
      }
      delegate.complete(umv)
      takeSkippingPhantoms()
    }
}

/**
 * Flattens the phantom chains above a real vertex: every parent of umv is looked up in
 * unpackedMap and followed back with followPhantomChain, and the resulting edges and
 * parent realizations are grouped per incoming hyperedge of umv.
 *
 * The whole lookup runs inside unpackedMap.synchronized, which is the lock
 * followPhantomChain relies on.
 *
 * @param umv a non-phantom vertex just taken from the delegate
 * @return the chained vertex wrapping umv (umv itself is kept as its last constructor argument)
 */
private def chainThroughPhantoms(umv: UnpackedMetaVertex[Option[V],H,E,D])
  : UnpackedChainedMetaVertex[V,H,E,D] = {
  trace("Begin unpacking chained meta vertex: " + umv)
  // we can have phantom vertex chains of arbitrary length
  // epsilons are already removed by our delegate
  import ducttape.util.Collections._
  val parentInfo: Seq[(Seq[E], Seq[Seq[D]])] = unpackedMap.synchronized {
    // pair each incoming hyperedge with the realizations of its sources
    // (same two-way zip that followPhantomChain uses on v.edges / v.parentRealizations)
    umv.edges.zip(umv.parentRealizations).map { case (hyperedge, parentReals) =>
      val munged: Seq[(E, Seq[D])] = zip3(dag.delegate.sources(hyperedge), parentReals, hyperedge.e) flatMap {
        case (parent, parentReal, edge) => {
          trace("Begin backtracing phantom chain for " + parent)
          // TODO: DON'T RE-SORT HERE
          assert(parentReal != null, "parentReal should not be null")
          val unpackedV = unpackedMap( (parent, parentReal.sorted(ordering)) )
          followPhantomChain(unpackedV, edge, parentReal)
        }
      }
      (munged.map(_._1), munged.map(_._2))
    }
  }

  // TODO: This is a different meaning of "munge" versus the RealizationMunger...
  val mungedEdges: Seq[Seq[E]] = parentInfo.map(_._1)
  val mungedParentReals: Seq[Seq[Seq[D]]] = parentInfo.map(_._2)

  // zip truncates to the shorter input, so these two checks together establish that
  // umv.edges and umv.parentRealizations had the same length
  val parentsSize = parentInfo.size
  assert(parentsSize == umv.edges.size,
    "Parent size %d != edges.size %d".format(parentsSize, umv.edges.size))
  assert(parentsSize == umv.parentRealizations.size,
    "Parent size %d != parentReal.size %d".format(parentsSize, umv.parentRealizations.size))

  debug("Yielding: " + umv)
  new UnpackedChainedMetaVertex[V,H,E,D](umv.packed, mungedEdges, umv.realization, mungedParentReals, umv)
}
/**
 * Takes the next vertex from this walker, with phantom vertices skipped.
 *
 * @return whatever takeSkippingPhantoms returns: the next chained vertex, or None
 */
override def take(): Option[UnpackedChainedMetaVertex[V,H,E,D]] = {
  takeSkippingPhantoms()
}
/**
 * Marks a previously taken vertex as completed.
 *
 * The vertex's dual is first recorded in unpackedMap under (packed vertex, sorted
 * realization), and then completion is forwarded to the delegate walker.
 *
 * @param item     the chained vertex being completed
 * @param continue passed through unchanged to the delegate's complete
 */
override def complete(item: UnpackedChainedMetaVertex[V,H,E,D], continue: Boolean = true) = {
  debug("Completing " + item)
  val underlying = item.dual
  val sortedReal = item.realization.seq.sorted(ordering)
  unpackedMap.synchronized {
    unpackedMap += (item.packed, sortedReal) -> underlying
  }
  delegate.complete(underlying, continue)
}
}