Skip to content

Commit

Permalink
TDL-26414: Rename custom objects (#263)
Browse files Browse the repository at this point in the history
* Rename custom objects that have the same name as a standard object
* Update integration tests
* Bump version v3.1.0

---------

Co-authored-by: RushiT0122 <[email protected]>
  • Loading branch information
RushiT0122 and RushiT0122 authored Sep 18, 2024
1 parent 5b9d080 commit 695c5b5
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 18 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 3.1.0
* Renames custom objects that share a name with standard objects [#263](https://github.com/singer-io/tap-hubspot/pull/263)

## 3.0.0
* Upgrade Owners API endpoint [#256](https://github.com/singer-io/tap-hubspot/pull/256)

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from setuptools import setup

setup(name='tap-hubspot',
version='3.0.0',
version='3.1.0',
description='Singer.io tap for extracting data from the HubSpot API',
author='Stitch',
url='http://singer.io',
Expand Down
15 changes: 10 additions & 5 deletions tap_hubspot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1155,7 +1155,7 @@ def sync_custom_objects(stream_id, primary_key, bookmark_key, catalog, STATE, pa
"""
mdata = metadata.to_map(catalog.get('metadata'))
if is_custom_object:
url = get_url("custom_objects", object_name=stream_id)
url = get_url("custom_objects", object_name=catalog["table_name"])
else:
url = get_url(stream_id)
max_bk_value = bookmark_value = utils.strptime_with_tz(
Expand All @@ -1180,7 +1180,7 @@ def sync_custom_objects(stream_id, primary_key, bookmark_key, catalog, STATE, pa
# transforms the data and filters out the selected fields from the catalog
record = transformer.transform(lift_properties_and_versions(row), schema, mdata)
singer.write_record(stream_id, record, catalog.get(
'stream_alias'), time_extracted=utils.now())
'stream'), time_extracted=utils.now())
if modified_time and modified_time >= max_bk_value:
max_bk_value = modified_time

Expand Down Expand Up @@ -1249,12 +1249,14 @@ def generate_custom_streams(mode, catalog=None):
List[dict] or List[str]: Returns list of custom streams (contains dictionary) in DISCOVER mode and a list of custom object names in SYNC mode.
"""
custom_objects_schema_url = get_url("custom_objects_schema")
standard_streams = [stream.tap_stream_id for stream in STREAMS]
if mode == "DISCOVER":
custom_streams = []
# Load Hubspot's shared schemas
refs = load_shared_schema_refs()
for custom_object in gen_request_custom_objects("custom_objects_schema", custom_objects_schema_url, {}, 'results', "paging"):
stream_id = custom_object["name"]
custom_object_name = custom_object["name"]
stream_id = f'custom_object_{custom_object_name}' if custom_object_name in standard_streams else custom_object_name
schema = utils.load_json(get_abs_path('schemas/shared/custom_objects.json'))
custom_schema = parse_custom_schema(stream_id, custom_object["properties"], is_custom_object=True)
schema["properties"]["properties"] = {
Expand All @@ -1267,13 +1269,15 @@ def generate_custom_streams(mode, catalog=None):
schema['properties'].update(custom_schema_top_level)

final_schema = singer.resolve_schema_references(schema, refs)
custom_streams.append({"stream": Stream(stream_id, sync_custom_object_records, ['id'], 'updatedAt', 'INCREMENTAL'),
custom_streams.append({"custom_object_name": custom_object_name,
"stream": Stream(stream_id, sync_custom_object_records, ['id'], 'updatedAt', 'INCREMENTAL'),
"schema": final_schema})

return custom_streams

elif mode == "SYNC":
custom_objects = [custom_object["name"] for custom_object in gen_request_custom_objects("custom_objects_schema", custom_objects_schema_url, {}, 'results', "paging")]
rename_stream = lambda stream: f'custom_object_{stream}' if stream in standard_streams else stream
custom_objects = [rename_stream(custom_object["name"]) for custom_object in gen_request_custom_objects("custom_objects_schema", custom_objects_schema_url, {}, 'results', "paging")]
if len(custom_objects) > 0:
for stream in catalog["streams"]:
if stream["tap_stream_id"] in custom_objects:
Expand Down Expand Up @@ -1422,6 +1426,7 @@ def discover_schemas():
LOGGER.info('Loading schema for Custom Object - %s', custom_stream["stream"].tap_stream_id)
result['streams'].append({'stream': custom_stream["stream"].tap_stream_id,
'tap_stream_id': custom_stream["stream"].tap_stream_id,
"table_name": custom_stream["custom_object_name"],
'schema': custom_stream["schema"],
'metadata': get_metadata(custom_stream["stream"], custom_stream["schema"])})

Expand Down
4 changes: 3 additions & 1 deletion tap_hubspot/tests/unittests/test_custom_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
{
"stream": "cars",
"tap_stream_id": "cars",
"table_name": "cars",
"schema": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -81,7 +82,8 @@ def test_generate_custom_streams(
"properties": {"property_fake_object": "fake_value"},
}
expected_value = [
{'stream': Stream(tap_stream_id='fake_object', sync=mock_sync_custom_records, key_properties=['id'], replication_key='updatedAt', replication_method='INCREMENTAL'),
{'custom_object_name':'fake_object',
'stream': Stream(tap_stream_id='fake_object', sync=mock_sync_custom_records, key_properties=['id'], replication_key='updatedAt', replication_method='INCREMENTAL'),
'schema': {'type': 'object', 'properties': {'property_fake_object': 'fake_value'}}}]

# Set up mock return values
Expand Down
14 changes: 14 additions & 0 deletions tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,20 @@ def expected_metadata(self): # DOCS_BUG https://stitchdata.atlassian.net/browse
self.REPLICATION_KEYS: {"updatedAt"},
self.EXPECTED_PAGE_SIZE: 100,
self.OBEYS_START_DATE: True
},
"custom_object_campaigns": {
self.PRIMARY_KEYS: {"id"},
self.REPLICATION_METHOD: self.INCREMENTAL,
self.REPLICATION_KEYS: {"updatedAt"},
self.EXPECTED_PAGE_SIZE: 100,
self.OBEYS_START_DATE: True
},
"custom_object_contacts": {
self.PRIMARY_KEYS: {"id"},
self.REPLICATION_METHOD: self.INCREMENTAL,
self.REPLICATION_KEYS: {"updatedAt"},
self.EXPECTED_PAGE_SIZE: 100,
self.OBEYS_START_DATE: True
}
}

Expand Down
16 changes: 16 additions & 0 deletions tests/base_hubspot.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,22 @@ def expected_metadata(cls): # DOCS_BUG https://stitchdata.atlassian.net/browse/
BaseCase.API_LIMIT: 100,
BaseCase.EXPECTED_PAGE_SIZE: 100,
BaseCase.OBEYS_START_DATE: True
},
"custom_object_campaigns": {
BaseCase.PRIMARY_KEYS: {"id"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"updatedAt"},
BaseCase.API_LIMIT: 100,
BaseCase.EXPECTED_PAGE_SIZE: 100,
BaseCase.OBEYS_START_DATE: True
},
"custom_object_contacts": {
BaseCase.PRIMARY_KEYS: {"id"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"updatedAt"},
BaseCase.API_LIMIT: 100,
BaseCase.EXPECTED_PAGE_SIZE: 100,
BaseCase.OBEYS_START_DATE: True
}

}
49 changes: 40 additions & 9 deletions tests/client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import datetime
import random
import uuid
import sys
import string

import backoff
import requests
Expand Down Expand Up @@ -216,8 +216,11 @@ def read(self, stream, parent_ids=[], since='', pagination=False):
return self.get_subscription_changes(since, pagination)
elif stream == "tickets":
return self.get_tickets(pagination)
elif stream in ["cars", "co_firsts"]:
elif stream in ["cars", "co_firsts", "custom_object_contacts"]:
return self.get_custom_objects(stream)
elif stream == "custom_object_campaigns":
# Custom object endpoints must be accessed using the names specified in the source.
return self.get_custom_objects("custom_object_campaigns", source_name="campaigns")
else:
raise NotImplementedError

Expand Down Expand Up @@ -745,29 +748,29 @@ def _get_custom_object_record_by_pk(self, object_name, id):
response = self.get(url)
return response

def get_custom_objects_properties(self, object_name):
def get_custom_objects_properties(self, object_name, source_name):
"""
Get custom object properties.
HubSpot API https://developers.hubspot.com/docs/api/crm/crm-custom-objects
"""
url = f"{BASE_URL}/crm/v3/properties/p_{object_name}"
url = f"{BASE_URL}/crm/v3/properties/p_{source_name or object_name}"
records = self.get(url)

return ",".join([record["name"] for record in records["results"]])

def get_custom_objects(self, stream):
def get_custom_objects(self, stream, source_name=None):
"""
Get all custom_object records.
HubSpot API https://developers.hubspot.com/docs/api/crm/crm-custom-objects
"""
page_size = self.BaseTest.expected_metadata().get(stream,{}).get(self.BaseTest.EXPECTED_PAGE_SIZE)
url = f"{BASE_URL}/crm/v3/objects/p_{stream}"
url = f"{BASE_URL}/crm/v3/objects/p_{source_name or stream}"
replication_key = list(self.replication_keys[stream])[0]
records = []

# response = self.get(url)
associations = 'emails,meetings,notes,tasks,calls,conversations,contacts,companies,deals,tickets'
params = {"limit": page_size, "associations": associations, 'properties': self.get_custom_objects_properties(stream)}
params = {"limit": page_size, "associations": associations, 'properties': self.get_custom_objects_properties(stream, source_name)}
while True:
response = self.get(url, params=params)

Expand Down Expand Up @@ -839,8 +842,11 @@ def create(self, stream, company_ids=[], subscriptions=[], times=1):
return self.create_subscription_changes(subscriptions, times)
elif stream == 'tickets':
return self.create_tickets()
elif stream in ["cars", "co_firsts"]:
elif stream in ["cars", "co_firsts", "custom_object_contacts"]:
return self.create_custom_object_record(stream)
elif stream == "custom_object_campaigns":
# Custom object endpoints must be accessed using the names specified in the source.
return self.create_custom_object_record("campaigns")
else:
raise NotImplementedError(f"There is no create_{stream} method in this dipatch!")

Expand Down Expand Up @@ -1148,6 +1154,11 @@ def create_contacts_by_company(self, company_ids=[], contact_records=[], times=1

return records

def generate_random_string(self, length):
characters = string.ascii_letters + string.digits
random_string = ''.join(random.choice(characters) for _ in range(length))
return random_string

def create_custom_object_record(self, stream):
url = f"{BASE_URL}/crm/v3/objects/p_{stream}"

Expand All @@ -1174,6 +1185,23 @@ def create_custom_object_record(self, stream):
"country": random.choice(["USA", "India", "France", "UK"])
}
}
elif stream == "campaigns":
url = f"{BASE_URL}/crm/v3/objects/p_campaigns"
data = {
"properties": {
"co_campaign_id": random.randint(1, 100000),
"name": "test name",
"country": random.choice(["USA", "India", "France", "UK"])
}
}
elif stream == "custom_object_contacts":
data = {
"properties": {
"co_contact_id": random.randint(1, 100000),
"name": "test name",
"country": random.choice(["USA", "India", "France", "UK"])
}
}

# generate a record
response = self.post(url, data)
Expand Down Expand Up @@ -1599,8 +1627,11 @@ def update(self, stream, record_id):
return self.update_engagements(record_id)
elif stream == 'tickets':
return self.update_tickets(record_id)
elif stream in ["cars", "co_firsts"]:
elif stream in ["cars", "co_firsts", "custom_object_contacts"]:
return self.update_custom_object_record(stream, record_id)
elif stream == "custom_object_campaigns":
# Custom object endpoints must be accessed using the names specified in the source.
return self.update_custom_object_record("campaigns", record_id)
else:
raise NotImplementedError(f"Test client does not have an update method for {stream}")

Expand Down
6 changes: 4 additions & 2 deletions tests/test_hubspot_all_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ def get_matching_actual_record_by_pk(expected_primary_key_dict, actual_records):
'activeSalesforceId',

# Field is returned by API but not listed in official Hubspot documentation
'userIdIncludingInactive'
'userIdIncludingInactive',
'type'
},
'forms': { # BUG https://jira.talendforge.org/browse/TDL-15001
'alwaysCreateNewCompany',
Expand Down Expand Up @@ -167,7 +168,8 @@ def get_matching_actual_record_by_pk(expected_primary_key_dict, actual_records):
'property_hs_v2_date_exited_appointmentscheduled',
'property_hs_v2_latest_time_in_appointmentscheduled',
'property_hs_v2_cumulative_time_in_appointmentscheduled',
'property_hs_v2_date_entered_qualifiedtobuy'
'property_hs_v2_date_entered_qualifiedtobuy',
'property_deal_currency_code'
},
'subscription_changes':{
'normalizedEmailId'
Expand Down

0 comments on commit 695c5b5

Please sign in to comment.