Skip to content
This repository has been archived by the owner on Dec 22, 2022. It is now read-only.

Commit

Permalink
Rework pubstack module tests to remove race conditions (prebid#1522)
Browse files Browse the repository at this point in the history
* Rework pubstack module tests to remove race conditions

* PR feedback

* Remove event count and add helper methods to assert events received on channel
  • Loading branch information
bsardo authored Oct 8, 2020
1 parent 84ec617 commit 02e5266
Showing 1 changed file with 162 additions and 123 deletions.
285 changes: 162 additions & 123 deletions analytics/pubstack/pubstack_module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ package pubstack

import (
"encoding/json"
"github.com/prebid/prebid-server/analytics/pubstack/eventchannel"
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"os"
"testing"
"time"
Expand All @@ -16,7 +14,7 @@ import (
"github.com/stretchr/testify/assert"
)

func loadJsonFromFile() (*analytics.AuctionObject, error) {
func loadJSONFromFile() (*analytics.AuctionObject, error) {
req, err := os.Open("mocks/mock_openrtb_request.json")
if err != nil {
return nil, err
Expand Down Expand Up @@ -57,130 +55,171 @@ func loadJsonFromFile() (*analytics.AuctionObject, error) {
}, nil
}

func TestPubstackModule(t *testing.T) {

remoteConfig := &Configuration{}
server := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
defer req.Body.Close()
data, _ := json.Marshal(remoteConfig)
res.Write(data)
}))
client := server.Client()

defer server.Close()

// Loading Issues
_, err := NewPubstackModule(client, "scope", server.URL, "1z", 100, "90MB", "15m")
assert.NotNil(t, err) // should raise an error since we can't parse args // configRefreshDelay

_, err = NewPubstackModule(client, "scope", server.URL, "1h", 100, "90z", "15m")
assert.NotNil(t, err) // should raise an error since we can't parse args // maxByte

_, err = NewPubstackModule(client, "scope", server.URL, "1h", 100, "90MB", "15z")
assert.NotNil(t, err) // should raise an error since we can't parse args // maxTime

// Loading OK
module, err := NewPubstackModule(client, "scope", server.URL, "10ms", 100, "90MB", "15m")
assert.Nil(t, err)

// Default Configuration
pubstack, ok := module.(*PubstackModule)
assert.Equal(t, ok, true) //PBSAnalyticsModule is also a PubstackModule
assert.Equal(t, len(pubstack.cfg.Features), 5)
assert.Equal(t, pubstack.cfg.Features[auction], false)
assert.Equal(t, pubstack.cfg.Features[video], false)
assert.Equal(t, pubstack.cfg.Features[amp], false)
assert.Equal(t, pubstack.cfg.Features[setUID], false)
assert.Equal(t, pubstack.cfg.Features[cookieSync], false)

assert.Equal(t, len(pubstack.eventChannels), 0)

// Process Auction Event
counter := 0
send := func(_ []byte) error {
counter++
return nil
func TestPubstackModuleErrors(t *testing.T) {
tests := []struct {
description string
refreshDelay string
maxByteSize string
maxTime string
}{
{
description: "refresh delay is in an invalid format",
refreshDelay: "1invalid",
maxByteSize: "90MB",
maxTime: "15m",
},
{
description: "max byte size is in an invalid format",
refreshDelay: "1h",
maxByteSize: "90invalid",
maxTime: "15m",
},
{
description: "max time is in an invalid format",
refreshDelay: "1h",
maxByteSize: "90MB",
maxTime: "15invalid",
},
}
mockedEvent, err := loadJsonFromFile()
if err != nil {
t.Fail()

for _, tt := range tests {
_, err := NewPubstackModule(&http.Client{}, "scope", "http://example.com", tt.refreshDelay, 100, tt.maxByteSize, tt.maxTime)
assert.NotNil(t, err, tt.description)
}
}

pubstack.eventChannels[auction] = eventchannel.NewEventChannel(send, 2000, 1, 10*time.Second)
pubstack.eventChannels[video] = eventchannel.NewEventChannel(send, 2000, 1, 10*time.Second)
pubstack.eventChannels[amp] = eventchannel.NewEventChannel(send, 2000, 1, 10*time.Second)
pubstack.eventChannels[setUID] = eventchannel.NewEventChannel(send, 2000, 1, 10*time.Second)
pubstack.eventChannels[cookieSync] = eventchannel.NewEventChannel(send, 2000, 1, 10*time.Second)

pubstack.LogAuctionObject(mockedEvent)
pubstack.LogAmpObject(&analytics.AmpObject{
Status: http.StatusOK,
})
pubstack.LogCookieSyncObject(&analytics.CookieSyncObject{
Status: http.StatusOK,
})
pubstack.LogVideoObject(&analytics.VideoObject{
Status: http.StatusOK,
})
pubstack.LogSetUIDObject(&analytics.SetUIDObject{
Status: http.StatusOK,
})

pubstack.closeAllEventChannels()
time.Sleep(10 * time.Millisecond) // process channel
assert.Equal(t, counter, 0)

// Hot-Reload config
newFeatures := make(map[string]bool)
newFeatures[auction] = true
newFeatures[video] = true
newFeatures[amp] = true
newFeatures[cookieSync] = true
newFeatures[setUID] = true

remoteConfig = &Configuration{
ScopeID: "new-scope",
Endpoint: "new-endpoint",
Features: newFeatures,
func TestPubstackModuleSuccess(t *testing.T) {
tests := []struct {
description string
feature string
logObject func(analytics.PBSAnalyticsModule)
}{
{
description: "auction events are only published when logging an auction object with auction feature on",
feature: auction,
logObject: func(module analytics.PBSAnalyticsModule) {
module.LogAuctionObject(&analytics.AuctionObject{Status: http.StatusOK})
},
},
{
description: "AMP events are only published when logging an AMP object with AMP feature on",
feature: amp,
logObject: func(module analytics.PBSAnalyticsModule) {
module.LogAmpObject(&analytics.AmpObject{Status: http.StatusOK})
},
},
{
description: "video events are only published when logging a video object with video feature on",
feature: video,
logObject: func(module analytics.PBSAnalyticsModule) {
module.LogVideoObject(&analytics.VideoObject{Status: http.StatusOK})
},
},
{
description: "cookie events are only published when logging a cookie object with cookie feature on",
feature: cookieSync,
logObject: func(module analytics.PBSAnalyticsModule) {
module.LogCookieSyncObject(&analytics.CookieSyncObject{Status: http.StatusOK})
},
},
{
description: "setUID events are only published when logging a setUID object with setUID feature on",
feature: setUID,
logObject: func(module analytics.PBSAnalyticsModule) {
module.LogSetUIDObject(&analytics.SetUIDObject{Status: http.StatusOK})
},
},
}

endpoint, _ := url.Parse(server.URL)
pubstack.reloadConfig(endpoint)

time.Sleep(2 * time.Millisecond) // process channel
assert.Equal(t, len(pubstack.cfg.Features), 5)
assert.Equal(t, pubstack.cfg.Features[auction], true)
assert.Equal(t, pubstack.cfg.Features[video], true)
assert.Equal(t, pubstack.cfg.Features[amp], true)
assert.Equal(t, pubstack.cfg.Features[setUID], true)
assert.Equal(t, pubstack.cfg.Features[cookieSync], true)
assert.Equal(t, pubstack.cfg.ScopeID, "new-scope")
assert.Equal(t, pubstack.cfg.Endpoint, "new-endpoint")
assert.Equal(t, len(pubstack.eventChannels), 5)

counter = 0
pubstack.eventChannels[auction] = eventchannel.NewEventChannel(send, 2000, 1, 10*time.Second)
pubstack.eventChannels[video] = eventchannel.NewEventChannel(send, 2000, 1, 10*time.Second)
pubstack.eventChannels[amp] = eventchannel.NewEventChannel(send, 2000, 1, 10*time.Second)
pubstack.eventChannels[setUID] = eventchannel.NewEventChannel(send, 2000, 1, 10*time.Second)
pubstack.eventChannels[cookieSync] = eventchannel.NewEventChannel(send, 2000, 1, 10*time.Second)

pubstack.LogAuctionObject(mockedEvent)
pubstack.LogAmpObject(&analytics.AmpObject{
Status: http.StatusOK,
})
pubstack.LogCookieSyncObject(&analytics.CookieSyncObject{
Status: http.StatusOK,
})
pubstack.LogVideoObject(&analytics.VideoObject{
Status: http.StatusOK,
})
pubstack.LogSetUIDObject(&analytics.SetUIDObject{
Status: http.StatusOK,
})
pubstack.closeAllEventChannels()
time.Sleep(10 * time.Millisecond)

assert.Equal(t, counter, 5)
for _, tt := range tests {
// original config is loaded when the module is created
// the feature is disabled so no events should be sent
origConfig := &Configuration{
Features: map[string]bool{
tt.feature: false,
},
}
// updated config is hot-reloaded after some time passes
// the feature is enabled so events should be sent
updatedConfig := &Configuration{
Features: map[string]bool{
tt.feature: true,
},
}

// create server with root endpoint that returns the current config
// add an intake endpoint that PBS hits when events are sent
rootCount := 0
mux := http.NewServeMux()
mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
rootCount++
defer req.Body.Close()

if rootCount > 1 {
if data, err := json.Marshal(updatedConfig); err != nil {
res.WriteHeader(http.StatusBadRequest)
} else {
res.Write(data)
}
} else {
if data, err := json.Marshal(origConfig); err != nil {
res.WriteHeader(http.StatusBadRequest)
} else {
res.Write(data)
}
}
})

intakeChannel := make(chan int) // using a channel rather than examining the count directly to avoid race
mux.HandleFunc("/intake/"+tt.feature+"/", func(res http.ResponseWriter, req *http.Request) {
intakeChannel <- 1
})
server := httptest.NewServer(mux)
client := server.Client()

// set the server url on each of the configs
origConfig.Endpoint = server.URL
updatedConfig.Endpoint = server.URL

// instantiate module with 25ms config refresh rate
module, err := NewPubstackModule(client, "scope", server.URL, "15ms", 100, "1B", "10ms")
assert.Nil(t, err, tt.description)

// allow time for the module to load the original config
time.Sleep(10 * time.Millisecond)

pubstack, _ := module.(*PubstackModule)
// attempt to log but no event channel was created because the feature is disabled in the original config
tt.logObject(pubstack)

// verify no event was received over a 10ms period
assertChanNone(t, intakeChannel, tt.description)

// allow time for the server to start serving the updated config
time.Sleep(10 * time.Millisecond)

// attempt to log; the event channel should have been created because the feature is enabled in updated config
tt.logObject(pubstack)

// verify an event was received within 10ms
assertChanOne(t, intakeChannel, tt.description)
}
}

func assertChanNone(t *testing.T, c <-chan int, msgAndArgs ...interface{}) bool {
select {
case <-c:
return assert.Fail(t, "Should NOT receive an event, but did", msgAndArgs...)
case <-time.After(10 * time.Millisecond):
return true
}
}

func assertChanOne(t *testing.T, c <-chan int, msgAndArgs ...interface{}) bool {
select {
case <-c:
return true
case <-time.After(10 * time.Millisecond):
return assert.Fail(t, "Should receive an event, but did NOT", msgAndArgs...)
}
}

0 comments on commit 02e5266

Please sign in to comment.