-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
server: support resigning ddl owner, use http method ddl/owner/resign #7649
Changes from 12 commits
8453481
3ac8bfa
0e10bb0
91406bc
861a528
3e1481c
aa218ae
30172e0
c90f2c5
e7c24f0
d12ce8d
e3f88d2
24cdaf7
e376c2b
d94720a
b89de26
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ import ( | |
"strconv" | ||
"sync/atomic" | ||
"time" | ||
"unsafe" | ||
|
||
"github.com/coreos/etcd/clientv3" | ||
"github.com/coreos/etcd/clientv3/concurrency" | ||
|
@@ -45,12 +46,14 @@ type Manager interface { | |
ID() string | ||
// IsOwner returns whether the ownerManager is the owner. | ||
IsOwner() bool | ||
// SetOwner sets whether the ownerManager is the owner. | ||
SetOwner(isOwner bool) | ||
// RetireOwner make the manager to be a not owner. It's exported for testing | ||
RetireOwner() | ||
// GetOwnerID gets the owner ID. | ||
GetOwnerID(ctx context.Context) (string, error) | ||
// CampaignOwner campaigns the owner. | ||
CampaignOwner(ctx context.Context) error | ||
// ResignOwner lets the owner start a new election. | ||
ResignOwner(ctx context.Context) error | ||
// Cancel cancels this etcd ownerManager campaign. | ||
Cancel() | ||
} | ||
|
@@ -70,22 +73,24 @@ type DDLOwnerChecker interface { | |
|
||
// ownerManager represents the structure which is used for electing owner. | ||
type ownerManager struct { | ||
owner int32 | ||
id string // id is the ID of the manager. | ||
key string | ||
prompt string | ||
etcdCli *clientv3.Client | ||
cancel context.CancelFunc | ||
id string // id is the ID of the manager. | ||
key string | ||
prompt string | ||
logPrefix string | ||
etcdCli *clientv3.Client | ||
cancel context.CancelFunc | ||
elec unsafe.Pointer | ||
} | ||
|
||
// NewOwnerManager creates a new Manager. | ||
func NewOwnerManager(etcdCli *clientv3.Client, prompt, id, key string, cancel context.CancelFunc) Manager { | ||
return &ownerManager{ | ||
etcdCli: etcdCli, | ||
id: id, | ||
key: key, | ||
prompt: prompt, | ||
cancel: cancel, | ||
etcdCli: etcdCli, | ||
id: id, | ||
key: key, | ||
prompt: prompt, | ||
cancel: cancel, | ||
logPrefix: fmt.Sprintf("[%s] %s ownerManager %s", prompt, key, id), | ||
} | ||
} | ||
|
||
|
@@ -96,16 +101,7 @@ func (m *ownerManager) ID() string { | |
|
||
// IsOwner implements Manager.IsOwner interface. | ||
func (m *ownerManager) IsOwner() bool { | ||
return atomic.LoadInt32(&m.owner) == 1 | ||
} | ||
|
||
// SetOwner implements Manager.SetOwner interface. | ||
func (m *ownerManager) SetOwner(isOwner bool) { | ||
if isOwner { | ||
atomic.StoreInt32(&m.owner, 1) | ||
} else { | ||
atomic.StoreInt32(&m.owner, 0) | ||
} | ||
return atomic.LoadPointer(&m.elec) != unsafe.Pointer(nil) | ||
} | ||
|
||
// Cancel implements Manager.Cancel interface. | ||
|
@@ -179,6 +175,31 @@ func (m *ownerManager) CampaignOwner(ctx context.Context) error { | |
return nil | ||
} | ||
|
||
// ResignOwner lets the owner start a new election. | ||
func (m *ownerManager) ResignOwner(ctx context.Context) error { | ||
elec := (*concurrency.Election)(atomic.LoadPointer(&m.elec)) | ||
if elec == nil { | ||
return errors.Errorf("This node is not a ddl owner, can't be resigned.") | ||
} | ||
|
||
err := elec.Resign(ctx) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
|
||
log.Warnf("%s Resign ddl owner success!", m.logPrefix) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can we call There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. No, this is not the work of ResignOwner. |
||
return nil | ||
} | ||
|
||
func (m *ownerManager) toBeOwner(elec *concurrency.Election) { | ||
atomic.StorePointer(&m.elec, unsafe.Pointer(elec)) | ||
} | ||
|
||
// RetireOwner make the manager to be a not owner. | ||
func (m *ownerManager) RetireOwner() { | ||
atomic.StorePointer(&m.elec, nil) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Could we only use the There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There are some test cases that use SetOwner(false), and using only elec to determine whether it is the owner would not be easy to mock in tests. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can we change |
||
} | ||
|
||
func (m *ownerManager) campaignLoop(ctx context.Context, etcdSession *concurrency.Session) { | ||
defer func() { | ||
if r := recover(); r != nil { | ||
|
@@ -188,7 +209,7 @@ func (m *ownerManager) campaignLoop(ctx context.Context, etcdSession *concurrenc | |
} | ||
}() | ||
|
||
logPrefix := fmt.Sprintf("[%s] %s ownerManager %s", m.prompt, m.key, m.id) | ||
logPrefix := m.logPrefix | ||
var err error | ||
for { | ||
if err != nil { | ||
|
@@ -232,9 +253,10 @@ func (m *ownerManager) campaignLoop(ctx context.Context, etcdSession *concurrenc | |
if err != nil { | ||
continue | ||
} | ||
m.SetOwner(true) | ||
|
||
m.toBeOwner(elec) | ||
m.watchOwner(ctx, etcdSession, ownerKey) | ||
m.SetOwner(false) | ||
m.RetireOwner() | ||
|
||
metrics.CampaignOwnerCounter.WithLabelValues(m.prompt, metrics.NoLongerOwner).Inc() | ||
log.Warnf("%s isn't the owner", logPrefix) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -49,13 +49,13 @@ func (m *mockManager) IsOwner() bool { | |
return atomic.LoadInt32(&m.owner) == 1 | ||
} | ||
|
||
// SetOwner implements Manager.SetOwner interface. | ||
func (m *mockManager) SetOwner(isOwner bool) { | ||
if isOwner { | ||
atomic.StoreInt32(&m.owner, 1) | ||
} else { | ||
atomic.StoreInt32(&m.owner, 0) | ||
} | ||
func (m *mockManager) toBeOwner() { | ||
atomic.StoreInt32(&m.owner, 1) | ||
} | ||
|
||
// setOwner implements Manager.RetireOwner interface. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. s/setOwner/RetireOwner |
||
func (m *mockManager) RetireOwner() { | ||
atomic.StoreInt32(&m.owner, 0) | ||
} | ||
|
||
// Cancel implements Manager.Cancel interface. | ||
|
@@ -73,6 +73,14 @@ func (m *mockManager) GetOwnerID(ctx context.Context) (string, error) { | |
|
||
// CampaignOwner implements Manager.CampaignOwner interface. | ||
func (m *mockManager) CampaignOwner(_ context.Context) error { | ||
m.SetOwner(true) | ||
m.toBeOwner() | ||
return nil | ||
} | ||
|
||
// ResignOwner lets the owner start a new election. | ||
func (m *mockManager) ResignOwner(ctx context.Context) error { | ||
if m.IsOwner() { | ||
m.RetireOwner() | ||
} | ||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -334,6 +334,11 @@ type ddlHistoryJobHandler struct { | |
*tikvHandlerTool | ||
} | ||
|
||
// ddlResignOwnerHandler is the handler for resigning ddl owner. | ||
type ddlResignOwnerHandler struct { | ||
store kv.Storage | ||
} | ||
|
||
type serverInfoHandler struct { | ||
*tikvHandlerTool | ||
} | ||
|
@@ -713,6 +718,37 @@ func (h ddlHistoryJobHandler) ServeHTTP(w http.ResponseWriter, req *http.Request | |
return | ||
} | ||
|
||
func (h ddlResignOwnerHandler) resignDDLOwner() error { | ||
dom, err := session.GetDomain(h.store) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
|
||
ownerMgr := dom.DDL().OwnerManager() | ||
err = ownerMgr.ResignOwner(context.Background()) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do we need to add a timeout? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @winkyao Please answer this question. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Done, PTAL |
||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
return nil | ||
} | ||
|
||
// ServeHTTP handles request of resigning ddl owner. | ||
func (h ddlResignOwnerHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { | ||
if req.Method != http.MethodPost { | ||
writeError(w, errors.Errorf("This api only support POST method.")) | ||
return | ||
} | ||
|
||
err := h.resignDDLOwner() | ||
if err != nil { | ||
log.Error(err) | ||
writeError(w, err) | ||
return | ||
} | ||
|
||
writeData(w, "success!") | ||
} | ||
|
||
func (h tableHandler) getPDAddr() ([]string, error) { | ||
var pdAddrs []string | ||
etcd, ok := h.store.(domain.EtcdBackend) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
end with .