From a67af0809871360a0df538c1931d51dde7d5bfd1 Mon Sep 17 00:00:00 2001 From: Youngteac Hong Date: Sat, 5 Jun 2021 20:19:54 +0900 Subject: [PATCH] Add Coordinator with PubSub and LockerMap (#198) --- design/pub-sub.md | 4 +- test/stress/sync_stress_test.go | 4 +- yorkie/backend/backend.go | 34 ++++----- yorkie/backend/sync/coordinator.go | 56 ++++++++++++++ yorkie/backend/sync/etcd/client.go | 31 ++------ yorkie/backend/sync/etcd/locker.go | 22 ++++++ yorkie/backend/sync/etcd/membermap.go | 10 +-- yorkie/backend/sync/locker.go | 3 - yorkie/backend/sync/memory/coordinator.go | 89 +++++++++++++++++++++++ yorkie/backend/sync/memory/locker.go | 29 -------- yorkie/backend/sync/memory/pubsub.go | 29 +++----- yorkie/backend/sync/memory/pubsub_test.go | 4 +- yorkie/backend/sync/pubsub.go | 66 +++++++++++------ yorkie/backend/sync/subscription.go | 74 ------------------- yorkie/packs/pack_service.go | 4 +- yorkie/rpc/server.go | 14 ++-- 16 files changed, 263 insertions(+), 210 deletions(-) create mode 100644 yorkie/backend/sync/coordinator.go create mode 100644 yorkie/backend/sync/memory/coordinator.go delete mode 100644 yorkie/backend/sync/subscription.go diff --git a/design/pub-sub.md b/design/pub-sub.md index c7d0668ff..af3acf515 100644 --- a/design/pub-sub.md +++ b/design/pub-sub.md @@ -32,9 +32,9 @@ The order of operation is as follows. 1. The WatchDocuments API creates a `Subscription` instance with the client and document keys. 2. Subscription instance internally manages the `DocEvent channel`, and the WatchDocuments API uses a select statement to retrieve events passed to the Subscription instance. -3. The client can deliver the event to the Subscription instances that are subscribed to the same document through the `PubSub.Publish(publisherID *time.ActorID, topic string, event DocEvent)` method. +3. The client can deliver the event to the Subscription instances that are subscribed to the same document through the `Publish(publisherID *time.ActorID, topic string, event DocEvent)` method. 4. In the select statement mentioned earlier, when an event is confirmed, it is sent to the stream server. ### Risks and Mitigation Currently, Subscription instances are managed in memory. This can be a problem when building a cluster of servers. -To solve this problem, we are planning to support cluster-mode using [etcd](https://github.com/etcd-io/etcd). \ No newline at end of file +To solve this problem, we are planning to support cluster-mode using [etcd](https://github.com/etcd-io/etcd). diff --git a/test/stress/sync_stress_test.go b/test/stress/sync_stress_test.go index a245fc311..01a8a0ec1 100644 --- a/test/stress/sync_stress_test.go +++ b/test/stress/sync_stress_test.go @@ -35,7 +35,7 @@ func TestSyncStress(t *testing.T) { t.Run("lock/unlock stress test", func(t *testing.T) { start := time.Now() - lockerMap := memory.NewLockerMap() + coordinator := memory.NewCoordinator(nil) size := 100 sum := 0 @@ -46,7 +46,7 @@ func TestSyncStress(t *testing.T) { defer wg.Done() ctx := context.Background() - locker, err := lockerMap.NewLocker(ctx, sync.Key(t.Name())) + locker, err := coordinator.NewLocker(ctx, sync.Key(t.Name())) assert.NoError(t, err) assert.NoError(t, locker.Lock(ctx)) sum += 1 diff --git a/yorkie/backend/backend.go b/yorkie/backend/backend.go index 7d58cf93a..f675c5363 100644 --- a/yorkie/backend/backend.go +++ b/yorkie/backend/backend.go @@ -51,10 +51,9 @@ type Backend struct { Config *Config agentInfo *sync.AgentInfo - DB db.DB - LockerMap sync.LockerMap - PubSub sync.PubSub - Metrics metrics.Metrics + DB db.DB + Coordinator sync.Coordinator + Metrics metrics.Metrics // closing is closed by backend close. closing chan struct{} @@ -92,9 +91,7 @@ func New( return nil, err } - // TODO(hackerwins): Merge these instances into Coordinator. - var pubSub sync.PubSub - var lockerMap sync.LockerMap + var coordinator sync.Coordinator if etcdConf != nil { etcdClient, err := etcd.Dial(etcdConf, agentInfo) if err != nil { @@ -104,11 +101,9 @@ func New( return nil, err } - lockerMap = etcdClient - pubSub = etcdClient + coordinator = etcdClient } else { - lockerMap = memory.NewLockerMap() - pubSub = memory.NewPubSub(agentInfo) + coordinator = memory.NewCoordinator(agentInfo) } log.Logger.Infof( @@ -118,13 +113,12 @@ func New( ) return &Backend{ - Config: conf, - agentInfo: agentInfo, - DB: mongoClient, - LockerMap: lockerMap, - PubSub: pubSub, - Metrics: met, - closing: make(chan struct{}), + Config: conf, + agentInfo: agentInfo, + DB: mongoClient, + Coordinator: coordinator, + Metrics: met, + closing: make(chan struct{}), }, nil } @@ -137,7 +131,7 @@ func (b *Backend) Close() error { // wait for goroutines before closing backend b.wg.Wait() - if err := b.LockerMap.Close(); err != nil { + if err := b.Coordinator.Close(); err != nil { log.Logger.Error(err) } @@ -176,5 +170,5 @@ func (b *Backend) AttachGoroutine(f func()) { // Members returns the members of this cluster. func (b *Backend) Members() map[string]*sync.AgentInfo { - return b.PubSub.Members() + return b.Coordinator.Members() } diff --git a/yorkie/backend/sync/coordinator.go b/yorkie/backend/sync/coordinator.go new file mode 100644 index 000000000..bcf5c07a0 --- /dev/null +++ b/yorkie/backend/sync/coordinator.go @@ -0,0 +1,56 @@ +/* + * Copyright 2020 The Yorkie Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package sync + +import ( + "errors" + gotime "time" + + "github.com/yorkie-team/yorkie/pkg/types" +) + +var ( + // ErrEmptyTopics is returned when the given topic is empty. + ErrEmptyTopics = errors.New("empty topics") +) + +// DocEvent represents events that occur related to the document. +type DocEvent struct { + Type types.EventType + DocKey string + Publisher types.Client +} + +// AgentInfo represents the information of the Agent. +type AgentInfo struct { + ID string `json:"id"` + Hostname string `json:"hostname"` + RPCAddr string `json:"rpc_addr"` + UpdatedAt gotime.Time `json:"updated_at"` +} + +// Coordinator provides synchronization functions such as locks and event Pub/Sub. +type Coordinator interface { + LockerMap + PubSub + + // Members returns the members of this cluster. + Members() map[string]*AgentInfo + + // Close closes all resources of this Coordinator. + Close() error +} diff --git a/yorkie/backend/sync/etcd/client.go b/yorkie/backend/sync/etcd/client.go index 23b2da9ae..5e4922308 100644 --- a/yorkie/backend/sync/etcd/client.go +++ b/yorkie/backend/sync/etcd/client.go @@ -22,7 +22,6 @@ import ( "time" "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/clientv3/concurrency" "google.golang.org/grpc" "github.com/yorkie-team/yorkie/internal/log" @@ -50,7 +49,9 @@ type Config struct { // Client is a client that connects to ETCD. type Client struct { - config *Config + config *Config + agentInfo *sync.AgentInfo + client *clientv3.Client pubSub *memory.PubSub @@ -74,9 +75,10 @@ func newClient(conf *Config, agentInfo *sync.AgentInfo) *Client { ctx, cancelFunc := context.WithCancel(context.Background()) return &Client{ - config: conf, + config: conf, + agentInfo: agentInfo, - pubSub: memory.NewPubSub(agentInfo), + pubSub: memory.NewPubSub(), memberMapMu: &gosync.RWMutex{}, memberMap: make(map[string]*sync.AgentInfo), @@ -127,24 +129,3 @@ func (c *Client) Close() error { return c.client.Close() } - -// NewLocker creates locker of the given key. -func (c *Client) NewLocker( - ctx context.Context, - key sync.Key, -) (sync.Locker, error) { - session, err := concurrency.NewSession( - c.client, - concurrency.WithContext(ctx), - concurrency.WithTTL(c.config.LockLeaseTimeSec), - ) - if err != nil { - log.Logger.Error(err) - return nil, err - } - - return &internalLocker{ - session, - concurrency.NewMutex(session, key.String()), - }, nil -} diff --git a/yorkie/backend/sync/etcd/locker.go b/yorkie/backend/sync/etcd/locker.go index 4cdc97248..96ca68c50 100644 --- a/yorkie/backend/sync/etcd/locker.go +++ b/yorkie/backend/sync/etcd/locker.go @@ -22,8 +22,30 @@ import ( "go.etcd.io/etcd/clientv3/concurrency" "github.com/yorkie-team/yorkie/internal/log" + "github.com/yorkie-team/yorkie/yorkie/backend/sync" ) +// NewLocker creates locker of the given key. +func (c *Client) NewLocker( + ctx context.Context, + key sync.Key, +) (sync.Locker, error) { + session, err := concurrency.NewSession( + c.client, + concurrency.WithContext(ctx), + concurrency.WithTTL(c.config.LockLeaseTimeSec), + ) + if err != nil { + log.Logger.Error(err) + return nil, err + } + + return &internalLocker{ + session, + concurrency.NewMutex(session, key.String()), + }, nil +} + type internalLocker struct { session *concurrency.Session mu *concurrency.Mutex diff --git a/yorkie/backend/sync/etcd/membermap.go b/yorkie/backend/sync/etcd/membermap.go index ba2b26c2c..f74e36136 100644 --- a/yorkie/backend/sync/etcd/membermap.go +++ b/yorkie/backend/sync/etcd/membermap.go @@ -106,17 +106,17 @@ func (c *Client) putAgentPeriodically() { func (c *Client) putAgent(ctx context.Context) error { grantResponse, err := c.client.Grant(ctx, int64(agentValueTTL.Seconds())) if err != nil { - return fmt.Errorf("grant %s: %w", c.pubSub.AgentInfo.ID, err) + return fmt.Errorf("grant %s: %w", c.agentInfo.ID, err) } - agentInfo := *c.pubSub.AgentInfo + agentInfo := *c.agentInfo agentInfo.UpdatedAt = time.Now() bytes, err := json.Marshal(agentInfo) if err != nil { - return fmt.Errorf("marshal %s: %w", c.pubSub.AgentInfo.ID, err) + return fmt.Errorf("marshal %s: %w", c.agentInfo.ID, err) } - key := fmt.Sprintf("%s/%s", agentsPath, c.pubSub.AgentInfo.ID) + key := fmt.Sprintf("%s/%s", agentsPath, c.agentInfo.ID) _, err = c.client.Put(ctx, key, string(bytes), clientv3.WithLease(grantResponse.ID)) if err != nil { return fmt.Errorf("put %s: %w", key, err) @@ -126,7 +126,7 @@ func (c *Client) putAgent(ctx context.Context) error { // removeAgent removes the local agent in etcd. func (c *Client) removeAgent(ctx context.Context) error { - key := fmt.Sprintf("%s/%s", agentsPath, c.pubSub.AgentInfo.ID) + key := fmt.Sprintf("%s/%s", agentsPath, c.agentInfo.ID) _, err := c.client.Delete(ctx, key) if err != nil { return fmt.Errorf("remove %s: %w", key, err) diff --git a/yorkie/backend/sync/locker.go b/yorkie/backend/sync/locker.go index 5c770eb20..089b13f32 100644 --- a/yorkie/backend/sync/locker.go +++ b/yorkie/backend/sync/locker.go @@ -43,7 +43,4 @@ type Locker interface { type LockerMap interface { // NewLocker creates a sync.Locker. NewLocker(ctx context.Context, key Key) (Locker, error) - - // Close closes all resources of this LockerMap. - Close() error } diff --git a/yorkie/backend/sync/memory/coordinator.go b/yorkie/backend/sync/memory/coordinator.go new file mode 100644 index 000000000..7c6dfc9e1 --- /dev/null +++ b/yorkie/backend/sync/memory/coordinator.go @@ -0,0 +1,89 @@ +/* + * Copyright 2021 The Yorkie Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package memory + +import ( + "context" + + "github.com/moby/locker" + + "github.com/yorkie-team/yorkie/pkg/document/time" + "github.com/yorkie-team/yorkie/pkg/types" + "github.com/yorkie-team/yorkie/yorkie/backend/sync" +) + +// Coordinator is a memory-based implementation of sync.Coordinator. +type Coordinator struct { + agentInfo *sync.AgentInfo + + locks *locker.Locker + pubSub *PubSub +} + +// NewCoordinator creates an instance of Coordinator. +func NewCoordinator(agentInfo *sync.AgentInfo) *Coordinator { + return &Coordinator{ + agentInfo: agentInfo, + locks: locker.New(), + pubSub: NewPubSub(), + } +} + +// NewLocker creates locker of the given key. +func (m *Coordinator) NewLocker( + ctx context.Context, + key sync.Key, +) (sync.Locker, error) { + return &internalLocker{ + key.String(), + m.locks, + }, nil +} + +// Subscribe subscribes to the given topics. +func (m *Coordinator) Subscribe( + subscriber types.Client, + topics []string, +) (*sync.Subscription, map[string][]types.Client, error) { + return m.pubSub.Subscribe(subscriber, topics) +} + +// Unsubscribe unsubscribes the given topics. +func (m *Coordinator) Unsubscribe(topics []string, sub *sync.Subscription) { + m.pubSub.Unsubscribe(topics, sub) +} + +// Publish publishes the given event to the given Topic. +func (m *Coordinator) Publish( + publisherID *time.ActorID, + topic string, + event sync.DocEvent, +) { + m.pubSub.Publish(publisherID, topic, event) +} + +// Members returns the members of this cluster. +func (m *Coordinator) Members() map[string]*sync.AgentInfo { + members := make(map[string]*sync.AgentInfo) + members[m.agentInfo.ID] = m.agentInfo + return members +} + +// Close closes all resources of this Coordinator. +func (m *Coordinator) Close() error { + return nil +} diff --git a/yorkie/backend/sync/memory/locker.go b/yorkie/backend/sync/memory/locker.go index d9bf1d74b..2f7037c04 100644 --- a/yorkie/backend/sync/memory/locker.go +++ b/yorkie/backend/sync/memory/locker.go @@ -22,26 +22,8 @@ import ( "github.com/moby/locker" "github.com/yorkie-team/yorkie/internal/log" - "github.com/yorkie-team/yorkie/yorkie/backend/sync" ) -// LockerMap is locker map based on memory. -type LockerMap struct { - locks *locker.Locker -} - -// NewLockerMap creates an instance of LockerMap. -func NewLockerMap() *LockerMap { - return &LockerMap{ - locks: locker.New(), - } -} - -// Close closes all resources of this LockerMap. -func (m *LockerMap) Close() error { - return nil -} - type internalLocker struct { key string locks *locker.Locker @@ -63,14 +45,3 @@ func (il *internalLocker) Unlock(ctx context.Context) error { return nil } - -// NewLocker creates locker of the given key. -func (m *LockerMap) NewLocker( - ctx context.Context, - key sync.Key, -) (sync.Locker, error) { - return &internalLocker{ - key.String(), - m.locks, - }, nil -} diff --git a/yorkie/backend/sync/memory/pubsub.go b/yorkie/backend/sync/memory/pubsub.go index de76bace7..50e8cf912 100644 --- a/yorkie/backend/sync/memory/pubsub.go +++ b/yorkie/backend/sync/memory/pubsub.go @@ -63,15 +63,13 @@ func (s *subscriptions) Len() int { // PubSub is the memory implementation of PubSub, used for single agent or // tests. type PubSub struct { - AgentInfo *sync.AgentInfo subscriptionsMapMu *gosync.RWMutex subscriptionsMapByTopic map[string]*subscriptions } // NewPubSub creates an instance of PubSub. -func NewPubSub(info *sync.AgentInfo) *PubSub { +func NewPubSub() *PubSub { return &PubSub{ - AgentInfo: info, subscriptionsMapMu: &gosync.RWMutex{}, subscriptionsMapByTopic: make(map[string]*subscriptions), } @@ -153,24 +151,19 @@ func (m *PubSub) Publish( if subs, ok := m.subscriptionsMapByTopic[topic]; ok { for _, sub := range subs.Map() { - if sub.Subscriber().ID.Compare(publisherID) != 0 { - log.Logger.Debugf( - `Publish(%s,%s) to %s`, - event.DocKey, - publisherID.String(), - sub.SubscriberID(), - ) - sub.Events() <- event + if sub.Subscriber().ID.Compare(publisherID) == 0 { + continue } + + log.Logger.Debugf( + `Publish(%s,%s) to %s`, + event.DocKey, + publisherID.String(), + sub.SubscriberID(), + ) + sub.Events() <- event } } log.Logger.Debugf(`Publish(%s,%s) End`, event.DocKey, publisherID.String()) } - -// Members returns the members of this cluster. -func (m *PubSub) Members() map[string]*sync.AgentInfo { - members := make(map[string]*sync.AgentInfo) - members[m.AgentInfo.ID] = m.AgentInfo - return members -} diff --git a/yorkie/backend/sync/memory/pubsub_test.go b/yorkie/backend/sync/memory/pubsub_test.go index 346a85d0f..bdecc7bab 100644 --- a/yorkie/backend/sync/memory/pubsub_test.go +++ b/yorkie/backend/sync/memory/pubsub_test.go @@ -33,7 +33,7 @@ func TestPubSub(t *testing.T) { actorB := types.Client{ID: &time.ActorID{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}} t.Run("publish subscribe test", func(t *testing.T) { - pubSub := memory.NewPubSub(nil) + pubSub := memory.NewPubSub() event := sync.DocEvent{ Type: types.DocumentsWatchedEvent, DocKey: t.Name(), @@ -61,7 +61,7 @@ func TestPubSub(t *testing.T) { }) t.Run("subscriptions map test", func(t *testing.T) { - pubSub := memory.NewPubSub(nil) + pubSub := memory.NewPubSub() for i := 0; i < 5; i++ { _, subs, err := pubSub.Subscribe(actorA, []string{t.Name()}) diff --git a/yorkie/backend/sync/pubsub.go b/yorkie/backend/sync/pubsub.go index c893cc653..3e5fe92fb 100644 --- a/yorkie/backend/sync/pubsub.go +++ b/yorkie/backend/sync/pubsub.go @@ -1,5 +1,5 @@ /* - * Copyright 2020 The Yorkie Authors. All rights reserved. + * Copyright 2021 The Yorkie Authors. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,31 +17,58 @@ package sync import ( - "errors" - gotime "time" + "github.com/rs/xid" "github.com/yorkie-team/yorkie/pkg/document/time" "github.com/yorkie-team/yorkie/pkg/types" ) -var ( - // ErrEmptyTopics is returned when the given topic is empty. - ErrEmptyTopics = errors.New("empty topics") -) +// Subscription represents the subscription of a subscriber. It is used across +// several topics. +type Subscription struct { + id string + subscriber types.Client + closed bool + events chan DocEvent +} + +// NewSubscription creates a new instance of Subscription. +func NewSubscription(subscriber types.Client) *Subscription { + return &Subscription{ + id: xid.New().String(), + subscriber: subscriber, + events: make(chan DocEvent, 1), + } +} + +// ID returns the id of this subscription. +func (s *Subscription) ID() string { + return s.id +} -// DocEvent represents events that occur related to the document. -type DocEvent struct { - Type types.EventType - DocKey string - Publisher types.Client +// Events returns the DocEvent channel of this subscription. +func (s *Subscription) Events() chan DocEvent { + return s.events } -// AgentInfo represents the information of the Agent. -type AgentInfo struct { - ID string `json:"id"` - Hostname string `json:"hostname"` - RPCAddr string `json:"rpc_addr"` - UpdatedAt gotime.Time `json:"updated_at"` +// Subscriber returns the subscriber of this subscription. +func (s *Subscription) Subscriber() types.Client { + return s.subscriber +} + +// SubscriberID returns string representation of the subscriber. +func (s *Subscription) SubscriberID() string { + return s.subscriber.ID.String() +} + +// Close closes all resources of this Subscription. +func (s *Subscription) Close() { + if s.closed { + return + } + + s.closed = true + close(s.events) } // PubSub is a structure to support event publishing/subscription. @@ -57,7 +84,4 @@ type PubSub interface { // Publish publishes the given event to the given Topic. Publish(publisherID *time.ActorID, topic string, event DocEvent) - - // Members returns the members of this cluster. - Members() map[string]*AgentInfo } diff --git a/yorkie/backend/sync/subscription.go b/yorkie/backend/sync/subscription.go deleted file mode 100644 index 6f8c07d0e..000000000 --- a/yorkie/backend/sync/subscription.go +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright 2021 The Yorkie Authors. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package sync - -import ( - "github.com/rs/xid" - - "github.com/yorkie-team/yorkie/pkg/types" -) - -// Subscription represents the subscription of a subscriber. It is used across -// several topics. -type Subscription struct { - id string - subscriber types.Client - closed bool - events chan DocEvent -} - -// NewSubscription creates a new instance of Subscription. -func NewSubscription(subscriber types.Client) *Subscription { - return &Subscription{ - id: xid.New().String(), - subscriber: subscriber, - // [Workaround] The channel buffer size below avoids stopping during - // event issuing to the events channel. This bug occurs in the order - // of Publish and Unsubscribe. - events: make(chan DocEvent, 10), - } -} - -// ID returns the id of this subscription. -func (s *Subscription) ID() string { - return s.id -} - -// Events returns the DocEvent channel of this subscription. -func (s *Subscription) Events() chan DocEvent { - return s.events -} - -// Subscriber returns the subscriber of this subscription. -func (s *Subscription) Subscriber() types.Client { - return s.subscriber -} - -// SubscriberID returns string representation of the subscriber. -func (s *Subscription) SubscriberID() string { - return s.subscriber.ID.String() -} - -// Close closes all resources of this Subscription. -func (s *Subscription) Close() { - if s.closed { - return - } - - s.closed = true - close(s.events) -} diff --git a/yorkie/packs/pack_service.go b/yorkie/packs/pack_service.go index 3ef06917a..6fb61a841 100644 --- a/yorkie/packs/pack_service.go +++ b/yorkie/packs/pack_service.go @@ -106,7 +106,7 @@ func PushPull( // TODO(hackerwins): We need to replace Lock with TryLock. // If the snapshot is already being created by another routine, it // is not necessary to recreate it, so we can skip it. - locker, err := be.LockerMap.NewLocker( + locker, err := be.Coordinator.NewLocker( ctx, sync.NewKey(fmt.Sprintf("snapshot-%s", docInfo.Key)), ) @@ -126,7 +126,7 @@ func PushPull( } }() - be.PubSub.Publish( + be.Coordinator.Publish( publisherID, reqPack.DocumentKey.BSONKey(), sync.DocEvent{ diff --git a/yorkie/rpc/server.go b/yorkie/rpc/server.go index facb3f40c..dce472216 100644 --- a/yorkie/rpc/server.go +++ b/yorkie/rpc/server.go @@ -181,7 +181,7 @@ func (s *Server) AttachDocument( } if pack.HasChanges() { - locker, err := s.backend.LockerMap.NewLocker( + locker, err := s.backend.Coordinator.NewLocker( ctx, sync.NewKey(fmt.Sprintf("pushpull-%s", pack.DocumentKey.BSONKey())), ) @@ -246,7 +246,7 @@ func (s *Server) DetachDocument( } if pack.HasChanges() { - locker, err := s.backend.LockerMap.NewLocker( + locker, err := s.backend.Coordinator.NewLocker( ctx, sync.NewKey(fmt.Sprintf("pushpull-%s", pack.DocumentKey.BSONKey())), ) @@ -318,7 +318,7 @@ func (s *Server) PushPull( if pack.HasChanges() { s.backend.Metrics.SetPushPullReceivedChanges(len(pack.Changes)) - locker, err := s.backend.LockerMap.NewLocker( + locker, err := s.backend.Coordinator.NewLocker( ctx, sync.NewKey(fmt.Sprintf("pushpull-%s", pack.DocumentKey.BSONKey())), ) @@ -464,7 +464,7 @@ func (s *Server) watchDocs( client types.Client, docKeys []string, ) (*sync.Subscription, map[string][]types.Client, error) { - subscription, peersMap, err := s.backend.PubSub.Subscribe( + subscription, peersMap, err := s.backend.Coordinator.Subscribe( client, docKeys, ) @@ -474,7 +474,7 @@ func (s *Server) watchDocs( } for _, docKey := range docKeys { - s.backend.PubSub.Publish( + s.backend.Coordinator.Publish( subscription.Subscriber().ID, docKey, sync.DocEvent{ @@ -489,10 +489,10 @@ func (s *Server) watchDocs( } func (s *Server) unwatchDocs(docKeys []string, subscription *sync.Subscription) { - s.backend.PubSub.Unsubscribe(docKeys, subscription) + s.backend.Coordinator.Unsubscribe(docKeys, subscription) for _, docKey := range docKeys { - s.backend.PubSub.Publish( + s.backend.Coordinator.Publish( subscription.Subscriber().ID, docKey, sync.DocEvent{