Augmented Diff Generation #52
Comments
To facilitate these, we need the ability to make lookups against the following inverted indices:
- node → ways (inclusion of a node in a way)
- way → relations (inclusion of a way in a relation)
- relation → relations (inclusion of a relation in another relation)
It struck me that while generating XML is beneficial for consumers that already understand OsmChange, a simpler row-based format would suffice for our needs. I've updated the description accordingly.
I also attempted to clarify my proposal re: date ranges in the Caveats section by simplifying "augmented diff" to actually mean "augmentation of a specific diff" rather than a window of time.
Much appreciated.
Work in progress can be found here.

Summary of ORC-backed Dataframe Approach
This approach uses an evolving dataframe backed by ORC files, rather than a traditional database, to hold the additional index information ("reverse lookup" information) needed to support creation of augmented diffs. The present fragments that I have include four tables: osm, node_to_ways, way_to_relations, and relation_to_relations.
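For illustration only, here is a rough sketch (not the actual build code) of how an index table like node_to_ways might be derived from the osm table with Spark SQL; using the way's changeset id as the valid_from value is an assumption made purely for the example:

```scala
import org.apache.spark.sql.functions.{col, explode}

// Explode each way's node references into (node id, valid_from, way id) rows.
// valid_from is keyed on the way's changeset here only as an illustration.
val nodeToWays = spark.table("osm")
  .filter(col("type") === "way")
  .select(
    col("id").as("way_id"),
    col("changeset").as("valid_from"),
    explode(col("nds")).as("nd"))
  .select(col("nd.ref").as("id"), col("valid_from"), col("way_id"))
```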
The schemas are as follows:

scala> val osm = spark.table("osm")
osm: org.apache.spark.sql.DataFrame = [id: bigint, tags: map<string,string> ... 11 more fields]
scala> osm.printSchema
root
|-- id: long (nullable = true)
|-- tags: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- lat: decimal(9,7) (nullable = true)
|-- lon: decimal(10,7) (nullable = true)
|-- nds: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- ref: long (nullable = true)
|-- members: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- type: string (nullable = true)
| | |-- ref: long (nullable = true)
| | |-- role: string (nullable = true)
|-- changeset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- uid: long (nullable = true)
|-- user: string (nullable = true)
|-- version: long (nullable = true)
|-- visible: boolean (nullable = true)
|-- type: string (nullable = true)

scala> val nodeToWays = spark.table("node_to_ways")
nodeToWays: org.apache.spark.sql.DataFrame = [id: bigint, valid_from: bigint ... 1 more field]
scala> nodeToWays.printSchema
root
|-- id: long (nullable = true)
|-- valid_from: long (nullable = true)
|-- way_id: long (nullable = true)

scala> val wayToRelations = spark.table("way_to_relations")
wayToRelations: org.apache.spark.sql.DataFrame = [id: bigint, valid_from: bigint ... 1 more field]
scala> wayToRelations.printSchema
root
|-- id: long (nullable = true)
|-- valid_from: long (nullable = true)
|-- relation_id: long (nullable = true)

scala> val relationToRelations = spark.table("relation_to_relations")
relationToRelations: org.apache.spark.sql.DataFrame = [id: bigint, valid_from: bigint ... 1 more field]
scala> relationToRelations.printSchema
root
|-- id: long (nullable = true)
|-- valid_from: long (nullable = true)
|-- relation_id: long (nullable = true)

The last three tables have been kept separate for the sake of simplicity, but it might eventually be helpful to combine them. The first table is there so that the results of the query code shown below can be elaborated into a full augmented diff.

Rump Query
As I understand the problem, the core primitive needed to construct augmented diffs is what the Overpass API refers to as recursing up: starting from an element, finding all of the ways and relations that directly or transitively reference it.

private def recurseNode(nodeId: Long) = {
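// assumes spark.implicits._, org.apache.spark.sql.functions.col, and scala.collection.mutable are in scope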
var keepGoing = true
val relations = mutable.Set.empty[Long]
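// ways that directly reference the given node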
val ways = nodeToWays
.filter(col("id") === nodeId)
.map({ r => r.getAs[Long]("way_id") })
.collect
.toSet
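// relations that directly reference any of those ways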
relations ++=
(if (ways.isEmpty)
Set.empty[Long]
else {
wayToRelations
.filter(col("id").isin(ways.toSeq:_*))
.map({ r => r.getAs[Long]("relation_id") })
.collect
.toSet
})
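// transitively add relations that reference relations already found (relation-in-relation membership)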
while (keepGoing) {
keepGoing = false
val newRelations =
if (relations.isEmpty) Set.empty[Long]
else {
relationToRelations
.filter(col("id").isin(relations.toSeq:_*))
.map({ r => r.getAs[Long]("relation_id") })
.collect
.toSet
}
if (!newRelations.subsetOf(relations)) {
keepGoing = true
relations ++= newRelations
}
}
(ways, relations)
}

The code above is incomplete in that it only handles queries starting from a single node id, does not pay attention to timestamps (needed for queries about state before the present moment), should perhaps accept initial batches rather than singletons, &c., but it shows a basic outline. (In fact, the code above not only ignores timestamps, it actually assumes that no updates have been made, so it is not to be taken literally.) I will have more to say about query efficiency below, but it can be seen that one of the basic operations that needs to be efficiently supported is searching for an id within a particular set. Looking at that in isolation gives:

scala> val list = List(1L, 2L, 3L)
list: List[Long] = List(1, 2, 3)
scala> wayToRelations.filter(col("id").isin(list:_*)).explain
== Physical Plan ==
*(1) Filter id#9L IN (1,2,3)
+- *(1) FileScan orc default.way_to_relations[id#9L,valid_from#10L,relation_id#11L] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/home/jmcclain/Desktop/spark-warehouse/way_to_relations], PartitionFilters: [], PushedFilters: [In(id, [1,2,3])], ReadSchema: struct<id:bigint,valid_from:bigint,relation_id:bigint>

where we see that the list shows up in PushedFilters, i.e. the membership test is pushed down to the ORC reader rather than being applied after a full scan.

Comparison to e.g. RocksDB-based Approach
I did some experimenting with RocksDB. Unfortunately, what I found was that the size of the data on disk inflated by a factor of about 100 when I tried to store the equivalent of the OSM table above. The attempt was naive and can be criticized in various ways (I was just serializing Spark SQL row objects into values, I included all of the columns, &c.), but even if the inflation came down by a factor of 10, it seems like it would still be too large. This implies (to me) that if the search index itself is stored in RocksDB, some other database would be needed to store the bulk data needed to produce actual augmented diffs (if this is not correct, I am happy to hear it). That still leaves open the possibility of using RocksDB for the reverse index and storing the bulk data elsewhere, but that is a complication that we hope to avoid. I can say more about the query efficiency of the Spark SQL based approach versus that of RocksDB if there is interest (this comment is getting long).

Cons
Pros
Thanks for the great writeup, @jamesmcclain. Do we need to store a copy of the history ORC file that has already been created, or is there a way to point to the history that is already generated and write updates to a separate bucket? Not necessarily for v1, but something to think about as far as data storage goes: if we could utilize the main ORC file and push updates to it as fragments, that might be a good process for keeping the history file up to date / serve as the compaction process. My opinion is that this approach could very well be good enough to handle the stream of OSM replication files, and we should try to disprove it by attempting to break it in prototypes to see if it holds up. Some ways I think it could potentially break:
This is looking good; I'm hoping this approach holds up to scrutiny because it seems like the most straightforward one / quickest to prove out.
Yes, I believe that can be done. I don't have the examples to hand right now, but I have seen examples of creating tables that span multiple URIs. All three of the questions above are indeed open questions. I hope that answers will be forthcoming soon.
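One possible shape of this (a sketch with assumed paths; the union-of-two-reads approach here is just an illustration, not necessarily what is meant by multi-URI tables):

```scala
// Assumed, illustrative locations; both sources must share the same schema.
val history = spark.read.orc("s3://existing-bucket/history.orc")
val updates = spark.read.orc("s3://osmesa-updates/fragments/")

// Expose the existing history plus the separately written fragments as one table.
history.union(updates).createOrReplaceTempView("osm")
```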
Thanks for the write-up @jamesmcclain, very useful and I can't wait to see the results of the experiments!
I don't think this should be the job of this process. This can be a downstream process (similar to planet-stream) that takes in augmented diff updates and keeps track of the changeset replication files. I think @mojodna has improved on the planet-stream process for Missing Maps.
Agreed. Within Spark, a stream-stream join could handle the problem of varying windows of validity. My approach for Missing Maps was to avoid generating complete roll-ups and instead update changeset stats as changes associated with them arrived.
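A minimal sketch of what such a stream-stream join might look like in Structured Streaming (the schemas, paths, and column names here are assumptions, not part of the proposal):

```scala
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.types._

// Assumed minimal schemas for illustration.
val changeSchema = StructType(Seq(
  StructField("changeset", LongType),
  StructField("updated", TimestampType)))
val changesetSchema = StructType(Seq(
  StructField("id", LongType),
  StructField("created_at", TimestampType),
  StructField("closed_at", TimestampType)))

// Element changes derived from minutely diffs.
val changes = spark.readStream
  .schema(changeSchema)
  .orc("s3://osmesa/changes/")
  .withWatermark("updated", "1 hour")

// Changeset metadata from the changeset replication stream; changesets can stay
// open for ~24 hours, hence the generous watermark.
val changesets = spark.readStream
  .schema(changesetSchema)
  .orc("s3://osmesa/changesets/")
  .withWatermark("created_at", "25 hours")

// Join each change to the changeset whose window of validity contains it.
// Simplified: assumes closed_at is populated (open changesets would need handling).
val joined = changes.as("c").join(
  changesets.as("cs"),
  expr("c.changeset = cs.id AND c.updated BETWEEN cs.created_at AND cs.closed_at"))
```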
Since #25 covers a number of different, intertwined pieces, I wanted to break out the augmented diff generation component.
Overview
Augmented diffs are intended to provide both elements that were actually modified during a time period (the timestamp attribute within the <new> element should be within the requested window; see Caveats below) as well as elements which refer to them. In the case of modifications (direct or indirect) to ways or relations, member (way nds or relation members) element metadata (coordinates, authorship, etc.) is inlined to avoid the need for additional lookups.

Output Format(s)
Row-based
The simplest format that likely meets our needs is row-based, such that it can be combined with the intermediate format containing reconstructed way geometries.
Fields needed are:
- type
- id
- geom (reconstructed in the case of ways and relations)
- version
- minorVersion (this may not be necessary, as it can be re-calculated using a window function provided suitable updated values)
- updated (direct or indirect timestamp)
- changeset (changeset which directly or indirectly modified the element)
- uid
- user
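As a sketch of what these rows might look like in code (the types here are assumptions based on the schemas earlier in this thread, and geom is shown as WKB bytes purely for illustration):

```scala
case class AugmentedDiffRow(
  `type`: String,              // "node", "way", or "relation"
  id: Long,
  geom: Array[Byte],           // reconstructed geometry, e.g. WKB (assumption)
  version: Long,
  minorVersion: Option[Long],  // possibly re-derivable via a window function over updated
  updated: java.sql.Timestamp, // direct or indirect timestamp
  changeset: Long,             // changeset that directly or indirectly modified the element
  uid: Long,
  user: String)
```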
Overpass Augmented OsmChange
For compatibility with Overpass-generated augmented diffs (which extend the OsmChange format, as used by minutely diffs, swapping the root <osmChange> for <osm>), OSMesa should match that format.

Overpass augmented diff sequences can be converted to Unix timestamps using <sequence> * 60 + 1347432900 (2801986 → 2018-01-10T02:41:00.000Z).
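For instance, a one-liner sketch of that conversion:

```scala
import java.time.Instant

// Sequence numbers are minute offsets from the epoch baked into the formula above.
def sequenceToInstant(sequence: Long): Instant =
  Instant.ofEpochSecond(sequence * 60 + 1347432900L)

sequenceToInstant(2801986L) // 2018-01-10T02:41:00Z
```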
Example
Simplified version of sequence 2801986:
Element Ordering
Elements should be ordered such that any references to other elements in the same diff appear after the element itself. In practice, this means 1) nodes, 2) ways, 3) relations referencing only nodes + ways, 4) relations referencing relations that previously appeared, 5) ...
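A hedged sketch of a sort key implementing that ordering (relationDepth is a hypothetical helper that reports how deeply a relation references other relations within the same diff):

```scala
// 0 = nodes, 1 = ways, 2+ = relations ordered by their relation-reference depth.
def sortKey(elementType: String, id: Long, relationDepth: Long => Int): Int =
  elementType match {
    case "node"     => 0
    case "way"      => 1
    case "relation" => 2 + relationDepth(id)
  }
```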
Overpass appears to further order by 1) modify, 2) delete, 3) create (although this isn't strictly followed -- see the full augmented diff; timestamps may be taken into account).

Caveats
Overpass aims to answer the question "what changed in the requested time period?" When provided with sequences, they're converted into timestamps (see above) and the minute following is used as the time period being queried.
Due to the way that OSM minutely diffs are created (more context), they won't contain all changes made during the minute they "represent" if database transactions are open. Rather than retroactively updating elements that changed during a time period, the question we should be asking is "what changes did we find out about during the requested time period?" (breaking the assumption of a match between <new> timestamp values and the time associated with a sequence).
In the existing Overpass implementation, changeset values for referring elements match the changeset in which the referring element was modified. This introduces subtle bugs, e.g. mapbox/osm-adiff-parser#2, when attempting to aggregate changes by changeset (which is another process that requires updating past knowledge, because changesets are replicated separately and may remain open (collecting additional changes) for 24 hours after first appearing in the minutely diff replication stream).

GeoJSON
Bonus points: a GeoJSON FeatureCollection containing 1 or 2 Features (re-assembled geometries) representing the old and new versions of modified elements (indirect or direct).
JSON
Newline-delimited JSON representation of OsmChange (action may be able to be omitted, as it can be inferred from the presence of <old> or version/visible attributes, at least for non-referring elements), for easy consumption?

Related Work
@geohacker was previously producing JSON augmented diffs aggregated by changeset. The post goes into a bit more detail about edge cases that make augmented diff aggregation difficult.
osm-replication-streams contains a parser for Overpass augmented diffs that outputs GeoJSON as described above.
@kamicut used osm-replication-streams to build a Lambda function that consumes minutely diffs and outputs them as individual messages on a Kinesis stream.