Skip to content

Commit

Permalink
load population fixed #640 (#653)
Browse files Browse the repository at this point in the history
* Population replacement

* Population replacement, backup population

* Population replacement, backup population

* revert beam config

* added if statement

* fixed warnings

* fixed warmStart test

* scalafmt

* updating warmstart (tested on beamville - works)

* updating warmstart spec

* add comment.

* undo changes warmstart
  • Loading branch information
zishanbilal authored and wrashid committed Sep 26, 2018
1 parent 5b3e437 commit 22c320c
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 60 deletions.
31 changes: 31 additions & 0 deletions src/main/java/beam/utils/UnzipUtility.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.io.*;
import java.nio.file.Paths;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

Expand Down Expand Up @@ -54,6 +55,36 @@ public static void unzip(String zipFilePath, String destDirectory, boolean delet
}
}

// TODO: double check, if this is redundant
/**
* Extracts a zip file specified by the zipFilePath to a directory specified by
* destDirectory (will be created if does not exists)
*
* @param compressedFile Source directory for zip file
* @param decompressedFile Target directory for unzipping.
* @throws IOException Error on failure.
*/
public static void unGunzipFile(String compressedFile, String decompressedFile, boolean delete) throws IOException {
FileInputStream fileIn = new FileInputStream(compressedFile);
GZIPInputStream gZIPInputStream = new GZIPInputStream(fileIn);
FileOutputStream fileOutputStream = new FileOutputStream(decompressedFile);

byte[] buffer = new byte[BUFFER_SIZE];

int bytes_read;

while ((bytes_read = gZIPInputStream.read(buffer)) > 0) {
fileOutputStream.write(buffer, 0, bytes_read);
}

gZIPInputStream.close();
fileOutputStream.close();

if (delete) {
delete(Paths.get(compressedFile));
}
}

/**
* Extracts a zip entry (file entry)
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class ModeChoiceMultinomialLogit(val beamServices: BeamServices, val model: Mult
def altsToModeCostTimeTransfers(
alternatives: IndexedSeq[EmbodiedBeamTrip]
): IndexedSeq[ModeCostTimeTransfer] = {
val walkTripStartTime = alternatives.filter(_.tripClassifier==WALK).headOption.map(_.legs.head.beamLeg.startTime)
val walkTripStartTime = alternatives.filter(_.tripClassifier == WALK).headOption.map(_.legs.head.beamLeg.startTime)
val transitFareDefaults =
TransitFareDefaults.estimateTransitFares(alternatives)
val gasolineCostDefaults =
Expand Down Expand Up @@ -148,7 +148,9 @@ class ModeChoiceMultinomialLogit(val beamServices: BeamServices, val model: Mult
}
val waitTime = altAndIdx._1.tripClassifier match {
case RIDE_HAIL =>
altAndIdx._1.legs.head.beamLeg.startTime - walkTripStartTime.getOrElse(altAndIdx._1.legs.head.beamLeg.startTime)
altAndIdx._1.legs.head.beamLeg.startTime - walkTripStartTime.getOrElse(
altAndIdx._1.legs.head.beamLeg.startTime
)
case RIDE_HAIL_TRANSIT =>
0 // TODO getting this would require we put wait time into EmbodiedBeamLeg, which is the right next step
case _ =>
Expand Down
24 changes: 12 additions & 12 deletions src/main/scala/beam/router/r5/R5RoutingWorker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ class R5RoutingWorker(workerParams: WorkerParameters) extends Actor with ActorLo
profileRequest.accessModes = util.EnumSet.of(request.accessMode)
profileRequest.egressModes = util.EnumSet.of(request.egressMode)
}
// log.debug(profileRequest.toString)
// log.debug(profileRequest.toString)
val result = try {
getPlan(profileRequest)
} catch {
Expand All @@ -396,12 +396,12 @@ class R5RoutingWorker(workerParams: WorkerParameters) extends Actor with ActorLo
case _: ArrayIndexOutOfBoundsException =>
new ProfileResponse
}
// log.debug(s"# options found = ${result.options.size()}")
// log.debug(s"# options found = ${result.options.size()}")
result
}

def calcRoute(routingRequest: RoutingRequest): RoutingResponse = {
// log.debug(routingRequest.toString)
// log.debug(routingRequest.toString)

// For each street vehicle (including body, if available): Route from origin to street vehicle, from street vehicle to destination.
val isRouteForPerson = routingRequest.streetVehicles.exists(_.mode == WALK)
Expand Down Expand Up @@ -804,10 +804,10 @@ class R5RoutingWorker(workerParams: WorkerParameters) extends Actor with ActorLo
routingRequest.streetVehicles.flatMap(vehicle => tripsForVehicle(vehicle))

if (!embodiedTrips.exists(_.tripClassifier == WALK)) {
// log.debug("No walk route found. {}", routingRequest)
// log.debug("No walk route found. {}", routingRequest)
val maybeBody = routingRequest.streetVehicles.find(_.mode == WALK)
if (maybeBody.isDefined) {
// log.debug("Adding dummy walk route with maximum street time.")
// log.debug("Adding dummy walk route with maximum street time.")
val dummyTrip = R5RoutingWorker.createBushwackingTrip(
beamServices.geo.utm2Wgs(new Coord(routingRequest.origin.getX, routingRequest.origin.getY)),
beamServices.geo.utm2Wgs(new Coord(routingRequest.destination.getX, routingRequest.destination.getY)),
Expand All @@ -821,7 +821,7 @@ class R5RoutingWorker(workerParams: WorkerParameters) extends Actor with ActorLo
Some(routingRequest.requestId)
)
} else {
// log.debug("Not adding a dummy walk route since agent has no body.")
// log.debug("Not adding a dummy walk route since agent has no body.")
RoutingResponse(
embodiedTrips,
routingRequest.staticRequestId,
Expand Down Expand Up @@ -1095,13 +1095,13 @@ class R5RoutingWorker(workerParams: WorkerParameters) extends Actor with ActorLo
new StreetSegment(streetPath, mode, transportNetwork.streetLayer)
option.addDirect(streetSegment, request.getFromTimeDateZD)
} else {
// log.debug("Direct mode {} last state wasn't found", mode)
// log.debug("Direct mode {} last state wasn't found", mode)
}
} else {
// log.debug("Direct mode {} destination wasn't found!", mode)
// log.debug("Direct mode {} destination wasn't found!", mode)
}
} else {
// log.debug("Direct mode {} origin wasn't found!", mode)
// log.debug("Direct mode {} origin wasn't found!", mode)
}
}
option.summary = option.generateSummary
Expand Down Expand Up @@ -1152,7 +1152,7 @@ class R5RoutingWorker(workerParams: WorkerParameters) extends Actor with ActorLo

foo(o1, o2)
})
// log.debug("Usefull paths:{}", usefullpathList.size)
// log.debug("Usefull paths:{}", usefullpathList.size)

for (path <- usefullpathList.asScala) {
profileResponse.addTransitPath(
Expand All @@ -1168,8 +1168,8 @@ class R5RoutingWorker(workerParams: WorkerParameters) extends Actor with ActorLo
} // latency possible candidate
}
profileResponse.recomputeStats(request)
// log.debug("Returned {} options", profileResponse.getOptions.size)
// log.debug("Took {} ms", System.currentTimeMillis - startRouting)
// log.debug("Returned {} options", profileResponse.getOptions.size)
// log.debug("Took {} ms", System.currentTimeMillis - startRouting)
profileResponse
}

Expand Down
3 changes: 3 additions & 0 deletions src/main/scala/beam/sim/BeamHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,9 @@ trait BeamHelper extends LazyLogging {
val networkCoordinator = new NetworkCoordinator(beamConfig)
networkCoordinator.loadNetwork()

val beamWarmStart = BeamWarmStart(beamConfig)
beamWarmStart.warmStartPopulationIfNeeded(matsimConfig)

val scenario = ScenarioUtils.loadScenario(matsimConfig).asInstanceOf[MutableScenario]
scenario.setNetwork(networkCoordinator.network)

Expand Down
30 changes: 15 additions & 15 deletions src/main/scala/beam/sim/BeamMobsim.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ import beam.agentsim.agents.ridehail.RideHailManager.{
import beam.agentsim.agents.ridehail.{RideHailAgent, RideHailManager, RideHailSurgePricingManager}
import beam.agentsim.agents.vehicles.EnergyEconomyAttributes.Powertrain
import beam.agentsim.agents.vehicles._
import beam.agentsim.infrastructure.ParkingManager.ParkingStockAttributes
import beam.agentsim.infrastructure.{ParkingManager, TAZTreeMap, ZonalParkingManager}
import beam.agentsim.scheduler.{BeamAgentScheduler, Trigger}
import beam.agentsim.agents.{BeamAgent, InitializeTrigger, Population}
import beam.agentsim.infrastructure.ParkingManager.ParkingStockAttributes
import beam.agentsim.infrastructure.ZonalParkingManager
Expand All @@ -40,9 +37,10 @@ import org.matsim.api.core.v01.population.{Activity, Person}
import org.matsim.api.core.v01.{Coord, Id, Scenario}
import org.matsim.core.api.experimental.events.EventsManager
import org.matsim.core.mobsim.framework.Mobsim
import org.matsim.core.scenario.{MutableScenario, ScenarioUtils}
import org.matsim.core.utils.misc.Time
import org.matsim.households.Household
import org.matsim.vehicles.{Vehicle, VehicleType, VehicleUtils}
import org.matsim.vehicles.VehicleType

import scala.collection.JavaConverters._
import scala.collection.mutable
Expand Down Expand Up @@ -207,12 +205,13 @@ class BeamMobsim @Inject()(
private val numRideHailAgents = math.round(
beamServices.beamConfig.beam.agentsim.numAgents.toDouble * beamServices.beamConfig.beam.agentsim.agents.rideHail.numDriversAsFractionOfPopulation
)
private val rideHailVehicleType = BeamVehicleUtils
.getVehicleTypeById(
beamServices.beamConfig.beam.agentsim.agents.rideHail.vehicleTypeId,
scenario.getVehicles.getVehicleTypes
)
.getOrElse(scenario.getVehicles.getVehicleTypes.get(Id.create("1", classOf[VehicleType])))
private val rideHailVehicleType =
BeamVehicleUtils
.getVehicleTypeById(
beamServices.beamConfig.beam.agentsim.agents.rideHail.vehicleTypeId,
scenario.getVehicles.getVehicleTypes
)
.getOrElse(scenario.getVehicles.getVehicleTypes.get(Id.create("1", classOf[VehicleType])))

val quadTreeBounds: QuadTreeBounds = getQuadTreeBound(
scenario.getPopulation.getPersons
Expand Down Expand Up @@ -302,9 +301,8 @@ class BeamMobsim @Inject()(

val ridehailBeamVehicleTypeId =
Id.create(beamServices.beamConfig.beam.agentsim.agents.rideHail.vehicleTypeId, classOf[BeamVehicleType])
val ridehailBeamVehicleType = beamServices.vehicleTypes
.get(ridehailBeamVehicleTypeId)
.getOrElse(BeamVehicleType.defaultCarBeamVehicleType)
val ridehailBeamVehicleType =
beamServices.vehicleTypes.getOrElse(ridehailBeamVehicleTypeId, BeamVehicleType.defaultCarBeamVehicleType)

val rideHailAgentPersonId: Id[RideHailAgent] =
Id.create(rideHailName, classOf[RideHailAgent])
Expand Down Expand Up @@ -367,8 +365,10 @@ class BeamMobsim @Inject()(

Await.result(beamServices.beamRouter ? InitTransit(scheduler, parkingManager), timeout.duration)

if (beamServices.iterationNumber == 0)
new BeamWarmStart(beamServices).init()
if (beamServices.iterationNumber == 0) {
val warmStart = BeamWarmStart(beamServices.beamConfig)
warmStart.warmStartRouterIfNeeded(beamServices.beamRouter)
}

log.info(s"Transit schedule has been initialized")

Expand Down
78 changes: 54 additions & 24 deletions src/main/scala/beam/sim/BeamWarmStart.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,26 @@ package beam.sim
import java.io.File
import java.nio.file.{Files, Paths}

import akka.actor.ActorRef
import beam.router.BeamRouter.UpdateTravelTime
import beam.router.LinkTravelTimeContainer
import beam.router.{BeamRouter, LinkTravelTimeContainer}
import beam.sim.config.BeamConfig
import beam.utils.FileUtils.downloadFile
import beam.utils.UnzipUtility.unzip
import beam.utils.UnzipUtility._
import com.typesafe.scalalogging.LazyLogging
import org.apache.commons.io.FileUtils.getTempDirectoryPath
import org.apache.commons.io.FilenameUtils.{getBaseName, getExtension, getName}
import org.matsim.core.config.Config
import org.matsim.core.router.util.TravelTime
import org.matsim.core.scenario.{MutableScenario, ScenarioUtils}

import scala.compat.java8.StreamConverters._

class BeamWarmStart(val beamServices: BeamServices) extends LazyLogging {
private val beamConfig = beamServices.beamConfig
object BeamWarmStart {
def apply(beamConfig: BeamConfig): BeamWarmStart = new BeamWarmStart(beamConfig)
}

class BeamWarmStart(beamConfig: BeamConfig) extends LazyLogging {
// beamConfig.beam.warmStart.pathType=PARENT_RUN, ABSOLUTE_PATH
private val pathType = beamConfig.beam.warmStart.pathType
private val srcPath = beamConfig.beam.warmStart.path
Expand All @@ -30,13 +37,12 @@ class BeamWarmStart(val beamServices: BeamServices) extends LazyLogging {
/**
* initialize warm start mode.
*/
def init(): Unit = {
def warmStartRouterIfNeeded(beamRouter: ActorRef): Unit = {
if (!isWarmMode) return

getWarmStartPath match {
warmStartPath match {
case Some(statsPath) =>
if (Files.exists(Paths.get(statsPath))) {
beamServices.beamRouter ! UpdateTravelTime(getTravelTime(statsPath))
beamRouter ! UpdateTravelTime(getTravelTime(statsPath))
logger.info(s"Warm start mode initialized successfully from stats located at $statsPath.")
} else {
logger.warn(
Expand All @@ -47,24 +53,40 @@ class BeamWarmStart(val beamServices: BeamServices) extends LazyLogging {
}
}

private def getWarmStartPath: Option[String] = {
pathType match {
case "PARENT_RUN" =>
getWarmStartPath(getParentRunPath)

case "ABSOLUTE_PATH" =>
Files
.walk(Paths.get(srcPath))
.toScala[Stream]
.map(_.toString)
.find(_.endsWith(".linkstats.csv.gz"))

case _ =>
logger.warn(s"Warm start mode initialization failed, not a valid path type ( $pathType )")
None
def warmStartPopulationIfNeeded(matsimConfig: Config): Unit = {
if (isWarmMode) {
populationFilePath.foreach { file =>
matsimConfig.plans().setInputFile(file)
logger.info("Warm start population initialized successfully from file located at {}", file)
}
}
}

lazy val populationFilePath: Option[String] =
if (isWarmMode) {
val path = pathType match {
case "PARENT_RUN" => parentRunPath
case "ABSOLUTE_PATH" => srcPath
}
Some(loadPopulation(path, populationFile(path)))
} else None

private lazy val warmStartPath: Option[String] = pathType match {
case "PARENT_RUN" =>
getWarmStartPath(parentRunPath)

case "ABSOLUTE_PATH" =>
Files
.walk(Paths.get(srcPath))
.toScala[Stream]
.map(_.toString)
.find(_.endsWith(".linkstats.csv.gz"))

case _ =>
logger.warn(s"Warm start mode initialization failed, not a valid path type ( $pathType )")
None
}

private def getWarmStartPath(runPath: String) = {
val iterOption = Files
.walk(Paths.get(runPath))
Expand Down Expand Up @@ -93,7 +115,7 @@ class BeamWarmStart(val beamServices: BeamServices) extends LazyLogging {
}
}

private def getParentRunPath = {
private lazy val parentRunPath = {
if (isZipArchive(srcPath)) {
var archivePath = srcPath
if (isOutputBucketUrl(srcPath)) {
Expand All @@ -108,6 +130,14 @@ class BeamWarmStart(val beamServices: BeamServices) extends LazyLogging {
}
}

private def populationFile(runPath: String): String = Paths.get(runPath, "output_plans.xml.gz").toString

private def loadPopulation(runPath: String, populationFile: String): String = {
val plansPath = Paths.get(runPath, "output_plans.xml")
unGunzipFile(populationFile, plansPath.toString, false)
plansPath.toString
}

private def isOutputBucketUrl(source: String): Boolean = {
assert(source != null)
source.startsWith("https://s3.us-east-2.amazonaws.com/beam-outputs/")
Expand Down
12 changes: 6 additions & 6 deletions src/test/scala/beam/router/WarmStartRoutingSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.matsim.core.scenario.ScenarioUtils
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.when
import org.scalatest.mockito.MockitoSugar
import org.scalatest.{BeforeAndAfterAll, Ignore, Matchers, WordSpecLike}
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}

import scala.concurrent.duration._
import scala.language.postfixOps
Expand Down Expand Up @@ -125,12 +125,12 @@ class WarmStartRoutingSpec
)
)
)
val response = expectMsgType[RoutingResponse]
var response = expectMsgType[RoutingResponse]
assert(response.itineraries.exists(_.tripClassifier == CAR))
val carOption = response.itineraries.find(_.tripClassifier == CAR).get
assert(carOption.totalTravelTimeInSecs == 76)

new BeamWarmStart(services).init()
new BeamWarmStart(services.beamConfig).warmStartRouterIfNeeded(services.beamRouter)
router ! RoutingRequest(
origin,
destination,
Expand All @@ -145,9 +145,9 @@ class WarmStartRoutingSpec
)
)
)
val response2 = expectMsgType[RoutingResponse]
assert(response2.itineraries.exists(_.tripClassifier == CAR))
val carOption2 = response2.itineraries.find(_.tripClassifier == CAR).get
response = expectMsgType[RoutingResponse]
assert(response.itineraries.exists(_.tripClassifier == CAR))
val carOption2 = response.itineraries.find(_.tripClassifier == CAR).get
assert(carOption2.totalTravelTimeInSecs == 55)
}
}
Expand Down
2 changes: 1 addition & 1 deletion test/input/beamville/beam.conf
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ matsim.modules {
}
controler {
firstIteration = 0
lastIteration = 20
lastIteration = 1
eventsFileFormat = "xml"
overwriteFiles = "overwriteExistingFiles"
}
Expand Down

0 comments on commit 22c320c

Please sign in to comment.