Skip to content

Commit

Permalink
fix race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
gpolaert committed Jul 1, 2020
1 parent 7af0060 commit 3253151
Showing 1 changed file with 29 additions and 16 deletions.
45 changes: 29 additions & 16 deletions analytics/pubstack/pubstack_module.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ func NewPubstackModule(scope, endpoint, configRefreshDelay string, maxEventCount
}

func (p *PubstackModule) LogAuctionObject(ao *analytics.AuctionObject) {
p.mux.Lock()
defer p.mux.Unlock()

if !p.isFeatureEnable(auction) {
return
}
Expand All @@ -102,10 +105,13 @@ func (p *PubstackModule) LogAuctionObject(ao *analytics.AuctionObject) {
return
}

p.logObject(auction, payload)
p.eventChannels[auction].Push(payload)
}

func (p *PubstackModule) LogVideoObject(vo *analytics.VideoObject) {
p.mux.Lock()
defer p.mux.Unlock()

if !p.isFeatureEnable(video) {
return
}
Expand All @@ -117,10 +123,13 @@ func (p *PubstackModule) LogVideoObject(vo *analytics.VideoObject) {
return
}

p.logObject(video, payload)
p.eventChannels[video].Push(payload)
}

func (p *PubstackModule) LogSetUIDObject(so *analytics.SetUIDObject) {
p.mux.Lock()
defer p.mux.Unlock()

if !p.isFeatureEnable(setUID) {
return
}
Expand All @@ -132,10 +141,13 @@ func (p *PubstackModule) LogSetUIDObject(so *analytics.SetUIDObject) {
return
}

p.logObject(setUID, payload)
p.eventChannels[setUID].Push(payload)
}

func (p *PubstackModule) LogCookieSyncObject(cso *analytics.CookieSyncObject) {
p.mux.Lock()
defer p.mux.Unlock()

if !p.isFeatureEnable(cookieSync) {
return
}
Expand All @@ -147,10 +159,14 @@ func (p *PubstackModule) LogCookieSyncObject(cso *analytics.CookieSyncObject) {
return
}

p.logObject(cookieSync, payload)
p.eventChannels[cookieSync].Push(payload)

}

func (p *PubstackModule) LogAmpObject(ao *analytics.AmpObject) {
p.mux.Lock()
defer p.mux.Unlock()

if !p.isFeatureEnable(amp) {
return
}
Expand All @@ -162,7 +178,8 @@ func (p *PubstackModule) LogAmpObject(ao *analytics.AmpObject) {
return
}

p.logObject(amp, payload)
p.eventChannels[amp].Push(payload)

}

func (p *PubstackModule) setup() {
Expand All @@ -182,10 +199,7 @@ func (p *PubstackModule) start(refreshDelay time.Duration, endCh chan os.Signal)
for {
select {
case config := <-p.configCh:
p.mux.Lock()
p.cfg = config
p.setEventChannels()
p.mux.Unlock()
p.configure(config)
glog.Infof("[pubstack] Updating config: %v", p.cfg)
case <-endCh:
return
Expand All @@ -194,7 +208,12 @@ func (p *PubstackModule) start(refreshDelay time.Duration, endCh chan os.Signal)

}

func (p *PubstackModule) setEventChannels() {
func (p *PubstackModule) configure(config *Configuration) {
p.mux.Lock()
defer p.mux.Unlock()

p.cfg = config

// close previous instance
for key, ch := range p.eventChannels {
ch.Close()
Expand Down Expand Up @@ -225,9 +244,3 @@ func (p *PubstackModule) isFeatureEnable(feature string) bool {
}
return true
}

func (p *PubstackModule) logObject(feature string, payload []byte) {
p.mux.Lock()
defer p.mux.Unlock()
p.eventChannels[feature].Push(payload)
}

0 comments on commit 3253151

Please sign in to comment.