*: support auto-compaction with finer granularity #8563

Merged
merged 3 commits into from
Sep 29, 2017
5 changes: 2 additions & 3 deletions compactor/compactor.go
@@ -29,8 +29,7 @@ var (
)

const (
checkCompactionInterval = 5 * time.Minute
executeCompactionInterval = time.Hour
checkCompactionInterval = 5 * time.Minute
Contributor

this const can also be removed.

Member Author

checkCompactionInterval is used by the Revision compactor:
https://github.com/coreos/etcd/blob/master/compactor/revision.go#L64
I don't think it should be removed.

Contributor

ack


ModePeriodic = "periodic"
ModeRevision = "revision"
@@ -57,7 +56,7 @@ type RevGetter interface {
Rev() int64
}

func New(mode string, retention int, rg RevGetter, c Compactable) (Compactor, error) {
func New(mode string, retention time.Duration, rg RevGetter, c Compactable) (Compactor, error) {
switch mode {
case ModePeriodic:
return NewPeriodic(retention, rg, c), nil
46 changes: 23 additions & 23 deletions compactor/periodic.go
@@ -26,10 +26,10 @@ import (
)

// Periodic compacts the log by purging revisions older than
// the configured retention time. Compaction happens hourly.
// the configured retention time.
type Periodic struct {
clock clockwork.Clock
periodInHour int
clock clockwork.Clock
period time.Duration

rg RevGetter
c Compactable
@@ -38,60 +38,60 @@ type Periodic struct {
ctx context.Context
cancel context.CancelFunc

mu sync.Mutex
// mu protects paused
mu sync.RWMutex
paused bool
}

// NewPeriodic creates a new instance of Periodic compactor that purges
// the log older than h hours.
func NewPeriodic(h int, rg RevGetter, c Compactable) *Periodic {
// the log older than h Duration.
func NewPeriodic(h time.Duration, rg RevGetter, c Compactable) *Periodic {
return &Periodic{
clock: clockwork.NewRealClock(),
periodInHour: h,
rg: rg,
c: c,
clock: clockwork.NewRealClock(),
period: h,
rg: rg,
c: c,
}
}

// periodDivisor divides Periodic.period in into checkCompactInterval duration
const periodDivisor = 10

func (t *Periodic) Run() {
t.ctx, t.cancel = context.WithCancel(context.Background())
t.revs = make([]int64, 0)
clock := t.clock

checkCompactInterval := t.period / time.Duration(periodDivisor)
go func() {
last := clock.Now()
for {
t.revs = append(t.revs, t.rg.Rev())
select {
case <-t.ctx.Done():
return
case <-clock.After(checkCompactionInterval):
case <-clock.After(checkCompactInterval):
t.mu.Lock()
p := t.paused
t.mu.Unlock()
if p {
continue
}
}

if clock.Now().Sub(last) < executeCompactionInterval {
Contributor

While we're improving this function, can we move the remaining code in the for loop into the case <-clock.After branch for readability? It only ever runs as part of that case...

Member Author

Can we refactor the code in a future PR?

if clock.Now().Sub(last) < t.period {
Contributor

Can we simplify the timer logic a bit here? I found the logic a bit difficult to follow with the case <-clock.After(checkCompactionInterval): select case and the periodDivisor combined together.

Could we instead do something like

select {
...
case <-clock.After(t.period - clock.Now().Sub(last)):
  ...

and then eliminate this if clock.Now().Sub(last) < t.period { check and the periodDivisor?

(let me know if I'm missing an important subtlety in how the timers are being used here...)

Member Author

@jpbetz I am trying to minimize the changes here. We can refactor the code in a future PR if you want.

Contributor

Sounds good.

Contributor

Sounds good.

continue
}

rev, remaining := t.getRev(t.periodInHour)
rev, remaining := t.getRev()
if rev < 0 {
continue
}

plog.Noticef("Starting auto-compaction at revision %d (retention: %d hours)", rev, t.periodInHour)
plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, t.period)
_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
if err == nil || err == mvcc.ErrCompacted {
t.revs = remaining
last = clock.Now()
Contributor

Why did we delete this line?

Member Author

Good catch. I don't think this was meant to be deleted.

Contributor

Yeah, it actually causes a bug; see #9443.

Member Author
@fanminshi, Mar 16, 2018

Actually, I looked again into why last = clock.Now() was deleted. It is explained here:

I change the compaction frequency to every checkCompactionInterval when T > retentionPeriod.
#8563 (comment)

I think that's the cause.

plog.Noticef("Finished auto-compaction at revision %d", rev)
} else {
plog.Noticef("Failed auto-compaction at revision %d (%v)", err, rev)
plog.Noticef("Retry after %v", checkCompactionInterval)
plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
plog.Noticef("Retry after %v", checkCompactInterval)
}
}
}()
@@ -113,8 +113,8 @@ func (t *Periodic) Resume() {
t.paused = false
}

func (t *Periodic) getRev(h int) (int64, []int64) {
i := len(t.revs) - int(time.Duration(h)*time.Hour/checkCompactionInterval)
func (t *Periodic) getRev() (int64, []int64) {
Contributor

Pass in n? We probably do not need the globally defined const n.

Member Author

The n can be used in testing. That's why I had a const n.

Contributor

ah. ok. makes sense.

i := len(t.revs) - periodDivisor
if i < 0 {
return -1, t.revs
}
53 changes: 26 additions & 27 deletions compactor/periodic_test.go
@@ -26,39 +26,36 @@ import (

func TestPeriodic(t *testing.T) {
retentionHours := 2
retentionDuration := time.Duration(retentionHours) * time.Hour

fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
compactable := &fakeCompactable{testutil.NewRecorderStream()}
tb := &Periodic{
clock: fc,
periodInHour: retentionHours,
rg: rg,
c: compactable,
clock: fc,
period: retentionDuration,
rg: rg,
c: compactable,
}

tb.Run()
defer tb.Stop()

n := int(time.Hour / checkCompactionInterval)
// collect 5 hours of revisions
for i := 0; i < 5; i++ {
// advance one hour, one revision for each interval
for j := 0; j < n; j++ {
rg.Wait(1)
fc.Advance(checkCompactionInterval)
}

// compaction doesn't happen til 2 hours elapses
if i+1 < retentionHours {
checkCompactInterval := retentionDuration / time.Duration(periodDivisor)
n := periodDivisor
// simulate 5 hours worth of intervals.
for i := 0; i < n/retentionHours*5; i++ {
rg.Wait(1)
fc.Advance(checkCompactInterval)
// compaction doesn't happen til 2 hours elapses.
if i < n {
continue
}

// after 2 hours, compaction happens at every checkCompactInterval.
a, err := compactable.Wait(1)
if err != nil {
t.Fatal(err)
}
expectedRevision := int64(1 + (i+1)*n - retentionHours*n)
expectedRevision := int64(i + 1 - n)
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
}
@@ -75,21 +72,23 @@ func TestPeriodicPause(t *testing.T) {
fc := clockwork.NewFakeClock()
compactable := &fakeCompactable{testutil.NewRecorderStream()}
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
retentionDuration := time.Hour
tb := &Periodic{
clock: fc,
periodInHour: 1,
rg: rg,
c: compactable,
clock: fc,
period: retentionDuration,
rg: rg,
c: compactable,
}

tb.Run()
tb.Pause()

// tb will collect 3 hours of revisions but not compact since paused
n := int(time.Hour / checkCompactionInterval)
checkCompactInterval := retentionDuration / time.Duration(periodDivisor)
n := periodDivisor
for i := 0; i < 3*n; i++ {
rg.Wait(1)
fc.Advance(checkCompactionInterval)
fc.Advance(checkCompactInterval)
}
// tb ends up waiting for the clock

@@ -102,14 +101,14 @@ func TestPeriodicPause(t *testing.T) {
// tb resumes to being blocked on the clock
tb.Resume()

// unblock clock, will kick off a compaction at hour 3:05
// unblock clock, will kick off a compaction at hour 3:06
rg.Wait(1)
fc.Advance(checkCompactionInterval)
fc.Advance(checkCompactInterval)
a, err := compactable.Wait(1)
if err != nil {
t.Fatal(err)
}
// compact the revision from hour 2:05
// compact the revision from hour 2:06
wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)}
if !reflect.DeepEqual(a[0].Params[0], wreq) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision)
2 changes: 1 addition & 1 deletion embed/config.go
@@ -81,7 +81,7 @@ type Config struct {
MaxWalFiles uint `json:"max-wals"`
Name string `json:"name"`
SnapCount uint64 `json:"snapshot-count"`
AutoCompactionRetention int `json:"auto-compaction-retention"`
AutoCompactionRetention string `json:"auto-compaction-retention"`
AutoCompactionMode string `json:"auto-compaction-mode"`

// TickMs is the number of milliseconds between heartbeat ticks.
21 changes: 20 additions & 1 deletion embed/etcd.go
@@ -23,6 +23,7 @@ import (
"net"
"net/http"
"net/url"
"strconv"
"sync"
"time"

@@ -127,6 +128,24 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
}
}

var (
autoCompactionRetention time.Duration
h int
)
// AutoCompactionRetention defaults to "0" if not set.
if len(cfg.AutoCompactionRetention) == 0 {
Member Author

Added a check to see whether AutoCompactionRetention is unset.

cfg.AutoCompactionRetention = "0"
}
h, err = strconv.Atoi(cfg.AutoCompactionRetention)
if err == nil {
autoCompactionRetention = time.Duration(int64(h)) * time.Hour
} else {
autoCompactionRetention, err = time.ParseDuration(cfg.AutoCompactionRetention)
if err != nil {
return nil, fmt.Errorf("error parsing AutoCompactionRetention: %v", err)
}
}

srvcfg := etcdserver.ServerConfig{
Name: cfg.Name,
ClientURLs: cfg.ACUrls,
@@ -145,7 +164,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
PeerTLSInfo: cfg.PeerTLSInfo,
TickMs: cfg.TickMs,
ElectionTicks: cfg.ElectionTicks(),
AutoCompactionRetention: cfg.AutoCompactionRetention,
AutoCompactionRetention: autoCompactionRetention,
AutoCompactionMode: cfg.AutoCompactionMode,
QuotaBackendBytes: cfg.QuotaBackendBytes,
MaxTxnOps: cfg.MaxTxnOps,
4 changes: 2 additions & 2 deletions etcdmain/config.go
@@ -196,8 +196,8 @@ func newConfig() *config {
// version
fs.BoolVar(&cfg.printVersion, "version", false, "Print the version and exit.")

fs.IntVar(&cfg.AutoCompactionRetention, "auto-compaction-retention", 0, "Auto compaction retention for mvcc key value store. 0 means disable auto compaction.")
fs.StringVar(&cfg.AutoCompactionMode, "auto-compaction-mode", "periodic", "Interpret 'auto-compaction-retention' as hours when 'periodic', as revision numbers when 'revision'.")
fs.StringVar(&cfg.AutoCompactionRetention, "auto-compaction-retention", "0", "Auto compaction retention for mvcc key value store. 0 means disable auto compaction.")
fs.StringVar(&cfg.AutoCompactionMode, "auto-compaction-mode", "periodic", "interpret 'auto-compaction-retention' one of: periodic|revision. 'periodic' for duration based retention, defaulting to hours if no time unit is provided (e.g. '5m'). 'revision' for revision number based retention.")

// pprof profiler via HTTP
fs.BoolVar(&cfg.EnablePprof, "enable-pprof", false, "Enable runtime profiling data via HTTP server. Address is at client URL + \"/debug/pprof/\"")
2 changes: 1 addition & 1 deletion etcdmain/help.go
@@ -99,7 +99,7 @@ clustering flags:
--auto-compaction-retention '0'
auto compaction retention length. 0 means disable auto compaction.
--auto-compaction-mode 'periodic'
'periodic' means hours, 'revision' means revision numbers to retain by auto compaction
interpret 'auto-compaction-retention' one of: periodic|revision. 'periodic' for duration based retention, defaulting to hours if no time unit is provided (e.g. '5m'). 'revision' for revision number based retention.
--enable-v2
Accept etcd V2 client requests.

2 changes: 1 addition & 1 deletion etcdserver/config.go
@@ -51,7 +51,7 @@ type ServerConfig struct {
ElectionTicks int
BootstrapTimeout time.Duration

AutoCompactionRetention int
AutoCompactionRetention time.Duration
AutoCompactionMode string
QuotaBackendBytes int64
MaxTxnOps uint