-
Notifications
You must be signed in to change notification settings - Fork 278
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feat consist load balance #600
Changes from all commits
3a50c2a
4c6ad0f
004badf
f646586
580ab4a
0939057
b6e759a
4016001
4579084
9f2665a
3fd0b4c
1944ae1
9095355
dd38d07
634657b
6d0ea72
83cccac
bfa69cc
7ceb462
8a78ef8
986e5f8
38b6804
4c36bc4
114147c
ed9b0a7
a1da44e
1044c64
29d735b
681f298
7511240
38ef36c
e334fc5
0a92500
abb4f0d
0c507d4
1527d84
5f48f35
b441c09
0665905
2b5e21c
3c20d22
2fc1aaf
55f6e13
27bac0a
6c37558
682a97c
fdc2e0b
fa3a28b
cd28efc
2c8e490
14a3850
8b4dedf
32a3b94
22f7be4
598b039
9d10508
0473b67
6f862b2
c3c5d46
84f8ec2
58ee495
511859f
ae5eab8
12e2194
d130a9e
5e2319e
ed33bad
d356756
233e919
1290838
e1b03ed
62f61c1
6500da6
ab0f45b
d8eca5c
d7f6ffa
74c0e5c
c37c161
05bf7b1
6c8fb25
b07c2cf
d147103
fbbd682
1797722
7f3d6d9
45bf050
df69626
e1dcdfe
dc93beb
ed7bef5
ebad10e
c5ce148
b591b73
40133c5
bd18186
fb985f5
739821f
37b8e40
c230965
47682c4
70a93e2
0db6531
5da5904
3b395cf
7167a33
328aee7
47053ee
f60f768
5e029a8
b753049
533afe8
485312c
7167474
0042b76
b9b7d8c
d0f98f4
ba06918
38924b0
8bbc5b5
15d6a1a
93aa852
1db393f
eadf8bf
61a4c23
7d1dfdf
4109db9
0ab3ea2
14ab243
b20f33b
8fbdd3f
661a647
eea542f
ea67dc4
4c9195f
10c6872
7c68aef
c97ff1b
2f4277f
48794de
446ed79
c73bf46
2dd7ac0
f592e1b
f9de2fc
a686a43
38b554e
e8cfa01
6713885
4c61c3a
e0db439
e3d7b14
268a72e
eaee521
b72477c
1c22b98
25ef488
24ea75b
9229a4b
fed3ad4
7846472
d7254d9
1dc20df
6880911
b181fba
9bb236e
ae4ac1d
a2511f0
6727499
2cbbf2f
beabfc3
05de141
f3a7c86
ad8f6be
f71bfa7
1eccddf
c9fffa5
9f6d875
85d7aea
1b67327
1ef83f7
e6aab85
35f29ac
f1320b5
8846c93
0f1fcce
bead900
7563891
a82fb71
6e314e6
c7ff490
bcbb864
d598b76
7f368a4
a1d9108
8965710
447de4c
3d420a1
857a35b
04d1523
03ad2e6
f3fb86a
856eb1a
abfb512
0a5cd26
33151a4
f87fbea
f55f05f
2eb98b3
9eacf87
9f1165e
e86afc7
2c732df
f374404
3a107da
a71af57
72a5fee
40f50b7
55e90d2
8eb2521
26915c7
866b4d0
77130c7
c9be94a
f6c537a
910fc81
a1ba6d1
3e98bec
69a4f22
4a667b6
9d4bc1e
20154d3
0661ce5
9732f3e
98722cd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,162 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package loadbalance | ||
|
||
import ( | ||
"crypto/md5" | ||
"fmt" | ||
"sort" | ||
"sync" | ||
|
||
getty "github.com/apache/dubbo-getty" | ||
) | ||
|
||
var ( | ||
once sync.Once | ||
defaultVirtualNodeNumber = 10 | ||
consistentInstance *Consistent | ||
) | ||
|
||
type Consistent struct { | ||
sync.RWMutex | ||
virtualNodeCount int | ||
// consistent hashCircle | ||
hashCircle map[int64]getty.Session | ||
sortedHashNodes []int64 | ||
} | ||
|
||
func (c *Consistent) put(key int64, session getty.Session) { | ||
c.Lock() | ||
defer c.Unlock() | ||
c.hashCircle[key] = session | ||
} | ||
|
||
func (c *Consistent) hash(key string) int64 { | ||
hashByte := md5.Sum([]byte(key)) | ||
var res int64 | ||
for i := 0; i < 4; i++ { | ||
res <<= 8 | ||
res |= int64(hashByte[i]) & 0xff | ||
} | ||
|
||
return res | ||
} | ||
|
||
// pick get a node | ||
func (c *Consistent) pick(sessions *sync.Map, key string) getty.Session { | ||
hashKey := c.hash(key) | ||
index := sort.Search(len(c.sortedHashNodes), func(i int) bool { | ||
return c.sortedHashNodes[i] >= hashKey | ||
}) | ||
|
||
if index == len(c.sortedHashNodes) { | ||
return RandomLoadBalance(sessions, key) | ||
} | ||
|
||
c.RLock() | ||
session, ok := c.hashCircle[c.sortedHashNodes[index]] | ||
if !ok { | ||
c.RUnlock() | ||
return RandomLoadBalance(sessions, key) | ||
} | ||
c.RUnlock() | ||
|
||
if session.IsClosed() { | ||
go c.refreshHashCircle(sessions) | ||
return c.firstKey() | ||
} | ||
|
||
return session | ||
} | ||
|
||
// refreshHashCircle refresh hashCircle | ||
func (c *Consistent) refreshHashCircle(sessions *sync.Map) { | ||
var sortedHashNodes []int64 | ||
hashCircle := make(map[int64]getty.Session) | ||
var session getty.Session | ||
sessions.Range(func(key, value interface{}) bool { | ||
session = key.(getty.Session) | ||
for i := 0; i < defaultVirtualNodeNumber; i++ { | ||
if !session.IsClosed() { | ||
position := c.hash(fmt.Sprintf("%s%d", session.RemoteAddr(), i)) | ||
hashCircle[position] = session | ||
sortedHashNodes = append(sortedHashNodes, position) | ||
} else { | ||
sessions.Delete(key) | ||
} | ||
} | ||
return true | ||
}) | ||
|
||
// virtual node sort | ||
sort.Slice(sortedHashNodes, func(i, j int) bool { | ||
return sortedHashNodes[i] < sortedHashNodes[j] | ||
}) | ||
|
||
c.sortedHashNodes = sortedHashNodes | ||
c.hashCircle = hashCircle | ||
} | ||
|
||
func (c *Consistent) firstKey() getty.Session { | ||
c.RLock() | ||
defer c.RUnlock() | ||
|
||
if len(c.sortedHashNodes) > 0 { | ||
return c.hashCircle[c.sortedHashNodes[0]] | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func newConsistenceInstance(sessions *sync.Map) *Consistent { | ||
once.Do(func() { | ||
consistentInstance = &Consistent{ | ||
hashCircle: make(map[int64]getty.Session), | ||
} | ||
// construct hash circle | ||
sessions.Range(func(key, value interface{}) bool { | ||
session := key.(getty.Session) | ||
for i := 0; i < defaultVirtualNodeNumber; i++ { | ||
if !session.IsClosed() { | ||
position := consistentInstance.hash(fmt.Sprintf("%s%d", session.RemoteAddr(), i)) | ||
consistentInstance.put(position, session) | ||
consistentInstance.sortedHashNodes = append(consistentInstance.sortedHashNodes, position) | ||
} else { | ||
sessions.Delete(key) | ||
} | ||
} | ||
return true | ||
}) | ||
|
||
// virtual node sort | ||
sort.Slice(consistentInstance.sortedHashNodes, func(i, j int) bool { | ||
return consistentInstance.sortedHashNodes[i] < consistentInstance.sortedHashNodes[j] | ||
}) | ||
}) | ||
|
||
return consistentInstance | ||
} | ||
|
||
func ConsistentHashLoadBalance(sessions *sync.Map, xid string) getty.Session { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 目前getty模块目前调用Select时的sessions地址是不变的,所以这里使用单例模式也就没有问题。 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 突然想起来,我这里有一个 refreshHashCircle 方法,如果 session 地址发生变更,session 相当于 close了,我会重新构造 hashCircle,所以应该能覆盖到这种变更的case There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Wang There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
对对对,早上还是不够清醒,哈哈 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 我在 consistentInstance.pick(sessions, xid) 的时候会把最新的 sessions 传进去,如果检测到无效地址会重新构建单例对象的hash环,应该是cover住了这个场景。 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 外部sessions中的元素可能会增加、减少,pick函数中只检测closed是不够的。 记录一下上次传入的sessions地址,每次被调用时比较下,地址不同就refresh。这个思路你觉得怎么样? |
||
if consistentInstance == nil { | ||
newConsistenceInstance(sessions) | ||
} | ||
|
||
// pick a node | ||
return consistentInstance.pick(sessions, xid) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package loadbalance | ||
|
||
import ( | ||
"fmt" | ||
"sync" | ||
"testing" | ||
|
||
"github.com/golang/mock/gomock" | ||
"github.com/stretchr/testify/assert" | ||
|
||
"github.com/seata/seata-go/pkg/remoting/mock" | ||
) | ||
|
||
func TestConsistentHashLoadBalance(t *testing.T) { | ||
ctrl := gomock.NewController(t) | ||
sessions := &sync.Map{} | ||
|
||
for i := 0; i < 3; i++ { | ||
session := mock.NewMockTestSession(ctrl) | ||
session.EXPECT().IsClosed().Return(false).AnyTimes() | ||
session.EXPECT().RemoteAddr().AnyTimes().DoAndReturn(func() string { | ||
return "127.0.0.1:8000" | ||
}) | ||
sessions.Store(session, fmt.Sprintf("session-%d", i)) | ||
} | ||
|
||
result := ConsistentHashLoadBalance(sessions, "test_xid") | ||
assert.NotNil(t, result) | ||
assert.False(t, result.IsClosed()) | ||
|
||
sessions.Range(func(key, value interface{}) bool { | ||
t.Logf("key: %v, value: %v", key, value) | ||
return true | ||
}) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
构建哈希环的这部分代码和refreshHashCircle函数中的高度重合,是不是可以重构下,调用refreshHashCircle函数。