Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move all fields test to new framework #239

Merged
merged 12 commits into from
Dec 18, 2023
Merged
383 changes: 383 additions & 0 deletions tests/base_hubspot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,383 @@
import os
import unittest
from datetime import datetime as dt
from datetime import timedelta

import tap_tester.menagerie as menagerie
import tap_tester.connections as connections
import tap_tester.runner as runner
from tap_tester.base_suite_tests.base_case import BaseCase
from tap_tester import LOGGER


class HubspotBaseCase(BaseCase):

BASIC_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"

EXPECTED_PAGE_SIZE = "expected-page-size"
OBEYS_START_DATE = "obey-start-date"
PARENT_STREAM = "parent-stream"

#######################################
# Tap Configurable Metadata Methods #
#######################################

def setUp(self):
missing_envs = [x for x in [
'TAP_HUBSPOT_REDIRECT_URI',
'TAP_HUBSPOT_CLIENT_ID',
'TAP_HUBSPOT_CLIENT_SECRET',
'TAP_HUBSPOT_REFRESH_TOKEN'
] if os.getenv(x) is None]
if missing_envs:
raise Exception("Missing environment variables: {}".format(missing_envs))

@staticmethod
def get_type():
return "platform.hubspot"

@staticmethod
def tap_name():
return "tap-hubspot"

def get_properties(self):
start_date = dt.today() - timedelta(days=1)
start_date_with_fmt = dt.strftime(start_date, self.START_DATE_FORMAT)

return {'start_date' : start_date_with_fmt}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the new framework we make the start date a property so it can be changed in tests outside of this method.

Suggested change
def get_properties(self):
start_date = dt.today() - timedelta(days=1)
start_date_with_fmt = dt.strftime(start_date, self.START_DATE_FORMAT)
return {'start_date' : start_date_with_fmt}
# set the default start date which can be overridden in the tests.
start_date = BaseCase.timedelta_formatted(dt.utcnow(), delta=timedelta(days=-1))
def get_properties(self):
return {'start_date': self.start_date,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed - Changed this method as per the suggestion


def get_credentials(self):
return {'refresh_token': os.getenv('TAP_HUBSPOT_REFRESH_TOKEN'),
'client_secret': os.getenv('TAP_HUBSPOT_CLIENT_SECRET'),
'redirect_uri': os.getenv('TAP_HUBSPOT_REDIRECT_URI'),
'client_id': os.getenv('TAP_HUBSPOT_CLIENT_ID')}

def expected_check_streams(self):
return set(self.expected_metadata().keys())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the same as this method in tap-tester with a different name. It would be best to remove this and rename the calls to it to the method in tap-tester. At the minimum if we want to keep this name for some reason, just can make it a wrapper and call the underlying tap-tester method in it as follows:

Suggested change
def expected_check_streams(self):
return set(self.expected_metadata().keys())
@classmethod
def expected_check_streams(cls):
return cls.expected_stream_names()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is used only start_date test. So, I removed this. I think we can change start date test when we move that to the new framework.


@classmethod
def expected_metadata(cls): # DOCS_BUG https://stitchdata.atlassian.net/browse/DOC-1523)
"""The expected streams and metadata about the streams"""
return {
"campaigns": {
BaseCase.PRIMARY_KEYS: {"id"},
BaseCase.REPLICATION_METHOD: BaseCase.FULL_TABLE,
HubspotBaseCase.OBEYS_START_DATE: False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this property OBEYS_START_DATE although not currently a framework concept would apply to more taps and should be moved into the tap tester framework

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved this to the tap-tester base case

},
"companies": {
BaseCase.PRIMARY_KEYS: {"companyId"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"property_hs_lastmodifieddate"},
HubspotBaseCase.EXPECTED_PAGE_SIZE: 250,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Expected page size is now a concept in the framework as API_LIMIT. We should use this instead.
https://github.com/stitchdata/tap-tester/blob/ed0886e9a9bd3f1340ad4929a03f2d67ab4ebf2a/tap_tester/base_suite_tests/base_case.py#L88

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed EXPECTED_PAGE_SIZE to API_LIMIT to use the variable already defined in tap-tester base case

HubspotBaseCase.OBEYS_START_DATE: True
},
"contact_lists": {
BaseCase.PRIMARY_KEYS: {"listId"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"updatedAt"},
HubspotBaseCase.EXPECTED_PAGE_SIZE: 250,
HubspotBaseCase.OBEYS_START_DATE: True
},
"contacts": {
BaseCase.PRIMARY_KEYS: {"vid"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"versionTimestamp"},
HubspotBaseCase.EXPECTED_PAGE_SIZE: 100,
HubspotBaseCase.OBEYS_START_DATE: True
},
"contacts_by_company": {
BaseCase.PRIMARY_KEYS: {"company-id", "contact-id"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
HubspotBaseCase.EXPECTED_PAGE_SIZE: 100,
HubspotBaseCase.OBEYS_START_DATE: True,
HubspotBaseCase.PARENT_STREAM: 'companies'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how common parent stream is, If this is unique to hubspot then doing it like this is appropriate. If we think there are more cases where this would exist we should also move this concept to the tap-tester framework.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see PARENT_STREAM defined. Can you show me where this is? Did it move to BaseCase? If so, we should specify that instead of HubspotBaseCase.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missed it.. It is moved to tap-tester base case. I changed to use BaseCase.PARENT_STREAM

},
"deal_pipelines": {
BaseCase.PRIMARY_KEYS: {"pipelineId"},
BaseCase.REPLICATION_METHOD: BaseCase.FULL_TABLE,
HubspotBaseCase.OBEYS_START_DATE: False,
},
"deals": {
BaseCase.PRIMARY_KEYS: {"dealId"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"property_hs_lastmodifieddate"},
HubspotBaseCase.OBEYS_START_DATE: True
},
"email_events": {
BaseCase.PRIMARY_KEYS: {"id"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"startTimestamp"},
HubspotBaseCase.EXPECTED_PAGE_SIZE: 1000,
HubspotBaseCase.OBEYS_START_DATE: True
},
"engagements": {
BaseCase.PRIMARY_KEYS: {"engagement_id"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"lastUpdated"},
HubspotBaseCase.EXPECTED_PAGE_SIZE: 250,
HubspotBaseCase.OBEYS_START_DATE: True
},
"forms": {
BaseCase.PRIMARY_KEYS: {"guid"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"updatedAt"},
HubspotBaseCase.OBEYS_START_DATE: True
},
"owners": {
BaseCase.PRIMARY_KEYS: {"ownerId"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"updatedAt"},
HubspotBaseCase.OBEYS_START_DATE: True # TODO is this a BUG?
},
"subscription_changes": {
BaseCase.PRIMARY_KEYS: {"timestamp", "portalId", "recipient"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"startTimestamp"},
HubspotBaseCase.EXPECTED_PAGE_SIZE: 1000,
HubspotBaseCase.OBEYS_START_DATE: True
},
"workflows": {
BaseCase.PRIMARY_KEYS: {"id"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"updatedAt"},
HubspotBaseCase.OBEYS_START_DATE: True
},
"tickets": {
BaseCase.PRIMARY_KEYS: {"id"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"updatedAt"},
HubspotBaseCase.EXPECTED_PAGE_SIZE: 100,
HubspotBaseCase.OBEYS_START_DATE: True
}
}

#############################
# Common Metadata Methods #
#############################

def expected_primary_keys(self):
"""
return a dictionary with key of table name
and value as a set of primary key fields
"""
return {table: properties.get(self.PRIMARY_KEYS, set())
for table, properties
in self.expected_metadata().items()}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not be overriding base case methods unless there is a good reason to make them different. In this case there is no difference except expanded functionality in the base case. We should remove this method.

This exists in https://github.com/stitchdata/tap-tester/blob/ed0886e9a9bd3f1340ad4929a03f2d67ab4ebf2a/tap_tester/base_suite_tests/base_case.py#L162-L172

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah.. that's right- a lot of methose methods are already there in tap-tester base case. Didn't realise it. Removed all these methods from here.



def expected_automatic_fields(self):
"""
return a dictionary with key of table name and value as the primary keys and replication keys
"""
pks = self.expected_primary_keys()
rks = self.expected_replication_keys()

return {stream: rks.get(stream, set()) | pks.get(stream, set())
for stream in self.expected_streams()}


def expected_replication_method(self):
"""return a dictionary with key of table name and value of replication method"""
return {table: properties.get(self.REPLICATION_METHOD, None)
for table, properties
in self.expected_metadata().items()}

def expected_streams(self):
"""A set of expected stream names"""
return set(self.expected_metadata().keys())

def expected_replication_keys(self):
"""
return a dictionary with key of table name
and value as a set of replication key fields
"""
return {table: properties.get(self.REPLICATION_KEYS, set())
for table, properties
in self.expected_metadata().items()}

def expected_page_limits(self):
return {table: properties.get(self.EXPECTED_PAGE_SIZE, set())
for table, properties
in self.expected_metadata().items()}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for these. They are not needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


def expected_primary_keys(self):

"""
return a dictionary with key of table name
and value as a set of primary key fields
"""
return {table: properties.get(self.PRIMARY_KEYS, set())
for table, properties
in self.expected_metadata().items()}

def expected_automatic_fields(self):
auto_fields = {}
for k, v in self.expected_metadata().items():
auto_fields[k] = v.get(self.PRIMARY_KEYS, set()) | v.get(self.REPLICATION_KEYS, set())
return auto_fields
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also true for these methods

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


##########################
# Common Test Actions #
##########################
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for all of these as well. These are all in tap tester base case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


def create_connection_and_run_check(self, original_properties: bool = True):
"""Create a new connection with the test name"""
# Create the connection
conn_id = connections.ensure_connection(self, original_properties)

# Run a check job using orchestrator (discovery)
check_job_name = runner.run_check_mode(self, conn_id)

# Assert that the check job succeeded
exit_status = menagerie.get_exit_status(conn_id, check_job_name)
menagerie.verify_check_exit_status(self, exit_status, check_job_name)
return conn_id

def run_and_verify_check_mode(self, conn_id):
"""
Run the tap in check mode and verify it succeeds.
This should be ran prior to field selection and initial sync.

Return the connection id and found catalogs from menagerie.
"""
# run in check mode
check_job_name = runner.run_check_mode(self, conn_id)

# verify check exit codes
exit_status = menagerie.get_exit_status(conn_id, check_job_name)
menagerie.verify_check_exit_status(self, exit_status, check_job_name)

found_catalogs = menagerie.get_catalogs(conn_id)
self.assertGreater(len(found_catalogs), 0, msg="unable to locate schemas for connection {}".format(conn_id))

found_catalog_names = set(map(lambda c: c['tap_stream_id'], found_catalogs))
self.assertSetEqual(self.expected_check_streams(), found_catalog_names,
msg="discovered schemas do not match")
LOGGER.info("discovered schemas are OK")

return found_catalogs

def run_and_verify_sync(self, conn_id):
"""
Run a sync job and make sure it exited properly.
Return a dictionary with keys of streams synced
and values of records synced for each stream
"""
# Run a sync job using orchestrator
sync_job_name = runner.run_sync_mode(self, conn_id)

# Verify tap and target exit codes
exit_status = menagerie.get_exit_status(conn_id, sync_job_name)
menagerie.verify_sync_exit_status(self, exit_status, sync_job_name)

# Verify actual rows were synced
sync_record_count = runner.examine_target_output_file(self,
conn_id,
self.expected_streams(),
self.expected_primary_keys())
total_row_count = sum(sync_record_count.values())
self.assertGreater(total_row_count, 0,
msg="failed to replicate any data: {}".format(sync_record_count))
LOGGER.info("total replicated row count: %s", total_row_count)

return sync_record_count

def perform_and_verify_table_and_field_selection(self,
conn_id,
test_catalogs,
select_all_fields=True):
"""
Perform table and field selection based off of the streams to select
set and field selection parameters.

Verify this results in the expected streams selected and all or no
fields selected for those streams.
"""

# Select all available fields or select no fields from all testable streams
self.select_all_streams_and_fields(
conn_id=conn_id, catalogs=test_catalogs, select_all_fields=select_all_fields
)

catalogs = menagerie.get_catalogs(conn_id)

# Ensure our selection affects the catalog
expected_selected = [tc.get('tap_stream_id') for tc in test_catalogs]
for cat in catalogs:
catalog_entry = menagerie.get_annotated_schema(conn_id, cat['stream_id'])

# Verify all testable streams are selected
selected = catalog_entry.get('annotated-schema').get('selected')
LOGGER.info("Validating selection on %s: %s", cat['stream_name'], selected)
if cat['stream_name'] not in expected_selected:
self.assertFalse(selected, msg="Stream selected, but not testable.")
continue # Skip remaining assertions if we aren't selecting this stream
self.assertTrue(selected, msg="Stream not selected.")

if select_all_fields:
# Verify all fields within each selected stream are selected
for field, field_props in catalog_entry.get('annotated-schema').get('properties').items():
field_selected = field_props.get('selected')
LOGGER.info("\tValidating selection on %s.%s: %s",
cat['stream_name'], field, field_selected)
self.assertTrue(field_selected, msg="Field not selected.")
else:
# Verify only automatic fields are selected
expected_automatic_fields = self.expected_automatic_fields().get(cat['tap_stream_id'])
selected_fields = self.get_selected_fields_from_metadata(catalog_entry['metadata'])
self.assertEqual(expected_automatic_fields, selected_fields)

@staticmethod
def get_selected_fields_from_metadata(metadata):
selected_fields = set()
for field in metadata:
is_field_metadata = len(field['breadcrumb']) > 1
inclusion_automatic_or_selected = (field['metadata'].get('inclusion') == 'automatic'
or field['metadata'].get('selected') is True)
if is_field_metadata and inclusion_automatic_or_selected:
selected_fields.add(field['breadcrumb'][1])
return selected_fields

@staticmethod
def select_all_streams_and_fields(conn_id, catalogs, select_all_fields: bool = True):
"""Select all streams and all fields within streams"""
for catalog in catalogs:
schema = menagerie.get_annotated_schema(conn_id, catalog['stream_id'])

non_selected_properties = []
if not select_all_fields:
# get a list of all properties so that none are selected
non_selected_properties = schema.get('annotated-schema', {}).get(
'properties', {}).keys()

connections.select_catalog_and_fields_via_metadata(
conn_id, catalog, schema, [], non_selected_properties)

def timedelta_formatted(self, dtime, days=0, str_format="%Y-%m-%dT00:00:00Z"):
date_stripped = dt.strptime(dtime, str_format)
return_date = date_stripped + timedelta(days=days)

return dt.strftime(return_date, str_format)

################################
# Tap Specific Test Actions #
################################

def datetime_from_timestamp(self, value, str_format="%Y-%m-%dT00:00:00Z"):
"""
Takes in a unix timestamp in milliseconds.
Returns a string formatted python datetime
"""
try:
datetime_value = dt.fromtimestamp(value)
datetime_str = dt.strftime(datetime_value, str_format)
except ValueError as err:
raise NotImplementedError(
f"Invalid argument 'value': {value} "
"This method was designed to accept unix timestamps in milliseconds."
)
return datetime_str
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this is the only tap that uses unix timestamps, but this function looks like it could be helpful to other taps. I would consider if it should be moved to base case in the helper function? https://github.com/stitchdata/tap-tester/blob/ed0886e9a9bd3f1340ad4929a03f2d67ab4ebf2a/tap_tester/base_suite_tests/base_case.py#L474-L522

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved this to tap-tester base case


def is_child(self, stream):
"""return true if this stream is a child stream"""
return self.expected_metadata()[stream].get(self.PARENT_STREAM) is not None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should follow PARENT_STREAM. If that stays here this should, if that goes to base case this should follow it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved this to tap-tester base case

Loading