🐛 Source Salesforce: fix bug with pagination for BULK API #6209
Conversation
/test connector=connectors/source-salesforce
@@ -186,7 +186,7 @@ def delete_job(self, url: str):

     def next_page_token(self, last_record: dict) -> str:
         if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS:
-            return f"WHERE {self.primary_key} > '{last_record[self.primary_key]}' "
+            return f"WHERE {self.primary_key} >= '{last_record[self.primary_key]}' "
are PKs numeric and incremental? why are we not using cursor field here?
This is used for Full Refresh streams. It turned out that it is impossible to use an OFFSET greater than 2000, because the Salesforce API rejects it, so I had to use the same method as for Incremental, only filtering by ID instead of cursor_field.
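The keyset-style pagination described above can be sketched as follows. This is an illustrative stand-in, not the connector's actual class; the contents of UNSUPPORTED_FILTERING_STREAMS and the stream attributes are assumptions, while the method body mirrors the diff:

```python
# Sketch of keyset pagination by primary key, used instead of OFFSET
# (which Salesforce caps at 2000). Simplified stand-in for the stream class.
UNSUPPORTED_FILTERING_STREAMS = ["Describe"]  # hypothetical entry


class BulkStreamSketch:
    primary_key = "Id"
    name = "Account"

    def next_page_token(self, last_record: dict) -> str:
        # Build a WHERE clause that resumes after the last record seen,
        # filtering by the primary key rather than a cursor field.
        if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS:
            return f"WHERE {self.primary_key} >= '{last_record[self.primary_key]}' "


stream = BulkStreamSketch()
token = stream.next_page_token({"Id": "001xx0003"})
print(token)  # WHERE Id >= '001xx0003'
```

The returned clause is appended to the next SOQL query, so each page starts from the last primary-key value instead of skipping rows with OFFSET.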
@@ -272,11 +272,12 @@ def read_records(

         if job_status in ["JobComplete", "Aborted", "Failed"]:
             self.delete_job(url=job_full_url)
             pagination_complete = True
+            if job_status in ["Aborted", "Failed"]:
should we be raising an exception if the job failed?
Done
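A minimal sketch of what raising on a failed BULK job can look like. The exception type and the helper function are assumptions for illustration; only the status lists come from the diff:

```python
# Hypothetical sketch: treat terminal BULK job states separately and raise
# on failure instead of silently ending pagination.
class JobFailedError(Exception):  # assumed exception type, not the connector's
    pass


def handle_job_status(job_status: str) -> bool:
    """Return True when the job reached a terminal state successfully."""
    if job_status in ["JobComplete", "Aborted", "Failed"]:
        # Terminal state reached; surface failures to the caller.
        if job_status in ["Aborted", "Failed"]:
            raise JobFailedError(f"BULK job finished with status {job_status}")
        return True
    return False  # job still running; keep polling


print(handle_job_status("JobComplete"))  # True
```

The point of the change is that an Aborted or Failed job stops the sync loudly rather than being treated the same as a completed one.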
-        if len(response_data["records"]) == self.limit and self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS:
-            return f"WHERE {self.primary_key} > '{response_data['records'][-1][self.primary_key]}' "
+        if len(response_data["records"]) == self.page_size and self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS:
+            return f"WHERE {self.primary_key} >= '{response_data['records'][-1][self.primary_key]}' "
why are we comparing PK and not cursor field?
Answered in the comment above
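For reference, the termination condition in the diff above can be sketched as a free function: a next page is requested only when the last response was a full page. The function signature and the empty stream list are assumptions for the sketch; the logic mirrors the diff:

```python
# Sketch of the response-based pagination check: continue only when the last
# page was full, and compare by primary key rather than cursor_field.
UNSUPPORTED_FILTERING_STREAMS: list = []  # assumed empty for this sketch


def next_page_token(response_data: dict, page_size: int, primary_key: str, name: str):
    records = response_data["records"]
    if len(records) == page_size and primary_key and name not in UNSUPPORTED_FILTERING_STREAMS:
        # Full page: resume from the last record's primary key.
        return f"WHERE {primary_key} >= '{records[-1][primary_key]}' "
    return None  # short page: no more pages


full_page = {"records": [{"Id": "001"}, {"Id": "002"}]}
print(next_page_token(full_page, 2, "Id", "Account"))  # WHERE Id >= '002'
short_page = {"records": [{"Id": "003"}]}
print(next_page_token(short_page, 2, "Id", "Account"))  # None
```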
/test connector=connectors/source-salesforce
…urochkin/salesforce-bug-with-bulk-pagination
/publish connector=connectors/source-salesforce
What

Closes #6122.

How

Describe the solution

Recommended reading order

1. x.java
2. y.python

Pre-merge Checklist

Community member or Airbyter
- Secrets are annotated with airbyte_secret
- ./gradlew :airbyte-integrations:connectors:<name>:integrationTest passes
- Documentation updated: connector README.md; bootstrap.md (see description and examples); docs/integrations/<source or destination>/<name>.md including changelog (see changelog example)

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
- /test connector=connectors/<name> command is passing
- New connector version published with the /publish command described here