Skip to content

Commit

Permalink
Add Coordinator with PubSub and LockerMap (yorkie-team#198)
Browse files Browse the repository at this point in the history
  • Loading branch information
hackerwins authored and jeonjonghyeok committed Aug 4, 2022
1 parent ffa2ded commit a67af08
Show file tree
Hide file tree
Showing 16 changed files with 263 additions and 210 deletions.
4 changes: 2 additions & 2 deletions design/pub-sub.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ The order of operation is as follows.

1. The WatchDocuments API creates a `Subscription` instance with the client and document keys.
2. Subscription instance internally manages the `DocEvent channel`, and the WatchDocuments API uses a select statement to retrieve events passed to the Subscription instance.
3. The client can deliver the event to the Subscription instances that are subscribed to the same document through the `PubSub.Publish(publisherID *time.ActorID, topic string, event DocEvent)` method.
3. The client can deliver the event to the Subscription instances that are subscribed to the same document through the `Publish(publisherID *time.ActorID, topic string, event DocEvent)` method.
4. In the select statement mentioned earlier, when an event is confirmed, it is sent to the stream server.

### Risks and Mitigation
Currently, Subscription instances are managed in memory. This can be a problem when building a cluster of servers.
To solve this problem, we are planning to support cluster-mode using [etcd](https://github.com/etcd-io/etcd).
To solve this problem, we are planning to support cluster-mode using [etcd](https://github.com/etcd-io/etcd).
4 changes: 2 additions & 2 deletions test/stress/sync_stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestSyncStress(t *testing.T) {
t.Run("lock/unlock stress test", func(t *testing.T) {
start := time.Now()

lockerMap := memory.NewLockerMap()
coordinator := memory.NewCoordinator(nil)

size := 100
sum := 0
Expand All @@ -46,7 +46,7 @@ func TestSyncStress(t *testing.T) {
defer wg.Done()

ctx := context.Background()
locker, err := lockerMap.NewLocker(ctx, sync.Key(t.Name()))
locker, err := coordinator.NewLocker(ctx, sync.Key(t.Name()))
assert.NoError(t, err)
assert.NoError(t, locker.Lock(ctx))
sum += 1
Expand Down
34 changes: 14 additions & 20 deletions yorkie/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,9 @@ type Backend struct {
Config *Config
agentInfo *sync.AgentInfo

DB db.DB
LockerMap sync.LockerMap
PubSub sync.PubSub
Metrics metrics.Metrics
DB db.DB
Coordinator sync.Coordinator
Metrics metrics.Metrics

// closing is closed by backend close.
closing chan struct{}
Expand Down Expand Up @@ -92,9 +91,7 @@ func New(
return nil, err
}

// TODO(hackerwins): Merge these instances into Coordinator.
var pubSub sync.PubSub
var lockerMap sync.LockerMap
var coordinator sync.Coordinator
if etcdConf != nil {
etcdClient, err := etcd.Dial(etcdConf, agentInfo)
if err != nil {
Expand All @@ -104,11 +101,9 @@ func New(
return nil, err
}

lockerMap = etcdClient
pubSub = etcdClient
coordinator = etcdClient
} else {
lockerMap = memory.NewLockerMap()
pubSub = memory.NewPubSub(agentInfo)
coordinator = memory.NewCoordinator(agentInfo)
}

log.Logger.Infof(
Expand All @@ -118,13 +113,12 @@ func New(
)

return &Backend{
Config: conf,
agentInfo: agentInfo,
DB: mongoClient,
LockerMap: lockerMap,
PubSub: pubSub,
Metrics: met,
closing: make(chan struct{}),
Config: conf,
agentInfo: agentInfo,
DB: mongoClient,
Coordinator: coordinator,
Metrics: met,
closing: make(chan struct{}),
}, nil
}

Expand All @@ -137,7 +131,7 @@ func (b *Backend) Close() error {
// wait for goroutines before closing backend
b.wg.Wait()

if err := b.LockerMap.Close(); err != nil {
if err := b.Coordinator.Close(); err != nil {
log.Logger.Error(err)
}

Expand Down Expand Up @@ -176,5 +170,5 @@ func (b *Backend) AttachGoroutine(f func()) {

// Members returns the members of this cluster.
func (b *Backend) Members() map[string]*sync.AgentInfo {
return b.PubSub.Members()
return b.Coordinator.Members()
}
56 changes: 56 additions & 0 deletions yorkie/backend/sync/coordinator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2020 The Yorkie Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package sync

import (
"errors"
gotime "time"

"github.com/yorkie-team/yorkie/pkg/types"
)

var (
// ErrEmptyTopics is returned when the given topic is empty.
ErrEmptyTopics = errors.New("empty topics")
)

// DocEvent represents events that occur related to the document.
type DocEvent struct {
Type types.EventType
DocKey string
Publisher types.Client
}

// AgentInfo represents the information of the Agent.
type AgentInfo struct {
ID string `json:"id"`
Hostname string `json:"hostname"`
RPCAddr string `json:"rpc_addr"`
UpdatedAt gotime.Time `json:"updated_at"`
}

// Coordinator provides synchronization functions such as locks and event Pub/Sub.
type Coordinator interface {
LockerMap
PubSub

// Members returns the members of this cluster.
Members() map[string]*AgentInfo

// Close closes all resources of this Coordinator.
Close() error
}
31 changes: 6 additions & 25 deletions yorkie/backend/sync/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"time"

"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
"google.golang.org/grpc"

"github.com/yorkie-team/yorkie/internal/log"
Expand Down Expand Up @@ -50,7 +49,9 @@ type Config struct {

// Client is a client that connects to ETCD.
type Client struct {
config *Config
config *Config
agentInfo *sync.AgentInfo

client *clientv3.Client

pubSub *memory.PubSub
Expand All @@ -74,9 +75,10 @@ func newClient(conf *Config, agentInfo *sync.AgentInfo) *Client {
ctx, cancelFunc := context.WithCancel(context.Background())

return &Client{
config: conf,
config: conf,
agentInfo: agentInfo,

pubSub: memory.NewPubSub(agentInfo),
pubSub: memory.NewPubSub(),

memberMapMu: &gosync.RWMutex{},
memberMap: make(map[string]*sync.AgentInfo),
Expand Down Expand Up @@ -127,24 +129,3 @@ func (c *Client) Close() error {

return c.client.Close()
}

// NewLocker creates locker of the given key.
func (c *Client) NewLocker(
ctx context.Context,
key sync.Key,
) (sync.Locker, error) {
session, err := concurrency.NewSession(
c.client,
concurrency.WithContext(ctx),
concurrency.WithTTL(c.config.LockLeaseTimeSec),
)
if err != nil {
log.Logger.Error(err)
return nil, err
}

return &internalLocker{
session,
concurrency.NewMutex(session, key.String()),
}, nil
}
22 changes: 22 additions & 0 deletions yorkie/backend/sync/etcd/locker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,30 @@ import (
"go.etcd.io/etcd/clientv3/concurrency"

"github.com/yorkie-team/yorkie/internal/log"
"github.com/yorkie-team/yorkie/yorkie/backend/sync"
)

// NewLocker creates locker of the given key.
func (c *Client) NewLocker(
ctx context.Context,
key sync.Key,
) (sync.Locker, error) {
session, err := concurrency.NewSession(
c.client,
concurrency.WithContext(ctx),
concurrency.WithTTL(c.config.LockLeaseTimeSec),
)
if err != nil {
log.Logger.Error(err)
return nil, err
}

return &internalLocker{
session,
concurrency.NewMutex(session, key.String()),
}, nil
}

type internalLocker struct {
session *concurrency.Session
mu *concurrency.Mutex
Expand Down
10 changes: 5 additions & 5 deletions yorkie/backend/sync/etcd/membermap.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,17 @@ func (c *Client) putAgentPeriodically() {
func (c *Client) putAgent(ctx context.Context) error {
grantResponse, err := c.client.Grant(ctx, int64(agentValueTTL.Seconds()))
if err != nil {
return fmt.Errorf("grant %s: %w", c.pubSub.AgentInfo.ID, err)
return fmt.Errorf("grant %s: %w", c.agentInfo.ID, err)
}

agentInfo := *c.pubSub.AgentInfo
agentInfo := *c.agentInfo
agentInfo.UpdatedAt = time.Now()
bytes, err := json.Marshal(agentInfo)
if err != nil {
return fmt.Errorf("marshal %s: %w", c.pubSub.AgentInfo.ID, err)
return fmt.Errorf("marshal %s: %w", c.agentInfo.ID, err)
}

key := fmt.Sprintf("%s/%s", agentsPath, c.pubSub.AgentInfo.ID)
key := fmt.Sprintf("%s/%s", agentsPath, c.agentInfo.ID)
_, err = c.client.Put(ctx, key, string(bytes), clientv3.WithLease(grantResponse.ID))
if err != nil {
return fmt.Errorf("put %s: %w", key, err)
Expand All @@ -126,7 +126,7 @@ func (c *Client) putAgent(ctx context.Context) error {

// removeAgent removes the local agent in etcd.
func (c *Client) removeAgent(ctx context.Context) error {
key := fmt.Sprintf("%s/%s", agentsPath, c.pubSub.AgentInfo.ID)
key := fmt.Sprintf("%s/%s", agentsPath, c.agentInfo.ID)
_, err := c.client.Delete(ctx, key)
if err != nil {
return fmt.Errorf("remove %s: %w", key, err)
Expand Down
3 changes: 0 additions & 3 deletions yorkie/backend/sync/locker.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,4 @@ type Locker interface {
type LockerMap interface {
// NewLocker creates a sync.Locker.
NewLocker(ctx context.Context, key Key) (Locker, error)

// Close closes all resources of this LockerMap.
Close() error
}
89 changes: 89 additions & 0 deletions yorkie/backend/sync/memory/coordinator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2021 The Yorkie Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package memory

import (
"context"

"github.com/moby/locker"

"github.com/yorkie-team/yorkie/pkg/document/time"
"github.com/yorkie-team/yorkie/pkg/types"
"github.com/yorkie-team/yorkie/yorkie/backend/sync"
)

// Coordinator is a memory-based implementation of sync.Coordinator.
type Coordinator struct {
agentInfo *sync.AgentInfo

locks *locker.Locker
pubSub *PubSub
}

// NewCoordinator creates an instance of Coordinator.
func NewCoordinator(agentInfo *sync.AgentInfo) *Coordinator {
return &Coordinator{
agentInfo: agentInfo,
locks: locker.New(),
pubSub: NewPubSub(),
}
}

// NewLocker creates locker of the given key.
func (m *Coordinator) NewLocker(
ctx context.Context,
key sync.Key,
) (sync.Locker, error) {
return &internalLocker{
key.String(),
m.locks,
}, nil
}

// Subscribe subscribes to the given topics.
func (m *Coordinator) Subscribe(
subscriber types.Client,
topics []string,
) (*sync.Subscription, map[string][]types.Client, error) {
return m.pubSub.Subscribe(subscriber, topics)
}

// Unsubscribe unsubscribes the given topics.
func (m *Coordinator) Unsubscribe(topics []string, sub *sync.Subscription) {
m.pubSub.Unsubscribe(topics, sub)
}

// Publish publishes the given event to the given Topic.
func (m *Coordinator) Publish(
publisherID *time.ActorID,
topic string,
event sync.DocEvent,
) {
m.pubSub.Publish(publisherID, topic, event)
}

// Members returns the members of this cluster.
func (m *Coordinator) Members() map[string]*sync.AgentInfo {
members := make(map[string]*sync.AgentInfo)
members[m.agentInfo.ID] = m.agentInfo
return members
}

// Close closes all resources of this Coordinator.
func (m *Coordinator) Close() error {
return nil
}
Loading

0 comments on commit a67af08

Please sign in to comment.