Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

修复wait长查询场景下配置项发生变化,更新缓存失败 #294

Merged
merged 12 commits into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 0 additions & 22 deletions server/pubsub/notifier/kv.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
package notifier

import (
"context"
"sync"
"time"

"github.com/apache/servicecomb-kie/pkg/model"
"github.com/apache/servicecomb-kie/server/cache"
"github.com/apache/servicecomb-kie/server/pubsub"
kvsvc "github.com/apache/servicecomb-kie/server/service/kv"
"github.com/go-chassis/openlog"
"github.com/hashicorp/serf/serf"
)
Expand Down Expand Up @@ -75,30 +71,12 @@ func (h *KVHandler) FindTopicAndFire(ke *pubsub.KVChangeEvent) {
return true
}
if t.Match(ke) {
prepareCache(key.(string), t)
notifyAndRemoveObservers(value, ke)
}
return true
})
}

func prepareCache(topicName string, topic *pubsub.Topic) {
rev, kvs, err := kvsvc.ListKV(context.TODO(), &model.ListKVRequest{
Domain: topic.DomainID,
Project: topic.Project,
Labels: topic.Labels,
Match: topic.MatchType,
})
if err != nil {
openlog.Error("can not query kvs:" + err.Error())
}
cache.CachedKV().Write(topicName, &cache.DBResult{
KVs: kvs,
Rev: rev,
Err: err,
})
}

func notifyAndRemoveObservers(value interface{}, ke *pubsub.KVChangeEvent) {
observers := value.(*sync.Map)
observers.Range(func(id, value interface{}) bool {
Expand Down
20 changes: 19 additions & 1 deletion server/resource/v1/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func getMatchPattern(rctx *restful.Context) string {
}
return m
}
func eventHappened(waitStr string, topic *pubsub.Topic) (bool, string, error) {
func eventHappened(waitStr string, topic *pubsub.Topic, ctx context.Context) (bool, string, error) {
d, err := time.ParseDuration(waitStr)
if err != nil || d > common.MaxWait {
return false, "", errors.New(common.MsgInvalidWait)
Expand All @@ -230,6 +230,7 @@ func eventHappened(waitStr string, topic *pubsub.Topic) (bool, string, error) {
happened = false
pubsub.RemoveObserver(o.UUID, topic)
case <-o.Event:
prepareCache(topicName, topic, ctx)
}
return happened, topicName, nil
}
Expand Down Expand Up @@ -307,3 +308,20 @@ func queryAndResponse(rctx *restful.Context, request *model.ListKVRequest) {
openlog.Error(err.Error())
}
}

func prepareCache(topicName string, topic *pubsub.Topic, ctx context.Context) {
rev, kvs, err := kvsvc.ListKV(ctx, &model.ListKVRequest{
Domain: topic.DomainID,
Project: topic.Project,
Labels: topic.Labels,
Match: topic.MatchType,
})
if err != nil {
openlog.Error("can not query kvs:" + err.Error())
}
cache.CachedKV().Write(topicName, &cache.DBResult{
KVs: kvs,
Rev: rev,
Err: err,
})
}
2 changes: 1 addition & 1 deletion server/resource/v1/kv_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func watch(rctx *restful.Context, request *model.ListKVRequest, wait string) boo
Project: request.Project,
MatchType: request.Match,
DomainID: request.Domain,
})
}, rctx.Ctx)
if err != nil {
WriteErrResponse(rctx, config.ErrObserveEvent, err.Error())
return true
Expand Down
Loading