Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SkimPlus & RepositioningManager #1585

Merged
merged 33 commits into from
May 24, 2019
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
7ff573b
SkimPlus added to BeamSkimmer
haitamlaarabi Mar 28, 2019
b068aee
fmt
haitamlaarabi Mar 28, 2019
74a2365
timer set up for relocation
haitamlaarabi Apr 4, 2019
554abea
repositioning manager print every x seconds the available vehicles an…
haitamlaarabi Apr 5, 2019
539a759
BeamSkimmer merge-conflict
haitamlaarabi Apr 5, 2019
73b01e4
Merge remote-tracking branch 'origin/master' into HL/#1540-rebalancin…
haitamlaarabi Apr 5, 2019
90d6b7b
conflict-merge-fmt
haitamlaarabi Apr 5, 2019
8d5c057
x => logger.info(x) to logger.info
haitamlaarabi Apr 5, 2019
79a3fd1
fmt
haitamlaarabi Apr 6, 2019
3a1e58d
Merge branch 'master' into HL/#1540-rebalancing-bis
haitamlaarabi Apr 6, 2019
f963019
first run ok
haitamlaarabi Apr 15, 2019
0e7cbb6
Merge remote-tracking branch 'origin/master' into HL/#1540-rebalancin…
haitamlaarabi Apr 15, 2019
18536bb
Merge branch 'master' into HL/#1540-rebalancing-bis
JustinPihony Apr 16, 2019
a379a6d
Merge branch 'HL/#1540-rebalancing-bis' of github.com:LBNL-UCB-STI/be…
haitamlaarabi Apr 16, 2019
2b94eb0
towrads generic reposition manager
haitamlaarabi Apr 19, 2019
80418ad
Merge remote-tracking branch 'origin/master' into HL/#1540-rebalancin…
haitamlaarabi Apr 19, 2019
22d9868
WIP a much better design but vehiclesForReposition always empty, I ne…
haitamlaarabi Apr 20, 2019
b81fb81
adding reposition listener
haitamlaarabi Apr 23, 2019
474144a
Merge branch 'master' into HL/#1540-rebalancing-bis
JustinPihony Apr 24, 2019
1ec2c8a
fmt
JustinPihony Apr 24, 2019
44a7672
fixing skimplus bug
haitamlaarabi May 11, 2019
7a8aaf0
reposition operational
haitamlaarabi May 14, 2019
6c21fa1
skimming relocation events instead of listening
haitamlaarabi May 16, 2019
2e12151
warm start in sflight 5k was poiting to a local file
haitamlaarabi May 16, 2019
1dfaeb3
update list of oversupplied and undersupplied
haitamlaarabi May 17, 2019
fe03bcf
pseudo reservation mechanism
haitamlaarabi May 21, 2019
f29fa4c
reposition validated on beamville
haitamlaarabi May 22, 2019
db60bd3
merge with develop part 1
haitamlaarabi May 22, 2019
f6df49f
Merge remote-tracking branch 'origin/develop' into HL/#1540-rebalanci…
haitamlaarabi May 22, 2019
65b0928
merge with develop p3 OK + not parked runtime exception
haitamlaarabi May 22, 2019
2ceee65
random distribution within area
haitamlaarabi May 22, 2019
e3e9130
bad completion notice resolved
haitamlaarabi May 23, 2019
e2bb19c
rolling back from pseudo reservation mechanism
haitamlaarabi May 23, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 147 additions & 3 deletions src/main/scala/beam/router/BeamSkimmer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package beam.router
import java.util.concurrent.TimeUnit

import beam.agentsim.agents.choice.mode.DrivingCost
import beam.agentsim.agents.vehicles.BeamVehicleType
import beam.agentsim.agents.vehicles.{BeamVehicle, BeamVehicleType}
import beam.agentsim.events.SpaceTime
import beam.agentsim.infrastructure.TAZTreeMap.TAZ
import beam.router.BeamRouter.Location
import beam.router.BeamSkimmer.Label
import beam.router.Modes.BeamMode
import beam.router.Modes.BeamMode.{
BIKE,
Expand All @@ -22,17 +24,21 @@ import beam.router.Modes.BeamMode.{
import beam.router.model.{BeamLeg, BeamPath, EmbodiedBeamTrip}
import beam.sim.common.GeoUtils
import beam.sim.config.BeamConfig
import beam.sim.vehiclesharing.VehicleManager
import beam.sim.{BeamServices, BeamWarmStart}
import beam.utils.{FileUtils, ProfilingUtils}
import com.google.inject.Inject
import com.typesafe.scalalogging.LazyLogging
import com.vividsolutions.jts.index.quadtree.Quadtree
import org.matsim.api.core.v01.population.Person
import org.matsim.api.core.v01.{Coord, Id}
import org.matsim.core.config.groups.TravelTimeCalculatorConfigGroup
import org.matsim.core.controler.events.IterationEndsEvent
import org.matsim.core.utils.io.IOUtils
import org.supercsv.io.CsvMapReader
import org.supercsv.prefs.CsvPreference

import scala.collection.JavaConverters._
import scala.collection.concurrent.TrieMap
import scala.language.implicitConversions
import scala.util.control.NonFatal
Expand Down Expand Up @@ -190,6 +196,16 @@ class BeamSkimmer @Inject()(val beamConfig: BeamConfig, val beamServices: BeamSe
}
}

def getPreviousSkimValueOrDefault(time: Int, mode: BeamMode, orig: Id[TAZ], dest: Id[TAZ]): Skim = {
previousSkims.get((timeToBin(time), mode, orig, dest)) match {
case Some(someSkim) => someSkim.toSkimExternal
case None =>
val origTaz = beamServices.tazTreeMap.getTAZ(orig).get.coord
val destTaz = beamServices.tazTreeMap.getTAZ(dest).get.coord
getSkimDefaultValue(mode, origTaz, destTaz, time, Id.create("NA", classOf[BeamVehicleType]), beamServices)
}
}

def observeTrip(
trip: EmbodiedBeamTrip,
generalizedTimeInHours: Double,
Expand Down Expand Up @@ -245,8 +261,8 @@ class BeamSkimmer @Inject()(val beamConfig: BeamConfig, val beamServices: BeamSe
}
}

def timeToBin(departTime: Int): Int = {
Math.floorMod(Math.floor(departTime.toDouble / 3600.0).toInt, 24)
def timeToBin(departTime: Int, timeWindow: Double = 3600.0): Int = {
Math.floorMod(Math.floor(departTime.toDouble / timeWindow).toInt, (30 * 3600 / timeWindow).toInt)
}

def mergeAverage(existingAverage: Double, existingCount: Int, newValue: Double): Double = {
Expand All @@ -263,12 +279,17 @@ class BeamSkimmer @Inject()(val beamConfig: BeamConfig, val beamServices: BeamSe
) {
writeAllModeSkimsForPeakNonPeakPeriods(event)
}
ProfilingUtils.timed(s"writeObservedSkimsPlus on iteration ${event.getIteration}", x => logger.info(x)) {
haitamlaarabi marked this conversation as resolved.
Show resolved Hide resolved
writeObservedSkimsPlus(event)
}
// Writing full skims are very large, but code is preserved here in case we want to enable it.
// TODO make this a configurable output "writeFullSkimsInterval" with default of 0
// if(beamServicesOpt.isDefined) writeFullSkims(event)

previousSkims = skims
skims = new TrieMap()
previousSkimsPlus = skimsPlus
skimsPlus = new TrieMap()
}

def getExcerptData(
Expand Down Expand Up @@ -474,6 +495,96 @@ class BeamSkimmer @Inject()(val beamConfig: BeamConfig, val beamServices: BeamSe
}
writer.close()
}

// **********
// Skim Plus
private var previousSkimsPlus: TrieMap[(TimeBin, Id[TAZ], Id[VehicleManager], Label), Double] =
initialPreviousSkimsPlus()
private var skimsPlus: TrieMap[(TimeBin, Id[TAZ], Id[VehicleManager], Label), Double] = TrieMap()
private val timeWindow: Int = 5 * 60
private def skimsPlusFilePath: Option[String] = {
val maxHour = TimeUnit.SECONDS.toHours(new TravelTimeCalculatorConfigGroup().getMaxTime).toInt
BeamWarmStart(beamConfig, maxHour).getWarmStartFilePath(observedSkimsPlusFileBaseName + ".csv.gz")
}

def getPreviousSkimPlusValues(
startTime: Int,
endTime: Int,
taz: Id[TAZ],
vehicleManager: Id[VehicleManager],
label: Label
): Vector[Double] = {
(timeToBin(startTime, timeWindow) to timeToBin(endTime, timeWindow))
.map { i =>
previousSkimsPlus.get((i, taz, vehicleManager, label))
}
.toVector
.flatten
}

def countEventsByTAZ(
time: Int,
location: Coord,
vehicleManager: Id[VehicleManager],
label: Label
): Unit = {
val timeBin = timeToBin(time, timeWindow)
val taz = beamServices.tazTreeMap.getTAZ(location.getX, location.getY)
val key = (timeBin, taz.tazId, vehicleManager, label)
skimsPlus.put(key, skimsPlus.getOrElse(key, 0.0) + 1.0)
}

def observeVehicleAvailabilityByTAZ(
time: Int,
vehicleManager: Id[VehicleManager],
label: Label,
vehicles: List[Any]
): Unit = {
val timeBin = timeToBin(time, timeWindow)
beamServices.tazTreeMap.getTAZs.foreach { taz =>
val key = (timeBin, taz.tazId, vehicleManager, label)
val count = vehicles.count(
v =>
taz == beamServices.tazTreeMap
.getTAZ(v.asInstanceOf[BeamVehicle].spaceTime.loc.getX, v.asInstanceOf[BeamVehicle].spaceTime.loc.getY)
)
skimsPlus.put(key, skimsPlus.getOrElse(key, 0.0) + count.toDouble)
}
}

def writeObservedSkimsPlus(event: IterationEndsEvent): Unit = {
val filePath = event.getServices.getControlerIO.getIterationFilename(
event.getServices.getIterationNumber,
BeamSkimmer.observedSkimsPlusFileBaseName + ".csv.gz"
)
val writer = IOUtils.getBufferedWriter(filePath)
writer.write(observedSkimsPlusHeader.mkString(","))
writer.write("\n")

skimsPlus.foreach {
case (k, v) =>
val (bin, taz, vehicleManager, label) = k
writer.write(s"$bin,$taz,$vehicleManager,$label,$v\n")
}
writer.close()
}

private def initialPreviousSkimsPlus(): TrieMap[(TimeBin, Id[TAZ], Id[VehicleManager], Label), Double] = {
if (beamConfig.beam.warmStart.enabled) {
try {
skimsPlusFilePath
.map(BeamSkimmer.readSkimPlusFile)
.getOrElse(TrieMap.empty)
} catch {
case NonFatal(ex) =>
logger.error(s"Could not load previous skim from '${skimsFilePath}': ${ex.getMessage}", ex)
TrieMap.empty
}
} else {
TrieMap.empty
}
}
// *********
}

object BeamSkimmer extends LazyLogging {
Expand Down Expand Up @@ -593,4 +704,37 @@ object BeamSkimmer extends LazyLogging {
}
res
}

// *******
// Skim Plus
private val observedSkimsPlusFileBaseName = "skimsPlus"
private val observedSkimsPlusHeader = "time,taz,manager,label,value".split(",")
type TimeBin = Int
type Label = String

private def readSkimPlusFile(filePath: String): TrieMap[(TimeBin, Id[TAZ], Id[VehicleManager], Label), Double] = {
val mapReader = new CsvMapReader(FileUtils.readerFromFile(filePath), CsvPreference.STANDARD_PREFERENCE)
val res = TrieMap[(TimeBin, Id[TAZ], Id[VehicleManager], Label), Double]()
try {
val header = mapReader.getHeader(true)
var line: java.util.Map[String, String] = mapReader.read(header: _*)
while (null != line) {
val time = line.get("time")
val tazid = line.get("taz")
val manager = line.get("manager")
val label = line.get("label")
val value = line.get("value").toDouble
res.put(
(time.toInt, Id.create(tazid, classOf[TAZ]), Id.create(manager, classOf[VehicleManager]), label),
value
)
line = mapReader.read(header: _*)
}
} finally {
if (null != mapReader)
mapReader.close()
}
res
}
//
}
7 changes: 1 addition & 6 deletions src/main/scala/beam/sim/BeamHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -347,12 +347,7 @@ trait BeamHelper extends LazyLogging {
if (isMetricsEnable) Kamon.start(clusterConfig.withFallback(ConfigFactory.defaultReference()))

import akka.actor.{ActorSystem, DeadLetter, PoisonPill, Props}
import akka.cluster.singleton.{
ClusterSingletonManager,
ClusterSingletonManagerSettings,
ClusterSingletonProxy,
ClusterSingletonProxySettings
}
import akka.cluster.singleton.{ClusterSingletonManager, ClusterSingletonManagerSettings, ClusterSingletonProxy, ClusterSingletonProxySettings}
import beam.router.ClusterWorkerRouter
import beam.sim.monitoring.DeadLetterReplayer

Expand Down
5 changes: 4 additions & 1 deletion src/main/scala/beam/sim/BeamMobsim.scala
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,10 @@ class BeamMobsim @Inject()(
}

private val sharedVehicleFleets = config.agents.vehicles.sharedFleets.map { fleetConfig =>
context.actorOf(Fleets.lookup(fleetConfig).props(beamServices, parkingManager), fleetConfig.name)
context.actorOf(
Fleets.lookup(fleetConfig).props(beamServices, beamSkimmer, scheduler, parkingManager),
fleetConfig.name
)
}
sharedVehicleFleets.foreach(context.watch)
sharedVehicleFleets.foreach(scheduler ! ScheduleTrigger(InitializeTrigger(0), _))
Expand Down
50 changes: 50 additions & 0 deletions src/main/scala/beam/sim/config/BeamConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -964,6 +964,12 @@ object BeamConfig {

object Vehicles {
case class SharedFleets$Elm(
fixed_non_reserving_fleet_from_file: scala.Option[
BeamConfig.Beam.Agentsim.Agents.Vehicles.SharedFleets$Elm.FixedNonReservingFleetFromFile
],
fixed_non_reserving_random_dist: scala.Option[
BeamConfig.Beam.Agentsim.Agents.Vehicles.SharedFleets$Elm.FixedNonReservingRandomlyDistributed
],
fixed_non_reserving: scala.Option[
BeamConfig.Beam.Agentsim.Agents.Vehicles.SharedFleets$Elm.FixedNonReserving
],
Expand All @@ -975,6 +981,36 @@ object BeamConfig {
)

object SharedFleets$Elm {
case class FixedNonReservingFleetFromFile(vehicleTypeId: java.lang.String, filePathCSV: java.lang.String)

object FixedNonReservingFleetFromFile {
def apply(
c: com.typesafe.config.Config
): BeamConfig.Beam.Agentsim.Agents.Vehicles.SharedFleets$Elm.FixedNonReservingFleetFromFile = {
BeamConfig.Beam.Agentsim.Agents.Vehicles.SharedFleets$Elm.FixedNonReservingFleetFromFile(
vehicleTypeId = if (c.hasPathOrNull("vehicleTypeId")) c.getString("vehicleTypeId") else "sharedCar",
filePathCSV = if (c.hasPathOrNull("filename")) c.getString("filename") else ""
)
}
}

case class FixedNonReservingRandomlyDistributed(
vehicleTypeId: java.lang.String,
fleetSize: Int
)

object FixedNonReservingRandomlyDistributed {

def apply(
c: com.typesafe.config.Config
): BeamConfig.Beam.Agentsim.Agents.Vehicles.SharedFleets$Elm.FixedNonReservingRandomlyDistributed = {
BeamConfig.Beam.Agentsim.Agents.Vehicles.SharedFleets$Elm.FixedNonReservingRandomlyDistributed(
vehicleTypeId = if (c.hasPathOrNull("vehicleTypeId")) c.getString("vehicleTypeId") else "sharedCar",
fleetSize = if (c.hasPathOrNull("fleetSize")) c.getInt("fleetSize") else 0
)
}
}

case class FixedNonReserving(
vehicleTypeId: java.lang.String
)
Expand Down Expand Up @@ -1007,6 +1043,20 @@ object BeamConfig {

def apply(c: com.typesafe.config.Config): BeamConfig.Beam.Agentsim.Agents.Vehicles.SharedFleets$Elm = {
BeamConfig.Beam.Agentsim.Agents.Vehicles.SharedFleets$Elm(
fixed_non_reserving_fleet_from_file =
if (c.hasPathOrNull("fixed_non_reserving_fleet_from_file"))
scala.Some(
BeamConfig.Beam.Agentsim.Agents.Vehicles.SharedFleets$Elm
.FixedNonReservingFleetFromFile(c.getConfig("fixed_non_reserving_fleet_from_file"))
)
else None,
fixed_non_reserving_random_dist =
if (c.hasPathOrNull("fixed_non_reserving_random_dist"))
scala.Some(
BeamConfig.Beam.Agentsim.Agents.Vehicles.SharedFleets$Elm
.FixedNonReservingRandomlyDistributed(c.getConfig("fixed_non_reserving_random_dist"))
)
else None,
fixed_non_reserving =
if (c.hasPathOrNull("fixed-non-reserving"))
scala.Some(
Expand Down
Loading