From 88b0b8eb28b07733e20f9f4786a1190c86b213d7 Mon Sep 17 00:00:00 2001
From: Sourabh Gandhi <105213416+sgandhi1311@users.noreply.github.com>
Date: Tue, 12 Dec 2023 20:37:36 +0530
Subject: [PATCH] TDL-24434 custom object support (#242)

* initial commit to add support for custom objects
* modify the endpoint for custom records
* functionality for the discover mode of custom streams (see the discovery sketch below)
* add custom_objects schema
* add sync functionality for custom_object records
* create shared folder for the re-use of schema (see the shared-refs sketch below)
* Log a warning for custom object scopes, handling exceptions.
* add doc string to definitions
* remove custom_object schema stream, as it is just the schema of custom objects without solid analytical info.
* modify naming conventions
* include shared folder in the package data
* include shared json file names
* modify package data in setup.py
* update manifest file with the shared folder
* correct the misspelling
* make pylint happy
* add unit test for the custom object functionality
* handle the chains of .get() calls
* before sync, get the streams from the catalog instead of the API endpoint
* make pylint happy
* remove `custom_` prefix from the table name
* make pylint happy
* Add doc string
* disable pylint issue - inconsistent-return-statements
* fix unit test case
* update the discover mode for custom streams
* remove trailing space
* modify function name
* update STATE assignment post custom object sync (see the bookmark sketch below)
* move the try/except block to the `gen_request_custom_objects` function (see the pagination sketch below)
* correct variable name
* include custom objects stream for testing all the scenarios (#243)
* initial commit to add support for custom objects
* modify the endpoint for custom records
* functionality for the discover mode of custom streams
* add custom_objects schema
* add sync functionality for custom_object records
* create shared folder for the re-use of schema
* Log a warning for custom object scopes, handling exceptions.
* add doc string to definitions
* remove custom_object schema stream, as it is just the schema of custom objects without solid analytical info.
* modify naming conventions
* include shared folder in the package data
* include shared json file names
* modify package data in setup.py
* update manifest file with the shared folder
* include custom objects stream for testing all the scenarios
* correct the misspelling
* make pylint happy
* fix the errors
* take a more recent start date, as the custom objects were created 2 days before.
* remove `custom_` prefix from the name of the custom object stream
* remove unused code
* Update start dates due to the absence of custom object creation in between.
* update test client with pagination logic
* fix the all-fields test for the deals stream
* update STATE assignment post custom object sync
* correct variable name
* setup and changelog update
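A few sketches of the new machinery, for review; the full diff follows. First, the
cursor-based pagination used by `gen_request_custom_objects`: HubSpot's v3 list endpoints
return the next-page cursor at `paging.next.after`. In this minimal runnable sketch, the
`PAGES` data and the `fetch_page` helper are made-up stand-ins for the tap's
`request(url, params).json()` call:

```python
# Sketch of the cursor-based pagination in gen_request_custom_objects.
# PAGES and fetch_page are fake stand-ins for request(url, params).json().

PAGES = [
    {"results": [{"id": "1"}, {"id": "2"}], "paging": {"next": {"after": "2"}}},
    {"results": [{"id": "3"}]},  # no "paging" key -> last page
]

def fetch_page(params):
    # Hypothetical helper: return the page that the "after" cursor points at.
    return PAGES[int(params.get("after", 0)) // 2]

def gen_records(params, path="results", more_key="paging"):
    while True:
        data = fetch_page(params)
        if data.get(path) is None:
            raise RuntimeError(f"Unexpected API response: {path} not in {data.keys()}")
        yield from data[path]
        # Chained .get() calls guard against partially populated paging blocks.
        if not data.get(more_key):
            break
        params["after"] = data.get(more_key, {}).get("next", {}).get("after")
        if params["after"] is None:
            break

print([row["id"] for row in gen_records({"limit": 2})])  # -> ['1', '2', '3']
```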
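Next, what DISCOVER mode does with each custom object's schema: the properties reported by
`GET /crm/v3/schemas` stay nested under `properties` and are also lifted to top-level
`property_<name>` fields, matching how records are denested at sync time. The field list and
the simplified type mapping below are made up for illustration:

```python
# Sketch of the DISCOVER-mode schema shaping in generate_custom_streams.
# `fields` is a made-up example of the "properties" array returned by
# GET /crm/v3/schemas for one custom object.

def get_field_type_schema(field_type):
    # Simplified stand-in for the tap's type mapping.
    if field_type == "number":
        return {"type": ["null", "number"]}
    return {"type": ["null", "string"]}

fields = [{"name": "model", "type": "string"}, {"name": "price", "type": "number"}]

custom_schema = {f["name"]: get_field_type_schema(f["type"]) for f in fields}

schema = {"type": "object", "properties": {}}
# Keep the raw API shape nested under "properties"...
schema["properties"]["properties"] = {"type": "object", "properties": custom_schema}
# ...and also lift each field to a top-level "property_<name>" column.
schema["properties"].update({f"property_{k}": v for k, v in custom_schema.items()})

print(sorted(schema["properties"]))  # ['properties', 'property_model', 'property_price']
```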
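The `$ref` entries in `custom_objects.json` lean on `load_shared_schema_refs`, which maps
each file name under `schemas/shared/` to its parsed JSON so `singer.resolve_schema_references`
can inline them. A toy resolver (illustrative only, not the singer-python implementation)
shows the idea:

```python
# Toy $ref resolver: replaces {"$ref": "<file>"} nodes with the parsed schema
# that load_shared_schema_refs would have loaded for that file name.

ASSOCIATIONS = {"type": ["null", "object"],
                "properties": {"results": {"type": ["null", "array"]}}}
REFS = {"associations_schema.json": ASSOCIATIONS}

def resolve(node, refs):
    if isinstance(node, dict):
        if "$ref" in node:
            return resolve(refs[node["$ref"]], refs)
        return {k: resolve(v, refs) for k, v in node.items()}
    if isinstance(node, list):
        return [resolve(v, refs) for v in node]
    return node

schema = {"associations": {"properties": {"deals": {"$ref": "associations_schema.json"}}}}
print(resolve(schema, REFS)["associations"]["properties"]["deals"]["type"])
# -> ['null', 'object']
```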
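Finally, the bookmark handling in `sync_custom_objects`, reduced to its core and with
fabricated timestamps: records older than the saved bookmark are skipped, the max replication
value seen is tracked, and the new bookmark is clamped to the sync start time so records
updated mid-sync are picked up again on the next run:

```python
# Sketch of the bookmark logic in sync_custom_objects, with fabricated timestamps.
from datetime import datetime, timezone

def advance_bookmark(rows, bookmark, sync_start):
    max_bk_value = bookmark
    for row in rows:
        modified = row["updatedAt"]
        if modified >= bookmark:
            pass  # the tap would transform and write the record here
        if modified >= max_bk_value:
            max_bk_value = modified
    # Never bookmark past the start of this sync, so records updated while
    # the sync ran are re-fetched on the next run.
    return min(max_bk_value, sync_start)

utc = timezone.utc
bookmark = datetime(2023, 11, 1, tzinfo=utc)
sync_start = datetime(2023, 11, 10, tzinfo=utc)
rows = [{"updatedAt": datetime(2023, 11, 9, tzinfo=utc)},
        {"updatedAt": datetime(2023, 11, 12, tzinfo=utc)}]  # updated mid-sync

print(advance_bookmark(rows, bookmark, sync_start))  # 2023-11-10 00:00:00+00:00
```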
---
 CHANGELOG.md                                  |   3 +
 MANIFEST.in                                   |   1 +
 setup.py                                      |  17 +-
 tap_hubspot/__init__.py                       | 178 +++++++++++++++++-
 .../schemas/shared/associations_schema.json   |  34 ++++
 .../schemas/shared/custom_objects.json        |  44 +++++
 .../tests/unittests/test_custom_objects.py    | 158 ++++++++++++++++
 tests/base.py                                 |  15 ++
 tests/client.py                               | 110 +++++++++++
 tests/test_hubspot_start_date.py              |   4 +-
 10 files changed, 544 insertions(+), 20 deletions(-)
 create mode 100644 tap_hubspot/schemas/shared/associations_schema.json
 create mode 100644 tap_hubspot/schemas/shared/custom_objects.json
 create mode 100644 tap_hubspot/tests/unittests/test_custom_objects.py

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8a3f5a08..952a01f1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,8 @@
 # Changelog
 
+## 2.13.0
+  * HubSpot Custom CRM Objects Support [#242](https://github.com/singer-io/tap-hubspot/pull/242)
+
 ## 2.12.2
   * Use engagements_page_size advanced option [#234](https://github.com/singer-io/tap-hubspot/pull/234)
 
diff --git a/MANIFEST.in b/MANIFEST.in
index be81b9f2..b649d158 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -1,2 +1,3 @@
 include LICENSE
 include tap_hubspot/schemas/*.json
+include tap_hubspot/schemas/shared/*.json
diff --git a/setup.py b/setup.py
index 0f30dd63..f99dbc6a 100644
--- a/setup.py
+++ b/setup.py
@@ -3,7 +3,7 @@
 from setuptools import setup
 
 setup(name='tap-hubspot',
-      version='2.12.2',
+      version='2.13.0',
       description='Singer.io tap for extracting data from the HubSpot API',
       author='Stitch',
       url='http://singer.io',
@@ -29,18 +29,9 @@
       packages=['tap_hubspot'],
       package_data = {
           'tap_hubspot/schemas': [
-              "campaigns.json",
-              "companies.json",
-              "contact_lists.json",
-              "contacts.json",
-              "deals.json",
-              "email_events.json",
-              "forms.json",
-              "keywords.json",
-              "owners.json",
-              "subscription_changes.json",
-              "workflows.json",
-          ],
+              "schemas/*.json",
+              "schemas/shared/*.json"
+          ]
       },
       include_package_data=True,
 )
diff --git a/tap_hubspot/__init__.py b/tap_hubspot/__init__.py
index 17ae77e4..218d1f50 100644
--- a/tap_hubspot/__init__.py
+++ b/tap_hubspot/__init__.py
@@ -102,6 +102,9 @@ class StateFields:
 
     "tickets_properties": "/crm/v3/properties/tickets",
     "tickets": "/crm/v4/objects/tickets",
+
+    "custom_objects_schema": "/crm/v3/schemas",
+    "custom_objects": "/crm/v3/objects/p_{object_name}"
 }
 
 def get_start(state, tap_stream_id, bookmark_key, older_bookmark_key=None):
@@ -197,7 +200,12 @@ def get_field_schema(field_type, extras=False):
         }
     }
 
-def parse_custom_schema(entity_name, data):
+def parse_custom_schema(entity_name, data, is_custom_object=False):
+    if is_custom_object:
+        return {
+            field['name']: get_field_type_schema(field['type'])
+            for field in data
+        }
     if entity_name == "tickets":
         return {
             field['name']: get_field_type_schema(field['type'])
@@ -1104,6 +1112,92 @@ def sync_deal_pipelines(STATE, ctx):
     singer.write_state(STATE)
     return STATE
 
+def gen_request_custom_objects(tap_stream_id, url, params, path, more_key):
+    """
+    Cursor-based API pagination, used in the custom objects stream implementation
+    """
+    try:
+        with metrics.record_counter(tap_stream_id) as counter:
+            while True:
+                data = request(url, params).json()
+
+                if data.get(path) is None:
+                    raise RuntimeError(
+                        "Unexpected API response: {} not in {}".format(path, data.keys()))
+
+                for row in data[path]:
+                    counter.increment()
+                    yield row
+
+                if not data.get(more_key):
+                    break
+                params['after'] = data.get(more_key, {}).get('next', {}).get('after', None)
+                if params['after'] is None:
+                    break
+    except SourceUnavailableException as ex:
+        warning_message = str(ex).replace(CONFIG['access_token'], 10 * '*')
+        LOGGER.warning(warning_message)
+        return []
+
+def sync_custom_objects(stream_id, primary_key, bookmark_key, catalog, STATE, params, is_custom_object=False):
+    """
+    Synchronize records from a data source
+    """
+    mdata = metadata.to_map(catalog.get('metadata'))
+    if is_custom_object:
+        url = get_url("custom_objects", object_name=stream_id)
+    else:
+        url = get_url(stream_id)
+    max_bk_value = bookmark_value = utils.strptime_with_tz(
+        get_start(STATE, stream_id, bookmark_key))
+
+    LOGGER.info(f"Syncing records for {stream_id} from {bookmark_value}")
+    schema = catalog.get('schema')
+    singer.write_schema(stream_id, schema, [primary_key],
+                        [bookmark_key], catalog.get('stream_alias'))
+
+    with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as transformer:
+        # To handle records updated between the start of the table sync and the end,
+        # store the current sync start in the state and do not move the bookmark past this value.
+        sync_start_time = utils.now()
+        for row in gen_request_custom_objects(stream_id, url, params, 'results', "paging"):
+            # parse the string-formatted date into a datetime object
+            modified_time = utils.strptime_to_utc(row[bookmark_key])
+
+            # Check that the bookmark value is present on the record and that it
+            # is greater than or equal to the previously defined bookmark value
+            if modified_time and modified_time >= bookmark_value:
+                # transform the data and filter out the selected fields from the catalog
+                record = transformer.transform(lift_properties_and_versions(row), schema, mdata)
+                singer.write_record(stream_id, record, catalog.get(
+                    'stream_alias'), time_extracted=utils.now())
+            if modified_time and modified_time >= max_bk_value:
+                max_bk_value = modified_time
+
+    # Don't bookmark past the start of this sync to account for records updated during the sync.
+    new_bookmark = min(max_bk_value, sync_start_time)
+    STATE = singer.write_bookmark(STATE, stream_id, bookmark_key, utils.strftime(new_bookmark))
+    singer.write_state(STATE)
+    return STATE
+
+
+def sync_custom_object_records(STATE, ctx, stream_id):
+    """
+    Function to sync records for each `custom_object` stream
+    """
+    catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
+    mdata = metadata.to_map(catalog.get('metadata'))
+    primary_key = "id"
+    bookmark_key = "updatedAt"
+
+    params = {'limit': 100,
+              'associations': 'emails,meetings,notes,tasks,calls,conversations,contacts,companies,deals,tickets',
+              'properties': get_selected_property_fields(catalog, mdata),
+              'archived': False
+              }
+    return sync_custom_objects(stream_id, primary_key, bookmark_key, catalog, STATE, params, is_custom_object=True)
+
+
 @attr.s
 class Stream:
     tap_stream_id = attr.ib()
@@ -1131,6 +1225,64 @@ class Stream:
     Stream('engagements', sync_engagements, ["engagement_id"], 'lastUpdated', 'FULL_TABLE')
 ]
 
+# pylint: disable=inconsistent-return-statements
+def generate_custom_streams(mode, catalog=None):
+    """
+    - In DISCOVER mode, fetch the custom schema from the API endpoint and set the schema for the custom objects.
+    - In SYNC mode, extend STREAMS for the custom objects.
+
+    Args:
+        mode (str): The mode indicating whether to DISCOVER or SYNC custom streams.
+        catalog (dict): The catalog containing stream information.
+
+    Returns:
+        List[dict] or List[str]: A list of custom streams (dictionaries) in DISCOVER mode, or a list of custom object names in SYNC mode.
+    """
+    custom_objects_schema_url = get_url("custom_objects_schema")
+    if mode == "DISCOVER":
+        custom_streams = []
+        # Load Hubspot's shared schemas
+        refs = load_shared_schema_refs()
+        for custom_object in gen_request_custom_objects("custom_objects_schema", custom_objects_schema_url, {}, 'results', "paging"):
+            stream_id = custom_object["name"]
+            schema = utils.load_json(get_abs_path('schemas/shared/custom_objects.json'))
+            custom_schema = parse_custom_schema(stream_id, custom_object["properties"], is_custom_object=True)
+            schema["properties"]["properties"] = {
+                "type": "object",
+                "properties": custom_schema,
+            }
+
+            # Move properties to top level
+            custom_schema_top_level = {'property_{}'.format(k): v for k, v in custom_schema.items()}
+            schema['properties'].update(custom_schema_top_level)
+
+            final_schema = singer.resolve_schema_references(schema, refs)
+            custom_streams.append({"stream": Stream(stream_id, sync_custom_object_records, ['id'], 'updatedAt', 'INCREMENTAL'),
+                                   "schema": final_schema})
+
+        return custom_streams
+
+    elif mode == "SYNC":
+        custom_objects = [custom_object["name"] for custom_object in gen_request_custom_objects("custom_objects_schema", custom_objects_schema_url, {}, 'results', "paging")]
+        if len(custom_objects) > 0:
+            for stream in catalog["streams"]:
+                if stream["tap_stream_id"] in custom_objects:
+                    STREAMS.append(Stream(stream["tap_stream_id"], sync_custom_object_records, ['id'], 'updatedAt', 'INCREMENTAL'))
+        return custom_objects
+
+def load_shared_schema_refs():
+    shared_schemas_path = get_abs_path('schemas/shared')
+
+    shared_file_names = [f for f in os.listdir(shared_schemas_path)
+                         if os.path.isfile(os.path.join(shared_schemas_path, f))]
+
+    shared_schema_refs = {}
+    for shared_file in shared_file_names:
+        with open(os.path.join(shared_schemas_path, shared_file)) as data_file:
+            shared_schema_refs[shared_file] = json.load(data_file)
+
+    return shared_schema_refs
+
 def get_streams_to_sync(streams, state):
     target_stream = singer.get_currently_syncing(state)
     result = streams
@@ -1152,6 +1304,7 @@ def get_selected_streams(remaining_streams, ctx):
     return selected_streams
 
 def do_sync(STATE, catalog):
+    custom_objects = generate_custom_streams(mode="SYNC", catalog=catalog)
     # Clear out keys that are no longer used
     clean_state(STATE)
 
@@ -1168,7 +1321,10 @@ def do_sync(STATE, catalog):
             singer.write_state(STATE)
 
             try:
-                STATE = stream.sync(STATE, ctx) # pylint: disable=not-callable
+                if stream.tap_stream_id in custom_objects:
+                    STATE = stream.sync(STATE, ctx, stream.tap_stream_id)
+                else:
+                    STATE = stream.sync(STATE, ctx) # pylint: disable=not-callable
             except SourceUnavailableException as ex:
                 error_message = str(ex).replace(CONFIG['access_token'], 10 * '*')
                 LOGGER.error(error_message)
@@ -1210,8 +1366,7 @@ def validate_dependencies(ctx):
 
     if errs:
         raise DependencyException(" ".join(errs))
 
-def load_discovered_schema(stream):
-    schema = load_schema(stream.tap_stream_id)
+def get_metadata(stream, schema):
     mdata = metadata.new()
     mdata = metadata.write(mdata, (), 'table-key-properties', stream.key_properties)
@@ -1230,8 +1385,13 @@ def load_discovered_schema(stream):
     if stream.tap_stream_id == "engagements":
         mdata = metadata.write(mdata, ('properties', 'engagement'), 'inclusion', 'automatic')
         mdata = metadata.write(mdata, ('properties', 'lastUpdated'), 'inclusion', 'automatic')
+    return metadata.to_list(mdata)
 
-    return schema, metadata.to_list(mdata)
+def load_discovered_schema(stream):
+    schema = load_schema(stream.tap_stream_id)
+    mdata = get_metadata(stream, schema)
+
+    return schema, mdata
 
 def discover_schemas():
     result = {'streams': []}
@@ -1247,6 +1407,14 @@ def discover_schemas():
             # Skip the discovery mode on the streams were the required scopes are missing
             warning_message = str(ex).replace(CONFIG['access_token'], 10 * '*')
             LOGGER.warning(warning_message)
+
+    for custom_stream in generate_custom_streams(mode="DISCOVER"):
+        LOGGER.info('Loading schema for Custom Object - %s', custom_stream["stream"].tap_stream_id)
+        result['streams'].append({'stream': custom_stream["stream"].tap_stream_id,
+                                  'tap_stream_id': custom_stream["stream"].tap_stream_id,
+                                  'schema': custom_stream["schema"],
+                                  'metadata': get_metadata(custom_stream["stream"], custom_stream["schema"])})
+
     # Load the contacts_by_company schema
     LOGGER.info('Loading schema for contacts_by_company')
     contacts_by_company = Stream('contacts_by_company', _sync_contacts_by_company, ['company-id', 'contact-id'], None, 'FULL_TABLE')
diff --git a/tap_hubspot/schemas/shared/associations_schema.json b/tap_hubspot/schemas/shared/associations_schema.json
new file mode 100644
index 00000000..87f71ff5
--- /dev/null
+++ b/tap_hubspot/schemas/shared/associations_schema.json
@@ -0,0 +1,34 @@
+{
+  "type": [
+    "null",
+    "object"
+  ],
+  "properties": {
+    "results": {
+      "type": [
+        "null",
+        "array"
+      ],
+      "items": {
+        "type": [
+          "null",
+          "object"
+        ],
+        "properties": {
+          "id": {
+            "type": [
+              "null",
+              "string"
+            ]
+          },
+          "type": {
+            "type": [
+              "null",
+              "string"
+            ]
+          }
+        }
+      }
+    }
+  }
+}
diff --git a/tap_hubspot/schemas/shared/custom_objects.json b/tap_hubspot/schemas/shared/custom_objects.json
new file mode 100644
index 00000000..7cf6395b
--- /dev/null
+++ b/tap_hubspot/schemas/shared/custom_objects.json
@@ -0,0 +1,44 @@
+{
+  "type": "object",
+  "properties": {
+    "id": {
+      "type": [
+        "null",
+        "string"
+      ]
+    },
+    "createdAt": {
+      "type": ["null", "string"],
+      "format": "date-time"
+    },
+    "updatedAt": {
+      "type": ["null", "string"],
+      "format": "date-time"
+    },
+    "archived": {
+      "type": [
+        "null",
+        "boolean"
+      ]
+    },
+    "associations": {
+      "type": [
+        "null",
+        "object"
+      ],
+      "properties": {
+        "emails": {"$ref": "associations_schema.json"},
+        "meetings": {"$ref": "associations_schema.json"},
+        "notes": {"$ref": "associations_schema.json"},
+        "tasks": {"$ref": "associations_schema.json"},
+        "calls": {"$ref": "associations_schema.json"},
+        "conversation_session": {"$ref": "associations_schema.json"},
+        "companies": {"$ref": "associations_schema.json"},
+        "contacts": {"$ref": "associations_schema.json"},
+        "deals": {"$ref": "associations_schema.json"},
+        "products": {"$ref": "associations_schema.json"},
+        "tickets": {"$ref": "associations_schema.json"}
+      }
+    }
+  }
+}
diff --git a/tap_hubspot/tests/unittests/test_custom_objects.py b/tap_hubspot/tests/unittests/test_custom_objects.py
new file mode 100644
index 00000000..f0b14311
--- /dev/null
+++ b/tap_hubspot/tests/unittests/test_custom_objects.py
@@ -0,0 +1,158 @@
+import unittest
+from unittest.mock import patch
+from tap_hubspot import generate_custom_streams, Stream, sync_custom_object_records, Context
+
+MOCK_CATALOG = {
+    "streams": [
+        {
+            "stream": "cars",
+            "tap_stream_id": "cars",
+            "schema": {
+                "type": "object",
+                "properties": {
+                    "id": {"type": ["null", "string"]},
+                    "updatedAt": {"type": ["null", "string"], "format": "date-time"},
+                    "property_model": {"type": ["null", "string"]},
+                },
+            },
+ "metadata": [ + { + "breadcrumb": [], + "metadata": { + "table-key-properties": ["id"], + "forced-replication-method": "INCREMENTAL", + "valid-replication-keys": ["updatedAt"], + "selected": True, + }, + }, + { + "breadcrumb": ["properties", "id"], + "metadata": {"inclusion": "automatic"}, + }, + { + "breadcrumb": ["properties", "updatedAt"], + "metadata": {"inclusion": "automatic"}, + }, + { + "breadcrumb": ["properties", "property_model"], + "metadata": {"inclusion": "available", "selected": True}, + }, + ], + } + ] +} + + +class TestGenerateCustomStreams(unittest.TestCase): + @patch("tap_hubspot.sync_custom_object_records") + @patch("tap_hubspot.get_url", return_value="fake_custom_objects_schema_url") + @patch("tap_hubspot.load_shared_schema_refs", return_value="fake_refs") + @patch("tap_hubspot.gen_request_custom_objects") + @patch("tap_hubspot.utils.load_json") + @patch("tap_hubspot.parse_custom_schema") + @patch("tap_hubspot.singer.resolve_schema_references") + @patch("builtins.open", create=True) + @patch("tap_hubspot.LOGGER.warning") + def test_generate_custom_streams( + self, + mock_warning, + mock_open, + mock_resolve_schema, + mock_parse_custom_schema, + mock_load_json, + mock_gen_request_custom_objects, + mock_load_shared_schema_refs, + mock_get_url, + mock_sync_custom_records + ): + """ + test the flow of definition generate_custom_streams + """ + + # Set up mocks and fake data + mode = "DISCOVER" + fake_custom_object = { + "name": "fake_object", + "properties": {"prop1": "type1", "prop2": "type2"}, + } + fake_custom_objects_schema_url = "fake_custom_objects_schema_url" + fake_final_schema = { + "type": "object", + "properties": {"property_fake_object": "fake_value"}, + } + expected_value = [ + {'stream': Stream(tap_stream_id='fake_object', sync=mock_sync_custom_records, key_properties=['id'], replication_key='updatedAt', replication_method='INCREMENTAL'), + 'schema': {'type': 'object', 'properties': {'property_fake_object': 'fake_value'}}}] + + # Set up mock return values + mock_gen_request_custom_objects.return_value = [fake_custom_object] + mock_load_json.return_value = { + "type": "object", + "properties": {"properties": {}}, + } + mock_parse_custom_schema.return_value = {"prop1": "type1", "prop2": "type2"} + mock_resolve_schema.return_value = fake_final_schema + mock_get_url.return_value = fake_custom_objects_schema_url + + # Call the function + actual_value = generate_custom_streams(mode) + # Verify the expected calls + mock_gen_request_custom_objects.assert_called_once_with( + "custom_objects_schema", + fake_custom_objects_schema_url, + {}, + "results", + "paging", + ) + mock_load_shared_schema_refs.assert_called_once() + mock_get_url.assert_called_once_with("custom_objects_schema") + mock_parse_custom_schema.assert_called_once_with( + "fake_object", {"prop1": "type1", "prop2": "type2"}, is_custom_object=True + ) + mock_resolve_schema.assert_called_once_with( + { + "type": "object", + "properties": { + "properties": { + "type": "object", + "properties": {"prop1": "type1", "prop2": "type2"}, + }, + "property_prop1": "type1", + "property_prop2": "type2", + }, + }, + "fake_refs", + ) + mock_warning.assert_not_called() # No warning should be issued in this case + self.assertEqual(actual_value, expected_value) + + @patch("tap_hubspot.gen_request_custom_objects") + @patch("tap_hubspot.get_start", return_value="2023-07-07T00:00:00Z") + @patch("tap_hubspot.get_selected_property_fields", return_value="model") + def test_sync_custom_objects( + self, mock_property, 
mock_start_date, mock_custom_objects + ): + """ + Test the synchronization of custom objects. + """ + + # Set up mocks and fake data + STATE = {"currently_syncing": "cars"} + ctx = Context(MOCK_CATALOG) + stream_id = "cars" + mock_custom_objects.return_value = [ + { + "id": "11111", + "properties": {"model": "Frontier"}, + "updatedAt": "2023-11-09T13:14:22.956Z", + } + ] + expected_output = { + "currently_syncing": "cars", + "bookmarks": {"cars": {"updatedAt": "2023-11-09T13:14:22.956000Z"}}, + } + + # Call the function + actual_output = sync_custom_object_records(STATE, ctx, stream_id) + # Verify the expected calls + self.assertEqual(expected_output, actual_output) diff --git a/tests/base.py b/tests/base.py index 769eac88..a9b61d66 100644 --- a/tests/base.py +++ b/tests/base.py @@ -155,6 +155,21 @@ def expected_metadata(self): # DOCS_BUG https://stitchdata.atlassian.net/browse self.REPLICATION_KEYS: {"updatedAt"}, self.EXPECTED_PAGE_SIZE: 100, self.OBEYS_START_DATE: True + }, + # below are the custom_objects stream + "cars": { + self.PRIMARY_KEYS: {"id"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"updatedAt"}, + self.EXPECTED_PAGE_SIZE: 100, + self.OBEYS_START_DATE: True + }, + "co_firsts": { + self.PRIMARY_KEYS: {"id"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"updatedAt"}, + self.EXPECTED_PAGE_SIZE: 100, + self.OBEYS_START_DATE: True } } diff --git a/tests/client.py b/tests/client.py index db8ff9b0..aa63a2a3 100644 --- a/tests/client.py +++ b/tests/client.py @@ -213,6 +213,8 @@ def read(self, stream, parent_ids=[], since='', pagination=False): return self.get_subscription_changes(since, pagination) elif stream == "tickets": return self.get_tickets(pagination) + elif stream in ["cars", "co_firsts"]: + return self.get_custom_objects(stream) else: raise NotImplementedError @@ -729,6 +731,57 @@ def get_tickets(self, pagination=False): records = self.denest_properties('tickets', records) return records + + def _get_custom_object_record_by_pk(self, object_name, id): + """ + Get a specific custom object record by pk value + HubSpot API https://developers.hubspot.com/docs/api/crm/crm-custom-objects + """ + associations = 'emails,meetings,notes,tasks,calls,conversations,contacts,companies,deals,tickets' + url = f"{BASE_URL}/crm/v3/objects/p_{object_name}/{id}?associations={associations}" + response = self.get(url) + return response + + def get_custom_objects_properties(self, object_name): + """ + Get custom object properties. + HubSpot API https://developers.hubspot.com/docs/api/crm/crm-custom-objects + """ + url = f"{BASE_URL}/crm/v3/properties/p_{object_name}" + records = self.get(url) + + return ",".join([record["name"] for record in records["results"]]) + + def get_custom_objects(self, stream): + """ + Get all custom_object records. 
+        HubSpot API https://developers.hubspot.com/docs/api/crm/crm-custom-objects
+        """
+        page_size = self.BaseTest.expected_metadata().get(stream, {}).get(self.BaseTest.EXPECTED_PAGE_SIZE)
+        url = f"{BASE_URL}/crm/v3/objects/p_{stream}"
+        replication_key = list(self.replication_keys[stream])[0]
+        records = []
+
+        # response = self.get(url)
+        associations = 'emails,meetings,notes,tasks,calls,conversations,contacts,companies,deals,tickets'
+        params = {"limit": page_size, "associations": associations, 'properties': self.get_custom_objects_properties(stream)}
+        while True:
+            response = self.get(url, params=params)
+
+            records.extend([record
+                            for record in response["results"]
+                            if record[replication_key] >= self.start_date_strf.replace('.Z', '.000Z')])
+
+            if not response.get("paging"):
+                break
+            if page_size and len(records) > page_size + 10:
+                break
+            params['after'] = response.get("paging", {}).get('next', {}).get('after', None)
+            if params['after'] is None:
+                break
+
+        records = self.denest_properties(stream, records)
+        return records
 
     ##########################################################################
     ### CREATE
@@ -775,6 +828,8 @@ def create(self, stream, company_ids=[], subscriptions=[], times=1):
             return self.create_subscription_changes(subscriptions, times)
         elif stream == 'tickets':
             return self.create_tickets()
+        elif stream in ["cars", "co_firsts"]:
+            return self.create_custom_object_record(stream)
         else:
             raise NotImplementedError(f"There is no create_{stream} method in this dipatch!")
 
@@ -947,6 +1002,37 @@ def create_contacts_by_company(self, company_ids=[], contact_records=[], times=1):
 
         return records
 
+    def create_custom_object_record(self, stream):
+        url = f"{BASE_URL}/crm/v3/objects/p_{stream}"
+
+        if stream == "cars":
+            data = {
+                "properties": {
+                    "condition": random.choice(["used", "new"]),
+                    "date_received": random.choice([1197590400000, 1554854400000, 1337990400000, 1543708800000]),
+                    "year": random.choice([2000, 2001, 2002, 2003, 2020, 2021, 2022, 2023]),
+                    "make": random.choice(["bmw", "hyundai", "kia", "tata", "toyota", "mercedes", "porsche"]),
+                    "model": random.choice(["sporty", "normal"]),
+                    "vin": str(uuid.uuid4()).replace('-', ''),
+                    "color": random.choice(["white", "red", "black", "orange", "green"]),
+                    "mileage": random.choice([10, 20, 30, 40, 15, 25, 35, 12, 22, 28]),
+                    "price": random.choice([10000, 20000, 30000, 40000, 50000, 60000, 70000]),
+                    "notes": random.choice(["Excellent condition.", "Bad Condition"])
+                }
+            }
+        elif stream == "co_firsts":
+            data = {
+                "properties": {
+                    "id": random.randint(1, 100000),
+                    "name": "test name",
+                    "country": random.choice(["USA", "India", "France", "UK"])
+                }
+            }
+
+        # generate a record
+        response = self.post(url, data)
+        return [response]
+
     def create_deal_pipelines(self):
         """
         HubSpot API
@@ -1367,6 +1453,8 @@ def update(self, stream, record_id):
             return self.update_engagements(record_id)
         elif stream == 'tickets':
             return self.update_tickets(record_id)
+        elif stream in ["cars", "co_firsts"]:
+            return self.update_custom_object_record(stream, record_id)
         else:
             raise NotImplementedError(f"Test client does not have an update method for {stream}")
 
@@ -1602,6 +1690,28 @@ def update_tickets(self, ticket_id):
 
         self.patch(url, data)
         return self._get_tickets_by_pk(ticket_id)
+
+    def update_custom_object_record(self, stream, id):
+        """
+        Updates a custom object record using the HubSpot CRM API.
+        https://developers.hubspot.com/docs/api/crm/crm-custom-objects
+
+        :param stream: The custom object stream identifier, e.g., 'custom_objects'.
+        :param id: The primary key value of the custom object record to update.
+        :return: The updated custom object record.
+        """
+        url = f"{BASE_URL}/crm/v3/objects/p_{stream}/{id}"
+
+        record_uuid = str(uuid.uuid4()).replace('-', '')[:20]
+        data = {
+            "properties": {
+                "notes": f"update record for testing - {record_uuid}"
+            }
+        }
+
+        self.patch(url, data)
+
+        return self._get_custom_object_record_by_pk(stream, id)
 
     ##########################################################################
     ### Deletes
diff --git a/tests/test_hubspot_start_date.py b/tests/test_hubspot_start_date.py
index 5a6f5d3f..2cbcb723 100644
--- a/tests/test_hubspot_start_date.py
+++ b/tests/test_hubspot_start_date.py
@@ -55,11 +55,11 @@ def get_properties(self, original=True):
 
         if original:
             return {
-                'start_date' : self.timedelta_formatted(utc_today, days=-8)
+                'start_date' : self.timedelta_formatted(utc_today, days=-20)
             }
         else:
             return {
-                'start_date': self.timedelta_formatted(utc_today, days=-4)
+                'start_date': self.timedelta_formatted(utc_today, days=-5)
             }
 
     def test_run(self):