Merge pull request #18 from conduitio-labs/configure-initial-date
Make `SnapshotInitialDate` configurable
alyakimenko authored Sep 19, 2022
2 parents 769f223 + b295187 commit 5e7c805
Showing 7 changed files with 118 additions and 59 deletions.
3 changes: 3 additions & 0 deletions Makefile
@@ -8,3 +8,6 @@ test:
# Each test case takes up to 1-2 minutes to finish, since the Marketo API is slow. Marketo also takes a while to prepare snapshots, so budget about 30 minutes overall.
# Most of the time the tests finish in 10-15 minutes.
go test $(GOTEST_FLAGS) -p 1 -timeout 1h -v -race ./...

lint:
golangci-lint run ./...
40 changes: 26 additions & 14 deletions README.md
@@ -1,25 +1,31 @@
# Conduit Connector for Adobe Marketo

Marketo source connector for [Conduit](https://conduit.com) which pulls and syncs the **`Leads(People)`** object from [Marketo Engage](https://marketo.com).

### Configuration

The config passed to `Configure` can contain the following fields.
| name | part of | description | required | default value | example |
|------|---------|-------------|----------|---------------|---------|
|`clientID`|source|The Client ID for Marketo Instance|true|NONE| 1de3017c-fe42-4f20-8013-798678c956a9 |
|`clientSecret`|source|The Client Secret for Marketo Instance|true|NONE|ZZZv0Mev29vNm5vIyMwTa43lioVoBT7N|
|`clientEndpoint`|source|The Endpoint for Marketo Instance|true|NONE| https://\<instance\>.mktorest.com |
|`pollingPeriod`|source|Polling time for CDC mode. Less than 10s is not recommended |false|`1m`| `10s`, `1m`, `5m`, `10m`, `30m`, `1h` |
|`snapshotInitialDate`|source|The date from which the snapshot iterator initially starts getting records.|false|Creation date of the oldest record.|`2006-01-02T15:04:05Z07:00`|
|`fields`|source|Comma-separated fields to fetch from Marketo Leads|false|`id, createdAt, updatedAt, firstName, lastName, email`| `company, jobTitle, phone, personSource` etc. |

> Note: By default, **`id, createdAt, updatedAt`** are prepended to the `fields` config, so there is no need to add them explicitly. For example, if you request the `email, company, phone` fields, they will be requested as **`id, createdAt, updatedAt, email, company, phone`**.
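
The prepending rule from the note can be sketched in a few lines of Go. This is an illustration only: `buildFields` and `defaultFields` are hypothetical names, and trimming whitespace around each entry is an assumption that the connector itself may not make.

```go
package main

import (
	"fmt"
	"strings"
)

// defaultFields mirrors the default value documented for the `fields` option.
var defaultFields = []string{"id", "createdAt", "updatedAt", "firstName", "lastName", "email"}

// buildFields is a hypothetical helper: it returns the defaults when the
// option is empty, and otherwise prepends id, createdAt and updatedAt to the
// comma-separated list supplied by the user.
func buildFields(raw string) []string {
	if strings.TrimSpace(raw) == "" {
		return append([]string(nil), defaultFields...)
	}
	fields := []string{"id", "createdAt", "updatedAt"}
	for _, f := range strings.Split(raw, ",") {
		// Assumption: surrounding whitespace is not significant.
		if f = strings.TrimSpace(f); f != "" {
			fields = append(fields, f)
		}
	}
	return fields
}

func main() {
	fmt.Println(buildFields("email, company, phone"))
	// prints: [id createdAt updatedAt email company phone]
}
```
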
## Source

The Marketo source connector connects to a Marketo instance through the REST API, authenticating with the configured `clientID` and `clientSecret`. When the connector starts, the `Configure` method is called to parse and validate the configuration. Next, the `Open` method is called to establish a connection to the Marketo instance at the provided position. Once the connection is established, the `Read` method is called; it calls the current iterator's `Next` method to fetch the next record. `Teardown` is called when the connector is stopped.

### Snapshot Iterator

The snapshot iterator runs first to extract bulk data from the Marketo instance. It uses the [Bulk Lead Extract API](https://developers.marketo.com/rest-api/bulk-extract/bulk-lead-extract/) with a `createdAt` filter, which permits datetime ranges of up to 31 days, so we need to run multiple jobs and combine the results. Unless `snapshotInitialDate` is configured, we first need to find the creation date of the oldest lead in the instance. To find that date, we [query all folders](https://developers.marketo.com/rest-api/assets/folders/#browse) with a max depth of 1, which gives us a list of all the top-level folders in the instance. We then collect their `createdAt` dates, parse them, and pick the oldest one. This method works because some default top-level folders are created together with the instance, so no lead can have been created before them. The `fields` from the config are requested along with the `createdAt` filter. To find the fields available in your target instance, use the [Describe Lead 2 endpoint](https://developers.marketo.com/rest-api/endpoint-reference/lead-database-endpoint-reference/#!/Leads/describeUsingGET_6), which returns an exhaustive list that includes both standard and custom fields.
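
Splitting the snapshot range into export jobs of at most 31 days could look roughly like the sketch below. The names `window`, `splitWindows` and `day` are hypothetical, and whether Marketo treats the range boundaries as inclusive is not covered here.

```go
package main

import (
	"fmt"
	"time"
)

// window is one createdAt range handed to a single export job.
type window struct {
	Start, End time.Time
}

// maxWindow is the 31-day limit of the createdAt filter mentioned above.
const maxWindow = 31 * 24 * time.Hour

// day is a small helper that builds a UTC midnight timestamp.
func day(y int, m time.Month, d int) time.Time {
	return time.Date(y, m, d, 0, 0, 0, 0, time.UTC)
}

// splitWindows cuts [start, end) into consecutive windows of at most 31 days.
func splitWindows(start, end time.Time) []window {
	var out []window
	for cur := start; cur.Before(end); {
		next := cur.Add(maxWindow)
		if next.After(end) {
			next = end
		}
		out = append(out, window{Start: cur, End: next})
		cur = next
	}
	return out
}

func main() {
	// 2022-01-01 to 2022-03-15 spans 73 days, so three jobs are needed.
	for _, w := range splitWindows(day(2022, time.January, 1), day(2022, time.March, 15)) {
		fmt.Println(w.Start.Format(time.RFC3339), "->", w.End.Format(time.RFC3339))
	}
}
```

Each window would then go through one create, enqueue and status cycle of the export job.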

**Exporting Job involves 4 APIs**

- Create a Job -> `/bulk/v1/leads/export/create.json`
- Enqueue a Job -> `/bulk/v1/leads/export/{{exportID}}/enqueue.json`
- Wait for Job to Complete -> `/bulk/v1/leads/export/{{exportID}}/status.json`
@@ -28,26 +34,32 @@ Snapshot iterator is used first to extract bulk data from Marketo instance. [Bul
After each cycle, the obtained records are flushed to Conduit. Once all cycles (export jobs) are completed, the connector switches to CDC mode.

### Change Data Capture Iterator

Once the snapshot iterator is done, the connector automatically switches to the CDC iterator. CDC events are captured using two REST endpoints: [Get Lead Changes](https://developers.marketo.com/documentation/rest/get-lead-changes/) and [Get Lead by Id](https://developers.marketo.com/documentation/rest/get-lead-by-id/). In CDC mode we are interested in `New Lead (12)` and `Change Data Value (13)` events, so we filter the results of the [Get Lead Changes](https://developers.marketo.com/documentation/rest/get-lead-changes/) API for `activityTypeId` 12 and 13. Once we have the list of changed lead IDs, we query each lead with the [Get Lead by Id](https://developers.marketo.com/documentation/rest/get-lead-by-id/) API to get its changed data. The [Deleted Leads](https://developers.marketo.com/rest-api/endpoint-reference/lead-database-endpoint-reference/#!/Activities/getDeletedLeadsUsingGET) API is used to capture delete events. The output record for a deletion carries the metadata `"action":"delete"` so that a Conduit destination connector can handle it. No metadata is added for other CDC events, such as new and updated leads.
The `pollingPeriod` config value sets how often CDC events are polled.
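
The activity filter described above can be sketched as follows. The `activity` struct and `changedLeadIDs` function are hypothetical and do not reflect the connector's or the Marketo client's actual types. De-duplicating lead IDs is an assumption made here so that each lead is fetched only once per poll.

```go
package main

import "fmt"

// Activity type IDs named in the text above.
const (
	activityNewLead         = 12
	activityChangeDataValue = 13
)

// activity is a hypothetical, trimmed-down view of one lead change entry.
type activity struct {
	LeadID         int
	ActivityTypeID int
}

// changedLeadIDs keeps only New Lead and Change Data Value activities and
// returns the affected lead IDs in first-seen order, without duplicates.
func changedLeadIDs(activities []activity) []int {
	seen := make(map[int]struct{})
	var ids []int
	for _, a := range activities {
		if a.ActivityTypeID != activityNewLead && a.ActivityTypeID != activityChangeDataValue {
			continue
		}
		if _, ok := seen[a.LeadID]; ok {
			continue
		}
		seen[a.LeadID] = struct{}{}
		ids = append(ids, a.LeadID)
	}
	return ids
}

func main() {
	ids := changedLeadIDs([]activity{
		{LeadID: 101, ActivityTypeID: 12},
		{LeadID: 102, ActivityTypeID: 13},
		{LeadID: 103, ActivityTypeID: 1}, // some other activity type, ignored
		{LeadID: 101, ActivityTypeID: 13},
	})
	fmt.Println(ids) // prints: [101 102]
}
```
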

### Position Handling

| Name | type | desc |
| --------- | ----------------- | -------------------------- |
| Key | string | unique `id` for the record |
| CreatedAt | time.Time | UTC time |
| UpdatedAt | time.Time | UTC time |
| Type | IteratorType(int) | 0=snapshot(default), 1=CDC |
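
As a rough illustration, the table maps onto a Go struct like the one below. The JSON encoding and the field tags are assumptions made for this sketch; the connector's real serialization is not shown in this commit.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// IteratorType tells which iterator produced the position.
type IteratorType int

const (
	TypeSnapshot IteratorType = iota // 0, the default
	TypeCDC                          // 1
)

// Position follows the fields listed in the table above.
type Position struct {
	Key       string       `json:"key"`
	CreatedAt time.Time    `json:"createdAt"`
	UpdatedAt time.Time    `json:"updatedAt"`
	Type      IteratorType `json:"type"`
}

// encode and decode are hypothetical helpers that round-trip a position
// through JSON bytes.
func encode(p Position) ([]byte, error) { return json.Marshal(p) }

func decode(raw []byte) (Position, error) {
	var p Position
	err := json.Unmarshal(raw, &p)
	return p, err
}

func main() {
	p := Position{
		Key:       "42",
		CreatedAt: time.Date(2022, time.September, 10, 0, 0, 0, 0, time.UTC),
		UpdatedAt: time.Date(2022, time.September, 19, 0, 0, 0, 0, time.UTC),
		Type:      TypeCDC,
	}
	raw, err := encode(p)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw))
}
```
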

### To build

Run `make build` to build the connector.

### Testing

Run `make test` to run all the unit tests. Run `make test-integration` to run the integration tests.

The Docker compose file at `test/docker-compose.yml` can be used to run the required resource locally.

### Known Issues & Limitations

- In snapshot mode, the total amount of data that you can export from Marketo is limited to 500MB per day unless you have purchased a higher data limit. This 500MB limit resets daily at 12:00AM CST. Once the limit is hit, the pipeline stops with an error. To pull the rest of the records, run the pipeline again the next day.
- Concurrency Limit: Maximum of 10 concurrent API calls.
- Rate Limit: API access per instance limited to 100 calls per 20 seconds.
- Daily Quota: Subscriptions are allocated 50,000 API calls per day (which resets daily at 12:00AM CST). You can increase your daily quota through your account manager.
3 changes: 0 additions & 3 deletions source/acceptance_test.go
@@ -52,9 +52,6 @@ func TestAcceptance(t *testing.T) {
goleak.IgnoreTopFunction("net/http.(*persistConn).roundTrip"),
goleak.IgnoreTopFunction("net/http.setRequestCancel.func4"),
},
BeforeTest: func(t *testing.T) {
src.InitialDate = time.Now().UTC()
},
AfterTest: func(t *testing.T) {
t.Cleanup(func() {
err := cleanUp(client)
76 changes: 44 additions & 32 deletions source/config/config.go
@@ -26,19 +26,22 @@ import (

const (
// Marketo CDC polling period
ConfigKeyPollingPeriod = "pollingPeriod"
KeyPollingPeriod = "pollingPeriod"
// KeySnapshotInitialDate is a date from which the snapshot iterator initially starts getting records.
KeySnapshotInitialDate = "snapshotInitialDate"
// Fields to retrieve from Marketo database
ConfigKeyFields = "fields"
KeyFields = "fields"
// DefaultPollingPeriod is the value assumed for the polling period when the
// config omits the polling period parameter
DefaultPollingPeriod = "1m"
DefaultPollingPeriod = time.Minute
)

// SourceConfig represents the source configuration.
type SourceConfig struct {
config.Config
PollingPeriod time.Duration
Fields []string
PollingPeriod time.Duration
SnapshotInitialDate time.Time
Fields []string
}

// ParseSourceConfig attempts to parse the configurations into a SourceConfig struct that Source could utilize
@@ -49,39 +52,48 @@ func ParseSourceConfig(ctx context.Context, cfg map[string]string) (SourceConfig
globalConfig, err := config.ParseGlobalConfig(ctx, cfg)
if err != nil {
logger.Error().Stack().Err(err).Msg("Error While Parsing the Global Config")
return SourceConfig{}, err
return SourceConfig{}, fmt.Errorf("parse global config: %w", err)
}

pollingPeriodString, exists := cfg[ConfigKeyPollingPeriod]
if !exists || pollingPeriodString == "" {
pollingPeriodString = DefaultPollingPeriod
sourceConfig := SourceConfig{
Config: globalConfig,
PollingPeriod: DefaultPollingPeriod,
Fields: []string{"id", "createdAt", "updatedAt", "firstName", "lastName", "email"},
}
pollingPeriod, err := time.ParseDuration(pollingPeriodString)
if err != nil {
return SourceConfig{}, fmt.Errorf(
"%q config value should be a valid duration",
ConfigKeyPollingPeriod,
)

if pollingPeriodString := cfg[KeyPollingPeriod]; pollingPeriodString != "" {
sourceConfig.PollingPeriod, err = time.ParseDuration(pollingPeriodString)
if err != nil {
return SourceConfig{}, fmt.Errorf(
"%q config value should be a valid duration: %w",
KeyPollingPeriod, err,
)
}

if sourceConfig.PollingPeriod <= 0 {
return SourceConfig{}, fmt.Errorf(
"%q config value should be positive, got %s",
KeyPollingPeriod,
sourceConfig.PollingPeriod,
)
}
}
if pollingPeriod <= 0 {
return SourceConfig{}, fmt.Errorf(
"%q config value should be positive, got %s",
ConfigKeyPollingPeriod,
pollingPeriod,
)

if snapshotInitialDateString := cfg[KeySnapshotInitialDate]; snapshotInitialDateString != "" {
sourceConfig.SnapshotInitialDate, err = time.Parse(time.RFC3339, snapshotInitialDateString)
if err != nil {
return SourceConfig{}, fmt.Errorf(
"%q config value should be a valid ISO 8601/RFC 3339 time: %w",
KeySnapshotInitialDate, err,
)
}
}

var fields []string
if cfg[ConfigKeyFields] == "" {
fields = []string{"id", "createdAt", "updatedAt", "firstName", "lastName", "email"}
} else {
fields = append([]string{"id", "createdAt", "updatedAt"}, strings.Split(cfg[ConfigKeyFields], ",")...)
if cfg[KeyFields] != "" {
sourceConfig.Fields = []string{"id", "createdAt", "updatedAt"}
sourceConfig.Fields = append(sourceConfig.Fields, strings.Split(cfg[KeyFields], ",")...)
}

logger.Trace().Msg("Start Parsing the Config")
return SourceConfig{
Config: globalConfig,
PollingPeriod: pollingPeriod,
Fields: fields,
}, nil
logger.Trace().Msg("Stop Parsing the Config")
return sourceConfig, nil
}
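
The `snapshotInitialDate` handling in the hunk above can be tried in isolation with a sketch like this one. `parseSnapshotInitialDate` is a hypothetical standalone function, not part of the connector. It assumes that an empty value leaves the zero `time.Time`, which the iterator would presumably treat as "start from the oldest record".

```go
package main

import (
	"fmt"
	"time"
)

// keySnapshotInitialDate matches the config key added in this commit.
const keySnapshotInitialDate = "snapshotInitialDate"

// parseSnapshotInitialDate returns the zero time when the key is absent or
// empty, and otherwise parses the value as an RFC 3339 timestamp.
func parseSnapshotInitialDate(cfg map[string]string) (time.Time, error) {
	raw := cfg[keySnapshotInitialDate]
	if raw == "" {
		return time.Time{}, nil
	}
	ts, err := time.Parse(time.RFC3339, raw)
	if err != nil {
		return time.Time{}, fmt.Errorf(
			"%q config value should be a valid ISO 8601/RFC 3339 time: %w",
			keySnapshotInitialDate, err,
		)
	}
	return ts, nil
}

func main() {
	for _, raw := range []string{"2022-09-10T00:00:00Z", "wrong", ""} {
		ts, err := parseSnapshotInitialDate(map[string]string{keySnapshotInitialDate: raw})
		fmt.Printf("input=%q zero=%v err=%v\n", raw, ts.IsZero(), err)
	}
}
```

The same two inputs, `2022-09-10T00:00:00Z` and `wrong`, are the ones exercised by the test cases added in `config_test.go`.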
31 changes: 31 additions & 0 deletions source/config/config_test.go
@@ -108,6 +108,37 @@ func TestParseGlobalConfig(t *testing.T) {
Fields: []string{"id", "createdAt", "updatedAt", "firstName", "lastName", "email"},
},
},
{
name: "Invalid snapshotInitialDate",
wantErr: true,
in: map[string]string{
"clientID": "client_id",
"clientSecret": "client_secret",
"clientEndpoint": "https://xxx-xxx-xxx.mktorest.com",
"snapshotInitialDate": "wrong",
},
expectedCon: SourceConfig{},
},
{
name: "Custom snapshotInitialDate",
wantErr: false,
in: map[string]string{
"clientID": "client_id",
"clientSecret": "client_secret",
"clientEndpoint": "https://xxx-xxx-xxx.mktorest.com",
"snapshotInitialDate": "2022-09-10T00:00:00Z",
},
expectedCon: SourceConfig{
Config: globalConfig.Config{
ClientID: "client_id",
ClientSecret: "client_secret",
ClientEndpoint: "https://xxx-xxx-xxx.mktorest.com",
},
PollingPeriod: time.Minute,
SnapshotInitialDate: time.Date(2022, time.September, 10, 0, 0, 0, 0, time.UTC),
Fields: []string{"id", "createdAt", "updatedAt", "firstName", "lastName", "email"},
},
},
}

for _, tt := range configTests {
5 changes: 3 additions & 2 deletions source/misc_utils_test.go
@@ -149,7 +149,7 @@ func (c Client) getLeadByID(id int, fields []string) (*json.RawMessage, error) {

// returns Test source.
func newTestSource() *source.Source {
return &source.Source{InitialDate: time.Now().UTC()}
return &source.Source{}
}

// returns configs for testing.
@@ -158,7 +158,8 @@ func getConfigs() map[string]string {
cfg[config.ClientID] = ClinetID
cfg[config.ClientSecret] = ClientSecret
cfg[config.ClientEndpoint] = ClientEndpoint
cfg[sourceConfig.ConfigKeyPollingPeriod] = "10s"
cfg[sourceConfig.KeyPollingPeriod] = "10s"
cfg[sourceConfig.KeySnapshotInitialDate] = time.Now().UTC().Format(time.RFC3339)
return cfg
}

19 changes: 11 additions & 8 deletions source/source.go
@@ -17,7 +17,6 @@ package source
import (
"context"
"fmt"
"time"

"github.com/SpeakData/minimarketo"
sdk "github.com/conduitio/conduit-connector-sdk"
@@ -31,10 +30,9 @@ import (
// Source connector
type Source struct {
sdk.UnimplementedSource
config config.SourceConfig
client marketoclient.Client
iterator Iterator
InitialDate time.Time
config config.SourceConfig
client marketoclient.Client
iterator Iterator
}

type Iterator interface {
@@ -65,12 +63,17 @@ func (s *Source) Parameters() map[string]sdk.Parameter {
Default: "",
Description: "The endpoint for the Marketo instance.",
},
config.ConfigKeyPollingPeriod: {
config.KeyPollingPeriod: {
Required: false,
Default: "1m",
Description: "The polling period CDC mode.",
},
config.ConfigKeyFields: {
config.KeySnapshotInitialDate: {
Required: false,
Default: "Creation date of the oldest record.",
Description: "The date from which the snapshot iterator initially starts getting records.",
},
config.KeyFields: {
Required: false,
Default: "id, createdAt, updatedAt, firstName, lastName, email",
Description: "The fields to be pulled from Marketo",
@@ -116,7 +119,7 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error {
logger.Error().Stack().Err(err).Msg("Error While Creating the Marketo Client")
return fmt.Errorf("couldn't create the marketo client: %w", err)
}
s.iterator, err = iterator.NewCombinedIterator(ctx, s.config.ClientEndpoint, s.config.PollingPeriod, s.client, p, s.config.Fields, s.InitialDate)
s.iterator, err = iterator.NewCombinedIterator(ctx, s.config.ClientEndpoint, s.config.PollingPeriod, s.client, p, s.config.Fields, s.config.SnapshotInitialDate)
if err != nil {
logger.Error().Stack().Err(err).Msg("Error while create a combined iterator")
return fmt.Errorf("couldn't create a combined iterator: %w", err)