Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add document change event broadcast using members list #189

Merged
merged 12 commits into from
Jun 9, 2021
20 changes: 20 additions & 0 deletions api/converter/from_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/yorkie-team/yorkie/pkg/document/operation"
"github.com/yorkie-team/yorkie/pkg/document/time"
"github.com/yorkie-team/yorkie/pkg/types"
"github.com/yorkie-team/yorkie/yorkie/backend/sync"
)

// FromClient converts the given Protobuf format to model format.
Expand Down Expand Up @@ -139,6 +140,25 @@ func FromEventType(pbEventType api.EventType) (types.EventType, error) {
return "", fmt.Errorf("%v: %w", pbEventType, ErrUnsupportedEventType)
}

// FromDocEvent converts the given Protobuf format to model format.
func FromDocEvent(docEvent *api.DocEvent) (*sync.DocEvent, error) {
eventType, err := FromEventType(docEvent.EventType)
if err != nil {
return nil, err
}

publisher, err := FromClient(docEvent.Publisher)
if err != nil {
return nil, err
}

return &sync.DocEvent{
Type: eventType,
DocKey: docEvent.DocKey,
Publisher: *publisher,
}, nil
}

// FromOperations converts the given Protobuf format to model format.
func FromOperations(pbOps []*api.Operation) ([]operation.Operation, error) {
var ops []operation.Operation
Expand Down
17 changes: 17 additions & 0 deletions api/converter/to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/yorkie-team/yorkie/pkg/document/operation"
"github.com/yorkie-team/yorkie/pkg/document/time"
"github.com/yorkie-team/yorkie/pkg/types"
"github.com/yorkie-team/yorkie/yorkie/backend/sync"
)

// ToClient converts the given model to Protobuf format.
Expand Down Expand Up @@ -136,6 +137,22 @@ func ToEventType(eventType types.EventType) (api.EventType, error) {
}
}

// ToDocEvent converts the given model to Protobuf format.
func ToDocEvent(docEvent sync.DocEvent) (*api.DocEvent, error) {
eventType, err := ToEventType(docEvent.Type)
if err != nil {
return nil, err
}

publisher := ToClient(docEvent.Publisher)

return &api.DocEvent{
EventType: eventType,
DocKey: docEvent.DocKey,
Publisher: publisher,
}, nil
}

// ToOperations converts the given model format to Protobuf format.
func ToOperations(operations []operation.Operation) ([]*api.Operation, error) {
var pbOperations []*api.Operation
Expand Down
1,479 changes: 1,230 additions & 249 deletions api/yorkie.pb.go

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions api/yorkie.proto
Original file line number Diff line number Diff line change
Expand Up @@ -353,3 +353,23 @@ enum EventType {
DOCUMENTS_WATCHED = 1;
DOCUMENTS_UNWATCHED = 2;
}

service Broadcast {
rpc Publish (PublishRequest) returns (PublishResponse) {}
}
hackerwins marked this conversation as resolved.
Show resolved Hide resolved

message PublishRequest {
bytes publisher_id = 1;
string topic = 2;
DocEvent doc_event = 3;
}

message PublishResponse {
string topic = 1;
}

message DocEvent {
EventType event_type = 1;
string doc_key = 2;
Client publisher = 3;
}
hackerwins marked this conversation as resolved.
Show resolved Hide resolved
8 changes: 5 additions & 3 deletions test/integration/cluster_mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,10 @@ func TestClusterMode(t *testing.T) {
wg.Add(1)
rch := clientA.Watch(ctx, docA)
go func() {
defer wg.Done()

select {
case resp := <-rch:
defer wg.Done()
hackerwins marked this conversation as resolved.
Show resolved Hide resolved
if resp.Err == io.EOF {
return
}
Expand All @@ -118,10 +118,12 @@ func TestClusterMode(t *testing.T) {
})
assert.NoError(t, err)

err = clientB.Sync(ctx)
assert.NoError(t, err)

wg.Wait()

// TODO(hackerwins): uncomment below test
// assert.Equal(t, docA.Marshal(), docB.Marshal())
assert.Equal(t, docA.Marshal(), docB.Marshal())

defer func() {
assert.NoError(t, clientA.Deactivate(ctx))
Expand Down
166 changes: 166 additions & 0 deletions test/stress/cluster_stress_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// +build stress

package stress

import (
"context"
"math/rand"
"strconv"
gosync "sync"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/yorkie-team/yorkie/client"
"github.com/yorkie-team/yorkie/pkg/document"
"github.com/yorkie-team/yorkie/pkg/document/proxy"
"github.com/yorkie-team/yorkie/pkg/types"
"github.com/yorkie-team/yorkie/test/helper"
"github.com/yorkie-team/yorkie/yorkie"
"github.com/yorkie-team/yorkie/yorkie/backend/sync"
"github.com/yorkie-team/yorkie/yorkie/backend/sync/memory"
)

func TestClusterStress(t *testing.T) {
t.Run("watch document test", func(t *testing.T) {
hackerwins marked this conversation as resolved.
Show resolved Hide resolved
testerSize := 5
agents := make([]*yorkie.Yorkie, testerSize)

for i := 0; i < testerSize; i++ {
agent := helper.TestYorkie()
assert.NoError(t, agent.Start())
agents[i] = agent
}

defer func() {
for _, agent := range agents {
assert.NoError(t, agent.Shutdown(true))
}
}()

clients := make([]*client.Client, testerSize)
docs := make([]*document.Document, testerSize)
ctx := context.Background()
for i, agent := range agents {
cli, err := client.Dial(agent.RPCAddr(), client.Option{
Metadata: map[string]string{
"key": strconv.Itoa(i),
},
})
assert.NoError(t, err)

assert.NoError(t, cli.Activate(ctx))
clients[i] = cli

doc := document.New(helper.Collection, t.Name())
assert.NoError(t, cli.Attach(ctx, doc))
docs[i] = doc

err = doc.Update(func(root *proxy.ObjectProxy) error {
root.SetString("hello", "world")
root.SetNewText("text")
root.SetNewArray("arr")

return nil
})
assert.NoError(t, err)
}

for _, cli := range clients {
err := cli.Sync(ctx)
assert.NoError(t, err)
}

defer func() {
for _, cli := range clients {
assert.NoError(t, cli.Deactivate(ctx))
assert.NoError(t, cli.Close())
}
}()

assertForDocs := func(docs []*document.Document) {
var expected string
for i, doc := range docs {
if i == 0 {
expected = doc.Marshal()
continue
}

assert.Equal(t, expected, doc.Marshal())
}
}

requestCount := 100
wg := &gosync.WaitGroup{}
wg.Add(1)

locks := memory.NewLockerMap()
for i, cli := range clients {
rch := cli.Watch(ctx, docs[i])
go func(cli *client.Client) {
timeoutCount := 0
for {
select {
case resp := <-rch:
timeoutCount = 0
if resp.EventType == types.DocumentsChangeEvent {
locker, err := locks.NewLocker(ctx, sync.NewKey(cli.Metadata()["key"]))
assert.NoError(t, err)

err = locker.Lock(ctx)
assert.NoError(t, err)

err = cli.Sync(ctx, resp.Keys...)
assert.NoError(t, err)

err = locker.Unlock(ctx)
assert.NoError(t, err)
}
case <-time.After(time.Second):
timeoutCount++
if timeoutCount > 10 {
// A 'nagative WaitGroup counter' error occurs intermittently, causing a panic.
defer func() {
if r := recover(); r != nil {
assertForDocs(docs)
}
}()
wg.Done()
return
}
}
}
}(cli)
}

for i := 0; i < requestCount; i++ {
testerIdx := rand.Intn(testerSize)
cli := clients[testerIdx]

locker, err := locks.NewLocker(ctx, sync.NewKey(cli.Metadata()["key"]))
assert.NoError(t, err)

err = locker.Lock(ctx)
assert.NoError(t, err)
doc := docs[testerIdx]
err = doc.Update(func(root *proxy.ObjectProxy) error {
root.SetString("test"+strconv.Itoa(testerIdx), "yorkie")
root.GetText("text").Edit(0, 0, strconv.Itoa(testerIdx))
root.GetArray("arr").AddInteger(testerIdx)
return nil
}, "update "+strconv.Itoa(i)+"::"+strconv.Itoa(testerIdx))
assert.NoError(t, err)

err = cli.Sync(ctx)
assert.NoError(t, err)

err = locker.Unlock(ctx)
assert.NoError(t, err)
}

wg.Wait()

assertForDocs(docs)
})
}
17 changes: 13 additions & 4 deletions yorkie/backend/sync/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.etcd.io/etcd/clientv3/concurrency"
"google.golang.org/grpc"

"github.com/yorkie-team/yorkie/api"
"github.com/yorkie-team/yorkie/internal/log"
"github.com/yorkie-team/yorkie/yorkie/backend/sync"
"github.com/yorkie-team/yorkie/yorkie/backend/sync/memory"
Expand All @@ -48,15 +49,22 @@ type Config struct {
LockLeaseTimeSec int `json:"LockLeaseTimeSec"`
}

// broadcastClientInfo manages broadcast grpc server connections and clients.
type broadcastClientInfo struct {
client api.BroadcastClient
conn *grpc.ClientConn
}

// Client is a client that connects to ETCD.
type Client struct {
config *Config
client *clientv3.Client

pubSub *memory.PubSub

memberMapMu *gosync.RWMutex
memberMap map[string]*sync.AgentInfo
memberMapMu *gosync.RWMutex
memberMap map[string]*sync.AgentInfo
broadcastClientMap map[string]*broadcastClientInfo
hackerwins marked this conversation as resolved.
Show resolved Hide resolved

ctx context.Context
cancelFunc context.CancelFunc
Expand All @@ -78,8 +86,9 @@ func newClient(conf *Config, agentInfo *sync.AgentInfo) *Client {

pubSub: memory.NewPubSub(agentInfo),

memberMapMu: &gosync.RWMutex{},
memberMap: make(map[string]*sync.AgentInfo),
memberMapMu: &gosync.RWMutex{},
memberMap: make(map[string]*sync.AgentInfo),
broadcastClientMap: make(map[string]*broadcastClientInfo),

ctx: ctx,
cancelFunc: cancelFunc,
Expand Down
18 changes: 16 additions & 2 deletions yorkie/backend/sync/etcd/membermap.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,10 @@ func (c *Client) syncAgents() {
}
c.setAgentInfo(string(event.Kv.Key), info)
case mvccpb.DELETE:
c.removeAgentInfo(string(event.Kv.Key))
err := c.removeAgentInfo(string(event.Kv.Key))
if err != nil {
log.Logger.Error(err)
}
}
}
case <-c.ctx.Done():
Expand All @@ -170,9 +173,20 @@ func (c *Client) setAgentInfo(key string, value sync.AgentInfo) {
}

// removeAgentInfo removes the given agentInfo from the local member map.
func (c *Client) removeAgentInfo(key string) {
func (c *Client) removeAgentInfo(key string) error {
c.memberMapMu.Lock()
defer c.memberMapMu.Unlock()

addr := c.memberMap[key].RPCAddr
if info, ok := c.broadcastClientMap[addr]; ok {
err := info.conn.Close()
if err != nil {
log.Logger.Error(err)
return err
}
hackerwins marked this conversation as resolved.
Show resolved Hide resolved
delete(c.broadcastClientMap, addr)
}
delete(c.memberMap, key)

return nil
}
Loading