Skip to content

Commit

Permalink
refactoring tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rustiever authored Jun 18, 2022
1 parent 70ef894 commit ea0f1ee
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 53 deletions.
22 changes: 22 additions & 0 deletions marketo-client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"io/ioutil"
"net/http"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -310,3 +311,24 @@ func WithRetry(ctx context.Context, r RetryFunc) error {
}
return nil
}

// returns filterd leads from marketo rest api
func (c Client) FilterLeads(fileterType string, filterValues []int, fields []string, nextPageToken string) (*minimarketo.Response, error) {
var leads []string
for _, v := range filterValues {
leads = append(leads, strconv.Itoa(v))
}
var path string
path = fmt.Sprintf("/rest/v1/leads.json?filterType=%s&filterValues=%s&fields=%s", fileterType, strings.Join(leads, ","), strings.Join(fields, ","))
if nextPageToken != "" {
path = fmt.Sprintf("/rest/v1/lead/filter.json?filterType=%s&filterValues=%s&fields=%s&nextPageToken=%s", fileterType, strings.Join(leads, ","), strings.Join(fields, ","), nextPageToken)
}
response, err := c.Get(path)
if err != nil {
return nil, err
}
if !response.Success {
return nil, fmt.Errorf("%+v", response.Errors)
}
return response, nil
}
47 changes: 29 additions & 18 deletions source/iterator/cdc_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ func (c *CDCIterator) flushLatestLeads(ctx context.Context) error {
logger.Error().Err(err).Msg("Error while getting the next page token")
return fmt.Errorf("error getting next page token %w", err)
}
c.lastModified = time.Now().UTC() // updating last modified time here to avoid missing any records in the next poll.
changedLeadIds, changedLeadMaps, err := c.GetChangedLeadsIDs(ctx, token)
if err != nil {
logger.Error().Err(err).Msg("Error while getting the changed leads")
Expand All @@ -195,30 +196,38 @@ func (c *CDCIterator) flushLatestLeads(ctx context.Context) error {
data: nil,
}
}
for _, id := range changedLeadIds {
if id <= lastKey && changedLeadMaps[id] == ActivityTypeIDNewLead {
continue
}
res, err := c.client.GetLeadByID(id, c.fields)
if len(changedLeadIds) == 0 {
return nil
}
var leads []map[string]interface{}
var moreResult = true
token = ""
for moreResult {
res, err := c.client.FilterLeads("id", changedLeadIds, c.fields, token)
if err != nil {
logger.Error().Err(err).Msg("Error while getting the lead")
return fmt.Errorf("error getting lead %w", err)
logger.Error().Err(err).Msg("Error while getting the changed leads")
return fmt.Errorf("error getting changed leads %w", err)
}
dataMap := make([]map[string]interface{}, 0)
err = json.Unmarshal(*res, &dataMap)
moreResult = res.MoreResult
token = res.NextPageToken
err = json.Unmarshal(res.Result, &leads)
if err != nil {
logger.Error().Err(err).Msg("Error while unmarshalling the lead")
return fmt.Errorf("error unmarshalling lead %w", err)
logger.Error().Err(err).Msg("Error while unmarshalling the changed leads")
return fmt.Errorf("error unmarshalling changed leads %w", err)
}
for _, data := range dataMap {
c.buffer <- Record{
id: id,
deleted: false,
data: data,
}
}

for _, lead := range leads {
id := int(lead["id"].(float64))
if id <= lastKey && changedLeadMaps[id] == ActivityTypeIDNewLead {
continue
}
c.buffer <- Record{
id: id,
deleted: false,
data: lead,
}
}
c.lastModified = time.Now().UTC()
return nil
}

Expand Down Expand Up @@ -275,6 +284,8 @@ func (c *CDCIterator) GetChangedLeadsIDs(ctx context.Context, token string) ([]i
for k := range leadIds {
keys = append(keys, k)
}
// sorting helps in choosing last processed lead which handles
// the case when there are multiple leads with same createdAt and updatedAt time.
sort.Ints(keys)
return keys, leadIds, nil
}
4 changes: 2 additions & 2 deletions source/iterator/combined_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type CombinedIterator struct {

var ErrDone = errors.New("no more records in iterator")

func NewCombinedIterator(ctx context.Context, endpoint string, pollingPeriod time.Duration, client marketoclient.Client, p position.Position, fields []string) (*CombinedIterator, error) {
func NewCombinedIterator(ctx context.Context, endpoint string, pollingPeriod time.Duration, client marketoclient.Client, p position.Position, fields []string, initialDate time.Time) (*CombinedIterator, error) {
logger := sdk.Logger(ctx).With().Str("Method", "NewCombinedIterator").Logger()
logger.Trace().Msg("Starting the NewCombinedIterator")

Expand All @@ -53,7 +53,7 @@ func NewCombinedIterator(ctx context.Context, endpoint string, pollingPeriod tim
case position.TypeSnapshot:
logger.Trace().Msg("Starting creating a New Snaphot iterator")

c.snapshotIterator, err = NewSnapshotIterator(ctx, endpoint, fields, client, p)
c.snapshotIterator, err = NewSnapshotIterator(ctx, endpoint, fields, client, p, initialDate)
if err != nil {
logger.Error().Err(err).Msg("Error while creating a new snapshot iterator")
return nil, err
Expand Down
23 changes: 12 additions & 11 deletions source/iterator/snapshot_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,10 @@ const (
MaximumHoursGap = 744 // 31 days in Hours
)

var (
InitialDate time.Time // holds the initial date of the snapshot
)

// to handle snapshot iterator
type SnapshotIterator struct {
client *marketoclient.Client
initialDate time.Time // holds the initial date of the snapshot
fields []string // holds the fields to be returned from the API
endpoint string // holds the endpoint of the API
exportID string // holds the current processin exportId
Expand All @@ -55,7 +52,7 @@ type SnapshotIterator struct {
}

// returns NewSnapshotIterator with supplied parameters, also initiates the pull and flush goroutines.
func NewSnapshotIterator(ctx context.Context, endpoint string, fields []string, client marketoclient.Client, p position.Position) (*SnapshotIterator, error) {
func NewSnapshotIterator(ctx context.Context, endpoint string, fields []string, client marketoclient.Client, p position.Position, initialDate time.Time) (*SnapshotIterator, error) {
logger := sdk.Logger(ctx).With().Str("Method", "NewSnapshotIterator").Logger()
logger.Trace().Msg("Starting the NewSnapshotIterator")
var err error
Expand All @@ -67,18 +64,19 @@ func NewSnapshotIterator(ctx context.Context, endpoint string, fields []string,
data: make(chan []string, 100),
hasData: make(chan struct{}, 100),
lastMaxModified: time.Time{},
initialDate: initialDate,
}
eg, ctx := errgroup.WithContext(ctx)
if InitialDate.IsZero() {
InitialDate, err = s.getLastProcessedDate(ctx, p)
if s.initialDate.IsZero() {
s.initialDate, err = s.getLastProcessedDate(ctx, p)
}
if err != nil {
logger.Error().Err(err).Msg("Error getting initial date")
return nil, fmt.Errorf("error getting initial date: %w", err)
}
startDateDuration := time.Since(InitialDate)
startDateDuration := time.Since(s.initialDate)
s.iteratorCount = int(startDateDuration.Hours()/MaximumHoursGap) + 1
logger.Info().Msgf("Creating %d snapshots", s.iteratorCount)
logger.Info().Msgf("Creating %d snapshots one by one", s.iteratorCount)
s.csvReader = make(chan *csv.Reader, s.iteratorCount)
eg.Go(func() error {
return s.pull(ctx)
Expand Down Expand Up @@ -177,7 +175,7 @@ func (s *SnapshotIterator) pull(ctx context.Context) error {
logger.Trace().Msg("Starting the pull")
defer close(s.csvReader)
var startDate, endDate time.Time
date := InitialDate
date := s.initialDate
for i := 0; i < s.iteratorCount; i++ {
startDate = date
endDate = date.Add(time.Hour * time.Duration(MaximumHoursGap)).Add(-1 * time.Second)
Expand Down Expand Up @@ -332,7 +330,10 @@ func (s *SnapshotIterator) prepareRecord(ctx context.Context, data []string) (sd
func (s *SnapshotIterator) getLastProcessedDate(ctx context.Context, p position.Position) (time.Time, error) {
logger := sdk.Logger(ctx).With().Str("Method", "getInitialDate").Logger()
logger.Trace().Msg("Starting the getInitialDate method")
var date = p.CreatedAt.Add(1 * time.Second) // start pulling data from next second to avoid duplicates

// marketo api handles records at seconds level. When we start snapshot iterator with same last time,
// there is a chance of getting same records again. So we need to add 1 second to the last time.
var date = p.CreatedAt.Add(1 * time.Second)
var err error
if reflect.ValueOf(p).IsZero() {
date, err = s.getLeastDate(ctx, *s.client)
Expand Down
5 changes: 5 additions & 0 deletions source/misc_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ func (c Client) getLeadChanges(nextPageToken string, fields []string) (*minimark
return response, nil
}

// returns Test source.
func newTestSource() *source.Source {
return &source.Source{InitialDate: time.Now().UTC()}
}

// returns configs for testing.
func getConfigs() map[string]string {
cfg := map[string]string{}
Expand Down
10 changes: 6 additions & 4 deletions source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package source
import (
"context"
"fmt"
"time"

"github.com/SpeakData/minimarketo"
sdk "github.com/conduitio/conduit-connector-sdk"
Expand All @@ -29,9 +30,10 @@ import (
// Source connector
type Source struct {
sdk.UnimplementedSource
config config.SourceConfig
client marketoclient.Client
iterator Iterator
config config.SourceConfig
client marketoclient.Client
iterator Iterator
InitialDate time.Time
}

type Iterator interface {
Expand Down Expand Up @@ -82,7 +84,7 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error {
logger.Error().Stack().Err(err).Msg("Error While Creating the Marketo Client")
return fmt.Errorf("couldn't create the marketo client: %w", err)
}
s.iterator, err = iterator.NewCombinedIterator(ctx, s.config.ClientEndpoint, s.config.PollingPeriod, s.client, p, s.config.Fields)
s.iterator, err = iterator.NewCombinedIterator(ctx, s.config.ClientEndpoint, s.config.PollingPeriod, s.client, p, s.config.Fields, s.InitialDate)
if err != nil {
logger.Error().Stack().Err(err).Msg("Error while create a combined iterator")
return fmt.Errorf("couldn't create a combined iterator: %w", err)
Expand Down
27 changes: 9 additions & 18 deletions source/source_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,16 @@ import (
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/rustiever/conduit-connector-marketo/config"
"github.com/rustiever/conduit-connector-marketo/source"
"github.com/rustiever/conduit-connector-marketo/source/iterator"
"github.com/rustiever/conduit-connector-marketo/source/position"
)

func TestSource_SuccessfullSnapshot(t *testing.T) {
iterator.InitialDate = time.Now().UTC()
client, err := getClient()
if err != nil {
t.Fatal(err)
}
startTime := time.Now().UTC()
src := newTestSource()
testLeads, err := addLeads(client, 10)
if err != nil {
t.Fatal(err)
Expand All @@ -46,7 +45,6 @@ func TestSource_SuccessfullSnapshot(t *testing.T) {
t.Error(err)
}
})
src := &source.Source{}
ctx := context.Background()
defer func() {
_ = src.Teardown(ctx)
Expand All @@ -66,11 +64,11 @@ func TestSource_SuccessfullSnapshot(t *testing.T) {
}

func TestSource_SnapshotRestart(t *testing.T) {
iterator.InitialDate = time.Now().UTC()
client, err := getClient()
if err != nil {
t.Fatal(err)
}
src := newTestSource()
startTime := time.Now().UTC()
testLeads, err := addLeads(client, 10)
if err != nil {
Expand All @@ -82,7 +80,6 @@ func TestSource_SnapshotRestart(t *testing.T) {
t.Error(err)
}
})
src := &source.Source{}
ctx := context.Background()
defer func() {
_ = src.Teardown(ctx)
Expand Down Expand Up @@ -111,8 +108,7 @@ func TestSource_SnapshotRestart(t *testing.T) {
}

func TestSource_EmptyDatabase(t *testing.T) {
iterator.InitialDate = time.Now().UTC()
src := &source.Source{}
src := newTestSource()
ctx := context.Background()
defer func() {
_ = src.Teardown(ctx)
Expand All @@ -128,9 +124,8 @@ func TestSource_EmptyDatabase(t *testing.T) {
}

func TestSource_StartCDCAfterEmptyBucket(t *testing.T) {
iterator.InitialDate = time.Now().UTC()
ctx := context.Background()
src := &source.Source{}
src := newTestSource()
defer func() {
_ = src.Teardown(ctx)
}()
Expand Down Expand Up @@ -168,8 +163,7 @@ func TestSource_StartCDCAfterEmptyBucket(t *testing.T) {
}

func TestSource_NonExistentDatabase(t *testing.T) {
iterator.InitialDate = time.Now().UTC()
src := &source.Source{}
src := newTestSource()
ctx := context.Background()
defer func() {
_ = src.Teardown(ctx)
Expand All @@ -188,8 +182,7 @@ func TestSource_NonExistentDatabase(t *testing.T) {
}

func TestSource_CDC_ReadRecordsUpdate(t *testing.T) {
iterator.InitialDate = time.Now().UTC()
src := &source.Source{}
src := newTestSource()
ctx := context.Background()
defer func() {
_ = src.Teardown(ctx)
Expand Down Expand Up @@ -234,9 +227,8 @@ func TestSource_CDC_ReadRecordsUpdate(t *testing.T) {
}

func TestCDC_Delete(t *testing.T) {
iterator.InitialDate = time.Now().UTC()
ctx := context.Background()
src := &source.Source{}
src := newTestSource()
defer func() {
_ = src.Teardown(ctx)
}()
Expand Down Expand Up @@ -285,8 +277,7 @@ func TestCDC_Delete(t *testing.T) {
}

func TestSource_CDC_ReadRecordsInsertAfterTeardown(t *testing.T) {
iterator.InitialDate = time.Now().UTC()
src := &source.Source{}
src := newTestSource()
ctx := context.Background()
err := configAndOpen(ctx, src, nil)
if err != nil {
Expand Down Expand Up @@ -314,7 +305,7 @@ func TestSource_CDC_ReadRecordsInsertAfterTeardown(t *testing.T) {
}
lastPosition := rec.Position
_ = src.Teardown(ctx)
src1 := &source.Source{}
src1 := newTestSource()
defer func() {
_ = src1.Teardown(ctx)
}()
Expand Down

0 comments on commit ea0f1ee

Please sign in to comment.