Skip to content

Commit

Permalink
refactor(pkg&cmd): optimize code
Browse files Browse the repository at this point in the history
  • Loading branch information
piglig committed Dec 6, 2023
1 parent 296a937 commit 9f7c946
Show file tree
Hide file tree
Showing 16 changed files with 20 additions and 31 deletions.
1 change: 0 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ func main() {
_, _ = fmt.Fprintf(os.Stderr, "error: %v\n", err)
os.Exit(1)
}
return
}

const (
Expand Down
2 changes: 1 addition & 1 deletion pkg/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func NewBackend(kv storage.KvStorage, config Config, metricCli metrics.Metrics)
scanner: scanner.NewScanner(kv, normalCoder, config.getScannerConfig(), metricCli),
config: config,
capacity: config.WatchCacheSize,
watchEventsRingBuffer: make([]atomic.Value, watchersChanCapacity, watchersChanCapacity),
watchEventsRingBuffer: make([]atomic.Value, watchersChanCapacity),
watchCache: NewRing(config.WatchCacheSize),
watchChan: make(chan []*proto.Event, watchersChanCapacity),
watcherHub: &WatcherHub{
Expand Down
5 changes: 1 addition & 4 deletions pkg/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,6 @@ func (ct *createTestcase) run(t *testing.T, s suite, output <-chan []*proto.Even
return
}
ast.Equal(0, len(output), expectNoEvent)
return
})

}
Expand Down Expand Up @@ -541,9 +540,7 @@ func sortKvs(kvs []*proto.KeyValue) []*proto.KeyValue {

func extractPartitions(resp *proto.ListPartitionResponse) [][]byte {
ret := make([][]byte, len(resp.PartitionKeys))
for idx, kv := range resp.PartitionKeys {
ret[idx] = kv
}
copy(ret, resp.PartitionKeys)
return ret
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/backend/range.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (b *backend) Get(ctx context.Context, r *proto.GetRequest) (resp *proto.Get
requireRev := r.GetRevision()

val, modRev, err := b.get(ctx, r.Key, requireRev)
if err == storage.ErrKeyNotFound {
if errors.Is(err, storage.ErrKeyNotFound) {
return &proto.GetResponse{
Header: responseHeader(curRev),
}, nil
Expand Down Expand Up @@ -79,7 +79,7 @@ func (b *backend) getLatestInternalVal(ctx context.Context, key []byte) (val []b

func (b *backend) get(ctx context.Context, key []byte, revision uint64) (val []byte, modRevision uint64, err error) {
val, modRevision, err = b.getInternalVal(ctx, key, revision)
if bytes.Compare(val, tombStoneBytes) == 0 {
if bytes.Equal(val, tombStoneBytes) {
return nil, modRevision, storage.ErrKeyNotFound
}
return val, modRevision, err
Expand Down Expand Up @@ -107,7 +107,7 @@ func (b *backend) getInternalVal(ctx context.Context, key []byte, revision uint6
}

userKey, modRev, err := b.coder.Decode(iter.Key())
if modRev == 0 || bytes.Compare(userKey, key) != 0 {
if modRev == 0 || !bytes.Equal(userKey, key) {
// it's marked by deleting
return nil, modRev, storage.ErrKeyNotFound
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/backend/retry/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (a *asyncFifoRetryImpl) retry(ctx context.Context) (breakLoop bool) {
func (a *asyncFifoRetryImpl) overwrite(ctx context.Context, key []byte, prevOpRev uint64) (rev uint64, err error) {
val, modRev, err := a.getter(ctx, key)
if err != nil {
if err == storage.ErrKeyNotFound {
if errors.Is(err, storage.ErrKeyNotFound) {
return 0, nil
}
return 0, err
Expand Down Expand Up @@ -250,7 +250,7 @@ func (a *asyncFifoRetryImpl) overwrite(ctx context.Context, key []byte, prevOpRe
revBytes := make([]byte, 8, 9)
binary.BigEndian.PutUint64(revBytes, rev)

if bytes.Compare(val, a.config.Tombstone) == 0 {
if bytes.Equal(val, a.config.Tombstone) {
// append delete flag after revision bytes
revBytes = append(revBytes, 0)
prevRevBytes = append(prevRevBytes, 0)
Expand Down
2 changes: 1 addition & 1 deletion pkg/backend/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Ring struct {

func NewRing(l int) *Ring {
return &Ring{
arr: make([]*proto.Event, l, l),
arr: make([]*proto.Event, l),
l: l,
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/backend/scanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ func (w *worker) runWithBackoffRetry(ctx context.Context, receiver resultReceive

if err != nil {
// fail after retry, scanErr records the latest scan err
if err == wait.ErrWaitTimeout {
if errors.Is(err, wait.ErrWaitTimeout) {
err = fmt.Errorf("partition %d reached the max retry time: %d, encounter latest error %v", w.idx, scanBackoffSteps, scanErr)
}
}
Expand Down Expand Up @@ -510,7 +510,7 @@ func (w *worker) info() string {
}

func (w *worker) isSkippedRawKey(rawKey []byte, rev uint64) bool {
if len(w.lastCompactFailedRawKey) > 0 && bytes.Compare(w.lastCompactFailedRawKey, rawKey) == 0 {
if len(w.lastCompactFailedRawKey) > 0 && bytes.Equal(w.lastCompactFailedRawKey, rawKey) {
klog.InfoS("compact skip", "rawKey", string(rawKey), "rev", rev)
w.metricCli.EmitCounter("compact.skip", 1)
return true
Expand Down
2 changes: 1 addition & 1 deletion pkg/endpoint/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (sc *SecurityConfig) isInsecure() bool {
return sc.CertFile == "" &&
sc.KeyFile == "" &&
sc.CA == "" &&
sc.ClientAuth == false
!sc.ClientAuth
}

func (sc *SecurityConfig) init() (err error) {
Expand Down
1 change: 0 additions & 1 deletion pkg/endpoint/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ func (gs *rootServer) run(ctx context.Context) (err error) {
klog.InfoS("root server start to listen", "port", gs.port)
err = mux.Serve()
klog.ErrorS(err, "root server shutdown cause by temporary network error", "port", gs.port)
return
}()

return runServers(ctx, mux, gs.services)
Expand Down
5 changes: 2 additions & 3 deletions pkg/metrics/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,8 @@ func (pw *prometheusWrapper) labelsToMap(labels []metrics.T) (ret map[string]str

func (pw *prometheusWrapper) extractLabelNames(labels []metrics.T) (ret []string) {
ret = make([]string, len(labels)+len(pw.globalLabelNames))
for i, labelName := range pw.globalLabelNames {
ret[i] = labelName
}
copy(ret, pw.globalLabelNames)

offset := len(pw.globalLabelNames)
for i, label := range labels {
ret[i+offset] = label.Name
Expand Down
7 changes: 2 additions & 5 deletions pkg/server/etcd/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (w *watcher) List(ctx context.Context, id int64, r *etcdserverpb.WatchCreat
revision := r.StartRevision * -1
// range stream eof, tell client range ends
// if has err, CancelReason is not nil
if watchResponse.Canceled == true {
if watchResponse.Canceled {
klog.InfoS("receive cancel message", "watcher", w.id, "watch", id, "key", r.Key, "end", r.RangeEnd, r.StartRevision*-1)
// indicates eof
revision = -1
Expand Down Expand Up @@ -340,8 +340,5 @@ func (w *watcher) Watch(ctx context.Context, id int64, r *etcdserverpb.WatchCrea

func isPureWatchRequest(r *etcdserverpb.WatchCreateRequest) bool {
// if starts with /, it is a normal watch request, not a range stream request
if strings.HasPrefix(string(r.Key), "/") {
return true
}
return false
return strings.HasPrefix(string(r.Key), "/")
}
1 change: 0 additions & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ func (s *server) electionHandler(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusOK)
respBytes, _ := json.Marshal(info)
w.Write(respBytes)
return
}

func (s *server) revisionHandler(w http.ResponseWriter, req *http.Request) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/service/etcdproxy/etcd_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (e *etcdProxy) EtcdProxyEnabled() bool {

// defaultCallOption is a copy of etcd client default call option
var defaultCallOption = []grpc.CallOption{
grpc.FailFast(false),
grpc.WaitForReady(false),
grpc.MaxCallSendMsgSize(2 * 1024 * 1024),
grpc.MaxCallRecvMsgSize(math.MaxInt32),
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/badger/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ func (b *batch) CAS(key []byte, newVal []byte, oldVal []byte, ttl int64) {
f := func() error {
item, err := b.txn.Get(key)
if err != nil {
if err == badger.ErrKeyNotFound {
if errors.Is(err, badger.ErrKeyNotFound) {
return storage.NewErrConflict(idx, key, nil)
}
return err
}

err = item.Value(func(val []byte) error {
if bytes.Compare(oldVal, val) != 0 {
if !bytes.Equal(oldVal, val) {
return storage.NewErrConflict(idx, key, val)
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/memkv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (b *batch) CAS(key []byte, newVal []byte, oldVal []byte, ttl int64) {
return
}

if bytes.Compare(val, oldVal) != 0 {
if !bytes.Equal(val, oldVal) {
b.err = storage.NewErrConflict(b.opCount, key, oldVal)
//b.err = errors.Errorf("cas failed: key %s old val is %s but expect %s", key, string(elem.Value.([]byte)), oldVal)
}
Expand Down Expand Up @@ -123,7 +123,7 @@ func (b *batch) DelCurrent(it storage.Iter) {
return
}

if bytes.Compare(b.get(it.Key()), it.Val()) != 0 {
if !bytes.Equal(b.get(it.Key()), it.Val()) {
b.err = storage.ErrCASFailed
}

Expand Down
1 change: 0 additions & 1 deletion pkg/storage/memkv/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ func (it *iter) init() {
elem = elem.Prev()
}
}
return
}

func (it *iter) inRange(elem *skiplist.Element) bool {
Expand Down

0 comments on commit 9f7c946

Please sign in to comment.