Beyond consolidated metadata for V3: inspiration from Apache Iceberg #154

Open
rabernat opened this issue Aug 10, 2022 · 25 comments
Labels
protocol-extension Protocol extension related issue

Comments

@rabernat
Contributor

rabernat commented Aug 10, 2022

I just reviewed the V3 spec. As with V2, the core spec does not address how to deal with unlistable stores. In V2, we developed the semi-ad-hoc "consolidated metadata" approach. I know there is already an issue about this for V3 (#136), but I am opening a new one with the hope of broadening the discussion of what consolidated metadata can be.

Background: What is Apache Iceberg

I have recently been reading about Apache Iceberg. Iceberg is aimed at tabular data, so it is not a direct alternative or competitor to Zarr. Iceberg is not a file format itself (it can use Parquet, Avro, or ORC for individual data files). Instead, it is a scheme for organizing many individual data files into a larger (potentially massive) single "virtual" table. In addition to the table files, it tracks copious metadata about the files. In this sense, it is similar to Zarr: Zarr organizes many individual chunk files into a single large array.

For folks who want to quickly come up to speed on Iceberg, I recommend reading the following documents in order:

Architecturally, these are the main points

This table format tracks individual data files in a table instead of directories. This allows writers to create data files in-place and only adds files to the table in an explicit commit.

Table state is maintained in metadata files. All changes to table state create a new metadata file and replace the old metadata with an atomic swap. The table metadata file tracks the table schema, partitioning config, custom properties, and snapshots of the table contents. A snapshot represents the state of a table at some time and is used to access the complete set of data files in the table.

Data files in snapshots are tracked by one or more manifest files that contain a row for each data file in the table, the file’s partition data, and its metrics. The data in a snapshot is the union of all files in its manifests. Manifest files are reused across snapshots to avoid rewriting metadata that is slow-changing. Manifests can track data files with any subset of a table and are not associated with partitions.

The manifests that make up a snapshot are stored in a manifest list file. Each manifest list stores metadata about manifests, including partition stats and data file counts. These stats are used to avoid reading manifests that are not required for an operation.

This diagram says 1000 words.

Another interesting part of the spec, regarding the underlying storage system capabilities:

Iceberg only requires that file systems support the following operations:

  • In-place write – Files are not moved or altered once they are written.
  • Seekable reads – Data file formats require seek support.
  • Deletes – Tables delete files that are no longer used.

These requirements are compatible with object stores, like S3.

Tables do not require random-access writes. Once written, data and metadata files are immutable until they are deleted.

Within the data lake / warehouse / lakehouse world, you can think of Iceberg as an open-source alternative to Databricks Delta Lake. It can be used by many different data warehouse query engines like Snowflake, Dremio, Spark, etc.

Inspiration for Zarr

It doesn't make sense to use Iceberg directly for Zarr--it's too tied to the tabular data model. However, there are lots of interesting ideas and concepts in the Iceberg spec that I think are worth trying to copy in Zarr. Here is a list of some that come to mind.

Manifests

Manifests are essentially files which list other files in the dataset. This is conceptually similar to our V2 consolidated metadata in that it removes the need to explicitly list a store. We could store whatever is useful to us in our manifests. For example, we could explicitly list all chunks and their sizes. This would make many operations go much faster, particularly on stores that are slow to list or to return file information.
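
As a rough illustration of the manifest idea, the sketch below records every chunk key and its size in one JSON document, so a reader can enumerate chunks from a single object instead of listing the store. The manifest layout, the `build_manifest` helper, and the dict standing in for an object store are all assumptions made for illustration; none of them come from a Zarr spec.

```python
import json

def build_manifest(store):
    """Record every key and its size in bytes (hypothetical manifest layout)."""
    return {"chunks": {key: {"size": len(value)} for key, value in sorted(store.items())}}

# A plain dict stands in for an object store that is slow to list.
store = {
    "data/root/foo/c0/0": b"\x00" * 100,
    "data/root/foo/c0/1": b"\x00" * 80,
}

# Written once at commit time, as a single object.
manifest_bytes = json.dumps(build_manifest(store)).encode()

# A reader fetches that one object and never lists the store.
manifest = json.loads(manifest_bytes)
chunk_keys = list(manifest["chunks"])
total_bytes = sum(entry["size"] for entry in manifest["chunks"].values())
```

The same document could carry anything else that is useful per chunk, such as checksums or statistics.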

Snapshots, Commits, and Time Travel

Iceberg tables are updated by simply adding new files and then updating the manifest list to point to the new files. The new state is registered via an atomic commit operation (familiar to git users). A snapshot points to a particular set of files. We could imagine doing this with zarr chunks, e.g. using the new V3 layout

data/root/foo/baz/c1/0_v0
data/root/foo/baz/c1/0_v1

where _v0 and _v1 are different versions of the same chunk. We could check out different versions of the array corresponding to different commits, preserving the whole history without having to duplicate unnecessary data. @jakirkham and I discussed this idea at scipy. (TileDB has a similar feature.)

Applying the same concept to metadata documents would allow us to evolve array or group schemas and metadata while preserving previous states.
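
A minimal sketch of how versioned chunk keys could support snapshots, assuming each commit writes changed chunks under a new `_v<N>` suffix and records a mapping from logical key to physical key. The `VersionedChunks` class and its methods are hypothetical and exist only to illustrate the idea.

```python
class VersionedChunks:
    """Toy copy-on-write chunk history; all names here are hypothetical."""

    def __init__(self):
        self.objects = {}      # physical key -> bytes, never overwritten
        self.snapshots = [{}]  # snapshot id -> {logical key: physical key}

    def commit(self, changes):
        """Write changed chunks under new versioned keys; return the new snapshot id."""
        new_id = len(self.snapshots)
        snapshot = dict(self.snapshots[-1])  # unchanged chunks are shared, not copied
        for key, data in changes.items():
            physical = f"{key}_v{new_id}"
            self.objects[physical] = data
            snapshot[key] = physical
        self.snapshots.append(snapshot)
        return new_id

    def read(self, key, snapshot_id=-1):
        """Read a chunk as of a given snapshot (default: the latest)."""
        return self.objects[self.snapshots[snapshot_id][key]]

history = VersionedChunks()
v1 = history.commit({"data/root/foo/baz/c1/0": b"old", "data/root/foo/baz/c0/0": b"same"})
v2 = history.commit({"data/root/foo/baz/c1/0": b"new"})
```

Here `history.read("data/root/foo/baz/c1/0", v1)` still returns the old bytes after the second commit, and the untouched chunk is stored only once.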

Partition Statistics

If we stored statistics for each chunk, we could optimize many queries. For example, storing the min, max, and sum of each chunk, with reductions applied along each combination of dimensions, would sometimes allow us to avoid explicitly reading a chunk.
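
To make the statistics idea concrete, here is a small sketch that stores min, max, and sum for each chunk and then answers two queries without touching chunk data wherever the statistics suffice. It only keeps whole-chunk statistics (not reductions along each combination of dimensions), and the chunk names and values are made up.

```python
def chunk_stats(values):
    """Summary statistics that would be stored alongside a chunk."""
    return {"min": min(values), "max": max(values), "sum": sum(values)}

# Made-up chunk contents; in practice only the stats would be read up front.
chunks = {
    "c0": [1.0, 2.0, 3.0],
    "c1": [10.0, 12.0, 11.0],
    "c2": [4.0, 9.0, 5.0],
}
stats = {key: chunk_stats(values) for key, values in chunks.items()}

# A global sum needs no chunk reads at all.
total = sum(s["sum"] for s in stats.values())

# "Which values exceed 8?" can skip every chunk whose max is at or below 8.
threshold = 8.0
to_read = [key for key, s in stats.items() if s["max"] > threshold]
```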

Different underlying file formats

Zarr V2 chunks are extremely simple: a single compressed binary blob. As we move towards the sharding storage transformer (#134, #152), they are getting more complicated--a single shard can contain multiple chunks. Via kerchunk, we have learned that most other common array formats (e.g. NetCDF, HDF5) can actually be mapped directly to the zarr data model. (🙌 @martindurant) We are already using kerchunk to create virtual zarr datasets that map each chunk to different memory locations in multiple different HDF5 files. However, that is all outside of any Zarr specification.

Iceberg uses this exact same pattern! You can use many different optimized tabular formats (Parquet, Avro, ORC) for the data files and still expose everything as part of a single big virtual dataset. Adopting Iceberg-style manifests would enable us to formalize this as a supported feature of Zarr.
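
A sketch of what such a manifest entry could look like: each chunk key resolves to a (path, offset, length) reference into some other file, loosely in the spirit of the references kerchunk produces. The exact layout, the `read_reference` helper, and the temporary file standing in for an HDF5 file are assumptions for illustration only.

```python
import os
import tempfile

def read_reference(refs, key):
    """Resolve a chunk key to raw bytes via a (path, offset, length) reference."""
    path, offset, length = refs[key]
    with open(path, "rb") as f:
        f.seek(offset)
        return f.read(length)

# One "foreign" file holding a header followed by two chunks back to back.
with tempfile.NamedTemporaryFile(delete=False, suffix=".bin") as f:
    f.write(b"HEADER" + b"chunk-zero" + b"chunk-one")
    path = f.name

# Hypothetical manifest: chunk key -> byte range inside the foreign file.
refs = {
    "data/root/foo/c0": (path, 6, 10),
    "data/root/foo/c1": (path, 16, 9),
}
first = read_reference(refs, "data/root/foo/c0")
second = read_reference(refs, "data/root/foo/c1")
os.remove(path)
```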

Implementation Ideas

Having finally understood how storage transformers are supposed to work (thanks to #149 (comment)), I have convinced myself that the iceberg-like features we would want can all be implemented via a storage transformer. In pseudocode, the API might look like this

base_store = S3Store("s3://some-bucket")
zarrberg_store = ZarrIcebergStorageTransformer(base_store)

# check out the latest version
zarrberg_store.checkout()
group = zarr.open_group(zarrberg_store)

# update some data
group['foo'][0, 0] = 1

new_version = zarrberg_store.commit()

# check out some older version
zarrberg_store.checkout("abc123")
older_group = zarr.open_group(zarrberg_store)

The storage transformer class would do the work of translating requests for specific storage keys into the correct paths in the underlying store (e.g. data/root/foo/0.0 ➡️ data/root/foo/0.0_v2).
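
The key translation itself could be as simple as the sketch below, assuming the checked-out commit is represented as a mapping from logical key to version number. `VersionedKeyTransformer` is a toy stand-in written for this example, not an existing zarr class, and a dict plays the role of the base store.

```python
class VersionedKeyTransformer:
    """Toy stand-in for the storage transformer idea; not a real zarr API."""

    def __init__(self, base_store, key_versions):
        self.base_store = base_store      # dict-like: physical key -> bytes
        self.key_versions = key_versions  # logical key -> version in the checked-out commit

    def _translate(self, key):
        """Map a logical key to its versioned physical key."""
        return f"{key}_v{self.key_versions[key]}"

    def __getitem__(self, key):
        return self.base_store[self._translate(key)]

base_store = {
    "data/root/foo/0.0_v1": b"first write",
    "data/root/foo/0.0_v2": b"second write",
}
latest = VersionedKeyTransformer(base_store, {"data/root/foo/0.0": 2})
older = VersionedKeyTransformer(base_store, {"data/root/foo/0.0": 1})
```

Both views read from the same underlying store; only the key mapping differs between checkouts.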

Summary

I'm quite excited about the possibilities that this direction would open up. Benefits for Zarr would be

  • Even better performance on slow-to-list stores
  • Time-travel and versioning
  • Ability to wrap other array file types explicitly, formalizing what we are already doing with Kerchunk
  • A solution for concurrent writes to the same chunk (one of the commits would fail and have to be retried with updated data)
  • Alignment with a widely adopted and successful approach in enterprise big data architecture
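
The concurrent-write point can be sketched as optimistic concurrency: a commit succeeds only if it was based on the current head, and the losing writer retries from the updated state. The `Branch` class, the `CommitConflict` exception, and the lock standing in for an atomic swap in the store are all hypothetical.

```python
import threading

class CommitConflict(Exception):
    """Raised when a commit was based on a stale parent."""

class Branch:
    """Toy branch pointer updated by compare-and-swap; hypothetical names."""

    def __init__(self):
        self.head = 0
        self.value = 0
        self._lock = threading.Lock()  # stands in for an atomic swap in the store

    def snapshot(self):
        with self._lock:
            return self.head, self.value

    def commit(self, parent, new_value):
        with self._lock:
            if parent != self.head:
                raise CommitConflict(f"head moved from {parent} to {self.head}")
            self.head += 1
            self.value = new_value
            return self.head

def increment_with_retry(branch):
    """Re-read the latest state and retry until the commit lands."""
    while True:
        parent, current = branch.snapshot()
        try:
            return branch.commit(parent, current + 1)
        except CommitConflict:
            continue

branch = Branch()
parent, seen = branch.snapshot()     # two writers start from the same parent
branch.commit(parent, seen + 1)      # writer A commits first
try:
    branch.commit(parent, seen + 1)  # writer B's commit is based on stale state
    conflict = False
except CommitConflict:
    conflict = True
final_head = increment_with_retry(branch)  # writer B retries with updated data
```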

At the same time, I am aware of the large amount of work involved. I think a useful path forward is to look for the 90/10 solution--what could we imitate from Iceberg that would give us 90% of the benefits for 10% of the effort.

@jbms
Contributor

jbms commented Aug 10, 2022

If I understand correctly, really what you are looking for is a key-value store adapter that handles versioning, snapshots, possibly packs things into a smaller number of files, and avoids the need for a list operation on the underlying storage.

This is actually precisely what is addressed by this proposal that we are working on implementing as part of tensorstore (but is not tied to tensorstore):

https://docs.google.com/document/d/1PLfyjtCnfJRr-zcWSxKy-gxgHJHSZvJ2y4C3JEHRwkQ/edit?resourcekey=0-o0JdDnC44cJ0FfT8K6U2pw#heading=h.8g8ih69qb0v

(To be clear, while we are currently implementing it, it is still very much a prototype and the actual format is not at all set in stone.)

My understanding is that "storage transformers" were intended to apply only to the chunk data, not the metadata. Since the goal here seems to be to also version the metadata and store it in the same way as the chunks, it seems like this would more appropriately be implemented at the key-value store and not need any particular integration with zarr, just that zarr would be used with it.

@rabernat
Contributor Author

Thanks for sharing @jbms! I am currently reading and digesting that proposal.

One point of clarification: storage transformers can act on any of the items in the store (chunks AND metadata). For example, the sharding transformer (#152) declares explicitly that

This sharding transformer only adapts entries where the key starts with
data/root, as they indicate data keys for array chunks, see
:ref:storage-keys. All other entries are simply passed on.

@jbms
Contributor

jbms commented Aug 11, 2022

Thanks for sharing @jbms! I am currently reading and digesting that proposal.

One point of clarification: storage transformers can act on any of the items in the store (chunks AND metadata). For example, the sharding transformer (#152) declares explicitly that

This sharding transformer only adapts entries where the key starts with
data/root, as they indicate data keys for array chunks, see
:ref:storage-keys. All other entries are simply passed on.

Currently, storage_transformers are specified as part of the array metadata, so the array metadata must be known and therefore it doesn't seem to make sense for it to affect anything other than data keys for that single array. I suppose a different type of storage transformer could instead be specified in the root metadata, and could then also affect metadata keys.

@rabernat
Contributor Author

Ah good point. I was confused by the comment above. I guess for this to work we would want the storage transformer in the root metadata.

@martindurant
Member

Kerchunk would welcome being formalised as (part of) a zarrberg manifest, if that gets us a large fraction of the way to your goals. I think it can, since the storage layer can then express a change in the target of a key and could also be a place for per-chunk transformations if we want. That is not the same as a higher-level transformer; but see for example the ability to concat bytes from several references in preffs. A new version is a new reference set (or some sort of delta scheme). Of course, it doesn't have to be via kerchunk, especially if you really want to have transformations as a separately specified entity.

Note that kerchunk does also have aims with non-zarr data, such as the very directory-file structure of parquet/orc you mentioned, or even just finding valid byte start offsets in a CSV or zstd compressed file. Those are all related-but-not-the-same.

@yuvipanda

I wanted to pipe up and say I absolutely love this proposal, @rabernat :)

@briannapagan

@rabernat can we have a huddle about this sometime soon? We are evaluating a feature at the ESDIS level to better understand what a persistent zarr store would look like. You pointed me to this thread a while back; we're now having more active conversations, and I would like to better understand this proposal because it might address exactly our need.

@martindurant
Member

@briannapagan , would be happy to take part too.

As an orthogonally connected topic, I have also made a client for reading actual iceberg datasets into dask (i.e., tabular as opposed to zarrish nd-arrays).

@rabernat
Contributor Author

rabernat commented Nov 3, 2022

@briannapagan - sure thing. I'll ping you via email. For context, this idea is central to what Joe and I are now building at Earthmover.

@jakirkham
Member

I'd be interested in joining (if that's ok). Though no worries if not

@yuvipanda

me too!

@joshmoore
Member

also (always) happy to join.

@vietnguyengit

I had a discussion with my colleague the other day about Time-travel and versioning for Zarr; this proposal is fantastic! We would be happy to contribute to the project if that's okay. @rabernat

@briannapagan

Picking up this conversation as we have an open ticket at ESDIS. @christine-e-smit gave a great summary of our use case:

Setup:

Paul has a Zarr store in s3. This Zarr store currently has a data array called precipitation of shape 180x360x504 (latitude/longitude/time). The chunk size is 10x10x100, so there are 18x36x6 chunks.
Amy is using the Zarr precipitation array to run calculations. These calculations use multiple independent workers, which pull data from multiple zarr chunks at different times. The workers also sometimes read zarr metadata at different times. (Note: if Amy is using dask, dask will generally read the metadata once, calculate what tasks need to be run, and then run them on available workers. But Amy may have a complicated calculation that requires her to run dask more than once. So even if she is using dask, she may end up reading the metadata more than once.)

Scenario:

Paul gets data for a new time slice and needs to append this data to his existing Zarr array. With this new data added, the data array will have shape 180x360x505.

Paul resizes the precipitation array and sets precipitation[:,:,504] = new_data. There are 18x36 chunks that must be updated with new data and at least one metadata object. His code (somehow) downloads what is currently in these chunks, updates them locally, and then copies them into s3. Note: it is not possible to update all these chunks simultaneously. s3 does not have this capability. So, in general, someone reading the chunks in s3 while Paul is writing them will get a mixture of old and new chunks.
Amy tries to run her calculations. Some of her workers read the zarr metadata before the resize and some read it afterwards. In addition, some workers read the chunks before they have been updated and some read them afterwards. Amy's code has no way of knowing that Paul is in the middle of updating data. Her code fails because it is getting inconsistent shape information. Or, even if all the workers happen to see the same shape, she gets a weird answer because some workers got data chunks before they were updated and some got data chunks after they were updated.
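
The chunk counts in this scenario can be checked with a little arithmetic. The sketch below computes the chunk grid before and after the append and the number of chunk objects Paul has to rewrite; the `chunk_grid` helper is just a name chosen for this example.

```python
import math

def chunk_grid(shape, chunks):
    """Number of chunks along each dimension (the last chunk may be partial)."""
    return tuple(math.ceil(s / c) for s, c in zip(shape, chunks))

chunks = (10, 10, 100)
before = chunk_grid((180, 360, 504), chunks)
after = chunk_grid((180, 360, 505), chunks)

# The new time index 504 lands in an existing, partially filled chunk along time.
time_chunk = 504 // chunks[2]

# Every latitude/longitude chunk at that time position must be rewritten.
chunks_rewritten = before[0] * before[1]
```

The grid stays at 18x36x6 after the resize, so the append rewrites 648 existing chunk objects plus the array metadata, and a reader can observe any intermediate mix of them.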

We've had tangent discussions in pangeo-forge with some solutions mentioned including consolidated-metadata, and @rabernat pointing out that the core issue is an ACID transaction to update the Zarr store.

You want to make changes to multiple files, but you don’t want those changes to become visible to the users until the change is complete; otherwise users see an inconsistent state.

I am proposing another discussion so we can try and sync these two ongoing conversations.

For those interested in joining a discussion with NASA folks, please fill in this doodle by end of day Friday March 17th: https://doodle.com/meeting/participate/id/aQkYRPMd

@jbms
Contributor

jbms commented Mar 15, 2023

As part of the tensorstore project we have developed a key-value store database format called OCDBT (Optionally-cooperative distributed B+tree) that can sit as an adapter between an array format like zarr and the underlying base filesystem / object store.

This format is designed to support versioning and copy-on-write snapshots, which could be used in various ways to solve the scenario described above:

  • An entire database, which may consist of one or more zarr arrays including both their json metadata and the chunks, is versioned; every batch of writes creates a new version, which can be accessed as a static snapshot.
  • Distributed pipelines can be configured to read a specific version (indicated by a generation number) rather than reading the floating latest version. This ensures that they operate on a consistent snapshot.
  • You can "save" specific snapshots of individual arrays or of the entire database by "copying" arbitrary prefixes of the key space to a new prefix. This can be used to create e.g. named versions of the precipitation array, or to perform the writes on a temporary copy of the precipitation array before replacing the original precipitation array with the new version.
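
The three usage patterns above can be illustrated with a toy model: every batch of writes produces a new generation, readers may pin a generation, and copying a prefix saves a named version. This sketch does not reflect the OCDBT on-disk format or the tensorstore API; `VersionedKV`, its methods, and the key names are invented for this example, and it copies the whole mapping per generation where a real format would share structure.

```python
class VersionedKV:
    """Toy versioned key-value store; it does not reflect the OCDBT format."""

    def __init__(self):
        self.generations = [{}]  # generation number -> full key/value mapping

    @property
    def latest(self):
        return len(self.generations) - 1

    def write_batch(self, updates):
        """Apply a batch of writes as one new generation."""
        self.generations.append({**self.generations[-1], **updates})
        return self.latest

    def read(self, key, generation=None):
        """Read from a pinned generation, or from the floating latest one."""
        index = self.latest if generation is None else generation
        return self.generations[index][key]

    def copy_prefix(self, src, dst):
        """Save everything under src by exposing it under dst in a new generation."""
        current = self.generations[-1]
        copied = {dst + k[len(src):]: v for k, v in current.items() if k.startswith(src)}
        return self.write_batch(copied)

kv = VersionedKV()
g1 = kv.write_batch({"precipitation/meta": b"shape=180x360x504",
                     "precipitation/chunk/0.0.5": b"old"})
g2 = kv.copy_prefix("precipitation/", "precipitation_v1/")
g3 = kv.write_batch({"precipitation/meta": b"shape=180x360x505",
                     "precipitation/chunk/0.0.5": b"new"})
```

A pipeline pinned to `g1` keeps seeing a consistent 504-step array while the latest generation already holds the appended data.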

You can find a high-level description of the format and its motivations here:
https://docs.google.com/document/d/1PLfyjtCnfJRr-zcWSxKy-gxgHJHSZvJ2y4C3JEHRwkQ/edit?resourcekey=0-o0JdDnC44cJ0FfT8K6U2pw#heading=h.8g8ih69qb0v

And a description of the current on-disk format here:
https://google.github.io/tensorstore/kvstore/ocdbt/index.html#storage-format

@briannapagan

briannapagan commented Mar 20, 2023

@jbms I don't have your contact information but also for anyone else able to join here is the meeting information for tomorrow 2PM eastern:
Forward Stream Processing + Zarr Stores
Monday, March 20 · 2:00 – 3:00pm
Google Meet joining info
Video call link: https://meet.google.com/shq-zenj-xqm
Or dial: ‪(US) +1 314-384-5292‬ PIN: ‪691 877 052‬#
More phone numbers: https://tel.meet/shq-zenj-xqm?pin=2854704778130

Meeting agenda and notes:
https://hackmd.io/@briannapagan/updating-zarr-stores

@NathanCummings

Some of us in the fusion energy research world have started using zarr (see https://mastapp.site if you’re curious!). For this data repository, we’re looking at using lakefs to handle versioning, but this proposal outlines a much better and more lightweight way to version our data.

I’ll be following this with keen interest and would love to have an idea on timelines and progress.

@martindurant
Member

@NathanCummings : lakeFS versions individual files within a set, I think, which would make it hard to use for this kind of thing

@NathanCummings

Snapshots, Commits, and Time Travel
Iceberg tables are updated by simply adding new files and then updating the manifest list to point to the new files. The new state is registered via an atomic commit operation (familiar to git users). A snapshot points to a particular set of files. We could imagine doing this with zarr chunks, e.g. using the new V3 layout

data/root/foo/baz/c1/0_v0
data/root/foo/baz/c1/0_v1
where _v0 and _v1 are different versions of the same chunk. We could check out different versions of the array corresponding to different commits, preserving the whole history without having to duplicate unnecessary data. @jakirkham and I discussed this idea at scipy. (TileDB has a similar feature.)

Applying the same concept to metadata documents would allow us to evolve array or group schemas and metadata while preserving previous states.

@martindurant if something like this were implemented, it would suit our needs nicely.

@martindurant
Member

Absolutely, version control as described in this thread sounds like what you might want (cc @rabernat , if you want to comment). tiledb has yet another take on amending array data in-place.

@NathanCummings

Yes, tiledb is another option that we considered before deciding to use zarr. It’s still something I’m keeping in the back of my mind.

@rabernat
Contributor Author

Hi @NathanCummings! We currently support all of these features in Zarr via our Arraylake platform. We are currently working on open sourcing some of these features, along the lines of the original proposal. Not able to share much more publicly at this point, but feel free to reach out ([email protected]) for an update.

@yarikoptic

Continuing on the #314 (comment) by @rabernat since more relevant here

FYI, we are planning on open sourcing the solution we have built at Earthmover later this fall.

any specifics on what components would be open-sourced/what they would allow to do?

@martindurant
Member

I'm sure the community would be very excited to hear about what's to come from Earthmover, and interest could be stirred by such specifics; but it's hard for developers (not me!) to reveal too much, especially if licenses have not yet been updated in all the places. "later this fall" sounds pretty soon to me, it being already mid-September.

@rabernat
Contributor Author

Our Arraylake platform already offers transactions, serializable isolation, and version control for Zarr data, with the feature set described here (and more technical details here). The way this works today requires both object storage AND interaction with our REST API. With our forthcoming open source release, we have figured out how to do this without the REST API, using only object storage.

Happy to chat in person if you're interested in hearing more details: [email protected].
