Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Source Salesforce: fix bug with pagination for BULK API #6209

Merged
merged 4 commits into from
Sep 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "b117307c-14b6-41aa-9422-947e34922962",
"name": "Salesforce",
"dockerRepository": "airbyte/source-salesforce",
"dockerImageTag": "0.1.0",
"dockerImageTag": "0.1.1",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/salesforce",
"icon": "salesforce.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@
- sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962
name: Salesforce
dockerRepository: airbyte/source-salesforce
dockerImageTag: 0.1.0
dockerImageTag: 0.1.1
documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce
icon: salesforce.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.name=airbyte/source-salesforce
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,42 @@
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "LoginGeo",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["SystemModstamp"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "LoginHistory",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["LoginTime"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "PermissionSetTabSetting",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["SystemModstamp"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,42 @@
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "LoginGeo",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["SystemModstamp"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "LoginHistory",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["LoginTime"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "PermissionSetTabSetting",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["SystemModstamp"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,14 @@
},
"ObjectPermissions": {
"SystemModstamp": "2121-08-23T10:27:22.000Z"
},
"LoginGeo": {
"SystemModstamp": "2121-08-23T10:27:22.000Z"
},
"LoginHistory": {
"LoginTime": "2121-08-23T10:27:22.000Z"
},
"PermissionSetTabSetting": {
"SystemModstamp": "2121-08-23T10:27:22.000Z"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,14 @@
},
"ObjectPermissions": {
"SystemModstamp": "2021-08-23T10:27:22.000Z"
},
"LoginGeo": {
"SystemModstamp": "2021-08-23T10:27:22.000Z"
},
"LoginHistory": {
"LoginTime": "2021-08-23T10:27:22.000Z"
},
"PermissionSetTabSetting": {
"SystemModstamp": "2021-08-23T10:27:22.000Z"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@

class SalesforceStream(HttpStream, ABC):

limit = 2000
page_size = 2000

def __init__(self, sf_api: Salesforce, pk: str, stream_name: str, schema: dict = None, **kwargs):
super().__init__(**kwargs)
Expand All @@ -66,8 +66,8 @@ def path(self, **kwargs) -> str:

def next_page_token(self, response: requests.Response) -> str:
response_data = response.json()
if len(response_data["records"]) == self.limit and self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS:
return f"WHERE {self.primary_key} > '{response_data['records'][-1][self.primary_key]}' "
if len(response_data["records"]) == self.page_size and self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS:
return f"WHERE {self.primary_key} >= '{response_data['records'][-1][self.primary_key]}' "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we comparing PK and not cursor field?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Answered in the comment above


def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
Expand All @@ -91,7 +91,7 @@ def request_params(
query += next_page_token

if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS:
query += f"ORDER BY {self.primary_key} ASC LIMIT {self.limit}"
query += f"ORDER BY {self.primary_key} ASC LIMIT {self.page_size}"

return {"q": query}

Expand All @@ -116,7 +116,7 @@ def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:

class BulkSalesforceStream(SalesforceStream):

limit = 10000
page_size = 30000
JOB_WAIT_TIMEOUT_MINS = 10
CHECK_INTERVAL_SECONDS = 2

Expand Down Expand Up @@ -186,7 +186,7 @@ def delete_job(self, url: str):

def next_page_token(self, last_record: dict) -> str:
if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS:
return f"WHERE {self.primary_key} > '{last_record[self.primary_key]}' "
return f"WHERE {self.primary_key} >= '{last_record[self.primary_key]}' "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are PKs numeric and incremental? why are we not using cursor field here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used for Full Refresh streams. It turned out that it is impossible to use an OFFSET greater than 2000 — the Salesforce API rejects it — so I had to use the same method as for Incremental, only using the ID instead of cursor_field.


def transform(self, record: dict, schema: dict = None):
"""
Expand Down Expand Up @@ -259,7 +259,7 @@ def read_records(
for count, record in self.download_data(url=job_full_url):
yield self.transform(record)

if count == self.limit:
if count == self.page_size:
next_page_token = self.next_page_token(record)
if not next_page_token:
pagination_complete = True
Expand All @@ -272,11 +272,12 @@ def read_records(

if job_status in ["JobComplete", "Aborted", "Failed"]:
self.delete_job(url=job_full_url)
pagination_complete = True
if job_status in ["Aborted", "Failed"]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we be raising exception if the job failed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

raise Exception(f"Job for {self.name} stream using BULK API was failed")


class IncrementalSalesforceStream(SalesforceStream, ABC):
state_checkpoint_interval = 100
state_checkpoint_interval = 500

def __init__(self, replication_key: str, start_date: str, **kwargs):
super().__init__(**kwargs)
Expand All @@ -285,7 +286,7 @@ def __init__(self, replication_key: str, start_date: str, **kwargs):

def next_page_token(self, response: requests.Response) -> str:
response_data = response.json()
if len(response_data["records"]) == self.limit and self.name not in UNSUPPORTED_FILTERING_STREAMS:
if len(response_data["records"]) == self.page_size and self.name not in UNSUPPORTED_FILTERING_STREAMS:
return response_data["records"][-1][self.cursor_field]

def request_params(
Expand All @@ -304,9 +305,9 @@ def request_params(
stream_date = stream_state.get(self.cursor_field)
start_date = next_page_token or stream_date or self.start_date

query = f"SELECT {','.join(selected_properties.keys())} FROM {self.name} WHERE {self.cursor_field} > {start_date} "
query = f"SELECT {','.join(selected_properties.keys())} FROM {self.name} WHERE {self.cursor_field} >= {start_date} "
if self.name not in UNSUPPORTED_FILTERING_STREAMS:
query += f"ORDER BY {self.cursor_field} ASC LIMIT {self.limit}"
query += f"ORDER BY {self.cursor_field} ASC LIMIT {self.page_size}"
return {"q": query}

@property
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/salesforce.md
Original file line number Diff line number Diff line change
Expand Up @@ -735,4 +735,5 @@ List of available streams:

| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
| 0.1.1 | 2021-09-21 | [6209](https://github.com/airbytehq/airbyte/pull/6209) | Fix bug with pagination for BULK API |
| 0.1.0 | 2021-09-08 | [5619](https://github.com/airbytehq/airbyte/pull/5619) | Salesforce Airbyte-Native Connector |