This repository has been archived by the owner on Nov 24, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 188
/
strategy_filename.go
114 lines (94 loc) · 2.93 KB
/
strategy_filename.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package purger
import (
"fmt"
"strings"
"github.com/pingcap/errors"
"github.com/siddontang/go/sync2"
"github.com/pingcap/dm/pkg/streamer"
"github.com/pingcap/dm/pkg/utils"
)
// fakeTaskName is the task name stamped on the RelayLogInfo that the filename
// strategy builds for its end point; it is the string form of strategyFilename.
var fakeTaskName = strategyFilename.String()
// filenameArgs represents args needed by filenameStrategy
// NOTE: should handle master-slave switch
type filenameArgs struct {
	// relayBaseDir is handed to purgeRelayFilesBeforeFile as its first argument
	relayBaseDir string
	filename string // specified end safe filename
	subDir string // sub dir for @filename, empty indicates latest sub dir
	// uuids lists the relay sub dirs (UUIDs); the last entry is treated as the
	// latest one, and SetActiveRelayLog drops entries newer than the end sub dir
	uuids []string
	safeRelayLog *streamer.RelayLogInfo // all relay log files prior to this should be purged
}
// SetActiveRelayLog computes the safe relay log for this purge request and
// trims fa.uuids so that no sub dir newer than the requested one is kept.
//
// The requested end point is fa.filename inside fa.subDir; when fa.subDir is
// empty, the last entry of fa.uuids is used as the sub dir instead. If active
// is earlier than that end point, active becomes the safe relay log so that
// nothing at or after the active relay log is marked as purgeable.
//
// A nil active is tolerated and leaves the requested end point in place.
// NOTE(review): presumably nil means no relay log is currently active —
// confirm against the caller.
func (fa *filenameArgs) SetActiveRelayLog(active *streamer.RelayLogInfo) {
	endUUID := fa.subDir
	if len(endUUID) == 0 && len(fa.uuids) > 0 {
		// no sub dir specified, use the latest one
		endUUID = fa.uuids[len(fa.uuids)-1]
	}
	// The parse error is dropped because this method has no error return;
	// endSuffix is then whatever the helper yields on failure.
	_, endSuffix, _ := utils.ParseSuffixForUUID(endUUID)

	safeRelayLog := &streamer.RelayLogInfo{
		TaskName:   fakeTaskName,
		UUID:       endUUID,
		UUIDSuffix: endSuffix,
		Filename:   fa.filename,
	}
	// Guard against a nil active before calling a method on it.
	if active != nil && active.Earlier(safeRelayLog) {
		safeRelayLog = active
	}
	fa.safeRelayLog = safeRelayLog

	// discard newer UUIDs
	// The loop stops at the first entry whose suffix exceeds endSuffix, so it
	// relies on fa.uuids being ordered by ascending suffix — TODO confirm.
	kept := make([]string, 0, len(fa.uuids))
	for _, candidate := range fa.uuids {
		_, suffix, _ := utils.ParseSuffixForUUID(candidate)
		if suffix > endSuffix {
			break
		}
		kept = append(kept, candidate)
	}
	fa.uuids = kept
}
// String renders all fields of fa on a single parenthesized line; the UUIDs
// are joined with ";".
func (fa *filenameArgs) String() string {
	const layout = "(RelayBaseDir: %s, Filename: %s, SubDir: %s, UUIDs: %s, SafeRelayLog: %s)"
	joinedUUIDs := strings.Join(fa.uuids, ";")
	return fmt.Sprintf(layout, fa.relayBaseDir, fa.filename, fa.subDir, joinedUUIDs, fa.safeRelayLog)
}
// filenameStrategy represents a relay purge strategy by filename
// similar to `PURGE BINARY LOGS TO`
type filenameStrategy struct {
	// purging is switched from 0 to 1 when Do starts and set back to 0 when
	// Do returns; Purging reports whether it is currently above 0
	purging sync2.AtomicInt32
}
// newFilenameStrategy returns a zero-valued filenameStrategy as a PurgeStrategy.
func newFilenameStrategy() PurgeStrategy {
	return new(filenameStrategy)
}
// Check always reports false with a nil error: this strategy does not support
// purging in the background. args is not inspected.
func (s *filenameStrategy) Check(args interface{}) (bool, error) {
	const needPurge = false
	return needPurge, nil
}
// Do purges relay files that precede the safe relay log described by args.
//
// args must be a *filenameArgs; any other value yields a NotValid error.
// If a purge by this strategy is already in progress, ErrSelfPurging is
// returned. The purging flag is cleared again when Do returns.
func (s *filenameStrategy) Do(args interface{}) error {
	acquired := s.purging.CompareAndSwap(0, 1)
	if !acquired {
		return ErrSelfPurging
	}
	defer s.purging.Set(0)

	switch fa := args.(type) {
	case *filenameArgs:
		err := purgeRelayFilesBeforeFile(fa.relayBaseDir, fa.uuids, fa.safeRelayLog)
		return errors.Trace(err)
	default:
		return errors.NotValidf("args (%T) %+v", args, args)
	}
}
// Purging reports whether the purging flag is currently above zero, i.e.
// whether a call to Do is in progress.
func (s *filenameStrategy) Purging() bool {
	flag := s.purging.Get()
	return flag > 0
}
// Type identifies this strategy as strategyFilename.
func (s *filenameStrategy) Type() strategyType {
	var kind strategyType = strategyFilename
	return kind
}