Persisting Introspection Dataflows Part 1 #13340
Conversation
This looks mostly fine, but it'd be great to know why we cannot read the data from the persist source. I think we should either have a convincing answer as to what causes the problem or make sure it works.
I left some minor comments. In general, it'd be great to have more doc comments, even if things aren't `pub`.
src/compute/src/render/sinks.rs (Outdated)

```diff
 use mz_dataflow_types::client::controller::storage::CollectionMetadata;
 use timely::dataflow::Scope;

+use mz_dataflow_types::client::controller::storage::CollectionMetadata;
+use mz_dataflow_types::sinks::{SinkConnection, SinkDesc, SinkEnvelope};
```
Stray?
I quickly tried this locally. Two things I noticed:
I can't reproduce the crash -- can you? This should never happen, as it indicates malformed rows or the knowledge of row formats being incorrect.
I'll try this once I've merged upstream. It seems the storage controller changed a bit... The default cluster is definitely a TODO!
I can reproduce it, but sometimes I need to repeat the final query.
I can also reproduce the issue (with a couple of tries). Good news is, `computed` crashes, gets restarted, and then the query goes through 😲
I forgot to uncomment the code to enable logging to persist! Now it repros nicely. A good candidate to fix before merging :)
Force-pushed from 4ac7fd2 to a7bdedd.
* Refactors the logging dataflows so that they produce an unarranged version of their output.
* `sink_logs` instructs the `computed` to send these unarranged dataflows to the indicated persist shard.
Thanks for this effort! I finished a first round of review. It looks like it's headed in the right direction, but I wanted to use the opportunity to add some comments to improve the quality and reduce the diff size. Specifically, some changes seem to be stray: while they work, they're not needed.

The only thing I'd change is the modification to `reachability.rs`, where we now potentially do duplicate work.
src/adapter/src/coord.rs (Outdated)

```diff
 let instance = self.catalog.resolve_compute_instance(&of_cluster)?.clone();
 let replica_id = instance.replica_id_by_name[&name];
 let replica = instance.replicas_by_id[&replica_id].clone();

 if let Some(c) = &instance.logging {
     self.initialize_compute_read_policies(
         introspection_collection_ids,
         instance.id(),
         Some((c.granularity_ns / 1000) as u64),
     )
     .await;
 }

 self.dataflow_client
-    .add_replica_to_instance(instance.id, replica_id, config)
+    .add_replica_to_instance(
+        instance.id,
+        replica_id,
+        replica.config,
+        replica.log_collections,
+    )
```
You can avoid the `clone` by capturing `instance.id()` in a local variable. Also, you're using `instance.id()` in the first call and `instance.id` in the second; is this intended?
Yes.

Another question: I have a double `self.catalog.resolve_compute_instance(&of_cluster)?;`. Is there a good way to get rid of this? The problem is that `self.catalog_transact` needs `&mut self`, and I need `instance` to go out of scope before that, so I don't see an obvious way (code coming in a sec).
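One hypothetical shape of a workaround (not the code promised in this thread; names follow the diff above, and the elided arguments are placeholders): copy out plain data inside a block, so the shared borrow ends before the mutable one begins.

```rust
// The block scopes the shared borrow of `self.catalog`, ending it before
// `catalog_transact` takes `&mut self`.
let (instance_id, replica_id) = {
    let instance = self.catalog.resolve_compute_instance(&of_cluster)?;
    (instance.id(), instance.replica_id_by_name[&name])
}; // borrow of `self.catalog` ends here

self.catalog_transact(/* ops, ... */).await?;

// No second resolve needed: `instance_id` and `replica_id` are plain copies.
```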
I've added some comments, but these are mostly smaller things. I need to look at this again when I'm more awake :/
Co-authored-by: Moritz Hoffmann <[email protected]>
Is there a design justification for this? This is an anti-pattern for normal SQL applications, and it would be bad form for a SQL database itself to implement this pattern unless there was a very good reason. The replica id should be a column, not a postfix of the name. If the reason is "our current code makes this hard", then we should fix that instead of shipping it to users. I think this is one of those things where our non-cloud roots are showing their age and we need a way to fix it. It seems likely that someone on the adapter side might need to work with you to do something here, but as an adapter person I'm not sure what, because I don't have a clear understanding of how these introspection sources work.
I don't understand 100% of the context in which this is used, but I left comments about persist usage, as requested.

I do have a larger thought: it'd be nice to stop adding bespoke persist sink operators; we already have one fork of it. (For example, we're about to do some work to increase the performance of `persist_sink`, and it's going to be a pain to keep all these variants maintained.) Better, if we can swing it, is something like the pattern I linked in Slack, where we represent as dataflow `Collection`s "this is what I want in persist" and "this is what is in persist", and some common code just does what is necessary to make that happen. It's unclear to me whether it can be used here, but we should definitely think about it. https://materializeinc.slack.com/archives/C03K23ECB8U/p1658248877515799?thread_ts=1658241816.436409&cid=C03K23ECB8U
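As a rough illustration of that pattern in differential dataflow terms (the `reconcile` helper is hypothetical, not an existing Materialize API): the updates a sink must write are exactly the desired collection minus what persist already holds.

```rust
use differential_dataflow::Collection;
use timely::dataflow::Scope;

/// Compute the updates a sink must apply so that `actual` (what is in
/// persist) comes to match `desired` (what we want in persist).
fn reconcile<G, D>(
    desired: &Collection<G, D, i64>,
    actual: &Collection<G, D, i64>,
) -> Collection<G, D, i64>
where
    G: Scope,
    D: differential_dataflow::Data,
{
    // Records only in `desired` come out with positive diffs (writes);
    // records only in `actual` come out negated (retractions); anything
    // present in both cancels out under consolidation.
    desired.concat(&actual.negate())
}
```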
This looks like things are coming together; thanks for the progress! I left a few comments, please address them before merging.

The PR currently has tests, but it seems there are no tests that read from the new variants (e.g., `mz_arrangement_batch_internal_1`). I think it would be good to have some tests that these sources (1) produce data, and (2) produce the same data as the arranged variants while those still exist.
`active_logs` has moved to a `BTreeMap`; do the same for `persisted_logs`.
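A sketch of the suggested change; the struct and field layout are assumed for illustration, with `LogVariant` and `GlobalId` as they appear in the codebase:

```rust
use std::collections::BTreeMap;

use mz_dataflow_types::logging::LogVariant; // assumed import paths
use mz_repr::GlobalId;

// A BTreeMap gives both log maps a deterministic iteration order.
struct LoggingConfig {
    active_logs: BTreeMap<LogVariant, GlobalId>,    // already migrated
    persisted_logs: BTreeMap<LogVariant, GlobalId>, // migrate from HashMap too
}
```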
LGTM, considering that you've mentioned that things like cleaning up introspection sources, removing the arrangements, and removing the `persist_sink` code duplication are TODOs for later.
My comments are mostly nits, but I agree with @antiguru that having tests that actually read from the new introspection sources would be good.
Addressed nits & added a test that diffs against the active logs. Will hit merge+squash if tests are green!
Migration LGTM
This PR adds the first steps for sinking the introspection sources to persist. It is an updated version of Jan's PoC PR #13236.
The introspection sources are exposed in `mz_catalog` with the replica id as a postfix. The default cluster creates one replica with id 1; thus, for example, the query `SELECT * FROM mz_catalog.mz_dataflow_operators_1;` will return the data of the default cluster. Newly created replicas create the corresponding catalog entries, as can be checked with `\dt mz_catalog.*;` in psql.

The introspection shards are stored with the replica data in the stash/catalog, so on restart the same shards are re-used, allowing an external reader to obtain an uninterrupted stream of updates. The `computed`s will remove stale data upon start.
As of now, these sources are kept after a `DROP` of the replica or cluster; there is an outstanding design discussion on how to handle this case.

Motivation
This PR adds a known-desirable feature.
Related to [Epic] Introspection sources/views across replicas #11782 (see TODOs on what's missing)
Testing
Basic testing ("create cluster creates a new log entry") has been added in `test/sqllogictest/cluster_log_sinks.slt`. Existing tests verify the presence of introspection sources for the default cluster.

Release notes
This PR includes the following user-facing behavior changes:
* When querying an introspection source such as `SELECT * FROM mz_catalog.mz_dataflow_operators;`, the name must now be post-fixed with the replica id (e.g., `SELECT * FROM mz_catalog.mz_dataflow_operators_1;`).