changefeedccl: Expire protected timestamps
Changefeeds use the protected timestamp system (PTS)
to ensure that the data targeted by a changefeed is not
garbage collected prematurely.  A running changefeed manages
its PTS record by periodically advancing the record's
timestamp, so that data older than that timestamp may be
GCed.  When a changefeed stops running because it is paused
(either due to operator action, or due to the
`on_error=pause` option), the PTS record remains so that the
changefeed can be resumed at a later time.  However, it is
also possible that the operator does not notice that the job
has been paused for too long, causing a buildup of garbage data.
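
For example, a changefeed that pauses itself instead of failing can be
created with the `on_error` option (a minimal sketch; the table name and
sink URI are placeholders):

    CREATE CHANGEFEED FOR TABLE t
      INTO 'kafka://broker:9092'
      WITH on_error = 'pause';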

Excessive buildup of GC work is undesirable: it impacts
overall cluster performance, and, once GC can resume,
its cost is proportional to how much GC work needs to be done.
This PR introduces a new setting,
`changefeed.protect_timestamp.expire_after`, to automatically
expire PTS records that are too old.

This automatic expiration is a safety mechanism, and the setting's
default is rather conservative.  The operator is still expected
to monitor changefeed jobs, and to restart paused changefeeds
expediently.  If a changefeed job remains paused, and the
underlying PTS record is removed due to expiration, then the
changefeed job will fail when it is restarted.  This failure
is preferable to an unbounded buildup of garbage data in the cluster.
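
With this change, the expiration can be tuned via the cluster setting;
it should remain larger than `changefeed.protect_timestamp_interval`
(the values below are illustrative):

    -- Expire PTS records left behind by paused changefeeds after 24 hours.
    SET CLUSTER SETTING changefeed.protect_timestamp.expire_after = '24h';

    -- A value of 0 disables expiration entirely.
    SET CLUSTER SETTING changefeed.protect_timestamp.expire_after = '0s';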

This PR does not add expiration to the job itself, as requested
by #84598, but it does accomplish the same goal.

Epic: CRDB-21953

Release note (enterprise change): Changefeeds will automatically
expire PTS records after `changefeed.protect_timestamp.expire_after` so
that paused changefeed jobs do not cause a buildup of GC data in the cluster.
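
To stay ahead of the expiration, paused changefeeds can be found and
resumed from SQL (a sketch; the job ID is a placeholder):

    SHOW CHANGEFEED JOBS;   -- look for jobs with status 'paused'
    RESUME JOB 123456789;   -- resume before the PTS record expires
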
Yevgeniy Miretskiy committed Feb 16, 2023
1 parent 0082117 commit 12ac8d7
Showing 7 changed files with 25 additions and 2 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -16,6 +16,7 @@ changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consu
changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled
changefeed.fast_gzip.enabled boolean true use fast gzip implementation
changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds
+changefeed.protect_timestamp.expire_after duration 72h0m0s automatically expire protected timestamp older than this threshold. 0 disables expiration. Note: this setting should not be too small; it should be larger than changefeed.protect_timestamp_interval
changefeed.schema_feed.read_with_priority_after duration 1m0s retry with high priority if we were not able to read descriptors for too long; 0 disables
cloudstorage.azure.concurrent_upload_buffers integer 1 controls the number of concurrent buffers that will be used by the Azure client when uploading chunks.Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload
cloudstorage.http.custom_ca string custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -22,6 +22,7 @@
<tr><td><div id="setting-changefeed-event-consumer-workers" class="anchored"><code>changefeed.event_consumer_workers</code></div></td><td>integer</td><td><code>0</code></td><td>the number of workers to use when processing events: &lt;0 disables, 0 assigns a reasonable default, &gt;0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled</td></tr>
<tr><td><div id="setting-changefeed-fast-gzip-enabled" class="anchored"><code>changefeed.fast_gzip.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>use fast gzip implementation</td></tr>
<tr><td><div id="setting-changefeed-node-throttle-config" class="anchored"><code>changefeed.node_throttle_config</code></div></td><td>string</td><td><code></code></td><td>specifies node level throttling configuration for all changefeeeds</td></tr>
+<tr><td><div id="setting-changefeed-protect-timestamp-expire-after" class="anchored"><code>changefeed.protect_timestamp.expire_after</code></div></td><td>duration</td><td><code>72h0m0s</code></td><td>automatically expire protected timestamp older than this threshold. 0 disables expiration. Note: this setting should not be too small; it should be larger than changefeed.protect_timestamp_interval</td></tr>
<tr><td><div id="setting-changefeed-schema-feed-read-with-priority-after" class="anchored"><code>changefeed.schema_feed.read_with_priority_after</code></div></td><td>duration</td><td><code>1m0s</code></td><td>retry with high priority if we were not able to read descriptors for too long; 0 disables</td></tr>
<tr><td><div id="setting-cloudstorage-azure-concurrent-upload-buffers" class="anchored"><code>cloudstorage.azure.concurrent_upload_buffers</code></div></td><td>integer</td><td><code>1</code></td><td>controls the number of concurrent buffers that will be used by the Azure client when uploading chunks.Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload</td></tr>
<tr><td><div id="setting-cloudstorage-http-custom-ca" class="anchored"><code>cloudstorage.http.custom_ca</code></div></td><td>string</td><td><code></code></td><td>custom root CA (appended to system&#39;s default CAs) for verifying certificates when interacting with HTTPS storage</td></tr>
3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/changefeed.go
@@ -11,6 +11,7 @@ package changefeedccl
import (
"context"
"encoding/json"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/cloud"
@@ -115,11 +116,13 @@ func createProtectedTimestampRecord(
jobID jobspb.JobID,
targets changefeedbase.Targets,
resolved hlc.Timestamp,
+expiration time.Duration,
progress *jobspb.ChangefeedProgress,
) *ptpb.Record {
progress.ProtectedTimestampRecord = uuid.MakeV4()
deprecatedSpansToProtect := makeSpansToProtect(codec, targets)
targetToProtect := makeTargetToProtect(targets)
+targetToProtect.Expiration = expiration

log.VEventf(ctx, 2, "creating protected timestamp %v at %v", progress.ProtectedTimestampRecord, resolved)
return jobsprotectedts.MakeRecord(
5 changes: 4 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_processors.go
@@ -1368,8 +1368,11 @@ func (cf *changeFrontier) manageProtectedTimestamps(
}

recordID := progress.ProtectedTimestampRecord
+expiration := changefeedbase.PTSExpiresAfter.Get(&cf.flowCtx.Cfg.Settings.SV)
if recordID == uuid.Nil {
-ptr := createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), cf.spec.JobID, AllTargets(cf.spec.Feed), highWater, progress)
+ptr := createProtectedTimestampRecord(
+ctx, cf.flowCtx.Codec(), cf.spec.JobID, AllTargets(cf.spec.Feed), highWater, expiration, progress,
+)
if err := pts.Protect(ctx, ptr); err != nil {
return err
}
6 changes: 5 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -270,6 +270,7 @@ func changefeedPlanHook(
jobID,
AllTargets(details),
details.StatementTime,
+changefeedbase.PTSExpiresAfter.Get(&p.ExecCfg().Settings.SV),
progress.GetChangefeed(),
)

@@ -1206,7 +1207,10 @@ func (b *changefeedResumer) OnPauseRequest(
return nil
}
pts := execCfg.ProtectedTimestampProvider.WithTxn(txn)
-ptr := createProtectedTimestampRecord(ctx, execCfg.Codec, b.job.ID(), AllTargets(details), *resolved, cp)
+ptr := createProtectedTimestampRecord(
+ctx, execCfg.Codec, b.job.ID(), AllTargets(details), *resolved,
+changefeedbase.PTSExpiresAfter.Get(&execCfg.Settings.SV), cp,
+)
return pts.Protect(ctx, ptr)
}

1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -5411,6 +5411,7 @@ func TestChangefeedProtectedTimestampOnPause(t *testing.T) {
require.NotEqual(t, uuid.Nil, details.ProtectedTimestampRecord)
r, err := pts.GetRecord(ctx, details.ProtectedTimestampRecord)
require.NoError(t, err)
+require.Equal(t, changefeedbase.PTSExpiresAfter.Get(&serverCfg.Settings.SV), r.Target.Expiration)
require.True(t, r.Timestamp.LessEq(*progress.GetHighWater()))
} else {
require.Equal(t, uuid.Nil, details.ProtectedTimestampRecord)
10 changes: 10 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -211,6 +211,16 @@ var ProtectTimestampInterval = settings.RegisterDurationSetting(
settings.PositiveDuration,
)

+// PTSExpiresAfter sets the expiration for the protected timestamp records.
+var PTSExpiresAfter = settings.RegisterDurationSetting(
+settings.TenantWritable,
+"changefeed.protect_timestamp.expire_after",
+"automatically expire protected timestamp older than this threshold. 0 disables expiration. "+
+"Note: this setting should not be too small; it should be larger than changefeed.protect_timestamp_interval",
+72*time.Hour, // a very generous default in case job is paused in error.
+settings.NonNegativeDuration,
+).WithPublic()
+
// BatchReductionRetryEnabled enables the temporary reduction of batch sizes upon kafka message too large errors
var BatchReductionRetryEnabled = settings.RegisterBoolSetting(
settings.TenantWritable,
