Skip to content

Commit

Permalink
first pass (pr reviews)
Browse files Browse the repository at this point in the history
  • Loading branch information
gpolaert committed Jul 21, 2020
1 parent 3253151 commit b539ee0
Show file tree
Hide file tree
Showing 10 changed files with 155 additions and 41 deletions.
2 changes: 2 additions & 0 deletions analytics/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package config
import (
"github.com/golang/glog"
"github.com/prebid/prebid-server/analytics"
"github.com/prebid/prebid-server/analytics/clients"
"github.com/prebid/prebid-server/analytics/filesystem"
"github.com/prebid/prebid-server/analytics/pubstack"
"github.com/prebid/prebid-server/config"
Expand All @@ -20,6 +21,7 @@ func NewPBSAnalytics(analytics *config.Analytics) analytics.PBSAnalyticsModule {
}
if analytics.Pubstack.Enabled {
pubstackModule, err := pubstack.NewPubstackModule(
clients.GetDefaultHttpInstance(),
analytics.Pubstack.ScopeId,
analytics.Pubstack.IntakeUrl,
analytics.Pubstack.ConfRefresh,
Expand Down
13 changes: 8 additions & 5 deletions analytics/pubstack/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"encoding/json"
"github.com/docker/go-units"
"github.com/golang/glog"
"github.com/prebid/prebid-server/analytics/clients"
"net/http"
"net/url"
"os"
"path"
Expand All @@ -17,7 +17,7 @@ func (p *PubstackModule) fetchAndUpdateConfig(refreshDelay time.Duration, endCh
for {
select {
case <-tick.C:
config, err := fetchConfig(p.cfg.ScopeId, p.cfg.Endpoint)
config, err := fetchConfig(p.httpClient, p.cfg.ScopeId, p.cfg.Endpoint)
if err != nil {
glog.Errorf("[pubstack] Fail to fetch remote configuration: %v", err)
continue
Expand All @@ -29,19 +29,22 @@ func (p *PubstackModule) fetchAndUpdateConfig(refreshDelay time.Duration, endCh
}
}

func fetchConfig(scope string, intake string) (*Configuration, error) {
func fetchConfig(client *http.Client, scope string, intake string) (*Configuration, error) {
u, err := url.Parse(intake)
if err != nil {
return nil, err
}

u.Path = path.Join(u.Path, "bootstrap")
q, _ := url.ParseQuery(u.RawQuery)
q, err := url.ParseQuery(u.RawQuery)
if err != nil {
return nil, err
}

q.Add("scopeId", scope)
u.RawQuery = q.Encode()

res, err := clients.GetDefaultHttpInstance().Get(u.String())
res, err := client.Get(u.String())
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions analytics/pubstack/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package pubstack
7 changes: 4 additions & 3 deletions analytics/pubstack/eventchannel/eventchannel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ var maxTime = 2 * time.Hour

func TestEventChannel_isBufferFull(t *testing.T) {

send := func(_ []byte) {}
send := func(_ []byte) error { return nil }

eventChannel := NewEventChannel(send, maxByteSize, maxEventCount, maxTime)
eventChannel.buffer([]byte("one"))
Expand All @@ -35,7 +35,7 @@ func TestEventChannel_isBufferFull(t *testing.T) {

func TestEventChannel_reset(t *testing.T) {

send := func(_ []byte) {}
send := func(_ []byte) error { return nil }

eventChannel := NewEventChannel(send, maxByteSize, maxEventCount, maxTime)
assert.Equal(t, eventChannel.metrics.eventCount, int64(0))
Expand All @@ -56,8 +56,9 @@ func TestEventChannel_reset(t *testing.T) {

func TestEventChannel_flush(t *testing.T) {
data := bytes.Buffer{}
send := func(payload []byte) {
send := func(payload []byte) error {
data.Write(payload)
return nil
}
maxByteSize := int64(15)
maxEventCount := int64(3)
Expand Down
22 changes: 12 additions & 10 deletions analytics/pubstack/eventchannel/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,44 @@ package eventchannel

import (
"bytes"
"fmt"
"github.com/golang/glog"
"github.com/prebid/prebid-server/analytics/clients"
"net/http"
"net/url"
"path"
)

type Sender = func(payload []byte)
type Sender = func(payload []byte) error

func NewHttpSender(endpoint string) Sender {
return func(payload []byte) {
func NewHttpSender(client *http.Client, endpoint string) Sender {
return func(payload []byte) error {
req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(payload))
if err != nil {
glog.Error(err)
return
return err
}

req.Header.Set("Content-Type", "application/octet-stream")
req.Header.Set("Content-Encoding", "gzip")

resp, err := clients.GetDefaultHttpInstance().Do(req)
resp, err := client.Do(req)
if err != nil {
return
return err
}

if resp.StatusCode != http.StatusOK {
glog.Errorf("[pubstack] Wrong code received %d instead of %d", resp.StatusCode, http.StatusOK)
return
return fmt.Errorf("wrong code received %d instead of %d", resp.StatusCode, http.StatusOK)
}
return nil
}
}

func BuildEndpointSender(baseUrl string, module string) Sender {
func BuildEndpointSender(client *http.Client, baseUrl string, module string) Sender {
endpoint, err := url.Parse(baseUrl)
if err != nil {
glog.Fatal(err)
}
endpoint.Path = path.Join(endpoint.Path, "intake", module)
return NewHttpSender(endpoint.String())
return NewHttpSender(client, endpoint.String())
}
40 changes: 40 additions & 0 deletions analytics/pubstack/eventchannel/sender_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package eventchannel

import (
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"
)

func TestNewHttpSender(t *testing.T) {
requestBody := make([]byte, 10)
server := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
defer req.Body.Close()
requestBody, _ = ioutil.ReadAll(req.Body)
res.WriteHeader(200)
}))

defer server.Close()

sender := NewHttpSender(server.Client(), server.URL)
err := sender([]byte("message"))

assert.Equal(t, requestBody, []byte("message"))
assert.Nil(t, err)
}

func TestNewHttpSender_Error(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
res.WriteHeader(400)
}))

defer server.Close()

sender := NewHttpSender(server.Client(), server.URL)
err := sender([]byte("message"))

assert.NotNil(t, err)
}
1 change: 0 additions & 1 deletion analytics/pubstack/helpers/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ func JsonifyCookieSync(cso *analytics.CookieSyncObject, scope string) ([]byte, e
}

func JsonifySetUIDObject(so *analytics.SetUIDObject, scope string) ([]byte, error) {
type alias analytics.SetUIDObject
b, err := json.Marshal(&struct {
Scope string `json:"scope"`
*analytics.SetUIDObject
Expand Down
61 changes: 61 additions & 0 deletions analytics/pubstack/helpers/json_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package helpers

import (
"github.com/mxmCherry/openrtb"
"github.com/prebid/prebid-server/analytics"
"github.com/prebid/prebid-server/usersync"
"net/http"
"testing"
)

func TestJsonifyAuctionObject(t *testing.T) {
ao := &analytics.AuctionObject{
Status: http.StatusOK,
}
if _, err := JsonifyAuctionObject(ao, "scopeId"); err != nil {
t.Fail()
}

}

func TestJsonifyVideoObject(t *testing.T) {
vo := &analytics.VideoObject{
Status: http.StatusOK,
}
if _, err := JsonifyVideoObject(vo, "scopeId"); err != nil {
t.Fail()
}
}

func TestJsonifyCookieSync(t *testing.T) {
cso := &analytics.CookieSyncObject{
Status: http.StatusOK,
BidderStatus: []*usersync.CookieSyncBidders{},
}
if _, err := JsonifyCookieSync(cso, "scopeId"); err != nil {
t.Fail()
}
}

func TestJsonifySetUIDObject(t *testing.T) {
so := &analytics.SetUIDObject{
Status: http.StatusOK,
Bidder: "any-bidder",
UID: "uid string",
}
if _, err := JsonifySetUIDObject(so, "scopeId"); err != nil {
t.Fail()
}
}

func TestJsonifyAmpObject(t *testing.T) {
ao := &analytics.AmpObject{
Status: http.StatusOK,
Errors: make([]error, 0),
AuctionResponse: &openrtb.BidResponse{},
AmpTargetingValues: map[string]string{},
}
if _, err := JsonifyAmpObject(ao, "scopeId"); err != nil {
t.Fail()
}
}
35 changes: 18 additions & 17 deletions analytics/pubstack/pubstack_module.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pubstack
import (
"fmt"
"github.com/prebid/prebid-server/analytics/pubstack/eventchannel"
"net/http"
"os"
"os/signal"
"sync"
Expand Down Expand Up @@ -38,14 +39,15 @@ type bufferConfig struct {

type PubstackModule struct {
eventChannels map[string]*eventchannel.EventChannel
httpClient *http.Client
configCh chan *Configuration
scope string
cfg *Configuration
buffsCfg *bufferConfig
mux sync.Mutex
}

func NewPubstackModule(scope, endpoint, configRefreshDelay string, maxEventCount int, maxByteSize, maxTime string) (analytics.PBSAnalyticsModule, error) {
func NewPubstackModule(client *http.Client, scope, endpoint, configRefreshDelay string, maxEventCount int, maxByteSize, maxTime string) (analytics.PBSAnalyticsModule, error) {
glog.Infof("[pubstack] Initializing module scope=%s endpoint=%s\n", scope, endpoint)

// parse args
Expand All @@ -60,12 +62,13 @@ func NewPubstackModule(scope, endpoint, configRefreshDelay string, maxEventCount
return nil, fmt.Errorf("fail to parse the module args, arg=analytics.pubstack.buffers, :%v", err)
}

defaultFeatures := make(map[string]bool)
defaultFeatures[auction] = false
defaultFeatures[video] = false
defaultFeatures[amp] = false
defaultFeatures[cookieSync] = false
defaultFeatures[setUID] = false
defaultFeatures := map[string]bool{
auction: false,
video: false,
amp: false,
cookieSync: false,
setUID: false,
}

defaultConfig := &Configuration{
ScopeId: scope,
Expand All @@ -75,6 +78,7 @@ func NewPubstackModule(scope, endpoint, configRefreshDelay string, maxEventCount

pb := PubstackModule{
scope: scope,
httpClient: client,
cfg: defaultConfig,
buffsCfg: bufferCfg,
configCh: make(chan *Configuration),
Expand Down Expand Up @@ -183,7 +187,7 @@ func (p *PubstackModule) LogAmpObject(ao *analytics.AmpObject) {
}

func (p *PubstackModule) setup() {
config, err := fetchConfig(p.cfg.ScopeId, p.cfg.Endpoint)
config, err := fetchConfig(p.httpClient, p.cfg.ScopeId, p.cfg.Endpoint)
if err != nil {
glog.Errorf("[pubstack] Fail to fetch remote configuration: %v", err)
return
Expand Down Expand Up @@ -221,26 +225,23 @@ func (p *PubstackModule) configure(config *Configuration) {
}

if p.isFeatureEnable(amp) {
p.eventChannels[amp] = eventchannel.NewEventChannel(eventchannel.BuildEndpointSender(p.cfg.Endpoint, amp), p.buffsCfg.size, p.buffsCfg.count, p.buffsCfg.timeout)
p.eventChannels[amp] = eventchannel.NewEventChannel(eventchannel.BuildEndpointSender(p.httpClient, p.cfg.Endpoint, amp), p.buffsCfg.size, p.buffsCfg.count, p.buffsCfg.timeout)
}
if p.isFeatureEnable(auction) {
p.eventChannels[auction] = eventchannel.NewEventChannel(eventchannel.BuildEndpointSender(p.cfg.Endpoint, auction), p.buffsCfg.size, p.buffsCfg.count, p.buffsCfg.timeout)
p.eventChannels[auction] = eventchannel.NewEventChannel(eventchannel.BuildEndpointSender(p.httpClient, p.cfg.Endpoint, auction), p.buffsCfg.size, p.buffsCfg.count, p.buffsCfg.timeout)
}
if p.isFeatureEnable(cookieSync) {
p.eventChannels[cookieSync] = eventchannel.NewEventChannel(eventchannel.BuildEndpointSender(p.cfg.Endpoint, cookieSync), p.buffsCfg.size, p.buffsCfg.count, p.buffsCfg.timeout)
p.eventChannels[cookieSync] = eventchannel.NewEventChannel(eventchannel.BuildEndpointSender(p.httpClient, p.cfg.Endpoint, cookieSync), p.buffsCfg.size, p.buffsCfg.count, p.buffsCfg.timeout)
}
if p.isFeatureEnable(video) {
p.eventChannels[video] = eventchannel.NewEventChannel(eventchannel.BuildEndpointSender(p.cfg.Endpoint, video), p.buffsCfg.size, p.buffsCfg.count, p.buffsCfg.timeout)
p.eventChannels[video] = eventchannel.NewEventChannel(eventchannel.BuildEndpointSender(p.httpClient, p.cfg.Endpoint, video), p.buffsCfg.size, p.buffsCfg.count, p.buffsCfg.timeout)
}
if p.isFeatureEnable(setUID) {
p.eventChannels[setUID] = eventchannel.NewEventChannel(eventchannel.BuildEndpointSender(p.cfg.Endpoint, setUID), p.buffsCfg.size, p.buffsCfg.count, p.buffsCfg.timeout)
p.eventChannels[setUID] = eventchannel.NewEventChannel(eventchannel.BuildEndpointSender(p.httpClient, p.cfg.Endpoint, setUID), p.buffsCfg.size, p.buffsCfg.count, p.buffsCfg.timeout)
}
}

func (p *PubstackModule) isFeatureEnable(feature string) bool {
val, ok := p.cfg.Features[feature]
if !(ok && val) {
return false
}
return true
return ok && val
}
14 changes: 9 additions & 5 deletions analytics/pubstack/pubstack_module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pubstack
import (
"bytes"
"encoding/json"
"github.com/prebid/prebid-server/analytics/clients"
"github.com/prebid/prebid-server/analytics/pubstack/eventchannel"
"io/ioutil"
"os"
Expand Down Expand Up @@ -57,18 +58,20 @@ func loadJsonFromFile() (*analytics.AuctionObject, error) {

func TestPubstackModule(t *testing.T) {

client := clients.GetDefaultHttpInstance()

// Loading Issues
_, err := NewPubstackModule("scope", "http://localhost:11287", "1z", 100, "90MB", "15m")
_, err := NewPubstackModule(client, "scope", "http://localhost:11287", "1z", 100, "90MB", "15m")
assert.NotEqual(t, err, nil) // should raise an error since we can't parse args // configRefreshDelay

_, err = NewPubstackModule("scope", "http://localhost:11287", "1h", 100, "90z", "15m")
_, err = NewPubstackModule(client, "scope", "http://localhost:11287", "1h", 100, "90z", "15m")
assert.NotEqual(t, err, nil) // should raise an error since we can't parse args // maxByte

_, err = NewPubstackModule("scope", "http://localhost:11287", "1h", 100, "90MB", "15z")
_, err = NewPubstackModule(client, "scope", "http://localhost:11287", "1h", 100, "90MB", "15z")
assert.NotEqual(t, err, nil) // should raise an error since we can't parse args // maxTime

// Loading OK
module, err := NewPubstackModule("scope", "http://localhost:11287", "1h", 100, "90MB", "15m")
module, err := NewPubstackModule(client, "scope", "http://localhost:11287", "1h", 100, "90MB", "15m")
assert.Equal(t, err, nil)

// Default Configuration
Expand All @@ -85,8 +88,9 @@ func TestPubstackModule(t *testing.T) {

// Process Auction Event
data := bytes.Buffer{}
send := func(payload []byte) {
send := func(payload []byte) error {
data.Write(payload)
return nil
}
mockedEvent, err := loadJsonFromFile()

Expand Down

0 comments on commit b539ee0

Please sign in to comment.