-
Notifications
You must be signed in to change notification settings - Fork 4.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Source Salesforce: fix pagination in REST API streams #9151
Conversation
/test connector=connectors/source-salesforce
|
I found that some streams have problem with native pagination on Salesforce side. The problem is totalSize returned in result does not always match to the actual number of records returned. Today I also found that, BULK API reads 4780 records from PermissionSetTabSetting stream too. I assume, |
/test connector=connectors/source-salesforce
|
/test connector=connectors/source-salesforce
|
/test connector=connectors/source-salesforce
|
/test connector=connectors/source-salesforce |
After tests successfully passed, only comments and unit test were added, but the last test run did not started. |
/test connector=connectors/source-salesforce |
/test connector=connectors/source-salesforce
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please provide more detail about changes.
return f"/services/data/{self.sf_api.version}/queryAll" | ||
|
||
def next_page_token(self, response: requests.Response) -> str: | ||
response_data = response.json() | ||
if len(response_data["records"]) == self.page_size and self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS: | ||
return f"WHERE {self.primary_key} >= '{response_data['records'][-1][self.primary_key]}' " | ||
return response_data.get("nextRecordsUrl") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vitaliizazmic
If REST API query result has more than 1 page, the next page url is present in response body as nextRecordsUrl
.
""" | ||
If `next_page_token` is set, subsequent requests use `nextRecordsUrl`. | ||
""" | ||
return next_page_token |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nextRecordsUrl
is relative url, so we can return it here instead original url
""" | ||
If `next_page_token` is set, subsequent requests use `nextRecordsUrl`, and do not include any parameters. | ||
""" | ||
return {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we trying to get 2nd and more pages we don't need to send any params, use just nextRecordsUrl
for api call.
|
||
if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS: | ||
query += f"ORDER BY {self.primary_key} ASC LIMIT {self.page_size}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, in REST API, we don't need to LIMIT results set, because limit does not work for all streams.
I noticed, that when we limit by 2000 (page_size) some stream always returns 1000 records per page, other stream 465 per page.
@@ -259,6 +266,32 @@ def next_page_token(self, last_record: dict) -> str: | |||
if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS: | |||
return f"WHERE {self.primary_key} >= '{last_record[self.primary_key]}' " | |||
|
|||
def request_params( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before the change, request_params
was inherited from SalesfroceStream. But in this PR, parent's method was changed to respect new pagination approach. So I just added this method here not to break BULK API functionality.
@@ -324,13 +358,13 @@ def request_params( | |||
} | |||
|
|||
stream_date = stream_state.get(self.cursor_field) | |||
start_date = next_page_token or stream_date or self.start_date |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need next_page_token
here, because next_page_token
is relative URL for the next chunk of results.
@@ -352,3 +386,26 @@ class BulkIncrementalSalesforceStream(BulkSalesforceStream, IncrementalSalesforc | |||
def next_page_token(self, last_record: dict) -> str: | |||
if self.name not in UNSUPPORTED_FILTERING_STREAMS: | |||
return last_record[self.cursor_field] | |||
|
|||
def request_params( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before the change, this method was inherited from IncrementalSalesforceStream
. In this PR request_params is overriden not to break existing functionality, because parents methods were changed.
return f"/services/data/{self.sf_api.version}/queryAll" | ||
|
||
def next_page_token(self, response: requests.Response) -> str: | ||
response_data = response.json() | ||
if len(response_data["records"]) == self.page_size and self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main issue was here, self.page_size = 2000
, but real records count per page was different than 2000, so this method returned None: only the first page was read in this case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good enough, I like it.
/publish connector=connectors/source-salesforce
|
What
Resolves 9136
Salesforce returns 1000 records in response for some streams. As page_size = 2000 this condition is not met in this case in
next_page_token
method. So that user gets only the first 1000 records, as the method returns None.How
The main change is start using native Salesforce pagination in REST API streams.
If the initial query (REST API) returns only part of the results, the end of the response will contain a field called
nextRecordsUrl
. In such cases, request the next batch of records usingnextRecordsUrl
and repeat until all records have been retrieved. For more info go here.The solution will work only for REST API.
BULK API stream should work as before.
This change is