diff --git a/analytics/config/config.go b/analytics/config/config.go index 40d0ce1b8c5..0a474d5f68f 100644 --- a/analytics/config/config.go +++ b/analytics/config/config.go @@ -3,6 +3,7 @@ package config import ( "github.com/golang/glog" "github.com/prebid/prebid-server/analytics" + "github.com/prebid/prebid-server/analytics/clients" "github.com/prebid/prebid-server/analytics/filesystem" "github.com/prebid/prebid-server/analytics/pubstack" "github.com/prebid/prebid-server/config" @@ -20,6 +21,7 @@ func NewPBSAnalytics(analytics *config.Analytics) analytics.PBSAnalyticsModule { } if analytics.Pubstack.Enabled { pubstackModule, err := pubstack.NewPubstackModule( + clients.GetDefaultHttpInstance(), analytics.Pubstack.ScopeId, analytics.Pubstack.IntakeUrl, analytics.Pubstack.ConfRefresh, diff --git a/analytics/pubstack/config.go b/analytics/pubstack/config.go index 9295e9f59ae..e519c7c7fb3 100644 --- a/analytics/pubstack/config.go +++ b/analytics/pubstack/config.go @@ -4,7 +4,7 @@ import ( "encoding/json" "github.com/docker/go-units" "github.com/golang/glog" - "github.com/prebid/prebid-server/analytics/clients" + "net/http" "net/url" "os" "path" @@ -17,7 +17,7 @@ func (p *PubstackModule) fetchAndUpdateConfig(refreshDelay time.Duration, endCh for { select { case <-tick.C: - config, err := fetchConfig(p.cfg.ScopeId, p.cfg.Endpoint) + config, err := fetchConfig(p.httpClient, p.cfg.ScopeId, p.cfg.Endpoint) if err != nil { glog.Errorf("[pubstack] Fail to fetch remote configuration: %v", err) continue @@ -29,19 +29,22 @@ func (p *PubstackModule) fetchAndUpdateConfig(refreshDelay time.Duration, endCh } } -func fetchConfig(scope string, intake string) (*Configuration, error) { +func fetchConfig(client *http.Client, scope string, intake string) (*Configuration, error) { u, err := url.Parse(intake) if err != nil { return nil, err } u.Path = path.Join(u.Path, "bootstrap") - q, _ := url.ParseQuery(u.RawQuery) + q, err := url.ParseQuery(u.RawQuery) + if err != nil { + return nil, err + } q.Add("scopeId", scope) u.RawQuery = q.Encode() - res, err := clients.GetDefaultHttpInstance().Get(u.String()) + res, err := client.Get(u.String()) if err != nil { return nil, err } diff --git a/analytics/pubstack/config_test.go b/analytics/pubstack/config_test.go new file mode 100644 index 00000000000..08a80e7b9fb --- /dev/null +++ b/analytics/pubstack/config_test.go @@ -0,0 +1 @@ +package pubstack diff --git a/analytics/pubstack/eventchannel/eventchannel_test.go b/analytics/pubstack/eventchannel/eventchannel_test.go index 65ebc6a28cf..4c5b63c6a16 100644 --- a/analytics/pubstack/eventchannel/eventchannel_test.go +++ b/analytics/pubstack/eventchannel/eventchannel_test.go @@ -15,7 +15,7 @@ var maxTime = 2 * time.Hour func TestEventChannel_isBufferFull(t *testing.T) { - send := func(_ []byte) {} + send := func(_ []byte) error { return nil } eventChannel := NewEventChannel(send, maxByteSize, maxEventCount, maxTime) eventChannel.buffer([]byte("one")) @@ -35,7 +35,7 @@ func TestEventChannel_isBufferFull(t *testing.T) { func TestEventChannel_reset(t *testing.T) { - send := func(_ []byte) {} + send := func(_ []byte) error { return nil } eventChannel := NewEventChannel(send, maxByteSize, maxEventCount, maxTime) assert.Equal(t, eventChannel.metrics.eventCount, int64(0)) @@ -56,8 +56,9 @@ func TestEventChannel_reset(t *testing.T) { func TestEventChannel_flush(t *testing.T) { data := bytes.Buffer{} - send := func(payload []byte) { + send := func(payload []byte) error { data.Write(payload) + return nil } maxByteSize := int64(15) maxEventCount := int64(3) diff --git a/analytics/pubstack/eventchannel/sender.go b/analytics/pubstack/eventchannel/sender.go index f1c9ab08582..2d734cd5cd3 100644 --- a/analytics/pubstack/eventchannel/sender.go +++ b/analytics/pubstack/eventchannel/sender.go @@ -2,42 +2,44 @@ package eventchannel import ( "bytes" + "fmt" "github.com/golang/glog" - "github.com/prebid/prebid-server/analytics/clients" "net/http" "net/url" "path" ) -type Sender = func(payload []byte) +type Sender = func(payload []byte) error -func NewHttpSender(endpoint string) Sender { - return func(payload []byte) { +func NewHttpSender(client *http.Client, endpoint string) Sender { + return func(payload []byte) error { req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(payload)) if err != nil { glog.Error(err) - return + return err } req.Header.Set("Content-Type", "application/octet-stream") req.Header.Set("Content-Encoding", "gzip") - resp, err := clients.GetDefaultHttpInstance().Do(req) + resp, err := client.Do(req) if err != nil { - return + return err } + if resp.StatusCode != http.StatusOK { glog.Errorf("[pubstack] Wrong code received %d instead of %d", resp.StatusCode, http.StatusOK) - return + return fmt.Errorf("wrong code received %d instead of %d", resp.StatusCode, http.StatusOK) } + return nil } } -func BuildEndpointSender(baseUrl string, module string) Sender { +func BuildEndpointSender(client *http.Client, baseUrl string, module string) Sender { endpoint, err := url.Parse(baseUrl) if err != nil { glog.Fatal(err) } endpoint.Path = path.Join(endpoint.Path, "intake", module) - return NewHttpSender(endpoint.String()) + return NewHttpSender(client, endpoint.String()) } diff --git a/analytics/pubstack/eventchannel/sender_test.go b/analytics/pubstack/eventchannel/sender_test.go new file mode 100644 index 00000000000..fb07bd527fe --- /dev/null +++ b/analytics/pubstack/eventchannel/sender_test.go @@ -0,0 +1,40 @@ +package eventchannel + +import ( + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestNewHttpSender(t *testing.T) { + requestBody := make([]byte, 10) + server := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + defer req.Body.Close() + requestBody, _ = ioutil.ReadAll(req.Body) + res.WriteHeader(200) + })) + + defer server.Close() + + sender := NewHttpSender(server.Client(), server.URL) + err := sender([]byte("message")) + + assert.Equal(t, requestBody, []byte("message")) + assert.Nil(t, err) +} + +func TestNewHttpSender_Error(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + res.WriteHeader(400) + })) + + defer server.Close() + + sender := NewHttpSender(server.Client(), server.URL) + err := sender([]byte("message")) + + assert.NotNil(t, err) +} diff --git a/analytics/pubstack/helpers/json.go b/analytics/pubstack/helpers/json.go index fe8aece0854..f02f1120626 100644 --- a/analytics/pubstack/helpers/json.go +++ b/analytics/pubstack/helpers/json.go @@ -56,7 +56,6 @@ func JsonifyCookieSync(cso *analytics.CookieSyncObject, scope string) ([]byte, e } func JsonifySetUIDObject(so *analytics.SetUIDObject, scope string) ([]byte, error) { - type alias analytics.SetUIDObject b, err := json.Marshal(&struct { Scope string `json:"scope"` *analytics.SetUIDObject diff --git a/analytics/pubstack/helpers/json_test.go b/analytics/pubstack/helpers/json_test.go new file mode 100644 index 00000000000..4e36e8db2be --- /dev/null +++ b/analytics/pubstack/helpers/json_test.go @@ -0,0 +1,61 @@ +package helpers + +import ( + "github.com/mxmCherry/openrtb" + "github.com/prebid/prebid-server/analytics" + "github.com/prebid/prebid-server/usersync" + "net/http" + "testing" +) + +func TestJsonifyAuctionObject(t *testing.T) { + ao := &analytics.AuctionObject{ + Status: http.StatusOK, + } + if _, err := JsonifyAuctionObject(ao, "scopeId"); err != nil { + t.Fail() + } + +} + +func TestJsonifyVideoObject(t *testing.T) { + vo := &analytics.VideoObject{ + Status: http.StatusOK, + } + if _, err := JsonifyVideoObject(vo, "scopeId"); err != nil { + t.Fail() + } +} + +func TestJsonifyCookieSync(t *testing.T) { + cso := &analytics.CookieSyncObject{ + Status: http.StatusOK, + BidderStatus: []*usersync.CookieSyncBidders{}, + } + if _, err := JsonifyCookieSync(cso, "scopeId"); err != nil { + t.Fail() + } +} + +func TestJsonifySetUIDObject(t *testing.T) { + so := &analytics.SetUIDObject{ + Status: http.StatusOK, + Bidder: "any-bidder", + UID: "uid string", + } + if _, err := JsonifySetUIDObject(so, "scopeId"); err != nil { + t.Fail() + } +} + +func TestJsonifyAmpObject(t *testing.T) { + ao := &analytics.AmpObject{ + Status: http.StatusOK, + Errors: make([]error, 0), + AuctionResponse: &openrtb.BidResponse{}, + AmpTargetingValues: map[string]string{}, + } + if _, err := JsonifyAmpObject(ao, "scopeId"); err != nil { + t.Fail() + } +} diff --git a/analytics/pubstack/pubstack_module.go b/analytics/pubstack/pubstack_module.go index 8b83fbbaf3d..12cabda0593 100644 --- a/analytics/pubstack/pubstack_module.go +++ b/analytics/pubstack/pubstack_module.go @@ -3,6 +3,7 @@ package pubstack import ( "fmt" "github.com/prebid/prebid-server/analytics/pubstack/eventchannel" + "net/http" "os" "os/signal" "sync" @@ -38,6 +39,7 @@ type bufferConfig struct { type PubstackModule struct { eventChannels map[string]*eventchannel.EventChannel + httpClient *http.Client configCh chan *Configuration scope string cfg *Configuration @@ -45,7 +47,7 @@ type PubstackModule struct { mux sync.Mutex } -func NewPubstackModule(scope, endpoint, configRefreshDelay string, maxEventCount int, maxByteSize, maxTime string) (analytics.PBSAnalyticsModule, error) { +func NewPubstackModule(client *http.Client, scope, endpoint, configRefreshDelay string, maxEventCount int, maxByteSize, maxTime string) (analytics.PBSAnalyticsModule, error) { glog.Infof("[pubstack] Initializing module scope=%s endpoint=%s\n", scope, endpoint) // parse args @@ -60,12 +62,13 @@ func NewPubstackModule(scope, endpoint, configRefreshDelay string, maxEventCount return nil, fmt.Errorf("fail to parse the module args, arg=analytics.pubstack.buffers, :%v", err) } - defaultFeatures := make(map[string]bool) - defaultFeatures[auction] = false - defaultFeatures[video] = false - defaultFeatures[amp] = false - defaultFeatures[cookieSync] = false - defaultFeatures[setUID] = false + defaultFeatures := map[string]bool{ + auction: false, + video: false, + amp: false, + cookieSync: false, + setUID: false, + } defaultConfig := &Configuration{ ScopeId: scope, @@ -75,6 +78,7 @@ func NewPubstackModule(scope, endpoint, configRefreshDelay string, maxEventCount pb := PubstackModule{ scope: scope, + httpClient: client, cfg: defaultConfig, buffsCfg: bufferCfg, configCh: make(chan *Configuration), @@ -183,7 +187,7 @@ func (p *PubstackModule) LogAmpObject(ao *analytics.AmpObject) { } func (p *PubstackModule) setup() { - config, err := fetchConfig(p.cfg.ScopeId, p.cfg.Endpoint) + config, err := fetchConfig(p.httpClient, p.cfg.ScopeId, p.cfg.Endpoint) if err != nil { glog.Errorf("[pubstack] Fail to fetch remote configuration: %v", err) return @@ -221,26 +225,23 @@ func (p *PubstackModule) configure(config *Configuration) { } if p.isFeatureEnable(amp) { - p.eventChannels[amp] = eventchannel.NewEventChannel(eventchannel.BuildEndpointSender(p.cfg.Endpoint, amp), p.buffsCfg.size, p.buffsCfg.count, p.buffsCfg.timeout) + p.eventChannels[amp] = eventchannel.NewEventChannel(eventchannel.BuildEndpointSender(p.httpClient, p.cfg.Endpoint, amp), p.buffsCfg.size, p.buffsCfg.count, p.buffsCfg.timeout) } if p.isFeatureEnable(auction) { - p.eventChannels[auction] = eventchannel.NewEventChannel(eventchannel.BuildEndpointSender(p.cfg.Endpoint, auction), p.buffsCfg.size, p.buffsCfg.count, p.buffsCfg.timeout) + p.eventChannels[auction] = eventchannel.NewEventChannel(eventchannel.BuildEndpointSender(p.httpClient, p.cfg.Endpoint, auction), p.buffsCfg.size, p.buffsCfg.count, p.buffsCfg.timeout) } if p.isFeatureEnable(cookieSync) { - p.eventChannels[cookieSync] = eventchannel.NewEventChannel(eventchannel.BuildEndpointSender(p.cfg.Endpoint, cookieSync), p.buffsCfg.size, p.buffsCfg.count, p.buffsCfg.timeout) + p.eventChannels[cookieSync] = eventchannel.NewEventChannel(eventchannel.BuildEndpointSender(p.httpClient, p.cfg.Endpoint, cookieSync), p.buffsCfg.size, p.buffsCfg.count, p.buffsCfg.timeout) } if p.isFeatureEnable(video) { - p.eventChannels[video] = eventchannel.NewEventChannel(eventchannel.BuildEndpointSender(p.cfg.Endpoint, video), p.buffsCfg.size, p.buffsCfg.count, p.buffsCfg.timeout) + p.eventChannels[video] = eventchannel.NewEventChannel(eventchannel.BuildEndpointSender(p.httpClient, p.cfg.Endpoint, video), p.buffsCfg.size, p.buffsCfg.count, p.buffsCfg.timeout) } if p.isFeatureEnable(setUID) { - p.eventChannels[setUID] = eventchannel.NewEventChannel(eventchannel.BuildEndpointSender(p.cfg.Endpoint, setUID), p.buffsCfg.size, p.buffsCfg.count, p.buffsCfg.timeout) + p.eventChannels[setUID] = eventchannel.NewEventChannel(eventchannel.BuildEndpointSender(p.httpClient, p.cfg.Endpoint, setUID), p.buffsCfg.size, p.buffsCfg.count, p.buffsCfg.timeout) } } func (p *PubstackModule) isFeatureEnable(feature string) bool { val, ok := p.cfg.Features[feature] - if !(ok && val) { - return false - } - return true + return ok && val } diff --git a/analytics/pubstack/pubstack_module_test.go b/analytics/pubstack/pubstack_module_test.go index 3d93117e2eb..35667ceca3a 100644 --- a/analytics/pubstack/pubstack_module_test.go +++ b/analytics/pubstack/pubstack_module_test.go @@ -3,6 +3,7 @@ package pubstack import ( "bytes" "encoding/json" + "github.com/prebid/prebid-server/analytics/clients" "github.com/prebid/prebid-server/analytics/pubstack/eventchannel" "io/ioutil" "os" @@ -57,18 +58,20 @@ func loadJsonFromFile() (*analytics.AuctionObject, error) { func TestPubstackModule(t *testing.T) { + client := clients.GetDefaultHttpInstance() + // Loading Issues - _, err := NewPubstackModule("scope", "http://localhost:11287", "1z", 100, "90MB", "15m") + _, err := NewPubstackModule(client, "scope", "http://localhost:11287", "1z", 100, "90MB", "15m") assert.NotEqual(t, err, nil) // should raise an error since we can't parse args // configRefreshDelay - _, err = NewPubstackModule("scope", "http://localhost:11287", "1h", 100, "90z", "15m") + _, err = NewPubstackModule(client, "scope", "http://localhost:11287", "1h", 100, "90z", "15m") assert.NotEqual(t, err, nil) // should raise an error since we can't parse args // maxByte - _, err = NewPubstackModule("scope", "http://localhost:11287", "1h", 100, "90MB", "15z") + _, err = NewPubstackModule(client, "scope", "http://localhost:11287", "1h", 100, "90MB", "15z") assert.NotEqual(t, err, nil) // should raise an error since we can't parse args // maxTime // Loading OK - module, err := NewPubstackModule("scope", "http://localhost:11287", "1h", 100, "90MB", "15m") + module, err := NewPubstackModule(client, "scope", "http://localhost:11287", "1h", 100, "90MB", "15m") assert.Equal(t, err, nil) // Default Configuration @@ -85,8 +88,9 @@ func TestPubstackModule(t *testing.T) { // Process Auction Event data := bytes.Buffer{} - send := func(payload []byte) { + send := func(payload []byte) error { data.Write(payload) + return nil } mockedEvent, err := loadJsonFromFile()