Merge branch 'main' into skyzh/fix-lru-cancel
mergify[bot] authored Jul 15, 2022
2 parents fc36989 + 487ef80 commit 85ce7b6
Showing 12 changed files with 325 additions and 44 deletions.
8 changes: 8 additions & 0 deletions src/frontend/src/catalog/root_catalog.rs
@@ -137,6 +137,14 @@ impl Catalog {
.drop_table(tb_id);
}

pub fn update_table(&mut self, proto: &ProstTable) {
self.get_database_mut(proto.database_id)
.unwrap()
.get_schema_mut(proto.schema_id)
.unwrap()
.update_table(proto);
}

pub fn drop_source(&mut self, db_id: DatabaseId, schema_id: SchemaId, source_id: SourceId) {
self.get_database_mut(db_id)
.unwrap()
9 changes: 9 additions & 0 deletions src/frontend/src/catalog/schema_catalog.rs
@@ -63,6 +63,15 @@ impl SchemaCatalog {
.unwrap();
}

pub fn update_table(&mut self, prost: &ProstTable) {
let name = prost.name.clone();
let id = prost.id.into();
let table: TableCatalog = prost.into();

self.table_by_name.insert(name.clone(), table);
self.table_name_by_id.insert(id, name);
}

pub fn drop_table(&mut self, id: TableId) {
let name = self.table_name_by_id.remove(&id).unwrap();
self.table_by_name.remove(&name).unwrap();
1 change: 1 addition & 0 deletions src/frontend/src/observer/observer_manager.rs
@@ -146,6 +146,7 @@ impl FrontendObserverNode {
Operation::Delete => {
catalog_guard.drop_table(table.database_id, table.schema_id, table.id.into())
}
Operation::Update => catalog_guard.update_table(table),
_ => panic!("receive an unsupported notify {:?}", resp),
},
Info::Source(source) => match resp.operation() {
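Taken together, these three frontend-side changes form the table-update notification path: the meta node sends an `Operation::Update` carrying the new `ProstTable`, `FrontendObserverNode` routes it to `Catalog::update_table`, and `SchemaCatalog::update_table` upserts the entry under both its name and its id. A minimal, self-contained sketch of that upsert behavior (the types here are simplified stand-ins, not the real `ProstTable`/`TableCatalog`):

```rust
use std::collections::HashMap;

type TableId = u32;

// Hypothetical, stripped-down stand-in for the real catalog types.
#[derive(Clone)]
struct ProstTable {
    id: TableId,
    name: String,
}

#[derive(Default)]
struct SchemaCatalog {
    table_by_name: HashMap<String, ProstTable>,
    table_name_by_id: HashMap<TableId, String>,
}

impl SchemaCatalog {
    // Mirrors the new `update_table`: an upsert keyed by both name and id,
    // so replaying the same Update notification is harmless.
    fn update_table(&mut self, prost: &ProstTable) {
        self.table_by_name.insert(prost.name.clone(), prost.clone());
        self.table_name_by_id.insert(prost.id, prost.name.clone());
    }
}

fn main() {
    let mut schema = SchemaCatalog::default();
    let t = ProstTable { id: 1, name: "t".into() };
    schema.update_table(&t); // first Update creates the entry
    schema.update_table(&t); // a second Update simply overwrites it
    assert_eq!(schema.table_by_name.len(), 1);
}
```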
6 changes: 6 additions & 0 deletions src/meta/src/barrier/mod.rs
@@ -163,6 +163,9 @@ pub struct GlobalBarrierManager<S: MetaStore> {
/// Enable recovery or not when failover.
enable_recovery: bool,

/// Enable migrating expired actors to newly joined nodes
enable_migrate: bool,

/// The queue of scheduled barriers.
scheduled_barriers: ScheduledBarriers,

@@ -341,6 +344,7 @@
metrics: Arc<MetaMetrics>,
) -> Self {
let enable_recovery = env.opts.enable_recovery;
let enable_migrate = env.opts.enable_migrate;
let interval = env.opts.checkpoint_interval;
let in_flight_barrier_nums = env.opts.in_flight_barrier_nums;
tracing::info!(
@@ -353,6 +357,7 @@
Self {
interval,
enable_recovery,
enable_migrate,
cluster_manager,
catalog_manager,
fragment_manager,
@@ -697,6 +702,7 @@
node.command_ctx.prev_epoch.0,
err
);
return Err(err.clone());
}
};
}
154 changes: 125 additions & 29 deletions src/meta/src/barrier/recovery.rs
@@ -12,16 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::iter::Map;
use std::sync::Arc;
use std::time::Duration;

use futures::future::try_join_all;
use itertools::Itertools;
use log::{debug, error};
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::types::{ParallelUnitId, VIRTUAL_NODE_COUNT};
use risingwave_common::util::compress::decompress_data;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::common::ActorInfo;
use risingwave_pb::common::worker_node::State;
use risingwave_pb::common::{ActorInfo, WorkerNode, WorkerType};
use risingwave_pb::data::Epoch as ProstEpoch;
use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
use risingwave_pb::stream_service::{
@@ -34,6 +38,7 @@ use uuid::Uuid;
use crate::barrier::command::CommandContext;
use crate::barrier::info::BarrierActorInfo;
use crate::barrier::{CheckpointControl, Command, GlobalBarrierManager};
use crate::cluster::WorkerId;
use crate::model::ActorId;
use crate::storage::MetaStore;

@@ -64,11 +69,23 @@
debug!("recovery start!");
let retry_strategy = Self::get_retry_strategy();
let (new_epoch, responses) = tokio_retry::Retry::spawn(retry_strategy, || async {
let info = self.resolve_actor_info(&CheckpointControl::new()).await;
let mut info = self.resolve_actor_info(&CheckpointControl::new()).await;
let mut new_epoch = prev_epoch.next();

if self.enable_migrate {
// Migrate expired actors to newly joined node by changing actor_map
self.migrate_actors(&info).await?;
info = self.resolve_actor_info(&CheckpointControl::new()).await;
}

// Reset all compute nodes, stop and drop existing actors.
self.reset_compute_nodes(&info, &prev_epoch, &new_epoch)
.await;
if let Err(err) = self
.reset_compute_nodes(&info, &prev_epoch, &new_epoch)
.await
{
error!("reset compute nodes failed: {}", err);
return Err(err);
}

// Refresh sources in local source manger of compute node.
if let Err(err) = self.sync_sources(&info).await {
@@ -132,6 +149,93 @@ where
)
}

/// Map expired CNs to newly joined CNs so that we can migrate actors later.
/// Waits until a sufficient number of new CNs have joined.
/// Returns a "map of `ParallelUnitId`s on expired CNs to new CN ids" and a "map of `WorkerId` to
/// the `WorkerNode` struct of each new CN".
async fn get_migrate_map_plan(
&self,
info: &BarrierActorInfo,
expired_workers: &Vec<WorkerId>,
) -> (
HashMap<ParallelUnitId, WorkerId>,
HashMap<WorkerId, WorkerNode>,
) {
let workers_size = expired_workers.len();
let mut cur = 0;
let mut migrate_map = HashMap::new();
let mut node_map = HashMap::new();
while cur < workers_size {
let current_nodes = self
.cluster_manager
.list_worker_node(WorkerType::ComputeNode, Some(State::Running))
.await;
let new_nodes = current_nodes
.iter()
.filter(|&node| {
!info.node_map.contains_key(&node.id) && !node_map.contains_key(&node.id)
})
.collect_vec();
for new_node in new_nodes {
let actors = info.actor_map.get(&expired_workers[cur]).unwrap();
let actors_len = actors.len();
for actor in actors.iter().take(actors_len) {
migrate_map.insert(*actor, new_node.id);
}
node_map.insert(new_node.id, new_node.clone());
cur += 1;
debug!(
"got new worker {} , migrate process ({}/{})",
new_node.id, cur, workers_size
);
}
// wait to get newly joined CN
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
(migrate_map, node_map)
}

async fn migrate_actors(&self, info: &BarrierActorInfo) -> Result<()> {
debug!("start migrate actors.");
// get expired workers
let expired_workers = info
.actor_map
.iter()
.filter(|(&worker, actors)| !actors.is_empty() && !info.node_map.contains_key(&worker))
.map(|(&worker, _)| worker)
.collect_vec();
if expired_workers.is_empty() {
debug!("no expired workers, skipping.");
return Ok(());
}
debug!("got expired workers {:#?}", expired_workers);
let (migrate_map, node_map) = self.get_migrate_map_plan(info, &expired_workers).await;
// migrate actors in fragments, return updated fragments and pu to pu migrate plan
let (new_fragments, migrate_map) = self
.fragment_manager
.migrate_actors(&migrate_map, &node_map)
.await?;
debug!("got parallel unit migrate plan {:#?}", migrate_map);
// update mapping in table and notify frontends
let res = self
.catalog_manager
.update_table_mapping(&new_fragments, &migrate_map)
.await;
// update hash mapping
for fragments in new_fragments {
for (fragment_id, fragment) in fragments.fragments {
let mapping = fragment.vnode_mapping.as_ref().unwrap();
let vnode_mapping = decompress_data(&mapping.original_indices, &mapping.data);
assert_eq!(vnode_mapping.len(), VIRTUAL_NODE_COUNT);
self.env
.hash_mapping_manager()
.set_fragment_hash_mapping(fragment_id, vnode_mapping);
}
}
debug!("migrate actors succeed.");
res
}

/// Sync all sources to compute nodes; the local source managers in compute nodes may already be
/// dirty.
async fn sync_sources(&self, info: &BarrierActorInfo) -> Result<()> {
@@ -228,34 +332,26 @@
info: &BarrierActorInfo,
prev_epoch: &Epoch,
new_epoch: &Epoch,
) {
let futures = info.node_map.iter().map(|(_, worker_node)| {
let retry_strategy = Self::get_retry_strategy();

async move {
tokio_retry::Retry::spawn(retry_strategy, || async {
let client = self.env.stream_client_pool().get(worker_node).await?;
debug!("force stop actors: {}", worker_node.id);
client
.to_owned()
.force_stop_actors(ForceStopActorsRequest {
request_id: Uuid::new_v4().to_string(),
epoch: Some(ProstEpoch {
curr: new_epoch.0,
prev: prev_epoch.0,
}),
})
.await
.map_err(RwError::from)
) -> Result<()> {
let futures = info.node_map.iter().map(|(_, worker_node)| async move {
let client = self.env.stream_client_pool().get(worker_node).await?;
debug!("force stop actors: {}", worker_node.id);
client
.to_owned()
.force_stop_actors(ForceStopActorsRequest {
request_id: Uuid::new_v4().to_string(),
epoch: Some(ProstEpoch {
curr: new_epoch.0,
prev: prev_epoch.0,
}),
})
.await
.expect("Force stop actors until success");

Ok::<_, RwError>(())
}
.map_err(RwError::from)
});

try_join_all(futures).await.unwrap();
try_join_all(futures).await?;
debug!("all compute nodes have been reset.");

Ok(())
}
}
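The heart of `get_migrate_map_plan` above is a pairing step: each expired worker's actors are handed wholesale to one newly joined compute node. A simplified, hypothetical sketch of that pairing, without the polling loop that waits for enough new nodes to register:

```rust
use std::collections::HashMap;

type ActorId = u32;
type WorkerId = u32;

/// Assign every actor of each expired worker to one newly joined worker.
/// Simplified assumption: `new_workers` already holds at least as many
/// entries as `expired`; the real code instead polls the cluster manager
/// until enough compute nodes have joined.
fn plan_migration(
    expired: &[(WorkerId, Vec<ActorId>)],
    new_workers: &[WorkerId],
) -> HashMap<ActorId, WorkerId> {
    let mut migrate_map = HashMap::new();
    for (i, (_, actors)) in expired.iter().enumerate() {
        let target = new_workers[i];
        for actor in actors {
            migrate_map.insert(*actor, target);
        }
    }
    migrate_map
}

fn main() {
    let expired = vec![(1, vec![10, 11]), (2, vec![12])];
    let plan = plan_migration(&expired, &[7, 8]);
    assert_eq!(plan[&10], 7);
    assert_eq!(plan[&11], 7);
    assert_eq!(plan[&12], 8);
}
```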
1 change: 1 addition & 0 deletions src/meta/src/cluster/mod.rs
@@ -36,6 +36,7 @@ use crate::model::{MetadataModel, Worker, INVALID_EXPIRE_AT};
use crate::storage::MetaStore;

pub type WorkerId = u32;
pub type ParallelId = u32;
pub type WorkerLocations = HashMap<WorkerId, WorkerNode>;
pub type ClusterManagerRef<S> = Arc<ClusterManager<S>>;

5 changes: 5 additions & 0 deletions src/meta/src/lib.rs
@@ -104,6 +104,10 @@ pub struct MetaNodeOpts {
#[clap(long)]
disable_recovery: bool,

/// Enable migrating actors during recovery; disabled by default.
#[clap(long)]
enable_migrate: bool,

#[clap(long, default_value = "10")]
meta_leader_lease_secs: u64,

@@ -164,6 +168,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
opts.meta_leader_lease_secs,
MetaOpts {
enable_recovery: !opts.disable_recovery,
enable_migrate: opts.enable_migrate,
checkpoint_interval,
max_idle_ms,
in_flight_barrier_nums,
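With clap's derive API, the new boolean field is exposed as an `--enable-migrate` long flag (field names become kebab-case by default). A small, hypothetical reproduction of the flag wiring, separate from the real `MetaNodeOpts` (assumes the `clap` crate with the derive feature, v3-style attributes matching the `#[clap(long)]` used in the diff):

```rust
use clap::Parser;

// Illustrative stand-in for the relevant fields of MetaNodeOpts.
#[derive(Parser)]
struct Opts {
    /// Disable recovery on failover (recovery is enabled by default).
    #[clap(long)]
    disable_recovery: bool,

    /// Enable migrating actors during recovery (disabled by default).
    #[clap(long)]
    enable_migrate: bool,
}

fn main() {
    // Simulate `meta-node --enable-migrate` on the command line.
    let opts = Opts::parse_from(["meta-node", "--enable-migrate"]);
    assert!(!opts.disable_recovery);
    assert!(opts.enable_migrate);
}
```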
47 changes: 46 additions & 1 deletion src/meta/src/manager/catalog.rs
@@ -25,14 +25,16 @@ use risingwave_common::catalog::{
use risingwave_common::ensure;
use risingwave_common::error::ErrorCode::{CatalogError, InternalError};
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::ParallelUnitId;
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
use risingwave_pb::catalog::{Database, Schema, Source, Table};
use risingwave_pb::common::ParallelUnit;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use tokio::sync::{Mutex, MutexGuard};

use super::IdCategory;
use crate::manager::{MetaSrvEnv, NotificationVersion};
use crate::model::{MetadataModel, Transactional};
use crate::model::{MetadataModel, TableFragments, Transactional};
use crate::storage::{MetaStore, Transaction};

pub type DatabaseId = u32;
@@ -434,6 +436,49 @@
}
}

pub async fn update_table_mapping(
&self,
fragments: &Vec<TableFragments>,
migrate_map: &HashMap<ParallelUnitId, ParallelUnit>,
) -> Result<()> {
let mut core = self.core.lock().await;
let mut transaction = Transaction::default();
let mut tables = Vec::new();
for fragment in fragments {
let table_id = fragment.table_id().table_id();
let internal_tables = fragment.internal_table_ids();
let mut table_to_updates = vec![table_id];
table_to_updates.extend(internal_tables);
for table_id in table_to_updates {
let table = Table::select(self.env.meta_store(), &table_id).await?;
if let Some(mut table) = table {
if let Some(ref mut mapping) = table.mapping {
let mut migrated = false;
mapping.data.iter_mut().for_each(|id| {
if migrate_map.contains_key(id) {
migrated = true;
*id = migrate_map.get(id).unwrap().id;
}
});
if migrated {
table.upsert_in_transaction(&mut transaction)?;
tables.push(table);
}
}
}
}
}
core.env.meta_store().txn(transaction).await?;
for table in &tables {
self.env
.notification_manager()
.notify_frontend(Operation::Update, Info::Table(table.to_owned()))
.await;
core.add_table(table);
}
Ok(())
}

pub async fn drop_source(&self, source_id: SourceId) -> Result<NotificationVersion> {
let mut core = self.core.lock().await;
let source = Source::select(self.env.meta_store(), &source_id).await?;
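The core of `update_table_mapping` is rewriting the parallel unit ids stored in each table's vnode mapping and persisting only the tables that actually changed. A hedged, self-contained sketch of that rewrite step (operating on bare ids rather than the real `ParallelUnit` protos and compressed mapping):

```rust
use std::collections::HashMap;

type ParallelUnitId = u32;

/// Rewrite parallel unit ids in place according to the migration plan.
/// Returns whether anything changed, so callers can skip persisting and
/// notifying for untouched tables. Simplified: the real code walks the
/// `data` field of the table's vnode mapping and maps to ParallelUnit protos.
fn rewrite_mapping(
    mapping_data: &mut [ParallelUnitId],
    migrate_map: &HashMap<ParallelUnitId, ParallelUnitId>,
) -> bool {
    let mut migrated = false;
    for id in mapping_data.iter_mut() {
        if let Some(new_id) = migrate_map.get(id) {
            *id = *new_id;
            migrated = true;
        }
    }
    migrated
}

fn main() {
    let mut data = vec![1, 2, 2, 3];
    let plan = HashMap::from([(2, 9)]);
    assert!(rewrite_mapping(&mut data, &plan));
    assert_eq!(data, vec![1, 9, 9, 3]);
}
```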
5 changes: 4 additions & 1 deletion src/meta/src/manager/env.rs
@@ -68,6 +68,7 @@
/// Options shared by all meta service instances
pub struct MetaOpts {
pub enable_recovery: bool,
pub enable_migrate: bool,
pub checkpoint_interval: Duration,

/// After specified seconds of idle (no mview or flush), the process will be exited.
@@ -84,6 +85,7 @@ impl Default for MetaOpts {
fn default() -> Self {
Self {
enable_recovery: false,
enable_migrate: false,
checkpoint_interval: Duration::from_millis(250),
max_idle_ms: 0,
in_flight_barrier_nums: 40,
@@ -94,9 +96,10 @@

impl MetaOpts {
/// Some tests need `enable_recovery=true`.
pub fn test(enable_recovery: bool) -> Self {
pub fn test(enable_recovery: bool, enable_migrate: bool) -> Self {
Self {
enable_recovery,
enable_migrate,
checkpoint_interval: Duration::from_millis(250),
max_idle_ms: 0,
in_flight_barrier_nums: 40,