Skip to content

Commit

Permalink
before sync get the streams from the catalog instead of the api endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
sgandhi1311 committed Nov 22, 2023
1 parent 4d62c50 commit 28ba295
Showing 1 changed file with 18 additions and 14 deletions.
32 changes: 18 additions & 14 deletions tap_hubspot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1140,7 +1140,7 @@ def sync_records(stream_id, primary_key, bookmark_key, catalog, STATE, params):
"""
mdata = metadata.to_map(catalog.get('metadata'))
if stream_id.startswith("custom_"):
url = get_url("custom_objects", object_name=stream_id[len("custom_"):])
url = get_url("custom_objects", object_name=stream_id.split("custom_")[1])
else:
url = get_url(stream_id)
max_bk_value = bookmark_value = utils.strptime_with_tz(
Expand Down Expand Up @@ -1221,15 +1221,15 @@ class Stream:
]


def add_custom_streams(mode):
custom_objects_schema_url = get_url("custom_objects_schema")
# Load Hubspot's shared schemas
refs = load_shared_schema_refs()
try:
for custom_object in gen_request_custom_objects("custom_objects_schema", custom_objects_schema_url, {}, 'results', "paging"):
stream_id = "custom_" + custom_object["name"]
STREAMS.append(Stream(stream_id, sync_custom_object_records, ['id'], 'updatedAt', 'INCREMENTAL'))
if mode == "DISCOVER":
def add_custom_streams(mode, catalog = None):
if mode == "DISCOVER":
custom_objects_schema_url = get_url("custom_objects_schema")
# Load Hubspot's shared schemas
refs = load_shared_schema_refs()
try:
for custom_object in gen_request_custom_objects("custom_objects_schema", custom_objects_schema_url, {}, 'results', "paging"):
stream_id = "custom_" + custom_object["name"]
STREAMS.append(Stream(stream_id, sync_custom_object_records, ['id'], 'updatedAt', 'INCREMENTAL'))
schema = utils.load_json(get_abs_path('schemas/shared/custom_objects.json'))
custom_schema = parse_custom_schema(stream_id, custom_object["properties"])
schema["properties"]["properties"] = {
Expand All @@ -1248,9 +1248,13 @@ def add_custom_streams(mode):
with open(custom_schema_path, 'w') as json_file:
json.dump(final_schema, json_file)

except SourceUnavailableException as ex:
warning_message = str(ex).replace(CONFIG['access_token'], 10 * '*')
LOGGER.warning(warning_message)
except SourceUnavailableException as ex:
warning_message = str(ex).replace(CONFIG['access_token'], 10 * '*')
LOGGER.warning(warning_message)

elif mode == "SYNC":
for stream in catalog["streams"]:
STREAMS.append(Stream(stream["tap_stream_id"], sync_custom_object_records, ['id'], 'updatedAt', 'INCREMENTAL'))

def load_shared_schema_refs():
shared_schemas_path = get_abs_path('schemas/shared')
Expand Down Expand Up @@ -1286,7 +1290,7 @@ def get_selected_streams(remaining_streams, ctx):
return selected_streams

def do_sync(STATE, catalog):
add_custom_streams(mode="SYNC")
add_custom_streams(mode="SYNC", catalog=catalog)
# Clear out keys that are no longer used
clean_state(STATE)

Expand Down

0 comments on commit 28ba295

Please sign in to comment.