Skip to content

Commit

Permalink
Noise injection in R5
Browse files Browse the repository at this point in the history
  • Loading branch information
REASY committed Apr 2, 2020
1 parent 6b58ca2 commit 4d54b6b
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 56 deletions.
1 change: 1 addition & 0 deletions src/main/resources/beam-template.conf
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,7 @@ beam.routing {
osmMapdbFile = ${beam.inputDirectory}"/r5/osm.mapdb"
mNetBuilder.fromCRS = "EPSG:4326" # WGS84
mNetBuilder.toCRS = "EPSG:26910" # UTM10N
travelTimeNoiseFraction = "double | 0.0"
}
startingIterationForTravelTimesMSA = "int | 0"
}
Expand Down
209 changes: 157 additions & 52 deletions src/main/scala/beam/router/r5/R5RoutingWorker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package beam.router.r5
import java.time.temporal.ChronoUnit
import java.time.{ZoneOffset, ZonedDateTime}
import java.util
import java.util.concurrent.{ExecutorService, Executors}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ExecutorService, Executors, ThreadLocalRandom}
import java.util.{Collections, Optional}

import akka.actor._
Expand Down Expand Up @@ -136,7 +137,11 @@ class R5RoutingWorker(workerParams: WorkerParameters) extends Actor with ActorLo

private var workAssigner: ActorRef = context.parent

private var r5: R5Wrapper = new R5Wrapper(workerParams, new FreeFlowTravelTime)
private var r5: R5Wrapper = new R5Wrapper(
workerParams,
new FreeFlowTravelTime,
workerParams.beamConfig.beam.routing.r5.travelTimeNoiseFraction
)

private val linksBelowMinCarSpeed =
workerParams.networkHelper.allLinks
Expand Down Expand Up @@ -197,7 +202,27 @@ class R5RoutingWorker(workerParams: WorkerParameters) extends Actor with ActorLo
if (firstMsgTime.isEmpty) firstMsgTime = Some(ZonedDateTime.now(ZoneOffset.UTC))
val eventualResponse = Future {
latency("request-router-time", Metrics.RegularLevel) {
r5.calcRoute(request)
val routeWithNoise = r5.calcRoute(request)
if (workerParams.beamConfig.beam.routing.r5.travelTimeNoiseFraction > 0) {
val itinerariesWithoutNoise = routeWithNoise.itineraries.map { itinerary =>
if (!itinerary.tripClassifier.isTransit) {
val newLegs = itinerary.legs.map { leg =>
if (leg.beamLeg.mode == BeamMode.CAR) {
val updatedLeg = r5.createBeamLeg(
leg.beamVehicleTypeId,
leg.beamLeg.travelPath.startPoint,
leg.beamLeg.travelPath.endPoint.loc,
leg.beamLeg.mode.r5Mode.get.left.get,
leg.beamLeg.travelPath.linkIds
)
leg.copy(beamLeg = updatedLeg)
} else leg
}
itinerary.copy(legs = newLegs)
} else itinerary
}
routeWithNoise.copy(itineraries = itinerariesWithoutNoise)
} else routeWithNoise
}
}
eventualResponse.recover {
Expand All @@ -208,14 +233,19 @@ class R5RoutingWorker(workerParams: WorkerParameters) extends Actor with ActorLo
askForMoreWork()

case UpdateTravelTimeLocal(newTravelTime) =>
r5 = new R5Wrapper(workerParams, newTravelTime)
r5 = new R5Wrapper(
workerParams,
newTravelTime,
workerParams.beamConfig.beam.routing.r5.travelTimeNoiseFraction
)
log.info(s"{} UpdateTravelTimeLocal. Set new travel time", getNameAndHashCode)
askForMoreWork()

case UpdateTravelTimeRemote(map) =>
r5 = new R5Wrapper(
workerParams,
TravelTimeCalculatorHelper.CreateTravelTimeCalculator(workerParams.beamConfig.beam.agentsim.timeBinSize, map)
TravelTimeCalculatorHelper.CreateTravelTimeCalculator(workerParams.beamConfig.beam.agentsim.timeBinSize, map),
workerParams.beamConfig.beam.routing.r5.travelTimeNoiseFraction
)
log.info(
s"{} UpdateTravelTimeRemote. Set new travel time from map with size {}",
Expand All @@ -239,7 +269,8 @@ class R5RoutingWorker(workerParams: WorkerParameters) extends Actor with ActorLo
if (workAssigner != null) workAssigner ! GimmeWork //Master will retry if it hasn't heard
}

class R5Wrapper(workerParams: WorkerParameters, travelTime: TravelTime) extends MetricsSupport {
class R5Wrapper(workerParams: WorkerParameters, travelTime: TravelTime, travelTimeNoiseFraction: Double)
extends MetricsSupport {

private val WorkerParameters(
beamConfig,
Expand All @@ -261,11 +292,11 @@ class R5Wrapper(workerParams: WorkerParameters, travelTime: TravelTime) extends
vehicleId: Id[Vehicle],
vehicleTypeId: Id[BeamVehicleType],
embodyRequestId: Int
) = {
): RoutingResponse = {
val linksTimesAndDistances = RoutingModel.linksToTimeAndDistance(
leg.travelPath.linkIds,
leg.startTime,
travelTimeByLinkCalculator(vehicleTypes(vehicleTypeId)),
travelTimeByLinkCalculator(vehicleTypes(vehicleTypeId), shouldAddNoise = false),
toR5StreetMode(leg.mode),
transportNetwork.streetLayer
)
Expand Down Expand Up @@ -331,7 +362,11 @@ class R5Wrapper(workerParams: WorkerParameters, travelTime: TravelTime) extends
for (mode <- profileRequest.directModes.asScala) {
val streetRouter = new StreetRouter(
transportNetwork.streetLayer,
travelTimeCalculator(vehicleTypes(request.beamVehicleTypeId), profileRequest.fromTime),
travelTimeCalculator(
vehicleTypes(request.beamVehicleTypeId),
profileRequest.fromTime,
shouldAddNoise = !profileRequest.hasTransit
), // Add error if it is not transit
turnCostCalculator,
travelCostCalculator(request.timeValueOfMoney, profileRequest.fromTime)
)
Expand Down Expand Up @@ -444,7 +479,14 @@ class R5Wrapper(workerParams: WorkerParameters, travelTime: TravelTime) extends
)
} else {
val streetSegment = profileResponse.options.get(0).access.get(0)
Some(buildStreetBasedLegs(streetSegment, request.departureTime, body, unbecomeDriverOnCompletion = false))
Some(
buildStreetBasedLegs(
streetSegment,
request.departureTime,
body,
unbecomeDriverOnCompletion = false
)
)
}
} else {
Some(
Expand Down Expand Up @@ -500,7 +542,12 @@ class R5Wrapper(workerParams: WorkerParameters, travelTime: TravelTime) extends
}
if (!profileResponse.options.isEmpty) {
val streetSegment = profileResponse.options.get(0).access.get(0)
buildStreetBasedLegs(streetSegment, time, vehicle, unbecomeDriverOnCompletion = true)
buildStreetBasedLegs(
streetSegment,
time,
vehicle,
unbecomeDriverOnCompletion = true
)
} else {
EmbodiedBeamLeg(
createBushwackingBeamLeg(request.departureTime, from, to, geo),
Expand Down Expand Up @@ -605,7 +652,11 @@ class R5Wrapper(workerParams: WorkerParameters, travelTime: TravelTime) extends
profileRequest.toTime = profileRequest.fromTime + 61 // Important to allow 61 seconds for transit schedules to be considered!
val streetRouter = new StreetRouter(
transportNetwork.streetLayer,
travelTimeCalculator(vehicleTypes(vehicle.vehicleTypeId), profileRequest.fromTime),
travelTimeCalculator(
vehicleTypes(vehicle.vehicleTypeId),
profileRequest.fromTime,
shouldAddNoise = !profileRequest.hasTransit()
),
turnCostCalculator,
travelCostCalculator(request.timeValueOfMoney, profileRequest.fromTime)
)
Expand Down Expand Up @@ -645,7 +696,11 @@ class R5Wrapper(workerParams: WorkerParameters, travelTime: TravelTime) extends
} else if (profileRequest.streetTime * 60 > streetRouter.timeLimitSeconds) {
val streetRouter = new StreetRouter(
transportNetwork.streetLayer,
travelTimeCalculator(vehicleTypes(vehicle.vehicleTypeId), profileRequest.fromTime),
travelTimeCalculator(
vehicleTypes(vehicle.vehicleTypeId),
profileRequest.fromTime,
shouldAddNoise = !profileRequest.hasTransit
),
turnCostCalculator,
travelCostCalculator(request.timeValueOfMoney, profileRequest.fromTime)
)
Expand Down Expand Up @@ -702,7 +757,11 @@ class R5Wrapper(workerParams: WorkerParameters, travelTime: TravelTime) extends
profileRequest.toLat = to.getY
val streetRouter = new StreetRouter(
transportNetwork.streetLayer,
travelTimeCalculator(vehicleTypes(vehicle.vehicleTypeId), profileRequest.fromTime),
travelTimeCalculator(
vehicleTypes(vehicle.vehicleTypeId),
profileRequest.fromTime,
shouldAddNoise = !profileRequest.hasTransit
),
turnCostCalculator,
travelCostCalculator(request.timeValueOfMoney, profileRequest.fromTime)
)
Expand Down Expand Up @@ -888,15 +947,25 @@ class R5Wrapper(workerParams: WorkerParameters, travelTime: TravelTime) extends
.toInt
if (transitSegment.middle != null) {
val body = request.streetVehicles.find(_.mode == WALK).get
embodiedBeamLegs += buildStreetBasedLegs(transitSegment.middle, arrivalTime, body, false)
embodiedBeamLegs += buildStreetBasedLegs(
transitSegment.middle,
arrivalTime,
body,
unbecomeDriverOnCompletion = false
)
arrivalTime = arrivalTime + transitSegment.middle.duration
}
}

if (itinerary.connection.egress != null) {
val egress = option.egress.get(itinerary.connection.egress)
val vehicle = egressVehicles.find(v => v.mode.r5Mode.get.left.get == egress.mode).get
embodiedBeamLegs += buildStreetBasedLegs(egress, arrivalTime, vehicle, true)
embodiedBeamLegs += buildStreetBasedLegs(
egress,
arrivalTime,
vehicle,
unbecomeDriverOnCompletion = true
)
val body = request.streetVehicles.find(_.mode == WALK).get
if (isRouteForPerson && egress.mode != LegMode.WALK) {
embodiedBeamLegs += EmbodiedBeamLeg(
Expand Down Expand Up @@ -970,40 +1039,22 @@ class R5Wrapper(workerParams: WorkerParameters, travelTime: TravelTime) extends
vehicle: StreetVehicle,
unbecomeDriverOnCompletion: Boolean
): EmbodiedBeamLeg = {
val startPoint = SpaceTime(
segment.geometry.getStartPoint.getX,
segment.geometry.getStartPoint.getY,
tripStartTime
)
val endCoord = new Coord(
segment.geometry.getEndPoint.getX,
segment.geometry.getEndPoint.getY,
)

var activeLinkIds = ArrayBuffer[Int]()
for (edge: StreetEdgeInfo <- segment.streetEdges.asScala) {
activeLinkIds += edge.edgeId.intValue()
}
val linksTimesDistances = RoutingModel.linksToTimeAndDistance(
activeLinkIds,
tripStartTime,
travelTimeByLinkCalculator(vehicleTypes(vehicle.vehicleTypeId)),
toR5StreetMode(segment.mode),
transportNetwork.streetLayer
)
val distance = linksTimesDistances.distances.tail.sum // note we exclude the first link to keep with MATSim convention
val theTravelPath = BeamPath(
activeLinkIds,
linksTimesDistances.travelTimes,
None,
SpaceTime(
segment.geometry.getStartPoint.getX,
segment.geometry.getStartPoint.getY,
tripStartTime
),
SpaceTime(
segment.geometry.getEndPoint.getX,
segment.geometry.getEndPoint.getY,
tripStartTime + math.round(linksTimesDistances.travelTimes.tail.sum.toFloat)
),
distance
)
val beamLeg = BeamLeg(
tripStartTime,
mapLegMode(segment.mode),
theTravelPath.duration,
travelPath = theTravelPath
)
val beamLeg: BeamLeg =
createBeamLeg(vehicle.vehicleTypeId, startPoint, endCoord, segment.mode, activeLinkIds)
val toll = if (segment.mode == LegMode.CAR) {
val osm = segment.streetEdges.asScala
.map(
Expand All @@ -1013,7 +1064,7 @@ class R5Wrapper(workerParams: WorkerParameters, travelTime: TravelTime) extends
.getOSMID
)
.toVector
tollCalculator.calcTollByOsmIds(osm) + tollCalculator.calcTollByLinkIds(theTravelPath)
tollCalculator.calcTollByOsmIds(osm) + tollCalculator.calcTollByLinkIds(beamLeg.travelPath)
} else 0.0
val drivingCost = if (segment.mode == LegMode.CAR) {
DrivingCost.estimateDrivingCost(beamLeg, vehicleTypes(vehicle.vehicleTypeId), fuelTypePrices)
Expand All @@ -1028,6 +1079,39 @@ class R5Wrapper(workerParams: WorkerParameters, travelTime: TravelTime) extends
)
}

def createBeamLeg(
vehicleTypeId: Id[BeamVehicleType],
startPoint: SpaceTime,
endCoord: Coord,
legMode: LegMode,
activeLinkIds: IndexedSeq[Int]
): BeamLeg = {
val tripStartTime: Int = startPoint.time
val linksTimesDistances = RoutingModel.linksToTimeAndDistance(
activeLinkIds,
tripStartTime,
travelTimeByLinkCalculator(vehicleTypes(vehicleTypeId), shouldAddNoise = false),
toR5StreetMode(legMode),
transportNetwork.streetLayer
)
val distance = linksTimesDistances.distances.tail.sum // note we exclude the first link to keep with MATSim convention
val theTravelPath = BeamPath(
linkIds = activeLinkIds,
linkTravelTime = linksTimesDistances.travelTimes,
transitStops = None,
startPoint = startPoint,
endPoint = SpaceTime(endCoord, startPoint.time + math.round(linksTimesDistances.travelTimes.tail.sum.toFloat)),
distanceInM = distance
)
val beamLeg = BeamLeg(
tripStartTime,
mapLegMode(legMode),
theTravelPath.duration,
travelPath = theTravelPath
)
beamLeg
}

/**
* Use to extract a collection of FareSegments for an itinerary.
*
Expand Down Expand Up @@ -1115,15 +1199,31 @@ class R5Wrapper(workerParams: WorkerParameters, travelTime: TravelTime) extends

private def getStopId(stop: Stop) = stop.stopId.split(":")(1)

private def travelTimeCalculator(vehicleType: BeamVehicleType, startTime: Int): TravelTimeCalculator = {
val ttc = travelTimeByLinkCalculator(vehicleType)
private def travelTimeCalculator(
vehicleType: BeamVehicleType,
startTime: Int,
shouldAddNoise: Boolean
): TravelTimeCalculator = {
val ttc = travelTimeByLinkCalculator(vehicleType, shouldAddNoise)
(edge: EdgeStore#Edge, durationSeconds: Int, streetMode: StreetMode, _) =>
{
ttc(startTime + durationSeconds, edge.getEdgeIndex, streetMode).floatValue()
}
}

private def travelTimeByLinkCalculator(vehicleType: BeamVehicleType): (Double, Int, StreetMode) => Double = {
private val travelTimeNoises: Array[Double] = if (travelTimeNoiseFraction == 0.0) {
Array.empty
} else {
Array.fill(1000000) {
ThreadLocalRandom.current().nextDouble(1 - travelTimeNoiseFraction, 1 + travelTimeNoiseFraction)
}
}
private val noiseIdx: AtomicInteger = new AtomicInteger(0)

private def travelTimeByLinkCalculator(
vehicleType: BeamVehicleType,
shouldAddNoise: Boolean
): (Double, Int, StreetMode) => Double = {
val profileRequest = createProfileRequest
(time: Double, linkId: Int, streetMode: StreetMode) =>
{
Expand All @@ -1137,8 +1237,13 @@ class R5Wrapper(workerParams: WorkerParameters, travelTime: TravelTime) extends
} else {
val link = networkHelper.getLinkUnsafe(linkId)
assert(link != null)
val physSimTravelTime = travelTime.getLinkTravelTime(link, time, null, null).ceil.toInt
val linkTravelTime = Math.max(physSimTravelTime, minTravelTime)
val physSimTravelTime = travelTime.getLinkTravelTime(link, time, null, null)
val physSimTravelTimeWithNoise =
(if (travelTimeNoiseFraction == 0.0 || !shouldAddNoise) { physSimTravelTime } else {
val idx = Math.abs(noiseIdx.getAndIncrement() % travelTimeNoises.length)
physSimTravelTime * travelTimeNoises(idx)
}).ceil.toInt
val linkTravelTime = Math.max(physSimTravelTimeWithNoise, minTravelTime)
Math.min(linkTravelTime, maxTravelTime)
}
}
Expand Down
10 changes: 7 additions & 3 deletions src/main/scala/beam/sim/config/BeamConfig.scala
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// generated by tscfg 0.9.4 on Wed Apr 01 03:33:06 ICT 2020
// generated by tscfg 0.9.4 on Fri Apr 03 01:52:37 ICT 2020
// source: src/main/resources/beam-template.conf

package beam.sim.config
Expand Down Expand Up @@ -2693,7 +2693,8 @@ object BeamConfig {
mNetBuilder: BeamConfig.Beam.Routing.R5.MNetBuilder,
numberOfSamples: scala.Int,
osmFile: java.lang.String,
osmMapdbFile: java.lang.String
osmMapdbFile: java.lang.String,
travelTimeNoiseFraction: scala.Double
)

object R5 {
Expand Down Expand Up @@ -2724,7 +2725,10 @@ object BeamConfig {
osmFile =
if (c.hasPathOrNull("osmFile")) c.getString("osmFile") else "/test/input/beamville/r5/beamville.osm.pbf",
osmMapdbFile =
if (c.hasPathOrNull("osmMapdbFile")) c.getString("osmMapdbFile") else "/test/input/beamville/r5/osm.mapdb"
if (c.hasPathOrNull("osmMapdbFile")) c.getString("osmMapdbFile")
else "/test/input/beamville/r5/osm.mapdb",
travelTimeNoiseFraction =
if (c.hasPathOrNull("travelTimeNoiseFraction")) c.getDouble("travelTimeNoiseFraction") else 0.0
)
}
}
Expand Down
Loading

0 comments on commit 4d54b6b

Please sign in to comment.