
Salesforce Stream "Account" cannot be processed by REST or BULK API #20703

Closed
vincentkoc opened this issue Dec 20, 2022 · 20 comments
@vincentkoc
Contributor

Environment

  • Airbyte version: 0.40.25
  • OS Version / Instance: Kubernetes
  • Deployment: Kubernetes via Helm Chart
  • Source Connector and version: Salesforce (1.0.27)
  • Destination Connector and version: Amazon S3 (0.1.26)
  • Step where error happened: Sync Job

Current Behavior

When I connect to my Salesforce instance, I am unable to fetch metadata unless I use connection filters to exclude the object "Account".

Error in pod logs:

Exception: Stream Account cannot be processed by REST or BULK API.
2022-12-20 08:01:18 ERROR i.a.w.i.DefaultAirbyteStreamFactory(internalLog):116 - Stream Account cannot be processed by REST or BULK API.
Traceback (most recent call last):
  File "/airbyte/integration_code/main.py", line 13, in <module>
    launch(source, sys.argv[1:])
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py", line 131, in launch
    for message in source_entrypoint.run(parsed_args):
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py", line 116, in run
    catalog = self.source.discover(self.logger, config)
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 70, in discover
    streams = [stream.as_airbyte_stream() for stream in self.streams(config=config)]
  File "/airbyte/integration_code/source_salesforce/source.py", line 110, in streams
    streams = self.generate_streams(config, stream_objects, sf)
  File "/airbyte/integration_code/source_salesforce/source.py", line 95, in generate_streams
    raise Exception(f"Stream {stream_name} cannot be processed by REST or BULK API.")

Expected Behavior

Data should sync, metadata should be shown.

@poolmaster

The issue seems to be that neither the Bulk API nor the REST API can cover tables with "many" columns and "special" data types: the REST API has a limit on how large a request (how many columns) it can handle, while the Bulk API does NOT support the "object" and "base64" types.

Can we exclude certain columns to bypass the issue? I specified a limited number of columns in the ConfiguredCatalog, but that does not seem to change any behavior.

@vincentkoc
Contributor Author

@poolmaster is there some query I could run in Salesforce Workbench or elsewhere to output the columns and their data types, so we can diagnose the issue in relation to the Bulk vs. REST API calls?

Is there any way we can update the logic to ignore columns beyond a certain point for now, to resolve the blocker temporarily?
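
For anyone else who needs that list, here is a minimal sketch (not part of the connector) that uses Salesforce's sObject describe REST endpoint to print each Account field with its type, plus the length of the comma-joined field list a REST SELECT would carry. The instance URL, API version and access token below are placeholders you must supply yourself.

import requests

INSTANCE_URL = "https://your-instance.my.salesforce.com"  # placeholder
ACCESS_TOKEN = "<access token>"                           # placeholder
API_VERSION = "v57.0"                                     # adjust to your org

# Ask Salesforce to describe the Account object and list its fields.
response = requests.get(
    f"{INSTANCE_URL}/services/data/{API_VERSION}/sobjects/Account/describe",
    headers={"Authorization": f"Bearer {ACCESS_TOKEN}"},
)
response.raise_for_status()
fields = response.json()["fields"]

for field in fields:
    print(field["name"], field["type"])

# Rough proxy for the REST URL-length concern discussed below: the length of
# the comma-joined field list that would go into a SELECT query.
print("joined field-name length:", len(",".join(field["name"] for field in fields)))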

@poolmaster

@koconder I don't have one, but I'm pretty sure that was the root cause. It basically blocks the connector from working for some critical tables.
I think we need to support one of:

  1. ignore columns beyond a certain point, which would let the REST API always work.
  2. let the client configure which columns to pull (based on the ConfiguredCatalog).

Option 2 seems better IMHO (see the sketch below).
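
To make option 2 concrete, here is a rough, hypothetical sketch of restricting a stream's schema properties to the fields selected in the configured catalog before the REST-vs-BULK decision is made. Names such as filter_properties and selected_fields are made up for illustration; this is not the connector's API.

# Hypothetical sketch of option 2; not the connector's actual code.
def filter_properties(all_properties: dict, selected_fields: set) -> dict:
    """Keep only the schema properties whose names the user selected."""
    return {name: spec for name, spec in all_properties.items() if name in selected_fields}


if __name__ == "__main__":
    all_properties = {
        "Id": {"type": ["string"]},
        "Name": {"type": ["string", "null"]},
        "Photo": {"type": ["string", "null"], "format": "base64"},  # not supported by the Bulk API
    }
    print(filter_properties(all_properties, selected_fields={"Id", "Name"}))
    # {'Id': {'type': ['string']}, 'Name': {'type': ['string', 'null']}}

With unsupported or unneeded fields dropped up front, a wide object like Account could fall back under both the Bulk type restrictions and the REST URL-length limit.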

@vincentkoc
Contributor Author

@poolmaster are you willing to raise a PR for option 2? I'm happy to help review, test and push it through.

YowanR added the bounty reward-100 difficulty - ⭐ labels and removed the type/bug (Something isn't working) label on Jan 24, 2023
@yepher

yepher commented Feb 9, 2023

This is my first time using Airbyte (testing it as a potential solution in our infrastructure). I am bumping into this same issue. Our SFDC schema is very complex and has a lot of fields. I think this is the bug I am running into:
https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py#L69

I changed that code to this:

    @classmethod
    def _get_api_type(cls, stream_name, properties):
        # Salesforce BULK API currently does not support loading fields with data type base64 and compound data
        properties_not_supported_by_bulk = {
            key: value for key, value in properties.items() if value.get("format") == "base64" or "object" in value["type"]
        }
        properties_length = len(",".join(p for p in properties))

        rest_required = stream_name in UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS or properties_not_supported_by_bulk
        # If we have a lot of properties we can overcome REST API URL length and get an error: "reason: URI Too Long".
        # For such cases connector tries to use BULK API because it uses POST request and passes properties in the request body.
        bulk_required = properties_length + 2000 > Salesforce.REQUEST_SIZE_LIMITS
        print(f"xxxxxxxxxx bulk_required: {bulk_required} properties_lengthL {properties_length} + LIMIT: {Salesforce.REQUEST_SIZE_LIMITS}")
        if rest_required and not bulk_required:
            return "rest"
        if not rest_required:
            return "bulk"

And this is the output:

2023-02-09 05:45:12 INFO i.a.w.i.DefaultAirbyteStreamFactory(parseJson):125 - xxxxxxxxxx bulk_required: True properties_lengthL 14846 + LIMIT: 16384
airbyte-worker                    | 2023-02-09 05:45:12 ERROR i.a.w.i.DefaultAirbyteStreamFactory(internalLog):163 - None: Stream Account cannot be processed by REST or BULK API.
airbyte-worker                    | Traceback (most recent call last):
airbyte-worker                    |   File "/airbyte/integration_code/main.py", line 13, in <module>
airbyte-worker                    |     launch(source, sys.argv[1:])
airbyte-worker                    |   File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py", line 131, in launch
airbyte-worker                    |     for message in source_entrypoint.run(parsed_args):
airbyte-worker                    |   File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py", line 116, in run
airbyte-worker                    |     catalog = self.source.discover(self.logger, config)
airbyte-worker                    |   File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 75, in discover
airbyte-worker                    |     streams = [stream.as_airbyte_stream() for stream in self.streams(config=config)]
airbyte-worker                    |   File "/airbyte/integration_code/source_salesforce/source.py", line 110, in streams
airbyte-worker                    |     streams = self.generate_streams(config, stream_objects, sf)
airbyte-worker                    |   File "/airbyte/integration_code/source_salesforce/source.py", line 95, in generate_streams
airbyte-worker                    |     raise Exception(f"{api_type}: Stream {stream_name} cannot be processed by REST or BULK API.")

The boolean return logic is just a little off: it should return "bulk", but it returns None in this case because the code falls through. This is not the perfect solution, but to test, I changed the return logic to this, and it worked for me:

        rest_required = stream_name in UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS or properties_not_supported_by_bulk
        # If we have a lot of properties we can overcome REST API URL length and get an error: "reason: URI Too Long".
        # For such cases connector tries to use BULK API because it uses POST request and passes properties in the request body.
        bulk_required = properties_length + 2000 > Salesforce.REQUEST_SIZE_LIMITS
        print(f"xxxxxxxxxx bulk_required: {bulk_required} properties_lengthL {properties_length} + LIMIT: {Salesforce.REQUEST_SIZE_LIMITS}")
        if rest_required and not bulk_required:
            return "rest"
        if not rest_required:
            return "bulk"
        return "bulk" # this case needs handled better

@vincentkoc
Contributor Author

Seems like #22597 has added new issues:

airbytehq/oncall/issues/1403
cc @davydov-d

2023-02-12 22:50:25 source > Syncing stream: Account 
2023-02-12 22:50:25 source > <h1>Bad Message 431</h1><pre>reason: Request Header Fields Too Large</pre>
2023-02-12 22:50:25 source > Encountered an exception while reading stream Account
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/requests/models.py", line 971, in json
    return complexjson.loads(self.text, **kwargs)
  File "/usr/local/lib/python3.9/json/__init__.py", line 346, in loads
    return _default_decoder.decode(s)
  File "/usr/local/lib/python3.9/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/local/lib/python3.9/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
During handling of the above exception, another exception occurred:

@davydov-d
Collaborator

@koconder may I ask you to share your credentials if possible so I could reproduce this? or at least the log file of the failed sync. Thanks!

@davydov-d
Collaborator

nevermind, I have found the root cause and prepared a patch for that: #22896

@vincentkoc
Contributor Author

Still an issue @davydov-d

    raise e
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 120, in read
    yield from self._read_stream(
  File "/airbyte/integration_code/source_salesforce/source.py", line 138, in _read_stream
    yield from super()._read_stream(logger, stream_instance, configured_stream, state_manager, internal_config)
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 189, in _read_stream
    for record in record_iterator:
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 256, in _read_incremental
    for message_counter, record_data_or_message in enumerate(records, start=1):
  File "/airbyte/integration_code/source_salesforce/streams.py", line 162, in read_records
    yield from self._read_pages(
  File "/airbyte/integration_code/source_salesforce/streams.py", line 209, in _read_pages
    chunk_page_records = {record[self.primary_key]: record for record in chunk_page_records}
  File "/airbyte/integration_code/source_salesforce/streams.py", line 209, in <dictcomp>
    chunk_page_records = {record[self.primary_key]: record for record in chunk_page_records}
KeyError: 'Id'
2023-02-17 04:30:21 destination > Airbyte message consumer: succeeded.
2023-02-17 04:30:21 destination > executing on success close procedure.
2023-02-17 04:30:21 destination > Flushing all 0 current buffers (0 bytes in total)
2023-02-17 04:30:21 destination > Completed integration: io.airbyte.integrations.destination.s3.S3Destination
2023-02-17 04:30:21 destination > Completed destination: io.airbyte.integrations.destination.s3.S3Destination

I will send you the full log on Slack
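
The KeyError: 'Id' suggests that at least one property chunk was requested without the primary key, so its page of records cannot be keyed for merging; that is what the commit referenced below ("include primary key in every chunk") addresses. A made-up illustration of the idea (chunk_fields is not the connector's real helper):

# Made-up illustration: force the primary key into every property chunk.
def chunk_fields(field_names, primary_key="Id", chunk_size=3):
    others = [name for name in field_names if name != primary_key]
    return [
        [primary_key] + others[i:i + chunk_size]
        for i in range(0, len(others), chunk_size)
    ]


print(chunk_fields(["Id", "Name", "Industry", "Phone", "Website", "Type"]))
# [['Id', 'Name', 'Industry', 'Phone'], ['Id', 'Website', 'Type']]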

vincentkoc reopened this Feb 17, 2023
davydov-d self-assigned this Feb 17, 2023
@davydov-d
Collaborator

oh dang, I think I know what the problem is

davydov-d added a commit that referenced this issue Feb 20, 2023
lazebnyi added commits that referenced this issue Feb 22, 2023
davydov-d added commits that referenced this issue Feb 23, 2023, including a merge of the ddavydov/#20703-source-salesforce-include-pk-in-properties-chunks branch with the following message:

* #20703 source salesforce: include primary key in every chunk
* #20703 source salesforce: upd changelog
* #20703 source salesforce: review fix
* auto-bump connector version
* #20703 source salesforce: upd connectors.md

Co-authored-by: Serhii Lazebnyi <[email protected]>
Co-authored-by: Octavia Squidington III <[email protected]>
@davydov-d
Collaborator

#23190

@vincentkoc
Contributor Author

Still buggy; a new bug has been introduced:

Inconsistent record with primary key 0014a000007hTpOAAU found. It consists of 1 chunks instead of 2. Skipping it.

The issue above repeats for every row of the first object, Account, which was the original problem.
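
For context on that message: once fields are fetched in several property chunks, the partial rows are merged back together by primary key, and a record that only shows up in some of the chunks gets skipped. A made-up sketch of that merge-and-check step (not the connector's code):

from collections import defaultdict

# Made-up sketch: merge per-chunk partial records by primary key and skip
# records that did not appear in every chunk.
def merge_chunked_records(chunk_pages, expected_chunks, primary_key="Id"):
    merged = defaultdict(dict)
    seen_in = defaultdict(int)
    for page in chunk_pages:            # one page of records per property chunk
        for record in page:
            merged[record[primary_key]].update(record)
            seen_in[record[primary_key]] += 1
    for pk, record in merged.items():
        if seen_in[pk] != expected_chunks:
            print(f"Inconsistent record with primary key {pk} found. "
                  f"It consists of {seen_in[pk]} chunks instead of {expected_chunks}. Skipping it.")
            continue
        yield record


chunk_a = [{"Id": "001", "Name": "Acme"}]
chunk_b = [{"Id": "001", "Industry": "Tech"}, {"Id": "002", "Industry": "Retail"}]
print(list(merge_chunked_records([chunk_a, chunk_b], expected_chunks=2)))
# Id 001 merges cleanly; Id 002 appeared in only one chunk and is skipped.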

vincentkoc reopened this Feb 28, 2023
@davydov-d
Collaborator

#23610

@davydov-d
Collaborator

The fix has been released.

@vincentkoc
Contributor Author

@davydov-d I'm testing now... so far the chunking has started, fingers crossed.
One error so far for each chunk:

2023-03-03 09:25:26 ERROR c.n.s.DateTimeValidator(tryParse):82 - Invalid date-time: Invalid timezone offset: +0000

@davydov-d
Collaborator

@koconder that is rather a warning, and it used to be logged even before chunks were introduced. In case it's something critical, please report a new issue since it's not related to this one.
