Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[POC] Internal Storage Sink #13236

Closed
wants to merge 15 commits into from
Closed

Conversation

teskje
Copy link
Contributor

@teskje teskje commented Jun 23, 2022

This is a proof-of-concept for MaterializeInc/database-issues#3692. It demonstrates sinking data to a storage collection and reading back from it again.

The poc re-uses the existing "CREATE SINK ... INTO PERSIST" syntax. A real implementation would not do this, but the user interface of storage sinks is still under discussion.

Usage

Create some data to sink:

CREATE TABLE mytable (a int);
INSERT INTO mytable VALUES (1), (2), (3), (4);

Create a storage sink:

CREATE SINK mysink FROM mytable INTO PERSIST;

Read back sinked data:

SELECT * FROM mysink;
--  a
-- ---
--  1
--  2
--  3
--  4
-- (4 rows)

DELETE FROM mytable WHERE a != 1;
INSERT INTO mytable VALUES (5);
SELECT * FROM mysink;
--  a
-- ---
--  1
--  5
-- (2 rows)

Notes

  • The poc re-uses the existing persist sink SQL syntax, but gets rid of the explicit specification of the persist shard location. We can collect this information from the storage controller now.
  • The current code treats persist sinks as both sinks and sources. This makes for some awkward pattern-matching in the coordinator, but it's also somewhat unintuitive to use the name "sink" for a source. @benesch has suggested we reserve "sink" for external sinks that write data to subscribers outside Materialize (like Kafka) and find a different designation for internal sinks (like "recording" or "materialization"). I think we should definitely do that!
  • The current persist_sink used by computed writes a collection of (Option<Row>, Option<Row>) key-value pairs, while storage's persist_source reads a collection of (SourceData, ()). Since both have to be compatible, the poc drops the key and unwraps the value in persist_sink, which is probably not the right thing to do.
  • @benesch raised the question whether storage sinks/recordings/materializations should rather work like indexes in that you don't SELECT them explicitly (as this poc expects) but they get selected by the query planner automatically (https://github.com/MaterializeInc/database-issues/issues/3692).

@@ -36,7 +37,7 @@ where
tokens: &mut std::collections::BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
import_ids: BTreeSet<GlobalId>,
sink_id: GlobalId,
sink: &SinkDesc,
sink: &SinkDesc<CollectionMetadata>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SinkDesc get a "storage metadata type", akin to SourceInstanceDesc. The coordinator deals exclusively with SinkDesc<()>. The controller converts these into SinkDesc<CollectionMetadata> before sending them to the compute instances, by asking the storage controller for the collection information.

let ingestion = IngestionDescription {
id,
desc,
since: Antichain::from_elem(0), // TODO
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably wrong too.

.collect();
self.storage_mut()
.update_write_frontiers(&storage_updates)
.await?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This tells the storage controller about new uppers for collections targeted by sinks. @aljoscha has proposed that we should instead make the compute controller query the persist shards for their uppers directly.

@lluki
Copy link
Contributor

lluki commented Jun 23, 2022

In your example, after CREATE SINK mysink FROM mytable INTO PERSIST; should show sources list mysink ?

@teskje
Copy link
Contributor Author

teskje commented Jun 23, 2022

I'm not sure! Currently storage sinks don't show in the SHOW SOURCES output because a storage sink is not a source in the catalog, just a source in the dataflow sense. As to whether they should show, that depends on the results of the on-going UI discussions. I think we'll probably introduce a new noun for these kinds of sinks ("recording"?) and with that a new command to show them (SHOW RECORDINGS?), so they'll probably not show up as sources in the end.

If you think about it, storage sinks are similar to tables in that you can both write to and read from them. Tables also don't show up in the SHOW SOURCES output, they are a separate thing.

@teskje teskje closed this Jun 28, 2022
@teskje teskje deleted the internal-sinks-poc branch July 26, 2022 14:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants