Skip to content

Commit

Permalink
log backup: fix the issue that paused service safepoint not removed (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored May 24, 2024
1 parent 3841026 commit 0145d4e
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 5 deletions.
5 changes: 3 additions & 2 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"encoding/binary"
"fmt"
"math"
"net/http"
"slices"
"strings"
Expand Down Expand Up @@ -724,8 +725,8 @@ func RunStreamStop(
if err := streamMgr.setGCSafePoint(ctx,
utils.BRServiceSafePoint{
ID: buildPauseSafePointName(ti.Info.Name),
TTL: utils.DefaultStreamStartSafePointTTL,
BackupTS: 0,
TTL: 0, // 0 means remove this service safe point.
BackupTS: math.MaxUint64,
},
); err != nil {
log.Warn("failed to remove safe point", zap.String("error", err.Error()))
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/utils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ go_test(
],
embed = [":utils"],
flaky = True,
shard_count = 37,
shard_count = 38,
deps = [
"//br/pkg/errors",
"//br/pkg/metautil",
Expand Down
60 changes: 58 additions & 2 deletions br/pkg/utils/safe_point_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package utils_test

import (
"context"
"math"
"sync"
"testing"

Expand All @@ -14,7 +15,7 @@ import (

func TestCheckGCSafepoint(t *testing.T) {
ctx := context.Background()
pdClient := &mockSafePoint{safepoint: 2333}
pdClient := &mockSafePoint{safepoint: 2333, services: make(map[string]uint64)}
{
err := utils.CheckGCSafePoint(ctx, pdClient, 2333+1)
require.NoError(t, err)
Expand All @@ -34,23 +35,78 @@ func TestCheckGCSafepoint(t *testing.T) {
}
}

func TestCheckUpdateServiceSafepoint(t *testing.T) {
ctx := context.Background()
pdClient := &mockSafePoint{safepoint: 2333, services: make(map[string]uint64)}
{
// nothing happened, because current safepoint is large than servicee safepoint.
err := utils.UpdateServiceSafePoint(ctx, pdClient, utils.BRServiceSafePoint{
"BR_SERVICE",
1,
1,
})
require.NoError(t, err)
curSafePoint, err := pdClient.UpdateGCSafePoint(ctx, 0)
require.NoError(t, err)
require.Equal(t, uint64(2333), curSafePoint)
}
{
// register br service safepoint
err := utils.UpdateServiceSafePoint(ctx, pdClient, utils.BRServiceSafePoint{
"BR_SERVICE",
1,
2334,
})
require.NoError(t, err)
curSafePoint, find := pdClient.GetServiceSafePoint("BR_SERVICE")
// update with new safepoint - 1.
require.Equal(t, uint64(2333), curSafePoint)
require.True(t, find)
}
{
// remove br service safepoint
err := utils.UpdateServiceSafePoint(ctx, pdClient, utils.BRServiceSafePoint{
"BR_SERVICE",
0,
math.MaxUint64,
})
require.NoError(t, err)
_, find := pdClient.GetServiceSafePoint("BR_SERVICE")
require.False(t, find)
}
}

type mockSafePoint struct {
sync.Mutex
pd.Client
services map[string]uint64
safepoint uint64
minServiceSafepoint uint64
}

func (m *mockSafePoint) GetServiceSafePoint(serviceID string) (uint64, bool) {
m.Lock()
defer m.Unlock()
safepoint, ok := m.services[serviceID]
return safepoint, ok
}

func (m *mockSafePoint) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
m.Lock()
defer m.Unlock()

if ttl <= 0 {
delete(m.services, serviceID)
return 0, nil
}

if m.safepoint > safePoint {
return m.safepoint, nil
}
if m.minServiceSafepoint == 0 || m.minServiceSafepoint > safePoint {
m.minServiceSafepoint = safePoint
}
m.services[serviceID] = safePoint
return m.minServiceSafepoint, nil
}

Expand All @@ -65,7 +121,7 @@ func (m *mockSafePoint) UpdateGCSafePoint(ctx context.Context, safePoint uint64)
}

func TestStartServiceSafePointKeeper(t *testing.T) {
pdClient := &mockSafePoint{safepoint: 2333}
pdClient := &mockSafePoint{safepoint: 2333, services: make(map[string]uint64)}

cases := []struct {
sp utils.BRServiceSafePoint
Expand Down

0 comments on commit 0145d4e

Please sign in to comment.