Skip to content

Commit

Permalink
make use of submux lock visible and fix missing locks
Browse files Browse the repository at this point in the history
  • Loading branch information
magiconair committed Apr 7, 2022
1 parent 78dec6b commit f50785f
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 23 deletions.
42 changes: 23 additions & 19 deletions client_sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (c *Client) SubscribeWithContext(ctx context.Context, params *SubscriptionP
}

c.subs[sub.SubscriptionID] = sub
c.updatePublishTimeout()
c.updatePublishTimeout_NeedsSubMuxRLock()
return sub, nil
}

Expand All @@ -98,11 +98,14 @@ func (c *Client) SubscriptionIDs() []uint32 {
// recreateSubscriptions creates new subscriptions
// with the same parameters to replace the previous ones
func (c *Client) recreateSubscription(ctx context.Context, id uint32) error {
c.subMux.Lock()
defer c.subMux.Unlock()

sub, ok := c.subs[id]
if !ok {
return ua.StatusBadSubscriptionIDInvalid
}
return sub.recreate(ctx)
return sub.recreate_NeedsSubMuxLock(ctx)
}

// transferSubscriptions ask the server to transfer the given subscriptions
Expand All @@ -123,10 +126,10 @@ func (c *Client) transferSubscriptions(ctx context.Context, ids []uint32) (*ua.T
// republishSubscriptions sends republish requests for the given subscription id.
func (c *Client) republishSubscription(ctx context.Context, id uint32, availableSeq []uint32) error {
c.subMux.RLock()
defer c.subMux.RUnlock()
sub := c.subs[id]
c.subMux.RUnlock()

sub, ok := c.subs[id]
if !ok {
if sub == nil {
return errors.Errorf("invalid subscription id %d", id)
}

Expand Down Expand Up @@ -208,11 +211,8 @@ func (c *Client) sendRepublishRequests(ctx context.Context, sub *Subscription, a
}
}

// registerSubscription register a subscription
func (c *Client) registerSubscription(sub *Subscription) error {
c.subMux.Lock()
defer c.subMux.Unlock()

// registerSubscription_NeedsSubMuxLock registers a subscription
func (c *Client) registerSubscription_NeedsSubMuxLock(sub *Subscription) error {
if sub.SubscriptionID == 0 {
return ua.StatusBadSubscriptionIDInvalid
}
Expand All @@ -227,19 +227,23 @@ func (c *Client) registerSubscription(sub *Subscription) error {

func (c *Client) forgetSubscription(ctx context.Context, id uint32) {
c.subMux.Lock()
delete(c.subs, id)
c.updatePublishTimeout()
c.forgetSubscription_NeedsSubMuxLock(ctx, id)
c.subMux.Unlock()
}

func (c *Client) forgetSubscription_NeedsSubMuxLock(ctx context.Context, id uint32) {
delete(c.subs, id)
c.updatePublishTimeout_NeedsSubMuxRLock()
stats.Subscription().Add("Count", -1)

if len(c.subs) == 0 {
// todo(fs): are we holding the lock too long here?
// todo(fs): consider running this as a go routine
c.pauseSubscriptions(ctx)
}
}

func (c *Client) updatePublishTimeout() {
// we need to hold the subMux lock already

func (c *Client) updatePublishTimeout_NeedsSubMuxRLock() {
maxTimeout := uasc.MaxTimeout
for _, s := range c.subs {
if d := s.publishTimeout(); d < maxTimeout {
Expand Down Expand Up @@ -441,7 +445,7 @@ func (c *Client) publish(ctx context.Context) error {
default:
c.subMux.Lock()
// handle pending acks for all subscriptions
c.handleAcks(res.Results)
c.handleAcks_NeedsSubMuxLock(res.Results)

sub, ok := c.subs[res.SubscriptionID]
if !ok {
Expand All @@ -452,7 +456,7 @@ func (c *Client) publish(ctx context.Context) error {
}

// handle the publish response for a specific subscription
c.handleNotification(ctx, sub, res)
c.handleNotification_NeedsSubMuxLock(ctx, sub, res)
c.subMux.Unlock()

c.notifySubscription(ctx, sub, res.NotificationMessage)
Expand All @@ -462,7 +466,7 @@ func (c *Client) publish(ctx context.Context) error {
return nil
}

func (c *Client) handleAcks(res []ua.StatusCode) {
func (c *Client) handleAcks_NeedsSubMuxLock(res []ua.StatusCode) {
dlog := debug.NewPrefixLogger("publish: ")

// we assume that the number of results in the response match
Expand Down Expand Up @@ -495,7 +499,7 @@ func (c *Client) handleAcks(res []ua.StatusCode) {
dlog.Printf("notAcked=%v", notAcked)
}

func (c *Client) handleNotification(ctx context.Context, sub *Subscription, res *ua.PublishResponse) {
func (c *Client) handleNotification_NeedsSubMuxLock(ctx context.Context, sub *Subscription, res *ua.PublishResponse) {
dlog := debug.NewPrefixLogger("publish: sub %d: ", res.SubscriptionID)

// keep-alive message
Expand Down
8 changes: 4 additions & 4 deletions subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,9 +340,9 @@ func (p *SubscriptionParameters) setDefaults() {
}
}

// recreate creates a new subscription based on the previous subscription
// recreate_NeedsSubMuxLock creates a new subscription based on the previous subscription
// parameters and monitored items.
func (s *Subscription) recreate(ctx context.Context) error {
func (s *Subscription) recreate_NeedsSubMuxLock(ctx context.Context) error {
dlog := debug.NewPrefixLogger("sub %d: recreate: ", s.SubscriptionID)

if s.SubscriptionID == terminatedSubscriptionID {
Expand All @@ -361,7 +361,7 @@ func (s *Subscription) recreate(ctx context.Context) error {
})
dlog.Print("subscription deleted")
}
s.c.forgetSubscription(ctx, s.SubscriptionID)
s.c.forgetSubscription_NeedsSubMuxLock(ctx, s.SubscriptionID)
dlog.Printf("subscription forgotton")

req := &ua.CreateSubscriptionRequest{
Expand Down Expand Up @@ -394,7 +394,7 @@ func (s *Subscription) recreate(ctx context.Context) error {
s.lastSeq = 0
s.nextSeq = 1

if err := s.c.registerSubscription(s); err != nil {
if err := s.c.registerSubscription_NeedsSubMuxLock(s); err != nil {
return err
}
dlog.Printf("subscription registered")
Expand Down

0 comments on commit f50785f

Please sign in to comment.