Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial structure and parallelization of parser #66

Open
wants to merge 16 commits into
base: 2.13.x
Choose a base branch
from
117 changes: 77 additions & 40 deletions src/compiler/scala/tools/nsc/Global.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ package nsc
import java.io.{File, FileNotFoundException, IOException}
import java.net.URL
import java.nio.charset.{Charset, CharsetDecoder, IllegalCharsetNameException, UnsupportedCharsetException}

import scala.collection.{immutable, mutable}
import io.{AbstractFile, Path, SourceReader}
import reporters.Reporter
import reporters.{BufferedReporter, Reporter}
import util.{ClassPath, returning}
import scala.reflect.ClassTag
import scala.reflect.internal.util.{BatchSourceFile, NoSourceFile, ScalaClassLoader, ScriptSourceFile, SourceFile, StatisticsStatics}
import scala.reflect.internal.util.Parallel._
import scala.reflect.internal.pickling.PickleBuffer
import symtab.{Flags, SymbolTable, SymbolTrackers}
import symtab.classfile.Pickler
Expand All @@ -26,12 +28,13 @@ import typechecker._
import transform.patmat.PatternMatching
import transform._
import backend.{JavaPlatform, ScalaPrimitives}
import backend.jvm.{GenBCode, BackendStats}
import scala.concurrent.Future
import backend.jvm.{BackendStats, GenBCode}
import scala.concurrent.duration.Duration
import scala.concurrent._
import scala.language.postfixOps
import scala.tools.nsc.ast.{TreeGen => AstTreeGen}
import scala.tools.nsc.classpath._
import scala.tools.nsc.profile.Profiler
import scala.tools.nsc.profile.{Profiler, ThreadPoolFactory}

class Global(var currentSettings: Settings, reporter0: Reporter)
extends SymbolTable
Expand Down Expand Up @@ -75,16 +78,16 @@ class Global(var currentSettings: Settings, reporter0: Reporter)

override def settings = currentSettings

private[this] var currentReporter: Reporter = { reporter = reporter0 ; currentReporter }

def reporter: Reporter = currentReporter
private[this] val currentReporter: WorkerOrMainThreadLocal[Reporter] =
WorkerThreadLocal(new BufferedReporter, reporter0)
def reporter: Reporter = currentReporter.get
def reporter_=(newReporter: Reporter): Unit =
currentReporter = newReporter match {
currentReporter.set(newReporter match {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert Parallel.onMainThread?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That wouldn't be true.
We are setting a new one for every unit now.
It's required to ensure consistent ordering of the logs.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should assert that we are on the main thread I think

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is not the case here, please look into processUnit method.

case _: reporters.ConsoleReporter | _: reporters.LimitingReporter => newReporter
case _ if settings.maxerrs.isSetByUser && settings.maxerrs.value < settings.maxerrs.default =>
new reporters.LimitingReporter(settings, newReporter)
case _ => newReporter
}
})

/** Switch to turn on detailed type logs */
var printTypings = settings.Ytyperdebug.value
Expand Down Expand Up @@ -385,45 +388,81 @@ class Global(var currentSettings: Settings, reporter0: Reporter)
abstract class GlobalPhase(prev: Phase) extends Phase(prev) {
phaseWithId(id) = this

def run(): Unit = {
echoPhaseSummary(this)
currentRun.units foreach applyPhase
}

def apply(unit: CompilationUnit): Unit

/** Is the current phase cancelled on this unit?
 *
 *  Returns true when the reporter has been cancelled, or when `unit` is a Java unit
 *  and this phase's id is greater than the id of the last phase applied to Java
 *  sources (typer when `createJavadoc` is set, namer otherwise).
 *
 *  NOTE(review): this method calls `assertOnMain()`, while `applyPhase` calls it
 *  right after `assertOnWorker()` — confirm both assertions can hold on that path.
 */
def cancelled(unit: CompilationUnit) = {
assertOnMain()
// run the typer only if in `createJavadoc` mode
val maxJavaPhase = if (createJavadoc) currentRun.typerPhase.id else currentRun.namerPhase.id
reporter.cancelled || unit.isJava && this.id > maxJavaPhase
}

final def withCurrentUnit(unit: CompilationUnit)(task: => Unit): Unit = {
if ((unit ne null) && unit.exists)
lastSeenSourceFile = unit.source
// Method added to allow stacking functionality on top of `run` (e.g. measuring run time).
// Overriding `run` is now not allowed since we want to be in charge of how units are processed.
def wrapRun(code: => Unit): Unit = code

def afterUnit(unit: CompilationUnit): Unit = {}

final def run(): Unit = wrapRun {
assertOnMain()

if (isDebugPrintEnabled) inform("[running phase " + name + " on " + currentRun.size + " compilation units]")

implicit val ec: ExecutionContextExecutorService = createExecutionContext()

def task(unit: CompilationUnit): Reporter = {
processUnit(unit)
afterUnit(unit)
reporter
}

val futures = currentRun.units.collect {
case unit if !cancelled(unit) =>
if(isParallel) Future(task(unit)) else Future.fromTry(scala.util.Try(asWorkerThread(task(unit))))
}

if (settings.debug && (settings.verbose || currentRun.size < 5))
inform("[running phase " + name + " on " + unit + "]")
if (!cancelled(unit)) {
currentRun.informUnitStarting(this, unit)
try withCurrentUnitNoLog(unit)(task)
finally currentRun.advanceUnit()
futures.foreach { future =>
val workerReporter = Await.result(future, Duration.Inf)
workerReporter.asInstanceOf[BufferedReporter].flushTo(reporter)
}
}

final def withCurrentUnitNoLog(unit: CompilationUnit)(task: => Unit): Unit = {
final def applyPhase(unit: CompilationUnit): Unit = {
assertOnWorker()
if (!cancelled(unit)) processUnit(unit)
}

private def processUnit(unit: CompilationUnit): Unit = {
assertOnWorker()

reporter = new BufferedReporter

if (isDebugPrintEnabled) inform("[running phase " + name + " on " + unit + "]")

val unit0 = currentUnit

try {
if ((unit ne null) && unit.exists) lastSeenSourceFile = unit.source
currentRun.currentUnit = unit
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this needs to be parallel?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately yes, at least as long as we have currentUnit.
lastSeenSourceFile is used inside error reporting, which happens inside the worker thread.
If we made it global, it could report errors against the wrong files.

That said, I do not understand why we need it in the first place.
Comment in Global.scala says:

  /** There are common error conditions where when the exception hits
   *  here, currentRun.currentUnit is null.  This robs us of the knowledge
   *  of what file was being compiled when it broke.  Since I really
   *  really want to know, this hack.
   */
  protected var lastSeenSourceFile: SourceFile = NoSourceFile

But I cannot see how lastSeenSourceFile can be set when currentRun.currentUnit is null.

task
apply(unit)
} finally {
//assert(currentUnit == unit)
currentRun.currentUnit = unit0
currentRun.advanceUnit()
}
}

final def applyPhase(unit: CompilationUnit) = withCurrentUnit(unit)(apply(unit))
/* Only output a summary message under debug if we aren't echoing each file.
 *
 * True when `settings.debug` is set and neither `settings.verbose` is set nor the
 * run has fewer than 5 compilation units.
 *
 * NOTE(review): this predicate guards both the per-phase summary in `run` and the
 * per-unit message in `processUnit`. The code it replaces printed the per-unit
 * message under `settings.debug && (settings.verbose || currentRun.size < 5)`,
 * i.e. the opposite size/verbosity condition — confirm that sharing one condition
 * for both messages is intended.
 */
private def isDebugPrintEnabled: Boolean = settings.debug && !(settings.verbose || currentRun.size < 5)

private def isParallel = settings.YparallelPhases.containsPhase(this)

/** Builds the executor-backed execution context on which this phase processes units.
 *
 *  The pool is created with `settings.YparallelThreads.value` threads when this phase
 *  is selected by `settings.YparallelPhases` (see `isParallel`), and with a single
 *  thread otherwise.
 *
 *  NOTE(review): nothing in this method shuts the executor down; presumably the caller
 *  owns its lifecycle — confirm it is shut down once the phase has finished.
 *  NOTE(review): `-Yparallel-threads` is declared with the range (0, 64); verify how
 *  the thread pool factory behaves when asked for 0 threads.
 *
 *  @return an `ExecutionContextExecutorService` wrapping the newly created pool
 */
private def createExecutionContext(): ExecutionContextExecutorService = {
val parallelThreads = if (isParallel) settings.YparallelThreads.value else 1
val threadPoolFactory = ThreadPoolFactory(Global.this, this)
val javaExecutor = threadPoolFactory.newUnboundedQueueFixedThreadPool(parallelThreads, "worker")
// The second argument is the failure reporter; `_ => ()` ignores whatever throwable it is given.
scala.concurrent.ExecutionContext.fromExecutorService(javaExecutor, _ => ())
}
}

// phaseName = "parser"
Expand Down Expand Up @@ -953,7 +992,9 @@ class Global(var currentSettings: Settings, reporter0: Reporter)
* of what file was being compiled when it broke. Since I really
* really want to know, this hack.
*/
protected var lastSeenSourceFile: SourceFile = NoSourceFile
private[this] final val _lastSeenSourceFile: WorkerThreadLocal[SourceFile] = WorkerThreadLocal(NoSourceFile)
@inline protected def lastSeenSourceFile: SourceFile = _lastSeenSourceFile.get
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will need to be final to @inline

Should be final anyway I think

The pattern is that we replace
<prot> var <name>: <type>
with

private[this] final _<name> = new ThreadIdentityAwareThreadLocal[<type>](...)
<prot> final def <name> : <type> = _<name>.get
<prot> final def <name>_=(newValue: <type>) = _<name>.set(newValue)

@inline protected def lastSeenSourceFile_=(source: SourceFile): Unit = _lastSeenSourceFile.set(source)

/** Let's share a lot more about why we crash all over the place.
* People will be very grateful.
Expand Down Expand Up @@ -1058,12 +1099,6 @@ class Global(var currentSettings: Settings, reporter0: Reporter)
*/
override def currentRunId = curRunId

def echoPhaseSummary(ph: Phase) = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do we gain by changing this method? We need to keep the focus on the change, not on optimisations and refactors that are orthogonal, IMO.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's related to some extent.
I changed typer's run method to call super; previously it was re-implementing it.
echoPhaseSummary was one of the methods that are no longer needed now that we are not re-implementing code from run.

/* Only output a summary message under debug if we aren't echoing each file. */
if (settings.debug && !(settings.verbose || currentRun.size < 5))
inform("[running phase " + ph.name + " on " + currentRun.size + " compilation units]")
}

def newSourceFile(code: String, filename: String = "<console>") =
new BatchSourceFile(filename, code)

Expand All @@ -1090,7 +1125,9 @@ class Global(var currentSettings: Settings, reporter0: Reporter)
*/
var isDefined = false
/** The currently compiled unit; set from GlobalPhase */
var currentUnit: CompilationUnit = NoCompilationUnit
private[this] final val _currentUnit: WorkerOrMainThreadLocal[CompilationUnit] = WorkerThreadLocal(NoCompilationUnit, NoCompilationUnit)
def currentUnit: CompilationUnit = _currentUnit.get
def currentUnit_=(unit: CompilationUnit): Unit = _currentUnit.set(unit)

val profiler: Profiler = Profiler(settings)
keepPhaseStack = settings.log.isSetByUser
Expand Down Expand Up @@ -1128,8 +1165,8 @@ class Global(var currentSettings: Settings, reporter0: Reporter)
/** A map from compiled top-level symbols to their picklers */
val symData = new mutable.AnyRefMap[Symbol, PickleBuffer]

private var phasec: Int = 0 // phases completed
private var unitc: Int = 0 // units completed this phase
private var phasec: Int = 0 // phases completed
private final val unitc: Counter = new Counter // units completed this phase

def size = unitbuf.size
override def toString = "scalac Run for:\n " + compiledFiles.toList.sorted.mkString("\n ")
Expand Down Expand Up @@ -1250,22 +1287,22 @@ class Global(var currentSettings: Settings, reporter0: Reporter)
* (for progress reporting)
*/
def advancePhase(): Unit = {
unitc = 0
unitc.reset()
phasec += 1
refreshProgress()
}
/** take note that a phase on a unit is completed
* (for progress reporting)
*/
def advanceUnit(): Unit = {
unitc += 1
unitc.incrementAndGet()
refreshProgress()
}

// for sbt
def cancel(): Unit = { reporter.cancelled = true }

private def currentProgress = (phasec * size) + unitc
private def currentProgress = (phasec * size) + unitc.get
private def totalProgress = (phaseDescriptors.size - 1) * size // -1: drops terminal phase
private def refreshProgress() = if (size > 0) progress(currentProgress, totalProgress)

Expand Down
1 change: 0 additions & 1 deletion src/compiler/scala/tools/nsc/ast/Positions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ trait Positions extends scala.reflect.internal.Positions {
self: Global =>

class ValidatingPosAssigner extends PosAssigner {
var pos: Position = _
override def traverse(t: Tree): Unit = {
if (t eq EmptyTree) ()
else if (t.pos == NoPosition) super.traverse(t setPos pos)
Expand Down
4 changes: 2 additions & 2 deletions src/compiler/scala/tools/nsc/backend/jvm/GenBCode.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ abstract class GenBCode extends SubComponent {

def apply(unit: CompilationUnit): Unit = codeGen.genUnit(unit)

override def run(): Unit = {
override def wrapRun(code: => Unit): Unit = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to change genBcode?

statistics.timed(bcodeTimer) {
try {
initialize()
super.run() // invokes `apply` for each compilation unit
code // invokes `apply` for each compilation unit
generatedClassHandler.complete()
} finally {
this.close()
Expand Down
3 changes: 2 additions & 1 deletion src/compiler/scala/tools/nsc/profile/ThreadPoolFactory.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import java.util.concurrent.ThreadPoolExecutor.AbortPolicy
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger

import scala.reflect.internal.util.Parallel.WorkerThread
import scala.tools.nsc.{Global, Phase}

sealed trait ThreadPoolFactory {
Expand Down Expand Up @@ -47,7 +48,7 @@ object ThreadPoolFactory {
// the thread pool and executes them (on the thread created here).
override def newThread(worker: Runnable): Thread = {
val wrapped = wrapWorker(worker, shortId)
val t: Thread = new Thread(group, wrapped, namePrefix + threadNumber.getAndIncrement, 0)
val t: Thread = new WorkerThread(group, wrapped, namePrefix + threadNumber.getAndIncrement, 0)
if (t.isDaemon != daemon) t.setDaemon(daemon)
if (t.getPriority != priority) t.setPriority(priority)
t
Expand Down
26 changes: 26 additions & 0 deletions src/compiler/scala/tools/nsc/reporters/BufferedReporter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package scala.tools.nsc.reporters

import scala.reflect.internal.util.Parallel.{assertOnMain, assertOnWorker}
import scala.reflect.internal.util.Position

/** A reporter that stores every message it receives instead of emitting it, so the
 *  messages can later be replayed, in the order they were reported, into another reporter.
 *
 *  `info0` calls `assertOnWorker()` and `flushTo` calls `assertOnMain()`, i.e. messages
 *  are expected to be recorded and replayed from different kinds of threads.
 */
final class BufferedReporter extends Reporter {

  /** One recorded call to `info0`, kept verbatim until it is flushed. */
  private case class BufferedMessage(pos: Position, msg: String, severity: Severity, force: Boolean)

  // Newest message first: recording prepends, flushing reverses.
  private[this] var buffered = List.empty[BufferedMessage]

  /** Records the message and bumps the counter of the given severity. */
  protected def info0(pos: Position, msg: String, severity: Severity, force: Boolean): Unit = {
    assertOnWorker()
    val recorded = BufferedMessage(pos, msg, severity, force)
    buffered ::= recorded
    severity.count += 1
  }

  /** Replays all recorded messages into `reporter`, oldest first, then empties the buffer.
   *
   *  Severities are translated to the target reporter's own severity objects by indexing
   *  with `severity.id` (assumes ids 0, 1 and 2 denote INFO, WARNING and ERROR — TODO confirm).
   *
   *  @param reporter the reporter that receives the buffered messages
   */
  def flushTo(reporter: Reporter): Unit = {
    assertOnMain()
    val targetSeverities = Array(reporter.INFO, reporter.WARNING, reporter.ERROR)
    val inReportedOrder = buffered.reverse
    for (recorded <- inReportedOrder) {
      val targetSeverity = targetSeverities(recorded.severity.id)
      reporter.info1(recorded.pos, recorded.msg, targetSeverity, recorded.force)
    }
    buffered = Nil
  }
}
5 changes: 4 additions & 1 deletion src/compiler/scala/tools/nsc/settings/ScalaSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,10 @@ trait ScalaSettings extends AbsScalaSettings
val reporter = StringSetting ("-Xreporter", "classname", "Specify a custom reporter for compiler messages.", "scala.tools.nsc.reporters.ConsoleReporter")
val source = ScalaVersionSetting ("-Xsource", "version", "Treat compiler input as Scala source for the specified version, see scala/bug#8126.", initial = ScalaVersion("2.13"))

val XnoPatmatAnalysis = BooleanSetting ("-Xno-patmat-analysis", "Don't perform exhaustivity/unreachability analysis. Also, ignore @switch annotation.")
val XnoPatmatAnalysis = BooleanSetting ("-Xno-patmat-analysis", "Don't perform exhaustivity/unreachability analysis. Also, ignore @switch annotation.")

val YparallelPhases = PhasesSetting ("-Yparallel-phases", "Which phases to run in parallel")
val YparallelThreads = IntSetting ("-Yparallel-threads", "worker threads for parallel compilation", 4, Some((0,64)), _ => None )

val XmixinForceForwarders = ChoiceSetting(
name = "-Xmixin-force-forwarders",
Expand Down
5 changes: 3 additions & 2 deletions src/compiler/scala/tools/nsc/transform/SpecializeTypes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,9 @@ abstract class SpecializeTypes extends InfoTransform with TypingTransformers {
override def newPhase(prev: scala.tools.nsc.Phase): StdPhase = new SpecializationPhase(prev)
class SpecializationPhase(prev: scala.tools.nsc.Phase) extends super.Phase(prev) {
override def checkable = false
override def run(): Unit = {
super.run()
override def wrapRun(code: => Unit): Unit = {
code

exitingSpecialize {
FunctionClass.seq.map(_.info)
TupleClass.seq.map(_.info)
Expand Down
12 changes: 6 additions & 6 deletions src/compiler/scala/tools/nsc/typechecker/Analyzer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,17 @@ trait Analyzer extends AnyRef
// Lacking a better fix, we clear it here (before the phase is created, meaning for each
// compiler run). This is good enough for the resident compiler, which was the most affected.
undoLog.clear()
override def run(): Unit = {

override def afterUnit(unit: CompilationUnit): Unit = undoLog.clear()

override def wrapRun(code: => Unit): Unit = {
val start = if (StatisticsStatics.areSomeColdStatsEnabled) statistics.startTimer(statistics.typerNanos) else null
global.echoPhaseSummary(this)
for (unit <- currentRun.units) {
applyPhase(unit)
undoLog.clear()
}
code
// defensive measure in case the bookkeeping in deferred macro expansion is buggy
clearDelayed()
if (StatisticsStatics.areSomeColdStatsEnabled) statistics.stopTimer(statistics.typerNanos, start)
}

def apply(unit: CompilationUnit): Unit = {
try {
val typer = newTyper(rootContext(unit))
Expand Down
13 changes: 9 additions & 4 deletions src/reflect/scala/reflect/api/Trees.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package scala
package reflect
package api

import scala.reflect.internal.util.Parallel.WorkerThreadLocal

/**
* <span class="badge badge-red" style="float: right;">EXPERIMENTAL</span>
*
Expand Down Expand Up @@ -49,8 +51,7 @@ package api
* @groupprio Factories 1
* @groupname Copying Tree Copying
* @groupprio Copying 1
*
* @contentDiagram hideNodes "*Api"
* @contentDiagram hideNodes "*Api"
* @group ReflectionAPI
*/
trait Trees { self: Universe =>
Expand Down Expand Up @@ -2463,7 +2464,9 @@ trait Trees { self: Universe =>
* @group Traversal
*/
class Traverser {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if all or some of these Traversers are thread-local themselves.
Probably a bigger change though, and higher risk.

protected[scala] var currentOwner: Symbol = rootMirror.RootClass
@inline final protected[scala] def currentOwner: Symbol = _currentOwner.get
@inline final protected[scala] def currentOwner_=(sym: Symbol): Unit = _currentOwner.set(sym)
private final val _currentOwner: WorkerThreadLocal[Symbol] = WorkerThreadLocal(rootMirror.RootClass)

/** Traverse something which Trees contain, but which isn't a Tree itself. */
def traverseName(name: Name): Unit = ()
Expand Down Expand Up @@ -2535,7 +2538,9 @@ trait Trees { self: Universe =>
val treeCopy: TreeCopier = newLazyTreeCopier

/** The current owner symbol. */
protected[scala] var currentOwner: Symbol = rootMirror.RootClass
@inline protected[scala] final def currentOwner: Symbol = _currentOwner.get
@inline protected[scala] final def currentOwner_=(sym: Symbol): Unit = _currentOwner.set(sym)
private final val _currentOwner: WorkerThreadLocal[Symbol] = WorkerThreadLocal(rootMirror.RootClass)

/** The enclosing method of the currently transformed tree. */
protected def currentMethod = {
Expand Down
2 changes: 1 addition & 1 deletion src/reflect/scala/reflect/internal/Names.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ trait Names extends api.Names {
// detect performance regressions.
//
// Discussion: https://groups.google.com/forum/#!search/biased$20scala-internals/scala-internals/0cYB7SkJ-nM/47MLhsgw8jwJ
protected def synchronizeNames: Boolean = false
protected def synchronizeNames: Boolean = true
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes we do.
Umad reports violation in this class:

[WARN] Method accessed from multiple threads (main, scalac-parser-worker-1): scala.reflect.internal.SymbolTable.scala$reflect$internal$Names$$nc_$eq(int)
    scala.reflect.internal.SymbolTable.scala$reflect$internal$Names$$nc_$eq(SymbolTable.scala)
    scala.reflect.internal.Names.enterChars(Names.scala:78)
    scala.reflect.internal.Names.body$1(Names.scala:116)
    scala.reflect.internal.Names.newTermName(Names.scala:127)
    scala.reflect.internal.Names.newTermName$(Names.scala:96)
    scala.reflect.internal.SymbolTable.newTermName(SymbolTable.scala:18)
    scala.reflect.internal.Names.newTermName(Names.scala:83)
    scala.reflect.internal.Names.newTermName$(Names.scala:82)
    scala.reflect.internal.SymbolTable.newTermName(SymbolTable.scala:18)
    scala.reflect.internal.Names.newTermName(Names.scala:85)

What synchronizeNames does is enable synchronization in newTermName.
It looks like something designed exactly for our use case so I happily used it.
There were some concerns regarding enabling it globally though (mostly performance ones): https://groups.google.com/forum/#!search/biased$20scala-internals/scala-internals/0cYB7SkJ-nM/47MLhsgw8jwJ

private val nameLock: Object = new Object

/** Memory to store all names sequentially. */
Expand Down
Loading