Skip to content

Commit

Permalink
add config sync task
Browse files Browse the repository at this point in the history
  • Loading branch information
robotLJW committed Nov 20, 2021
1 parent 3c1c16a commit defe730
Show file tree
Hide file tree
Showing 11 changed files with 225 additions and 13 deletions.
2 changes: 2 additions & 0 deletions examples/dev/kie-conf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ db:
# kind=embedded_etcd, then is the embedded etcd server's advertise-peer-urls, e.g. default=http://127.0.0.1:2380
#uri: mongodb://kie:[email protected]:27017/kie
uri: http://127.0.0.1:2379
# turn on the synchronization switch related operations will be written to the task in the db
syncEnabled: false
# poolSize: 10
# timeout: 5m
# sslEnabled: false
Expand Down
9 changes: 9 additions & 0 deletions pkg/model/db_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ type KVDoc struct {
Domain string `json:"domain,omitempty" yaml:"domain,omitempty" validate:"min=1,max=256,commonName"` //redundant
}

// Task is db struct to store sync task
type Task struct {
Action string `json:"action" bson:"action"`
DataType string `json:"data_type" bson:"data_type"`
Data interface{} `json:"data" bson:"data"`
Timestamp string `json:"timestamp" bson:"timestamp"`
Status string `json:"status" bson:"status"`
}

//ViewDoc is db struct, it saves user's custom view name and criteria
type ViewDoc struct {
ID string `json:"id,omitempty" bson:"id,omitempty" yaml:"id,omitempty" swag:"string"`
Expand Down
1 change: 1 addition & 0 deletions server/config/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type DB struct {
CertPwdFile string `yaml:"certPwdFile"`
Timeout string `yaml:"timeout"`
VerifyPeer bool `yaml:"verifyPeer"`
SyncEnable bool `yaml:"syncEnabled"`
}

//RBAC is rbac config
Expand Down
23 changes: 17 additions & 6 deletions server/datasource/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ var (
)

var (
ErrKeyNotExists = errors.New("can not find any key value")
ErrRecordNotExists = errors.New("can not find any polling data")
ErrRevisionNotExist = errors.New("revision does not exist")
ErrAliasNotGiven = errors.New("label alias not given")
ErrKVAlreadyExists = errors.New("kv already exists")
ErrTooMany = errors.New("key with labels should be only one")
ErrKeyNotExists = errors.New("can not find any key value")
ErrRecordNotExists = errors.New("can not find any polling data")
ErrRevisionNotExist = errors.New("revision does not exist")
ErrAliasNotGiven = errors.New("label alias not given")
ErrKVAlreadyExists = errors.New("kv already exists")
ErrKVTaskAlreadyExists = errors.New("kv or sync task already exists")
ErrTooMany = errors.New("key with labels should be only one")
)

const (
Expand Down Expand Up @@ -73,6 +74,10 @@ type KVDao interface {
Create(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, error)
Update(ctx context.Context, kv *model.KVDoc) error
List(ctx context.Context, project, domain string, options ...FindOption) (*model.KVResponse, error)

CreateWithTask(ctx context.Context, kv *model.KVDoc, task *model.Task) (*model.KVDoc, error)
UpdateWithTask(ctx context.Context, kv *model.KVDoc, task *model.Task) error

//FindOneAndDelete deletes one kv by id and return the deleted kv as these appeared before deletion
FindOneAndDelete(ctx context.Context, kvID string, project, domain string) (*model.KVDoc, error)
//FindManyAndDelete deletes multiple kvs and return the deleted kv list as these appeared before deletion
Expand All @@ -84,6 +89,12 @@ type KVDao interface {
Total(ctx context.Context, project, domain string) (int64, error)
}

// TaskDao provide api of Task entity
type TaskDao interface {
Create(ctx context.Context, task *model.Task, domain string, project string) (*model.Task, error)
Update(ctx context.Context, task *model.Task, domain string, project string) error
}

//HistoryDao provide api of History entity
type HistoryDao interface {
AddHistory(ctx context.Context, kv *model.KVDoc) error
Expand Down
1 change: 1 addition & 0 deletions server/datasource/etcd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func (*Broker) GetHistoryDao() datasource.HistoryDao {
func (*Broker) GetTrackDao() datasource.TrackDao {
return &track.Dao{}
}

func init() {
datasource.RegisterPlugin("etcd", NewFrom)
datasource.RegisterPlugin("embedded_etcd", NewFrom)
Expand Down
10 changes: 10 additions & 0 deletions server/datasource/etcd/key/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,18 @@ const (
keyCounter = "counter"
keyHistory = "kv-history"
keyTrack = "track"
sync = "sync"
task = "task"
)

func getSyncRootKey() string {
return split + sync
}

func TaskKey(domain, project, timestamp string) string {
return strings.Join([]string{getSyncRootKey(), task, domain, project, timestamp}, split)
}

func KV(domain, project, kvID string) string {
return strings.Join([]string{keyKV, domain, project, kvID}, split)
}
Expand Down
88 changes: 88 additions & 0 deletions server/datasource/etcd/kv/kv_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package kv
import (
"context"
"encoding/json"
"fmt"
"regexp"
"strings"

Expand Down Expand Up @@ -59,6 +60,93 @@ func (s *Dao) Create(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, error)
return kv, nil
}

// CreateWithTask is used to create with the task after synchronization is turned on
func (s *Dao) CreateWithTask(ctx context.Context, kv *model.KVDoc, task *model.Task) (*model.KVDoc, error) {
kvBytes, err := json.Marshal(kv)
if err != nil {
openlog.Error("fail to marshal kv")
return nil, err
}
taskBytes, err := json.Marshal(task)
if err != nil {
openlog.Error("fail to marshal task ")
return nil, err
}
kvOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(key.KV(kv.Domain, kv.Project, kv.ID)), etcdadpt.WithValue(kvBytes))
taskOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(key.TaskKey(kv.Domain, kv.Project, task.Timestamp)), etcdadpt.WithValue(taskBytes))
kvOpCmp := etcdadpt.OpCmp(etcdadpt.CmpCreateRev(kvOpPut.Key), etcdadpt.CmpEqual, 0)
taskOpCmp := etcdadpt.OpCmp(etcdadpt.CmpCreateRev(taskOpPut.Key), etcdadpt.CmpEqual, 0)
resp, err := etcdadpt.Instance().TxnWithCmp(ctx, []etcdadpt.OpOptions{kvOpPut, taskOpPut}, []etcdadpt.CmpOptions{kvOpCmp, taskOpCmp}, nil)
if err != nil {
openlog.Error("create error", openlog.WithTags(openlog.Tags{
"err": err.Error(),
"kv": kv,
"task": task,
}))
return nil, err
}
if !resp.Succeeded {
openlog.Error("create error", openlog.WithTags(openlog.Tags{
"err": datasource.ErrKVTaskAlreadyExists.Error(),
"kv": kv,
"task": task,
}))
return nil, datasource.ErrKVTaskAlreadyExists
}
return kv, nil
}

func (s *Dao) UpdateWithTask(ctx context.Context, kv *model.KVDoc, task *model.Task) error {
keyKv := key.KV(kv.Domain, kv.Project, kv.ID)
resp, err := etcdadpt.Get(ctx, keyKv)
if err != nil {
openlog.Error(err.Error())
return err
}
if resp == nil {
return datasource.ErrRecordNotExists
}

var old model.KVDoc
err = json.Unmarshal(resp.Value, &old)
if err != nil {
openlog.Error(err.Error())
return err
}
old.LabelFormat = kv.LabelFormat
old.Value = kv.Value
old.Status = kv.Status
old.Checker = kv.Checker
old.UpdateTime = kv.UpdateTime
old.UpdateRevision = kv.UpdateRevision

kvBytes, err := json.Marshal(old)
if err != nil {
openlog.Error(err.Error())
return err
}

taskBytes, err := json.Marshal(task)
if err != nil {
openlog.Error(err.Error())
return err
}

kvOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(keyKv), etcdadpt.WithValue(kvBytes))
taskOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(key.TaskKey(kv.Domain, kv.Project, task.Timestamp)), etcdadpt.WithValue(taskBytes))
kvOpCmp := etcdadpt.OpCmp(etcdadpt.CmpCreateRev(kvOpPut.Key), etcdadpt.CmpEqual, 0)
taskOpCmp := etcdadpt.OpCmp(etcdadpt.CmpCreateRev(taskOpPut.Key), etcdadpt.CmpEqual, 0)
response, err := etcdadpt.Instance().TxnWithCmp(ctx, []etcdadpt.OpOptions{kvOpPut, taskOpPut}, []etcdadpt.CmpOptions{kvOpCmp, taskOpCmp}, nil)
openlog.Info("44444444444444444444")
openlog.Info(fmt.Sprintf("%v",response))
//err = etcdadpt.PutBytes(ctx, keyKv, bytes)
if err != nil {
openlog.Error(err.Error())
return err
}
return nil
}

//Update update key value
func (s *Dao) Update(ctx context.Context, kv *model.KVDoc) error {
keyKv := key.KV(kv.Domain, kv.Project, kv.ID)
Expand Down
1 change: 1 addition & 0 deletions server/datasource/mongo/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func (*Broker) GetHistoryDao() datasource.HistoryDao {
func (*Broker) GetTrackDao() datasource.TrackDao {
return &track.Dao{}
}

func init() {
datasource.RegisterPlugin("mongo", NewFrom)
}
58 changes: 58 additions & 0 deletions server/datasource/mongo/kv/kv_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,64 @@ func (s *Dao) Create(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, error)
return kv, nil
}

func (s *Dao) CreateWithTask(ctx context.Context, kv *model.KVDoc, task *model.Task) (*model.KVDoc, error) {
taskSession, err := session.GetDB().Client().StartSession()
if err != nil {
return nil, err
}
if err = taskSession.StartTransaction(); err != nil {
return nil, err
}
defer taskSession.EndSession(ctx)
if err = mongo.WithSession(ctx, taskSession, func(sessionContext mongo.SessionContext) error {
collection := session.GetDB().Collection(session.CollectionKV)
_, err = collection.InsertOne(sessionContext, kv)
if err != nil {
openlog.Error("create error", openlog.WithTags(openlog.Tags{
"err": err.Error(),
"kv": kv,
}))
errAbort := taskSession.AbortTransaction(sessionContext)
if errAbort != nil {
openlog.Error("fail to abort transaction", openlog.WithTags(openlog.Tags{
"err": errAbort.Error(),
"kv": kv,
}))
}
return err
}

collection = session.GetDB().Collection(session.CollectionTask)
_, err = collection.InsertOne(sessionContext, task)
if err != nil {
openlog.Error("create error", openlog.WithTags(openlog.Tags{
"err": err.Error(),
"task": task,
}))
errAbort := taskSession.AbortTransaction(sessionContext)
if errAbort != nil {
openlog.Error("fail to abort transaction", openlog.WithTags(openlog.Tags{
"err": errAbort.Error(),
"task": task,
}))
}
return err
}
if err = taskSession.CommitTransaction(sessionContext); err != nil {
return err
}
return nil
}); err != nil {
openlog.Error(err.Error())
return nil, err
}
return kv, nil
}

func (s *Dao) UpdateWithTask(ctx context.Context, kv *model.KVDoc, task *model.Task) error {
return nil
}

//Update update key value
func (s *Dao) Update(ctx context.Context, kv *model.KVDoc) error {
collection := session.GetDB().Collection(session.CollectionKV)
Expand Down
1 change: 1 addition & 0 deletions server/datasource/mongo/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const (
CollectionPollingDetail = "polling_detail"
CollectionCounter = "counter"
CollectionView = "view"
CollectionTask = "task"
)

//db errors
Expand Down
44 changes: 37 additions & 7 deletions server/service/kv/kv_svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ package kv
import (
"context"
"fmt"
"strconv"
"time"

"github.com/apache/servicecomb-kie/pkg/common"
"github.com/apache/servicecomb-kie/pkg/concurrency"
"github.com/apache/servicecomb-kie/pkg/model"
"github.com/apache/servicecomb-kie/pkg/stringutil"
cfg "github.com/apache/servicecomb-kie/server/config"
"github.com/apache/servicecomb-kie/server/datasource"
"github.com/apache/servicecomb-kie/server/pubsub"
"github.com/go-chassis/cari/config"
Expand Down Expand Up @@ -111,10 +113,25 @@ func Create(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, *errsvc.Error)
openlog.Error(err.Error())
return nil, config.NewError(config.ErrInternal, "create kv failed")
}
kv, err = datasource.GetBroker().GetKVDao().Create(ctx, kv)
if err != nil {
openlog.Error(fmt.Sprintf("post err:%s", err.Error()))
return nil, config.NewError(config.ErrInternal, "create kv failed")
// open synchronization needs to write tasks to db
if cfg.GetDB().SyncEnable {
task := &model.Task{
Action: "create",
DataType: "config",
Data: kv,
Timestamp: strconv.FormatInt(time.Now().Unix(), 10),
}
kv, err = datasource.GetBroker().GetKVDao().CreateWithTask(ctx, kv, task)
if err != nil {
openlog.Error(fmt.Sprintf("post err:%s", err.Error()))
return nil, config.NewError(config.ErrInternal, "create kv failed")
}
} else {
kv, err = datasource.GetBroker().GetKVDao().Create(ctx, kv)
if err != nil {
openlog.Error(fmt.Sprintf("post err:%s", err.Error()))
return nil, config.NewError(config.ErrInternal, "create kv failed")
}
}
err = datasource.GetBroker().GetHistoryDao().AddHistory(ctx, kv)
if err != nil {
Expand Down Expand Up @@ -228,9 +245,22 @@ func Update(ctx context.Context, kv *model.UpdateKVRequest) (*model.KVDoc, error
if err != nil {
return nil, err
}
err = datasource.GetBroker().GetKVDao().Update(ctx, oldKV)
if err != nil {
return nil, err
if cfg.GetDB().SyncEnable {
task := &model.Task{
Action: "update",
DataType: "config",
Data: kv,
Timestamp: strconv.FormatInt(time.Now().Unix(), 10),
}
err = datasource.GetBroker().GetKVDao().UpdateWithTask(ctx, oldKV, task)
if err != nil {
return nil, err
}
} else {
err = datasource.GetBroker().GetKVDao().Update(ctx, oldKV)
if err != nil {
return nil, err
}
}
openlog.Info(
fmt.Sprintf("update %s with labels %s value [%s]",
Expand Down

0 comments on commit defe730

Please sign in to comment.