-
Notifications
You must be signed in to change notification settings - Fork 721
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
*: Add a new way to store metadata of regions #1237
Conversation
em, why introduce LevelDB, do we really need it? |
92240fe
to
4eaa00c
Compare
473633f
to
cbcd851
Compare
@siddontang we need load regions from regions to speed up recover router of the regions. |
/rebuild |
server/region_syncer.go
Outdated
return err | ||
} | ||
|
||
cc, err := grpc.Dial(u.Host, grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(8*1024*1024))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
better to use a constant?
server/region_syncer.go
Outdated
s.Unlock() | ||
} | ||
|
||
func (s *regionSyncer) stopSyncerWithLeader() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe stopSyncWithLeader
?
server/region_syncer.go
Outdated
s.wg.Wait() | ||
} | ||
|
||
func (s *regionSyncer) statSyncerWithLeader(addr string) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean start*?
server/region_syncer.go
Outdated
} | ||
u, err := url.Parse(addr) | ||
if err != nil { | ||
return err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about errors.WithStack(err)
?
server/core/levedb_kv.go
Outdated
count++ | ||
} | ||
iter.Release() | ||
//iter.Error() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does this comment mean?
server/core/kv_base.go
Outdated
@@ -27,6 +28,13 @@ type KVBase interface { | |||
Delete(key string) error | |||
} | |||
|
|||
// RegionKV used to save region metadata. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is used
server/config.go
Outdated
@@ -633,6 +635,12 @@ func (s SecurityConfig) ToTLSConfig() (*tls.Config, error) { | |||
return tlsConfig, nil | |||
} | |||
|
|||
// PDServerConfig is the configuration for pd server. | |||
type PDServerConfig struct { | |||
// EnableRegionStorage enable the independent region storage. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
enables and remove the extra space
server/core/kv.go
Outdated
DefaultBatchSize = 100 | ||
) | ||
|
||
// WithLevelDBKV store the regions information in levelDB. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
stores
server/core/kv.go
Outdated
}() | ||
} | ||
|
||
// FlushRegion save the cache region to region kv storage. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
saves
server/core/levedb_kv.go
Outdated
db *leveldb.DB | ||
} | ||
|
||
// NewLeveldbKV to store regions information. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is used to store
server/core/kv.go
Outdated
} | ||
kv.cacheSize = 0 | ||
kv.batchRegions = make(map[string]*metapb.Region, kv.batchSize) | ||
return nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
duplicated
server/core/kv.go
Outdated
kv.cacheSize = 0 | ||
kv.batchRegions = make(map[string]*metapb.Region, kv.batchSize) | ||
return nil | ||
//return saveProto(kv.regionKV, kv.regionPath(region.GetId()), region) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be removed?
server/server.go
Outdated
@@ -388,7 +393,8 @@ func (s *Server) bootstrapCluster(req *pdpb.BootstrapRequest) (*pdpb.BootstrapRe | |||
} | |||
|
|||
log.Infof("bootstrap cluster %d ok", clusterID) | |||
|
|||
s.kv.SaveRegion(req.GetRegion()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to check error?
server/server.go
Outdated
@@ -208,7 +209,8 @@ func (s *Server) startServer() error { | |||
|
|||
s.idAlloc = &idAllocator{s: s} | |||
kvBase := newEtcdKVBase(s) | |||
s.kv = core.NewKV(kvBase) | |||
path := filepath.Join(s.cfg.DataDir, "leveldb") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about making it something like "region-meta"?
server/grpc_service.go
Outdated
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, request.GetHeader().GetClusterId()) | ||
} | ||
switch request.GetTp() { | ||
case 1: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the Tp
used for? I think better to define it as enum in proto file.
server/core/kv.go
Outdated
if !isFlush { | ||
continue | ||
} | ||
kv.FlushRegion() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check the error?
server/cluster.go
Outdated
running: false, | ||
clusterID: clusterID, | ||
clusterRoot: s.getClusterRootPath(), | ||
clients: make(map[string]pdpb.PDClient), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unused?
server/cluster.go
Outdated
Header: &pdpb.ResponseHeader{ClusterId: c.s.clusterID}, | ||
Data: data, | ||
} | ||
for _, stream := range c.regionSyncer.streams { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Read streams
without lock?
server/region_syncer.go
Outdated
|
||
type regionSyncer struct { | ||
sync.Mutex | ||
streams map[string]pdpb.PD_SyncRegionsServer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think streams can be managed in Server
directly. As it is used for leader to sync regions to followers, while other fields are for sync regions with leader.
server/leader.go
Outdated
log.Error("reload config failed:", err) | ||
return | ||
} | ||
err = s.cluster.regionSyncer.statSyncerWithLeader(leader.GetClientUrls()[0]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should not the first address. Maybe part of addrs can be accessed and the valid address is not in the first place.
server/core/kv.go
Outdated
return nil | ||
} | ||
|
||
func (kv *KV) doGC() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think GC
is a suitable name here.
server/core/kv.go
Outdated
} | ||
|
||
// FlushRegion save the cache region to region kv storage. | ||
func (kv *KV) FlushRegion() error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it really necessary to save regions to levelDB in batches?
92da196
to
308cdde
Compare
cancel() | ||
return nil, err | ||
} | ||
err = client.Send(&pdpb.SyncRegionRequest{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need client streaming if it only sends one request?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we also use it to receive the response of newly updated regions from leader.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess server streaming will be enough.
rpc SyncRegions(stream SyncRegionRequest) returns (stream SyncRegionResponse) {}
PTAL @disksing @rleungx @liukun4515 |
server/core/kv.go
Outdated
func NewKVProxy(defaultKV KVBase, regionKV *RegionKV) *KVProxy { | ||
kv := &KVProxy{ | ||
defaultKV: defaultKV, | ||
regionKV: regionKV, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seem we don't need to initialize regionKV
with nil
here
} | ||
// ensure flush to region kv | ||
time.Sleep(3 * time.Second) | ||
leaderServer.Stop() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check error?
regions = append(regions, core.NewRegionInfo(r, r.Peers[0])) | ||
} | ||
for _, region := range regions { | ||
rc.HandleRegionHeartbeat(region) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check error?
@@ -633,6 +635,12 @@ func (s SecurityConfig) ToTLSConfig() (*tls.Config, error) { | |||
return tlsConfig, nil | |||
} | |||
|
|||
// PDServerConfig is the configuration for pd server. | |||
type PDServerConfig struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need this? Can we put EnableRegionStorage
into Config
directly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We use it to persist the config of PD-server
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you mean by persist to PD-server
?
db433ff
to
9a0360e
Compare
for _, sender := range s.streams { | ||
err := sender.Send(regions) | ||
if err != nil { | ||
log.Error("region syncer send data meet error:", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to remove the stream from the map?
server/core/kv_base.go
Outdated
@@ -26,7 +26,6 @@ type KVBase interface { | |||
Save(key, value string) error | |||
Delete(key string) error | |||
} | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why remove this line?
server/leader.go
Outdated
if isSync { | ||
s.cluster.regionSyncer.stopSyncWithLeader() | ||
} | ||
}() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if sync {
startSyncWithLeader
defer stopSyncWithLeader
}
server/core/kv.go
Outdated
isDefault atomic.Value | ||
} | ||
|
||
// NewKVProxy return a proxy of KV storage. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
returns
server/core/region_kv.go
Outdated
continue | ||
} | ||
if err = kv.FlushRegion(); err != nil { | ||
log.Info("flush regions error: ", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
log.error
// Close closes the kv. | ||
func (kv *RegionKV) Close() error { | ||
kv.cancel() | ||
return kv.db.Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we flush the cached region before closing the leveldb?
c.Assert(leaderServer, NotNil) | ||
loadRegions := leaderServer.server.GetRaftCluster().GetRegions() | ||
c.Assert(len(loadRegions), Equals, regionLen) | ||
cluster.Destroy() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe use defer
here?
server/core/kv.go
Outdated
} | ||
} | ||
} | ||
// Flush flush the dirty region to storage. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
flushes
server/core/region_kv.go
Outdated
defaultBatchSize = 100 | ||
) | ||
|
||
// NewRegionKV return a kv storage that is used to save regions. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
returns
server/core/region_kv.go
Outdated
rangeLimit := maxKVRangeLimit | ||
for { | ||
key := regionPath(nextID) | ||
res, err := kv.LoadRange(key, endKey, rangeLimit) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
better to use startKey
?
server/server.go
Outdated
|
||
err = s.kv.SaveRegion(req.GetRegion()) | ||
if err != nil { | ||
log.Warnf("save the bootstrap region faild: %s", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/faild/failed/g
server/core/region_kv.go
Outdated
func (kv *RegionKV) SaveRegion(region *metapb.Region) error { | ||
kv.mu.Lock() | ||
defer kv.mu.Unlock() | ||
if kv.cacheSize < kv.batchSize { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
kv.cacheSize < kv.batchSize -1
kv.cacheSize < kv.batchSize
means that the flush will be triggered until the size of batchRegions
reach to batchSize+1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
server/core/levedb_kv.go
Outdated
func newLeveldbKV(path string) (*leveldbKV, error) { | ||
db, err := leveldb.OpenFile(path, nil) | ||
if err != nil { | ||
return nil, err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to wrap the leveldb errors with call stack.
PTAL @rleungx |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The rest LGTM.
server/core/kv.go
Outdated
@@ -35,11 +37,14 @@ const ( | |||
const ( | |||
maxKVRangeLimit = 10000 | |||
minKVRangeLimit = 100 | |||
dirtyFlushTick = time.Second |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
better to move it into region_kv.go
?
server/core/kv.go
Outdated
} | ||
} | ||
} | ||
// Flush flushs the dirty region to storage. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
flushes
/rebuild |
/run-all-tests |
What problem does this PR solve?
This Version is used for testing first, not completed.
What is changed and how it works?
Check List
Tests