Skip to content

Commit

Permalink
cleanup some things
Browse files Browse the repository at this point in the history
  • Loading branch information
lluki committed Jul 7, 2022
1 parent a7bdedd commit c010b71
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 36 deletions.
71 changes: 43 additions & 28 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2683,35 +2683,45 @@ impl<S: Append + 'static> Coordinator<S> {
session: &Session,
CreateComputeInstancePlan {
name,
config,
config: compute_instance_config,
replicas,
}: CreateComputeInstancePlan,
) -> Result<ExecuteResponse, AdapterError> {
let introspection_sources = if config.is_some() {
let introspection_sources = if compute_instance_config.is_some() {
self.catalog.allocate_introspection_source_indexes().await
} else {
Vec::new()
};
let mut ops = vec![catalog::Op::CreateComputeInstance {
name: name.clone(),
config: config.clone(),
config: compute_instance_config.clone(),
introspection_sources,
}];

let mut source_ids = Vec::new();
let mut source_bindings = Vec::new();
for (replica_name, config) in replicas {
let logical_size = match &config {
// This vector collects introspection sources of all replicas of this compute instance
let mut introspection_collections = Vec::new();

for (replica_name, replica_config) in replicas {
let logical_size = match &replica_config {
ReplicaConfig::Managed { size, .. } => Some(size.clone()),
ReplicaConfig::Remote { .. } => None,
};
let config =
concretize_replica_config(config, &self.replica_sizes, &self.availability_zones)?;
let log_collections: Vec<(_, _)> =
self.catalog.allocate_introspection_source_indexes().await;
let replica_config = concretize_replica_config(
replica_config,
&self.replica_sizes,
&self.availability_zones,
)?;

source_ids.extend(log_collections.iter().map(|(_, id)| *id));
source_bindings.extend(log_collections.iter().map(|(log, id)| {
// These are the persisted, per replica log collections
let enable_experimental_recorded_logs = false;
let log_collections =
if compute_instance_config.is_some() && enable_experimental_recorded_logs {
self.catalog.allocate_introspection_source_indexes().await
} else {
Vec::new()
};

introspection_collections.extend(log_collections.iter().map(|(log, id)| {
(
*id,
CollectionDescription {
Expand All @@ -2724,7 +2734,7 @@ impl<S: Append + 'static> Coordinator<S> {

ops.push(catalog::Op::CreateComputeInstanceReplica {
name: replica_name,
config,
config: replica_config,
on_cluster_name: name.clone(),
logical_size,
log_collections,
Expand All @@ -2734,28 +2744,23 @@ impl<S: Append + 'static> Coordinator<S> {
self.catalog_transact(Some(session), ops, |_| Ok(()))
.await?;

let introspection_collection_ids = introspection_collections
.iter()
.map(|(id, _)| *id)
.collect();

self.dataflow_client
.storage_mut()
.create_collections(source_bindings)
.create_collections(introspection_collections)
.await
.unwrap();

// // TODO(lh): This seems to be the cause for not reading correctly
// let policy = ReadPolicy::ValidFrom(Antichain::from_elem(self.get_local_write_ts()));
// let policy = ReadPolicy::ValidFrom(Antichain::from_elem(self.get_local_read_ts()));
// self.dataflow_client
// .storage_mut()
// .set_read_policy(source_ids.iter().map(|&id| (id, policy.clone())).collect())
// .await
// .unwrap();

self.initialize_storage_read_policies(source_ids, self.logical_compaction_window_ms)
.await;

let instance = self
.catalog
.resolve_compute_instance(&name)
.expect("compute instance must exist after creation");
.expect("compute instance must exist after creation")
.clone();

self.dataflow_client
.create_instance(instance.id, instance.logging.clone())
.await
Expand All @@ -2771,6 +2776,16 @@ impl<S: Append + 'static> Coordinator<S> {
.await
.unwrap();
}

if let Some(c) = compute_instance_config {
self.initialize_compute_read_policies(
introspection_collection_ids,
instance.id(),
Some(c.granularity.as_millis() as u64),
)
.await;
}

Ok(ExecuteResponse::CreatedComputeInstance { existed: false })
}

Expand Down
1 change: 0 additions & 1 deletion src/compute-client/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ where
}

/// Adds a new instance replica, by name.

pub fn add_replica<C>(
&mut self,
id: ReplicaId,
Expand Down
9 changes: 2 additions & 7 deletions src/compute-client/src/controller/replicated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -744,14 +744,9 @@ fn specialize_command<T>(
if let ComputeCommand::CreateInstance(config) = command {
// Set sink_logs
if let Some(logging) = &mut config.logging {
// TODO(LH): Uncomment this to enable writing logging sources to persist.
// Blocked on #11491 (persist compaction).
if false {
logging.sink_logs = replica.log_collections.clone();
logging.log_logging = true; // HACK TODO(lh): Remove this!
}
logging.sink_logs = replica.log_collections.clone();
tracing::debug!(
"Sending logging sinks to replica {:?}: {:?}",
"Enabling sink_logs at replica {:?}: {:?}",
replica_id,
&logging.sink_logs
);
Expand Down

0 comments on commit c010b71

Please sign in to comment.