From 6e3ee6ecbbba12364847b56bb516922d9fd2f455 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 20 Sep 2024 11:05:59 +0100 Subject: [PATCH 1/4] pageserver: non-zero shards learn GC offset from shard zero --- .../src/tenant/remote_timeline_client.rs | 23 ++++ pageserver/src/tenant/timeline.rs | 119 ++++++++++++------ 2 files changed, 107 insertions(+), 35 deletions(-) diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 1f9ae40af534..aee05e9e89e5 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -196,6 +196,7 @@ use utils::backoff::{ self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, }; use utils::pausable_failpoint; +use utils::shard::ShardNumber; use std::collections::{HashMap, VecDeque}; use std::sync::atomic::{AtomicU32, Ordering}; @@ -2145,6 +2146,28 @@ impl RemoteTimelineClient { inner.initialized_mut()?; Ok(UploadQueueAccessor { inner }) } + + /// 'foreign' in the sense that it does not belong to this tenant shard. This method + /// is used during GC for other shards to get the index of shard zero. 
+
+    pub(crate) async fn download_foreign_index(
+        &self,
+        shard_number: ShardNumber,
+        cancel: &CancellationToken,
+    ) -> Result<(IndexPart, Generation), DownloadError> {
+        let foreign_shard_id = TenantShardId {
+            shard_number,
+            shard_count: self.tenant_shard_id.shard_count,
+            tenant_id: self.tenant_shard_id.tenant_id,
+        };
+        download_index_part(
+            &self.storage_impl,
+            &foreign_shard_id,
+            &self.timeline_id,
+            Generation::MAX,
+            cancel,
+        )
+        .await
+    }
 }
 
 pub(crate) struct UploadQueueAccessor<'a> {
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index f08f5caf95f7..f5f51daf97e7 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -35,6 +35,7 @@ use pageserver_api::{
     shard::{ShardIdentity, ShardNumber, TenantShardId},
 };
 use rand::Rng;
+use remote_storage::DownloadError;
 use serde_with::serde_as;
 use storage_broker::BrokerClientChannel;
 use tokio::{
@@ -136,7 +137,8 @@ use self::logical_size::LogicalSize;
 use self::walreceiver::{WalReceiver, WalReceiverConf};
 
 use super::{
-    config::TenantConf, storage_layer::inmemory_layer, storage_layer::LayerVisibilityHint,
+    config::TenantConf,
+    storage_layer::{inmemory_layer, LayerVisibilityHint},
     upload_queue::NotInitialized,
 };
 use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
@@ -4673,6 +4675,86 @@ impl Timeline {
         Ok(())
     }
 
+    async fn find_gc_time_cutoff(
+        &self,
+        pitr: Duration,
+        cancel: &CancellationToken,
+        ctx: &RequestContext,
+    ) -> Result<Option<Lsn>, PageReconstructError> {
+        debug_assert_current_span_has_tenant_and_timeline_id();
+        if self.shard_identity.is_shard_zero() {
+            // Shard Zero has SLRU data and can calculate the PITR time -> LSN mapping itself
+            let now = SystemTime::now();
+            let time_range = if pitr == Duration::ZERO {
+                humantime::parse_duration(DEFAULT_PITR_INTERVAL).expect("constant is invalid")
+            } else {
+                pitr
+            };
+
+            // If PITR is so large or `now` is so small that this underflows, we will retain no history (highly unexpected case)
+            let time_cutoff = now.checked_sub(time_range).unwrap_or(now);
+            let timestamp = to_pg_timestamp(time_cutoff);
+
+            let time_cutoff = match self.find_lsn_for_timestamp(timestamp, cancel, ctx).await? {
+                LsnForTimestamp::Present(lsn) => Some(lsn),
+                LsnForTimestamp::Future(lsn) => {
+                    // The timestamp is in the future. That sounds impossible,
+                    // but what it really means is that there hasn't been
+                    // any commits since the cutoff timestamp.
+                    //
+                    // In this case we should use the LSN of the most recent commit,
+                    // which is implicitly the last LSN in the log.
+                    debug!("future({})", lsn);
+                    Some(self.get_last_record_lsn())
+                }
+                LsnForTimestamp::Past(lsn) => {
+                    debug!("past({})", lsn);
+                    None
+                }
+                LsnForTimestamp::NoData(lsn) => {
+                    debug!("nodata({})", lsn);
+                    None
+                }
+            };
+            Ok(time_cutoff)
+        } else {
+            // Shards other than shard zero cannot do timestamp->lsn lookups, and must instead learn their GC cutoff
+            // from shard zero's index. The index doesn't explicitly tell us the time cutoff, but we may assume that
+            // the point up to which shard zero's last_gc_cutoff has advanced will either be the time cutoff, or a
+            // space cutoff that we would also have respected ourselves.
+            match self
+                .remote_client
+                .download_foreign_index(ShardNumber(0), &cancel)
+                .await
+            {
+                Ok((index_part, index_generation)) => {
+                    tracing::info!("GC loaded shard zero metadata (gen {index_generation:?}): latest_gc_cutoff_lsn: {}",
+                        index_part.metadata.latest_gc_cutoff_lsn());
+                    Ok(Some(index_part.metadata.latest_gc_cutoff_lsn()))
+                }
+                Err(DownloadError::NotFound) => {
+                    // This is unexpected, because during timeline creations shard zero persists to remote
+                    // storage before other shards are called, and during timeline deletion non-zeroth shards are
+                    // deleted before the zeroth one. However, it should be harmless: if we somehow end up in this
+                    // state, then shard zero should _eventually_ write an index when it GCs.
+ tracing::warn!("GC couldn't find shard zero's index for timeline"); + Ok(None) + } + Err(e) => { + // TODO: this function should return a different error type than page reconstruct error + Err(PageReconstructError::Other(anyhow::anyhow!(e))) + } + } + + // TODO: after reading shard zero's GC cutoff, we should validate its generation with the storage + // controller. Otherwise, it is possible that we see the GC cutoff go backwards while shard zero + // is going through a migration if we read the old location's index and it has GC'd ahead of the + // new location. This is legal in principle, but problematic in practice because it might result + // in a timeline creation succeeding on shard zero ('s new location) but then failing on other shards + // because they have GC'd past the branch point. + } + } + /// Find the Lsns above which layer files need to be retained on /// garbage collection. /// @@ -4715,40 +4797,7 @@ impl Timeline { // - if PITR interval is set, then this is our cutoff. // - if PITR interval is not set, then we do a lookup // based on DEFAULT_PITR_INTERVAL, so that size-based retention does not result in keeping history around permanently on idle databases. - let time_cutoff = { - let now = SystemTime::now(); - let time_range = if pitr == Duration::ZERO { - humantime::parse_duration(DEFAULT_PITR_INTERVAL).expect("constant is invalid") - } else { - pitr - }; - - // If PITR is so large or `now` is so small that this underflows, we will retain no history (highly unexpected case) - let time_cutoff = now.checked_sub(time_range).unwrap_or(now); - let timestamp = to_pg_timestamp(time_cutoff); - - match self.find_lsn_for_timestamp(timestamp, cancel, ctx).await? { - LsnForTimestamp::Present(lsn) => Some(lsn), - LsnForTimestamp::Future(lsn) => { - // The timestamp is in the future. That sounds impossible, - // but what it really means is that there hasn't been - // any commits since the cutoff timestamp. 
- // - // In this case we should use the LSN of the most recent commit, - // which is implicitly the last LSN in the log. - debug!("future({})", lsn); - Some(self.get_last_record_lsn()) - } - LsnForTimestamp::Past(lsn) => { - debug!("past({})", lsn); - None - } - LsnForTimestamp::NoData(lsn) => { - debug!("nodata({})", lsn); - None - } - } - }; + let time_cutoff = self.find_gc_time_cutoff(pitr, cancel, ctx).await?; Ok(match (pitr, time_cutoff) { (Duration::ZERO, Some(time_cutoff)) => { From c4949578fb95ff9c4bbd49c6a200a664fbc24b3f Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 20 Sep 2024 11:26:45 +0100 Subject: [PATCH 2/4] pageserver: don't ingest checkpoint page on shards >0 --- pageserver/src/walingest.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 229c01a6817e..0814f782e4a4 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -143,7 +143,7 @@ impl WalIngest { let mut buf = decoded.record.clone(); buf.advance(decoded.main_data_offset); - assert!(!self.checkpoint_modified); + assert!(!self.checkpoint_modified || !self.shard.is_shard_zero()); if decoded.xl_xid != pg_constants::INVALID_TRANSACTION_ID && self.checkpoint.update_next_xid(decoded.xl_xid) { @@ -565,8 +565,8 @@ impl WalIngest { .await?; } - // If checkpoint data was updated, store the new version in the repository - if self.checkpoint_modified { + // If checkpoint data was updated, store the new version in the repository (on shard zero only) + if self.checkpoint_modified && self.shard.is_shard_zero() { let new_checkpoint_bytes = self.checkpoint.encode()?; modification.put_checkpoint(new_checkpoint_bytes)?; From 42f5694f058cc0ab6905769dbc7d9b085493f594 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 20 Sep 2024 11:30:35 +0100 Subject: [PATCH 3/4] pageserver: do not ingest SLRUs on shard >0 --- libs/pageserver_api/src/shard.rs | 7 +++- pageserver/src/pgdatadir_mapping.rs | 56 
++++++++++++++++++----------- pageserver/src/walingest.rs | 4 +++ 3 files changed, 46 insertions(+), 21 deletions(-) diff --git a/libs/pageserver_api/src/shard.rs b/libs/pageserver_api/src/shard.rs index e83cf4c855a1..eda02c77c31e 100644 --- a/libs/pageserver_api/src/shard.rs +++ b/libs/pageserver_api/src/shard.rs @@ -175,7 +175,12 @@ impl ShardIdentity { /// /// Shards _may_ drop keys which return false here, but are not obliged to. pub fn is_key_disposable(&self, key: &Key) -> bool { - if key_is_shard0(key) { + if self.count < ShardCount(2) { + // Fast path: unsharded tenant doesn't dispose of anything + return false; + } + + if key_is_shard0(key) && key.field1 != 0x01 { // Q: Why can't we dispose of shard0 content if we're not shard 0? // A1: because the WAL ingestion logic currently ingests some shard 0 // content on all shards, even though it's only read on shard 0. If we diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 5f8766ca2c51..07a7d3fcff53 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -336,6 +336,7 @@ impl Timeline { lsn: Lsn, ctx: &RequestContext, ) -> Result { + assert!(self.tenant_shard_id.is_shard_zero()); let n_blocks = self .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx) .await?; @@ -358,6 +359,7 @@ impl Timeline { lsn: Lsn, ctx: &RequestContext, ) -> Result { + assert!(self.tenant_shard_id.is_shard_zero()); let key = slru_block_to_key(kind, segno, blknum); self.get(key, lsn, ctx).await } @@ -370,6 +372,7 @@ impl Timeline { version: Version<'_>, ctx: &RequestContext, ) -> Result { + assert!(self.tenant_shard_id.is_shard_zero()); let key = slru_segment_size_to_key(kind, segno); let mut buf = version.get(self, key, ctx).await?; Ok(buf.get_u32_le()) @@ -383,6 +386,7 @@ impl Timeline { version: Version<'_>, ctx: &RequestContext, ) -> Result { + assert!(self.tenant_shard_id.is_shard_zero()); // fetch directory listing let key = slru_dir_to_key(kind); 
let buf = version.get(self, key, ctx).await?; @@ -885,26 +889,28 @@ impl Timeline { } // Iterate SLRUs next - for kind in [ - SlruKind::Clog, - SlruKind::MultiXactMembers, - SlruKind::MultiXactOffsets, - ] { - let slrudir_key = slru_dir_to_key(kind); - result.add_key(slrudir_key); - let buf = self.get(slrudir_key, lsn, ctx).await?; - let dir = SlruSegmentDirectory::des(&buf)?; - let mut segments: Vec = dir.segments.iter().cloned().collect(); - segments.sort_unstable(); - for segno in segments { - let segsize_key = slru_segment_size_to_key(kind, segno); - let mut buf = self.get(segsize_key, lsn, ctx).await?; - let segsize = buf.get_u32_le(); - - result.add_range( - slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize), - ); - result.add_key(segsize_key); + if self.tenant_shard_id.is_shard_zero() { + for kind in [ + SlruKind::Clog, + SlruKind::MultiXactMembers, + SlruKind::MultiXactOffsets, + ] { + let slrudir_key = slru_dir_to_key(kind); + result.add_key(slrudir_key); + let buf = self.get(slrudir_key, lsn, ctx).await?; + let dir = SlruSegmentDirectory::des(&buf)?; + let mut segments: Vec = dir.segments.iter().cloned().collect(); + segments.sort_unstable(); + for segno in segments { + let segsize_key = slru_segment_size_to_key(kind, segno); + let mut buf = self.get(segsize_key, lsn, ctx).await?; + let segsize = buf.get_u32_le(); + + result.add_range( + slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize), + ); + result.add_key(segsize_key); + } } } @@ -1208,6 +1214,10 @@ impl<'a> DatadirModification<'a> { blknum: BlockNumber, rec: NeonWalRecord, ) -> anyhow::Result<()> { + if !self.tline.tenant_shard_id.is_shard_zero() { + return Ok(()); + } + self.put( slru_block_to_key(kind, segno, blknum), Value::WalRecord(rec), @@ -1241,6 +1251,9 @@ impl<'a> DatadirModification<'a> { blknum: BlockNumber, img: Bytes, ) -> anyhow::Result<()> { + if !self.tline.tenant_shard_id.is_shard_zero() { + return Ok(()); + } let key = 
slru_block_to_key(kind, segno, blknum); if !key.is_valid_key_on_write_path() { anyhow::bail!( @@ -1277,6 +1290,9 @@ impl<'a> DatadirModification<'a> { segno: u32, blknum: BlockNumber, ) -> anyhow::Result<()> { + if !self.tline.tenant_shard_id.is_shard_zero() { + return Ok(()); + } let key = slru_block_to_key(kind, segno, blknum); if !key.is_valid_key_on_write_path() { anyhow::bail!( diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 0814f782e4a4..283e559d9add 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -1905,6 +1905,10 @@ impl WalIngest { img: Bytes, ctx: &RequestContext, ) -> Result<()> { + if !self.shard.is_shard_zero() { + return Ok(()); + } + self.handle_slru_extend(modification, kind, segno, blknum, ctx) .await?; modification.put_slru_page_image(kind, segno, blknum, img)?; From d86b440dbe3b4f2edc61bed63a7bec598f1d1e1f Mon Sep 17 00:00:00 2001 From: John Spray Date: Mon, 23 Sep 2024 09:07:19 +0100 Subject: [PATCH 4/4] tests: do GC in pg_regress --- test_runner/regress/test_pg_regress.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/test_runner/regress/test_pg_regress.py b/test_runner/regress/test_pg_regress.py index 45ce5b1c5b86..3eb57519ffa2 100644 --- a/test_runner/regress/test_pg_regress.py +++ b/test_runner/regress/test_pg_regress.py @@ -107,13 +107,17 @@ def post_checks(env: NeonEnv, test_output_dir: Path, db_name: str, endpoint: End check_restored_datadir_content(test_output_dir, env, endpoint, ignored_files=ignored_files) - # Ensure that compaction works, on a timeline containing all the diversity that postgres regression tests create. + # Ensure that compaction/GC works, on a timeline containing all the diversity that postgres regression tests create. # There should have been compactions mid-test as well, this final check is in addition those. 
for shard, pageserver in tenant_get_shards(env, env.initial_tenant): pageserver.http_client().timeline_checkpoint( shard, env.initial_timeline, force_repartition=True, force_image_layer_creation=True ) + pageserver.http_client().timeline_gc( + shard, env.initial_timeline, None + ) + # Run the main PostgreSQL regression tests, in src/test/regress. #