Skip to content

Commit

Permalink
Changes after review
Browse files Browse the repository at this point in the history
  • Loading branch information
pkukielka committed May 21, 2018
1 parent d467b59 commit c4a8403
Show file tree
Hide file tree
Showing 13 changed files with 199 additions and 166 deletions.
138 changes: 63 additions & 75 deletions src/compiler/scala/tools/nsc/Global.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ package nsc
import java.io.{File, FileNotFoundException, IOException}
import java.net.URL
import java.nio.charset.{Charset, CharsetDecoder, IllegalCharsetNameException, UnsupportedCharsetException}
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.{immutable, mutable}
import io.{AbstractFile, Path, SourceReader}
import reporters.{Reporter, StoreReporter}
import util.{ClassPath, ThreadIdentityAwareThreadLocal, returning}
import reporters.{BufferedReporter, Reporter}
import util.{ClassPath, returning}
import scala.reflect.ClassTag
import scala.reflect.internal.util.{BatchSourceFile, NoSourceFile, ScalaClassLoader, ScriptSourceFile, SourceFile, StatisticsStatics}
import scala.reflect.internal.util.Parallel._
import scala.reflect.internal.pickling.PickleBuffer
import symtab.{Flags, SymbolTable, SymbolTrackers}
import symtab.classfile.Pickler
Expand Down Expand Up @@ -78,10 +78,10 @@ class Global(var currentSettings: Settings, reporter0: Reporter)

override def settings = currentSettings

private[this] val currentReporter: ThreadIdentityAwareThreadLocal[Reporter] =
ThreadIdentityAwareThreadLocal(new StoreReporter, reporter0)
private[this] val currentReporter: WorkerOrMainThreadLocal[Reporter] =
WorkerThreadLocal(new BufferedReporter, reporter0)

def reporter: Reporter = { reporter = reporter0 ; currentReporter.get }
def reporter: Reporter = currentReporter.get

def reporter_=(newReporter: Reporter): Unit =
currentReporter.set(newReporter match {
Expand Down Expand Up @@ -392,81 +392,75 @@ class Global(var currentSettings: Settings, reporter0: Reporter)

def apply(unit: CompilationUnit): Unit

def run() {
assertOnMainThread()
echoPhaseSummary(this)
Await.result(Future.sequence(currentRun.units map processUnit), Duration.Inf)
}
// Method added to allow stacking functionality on top of the`run` (e..g measuring run time).
// Overriding `run` is now not allowed since we want to be in charge of how units are processed.
def wrapRun(code: => Unit): Unit = code

final def applyPhase(unit: CompilationUnit): Unit = Await.result(processUnit(unit), Duration.Inf)
def afterUnit(unit: CompilationUnit): Unit = {}

implicit val ec: ExecutionContext = {
val threadPoolFactory = ThreadPoolFactory(Global.this, this)
val javaExecutor = threadPoolFactory.newUnboundedQueueFixedThreadPool(parallelThreads, "worker")
scala.concurrent.ExecutionContext.fromExecutorService(javaExecutor, (_) => ())
}
final def run(): Unit = wrapRun {
assertOnMain()

if (isDebugPrintEnabled) inform("[running phase " + name + " on " + currentRun.size + " compilation units]")

private def processUnit(unit: CompilationUnit)(implicit ec: ExecutionContext): Future[Unit] = {
if (settings.debug && (settings.verbose || currentRun.size < 5))
inform("[running phase " + name + " on " + unit + "]")

def runWithCurrentUnit(): Unit = {
val threadName = Thread.currentThread().getName
if (!threadName.contains("worker")) Thread.currentThread().setName(s"$threadName-worker")
val unit0 = currentUnit

try {
if ((unit ne null) && unit.exists) lastSeenSourceFile = unit.source
currentRun.currentUnit = unit
apply(unit)
} finally {
currentRun.currentUnit = unit0
currentRun.advanceUnit()
Thread.currentThread().setName(threadName)

// If we are on main thread it means there are no worker threads at all.
// That in turn means we were already using main reporter all the time, so there is nothing more to do.
// Otherwise we have to forward messages from worker thread reporter to main one.
reporter match {
case rep: StoreReporter =>
val mainReporter = currentReporter.main
mainReporter.synchronized {
rep.infos.foreach { info =>
info.severity.toString match {
case "INFO" => mainReporter.info(info.pos, info.msg, force = false)
case "WARNING" => mainReporter.warning(info.pos, info.msg)
case "ERROR" => mainReporter.error(info.pos, info.msg)
}
}
}
case _ =>
implicit val ec: ExecutionContextExecutorService = createExecutionContext()
val futures = currentRun.units.collect {
case unit if !cancelled(unit) =>
Future {
processUnit(unit)
afterUnit(unit)
reporter
}
}
}

if (cancelled(unit)) Future.successful(())
else if (isParallel) Future(runWithCurrentUnit())
else Future.fromTry(scala.util.Try(runWithCurrentUnit()))
futures.foreach { future =>
val workerReporter = Await.result(future, Duration.Inf)
workerReporter.asInstanceOf[BufferedReporter].flushTo(reporter)
}
}

final def applyPhase(unit: CompilationUnit): Unit = {
assertOnWorker()
if (!cancelled(unit)) processUnit(unit)
}

private def adjustWorkerThreadName(): Unit = {
val currentThreadName = Thread.currentThread().getName
private def processUnit(unit: CompilationUnit): Unit = {
assertOnWorker()

reporter = new BufferedReporter

if (isDebugPrintEnabled) inform("[running phase " + name + " on " + unit + "]")

val unit0 = currentUnit

try {
if ((unit ne null) && unit.exists) lastSeenSourceFile = unit.source
currentRun.currentUnit = unit
apply(unit)
} finally {
currentRun.currentUnit = unit0
currentRun.advanceUnit()
}
}

private def parallelThreads = settings.YparallelThreads.value
/* Only output a summary message under debug if we aren't echoing each file. */
private def isDebugPrintEnabled: Boolean = settings.debug && !(settings.verbose || currentRun.size < 5)

private def isParallel = settings.YparallelPhases.containsPhase(this)
private def createExecutionContext(): ExecutionContextExecutorService = {
val isParallel = settings.YparallelPhases.containsPhase(this)
val parallelThreads = if (isParallel) settings.YparallelThreads.value else 1
val threadPoolFactory = ThreadPoolFactory(Global.this, this)
val javaExecutor = threadPoolFactory.newUnboundedQueueFixedThreadPool(parallelThreads, "worker")
scala.concurrent.ExecutionContext.fromExecutorService(javaExecutor, _ => ())
}

/** Is current phase cancelled on this unit? */
private def cancelled(unit: CompilationUnit) = {
assertOnMain()
// run the typer only if in `createJavadoc` mode
val maxJavaPhase = if (createJavadoc) currentRun.typerPhase.id else currentRun.namerPhase.id
reporter.cancelled || unit.isJava && this.id > maxJavaPhase
}

private def assertOnMainThread(): Unit = {
assert("main".equals(Thread.currentThread().getName), "")
}
}

// phaseName = "parser"
Expand Down Expand Up @@ -996,7 +990,7 @@ class Global(var currentSettings: Settings, reporter0: Reporter)
* of what file was being compiled when it broke. Since I really
* really want to know, this hack.
*/
protected var _lastSeenSourceFile: ThreadIdentityAwareThreadLocal[SourceFile] = ThreadIdentityAwareThreadLocal(NoSourceFile)
private[this] final val _lastSeenSourceFile: WorkerThreadLocal[SourceFile] = WorkerThreadLocal(NoSourceFile)
@inline protected def lastSeenSourceFile: SourceFile = _lastSeenSourceFile.get
@inline protected def lastSeenSourceFile_=(source: SourceFile): Unit = _lastSeenSourceFile.set(source)

Expand Down Expand Up @@ -1103,12 +1097,6 @@ class Global(var currentSettings: Settings, reporter0: Reporter)
*/
override def currentRunId = curRunId

def echoPhaseSummary(ph: Phase) = {
/* Only output a summary message under debug if we aren't echoing each file. */
if (settings.debug && !(settings.verbose || currentRun.size < 5))
inform("[running phase " + ph.name + " on " + currentRun.size + " compilation units]")
}

def newSourceFile(code: String, filename: String = "<console>") =
new BatchSourceFile(filename, code)

Expand All @@ -1135,7 +1123,7 @@ class Global(var currentSettings: Settings, reporter0: Reporter)
*/
var isDefined = false
/** The currently compiled unit; set from GlobalPhase */
private final val _currentUnit: ThreadIdentityAwareThreadLocal[CompilationUnit] = ThreadIdentityAwareThreadLocal(NoCompilationUnit, NoCompilationUnit)
private[this] final val _currentUnit: WorkerOrMainThreadLocal[CompilationUnit] = WorkerThreadLocal(NoCompilationUnit, NoCompilationUnit)
def currentUnit: CompilationUnit = _currentUnit.get
def currentUnit_=(unit: CompilationUnit): Unit = _currentUnit.set(unit)

Expand Down Expand Up @@ -1175,8 +1163,8 @@ class Global(var currentSettings: Settings, reporter0: Reporter)
/** A map from compiled top-level symbols to their picklers */
val symData = new mutable.AnyRefMap[Symbol, PickleBuffer]

private var phasec: Int = 0 // phases completed
private final val unitc: AtomicInteger = new AtomicInteger(0) // units completed this phase
private var phasec: Int = 0 // phases completed
private final val unitc: Counter = new Counter // units completed this phase

def size = unitbuf.size
override def toString = "scalac Run for:\n " + compiledFiles.toList.sorted.mkString("\n ")
Expand Down Expand Up @@ -1297,7 +1285,7 @@ class Global(var currentSettings: Settings, reporter0: Reporter)
* (for progress reporting)
*/
def advancePhase(): Unit = {
unitc.set(0)
unitc.reset()
phasec += 1
refreshProgress()
}
Expand All @@ -1312,7 +1300,7 @@ class Global(var currentSettings: Settings, reporter0: Reporter)
// for sbt
def cancel(): Unit = { reporter.cancelled = true }

private def currentProgress = (phasec * size) + unitc.get()
private def currentProgress = (phasec * size) + unitc.get
private def totalProgress = (phaseDescriptors.size - 1) * size // -1: drops terminal phase
private def refreshProgress() = if (size > 0) progress(currentProgress, totalProgress)

Expand Down
4 changes: 2 additions & 2 deletions src/compiler/scala/tools/nsc/backend/jvm/GenBCode.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ abstract class GenBCode extends SubComponent {

def apply(unit: CompilationUnit): Unit = codeGen.genUnit(unit)

override def run(): Unit = {
override def wrapRun(code: => Unit): Unit = {
statistics.timed(bcodeTimer) {
try {
initialize()
super.run() // invokes `apply` for each compilation unit
code // invokes `apply` for each compilation unit
generatedClassHandler.complete()
} finally {
this.close()
Expand Down
3 changes: 2 additions & 1 deletion src/compiler/scala/tools/nsc/profile/ThreadPoolFactory.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import java.util.concurrent.ThreadPoolExecutor.AbortPolicy
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger

import scala.reflect.internal.util.Parallel.WorkerThread
import scala.tools.nsc.{Global, Phase}

sealed trait ThreadPoolFactory {
Expand Down Expand Up @@ -47,7 +48,7 @@ object ThreadPoolFactory {
// the thread pool and executes them (on the thread created here).
override def newThread(worker: Runnable): Thread = {
val wrapped = wrapWorker(worker, shortId)
val t: Thread = new Thread(group, wrapped, namePrefix + threadNumber.getAndIncrement, 0)
val t: Thread = new WorkerThread(group, wrapped, namePrefix + threadNumber.getAndIncrement, 0)
if (t.isDaemon != daemon) t.setDaemon(daemon)
if (t.getPriority != priority) t.setPriority(priority)
t
Expand Down
26 changes: 26 additions & 0 deletions src/compiler/scala/tools/nsc/reporters/BufferedReporter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package scala.tools.nsc.reporters

import scala.reflect.internal.util.Parallel.{assertOnMain, assertOnWorker}
import scala.reflect.internal.util.Position

final class BufferedReporter extends Reporter {
private[this] var buffered = List.empty[BufferedMessage]

protected def info0(pos: Position, msg: String, severity: Severity, force: Boolean): Unit = {
assertOnWorker()
buffered = BufferedMessage(pos, msg, severity, force) :: buffered
severity.count += 1
}

def flushTo(reporter: Reporter): Unit = {
assertOnMain()
val sev = Array(reporter.INFO, reporter.WARNING, reporter.ERROR)
buffered.reverse.foreach {
msg =>
reporter.info1(msg.pos, msg.msg, sev(msg.severity.id), msg.force)
}
buffered = Nil
}

private case class BufferedMessage(pos: Position, msg: String, severity: Severity, force: Boolean)
}
5 changes: 3 additions & 2 deletions src/compiler/scala/tools/nsc/transform/SpecializeTypes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,9 @@ abstract class SpecializeTypes extends InfoTransform with TypingTransformers {
override def newPhase(prev: scala.tools.nsc.Phase): StdPhase = new SpecializationPhase(prev)
class SpecializationPhase(prev: scala.tools.nsc.Phase) extends super.Phase(prev) {
override def checkable = false
override def run(): Unit = {
super.run()
override def wrapRun(code: => Unit): Unit = {
code

exitingSpecialize {
FunctionClass.seq.map(_.info)
TupleClass.seq.map(_.info)
Expand Down
12 changes: 6 additions & 6 deletions src/compiler/scala/tools/nsc/typechecker/Analyzer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,17 @@ trait Analyzer extends AnyRef
// Lacking a better fix, we clear it here (before the phase is created, meaning for each
// compiler run). This is good enough for the resident compiler, which was the most affected.
undoLog.clear()
override def run(): Unit = {

override def afterUnit(unit: CompilationUnit): Unit = undoLog.clear()

override def wrapRun(code: => Unit): Unit = {
val start = if (StatisticsStatics.areSomeColdStatsEnabled) statistics.startTimer(statistics.typerNanos) else null
global.echoPhaseSummary(this)
for (unit <- currentRun.units) {
applyPhase(unit)
undoLog.clear()
}
code
// defensive measure in case the bookkeeping in deferred macro expansion is buggy
clearDelayed()
if (StatisticsStatics.areSomeColdStatsEnabled) statistics.stopTimer(statistics.typerNanos, start)
}

def apply(unit: CompilationUnit): Unit = {
try {
val typer = newTyper(rootContext(unit))
Expand Down

This file was deleted.

10 changes: 5 additions & 5 deletions src/reflect/scala/reflect/api/Trees.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package scala
package reflect
package api

import scala.reflect.runtime.ThreadIdentityAwareThreadLocal
import scala.reflect.internal.util.Parallel.WorkerThreadLocal

/**
* <span class="badge badge-red" style="float: right;">EXPERIMENTAL</span>
Expand Down Expand Up @@ -2464,9 +2464,9 @@ trait Trees { self: Universe =>
* @group Traversal
*/
class Traverser {
protected[scala] def currentOwner: Symbol = _currentOwner.get
protected[scala] def currentOwner_=(sym: Symbol): Unit = _currentOwner.set(sym)
private val _currentOwner: ThreadIdentityAwareThreadLocal[Symbol] = ThreadIdentityAwareThreadLocal[Symbol](rootMirror.RootClass)
@inline final protected[scala] def currentOwner: Symbol = _currentOwner.get
@inline final protected[scala] def currentOwner_=(sym: Symbol): Unit = _currentOwner.set(sym)
private final val _currentOwner: WorkerThreadLocal[Symbol] = WorkerThreadLocal(rootMirror.RootClass)

/** Traverse something which Trees contain, but which isn't a Tree itself. */
def traverseName(name: Name): Unit = ()
Expand Down Expand Up @@ -2540,7 +2540,7 @@ trait Trees { self: Universe =>
/** The current owner symbol. */
protected[scala] def currentOwner: Symbol = _currentOwner.get
protected[scala] def currentOwner_=(sym: Symbol): Unit = _currentOwner.set(sym)
private val _currentOwner: ThreadIdentityAwareThreadLocal[Symbol] = ThreadIdentityAwareThreadLocal[Symbol](rootMirror.RootClass)
private final val _currentOwner: WorkerThreadLocal[Symbol] = WorkerThreadLocal(rootMirror.RootClass)

/** The enclosing method of the currently transformed tree. */
protected def currentMethod = {
Expand Down
Loading

0 comments on commit c4a8403

Please sign in to comment.