Skip to content

Commit

Permalink
Merge branch 'master' into tdl-18267
Browse files Browse the repository at this point in the history
  • Loading branch information
bhuvana-talend authored Dec 12, 2023
2 parents 7d560ab + 88b0b8e commit 7479adb
Show file tree
Hide file tree
Showing 10 changed files with 544 additions and 20 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 2.13.0
* HubSpot Custom CRM Objects Support [#242](https://github.com/singer-io/tap-hubspot/pull/242)

## 2.12.2
* Use engagements_page_size advanced option [#234](https://github.com/singer-io/tap-hubspot/pull/234)
*
Expand Down
1 change: 1 addition & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
include LICENSE
include tap_hubspot/schemas/*.json
include tap_hubspot/schemas/shared/*.json
17 changes: 4 additions & 13 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from setuptools import setup

setup(name='tap-hubspot',
version='2.12.2',
version='2.13.0',
description='Singer.io tap for extracting data from the HubSpot API',
author='Stitch',
url='http://singer.io',
Expand All @@ -29,18 +29,9 @@
packages=['tap_hubspot'],
package_data = {
'tap_hubspot/schemas': [
"campaigns.json",
"companies.json",
"contact_lists.json",
"contacts.json",
"deals.json",
"email_events.json",
"forms.json",
"keywords.json",
"owners.json",
"subscription_changes.json",
"workflows.json",
],
"schemas/*.json",
"schemas/shared/*.json"
]
},
include_package_data=True,
)
178 changes: 173 additions & 5 deletions tap_hubspot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ class StateFields:

"tickets_properties": "/crm/v3/properties/tickets",
"tickets": "/crm/v4/objects/tickets",

"custom_objects_schema": "/crm/v3/schemas",
"custom_objects": "/crm/v3/objects/p_{object_name}"
}

def get_start(state, tap_stream_id, bookmark_key, older_bookmark_key=None):
Expand Down Expand Up @@ -197,7 +200,12 @@ def get_field_schema(field_type, extras=False):
}
}

def parse_custom_schema(entity_name, data):
def parse_custom_schema(entity_name, data, is_custom_object=False):
if is_custom_object:
return {
field['name']: get_field_type_schema(field['type'])
for field in data
}
if entity_name == "tickets":
return {
field['name']: get_field_type_schema(field['type'])
Expand Down Expand Up @@ -1104,6 +1112,92 @@ def sync_deal_pipelines(STATE, ctx):
singer.write_state(STATE)
return STATE

def gen_request_custom_objects(tap_stream_id, url, params, path, more_key):
"""
Cursor-based API Pagination : Used in custom_objects stream implementation
"""
try:
with metrics.record_counter(tap_stream_id) as counter:
while True:
data = request(url, params).json()

if data.get(path) is None:
raise RuntimeError(
"Unexpected API response: {} not in {}".format(path, data.keys()))

for row in data[path]:
counter.increment()
yield row

if not data.get(more_key):
break
params['after'] = data.get(more_key, {}).get('next', {}).get('after', None)
if params['after'] is None:
break
except SourceUnavailableException as ex:
warning_message = str(ex).replace(CONFIG['access_token'], 10 * '*')
LOGGER.warning(warning_message)
return []

def sync_custom_objects(stream_id, primary_key, bookmark_key, catalog, STATE, params, is_custom_object=False):
"""
Synchronize records from a data source
"""
mdata = metadata.to_map(catalog.get('metadata'))
if is_custom_object:
url = get_url("custom_objects", object_name=stream_id)
else:
url = get_url(stream_id)
max_bk_value = bookmark_value = utils.strptime_with_tz(
get_start(STATE, stream_id, bookmark_key))

LOGGER.info(f"Sync record for {stream_id} from {bookmark_value}")
schema = catalog.get('schema')
singer.write_schema(stream_id, schema, [primary_key],
[bookmark_key], catalog.get('stream_alias'))

with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as transformer:
# To handle records updated between start of the table sync and the end,
# store the current sync start in the state and not move the bookmark past this value.
sync_start_time = utils.now()
for row in gen_request_custom_objects(stream_id, url, params, 'results', "paging"):
# parsing the string formatted date to datetime object
modified_time = utils.strptime_to_utc(row[bookmark_key])

# Checking the bookmark value is present on the record and it
# is greater than or equal to defined previous bookmark value
if modified_time and modified_time >= bookmark_value:
# transforms the data and filters out the selected fields from the catalog
record = transformer.transform(lift_properties_and_versions(row), schema, mdata)
singer.write_record(stream_id, record, catalog.get(
'stream_alias'), time_extracted=utils.now())
if modified_time and modified_time >= max_bk_value:
max_bk_value = modified_time

# Don't bookmark past the start of this sync to account for updated records during the sync.
new_bookmark = min(max_bk_value, sync_start_time)
STATE = singer.write_bookmark(STATE, stream_id, bookmark_key, utils.strftime(new_bookmark))
singer.write_state(STATE)
return STATE


def sync_custom_object_records(STATE, ctx, stream_id):
"""
Function to sync records for each `custom_object` stream
"""
catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
mdata = metadata.to_map(catalog.get('metadata'))
primary_key = "id"
bookmark_key = "updatedAt"

params = {'limit': 100,
'associations': 'emails,meetings,notes,tasks,calls,conversations,contacts,companies,deals,tickets',
'properties': get_selected_property_fields(catalog, mdata),
'archived': False
}
return sync_custom_objects(stream_id, primary_key, bookmark_key, catalog, STATE, params, is_custom_object=True)


@attr.s
class Stream:
tap_stream_id = attr.ib()
Expand Down Expand Up @@ -1131,6 +1225,64 @@ class Stream:
Stream('engagements', sync_engagements, ["engagement_id"], 'lastUpdated', 'FULL_TABLE')
]

# pylint: disable=inconsistent-return-statements
def generate_custom_streams(mode, catalog=None):
"""
- In DISCOVER mode, fetch the custom schema from the API endpoint and set the schema for the custom objects.
- In SYNC mode, extend STREAMS for the custom objects.
Args:
mode (str): The mode indicating whether to DISCOVER or SYNC custom streams.
catalog (dict): The catalog containing stream information.
Returns:
List[dict] or List[str]: Returns list of custom streams (contains dictionary) in DISCOVER mode and a list of custom object names in SYNC mode.
"""
custom_objects_schema_url = get_url("custom_objects_schema")
if mode == "DISCOVER":
custom_streams = []
# Load Hubspot's shared schemas
refs = load_shared_schema_refs()
for custom_object in gen_request_custom_objects("custom_objects_schema", custom_objects_schema_url, {}, 'results', "paging"):
stream_id = custom_object["name"]
schema = utils.load_json(get_abs_path('schemas/shared/custom_objects.json'))
custom_schema = parse_custom_schema(stream_id, custom_object["properties"], is_custom_object=True)
schema["properties"]["properties"] = {
"type": "object",
"properties": custom_schema,
}

# Move properties to top level
custom_schema_top_level = {'property_{}'.format(k): v for k, v in custom_schema.items()}
schema['properties'].update(custom_schema_top_level)

final_schema = singer.resolve_schema_references(schema, refs)
custom_streams.append({"stream": Stream(stream_id, sync_custom_object_records, ['id'], 'updatedAt', 'INCREMENTAL'),
"schema": final_schema})

return custom_streams

elif mode == "SYNC":
custom_objects = [custom_object["name"] for custom_object in gen_request_custom_objects("custom_objects_schema", custom_objects_schema_url, {}, 'results', "paging")]
if len(custom_objects) > 0:
for stream in catalog["streams"]:
if stream["tap_stream_id"] in custom_objects:
STREAMS.append(Stream(stream["tap_stream_id"], sync_custom_object_records, ['id'], 'updatedAt', 'INCREMENTAL'))
return custom_objects

def load_shared_schema_refs():
shared_schemas_path = get_abs_path('schemas/shared')

shared_file_names = [f for f in os.listdir(shared_schemas_path)
if os.path.isfile(os.path.join(shared_schemas_path, f))]

shared_schema_refs = {}
for shared_file in shared_file_names:
with open(os.path.join(shared_schemas_path, shared_file)) as data_file:
shared_schema_refs[shared_file] = json.load(data_file)

return shared_schema_refs

def get_streams_to_sync(streams, state):
target_stream = singer.get_currently_syncing(state)
result = streams
Expand All @@ -1152,6 +1304,7 @@ def get_selected_streams(remaining_streams, ctx):
return selected_streams

def do_sync(STATE, catalog):
custom_objects = generate_custom_streams(mode="SYNC", catalog=catalog)
# Clear out keys that are no longer used
clean_state(STATE)

Expand All @@ -1168,7 +1321,10 @@ def do_sync(STATE, catalog):
singer.write_state(STATE)

try:
STATE = stream.sync(STATE, ctx) # pylint: disable=not-callable
if stream.tap_stream_id in custom_objects:
STATE = stream.sync(STATE, ctx, stream.tap_stream_id)
else:
STATE = stream.sync(STATE, ctx) # pylint: disable=not-callable
except SourceUnavailableException as ex:
error_message = str(ex).replace(CONFIG['access_token'], 10 * '*')
LOGGER.error(error_message)
Expand Down Expand Up @@ -1210,8 +1366,7 @@ def validate_dependencies(ctx):
if errs:
raise DependencyException(" ".join(errs))

def load_discovered_schema(stream):
schema = load_schema(stream.tap_stream_id)
def get_metadata(stream, schema):
mdata = metadata.new()

mdata = metadata.write(mdata, (), 'table-key-properties', stream.key_properties)
Expand All @@ -1230,8 +1385,13 @@ def load_discovered_schema(stream):
if stream.tap_stream_id == "engagements":
mdata = metadata.write(mdata, ('properties', 'engagement'), 'inclusion', 'automatic')
mdata = metadata.write(mdata, ('properties', 'lastUpdated'), 'inclusion', 'automatic')
return metadata.to_list(mdata)

return schema, metadata.to_list(mdata)
def load_discovered_schema(stream):
schema = load_schema(stream.tap_stream_id)
mdata = get_metadata(stream, schema)

return schema, mdata

def discover_schemas():
result = {'streams': []}
Expand All @@ -1247,6 +1407,14 @@ def discover_schemas():
# Skip the discovery mode on the streams were the required scopes are missing
warning_message = str(ex).replace(CONFIG['access_token'], 10 * '*')
LOGGER.warning(warning_message)

for custom_stream in generate_custom_streams(mode="DISCOVER"):
LOGGER.info('Loading schema for Custom Object - %s', custom_stream["stream"].tap_stream_id)
result['streams'].append({'stream': custom_stream["stream"].tap_stream_id,
'tap_stream_id': custom_stream["stream"].tap_stream_id,
'schema': custom_stream["schema"],
'metadata': get_metadata(custom_stream["stream"], custom_stream["schema"])})

# Load the contacts_by_company schema
LOGGER.info('Loading schema for contacts_by_company')
contacts_by_company = Stream('contacts_by_company', _sync_contacts_by_company, ['company-id', 'contact-id'], None, 'FULL_TABLE')
Expand Down
34 changes: 34 additions & 0 deletions tap_hubspot/schemas/shared/associations_schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{
"type": [
"null",
"object"
],
"properties": {
"results": {
"type": [
"null",
"array"
],
"items": {
"type": [
"null",
"object"
],
"properties": {
"id": {
"type": [
"null",
"string"
]
},
"type": {
"type": [
"null",
"string"
]
}
}
}
}
}
}
44 changes: 44 additions & 0 deletions tap_hubspot/schemas/shared/custom_objects.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
{
"type": "object",
"properties": {
"id": {
"type": [
"null",
"string"
]
},
"createdAt": {
"type": ["null", "string"],
"format": "date-time"
},
"updatedAt": {
"type": ["null", "string"],
"format": "date-time"
},
"archived": {
"type": [
"null",
"boolean"
]
},
"associations": {
"type": [
"null",
"object"
],
"properties": {
"emails": {"$ref": "associations_schema.json"},
"meetings": {"$ref": "associations_schema.json"},
"notes": {"$ref": "associations_schema.json"},
"tasks": {"$ref": "associations_schema.json"},
"calls": {"$ref": "associations_schema.json"},
"conversation_session": {"$ref": "associations_schema.json"},
"companies": {"$ref": "associations_schema.json"},
"contacts": {"$ref": "associations_schema.json"},
"deals": {"$ref": "associations_schema.json"},
"products": {"$ref": "associations_schema.json"},
"tickets": {"$ref": "associations_schema.json"}
}
}
}
}
Loading

0 comments on commit 7479adb

Please sign in to comment.