
Minutely updates, augmented diffs, and queries #25

geohacker opened this issue Jan 31, 2018 · 31 comments

@geohacker

Earlier this week, @lossyrob @moradology @kamicut and I got together in Philly to talk about taking OSMesa forward and kicked around some ideas. I just want to drop notes here, and invite comments from anyone who's interested.

Recap of what we have now

  1. Weekly-updated full-history ORC files hosted on AWS.
  2. A collection of Apache Spark scripts that run periodic analysis jobs.

Where we want to get to

  1. Minutely updates.
  2. Augmented diffs for storing minutely changes.
  3. Streaming minutely augmented diffs.
  4. Infrastructure for arbitrary queries using tags and bboxes.
  5. Infrastructure for periodic analytics jobs.

To be clear, we're not proposing the above as the only future of OSMesa; rather, we're thinking about the different parts we need to build to make this work useful for the larger OSM community. We're using this repo to anchor the discussion for now, and will eventually fork into others when needed.

ORC to augmented diffs

  • We can use the weekly ORC files to seed history parsing as horizontally scalable Spark jobs, which is much faster than parsing the planet PBF.
  • OSMesa already has utilities that can do parts of this.
  • This can be pushed down a stream for consumers.

Nodecache for lookup

  • Building full and intermediate versions of geometries is an important part of the augmented diffs. This means we'll have to store the current version (or perhaps all previous versions) of every node in OSM.
  • There have been previous experiments using DynamoDB and RocksDB, but collectively we think DynamoDB might be better because it's managed and hopefully won't get too expensive.
  • The datastore should be optimised for fast lookup. We'll store node versions and membership, but no geometries (a sketch of one possible record shape follows this list).
  • We'll persist the nodecache on S3 for recovery against a failure.
  • The nodecache potentially only needs to contain the current version of all nodes.
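
To make the record shape concrete, here is a minimal sketch of what one nodecache entry might hold; the type and field names are hypothetical, not an agreed schema:

```scala
// Hypothetical nodecache entry (sketch): keyed by (id, version), holding
// coordinates plus reverse membership, but no assembled geometries.
case class CachedNode(
  id: Long,
  version: Int,
  timestamp: java.time.Instant,
  lat: Double,
  lon: Double,
  visible: Boolean,
  wayIds: Seq[Long],      // ways that reference this node
  relationIds: Seq[Long]  // relations that reference this node directly
)
```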

Minutely processing

  • For every minutely change, we look up the nodecache to build the augmented diff, and write it to S3 as well as into the stream (a rough sketch follows).
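
A rough sketch of that per-minute step, with `NodeCache`, `WayChange`, and `AugmentedDiff` as hypothetical placeholders rather than existing OSMesa types:

```scala
import java.time.Instant

// Placeholder types for illustration only.
trait NodeCache {
  def lookup(nodeId: Long, asOf: Instant): Option[(Double, Double)] // (lon, lat)
}
case class WayChange(id: Long, version: Int, timestamp: Instant, nds: Seq[Long])
case class AugmentedDiff(wayId: Long, version: Int, coords: Seq[(Double, Double)])

// For each changed way, resolve its node refs against the cache to rebuild
// the geometry; the results would be written to S3 and pushed to the stream.
def processMinute(changes: Seq[WayChange], cache: NodeCache): Seq[AugmentedDiff] =
  changes.flatMap { way =>
    val coords = way.nds.flatMap(cache.lookup(_, way.timestamp))
    if (coords.size == way.nds.size) Some(AugmentedDiff(way.id, way.version, coords))
    else None // a referenced node is missing from the cache; skip (or retry later)
  }
```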

Analytics/Query Data Store

Batch jobs

  • We'll build a process that listens to the augmented diff stream and incrementally stores the data for running periodic analytics. The store will be optimized for long-running batch jobs rather than for speed of query response.

Query server

  • We probably want to treat spatio-temporal queries as a separate use case, because the more predictable the queries are, the more control we have over indices, especially for HBase-like data stores. We discussed GeoMesa as the underlying store here, with some sort of frontend server for API ergonomics.

(attached: whiteboard photo, 2018-01-29)

@mojodna commented Feb 6, 2018

How did the RocksDB approach work? Was that a single instance w/ a local DB?

I've been looking a little bit into whether surfacing OSM data through WFS 3 makes any sense (probably not) and how well Elasticsearch would do as a data store (no idea on scaling requirements, but it seems to have decent spatial support, does document searching quickly, and includes a Spark SQL API that incorporates push-down and write support).

Publishing snapshots to S3 (or elsewhere) might be an alternative means of helping others provision similar infrastructure; not quite as ideal as the initial HBase model w/ read-only replicas, though.

@mojodna commented Feb 6, 2018

Oh, and Elasticsearch percolation is a neat concept that would allow consumers to subscribe to queries of interest as new data is introduced.

@geohacker

How did the RocksDB approach work? Was that a single instance w/ a local DB?

If I remember right this was a single instance with a local DB, but I'll tag @lukasmartinelli who worked on it closely.

ES certainly sounds cool to me. How does it compare with DynamoDB's lookup times?

@lukasmartinelli

If I remember right this was a single instance with a local DB, but I'll tag @lukasmartinelli who worked on it closely.

Yes, with the advantage that node lookups can happen in memory / on SSD instead of requiring a network round trip as with DynamoDB.

The idea was to do in RocksDB on memory + disk what osmium does with the NodeIndex in memory, so that this can scale even as OSM data grows (or, in this case, with all versions of a node, which leads to a much bigger data set).

There are some experiments previously using DynamoDB and RocksDB - but collectively we think DynamoDB might be better because it’s managed and hopefully won’t get too expensive.

Using DynamoDB for pure node lookup, I am worried that this will get expensive really quickly (x$$$k).

@geohacker

@kamicut @mojodna - could you post notes from our previous call? Thank you! Let's look for people who might have time to kick the Elasticsearch idea around a bit.

@geohacker

After a chat with @dereklieu and @kamicut, we think the next short-term goal here is to seed all nodes into an AWS-managed Elasticsearch instance using an OSMesa script.

If we can build a proof of concept and test for lookup speed and room to scale, then I think we can start sketching out further steps!

@lossyrob

One idea I just had that is relevant to this discussion: we had talked about using the Augmented Diff stream as a way to update a distributed database that contains all history, such as HBase on S3, so that analytics can be performed on the most up-to-date version of OSM. This seems like a heavy lift and a lot of maintenance burden for batch and interactive processing e.g. through a Zeppelin or Jupyter notebook. What we have been doing with OSMesa-flavored work is to read from the public ORC files, which are about a week old. This is great because you are simply reading a publicly available file from S3, and so there's no DB or permissions to worry about.

For the batch/analytics case, instead of maintaining a fully up-to-date distributed history database, what we can instead do is have an update mechanism that reads AugDiff files off of S3 to bring the Spark DataFrame up to date. So you'd read from the ORC file and then have an easy call to update to latest, which would fetch the relevant diffs and apply them in order, bringing your DF up to the most recent update. This adds a line of code to analysis/batch jobs/notebooks (maybe 2 with the import statement), but achieves the goal of doing large analytic or batch jobs on the most up-to-date data with a much simpler backend.

The work required here would be to ensure the AugDiff process outputs files that can be easily consumed by Spark into DataFrames, and to write the logic that reads in the appropriate diff files and uses them to update a history/snapshot DataFrame read in from the ORC files.
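
To illustrate, a minimal sketch of the "read the ORC, then update to latest" flow in Spark; the S3 paths and the assumption that diffs have already been landed as ORC rows in the same schema are mine, not an existing OSMesa API:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val spark = SparkSession.builder.appName("orc-plus-diffs").getOrCreate()
import spark.implicits._

// Weekly full-history snapshot (path is a placeholder).
val snapshot = spark.read.orc("s3://osm-pds/planet-history/history-latest.orc")

// Hypothetical location where replication edits have been landed as ORC
// rows in the same schema as the snapshot.
val diffs = spark.read.orc("s3://my-bucket/replication-orc/")

// Union the two and keep a single row per (type, id, version),
// preferring the most recently written copy.
val w = Window.partitionBy('type, 'id, 'version).orderBy('timestamp.desc)
val upToDate = snapshot
  .union(diffs)
  .withColumn("rn", row_number().over(w))
  .where('rn === 1)
  .drop("rn")
```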

@kamicut commented Mar 13, 2018

Augdiff

@lossyrob That makes sense. In that case we can work backwards from the dataframe to figure out what the AugDiff format is, and then write the node cache code accordingly.

One thing that I was thinking about: did we rule out running the OSM API with its own replication process and writing a Postgres trigger that writes from the DB to AugDiff format? Does it not scale, @geohacker @lossyrob?

OSMesa meeting notes

I did find some notes that Seth posted in the Google Hangout a month ago:

traits

  • provider-agnostic
  • fast to ingest
  • persisted in a form that can be cloned quickly, ideally in-place
  • node cache - fast
  • inverted index for membership
  • low-cost
  • scalable

Overpass Feature Parity

  • tag queries
  • geometry storage + query
  • reference-aware
  • consume replication streams

Past

  • DynamoDB
  • RocksDB
  • CosmosDB

Options

  • DynamoDB
  • HBase (S3-backed)
  • Elasticsearch
  • BigTable
  • Hive (for writes), backed by S3

Problems

  • replication / streaming
  • compaction
    (done for now)
    Additional option: Redis

Today - node caching / lookups

https://blog.mapillary.com/tech/2017/01/12/scaling-down-an-elasticsearch-cluster.html

  • select sum(cardinality(nds) + cardinality(members)) from planet: 5,125,065,779 (full history: 15,465,761,461)
  • way geometries: s3://osm-pds-tmp/way-geoms-20180129/
  • point geometries: s3://osm-pds-tmp/point-geoms-20180129/

@geohacker

This seems like a heavy lift and a lot of maintenance burden for batch and interactive processing e.g. through a Zeppelin or Jupyter notebook.

@lossyrob Agree that this is going to be a heavy lift.

For the batch/analytics case, instead of maintaining a fully up-to-date distributed history database, what we can instead do is have an update mechanism that reads AugDiff files off of S3 to bring the Spark DataFrame up to date. So you'd read from the ORC file and then have an easy call to update to latest, which would fetch the relevant diffs and apply them in order, bringing your DF up to the most recent update.

@lossyrob perhaps I'm not reading this right. Wouldn't this mean that a DF would be outdated in the next minute? How much do you think the overhead of fetching and updating the ORC would be? I can imagine this happening to practically all analytical queries. I think we will also need a way to keep track of which adiffs map to a given area/feature.

OSM API with its own replication process and write a postgres trigger

@kamicut I don't think we were considering this seriously, and I'm not very confident it would scale.

@lossyrob

Wouldn't this mean that a DF would be outdated in the next minute?

In an analytics job, this is fine. If we had a totally up-to-date ORC file, and you read it in as part of a Spark job, the expectation wouldn't be that the DF changes underneath in the process of analysis, but that you'd be working with the most up-to-date information from the start of the job.

Updating the ORC is something that @mojodna looked into but found to be untenable.

I don't want "analytics queries" (e.g. a query server like something from Overpass) and analytic batch jobs (e.g. generating user and campaign statistics for the entire world, for every user and every campaign, over history) to get confused. For the query server, we would need something that is up to date in order to respond quickly to queries. For large batch jobs that take a while to run anyway, updating the DF from the ORC snapshot up to recency will add some processing time, but I don't think enough to make it an unattractive option.

@mojodna commented Mar 15, 2018

Updating the ORC is something that @mojodna looked into but found to be untenable.

For 2 reasons: small files (mitigated if we batch diffs up on a daily basis) and the need to periodically compact. ORC includes methods (but no CLI) to merge files at the stripe level (effectively concatenation, so inefficiency increases over time as non-optimal stripes are added), which could facilitate periodic compaction processes. (But atomicity! Maybe we don't care...)

+1 on creating a tool that can bring an ORC file up to date using minutely diffs. This should be relatively straightforward (and moderately parallelizable, since order doesn't matter); we don't need to reconstruct augmented diffs or anything, just append all of the edits in history form and use the resulting file as input.

@kamicut commented Mar 15, 2018

@mojodna @lossyrob so you're saying

  1. Read in ORC file to a dataframe
  2. Bring in minutely diffs into the dataframe to bring it to current time
  3. Do batch analytic jobs

I'm wondering if there's a way to 'pickle' (serialize) the dataframe after (2)? That way we can just boot another batch process by reading the pickled DF and running (2) again.

@mojodna commented Mar 15, 2018

Yup, and yup (df.write.format("orc").save(<path>) will write the dataframe out (to S3))!
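
For completeness, the round trip kamicut describes might look like this (paths are placeholders, and `upToDate` stands in for the dataframe after step 2):

```scala
// Persist the updated dataframe as a new ORC snapshot on S3...
upToDate.write.mode("overwrite").format("orc").save("s3://my-bucket/snapshots/latest/")

// ...then boot the next batch job straight from it and apply only newer diffs.
val reloaded = spark.read.orc("s3://my-bucket/snapshots/latest/")
```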

@lossyrob

Good point that we wouldn't need augmented diffs to bring an ORC file up to date in a DataFrame; however, order would matter, because we'd have to update visibility flags, yeah? So it wouldn't just be an append.

@mojodna commented Mar 15, 2018

Nah, I'm pretty sure that visible is specific to an individual version (https://wiki.openstreetmap.org/wiki/OsmChange includes a visible attribute; cf. https://www.openstreetmap.org/api/0.6/changeset/46685591/download and https://www.openstreetmap.org/api/0.6/changeset/45652310/download), so as long as we're windowing correctly and sorting there to generate validity time ranges, we should be good.

See https://www.openstreetmap.org/node/1/history for an interesting node history (deleted, undeleted, etc.)
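
For reference, a sketch of that windowing step in Spark: derive a validUntil column per element so each version's validity range can be compared against lookup timestamps (column names follow the planet ORC schema; the function name is mine):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lead}

// Each version of an element is valid from its own timestamp until the
// timestamp of the next version of the same element (null for the latest,
// i.e. still-current, version). `visible` remains a per-version attribute.
def withValidity(history: DataFrame): DataFrame = {
  val byElement = Window.partitionBy(col("type"), col("id")).orderBy(col("version"))
  history.withColumn("validUntil", lead(col("timestamp"), 1).over(byElement))
}
```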

@jamesmcclain

Hello, I am currently working on this task, but am late to the conversation and am perhaps missing some context.

I have read the thread above, and I would like to ask for comments on the following.

The basic idea that I have been working on is to read the bulk data and produce an index that maps more primitive objects to the sets of aggregations that they participate in (e.g. single nodes to sets of ways, single ways to sets of relations [and I think single relations to sets of relations]). I think that this is consistent with the previous discussion.

If the reverse index is keyed not just by id but by id and timestamp, and if queries of the type "largest item < x" are supported by whatever mechanism is used to store it, then I think that would also be sufficient to answer augmented diff queries for any moment in time between the last moment covered by the initial ORC file and the most recent update received. Geometric information (e.g. quad-tree node numbers) could be included to support various types of spatial queries.

The last sentence above leads me to the last comment that I want to throw out for discussion: it could be possible to support all of the desired operations without using a separate ("real") database (e.g. DynamoDB, RocksDB, HBase) and instead just use ORC files. The point above about fragmentation and compaction is well taken, and it is also understood that the ORC format was not designed with random-access efficiency as its first priority, but it might nonetheless be worth considering just from a simplicity standpoint.

The main outstanding question that I have (aside from compaction) is whether "largest item < x" types of queries are well supported by DataFrames on top of ORC files. I have not confirmed that to be so, and if it isn't, then ORC files are obviously not appropriate. In any case, I am using ORC files for my prototyping activity.

@mojodna commented Mar 19, 2018

queries of the type "largest item < x"

Can you elaborate on this? What would a specific question be?

Geometric information (e.g. quad-tree node numbers) could be included to support various types of spatial queries.

I've been thinking about this too (within the context of ORC files, predicate pushdown, and ORC's ability to skip stripes based on internal indexes). I'd been envisioning (z, type, id, minX, minY, maxX, maxY), where z = zcurve(zcurve(minX, minY), zcurve(maxX, maxY)) (dumb?) for locality. Queries could then be WHERE x >= minX AND x <= maxX AND y >= minY AND y <= maxY AND ST_Intersects(...) without needing to trigger a full table scan (since ORC would transparently skip stripes outside the min/max range, and probable geometries would be local to one another thanks to the Z curve).
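
As a sketch of the z component (assuming coordinates have already been quantized to 16-bit grid cells; a production version would want 64-bit keys and more bits of precision):

```scala
// Toy Morton / Z-order interleave of two 16-bit values into a 32-bit code.
// Sorting rows by z before writing keeps spatially nearby features in the
// same ORC stripes, so stripe-level min/max stats can skip most of the file.
def interleave16(a: Int, b: Int): Int = {
  def spread(v: Int): Int = {
    var x = v & 0xffff
    x = (x | (x << 8)) & 0x00ff00ff
    x = (x | (x << 4)) & 0x0f0f0f0f
    x = (x | (x << 2)) & 0x33333333
    x = (x | (x << 1)) & 0x55555555
    x
  }
  spread(a) | (spread(b) << 1)
}
```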

The last sentence above leads me to the last comment that I want to throw out for discussion: it could be possible to support all of the desired operations without using a separate ("real") database (e.g. DynamoDB, RocksDB, HBase) and instead just use ORC files. The point above about fragmentation and compaction is well taken, and it is also understood that the ORC format was not designed with random-access efficiency as its first priority, but it might nonetheless be worth considering just from a simplicity standpoint.

👍, at least for prototyping (and perhaps beyond). I suspect we can deal with some of the random access inefficiency by batching lookups to coalesce them across scans.

ProcessOSM creates a lookup table from node → way (inverse of way nds) that could be written to an ORC file:

val nodesToWays = ways
  .select(explode('nds).as('id), 'id.as('way_id), 'version, 'timestamp, 'validUntil)

Overall, I think we need:

  • node (id) → way (id + version + updated + validUntil) (inverse of way nds)
  • node (id) → relation (id + version + updated + validUntil + member role) (inverse of relation node members)
  • way (id) → relation (id + version + updated + validUntil + member role) (inverse of relation way members)
  • relation (id) → relation (id + version + updated + validUntil + member role) (inverse of relation relation members)

(I'm proposing updated + validUntil to provide a window to compare the timestamp of the item being looked up and determine which target version to use. I'm proposing role because my intuition suggests that it's important; it may well not be.)
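
In the same style as the nodesToWays snippet, the relation-side tables could come from exploding relation members. This assumes a `relations` dataframe with a members array of (type, ref, role) structs and member types stored as lowercase strings; both are assumptions about the schema:

```scala
val relationMembers = relations
  .select(explode('members).as('member), 'id.as('relation_id), 'version, 'timestamp, 'validUntil)

// node → relation (and analogously way → relation, relation → relation).
val nodesToRelations = relationMembers
  .where($"member.type" === "node")
  .select($"member.ref".as("id"), 'relation_id, 'version, 'timestamp, 'validUntil,
          $"member.role".as("role"))
```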

@lossyrob

I think this approach may not work for some of the streaming applications we're looking at. Perhaps elaborating on how this approach would work on an example would help me understand it. One of the applications we want to apply the augmented diffs to is streaming in minutely changes and applying them to a set of vector tiles. Having a Spark cluster up and running, taking in minutely updates, and either updating an ORC file from its daily snapshot point, or updating an in-memory DataFrame, seems like a heavy task to do in the streaming context. Whereas, if we have a quick-access node cache with the reverse index tables Seth mentioned, we could quickly take a replication file, rebuild the geometries, construct the Augmented Diff containing the changes, and send that off to a stream for consumption. Some other process would pick that up, read in the appropriate vector tiles, update them, and replace them in the set. How would the ORC-based creation happen for this?

@jamesmcclain

queries of the type "largest item < x"

Can you elaborate on this? What would a specific question be?

Sorry about not being clear. I am referring to queries of the form "largest number smaller than 50". In this particular case, I am suggesting that if (for example) there is a list of ways that is jointly ordered by id and timestamp, then a query for all ways which meet a particular node at a particular time can be answered by the most recent ("largest") instance of the node that occurs before the given time. Essentially I would like to treat id + timestamp as the node id, not just id.
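
For example, against the nodesToWays table sketched earlier, that lookup might read as follows (a sketch, not existing code):

```scala
import java.sql.Timestamp
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// "Largest item < x": for a given node id, keep only rows at or before time t,
// then take the most recent row per way, i.e. the version in effect at t.
def waysForNodeAt(nodesToWays: DataFrame, nodeId: Long, t: Timestamp): DataFrame = {
  val latestPerWay = Window.partitionBy(col("way_id")).orderBy(col("timestamp").desc)
  nodesToWays
    .where(col("id") === nodeId && col("timestamp") <= t)
    .withColumn("rn", row_number().over(latestPerWay))
    .where(col("rn") === 1)
    .drop("rn")
}
```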

@jamesmcclain

Overall, I think we need:

Much appreciated, what I suggested above is certainly not exclusive of any of these.

@jamesmcclain

Having a Spark cluster up and running, taking in minutely updates, and either updating an ORC file from its daily snapshot point, or updating an in-memory DataFrame, seems like a heavy task to do in the streaming context.

What I had in mind was to update a dataframe, but my understanding is that the dataframe needs to be backed by something (ORC files or something else).

I was just considering whether the ORC format can efficiently support the operations that are needed if, for whatever reason, the data are not available in memory (e.g. after a restart).

Another question that I have is under what circumstances (other than a restart) this even matters; I don't know if dataframes are able to discard parts of themselves (e.g. under memory pressure or some failure) and reload from the updated backing store, or if they use the more typical Spark mechanism.

These are questions that will be important after getting something initially working, I think.

@lossyrob commented Mar 19, 2018

Ah, OK, I think I may have my wires crossed on what tasks we're talking about. There's "updating a DataFrame that is read in from ORC to the most recent version based on replication files", which has been much of the latter part of the conversation on this issue, and separately the Augmented Diff pipeline, which is different (the ORC update might rely on the AugDiffs, but as was pointed out above it doesn't need to).

In the former case, which I think is maybe what we're talking about, we read in the ORC file that represents history up to a certain point, and then read in replication files to update the DataFrame from there. The desired output is a whole DataFrame that we can do analytics on moving forward, not something to run AugDiff queries against. These DFs would be ephemeral; on restart or job failure, the job would run again and bring itself to the latest version via the same process. This is encapsulated in issue #48, and if this is what we're talking about I'd suggest we continue the conversation.

The latter case wouldn't be about updating a DataFrame, and probably wouldn't use DataFrames at all. It would support the streaming case mentioned above.

Am I on target saying we're discussing the former case and not the latter case?

@mojodna commented Mar 19, 2018

Sorry about not being clear. I am referring to queries of the form "largest number smaller than 50". In this particular case, I am suggesting that if (for example) there is a list of ways that is jointly ordered by id and timestamp, then a query for all ways which meet a particular node at a particular time can be answered by the most recent ("largest") instance of the node that occurs before the given time. Essentially I would like to treat id + timestamp as the node id, not just id.

Ah, gotcha. In Presto, I'd been using the max_by(timestamp, version) aggregate function with GROUP BY id, version, timestamp. In Spark SQL, I was using the updated...validUntil range as a filter on the JOIN. In something like RocksDB, where one is iterating over (sorted) keys, what you're describing here is the natural approach.

Much appreciated, what I suggested above is certainly not exclusive of any of these.

Agreed, just wanted to take a stab at concretely enumerating what we need for augmented diffs.

Another question that I have is under what circumstances (other than a restart) this even matters; I don't know if dataframes are able to discard parts of themselves (e.g. under memory pressure or some failure) and reload from the updated backing store, or if they use the more typical Spark mechanism.

I started looking at Apache Ignite as a backend when pondering this but didn't conclude anything. Good stuff to pay attention to, since it sounds like no one here knows the answer yet.

Am I on target saying we're discussing the former case and not the latter case?

Should we split this into multiple issues? Augmented diff generation (against something) + streaming (to something) replication + one-shot replication updates (covered by #48).

@mojodna commented Mar 19, 2018

@jamesmcclain have a look at https://github.com/osmlab/osm-wayback if you haven't already. @jenningsanderson is using RocksDB (for full geometry storage) with id + timestamp keys as you suggest ^^.

@jenningsanderson

👋 To chime in briefly, I've been very impressed (and shocked) with the performance of rocksdb in this domain; I've listed a few performance stats from running it locally here: https://github.com/osmlab/osm-wayback#performance. I've also been experimenting with redis as a node cache for historic node locations, though I am less impressed. osm-wayback is currently focused on differences in tags between versions, but I am currently expanding it to handle historic geometries... though I have realized from @mojodna that w/o "augmented diffs" and changeset information, this question becomes far more complex. Still, rocksdb is lightning fast, especially if you pbf-encode OSM objects as values in the DB... definitely don't overlook it as a backend object store for reconstructing geometries or diffs.
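
To illustrate the id + timestamp keying with the RocksDB Java bindings (a sketch; the path, value encoding, and key width are placeholders): big-endian fixed-width keys sort the same way as (id, timestamp), so seekForPrev lands on the most recent version at or before a given time.

```scala
import java.nio.ByteBuffer
import org.rocksdb.{Options, RocksDB}

RocksDB.loadLibrary()
val db = RocksDB.open(new Options().setCreateIfMissing(true), "/tmp/node-cache")

// 16-byte key: 8-byte big-endian id followed by 8-byte big-endian timestamp.
def key(id: Long, timestampMillis: Long): Array[Byte] =
  ByteBuffer.allocate(16).putLong(id).putLong(timestampMillis).array()

def put(id: Long, ts: Long, encodedNode: Array[Byte]): Unit =
  db.put(key(id, ts), encodedNode)

// Most recent version of `id` at or before `ts`, if any.
def lookupAsOf(id: Long, ts: Long): Option[Array[Byte]] = {
  val it = db.newIterator()
  try {
    it.seekForPrev(key(id, ts))
    if (it.isValid && ByteBuffer.wrap(it.key()).getLong(0) == id) Some(it.value())
    else None // iterator fell onto a different id (or before the first key)
  } finally it.close()
}
```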

@jamesmcclain

@jamesmcclain have a look at https://github.com/osmlab/osm-wayback if you haven't already. @jenningsanderson is using RocksDB (for full geometry storage) with id + timestamp keys as you suggest ^^.

Okay, will definitely have a look. Thanks for the reference.

@jamesmcclain

I've been very impressed (and shocked) with the performance of rocksdb

Okay, very well taken. It looks as though there is tremendous impetus behind rocksdb. I would like to get a basic prototype working, then move to that.

@moradology

It looks as though there is tremendous impetus behind rocksdb

Yeah, my cursory exploration of our options for the node cache suggested that rocksdb was the best price/performance/ease-of-use compromise. We lose the security/ease of a hosted system but gain the ability to use a single SSD for the entire problem.

@kamicut commented Mar 20, 2018

When we had this meeting last month we were considering rocksdb or elasticsearch for the node cache. We set up meetings around the elasticsearch experiment with @moradology @geohacker @dereklieu and others, because our initial thought was that rocksdb does not have an easy API or bindings. Is this no longer the case?

I also got very confused as to what we're tackling in this ticket. Should we close it in favor of #52 and #48?

@jenningsanderson

@kamicut -- rocksdb does not have an easy API or good bindings for anything (except maybe Java?) :( I've been forced to relearn C++. https://github.com/dberesford/rocksdb-node is the best I've found for node, but cannot get it working properly with column families; if you don't use column families, then most leveldb bindings can be used (like levelUp or levelDown), which are much more actively developed.

@lossyrob

In case people are watching this issue and not the others we broke out, see this comment for the current approach of a spike to create the stream of Augmented Diffs.
