snapshot and CDC iterator #5
I'll add unit and integration tests soon.
b56f29b to 66d4e8c
Hi @hariso, golangci-lint is failing. In summary it says, if nothing works I'll add
source/iterator/snapshot_iterator.go
Outdated
func (s *SnapshotIterator) getLastProcessedDate(ctx context.Context, p position.Position) (time.Time, error) {
	logger := sdk.Logger(ctx).With().Str("Method", "getInitialDate").Logger()
	logger.Trace().Msg("Starting the getInitialDate method")
	var date = p.CreatedAt.Add(1 * time.Second)
Something could have been created during that second.
Adding a second avoids duplicates. If we pull from the same timestamp, we may get records from Marketo that we have already read, so we process from the next second. Also, Marketo handles timestamps at second-level precision only.
@hariso any comment on this?
> if we pull in same time, we may get already read records from marketo. so processing from next second.
It feels like that's because our query is "return leads where createdAt is greater than or equal to lastModified". If so, we need a query like "return leads where createdAt is greater than lastModified".
> Also marketo handles data in seconds level only.
I think that's not a bad reason to add one second, but what if Marketo changes the precision of timestamps to a millisecond? We may or may not see that soon, but if we can have logic which does not depend on timestamp precision, that would be best.
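The difference between "add one second" and "strictly greater than" can be sketched as follows. This is only an illustration under stated assumptions: `Lead`, `createdAfter`, and `sampleLeads` are hypothetical names, the filtering is done client-side, and nothing here models the Marketo API (the thread does not say whether Marketo can apply an exclusive lower bound on its side).

```go
package main

import (
	"fmt"
	"time"
)

// Lead is a hypothetical, simplified lead record (not the connector's type).
type Lead struct {
	ID        int
	CreatedAt time.Time
}

// createdAfter keeps only leads created strictly after cursor.
// The comparison does not depend on the precision of the timestamps.
func createdAfter(leads []Lead, cursor time.Time) []Lead {
	var out []Lead
	for _, l := range leads {
		if l.CreatedAt.After(cursor) {
			out = append(out, l)
		}
	}
	return out
}

// sampleLeads returns made-up leads and the cursor of the last processed one.
func sampleLeads() ([]Lead, time.Time) {
	cursor := time.Date(2022, 1, 1, 10, 15, 0, 0, time.UTC)
	return []Lead{
		{ID: 1, CreatedAt: cursor},                             // already processed
		{ID: 2, CreatedAt: cursor.Add(500 * time.Millisecond)}, // "cursor + 1s" would skip this one
		{ID: 3, CreatedAt: cursor.Add(2 * time.Second)},
	}, cursor
}

func main() {
	leads, cursor := sampleLeads()
	for _, l := range createdAfter(leads, cursor) {
		fmt.Println(l.ID)
	}
}
```

With second-level timestamps both approaches return the same leads. They diverge only once sub-second timestamps appear, which is the case the review comment worries about.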
for k := range leadIds {
	keys = append(keys, k)
}
sort.Ints(keys)
Why is this needed? Also, it looks like we first collect all changed leads and then send them to Conduit?
It is required for some edge cases, especially the TestSource_Open_ResumeAtPositionCDC acceptance test: sorting helps in choosing the last key and resuming CDC mode.
So the data from Marketo is not sorted?
Not really, because we get records based on change time, so they can arrive in any ID order. For example, if id=1234 is updated first and id=1000 is updated later, we get the records in the sequence 1234, 1000, and so on.
So why is that a problem? The position is based on the change time, so the next time CDC starts, it starts from the last processed change time.
Actually, the connector works fine without sorting, but some of the acceptance tests fail, especially TestSource_Open_ResumeAtPositionCDC. Sorting helps in choosing the last key and resuming CDC mode.
> So why is that a problem? The position is based on the change time, so the next time CDC starts, it starts from the last processed change time.
In the TestSource_Open_ResumeAtPositionCDC test we generate 10 records and ack 5 of them, and then expect the other 5. But all 10 records are updated at the same time, so we would get the same 10 records again. So we use lastKey to continue from where we left off.
@rustiever So the sorted keys are lead IDs. Are lead IDs actually timestamps? I might be missing something, but I can't see a connection. We use the last modified timestamp of a lead in the position and when resuming CDC. But here, we are sorting lead IDs to make sure we can resume from a position.
We primarily use lastModifiedTime for CDC, but sometimes this is not enough when we resume CDC. For example, at 10:15 we get 8 records: 5, 6, 3, 2, 7, 1, 8, 4. Let's assume we processed 4 of them (5, 6, 3, 2). The timestamp is the same for all records, so when we resume CDC we get the same set of records again. We need a way to filter out the processed records, so we use sorting. Once the records are sorted (1, 2, 3, 4, 5, 6, 7, 8) and CDC is resumed, we get the last time and lastKey. We compare the fetched records with lastKey to determine which records have not been processed yet.
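The resume logic described above can be sketched roughly as follows. This is a minimal illustration, not the connector's code: `Position`, `Change`, `pending`, and `sampleChanges` are hypothetical names, and the sketch assumes records are always emitted in the sorted (timestamp, lead ID) order, so that everything at or before the stored position has already been processed.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// Position is a hypothetical resume position: the last processed change
// time plus the last processed lead ID within that timestamp.
type Position struct {
	LastModified time.Time
	LastKey      int
}

// Change is a hypothetical changed-lead record.
type Change struct {
	LeadID     int
	ModifiedAt time.Time
}

// pending orders changes by (ModifiedAt, LeadID) and drops everything
// at or before pos, so a resumed run only sees unprocessed records.
func pending(changes []Change, pos Position) []Change {
	sorted := append([]Change(nil), changes...) // do not mutate the input
	sort.Slice(sorted, func(i, j int) bool {
		if !sorted[i].ModifiedAt.Equal(sorted[j].ModifiedAt) {
			return sorted[i].ModifiedAt.Before(sorted[j].ModifiedAt)
		}
		return sorted[i].LeadID < sorted[j].LeadID
	})
	var out []Change
	for _, c := range sorted {
		newer := c.ModifiedAt.After(pos.LastModified)
		sameTimeLaterKey := c.ModifiedAt.Equal(pos.LastModified) && c.LeadID > pos.LastKey
		if newer || sameTimeLaterKey {
			out = append(out, c)
		}
	}
	return out
}

// sampleChanges returns eight changes sharing one timestamp, in arrival
// order, and a position saying that keys up to 4 were already processed.
func sampleChanges() ([]Change, Position) {
	at := time.Date(2022, 1, 1, 10, 15, 0, 0, time.UTC)
	var changes []Change
	for _, id := range []int{5, 6, 3, 2, 7, 1, 8, 4} {
		changes = append(changes, Change{LeadID: id, ModifiedAt: at})
	}
	return changes, Position{LastModified: at, LastKey: 4}
}

func main() {
	changes, pos := sampleChanges()
	for _, c := range pending(changes, pos) {
		fmt.Println(c.LeadID)
	}
}
```

The lead ID acts purely as a tie-breaker among records with the same timestamp. It carries no time information of its own, which is why the position needs both values.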
)

var (
	InitialDate time.Time // holds the initial date of the snapshot
Why not make it part of SnapshotIterator?
This variable was initially part of SnapshotIterator and was later moved to a global variable for testing. If it is in SnapshotIterator, the tests take too much time, so for testing InitialDate is set to the current date and time.
If the variable is part of SnapshotIterator, then you can use a different value for it in testing, no? Introducing a global variable only to make tests work feels like something which can be improved.
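One way to keep the date on the iterator while still letting tests choose a different value is to pass it in at construction time. The sketch below is an assumption about how that could look: the stripped-down `SnapshotIterator`, the `NewSnapshotIterator` constructor, and the `day` helper are hypothetical and do not reflect the connector's actual signatures.

```go
package main

import (
	"fmt"
	"time"
)

// SnapshotIterator is a stripped-down stand-in for the connector's type.
// The initial date is an ordinary field instead of a package-level variable.
type SnapshotIterator struct {
	initialDate time.Time
}

// NewSnapshotIterator is a hypothetical constructor: production code can
// pass a date far in the past, while a test can pass a recent one so that
// the snapshot stays small and fast.
func NewSnapshotIterator(initialDate time.Time) *SnapshotIterator {
	return &SnapshotIterator{initialDate: initialDate}
}

// InitialDate returns the date this iterator starts its snapshot from.
func (s *SnapshotIterator) InitialDate() time.Time {
	return s.initialDate
}

// day is a small helper that builds a UTC date for the demo.
func day(year, month, dayOfMonth int) time.Time {
	return time.Date(year, time.Month(month), dayOfMonth, 0, 0, 0, 0, time.UTC)
}

func main() {
	prod := NewSnapshotIterator(day(2020, 1, 1))
	test := NewSnapshotIterator(day(2022, 6, 1))
	// Each iterator keeps its own value; no shared global state is involved.
	fmt.Println(prod.InitialDate().Format("2006-01-02"))
	fmt.Println(test.InitialDate().Format("2006-01-02"))
}
```

Because no state is shared between instances, tests that run in parallel cannot interfere with each other through this value.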
This will be handled in the next PR, @hariso.
source/acceptance_test.go
Outdated
// AcceptanceSourceTestDriverConfig contains the configuration for
// AcceptanceTestDriver.
type AcceptanceSourceTestDriverConfig struct {
	// Connector is the connector to be tested.
	Connector sdk.Connector

	// SourceConfig should be a valid config for a source connector, reading
	// from the same location as the destination will write to.
	SourceConfig map[string]string
	// DestinationConfig should be a valid config for a destination connector,
	// writing to the same location as the source will read from.
	DestinationConfig map[string]string

	// BeforeTest is executed before each acceptance test.
	BeforeTest func(t *testing.T)
	// AfterTest is executed after each acceptance test.
	AfterTest func(t *testing.T)

	// GoleakOptions will be applied to goleak.VerifyNone. Can be used to
	// suppress false positive goroutine leaks.
	GoleakOptions []goleak.Option
}

func (d AcceptanceTestDriver) DestinationConfig(*testing.T) map[string]string {
	return map[string]string{}
}
func (d AcceptanceTestDriver) Connector() sdk.Connector {
	return d.Config.Connector
}

func (d AcceptanceTestDriver) SourceConfig(*testing.T) map[string]string {
	return d.Config.SourceConfig
}

func (d AcceptanceTestDriver) BeforeTest(t *testing.T) {
	// before test check if the test should be skipped
	d.Skip(t)

	if d.Config.BeforeTest != nil {
		d.Config.BeforeTest(t)
	}
}

func (d AcceptanceTestDriver) AfterTest(t *testing.T) {
	if d.Config.AfterTest != nil {
		d.Config.AfterTest(t)
	}
}

func (d AcceptanceTestDriver) GoleakOptions(_ *testing.T) []goleak.Option {
	return d.Config.GoleakOptions
}

func (d AcceptanceTestDriver) ReadTimeout() time.Duration {
	return time.Minute * 5
}
It looks like we don't need these methods anymore.
When I delete these methods, the tests fail.
@rustiever because of timeouts? Did you update to the latest SDK?
yes
source/iterator/cdc_iterator.go
Outdated
logger := sdk.Logger(ctx).With().Str("Method", "flushLatestLeads").Logger()
logger.Trace().Msg("Starting the flushLatestLeads")
token, err := c.client.GetNextPageToken(c.lastModified)
c.lastModified = time.Now().UTC()
Maybe it's safer to change this only once we actually get the leads?
Sorry @hariso, I couldn't understand. Can you elaborate, please?
Here, you first update c.lastModified, then get the changed and deleted leads. Is it safer to first get the changed and deleted leads, and then update c.lastModified?
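The suggested ordering can be sketched like this. It is an illustration under assumptions, not the connector's implementation: `cdcIterator`, `fetchFunc`, the injected `now` clock, and `newDemoIterator` are hypothetical stand-ins for the real iterator and Marketo client. Capturing the timestamp before the fetch (rather than after it) is an additional design choice of this sketch, made so that changes arriving during the call are not skipped.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// fetchFunc is a hypothetical stand-in for the calls that return the IDs of
// leads changed or deleted since the given time.
type fetchFunc func(since time.Time) ([]int, error)

// cdcIterator is a simplified stand-in for the connector's CDC iterator.
type cdcIterator struct {
	lastModified time.Time
	fetch        fetchFunc
	now          func() time.Time // injectable clock, for deterministic tests
}

// flushLatestLeads advances lastModified only after the fetch succeeded.
// On failure the cursor is untouched, so a retry covers the same window.
func (c *cdcIterator) flushLatestLeads() ([]int, error) {
	startedAt := c.now() // taken before the fetch, applied after it
	ids, err := c.fetch(c.lastModified)
	if err != nil {
		return nil, err
	}
	c.lastModified = startedAt
	return ids, nil
}

// newDemoIterator builds an iterator whose fetch fails while *fail is true.
func newDemoIterator(fail *bool) *cdcIterator {
	t0 := time.Date(2022, 1, 1, 10, 15, 0, 0, time.UTC)
	return &cdcIterator{
		lastModified: t0,
		now:          func() time.Time { return t0.Add(time.Minute) },
		fetch: func(since time.Time) ([]int, error) {
			if *fail {
				return nil, errors.New("simulated API error")
			}
			return []int{1, 2}, nil
		},
	}
}

func main() {
	fail := true
	it := newDemoIterator(&fail)
	_, err := it.flushLatestLeads()
	fmt.Println(err, it.lastModified.Format(time.RFC3339))

	fail = false
	ids, _ := it.flushLatestLeads()
	fmt.Println(ids, it.lastModified.Format(time.RFC3339))
}
```

If the cursor were moved before the fetch, a failed call would leave a window of changes that no later call asks for.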
updated
source/iterator/snapshot_iterator.go
Outdated
const (
	// is the maximum number of days between two snapshots. If the gap between two snapshots is greater than this number,
	// API will return an error. This is limitation of the API.
	MaximumDaysGap = 744 // 31 days in Hours
We should then call it MaximumHoursGap.
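As a small illustration of the rename, the constant can carry its unit in the name, and a `time.Duration` can be derived from it so that call sites do not have to track units by hand. `MaximumGap` and `exceedsMaximumGap` are hypothetical additions for this sketch and are not part of the PR.

```go
package main

import (
	"fmt"
	"time"
)

// MaximumHoursGap is the largest allowed gap between two snapshots,
// expressed in hours: 31 days * 24 hours.
const MaximumHoursGap = 744

// MaximumGap is the same limit as a time.Duration (hypothetical helper).
const MaximumGap = MaximumHoursGap * time.Hour

// exceedsMaximumGap reports whether the window from..to is longer than
// the limit.
func exceedsMaximumGap(from, to time.Time) bool {
	return to.Sub(from) > MaximumGap
}

func main() {
	from := time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC)
	fmt.Println(exceedsMaximumGap(from, from.AddDate(0, 0, 30))) // 30 days: within the limit
	fmt.Println(exceedsMaximumGap(from, from.AddDate(0, 0, 32))) // 32 days: over the limit
}
```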
updated
Description
Fixes # (issue)
#1
Quick checks: