diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index 6e46b00..58b80a3 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -2,10 +2,10 @@ (write a short description or paste a link to JIRA) # Manual QA steps - - - + - + # Risks - - - + - + # Rollback steps - revert this branch diff --git a/.gitignore b/.gitignore index 5386653..6a1a7e9 100644 --- a/.gitignore +++ b/.gitignore @@ -45,20 +45,7 @@ coverage.xml *,cover .hypothesis/ -# Translations -*.mo -*.pot -# Django stuff: -*.log -local_settings.py - -# Flask stuff: -instance/ -.webassets-cache - -# Scrapy stuff: -.scrapy # Sphinx documentation docs/_build/ @@ -72,8 +59,6 @@ target/ # pyenv .python-version -# celery beat schedule file -celerybeat-schedule # dotenv .env @@ -92,11 +77,15 @@ ENV/ ._* .DS_Store -# Custom stuff -env.sh +state.json +catalog.json config.json -.autoenv.zsh -rsa-key -tags -properties.json + +# VS Code files for those working on multiple tools +.vscode/* +!.vscode/settings.json +!.vscode/tasks.json +!.vscode/launch.json +!.vscode/extensions.json +*.code-workspace diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..6717558 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,57 @@ +default_stages: [commit] +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.3.0 + hooks: + - id: check-merge-conflict + - id: check-docstring-first + - id: debug-statements + - id: trailing-whitespace + - id: check-toml + - id: end-of-file-fixer + - id: check-yaml + - id: sort-simple-yaml + - id: check-json + - id: pretty-format-json + args: ['--autofix','--no-sort-keys'] + + - repo: https://github.com/pycqa/isort + rev: 5.10.1 + hooks: + - id: isort + + - repo: https://github.com/psf/black + rev: 22.8.0 + hooks: + - id: black + + - repo: https://github.com/pycqa/flake8 + rev: 5.0.4 + hooks: + - id: flake8 + additional_dependencies: [ + 'flake8-print', + 'flake8-debugger', + ] + + - repo: https://github.com/PyCQA/bandit + rev: '1.7.4' + hooks: + - id: bandit + + - repo: https://github.com/asottile/pyupgrade + rev: v2.37.3 + hooks: + - id: pyupgrade + args: [--py37-plus] + + - repo: https://github.com/PyCQA/docformatter + rev: v1.5.0 + hooks: + - id: docformatter + args: [--in-place] + + - repo: https://github.com/codespell-project/codespell + rev: v2.2.1 + hooks: + - id: codespell diff --git a/LICENSE b/LICENSE index 753d647..4ec8c3f 100644 --- a/LICENSE +++ b/LICENSE @@ -617,4 +617,3 @@ Program, unless a warranty or assumption of liability accompanies a copy of the Program in return for a fee. END OF TERMS AND CONDITIONS - diff --git a/README.md b/README.md index 6a9e278..573666c 100644 --- a/README.md +++ b/README.md @@ -37,8 +37,8 @@ authorization request), log into your Zendesk Chat / Zopim account, go to Settings -> Account -> API -> Add API Client Once you create the API Client you will receive a client ID and client secret. -Use these in conjunction with your chose method of performing the OAuth 2 -reqeust to obtain an access token to your (or a third-party) Zendesk Chat / +Use these in conjunction with your chosen method of performing the OAuth 2 +request to obtain an access token to your (or a third-party) Zendesk Chat / Zopim account. 3.
Create the Config File diff --git a/config.json.sample b/config.json.sample new file mode 100644 index 0000000..d565082 --- /dev/null +++ b/config.json.sample @@ -0,0 +1,4 @@ +{ + "access_token":"", + "start_date":"12/01/2010" +} diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..500b64e --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,21 @@ +[tool.black] +line-length = 120 +target-version = ['py37',] +include = '\.pyi?$' + +[flake8] +profile = "black" +max-line-length = 120 +exclude = "build,.git,.tox,./tests/.env,tests" +ignore = "W504,W601,D203" + +[tool.pylint] +max-line-length = 120 +disable = ["R0801",] + +[tool.isort] +profile = "black" +multi_line_output = 3 + +[tool.bandit] +exclude_dirs = ["tests",".env"] diff --git a/setup.cfg b/setup.cfg index b88034e..fec1fe5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,2 +1,10 @@ [metadata] description-file = README.md + + +[flake8] +ignore = W504,W601,D203 +profile = black +max-line-length = 120 +exclude = .git,__pycache__,docs/source/conf.py,old,build,dist,.git,.tox,./tests/.env,tests +max-complexity = 10 diff --git a/setup.py b/setup.py index f20f3de..bd6b0e2 100755 --- a/setup.py +++ b/setup.py @@ -1,33 +1,24 @@ #!/usr/bin/env python -from setuptools import setup, find_packages +from setuptools import find_packages, setup -setup(name="tap-zendesk-chat", - version="0.3.2", - description="Singer.io tap for extracting data from the Zendesk Chat API", - author="Stitch", - url="http://singer.io", - classifiers=["Programming Language :: Python :: 3 :: Only"], - py_modules=["tap_zendesk_chat"], - install_requires=[ - "python-dateutil==2.6.0", # because of singer-python issue - "pendulum==1.2.0", # because of singer-python issue - "singer-python==5.12.1", - "requests==2.20.0", - ], - extras_require={ - 'dev': [ - 'pylint==2.7.4', - 'ipdb', - 'nose' - ] - }, - entry_points=""" - [console_scripts] - tap-zendesk-chat=tap_zendesk_chat:main - """, - packages=["tap_zendesk_chat"], - package_data = { - "schemas": ["tap_zendesk_chat/schemas/*.json"] - }, - include_package_data=True, +setup( + name="tap-zendesk-chat", + version="0.3.2", + description="Singer.io tap for extracting data from the Zendesk Chat API", + author="Stitch", + url="https://singer.io", + classifiers=["Programming Language :: Python :: 3 :: Only"], + py_modules=["tap_zendesk_chat"], + install_requires=[ + "singer-python==5.12.1", + "requests==2.20.0", + ], + extras_require={"dev": ["pylint", "ipdb", "nose"]}, + entry_points=""" + [console_scripts] + tap-zendesk-chat=tap_zendesk_chat:main + """, + packages=find_packages(exclude=["tests"]), + package_data={"schemas": ["tap_zendesk_chat/schemas/*.json"]}, + include_package_data=True, ) diff --git a/tap_zendesk_chat/__init__.py b/tap_zendesk_chat/__init__.py index 2169657..38122a7 100644 --- a/tap_zendesk_chat/__init__.py +++ b/tap_zendesk_chat/__init__.py @@ -1,134 +1,25 @@ #!/usr/bin/env python3 -import os import singer -from singer import metrics, utils, metadata -from singer.catalog import Catalog, CatalogEntry, Schema -from requests.exceptions import HTTPError -from . 
import streams as streams_ +from singer.utils import handle_top_exception, parse_args + from .context import Context -from .http import Client +from .discover import discover +from .sync import sync REQUIRED_CONFIG_KEYS = ["start_date", "access_token"] LOGGER = singer.get_logger() -def get_abs_path(path): - return os.path.join(os.path.dirname(os.path.realpath(__file__)), path) - - -def load_schema(tap_stream_id): - path = "schemas/{}.json".format(tap_stream_id) - schema = utils.load_json(get_abs_path(path)) - dependencies = schema.pop("tap_schema_dependencies", []) - refs = {} - for sub_stream_id in dependencies: - refs[sub_stream_id] = load_schema(sub_stream_id) - if refs: - singer.resolve_schema_references(schema, refs) - return schema - - -def ensure_credentials_are_authorized(client): - # The request will throw an exception if the credentials are not authorized - client.request(streams_.DEPARTMENTS.tap_stream_id) - - -def is_account_endpoint_authorized(client): - # The account endpoint is restricted to zopim accounts, meaning integrated - # Zendesk accounts will get a 403 for this endpoint. - try: - client.request(streams_.ACCOUNT.tap_stream_id) - except HTTPError as e: - if e.response.status_code == 403: - LOGGER.info( - "Ignoring 403 from account endpoint - this must be an " - "integrated Zendesk account. This endpoint will be excluded " - "from discovery." - ) - return False - else: - raise - return True - - -def discover(config): - client = Client(config) - ensure_credentials_are_authorized(client) - include_account_stream = is_account_endpoint_authorized(client) - catalog = Catalog([]) - for stream in streams_.all_streams: - if (not include_account_stream - and stream.tap_stream_id == streams_.ACCOUNT.tap_stream_id): - continue - raw_schema = load_schema(stream.tap_stream_id) - mdata = build_metadata(raw_schema, stream) - schema = Schema.from_dict(raw_schema) - catalog.streams.append(CatalogEntry( - stream=stream.tap_stream_id, - tap_stream_id=stream.tap_stream_id, - key_properties=stream.pk_fields, - schema=schema, - metadata=metadata.to_list(mdata) - )) - return catalog - -def build_metadata(raw_schema, stream): - - mdata = metadata.new() - metadata.write(mdata, (), 'valid-replication-keys', list(stream.replication_key)) - metadata.write(mdata, (), 'table-key-properties', list(stream.pk_fields)) - for prop in raw_schema['properties'].keys(): - if prop in stream.replication_key or prop in stream.pk_fields: - metadata.write(mdata, ('properties', prop), 'inclusion', 'automatic') - else: - metadata.write(mdata, ('properties', prop), 'inclusion', 'available') - - return mdata - - -def output_schema(stream): - schema = load_schema(stream.tap_stream_id) - singer.write_schema(stream.tap_stream_id, schema, stream.pk_fields) - - -def is_selected(stream): - mdata = metadata.to_map(stream.metadata) - return metadata.get(mdata, (), 'selected') - -def sync(ctx): - currently_syncing = ctx.state.get("currently_syncing") - start_idx = streams_.all_stream_ids.index(currently_syncing) \ - if currently_syncing else 0 - stream_ids_to_sync = [cs.tap_stream_id for cs in ctx.catalog.streams - if is_selected(cs)] - streams = [s for s in streams_.all_streams[start_idx:] - if s.tap_stream_id in stream_ids_to_sync] - for stream in streams: - ctx.state["currently_syncing"] = stream.tap_stream_id - output_schema(stream) - ctx.write_state() - stream.sync(ctx) - ctx.state["currently_syncing"] = None - ctx.write_state() - - -def main_impl(): - args = utils.parse_args(REQUIRED_CONFIG_KEYS) 
+@handle_top_exception(LOGGER) +def main(): + """performs sync and discovery.""" + args = parse_args(REQUIRED_CONFIG_KEYS) if args.discover: discover(args.config).dump() - print() else: - catalog = Catalog.from_dict(args.properties) \ - if args.properties else discover(args.config) - ctx = Context(args.config, args.state, catalog) + ctx = Context(args.config, args.state, args.catalog or discover(args.config)) sync(ctx) -def main(): - try: - main_impl() - except Exception as exc: - LOGGER.critical(exc) - raise exc if __name__ == "__main__": main() diff --git a/tap_zendesk_chat/context.py b/tap_zendesk_chat/context.py index a665750..5da6e53 100644 --- a/tap_zendesk_chat/context.py +++ b/tap_zendesk_chat/context.py @@ -1,22 +1,33 @@ from datetime import datetime -import singer +from typing import Dict, List + +from singer import Catalog, write_state +from singer.utils import now + from .http import Client + class Context: - def __init__(self, config, state, catalog): + """Wrapper Class Around state bookmarking.""" + + def __init__(self, config: Dict, state: Dict, catalog: Catalog): self.config = config self.state = state self.catalog = catalog self.client = Client(config) - self.now = datetime.utcnow() + self.now = now() @property def bookmarks(self): + """Provides read-only access to bookmarks, creates one if does not + exist.""" if "bookmarks" not in self.state: self.state["bookmarks"] = {} return self.state["bookmarks"] - def bookmark(self, path): + def bookmark(self, path: List): + """checks the state[file] for a nested path of bookmarks and returns + value.""" bookmark = self.bookmarks for p in path: if p not in bookmark: @@ -37,4 +48,4 @@ def update_start_date_bookmark(self, path): return val def write_state(self): - singer.write_state(self.state) + write_state(self.state) diff --git a/tap_zendesk_chat/discover.py b/tap_zendesk_chat/discover.py new file mode 100644 index 0000000..3ebffac --- /dev/null +++ b/tap_zendesk_chat/discover.py @@ -0,0 +1,67 @@ +import singer +from requests.exceptions import HTTPError +from singer.catalog import Catalog +from singer.metadata import get_standard_metadata, to_list, to_map, write + +from .http import Client +from .streams import STREAMS +from .utils import load_schema + +LOGGER = singer.get_logger() + + +def account_not_authorized(client): + # The account endpoint is restricted to zopim accounts, meaning integrated + # Zendesk accounts will get a 403 for this endpoint. + try: + client.request(STREAMS["account"].tap_stream_id) + except HTTPError as err: + if err.response.status_code == 403: + LOGGER.info( + "Ignoring 403 from account endpoint - this must be an \ + integrated Zendesk account. 
This endpoint will be excluded \ + from discovery" + ) + return True + raise + return False + + +def get_metadata(schema: dict, stream): + """tweaked inbuilt singer method to also mark the replication keys as + automatic fields.""" + stream_metadata = get_standard_metadata( + **{ + "schema": schema, + "key_properties": list(stream.key_properties), + "valid_replication_keys": list(stream.valid_replication_keys), + "replication_method": stream.forced_replication_method, + } + ) + stream_metadata = to_map(stream_metadata) + if stream.valid_replication_keys is not None: + for key in stream.valid_replication_keys: + stream_metadata = write(stream_metadata, ("properties", key), "inclusion", "automatic") + stream_metadata = to_list(stream_metadata) + return stream_metadata + + +def discover(config: dict) -> Catalog: + """discover function for tap-zendesk-chat.""" + if config: + client = Client(config) + client.request(STREAMS["chats"].tap_stream_id) + if account_not_authorized(client): + STREAMS.pop("account") + streams = [] + for stream_name, stream in STREAMS.items(): + schema = load_schema(stream.tap_stream_id) + streams.append( + { + "stream": stream_name, + "tap_stream_id": stream.tap_stream_id, + "schema": schema, + "metadata": get_metadata(schema, stream), + } + ) + return Catalog.from_dict({"streams": streams}) diff --git a/tap_zendesk_chat/http.py b/tap_zendesk_chat/http.py index 12ef80a..a9239b3 100644 --- a/tap_zendesk_chat/http.py +++ b/tap_zendesk_chat/http.py @@ -1,7 +1,8 @@ -import requests -from singer import metrics import backoff +import requests +from singer import get_logger, metrics +LOGGER = get_logger() BASE_URL = "https://www.zopim.com" @@ -11,27 +12,28 @@ class RateLimitException(Exception): class Client: def __init__(self, config): - # self.session = requests.Session() self.access_token = config["access_token"] - self.user_agent = config.get("user_agent") + self.user_agent = config.get("user_agent", "tap-zendesk-chat") + self.headers = {} + self.headers["Authorization"] = f"Bearer {self.access_token}" + self.headers["User-Agent"] = self.user_agent self.session = requests.Session() - @backoff.on_exception(backoff.expo, - RateLimitException, - max_tries=10, - factor=2) + @backoff.on_exception(backoff.expo, RateLimitException, max_tries=10, factor=2) def request(self, tap_stream_id, params=None, url=None, url_extra=""): - if not params: - params={} with metrics.http_request_timer(tap_stream_id) as timer: - url = url or BASE_URL + "/api/v2/" + tap_stream_id + url_extra - headers = {"Authorization": "Bearer " + self.access_token} - if self.user_agent: - headers["User-Agent"] = self.user_agent - request = requests.Request("GET", url, headers=headers, params=params) - response = self.session.send(request.prepare()) + + url = url or f"{BASE_URL}/api/v2/{tap_stream_id}{url_extra}" + LOGGER.info("calling %s %s", url, params) + response = self.session.get(url, headers=self.headers, params=params) timer.tags[metrics.Tag.http_status_code] = response.status_code + if response.status_code in [429, 502]: raise RateLimitException() + elif response.status_code == 400: + LOGGER.warning( + "The %s stream has more data than the API can paginate through; " + "the API has a pagination limit of 251 pages, please reduce the search window for this stream", tap_stream_id + ) response.raise_for_status() return response.json() diff --git a/tap_zendesk_chat/schemas/bans.json b/tap_zendesk_chat/schemas/bans.json index ed1541e..1538c2e 100644 --- a/tap_zendesk_chat/schemas/bans.json +++ b/tap_zendesk_chat/schemas/bans.json @@
-38,6 +38,13 @@ "null", "string" ] + }, + "created_at": { + "type": [ + "null", + "string" + ], + "format": "date-time" } } } diff --git a/tap_zendesk_chat/schemas/chats.json b/tap_zendesk_chat/schemas/chats.json index a2c932a..c7829f1 100644 --- a/tap_zendesk_chat/schemas/chats.json +++ b/tap_zendesk_chat/schemas/chats.json @@ -103,10 +103,10 @@ "$ref": "chat_response_time" }, "session": { - "type": [ - "null", - "object" - ] + "type": [ + "null", + "object" + ] }, "history": { "items": { diff --git a/tap_zendesk_chat/schemas/shortcuts.json b/tap_zendesk_chat/schemas/shortcuts.json index a9228ed..6068203 100644 --- a/tap_zendesk_chat/schemas/shortcuts.json +++ b/tap_zendesk_chat/schemas/shortcuts.json @@ -48,6 +48,15 @@ "items": { "type": "integer" } + }, + "agents": { + "type": [ + "null", + "array" + ], + "items": { + "type": "integer" + } } }, "type": [ diff --git a/tap_zendesk_chat/streams.py b/tap_zendesk_chat/streams.py index 1c19241..b3f94a2 100644 --- a/tap_zendesk_chat/streams.py +++ b/tap_zendesk_chat/streams.py @@ -1,101 +1,122 @@ -from datetime import datetime, timedelta -from pendulum import parse as dt_parse -import singer -from singer import metrics, Transformer, metadata +from datetime import timedelta +from typing import Dict, List -LOGGER = singer.get_logger() +import singer +from singer import Transformer, metrics +from singer.utils import strptime_to_utc +from .utils import break_into_intervals -def break_into_intervals(days, start_time: str, now: datetime): - delta = timedelta(days=days) - start_dt = dt_parse(start_time) - while start_dt < now: - end_dt = min(start_dt + delta, now) - yield start_dt, end_dt - start_dt = end_dt +LOGGER = singer.get_logger() -class Stream: +class BaseStream: """Information about and functions for syncing streams. 
Important class properties: :var tap_stream_id: - :var pk_fields: A list of primary key fields""" + :var pk_fields: A list of primary key fields + """ - replication_key = set() - def __init__(self, tap_stream_id, pk_fields): - self.tap_stream_id = tap_stream_id - self.pk_fields = pk_fields + valid_replication_keys = set() + tap_stream_id = None def metrics(self, page): + "updates the metrics counter for the current stream" with metrics.record_counter(self.tap_stream_id) as counter: counter.increment(len(page)) - def format_response(self, response): - return [response] if isinstance(response, list) else response - - def write_page(self, page): + def write_page(self, page: List): """Formats a list of records in place and outputs the data to stdout.""" singer.write_records(self.tap_stream_id, page) self.metrics(page) + def sync(self, ctx, schema: Dict, stream_metadata: Dict, transformer: Transformer): + response = ctx.client.request(self.tap_stream_id) + page = [transformer.transform(rec, schema, metadata=stream_metadata) for rec in response] + self.write_page(page) + + +class Account(BaseStream): + + tap_stream_id = "account" + key_properties = ["account_key"] + forced_replication_method = "FULL_TABLE" + + def sync(self, ctx, schema: Dict, stream_metadata: Dict, transformer: Transformer): + response = ctx.client.request(self.tap_stream_id) + page = transformer.transform(response, schema, metadata=stream_metadata) + self.write_page([page]) + -class Everything(Stream): - def sync(self, ctx): - with Transformer() as transformer: - schema = ctx.catalog.get_stream(self.tap_stream_id).schema.to_dict() - m_data = metadata.to_map(ctx.catalog.get_stream(self.tap_stream_id).metadata) - response = ctx.client.request(self.tap_stream_id) - page = [transformer.transform(rec, schema, metadata=m_data) for rec in response] - self.write_page(page) +class Agents(BaseStream): + tap_stream_id = "agents" + key_properties = ["id"] + forced_replication_method = "FULL_TABLE" -class Agents(Stream): - def sync(self, ctx): + def sync(self, ctx, schema: Dict, stream_metadata: Dict, transformer: Transformer): since_id_offset = [self.tap_stream_id, "offset", "id"] since_id = ctx.bookmark(since_id_offset) or 0 - schema = ctx.catalog.get_stream(self.tap_stream_id).schema.to_dict() - m_data = metadata.to_map(ctx.catalog.get_stream(self.tap_stream_id).metadata) + while True: + params = { + "since_id": since_id, + "limit": ctx.config.get("agents_page_limit", 100), + } + page = ctx.client.request(self.tap_stream_id, params) + if not page: + break + self.write_page([transformer.transform(rec, schema, metadata=stream_metadata) for rec in page]) + since_id = page[-1]["id"] + 1 + ctx.set_bookmark(since_id_offset, since_id) + ctx.write_state() + ctx.set_bookmark(since_id_offset, None) - with Transformer() as transformer: - while True: - params = { - "since_id": since_id, - "limit": ctx.config.get("agents_page_limit", 100), - } - page = ctx.client.request(self.tap_stream_id, params) - if not page: - break - page = [transformer.transform(rec, schema, metadata=m_data) for rec in page] - self.write_page(page) - since_id = page[-1]["id"] + 1 - ctx.set_bookmark(since_id_offset, since_id) - ctx.write_state() + +class Bans(BaseStream): + + tap_stream_id = "bans" + key_properties = ["id"] + forced_replication_method = "FULL_TABLE" + + def sync(self, ctx, schema: Dict, stream_metadata: Dict, transformer: Transformer): + since_id_offset = [self.tap_stream_id, "offset", "id"] + since_id = ctx.bookmark(since_id_offset) or 0 + + while True: + 
params = { + "since_id": since_id, + "limit": ctx.config.get("bans_page_limit", 100), + } + response = ctx.client.request(self.tap_stream_id, params) + page = response.get("visitor", []) + response.get("ip_address", []) + if not page: + break + self.write_page([transformer.transform(rec, schema, metadata=stream_metadata) for rec in page]) + since_id = page[-1]["id"] + 1 + ctx.set_bookmark(since_id_offset, since_id) + ctx.write_state() ctx.set_bookmark(since_id_offset, None) - ctx.write_state() -class Chats(Stream): - replication_key = {'timestamp', 'end_timestamp'} - def _bulk_chats(self, ctx, chat_ids): +class Chats(BaseStream): + + tap_stream_id = "chats" + key_properties = ["id"] + forced_replication_method = "INCREMENTAL" + valid_replication_keys = {"timestamp", "end_timestamp"} + + def _bulk_chats(self, ctx, chat_ids: List): if not chat_ids: return [] params = {"ids": ",".join(chat_ids)} body = ctx.client.request(self.tap_stream_id, params=params) return list(body["docs"].values()) - def _search(self, ctx, chat_type, ts_field, - start_dt: datetime, end_dt: datetime): - params = { - "q": "type:{} AND {}:[{} TO {}]" - .format(chat_type, ts_field, start_dt.isoformat(), end_dt.isoformat()) - } - return ctx.client.request( - self.tap_stream_id, params=params, url_extra="/search") - - def _pull(self, ctx, chat_type, ts_field, *, full_sync): + def _pull(self, ctx, chat_type, ts_field, full_sync, schema: Dict, stream_metadata: Dict, transformer: Transformer): """Pulls and writes pages of data for the given chat_type, where chat_type can be either "chat" or "offline_msg". @@ -114,93 +135,104 @@ def _pull(self, ctx, chat_type, ts_field, *, full_sync): start_time = ctx.update_start_date_bookmark(ts_bookmark_key) next_url = ctx.bookmark(url_offset_key) max_bookmark = start_time - schema = ctx.catalog.get_stream(self.tap_stream_id).schema.to_dict() - m_data = metadata.to_map(ctx.catalog.get_stream(self.tap_stream_id).metadata) - interval_days = 14 - interval_days_str = ctx.config.get("chat_search_interval_days") - if interval_days_str is not None: - interval_days = int(interval_days_str) + + interval_days = int(ctx.config.get("chat_search_interval_days", "14")) LOGGER.info("Using chat_search_interval_days: %s", interval_days) - intervals = break_into_intervals(interval_days, start_time, ctx.now) - with Transformer() as transformer: - for start_dt, end_dt in intervals: - while True: - if next_url: - search_resp = ctx.client.request(self.tap_stream_id, url=next_url) - else: - search_resp = self._search(ctx, chat_type, ts_field, start_dt, end_dt) - next_url = search_resp["next_url"] - ctx.set_bookmark(url_offset_key, next_url) - ctx.write_state() - chat_ids = [r["id"] for r in search_resp["results"]] - chats = self._bulk_chats(ctx, chat_ids) - if chats: - chats = [transformer.transform(rec, schema, metadata=m_data) for rec in chats] - self.write_page(chats) - max_bookmark = max(max_bookmark, *[c[ts_field] for c in chats]) - if not next_url: - break - ctx.set_bookmark(ts_bookmark_key, max_bookmark) + for start_dt, end_dt in break_into_intervals(interval_days, start_time, ctx.now): + while True: + if next_url: + search_resp = ctx.client.request(self.tap_stream_id, url=next_url) + else: + params = {"q": f"type:{chat_type} AND {ts_field}:[{start_dt.isoformat()} TO {end_dt.isoformat()}]"} + search_resp = ctx.client.request(self.tap_stream_id, params=params, url_extra="/search") + + next_url = search_resp["next_url"] + ctx.set_bookmark(url_offset_key,
next_url) ctx.write_state() + chats = self._bulk_chats(ctx, [r["id"] for r in search_resp["results"]]) + if chats: + chats = [transformer.transform(rec, schema, metadata=stream_metadata) for rec in chats] + self.write_page(chats) + max_bookmark = max(max_bookmark, *[c[ts_field] for c in chats]) + if not next_url: + break + ctx.set_bookmark(ts_bookmark_key, max_bookmark) + ctx.write_state() - def _should_run_full_sync(self, ctx): + def _should_run_full_sync(self, ctx) -> bool: sync_days = ctx.config.get("chats_full_sync_days") if sync_days: last_sync = ctx.state.get("chats_last_full_sync") if not last_sync: LOGGER.info("Running full sync of chats: no last sync time") return True - next_sync = dt_parse(last_sync) + timedelta(days=int(sync_days)) + next_sync = strptime_to_utc(last_sync) + timedelta(days=int(sync_days)) if next_sync <= ctx.now: - LOGGER.info("Running full sync of chats: " - "last sync was %s, configured to run every %s days", - last_sync, sync_days) + LOGGER.info( + "Running full sync of chats: last sync was %s, configured to run every %s days", + last_sync, + sync_days, + ) return True return False - def sync(self, ctx): + def sync(self, ctx, schema: Dict, stream_metadata: Dict, transformer: Transformer): full_sync = self._should_run_full_sync(ctx) - self._pull(ctx, "chat", "end_timestamp", full_sync=full_sync) - self._pull(ctx, "offline_msg", "timestamp", full_sync=full_sync) + self._pull( + ctx, + "chat", + "end_timestamp", + full_sync=full_sync, + schema=schema, + stream_metadata=stream_metadata, + transformer=transformer, + ) + self._pull( + ctx, + "offline_msg", + "timestamp", + full_sync=full_sync, + schema=schema, + stream_metadata=stream_metadata, + transformer=transformer, + ) if full_sync: ctx.state["chats_last_full_sync"] = ctx.now.isoformat() ctx.write_state() -class Bans(Stream): - def sync(self, ctx): - with Transformer() as transformer: - schema = ctx.catalog.get_stream(self.tap_stream_id).schema.to_dict() - m_data = metadata.to_map(ctx.catalog.get_stream(self.tap_stream_id).metadata) - response = ctx.client.request(self.tap_stream_id) - page = response["visitor"] + response["ip_address"] - page = [transformer.transform(rec, schema, metadata=m_data) for rec in page] - self.write_page(page) - - -class Account(Stream): - def sync(self, ctx): - # The account endpoint returns a single item, so we have to wrap it in - # a list to write a "page" - with Transformer() as transformer: - schema = ctx.catalog.get_stream(self.tap_stream_id).schema.to_dict() - m_data = metadata.to_map(ctx.catalog.get_stream(self.tap_stream_id).metadata) - response = ctx.client.request(self.tap_stream_id) - page = transformer.transform(response, schema, metadata=m_data) - self.write_page([page]) - - -DEPARTMENTS = Everything("departments", ["id"]) -ACCOUNT = Account("account", ["account_key"]) -all_streams = [ - Agents("agents", ["id"]), - Chats("chats", ["id"]), - Everything("shortcuts", ["name"]), - Everything("triggers", ["id"]), - Bans("bans", ["id"]), - DEPARTMENTS, - Everything("goals", ["id"]), - ACCOUNT, -] -all_stream_ids = [s.tap_stream_id for s in all_streams] +class Departments(BaseStream): + tap_stream_id = "departments" + key_properties = ["id"] + forced_replication_method = "FULL_TABLE" + + +class Goals(BaseStream): + tap_stream_id = "goals" + key_properties = ["id"] + forced_replication_method = "FULL_TABLE" + + +class Shortcuts(BaseStream): + tap_stream_id = "shortcuts" + key_properties = ["name"] + forced_replication_method = "FULL_TABLE" + + +class Triggers(BaseStream): + 
tap_stream_id = "triggers" + key_properties = ["id"] + forced_replication_method = "FULL_TABLE" + + +STREAMS = { + Account.tap_stream_id: Account, + Agents.tap_stream_id: Agents, + Bans.tap_stream_id: Bans, + Chats.tap_stream_id: Chats, + Departments.tap_stream_id: Departments, + Goals.tap_stream_id: Goals, + Shortcuts.tap_stream_id: Shortcuts, + Triggers.tap_stream_id: Triggers, +} diff --git a/tap_zendesk_chat/sync.py b/tap_zendesk_chat/sync.py new file mode 100644 index 0000000..a351612 --- /dev/null +++ b/tap_zendesk_chat/sync.py @@ -0,0 +1,31 @@ +from singer import ( + Transformer, + get_logger, + metadata, + set_currently_syncing, + write_schema, + write_state, +) + +from .streams import STREAMS + +LOGGER = get_logger() + + +def sync(ctx): + """performs sync for selected streams.""" + with Transformer() as transformer: + for stream in ctx.catalog.get_selected_streams(ctx.state): + tap_stream_id = stream.tap_stream_id + stream_schema = stream.schema.to_dict() + stream_metadata = metadata.to_map(stream.metadata) + stream_obj = STREAMS[tap_stream_id]() + LOGGER.info("Starting sync for stream: %s", tap_stream_id) + ctx.state = set_currently_syncing(ctx.state, tap_stream_id) + ctx.write_state() + write_schema(tap_stream_id, stream_schema, stream_obj.key_properties, stream.replication_key) + stream_obj.sync(ctx, schema=stream_schema, stream_metadata=stream_metadata, transformer=transformer) + ctx.write_state() + + ctx.state = set_currently_syncing(ctx.state, None) + write_state(ctx.state) diff --git a/tap_zendesk_chat/utils.py b/tap_zendesk_chat/utils.py new file mode 100644 index 0000000..03f9ff2 --- /dev/null +++ b/tap_zendesk_chat/utils.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python3 +from datetime import datetime, timedelta +from pathlib import Path + +import singer +from singer.utils import load_json, strptime_to_utc + + +def load_schema(tap_stream_id): + schema = load_json(Path(__file__).parent.resolve() / f"schemas/{tap_stream_id}.json") + dependencies = schema.pop("tap_schema_dependencies", []) + refs = {ref: load_schema(ref) for ref in dependencies} + if refs: + singer.resolve_schema_references(schema, refs) + return schema + + +def break_into_intervals(days, start_time: str, now: datetime): + delta = timedelta(days=days) + start_dt = strptime_to_utc(start_time) + while start_dt < now: + end_dt = min(start_dt + delta, now) + yield start_dt, end_dt + start_dt = end_dt diff --git a/tests/base.py b/tests/base.py index af864be..1501d96 100644 --- a/tests/base.py +++ b/tests/base.py @@ -1,24 +1,17 @@ -""" -Setup expectations for test sub classes -Run discovery for as a prerequisite for most tests -""" -import unittest -import os +"""Setup expectations for test sub classes Run discovery for as a prerequisite +for most tests.""" +import copy import json -import decimal +import os +import unittest from datetime import datetime as dt from datetime import timezone as tz -from singer import utils +from typing import Any, Dict, Set from tap_tester import connections, menagerie, runner -class BaseTapTest(unittest.TestCase): - """ - Setup expectations for test sub classes - Run discovery for as a prerequisite for most tests - """ - +class ZendeskChatBaseTest(unittest.TestCase): REPLICATION_KEYS = "valid-replication-keys" PRIMARY_KEYS = "table-key-properties" REPLICATION_METHOD = "forced-replication-method" @@ -26,28 +19,31 @@ class BaseTapTest(unittest.TestCase): FULL = "FULL_TABLE" START_DATE_FORMAT = "%Y-%m-%dT00:00:00Z" + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + 
self.start_date = self.get_properties().get("start_date") + self.maxDiff = None + @staticmethod def tap_name(): - """The name of the tap""" + """The name of the tap.""" return "tap-zendesk-chat" @staticmethod def get_type(): - """the expected url route ending""" + """the expected url route ending.""" return "platform.zendesk-chat" def get_properties(self, original: bool = True): """Configuration properties required for the tap.""" - return_value = { - 'start_date': dt.strftime(dt.today(), self.START_DATE_FORMAT) - } + return_value = {"start_date": dt.strftime(dt.today(), self.START_DATE_FORMAT)} if original: return return_value # Start Date test needs the new connections start date to be prior to the default - assert self.start_date < return_value["start_date"] + self.assertTrue(self.start_date < return_value["start_date"]) # Assign start date to be the default return_value["start_date"] = self.start_date @@ -55,17 +51,15 @@ def get_properties(self, original: bool = True): @staticmethod def get_credentials(): - """Authentication information for the test account""" - return { - 'access_token': os.getenv('TAP_ZENDESK_CHAT_ACCESS_TOKEN') - } + """Authentication information for the test account.""" + return {"access_token": os.getenv("TAP_ZENDESK_CHAT_ACCESS_TOKEN")} def expected_metadata(self): - """The expected streams and metadata about the streams""" + """The expected streams and metadata about the streams.""" default = { self.PRIMARY_KEYS: {"id"}, - self.REPLICATION_METHOD: self.FULL, + self.REPLICATION_METHOD: self.FULL } shortcuts_rep_key = { @@ -73,7 +67,6 @@ def expected_metadata(self): self.REPLICATION_METHOD: self.FULL } - account_rep_key = { self.PRIMARY_KEYS: {"account_key"}, self.REPLICATION_METHOD: self.FULL @@ -81,14 +74,8 @@ def expected_metadata(self): chats_rep_key = { self.PRIMARY_KEYS: {"id"}, - self.REPLICATION_KEYS: {'timestamp', 'end_timestamp'}, - self.REPLICATION_METHOD: self.INCREMENTAL - } - - agents_rep_key = { - self.PRIMARY_KEYS: {"id"}, - self.REPLICATION_METHOD: self.FULL, - self.REPLICATION_KEYS: {'id'} + self.REPLICATION_KEYS: {"timestamp", "end_timestamp"}, + self.REPLICATION_METHOD: self.INCREMENTAL, } return { @@ -102,52 +89,56 @@ def expected_metadata(self): "account": account_rep_key, } - def expected_streams(self): - """A set of expected stream names""" + def expected_streams(self) -> Set: + """A set of expected stream names.""" return set(self.expected_metadata().keys()) - def expected_primary_keys(self): - """ - return a dictionary with key of table name - and value as a set of primary key fields - """ - return {table: properties.get(self.PRIMARY_KEYS, set()) - for table, properties in self.expected_metadata().items()} + def expected_primary_keys(self) -> Dict: + """return a dictionary with key of table name and value as a set of + primary key fields.""" + return { + table: properties.get(self.PRIMARY_KEYS, set()) for table, properties in self.expected_metadata().items() + } - def expected_replication_keys(self): - """ - return a dictionary with key of table name - and value as a set of replication key fields - """ - return {table: properties.get(self.REPLICATION_KEYS, set()) - for table, properties in self.expected_metadata().items()} + def expected_replication_keys(self) -> Dict: + """return a dictionary with key of table name and value as a set of + replication key fields.""" + return { + table: properties.get(self.REPLICATION_KEYS, set()) + for table, properties in self.expected_metadata().items() + } - def expected_automatic_fields(self): - 
return {table: self.expected_primary_keys().get(table) | self.expected_replication_keys().get(table) - for table in self.expected_metadata()} + def expected_automatic_fields(self) -> Dict: + return { + table: self.expected_primary_keys().get(table) | self.expected_replication_keys().get(table) + for table in self.expected_metadata() + } - def expected_replication_method(self): - """return a dictionary with key of table name nd value of replication method""" - return {table: properties.get(self.REPLICATION_METHOD, None) - for table, properties - in self.expected_metadata().items()} + def expected_replication_method(self) -> Dict: + """return a dictionary with key of table name and value of replication + method.""" + return { + table: properties.get(self.REPLICATION_METHOD, None) + for table, properties in self.expected_metadata().items() + } def setUp(self): - """Verify that you have set the prerequisites to run the tap (creds, etc.)""" - env_keys = {'TAP_ZENDESK_CHAT_ACCESS_TOKEN'} + """Verify that you have set the prerequisites to run the tap (creds, + etc.)""" + env_keys = {"TAP_ZENDESK_CHAT_ACCESS_TOKEN"} missing_envs = [x for x in env_keys if os.getenv(x) is None] if missing_envs: - raise Exception("Set environment variables: {}".format(missing_envs)) + raise Exception(f"Set environment variables: {missing_envs}") ######################### # Helper Methods # ######################### - def run_sync(self, conn_id): - """ - Run a sync job and make sure it exited properly. - Return a dictionary with keys of streams synced - and values of records synced for each stream + def run_sync(self, conn_id: int): + """Run a sync job and make sure it exited properly. + + Return a dictionary with keys of streams synced and values of + records synced for each stream """ # Run a sync job using orchestrator sync_job_name = runner.run_sync_mode(self, conn_id) @@ -158,52 +149,55 @@ def run_sync(self, conn_id): # Verify actual rows were synced sync_record_count = runner.examine_target_output_file( - self, conn_id, self.expected_streams(), self.expected_primary_keys()) + self, conn_id, self.expected_streams(), self.expected_primary_keys() + ) return sync_record_count @staticmethod def local_to_utc(date: dt): - """Convert a datetime with timezone information to utc""" - utc = dt(date.year, date.month, date.day, date.hour, date.minute, - date.second, date.microsecond, tz.utc) + """Convert a datetime with timezone information to utc.""" + utc = dt(date.year, date.month, date.day, date.hour, date.minute, date.second, date.microsecond, tz.utc) if date.tzinfo and hasattr(date.tzinfo, "_offset"): utc += date.tzinfo._offset return utc - def max_bookmarks_by_stream(self, sync_records): - """ - Return the maximum value for the replication key for the events stream - which is the bookmark expected value for updated records. + def max_bookmarks_by_stream(self, sync_records: Any): + """Return the maximum value for the replication key for the events + stream which is the bookmark expected value for updated records. - Comparisons are based on the class of the bookmark value. Dates will be - string compared which works for ISO date-time strings. + Comparisons are based on the class of the bookmark value. Dates + will be string compared which works for ISO date-time strings. 
""" max_bookmarks = {} chats_offline = [] chats = [] for stream, batch in sync_records.items(): - upsert_messages = [m for m in batch.get('messages') if m['action'] == 'upsert'] + upsert_messages = [m for m in batch.get("messages") if m["action"] == "upsert"] if stream == "chats": for msg in upsert_messages: - if msg['data']['type'] == 'chat': + if msg["data"]["type"] == "chat": chats.append(msg) - elif msg['data']['type'] == 'offline_msg': + elif msg["data"]["type"] == "offline_msg": chats_offline.append(msg) else: - raise RuntimeError("Got unexpected chat type: " + msg['data']['type']) + raise RuntimeError("Got unexpected chat type: " + msg["data"]["type"]) chats_bookmark_key = "end_timestamp" chats_offline_bookmark_key = "timestamp" bk_values_chats = [message["data"].get(chats_bookmark_key) for message in chats] bk_values_chats_offline = [message["data"].get(chats_offline_bookmark_key) for message in chats_offline] - max_bookmarks['chats.chat'] = {chats_bookmark_key : max(bk_values_chats, default=None)} - max_bookmarks['chats.offline_msg'] = {chats_offline_bookmark_key : max(bk_values_chats_offline, default=None)} + max_bookmarks["chats.chat"] = {chats_bookmark_key: max(bk_values_chats, default=None)} + max_bookmarks["chats.offline_msg"] = { + chats_offline_bookmark_key: max(bk_values_chats_offline, default=None) + } else: stream_bookmark_key = self.expected_replication_keys().get(stream) or set() with self.subTest(stream=stream): - assert not stream_bookmark_key or len(stream_bookmark_key) == 1 # There shouldn't be a compound replication key + assert ( + not stream_bookmark_key or len(stream_bookmark_key) == 1 + ) # There shouldn't be a compound replication key if not stream_bookmark_key: continue stream_bookmark_key = stream_bookmark_key.pop() @@ -221,34 +215,35 @@ def max_bookmarks_by_stream(self, sync_records): max_bookmarks[stream][stream_bookmark_key] = bk_value return max_bookmarks - - def min_bookmarks_by_stream(self, sync_records): - """ - Return the minimum value for the replication key for each stream - """ + def min_bookmarks_by_stream(self, sync_records: Any): + """Return the minimum value for the replication key for each stream.""" min_bookmarks = {} chats = [] chats_offline = [] for stream, batch in sync_records.items(): - upsert_messages = [m for m in batch.get('messages') if m['action'] == 'upsert'] + upsert_messages = [m for m in batch.get("messages") if m["action"] == "upsert"] if stream == "chats": for msg in upsert_messages: - if msg['data']['type'] == 'chat': + if msg["data"]["type"] == "chat": chats.append(msg) - elif msg['data']['type'] == 'offline_msg': + elif msg["data"]["type"] == "offline_msg": chats_offline.append(msg) else: - raise RuntimeError("Got unexpected chat type: " + msg['data']['type']) + raise RuntimeError("Got unexpected chat type: " + msg["data"]["type"]) chats_bookmark_key = "end_timestamp" chats_offline_bookmark_key = "timestamp" bk_values_chats = [message["data"].get(chats_bookmark_key) for message in chats] bk_values_chats_offline = [message["data"].get(chats_offline_bookmark_key) for message in chats_offline] - min_bookmarks['chats.chat'] = {chats_bookmark_key : min(bk_values_chats, default=None)} - min_bookmarks['chats.offline_msg'] = {chats_offline_bookmark_key : min(bk_values_chats_offline, default=None)} + min_bookmarks["chats.chat"] = {chats_bookmark_key: min(bk_values_chats, default=None)} + min_bookmarks["chats.offline_msg"] = { + chats_offline_bookmark_key: min(bk_values_chats_offline, default=None) + } else: stream_bookmark_key = 
self.expected_replication_keys().get(stream) or set() with self.subTest(stream=stream): - assert not stream_bookmark_key or len(stream_bookmark_key) == 1 # There shouldn't be a compound replication key + assert ( + not stream_bookmark_key or len(stream_bookmark_key) == 1 + ) # There shouldn't be a compound replication key if not stream_bookmark_key: continue stream_bookmark_key = stream_bookmark_key.pop() @@ -264,36 +259,34 @@ def min_bookmarks_by_stream(self, sync_records): if bk_value < min_bookmarks[stream][stream_bookmark_key]: min_bookmarks[stream][stream_bookmark_key] = bk_value - print(min_bookmarks) return min_bookmarks - - def select_all_streams_and_fields(self, conn_id, catalogs, select_all_fields: bool = True, exclude_streams=None): - """Select all streams and all fields within streams""" + def select_all_streams_and_fields( + self, conn_id: Any, catalogs: Any, select_all_fields: bool = True, exclude_streams=None + ): + """Select all streams and all fields within streams.""" for catalog in catalogs: - if exclude_streams and catalog.get('stream_name') in exclude_streams: + if exclude_streams and catalog.get("stream_name") in exclude_streams: continue - schema = menagerie.get_annotated_schema(conn_id, catalog['stream_id']) + schema = menagerie.get_annotated_schema(conn_id, catalog["stream_id"]) non_selected_properties = [] if not select_all_fields: # get a list of all properties so that none are selected - non_selected_properties = schema.get('annotated-schema', {}).get( - 'properties', {}) + non_selected_properties = schema.get("annotated-schema", {}).get("properties", {}) # remove properties that are automatic - for prop in self.expected_automatic_fields().get(catalog['stream_name'], []): + for prop in self.expected_automatic_fields().get(catalog["stream_name"], []): if prop in non_selected_properties: del non_selected_properties[prop] non_selected_properties = non_selected_properties.keys() additional_md = [] connections.select_catalog_and_fields_via_metadata( - conn_id, catalog, schema, additional_md=additional_md, - non_selected_fields=non_selected_properties + conn_id, catalog, schema, additional_md=additional_md, non_selected_fields=non_selected_properties ) def create_connection(self, original_properties: bool = True, original_credentials: bool = True): - """Create a new connection with the test name""" + """Create a new connection with the test name.""" # Create the connection conn_id = connections.ensure_connection(self, original_properties, original_credentials) @@ -309,18 +302,18 @@ def create_connection(self, original_properties: bool = True, original_credentia def get_selected_fields_from_metadata(metadata): selected_fields = set() for field in metadata: - is_field_metadata = len(field['breadcrumb']) > 1 - inclusion_automatic_or_selected = (field['metadata']['inclusion'] == 'automatic' - or field['metadata']['selected'] is True) - if is_field_metadata and inclusion_automatic_or_selected: - selected_fields.add(field['breadcrumb'][1]) + is_field_metadata = len(field["breadcrumb"]) > 1 + if is_field_metadata: + inclusion_automatic_or_selected = ( + field["metadata"]["inclusion"] == "automatic" or field["metadata"]["selected"] is True + ) + if inclusion_automatic_or_selected: + selected_fields.add(field["breadcrumb"][1]) return selected_fields - - def run_and_verify_check_mode(self, conn_id): - """ - Run the tap in check mode and verify it succeeds. - This should be ran prior to field selection and initial sync. 
+ def run_and_verify_check_mode(self, conn_id: Any): + """Run the tap in check mode and verify it succeeds. This should be ran + prior to field selection and initial sync. Return the connection id and found catalogs from menagerie. """ @@ -333,23 +326,20 @@ def run_and_verify_check_mode(self, conn_id): found_catalogs = menagerie.get_catalogs(conn_id) - self.assertGreater(len(found_catalogs), 0, msg="unable to locate schemas for connection {}".format(conn_id)) - found_catalog_names = set(map(lambda c: c['tap_stream_id'], found_catalogs)) + self.assertGreater(len(found_catalogs), 0, msg=f"unable to locate schemas for connection {conn_id}") + found_catalog_names = set(map(lambda c: c["tap_stream_id"], found_catalogs)) diff = self.expected_streams().symmetric_difference(found_catalog_names) - self.assertEqual(len(diff), 0, msg="discovered schemas do not match: {}".format(diff)) - print("discovered schemas are OK") - + self.assertEqual(len(diff), 0, msg=f"discovered schemas do not match: {diff}") return found_catalogs - def run_and_verify_sync(self, conn_id, clear_state=False): - """ - Clear the connections state in menagerie and Run a Sync. - Verify the exit code following the sync. + def run_and_verify_sync(self, conn_id, clear_state: bool = False): + """Clear the connections state in menagerie and Run a Sync. Verify the + exit code following the sync. Return the connection id and record count by stream """ if clear_state: - #clear state + # clear state menagerie.set_state(conn_id, {}) # run sync @@ -360,59 +350,64 @@ def run_and_verify_sync(self, conn_id, clear_state=False): menagerie.verify_sync_exit_status(self, exit_status, sync_job_name) # read target output - record_count_by_stream = runner.examine_target_output_file(self, conn_id, - self.expected_streams(), - self.expected_primary_keys()) + record_count_by_stream = runner.examine_target_output_file( + self, conn_id, self.expected_streams(), self.expected_primary_keys() + ) return record_count_by_stream - def perform_and_verify_table_and_field_selection(self, conn_id, found_catalogs, streams_to_select, select_all_fields=True): - """ - Perform table and field selection based off of the streams to select set and field selection parameters. - Verfify this results in the expected streams selected and all or no fields selected for those streams. + def perform_and_verify_table_and_field_selection( + self, conn_id: Any, found_catalogs: Any, streams_to_select: Any, select_all_fields: bool = True + ): + """Perform table and field selection based off of the streams to select + set and field selection parameters. + + Verify this results in the expected streams selected and all or + no fields selected for those streams. 
""" # Select all available fields or select no fields from all testable streams exclude_streams = self.expected_streams().difference(streams_to_select) self.select_all_streams_and_fields( - conn_id=conn_id, catalogs=found_catalogs, select_all_fields=select_all_fields, exclude_streams=exclude_streams + conn_id=conn_id, + catalogs=found_catalogs, + select_all_fields=select_all_fields, + exclude_streams=exclude_streams, ) catalogs = menagerie.get_catalogs(conn_id) # Ensure our selection worked for cat in catalogs: - catalog_entry = menagerie.get_annotated_schema(conn_id, cat['stream_id']) + catalog_entry = menagerie.get_annotated_schema(conn_id, cat["stream_id"]) # Verify all testable streams are selected - selected = catalog_entry.get('annotated-schema').get('selected') - print("Validating selection on {}: {}".format(cat['stream_name'], selected)) - if cat['stream_name'] not in streams_to_select: + selected = catalog_entry.get("annotated-schema").get("selected") + if cat["stream_name"] not in streams_to_select: self.assertFalse(selected, msg="Stream selected, but not testable.") - continue # Skip remaining assertions if we aren't selecting this stream + continue # Skip remaining assertions if we aren't selecting this stream self.assertTrue(selected, msg="Stream not selected.") if select_all_fields: # Verify all fields within each selected stream are selected - for field, field_props in catalog_entry.get('annotated-schema').get('properties').items(): - field_selected = field_props.get('selected') - print("\tValidating selection on {}.{}: {}".format(cat['stream_name'], field, field_selected)) + for field, field_props in catalog_entry.get("annotated-schema").get("properties").items(): + field_selected = field_props.get("selected") self.assertTrue(field_selected, msg="Field not selected.") else: # Verify only automatic fields are selected - expected_automatic_fields = self.expected_automatic_fields().get(cat['tap_stream_id']) - selected_fields = self.get_selected_fields_from_metadata(catalog_entry['metadata']) + expected_automatic_fields = self.expected_automatic_fields().get(cat["tap_stream_id"]) + selected_fields = self.get_selected_fields_from_metadata(catalog_entry["metadata"]) self.assertEqual(expected_automatic_fields, selected_fields) - def expected_schema_keys(self, stream): - props = self._load_schemas(stream).get(stream).get('properties') + def expected_schema_keys(self, stream: Any): + props = self._load_schemas(stream).get(stream).get("properties") if not props: - props = self._load_schemas(stream, shared=True).get(stream).get('properties') + props = self._load_schemas(stream, shared=True).get(stream).get("properties") assert props, "schema not configured proprerly" return props.keys() @staticmethod - def _get_abs_path(path): + def _get_abs_path(path: str): return os.path.join(os.path.dirname(os.path.realpath(__file__)), path) def _load_schemas(self, stream, shared: bool = False): @@ -420,14 +415,46 @@ def _load_schemas(self, stream, shared: bool = False): file_name = "shared/" + stream[:-1] + ".json" if shared else stream + ".json" path = self._get_abs_path("schemas") + "/" + file_name - final_path = path.replace('tests', self.tap_name().replace('-', '_')) + final_path = path.replace("tests", self.tap_name().replace("-", "_")) with open(final_path) as file: schemas[stream] = json.load(file) return schemas - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.start_date = self.get_properties().get('start_date') - self.maxDiff=None + def 
create_interrupt_sync_state(self, state: dict, interrupt_stream: str, pending_streams: list, sync_records: Any): + """Creates a state for simulating a interrupted sync and backdating + bookmarks for interrupted stream.""" + + interrupted_sync_states = copy.deepcopy(state) + bookmark_state = interrupted_sync_states["bookmarks"] + # Set the interrupt stream as currently syncing + interrupted_sync_states["currently_syncing"] = interrupt_stream + + # Remove bookmark for completed streams to set them as pending + # Setting value to start date wont be needed as all other streams are full_table + for stream in pending_streams: + bookmark_state.pop(stream, None) + + # update state for chats stream and set the bookmark to a date earlier + chats_bookmark = bookmark_state.get("chats", {}) + chats_bookmark.pop("offset", None) + chats_rec, offline_msgs_rec = [], [] + for record in sync_records.get("chats").get("messages"): + if record.get("action") == "upsert": + rec = record.get("data") + if rec["type"] == "offline_msg": + offline_msgs_rec.append(rec) + else: + chats_rec.append(rec) + + # set a deferred bookmark value for both the bookmarks of chat stream + chat_index = len(chats_rec) // 2 if len(chats_rec) > 1 else 0 + chats_bookmark["chat.end_timestamp"] = chats_rec[chat_index]["end_timestamp"] + + msg_index = len(offline_msgs_rec) // 2 if len(offline_msgs_rec) > 1 else 0 + chats_bookmark["offline_msg.timestamp"] = offline_msgs_rec[msg_index]["timestamp"] + + bookmark_state["chats"] = chats_bookmark + interrupted_sync_states["bookmarks"] = bookmark_state + return interrupted_sync_states diff --git a/tests/test_all_fields.py b/tests/test_all_fields.py new file mode 100644 index 0000000..080e778 --- /dev/null +++ b/tests/test_all_fields.py @@ -0,0 +1,95 @@ +"""Test that with no fields selected for a stream automatic fields are still +replicated.""" +from base import ZendeskChatBaseTest +from tap_tester import connections, menagerie, runner + + +class TestZendeskChatAllFields(ZendeskChatBaseTest): + """Test that all fields selected for a stream are replicated.""" + + @staticmethod + def name(): + return "tap_tester_zendesk_chat_all_fields" + + KNOWN_MISSING_FIELDS = { + "agents": { + "scope", + }, + "account": { + "billing", + }, + "shortcuts": { + "departments", + "agents", + }, + } + + def get_properties(self, original: bool = True): + """Configuration properties required for the tap.""" + + return_value = { + "start_date": "2017-01-15T00:00:00Z", + "chat_search_interval_days": 500, + } + + if original: + return return_value + + # Start Date test needs the new connections start date to be prior to the default + self.assertTrue(self.start_date < return_value["start_date"]) + + # Assign start date to be the default + return_value["start_date"] = self.start_date + return return_value + + def test_run(self): + """ + - Verify no unexpected streams were replicated + - Verify that more than just the automatic fields are replicated for each stream. 
+ - Verify all fields for each stream are replicated + """ + expected_streams = self.expected_streams() + conn_id = connections.ensure_connection(self) + found_catalogs = self.run_and_verify_check_mode(conn_id) + catalog_entries = [catalog for catalog in found_catalogs if catalog.get("stream_name") in expected_streams] + self.perform_and_verify_table_and_field_selection( + conn_id, catalog_entries, expected_streams, select_all_fields=True + ) + stream_all_fields = dict() + + for catalog in catalog_entries: + stream_id, stream_name = catalog["stream_id"], catalog["stream_name"] + catalog_entry = menagerie.get_annotated_schema(conn_id, stream_id) + fields_from_field_level_md = [ + md_entry["breadcrumb"][1] for md_entry in catalog_entry["metadata"] if md_entry["breadcrumb"] != [] + ] + stream_all_fields[stream_name] = set(fields_from_field_level_md) + + record_count_by_stream = self.run_and_verify_sync(conn_id) + synced_records = runner.get_records_from_target_output() + + for stream in expected_streams: + with self.subTest(stream=stream): + + expected_all_keys = stream_all_fields[stream] + expected_automatic_keys = self.expected_automatic_fields().get(stream) + data = synced_records.get(stream) + actual_all_keys = set() + + for message in data["messages"]: + if message["action"] == "upsert": + actual_all_keys.update(message["data"].keys()) + + self.assertTrue( + expected_automatic_keys.issubset(actual_all_keys), + msg=f'{expected_automatic_keys-actual_all_keys} is not in "expected_all_keys"', + ) + + self.assertGreater( + record_count_by_stream.get(stream, -1), + 0, + msg="The number of records is not over the stream max limit", + ) + expected_all_keys = expected_all_keys - self.KNOWN_MISSING_FIELDS.get(stream, set()) + self.assertGreaterEqual(len(expected_all_keys), len(actual_all_keys)) + self.assertSetEqual(expected_all_keys, actual_all_keys) diff --git a/tests/test_automatic_fields.py b/tests/test_automatic_fields.py new file mode 100644 index 0000000..841baf7 --- /dev/null +++ b/tests/test_automatic_fields.py @@ -0,0 +1,104 @@ +"""Test that with no fields selected for a stream automatic fields are still +replicated.""" +from typing import Dict + +from base import ZendeskChatBaseTest +from tap_tester import connections, menagerie, runner + + +class TestZendeskChatAutomaticFields(ZendeskChatBaseTest): + """Test that with no fields selected for a stream automatic fields are + still replicated.""" + + @staticmethod + def name(): + return "tap_tester_zendesk_chat_automatic_fields" + + def get_properties(self, original: bool = True): + """Configuration properties required for the tap.""" + + return_value = { + "start_date": "2017-01-15T00:00:00Z", + "chat_search_interval_days": 500, + } + + if original: + return return_value + + # Start Date test needs the new connections start date to be prior to the default + self.assertTrue(self.start_date < return_value["start_date"]) + + # Assign start date to be the default + return_value["start_date"] = self.start_date + return return_value + + def get_chat_type_mapping(self, conn_id: str) -> Dict: + """performs a sync with all fields to get data on chat type mapping to + make correct assertions based on chat type. 
+ + returns {"chat_id":"type"} + """ + + expected_streams = self.expected_streams() + menagerie.set_state(conn_id, {}) + found_catalogs = self.run_and_verify_check_mode(conn_id) + catalog_entries = [catalog for catalog in found_catalogs if catalog.get("stream_name") in expected_streams] + self.perform_and_verify_table_and_field_selection( + conn_id, catalog_entries, expected_streams, select_all_fields=True + ) + self.run_and_verify_sync(conn_id) + synced_records = runner.get_records_from_target_output() + data = synced_records.get("chats", {})["messages"] + chat_type_mapping = {row["data"]["id"]: row["data"]["type"] for row in data if row["action"] == "upsert"} + return chat_type_mapping + + def test_run(self): + """ + - Verify we can deselect all fields except when inclusion=automatic, which is handled by base.py methods + - Verify that only the automatic fields are sent to the target. + - Verify that all replicated records have unique primary key values. + """ + + expected_streams = self.expected_streams() + + conn_id = connections.ensure_connection(self) + found_catalogs = self.run_and_verify_check_mode(conn_id) + catalog_entries = [catalog for catalog in found_catalogs if catalog.get("stream_name") in expected_streams] + self.perform_and_verify_table_and_field_selection( + conn_id, catalog_entries, expected_streams, select_all_fields=False + ) + + # run initial sync + record_count_by_stream = self.run_and_verify_sync(conn_id) + synced_records = runner.get_records_from_target_output() + + chat_mapping = self.get_chat_type_mapping(conn_id) + + for stream in expected_streams: + with self.subTest(stream=stream): + + expected_keys = self.expected_automatic_fields().get(stream) + + data = synced_records.get(stream, {}) + record_messages_keys = [set(row["data"].keys()) for row in data["messages"]] + + self.assertGreater( + record_count_by_stream.get(stream, -1), + 0, + msg="The number of records is not over the stream max limit", + ) + if stream == "chats": + # chats stream has two types of records "offline_msgs" and "chat" both of them have different replication keys + # the key "end_timestamp" is not available for "offline_msgs" + # hence we need to verify the record has both or atleaset one key + expected_keys_offline_msg = self.expected_automatic_fields().get(stream) - {"end_timestamp"} + for row in data["messages"]: + rec = row["data"] + actual_keys = set(rec.keys()) + if chat_mapping[rec["id"]] == "offline_msg": + self.assertSetEqual(actual_keys, expected_keys_offline_msg) + elif chat_mapping[rec["id"]] == "chat": + self.assertSetEqual(actual_keys, expected_keys) + else: + for actual_keys in record_messages_keys: + self.assertSetEqual(expected_keys, actual_keys) diff --git a/tests/test_bookmarks.py b/tests/test_bookmarks.py index 6a31258..f8eb166 100644 --- a/tests/test_bookmarks.py +++ b/tests/test_bookmarks.py @@ -1,27 +1,12 @@ -import os -import datetime -import dateutil.parser -import pytz +from base import ZendeskChatBaseTest +from tap_tester import connections, menagerie, runner -from tap_tester import runner, menagerie, connections +STREAMS_WITH_BOOKMARKS = ["agents", "chats"] -from base import BaseTapTest - -STREAMS_WITH_BOOKMARKS = ['agents', 'chats'] - -class BookmarksTest(BaseTapTest): - - expected_record_count = { - 'agents': 3, - 'chats': 223, - 'bans': 22, - 'account': 1, - 'shortcuts': 4, - 'triggers': 12, - 'departments': 1, - 'goals': 2, - } +class TestZendeskChatBookmarks(ZendeskChatBaseTest): + """Test tap sets a bookmark and respects it for the next sync of a + 
stream.""" @staticmethod def name(): @@ -29,10 +14,7 @@ def name(): def get_properties(self, original: bool = True): """Configuration properties required for the tap.""" - return_value = { - 'start_date': '2017-08-15T00:00:00Z', - 'agents_page_limit': 1, - } + return_value = {"start_date": "2017-01-15T00:00:00Z", "agents_page_limit": 1, "chat_search_interval_days": 2} if original: return return_value @@ -40,14 +22,24 @@ def get_properties(self, original: bool = True): return return_value - def test_run(self): - expected_streams = self.expected_streams() + """ + - Verify that for each stream you can do a sync which records bookmarks. + - Verify that the bookmark is the maximum value sent to the target for the replication key. + - Verify that a second sync respects the bookmark + All data of the second sync is >= the bookmark from the first sync + The number of records in the 2nd sync is less then the first + - Verify that for full table stream, all data replicated in sync 1 is replicated again in sync 2. + + PREREQUISITE + For EACH stream that is incrementally replicated there are multiple rows of data with + different values for the replication key + """ + expected_streams = self.expected_streams() # Testing against ads insights objects - self.start_date = self.get_properties()['start_date'] + self.start_date = self.get_properties()["start_date"] - """A Parametrized Bookmarks Test""" expected_replication_keys = self.expected_replication_keys() expected_replication_methods = self.expected_replication_method() @@ -61,15 +53,16 @@ def test_run(self): found_catalogs = self.run_and_verify_check_mode(conn_id) # Select only the expected streams tables - catalog_entries = [ce for ce in found_catalogs if ce['tap_stream_id'] in expected_streams] - self.perform_and_verify_table_and_field_selection(conn_id, catalog_entries, expected_streams, select_all_fields=True) + catalog_entries = [ce for ce in found_catalogs if ce["tap_stream_id"] in expected_streams] + self.perform_and_verify_table_and_field_selection( + conn_id, catalog_entries, expected_streams, select_all_fields=True + ) # Run a sync job using orchestrator first_sync_record_count = self.run_and_verify_sync(conn_id) first_sync_records = runner.get_records_from_target_output() first_sync_bookmarks = menagerie.get_state(conn_id) - ########################################################################## ### Second Sync ########################################################################## @@ -91,96 +84,98 @@ def test_run(self): # collect information for assertions from syncs 1 & 2 base on expected values first_sync_count = first_sync_record_count.get(stream, 0) second_sync_count = second_sync_record_count.get(stream, 0) - first_sync_messages = [record.get('data') for record in - first_sync_records.get(stream).get('messages') - if record.get('action') == 'upsert'] - second_sync_messages = [record.get('data') for record in - second_sync_records.get(stream).get('messages') - if record.get('action') == 'upsert'] - first_bookmark_key_value = first_sync_bookmarks.get('bookmarks', {}).get(stream) - second_bookmark_key_value = second_sync_bookmarks.get('bookmarks', {}).get(stream) - - # Assert we synced the expected number of records. 
Ensures pagination happens - self.assertEqual(first_sync_count, self.expected_record_count[stream]) - - - if expected_replication_method == self.INCREMENTAL: # chats is the only incremental stream - - # collect information specific to incremental streams from syncs 1 & 2 - replication_key = next(iter(expected_replication_keys[stream])) + first_sync_messages = [ + record.get("data") + for record in first_sync_records.get(stream).get("messages") + if record.get("action") == "upsert" + ] + second_sync_messages = [ + record.get("data") + for record in second_sync_records.get(stream).get("messages") + if record.get("action") == "upsert" + ] + first_bookmark_key_value = first_sync_bookmarks.get("bookmarks", {}).get(stream) + second_bookmark_key_value = second_sync_bookmarks.get("bookmarks", {}).get(stream) + + if expected_replication_method == self.INCREMENTAL: # chats is the only incremental stream # Verify the first sync sets a bookmark of the expected form self.assertIsNotNone(first_bookmark_key_value) - self.assertIsNotNone(first_bookmark_key_value.get('chat.end_timestamp')) - self.assertIsNotNone(first_bookmark_key_value.get('offline_msg.timestamp')) + self.assertIsNotNone(first_bookmark_key_value.get("chat.end_timestamp")) + self.assertIsNotNone(first_bookmark_key_value.get("offline_msg.timestamp")) # Verify the second sync sets a bookmark of the expected form self.assertIsNotNone(second_bookmark_key_value) - self.assertIsNotNone(second_bookmark_key_value.get('chat.end_timestamp')) - self.assertIsNotNone(second_bookmark_key_value.get('offline_msg.timestamp')) + self.assertIsNotNone(second_bookmark_key_value.get("chat.end_timestamp")) + self.assertIsNotNone(second_bookmark_key_value.get("offline_msg.timestamp")) # Verify the second sync bookmark is Equal to the first sync bookmark - self.assertEqual(second_bookmark_key_value, first_bookmark_key_value) # assumes no changes to data during test + self.assertEqual( + second_bookmark_key_value, first_bookmark_key_value + ) # assumes no changes to data during test for record in second_sync_messages: - if record.get('type') == 'chat': + if record.get("type") == "chat": # Verify the second sync records respect the previous (simulated) bookmark value - replication_key_value = record.get('end_timestamp') + replication_key_value = record.get("end_timestamp") # Verify the second sync bookmark value is the max replication key value for a given stream self.assertLessEqual( replication_key_value, - second_bookmark_key_value.get('chat.end_timestamp'), - msg="Second sync bookmark was set incorrectly, a record with a greater replication-key value was synced.") + second_bookmark_key_value.get("chat.end_timestamp"), + msg="Second sync bookmark was set incorrectly, a record with a greater replication-key value was synced.", + ) - elif record.get('type') == 'offline_msg': + elif record.get("type") == "offline_msg": # Verify the second sync records respect the previous (simulated) bookmark value - replication_key_value = record.get('timestamp') + replication_key_value = record.get("timestamp") # Verify the second sync bookmark value is the max replication key value for a given stream self.assertLessEqual( replication_key_value, - second_bookmark_key_value.get('offline_msg.timestamp'), - msg="Second sync bookmark was set incorrectly, a record with a greater replication-key value was synced.") + second_bookmark_key_value.get("offline_msg.timestamp"), + msg="Second sync bookmark was set incorrectly, a record with a greater replication-key value was synced.", + ) 
else: - assert(False) + assert False for record in first_sync_messages: - if record.get('type') == 'chat': + if record.get("type") == "chat": # Verify the first sync records respect the previous (simulated) bookmark value - replication_key_value = record.get('end_timestamp') + replication_key_value = record.get("end_timestamp") # Verify the second sync bookmark value is the max replication key value for a given stream self.assertLessEqual( replication_key_value, - first_bookmark_key_value.get('chat.end_timestamp'), - msg="First sync bookmark was set incorrectly, a record with a greater replication-key value was synced.") + first_bookmark_key_value.get("chat.end_timestamp"), + msg="First sync bookmark was set incorrectly, a record with a greater replication-key value was synced.", + ) - elif record.get('type') == 'offline_msg': + elif record.get("type") == "offline_msg": # Verify the first sync records respect the previous (simulated) bookmark value - replication_key_value = record.get('timestamp') + replication_key_value = record.get("timestamp") # Verify the first sync bookmark value is the max replication key value for a given stream self.assertLessEqual( replication_key_value, - first_bookmark_key_value.get('offline_msg.timestamp'), - msg="First sync bookmark was set incorrectly, a record with a greater replication-key value was synced.") + first_bookmark_key_value.get("offline_msg.timestamp"), + msg="First sync bookmark was set incorrectly, a record with a greater replication-key value was synced.", + ) else: - assert(False) + assert False # Verify the number of records in the 2nd sync is less then the first self.assertLess(second_sync_count, first_sync_count) - elif expected_replication_method == self.FULL: # Verify the number of records in the second sync is the same as the first self.assertEqual(second_sync_count, first_sync_count) - if stream == 'agents': - self.assertEqual(first_bookmark_key_value, second_bookmark_key_value, {'offset': {'id': None}}) + if stream in ("agents", "bans"): + self.assertEqual(first_bookmark_key_value, second_bookmark_key_value, {"offset": {"id": None}}) else: # Verify the syncs do not set a bookmark for full table streams self.assertIsNone(first_bookmark_key_value) @@ -188,7 +183,10 @@ def test_run(self): else: raise NotImplementedError( - "INVALID EXPECTATIONS\t\tSTREAM: {} REPLICATION_METHOD: {}".format(stream, expected_replication_method)) + "INVALID EXPECTATIONS\t\tSTREAM: {} REPLICATION_METHOD: {}".format( + stream, expected_replication_method + ) + ) # Verify at least 1 record was replicated in the second sync - self.assertGreater(second_sync_count, 0, msg="We are not fully testing bookmarking for {}".format(stream)) + self.assertGreater(second_sync_count, 0, msg=f"We are not fully testing bookmarking for {stream}") diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 208fd0b..9a499c3 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -1,121 +1,111 @@ -""" -Test tap discovery -""" +"""Test tap discovery.""" import re -import unittest -from tap_tester import menagerie, connections -from base import BaseTapTest +from base import ZendeskChatBaseTest +from tap_tester import connections, menagerie -class DiscoveryTest(BaseTapTest): - """ Test the tap discovery """ - +class TestZendeskChatDiscovery(ZendeskChatBaseTest): @staticmethod def name(): - return "tap_tester_tap_zendesk_chat_discovery_test" + return "tap_tester_tap_zendesk_chat_discovery" def test_run(self): - """ - Verify that discover creates the appropriate 
catalog, schema, metadata, etc. - - • Verify number of actual streams discovered match expected - • Verify the stream names discovered were what we expect - • Verify stream names follow naming convention - streams should only have lowercase alphas and underscores - • verify there is only 1 top level breadcrumb - • verify replication key(s) - • verify primary key(s) - • verify the actual replication matches our expected replication method - • verify that primary, replication and foreign keys - are given the inclusion of automatic (metadata and annotated schema). - • verify that all other fields have inclusion of available (metadata and schema) + """Testing that discovery creates the appropriate catalog with valid + metadata. + + - Verify number of actual streams discovered match expected + - Verify the stream names discovered were what we expect + - Verify stream names follow naming convention (streams should only have lowercase alphas and underscores_ + - verify there is only 1 top level breadcrumb + - verify replication key(s) + - verify primary key(s) + - verify the actual replication matches our expected replication method + - verify that primary, replication and foreign keys are given the inclusion of automatic (metadata and annotated schema). + - verify that all other fields have inclusion of available (metadata and schema) """ conn_id = connections.ensure_connection(self) # Verify number of actual streams discovered match expected found_catalogs = self.run_and_verify_check_mode(conn_id) - self.assertGreater(len(found_catalogs), 0, - msg="unable to locate schemas for connection {}".format(conn_id)) - self.assertEqual(len(found_catalogs), - len(self.expected_streams()), - msg="Expected {} streams, actual was {} for connection {}," - " actual {}".format( - len(self.expected_streams()), - len(found_catalogs), - found_catalogs, - conn_id)) + self.assertGreater(len(found_catalogs), 0, msg=f"unable to locate schemas for connection {conn_id}") + self.assertEqual( + len(found_catalogs), + len(self.expected_streams()), + msg="Expected {} streams, actual was {} for connection {}," + " actual {}".format(len(self.expected_streams()), len(found_catalogs), found_catalogs, conn_id), + ) # Verify the stream names discovered were what we expect - found_catalog_names = {c['tap_stream_id'] for c in found_catalogs} - self.assertEqual(set(self.expected_streams()), - set(found_catalog_names), - msg="Expected streams don't match actual streams") + found_catalog_names = {c["tap_stream_id"] for c in found_catalogs} + self.assertEqual( + set(self.expected_streams()), set(found_catalog_names), msg="Expected streams don't match actual streams" + ) # Verify stream names follow naming convention # streams should only have lowercase alphas and underscores - self.assertTrue(all([re.fullmatch(r"[a-z_]+", name) for name in found_catalog_names]), - msg="One or more streams don't follow standard naming") + self.assertTrue( + all([re.fullmatch(r"[a-z_]+", name) for name in found_catalog_names]), + msg="One or more streams don't follow standard naming", + ) for stream in self.expected_streams(): with self.subTest(stream=stream): - catalog = next(iter([catalog for catalog in found_catalogs - if catalog["stream_name"] == stream])) + catalog = next(iter([catalog for catalog in found_catalogs if catalog["stream_name"] == stream])) assert catalog # based on previous tests this should always be found - schema_and_metadata = menagerie.get_annotated_schema(conn_id, catalog['stream_id']) + schema_and_metadata = 
menagerie.get_annotated_schema(conn_id, catalog["stream_id"]) metadata = schema_and_metadata["metadata"] schema = schema_and_metadata["annotated-schema"] # verify the stream level properties are as expected # verify there is only 1 top level breadcrumb stream_properties = [item for item in metadata if item.get("breadcrumb") == []] - self.assertTrue(len(stream_properties) == 1, - msg="There is more than one top level breadcrumb") + self.assertTrue(len(stream_properties) == 1, msg="There is more than one top level breadcrumb") # verify replication key(s) - actual = set(stream_properties[0].get("metadata", {self.REPLICATION_KEYS: []}).get(self.REPLICATION_KEYS) or []) + actual = set( + stream_properties[0].get("metadata", {self.REPLICATION_KEYS: []}).get(self.REPLICATION_KEYS) or [] + ) expected = self.expected_replication_keys()[stream] or set() - self.assertEqual( - actual, - expected, - msg="expected replication key {} but actual is {}".format( - expected, actual)) + self.assertEqual(actual, expected, msg=f"expected replication key {expected} but actual is {actual}") # verify primary key(s) self.assertEqual( - set(stream_properties[0].get( - "metadata", {self.PRIMARY_KEYS: []}).get(self.PRIMARY_KEYS, [])), + set(stream_properties[0].get("metadata", {self.PRIMARY_KEYS: []}).get(self.PRIMARY_KEYS, [])), self.expected_primary_keys()[stream], msg="expected primary key {} but actual is {}".format( self.expected_primary_keys()[stream], - set(stream_properties[0].get( - "metadata", {self.PRIMARY_KEYS: None}).get(self.PRIMARY_KEYS, [])))) - + set(stream_properties[0].get("metadata", {self.PRIMARY_KEYS: None}).get(self.PRIMARY_KEYS, [])), + ), + ) expected_automatic_fields = self.expected_automatic_fields()[stream] or set() # verify that primary and replication keys # are given the inclusion of automatic in metadata. 
- actual_automatic_fields = {mdata['breadcrumb'][-1] - for mdata in metadata - if mdata['breadcrumb'] and mdata['metadata']['inclusion'] == 'automatic'} + actual_automatic_fields = { + mdata["breadcrumb"][-1] + for mdata in metadata + if mdata["breadcrumb"] and mdata["metadata"]["inclusion"] == "automatic" + } - actual_available_fields = {mdata['breadcrumb'][-1] - for mdata in metadata - if mdata['breadcrumb'] and mdata['metadata']['inclusion'] == 'available'} + actual_available_fields = { + mdata["breadcrumb"][-1] + for mdata in metadata + if mdata["breadcrumb"] and mdata["metadata"]["inclusion"] == "available" + } - self.assertEqual(expected_automatic_fields, - actual_automatic_fields, - msg="expected {} automatic fields but got {}".format( - expected_automatic_fields, - actual_automatic_fields)) + self.assertEqual( + expected_automatic_fields, + actual_automatic_fields, + msg="expected {} automatic fields but got {}".format( + expected_automatic_fields, actual_automatic_fields + ), + ) # verify that all other fields have inclusion of available # This assumes there are no unsupported fields for SaaS sources - self.assertSetEqual( - actual_available_fields, - set(schema['properties']) - actual_automatic_fields - ) + self.assertSetEqual(actual_available_fields, set(schema["properties"]) - actual_automatic_fields) diff --git a/tests/test_interupted_sync.py b/tests/test_interupted_sync.py new file mode 100644 index 0000000..45b0dcd --- /dev/null +++ b/tests/test_interupted_sync.py @@ -0,0 +1,152 @@ +"""Test that with no fields selected for a stream automatic fields are still +replicated.""" +from base import ZendeskChatBaseTest +from singer.utils import strptime_to_utc +from tap_tester import connections, menagerie, runner + + +class TestZendeskChatDiscoveryInteruptibleSync(ZendeskChatBaseTest): + """Test tap's ability to recover from an interrupted sync.""" + + @staticmethod + def name(): + return "tap_tester_zendesk_chat_interrupted_sync" + + def get_properties(self, original: bool = True): + """Configuration properties required for the tap.""" + return_value = {"start_date": "2017-01-10T00:00:00Z", "chat_search_interval_days": 300} + if original: + return return_value + + return_value["start_date"] = self.start_date + + return return_value + + def test_run(self): + """Testing that if a sync job is interrupted and state is saved with + `currently_syncing`(stream) the next sync job kicks off and the tap + picks back up on that `currently_syncing` stream. + + - Verify behavior is consistent when an added stream is selected between initial and resuming sync + - Verify only records with replication-key values greater than or equal to the stream level bookmark are + replicated on the resuming sync for the interrupted stream. + - Verify the yet-to-be-synced streams are replicated following the interrupted stream in the resuming sync. 
+        """
+
+        expected_streams = self.expected_streams()
+        expected_replication_methods = self.expected_replication_method()
+
+        # instantiate connection
+        conn_id = connections.ensure_connection(self)
+
+        # Run check mode
+        found_catalogs = self.run_and_verify_check_mode(conn_id)
+
+        # Table and field selection
+        catalog_entries = [item for item in found_catalogs if item.get("stream_name") in expected_streams]
+
+        self.perform_and_verify_table_and_field_selection(conn_id, catalog_entries, expected_streams)
+
+        # Run a first sync job using orchestrator
+        first_sync_record_count = self.run_and_verify_sync(conn_id)
+        first_sync_records = runner.get_records_from_target_output()
+
+        first_sync_bookmarks = menagerie.get_state(conn_id)
+
+        completed_streams = {"account", "agents", "bans"}
+        interrupt_stream = "chats"
+        pending_streams = {"departments", "goals", "shortcuts", "triggers"}
+
+        interrupted_sync_states = self.create_interrupt_sync_state(
+            first_sync_bookmarks, interrupt_stream, pending_streams, first_sync_records
+        )
+        bookmark_state = interrupted_sync_states["bookmarks"]
+        menagerie.set_state(conn_id, interrupted_sync_states)
+
+        second_sync_record_count = self.run_and_verify_sync(conn_id)
+        second_sync_records = runner.get_records_from_target_output()
+
+        for stream in expected_streams:
+            with self.subTest(stream=stream):
+                expected_replication_method = expected_replication_methods[stream]
+                first_sync_count = first_sync_record_count.get(stream, 0)
+                second_sync_count = second_sync_record_count.get(stream, 0)
+
+                # Gather results
+                full_records = [message["data"] for message in first_sync_records[stream]["messages"]]
+                interrupted_records = [message["data"] for message in second_sync_records[stream]["messages"]]
+
+                if expected_replication_method == self.INCREMENTAL:
+
+                    if stream in completed_streams:
+                        # Verify at least 1 record was replicated in the second sync
+                        self.assertGreaterEqual(
+                            second_sync_count,
+                            1,
+                            msg=f"Incorrect bookmarking for {stream}, at least one record should be replicated",
+                        )
+                    elif stream == interrupted_sync_states.get("currently_syncing", None):
+                        # For the interrupted stream, the second sync count should be <= the first sync count
+                        self.assertLessEqual(
+                            second_sync_count,
+                            first_sync_count,
+                            msg=f"For interrupted stream - {stream}, the second sync record count should be less than or equal to the first sync",
+                        )
+
+                        # Verify the interrupted sync replicates the expected record set
+                        # All interrupted recs are in full recs
+                        for record in interrupted_records:
+                            self.assertIn(
+                                record,
+                                full_records,
+                                msg="incremental table record in interrupted sync not found in full sync",
+                            )
+
+                        # Verify the resuming sync only replicates records with replication key values greater than or
+                        # equal to the interrupted state for streams that were replicated during the interrupted sync.
+                        if stream == "chats":
+
+                            interrupted_bmk_chat_msg = strptime_to_utc(bookmark_state["chats"]["offline_msg.timestamp"])
+                            interrupted_bmk_chat = strptime_to_utc(bookmark_state["chats"]["chat.end_timestamp"])
+
+                            for record in interrupted_records:
+                                if record["type"] == "offline_msg":
+                                    rec_time = strptime_to_utc(record.get("timestamp"))
+                                    self.assertGreaterEqual(rec_time, interrupted_bmk_chat_msg)
+                                else:
+                                    rec_time = strptime_to_utc(record.get("end_timestamp"))
+                                    self.assertGreaterEqual(rec_time, interrupted_bmk_chat)
+
+                            # Record count for the interrupted stream in the resuming sync should match expectations
+                            full_records_after_interrupted_bookmark = 0
+
+                            for record in full_records:
+                                if record["type"] == "offline_msg":
+                                    rec_time = strptime_to_utc(record.get("timestamp"))
+                                    if rec_time >= interrupted_bmk_chat_msg:
+                                        full_records_after_interrupted_bookmark += 1
+                                else:
+                                    rec_time = strptime_to_utc(record.get("end_timestamp"))
+                                    if rec_time >= interrupted_bmk_chat:
+                                        full_records_after_interrupted_bookmark += 1
+
+                            self.assertEqual(
+                                full_records_after_interrupted_bookmark,
+                                len(interrupted_records),
+                                msg=f"Expected {full_records_after_interrupted_bookmark} records in the resuming sync",
+                            )
+
+                elif stream in pending_streams:
+                    # Second sync record count should be greater than or equal to the first sync
+                    self.assertGreaterEqual(
+                        second_sync_count,
+                        first_sync_count,
+                        msg=f"For pending sync streams - {stream}, second sync record count should be greater than or equal to first sync",
+                    )
+
+                elif expected_replication_method == self.FULL:
+                    self.assertEqual(second_sync_count, first_sync_count)
+                else:
+                    raise NotImplementedError(
+                        f"INVALID EXPECTATIONS: STREAM: {stream} REPLICATION_METHOD: {expected_replication_method}"
+                    )
diff --git a/tests/test_pagination.py b/tests/test_pagination.py
new file mode 100644
index 0000000..775ac70
--- /dev/null
+++ b/tests/test_pagination.py
@@ -0,0 +1,90 @@
+"""Test that the tap can replicate multiple pages of data for the streams that
+support pagination."""
+from math import ceil
+
+from base import ZendeskChatBaseTest
+from tap_tester import connections, runner
+from tap_tester.logger import LOGGER
+
+
+class TestZendeskChatPagination(ZendeskChatBaseTest):
+    @staticmethod
+    def name():
+        return "tap_tester_zendesk_chat_pagination"
+
+    AGENTS_PAGE_SIZE = 1
+    BANS_PAGE_SIZE = 100
+
+    def get_properties(self, original: bool = True):
+        """Configuration properties required for the tap."""
+        return_value = {
+            "start_date": "2021-10-10T00:00:00Z",
+            "agents_page_limit": self.AGENTS_PAGE_SIZE,
+        }
+        if original:
+            return return_value
+
+        return_value["start_date"] = self.start_date
+
+        return return_value
+
+    def test_run(self):
+        """
+        - Verify that for each stream you can get multiple pages of data.
+
+        This requires we ensure more than 1 page of data exists at all times for any given stream.
+        - Verify by primary keys that the data replicated matches the data we expect.
+ """ + + page_size = int(self.get_properties().get("agents_page_limit", 10)) + # only "bans" and "agents" stream support pagination + expected_streams = {"bans", "agents"} + + # instantiate connection + conn_id = connections.ensure_connection(self) + + # run check mode + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # table and field selection + catalog_entries = [item for item in found_catalogs if item.get("stream_name") in expected_streams] + + self.perform_and_verify_table_and_field_selection(conn_id, catalog_entries, expected_streams) + + # Run a first sync job using orchestrator + synced_records = runner.get_records_from_target_output() + + for stream in expected_streams: + with self.subTest(stream=stream): + page_size = self.BANS_PAGE_SIZE if stream == "bans" else self.AGENTS_PAGE_SIZE + # expected values + expected_primary_keys = self.expected_primary_keys() + primary_keys_list = [ + tuple(message.get("data").get(expected_pk) for expected_pk in expected_primary_keys[stream]) + for message in synced_records.get(stream).get("messages") + if message.get("action") == "upsert" + ] + rec_count = len(primary_keys_list) + + # verify records are more than page size so multiple page is working + self.assertGreater(rec_count, page_size, msg="The number of records is not over the stream max limit") + + # Chunk the replicated records (just primary keys) into expected pages + pages = [] + page_count = ceil(rec_count / page_size) + for page_index in range(page_count): + page_start = page_index * page_size + page_end = (page_index + 1) * page_size + pages.append(set(primary_keys_list[page_start:page_end])) + + LOGGER.info("items: %s page_count %s", rec_count, page_count) + + # Verify by primary keys that data is unique for each page + for current_index, current_page in enumerate(pages): + with self.subTest(current_page_primary_keys=current_page): + for other_index, other_page in enumerate(pages): + if current_index == other_index: + continue # don't compare the page to itself + self.assertTrue( + current_page.isdisjoint(other_page), msg=f"other_page_primary_keys={other_page}" + ) diff --git a/tests/test_start_date.py b/tests/test_start_date.py index 2f511dd..6be726f 100644 --- a/tests/test_start_date.py +++ b/tests/test_start_date.py @@ -1,61 +1,51 @@ -""" -Test that the start_date configuration is respected -""" +"""Test that the start_date configuration is respected.""" from functools import reduce -import os - +from base import ZendeskChatBaseTest from dateutil.parser import parse +from tap_tester import menagerie, runner +from tap_tester.logger import LOGGER -from tap_tester import menagerie, runner, connections - -from base import BaseTapTest +class StartDateTest(ZendeskChatBaseTest): + """Test that the start_date configuration is respected. 
-class StartDateTest(BaseTapTest): - """ - Test that the start_date configuration is respected - - • verify that a sync with a later start date has at least one record synced - and less records than the 1st sync with a previous start date - • verify that each stream has less records than the earlier start date sync - • verify all data from later start data has bookmark values >= start_date - • verify that the minimum bookmark sent to the target for the later start_date sync - is greater than or equal to the start date + - verify that a sync with a later start date has at least one record + synced and less records than the 1st sync with a previous start date + - verify that each stream has less records than the earlier + start date sync + - verify all data from later start data has bookmark values >= start_date + - verify that the minimum bookmark sent to the target for the later + start_date sync is >= start date """ + @staticmethod + def name(): + return "tap_tester_zendesk_chat_start_date_test" + def get_properties(self, original: bool = True): return_value = { - 'start_date': '2021-04-01T00:00:00Z', + "start_date": "2021-04-01T00:00:00Z", } if original: return return_value - return_value["start_date"] = '2021-05-06T00:00:00Z' + return_value["start_date"] = "2021-05-06T00:00:00Z" return return_value - @staticmethod - def get_credentials(original_credentials: bool = True): - return { - 'access_token': os.getenv('TAP_ZENDESK_CHAT_ACCESS_TOKEN') - } - - @staticmethod - def name(): - return "tap_tester_zendesk_chat_start_date_test" - def test_run(self): - """Test we get a lot of data back based on the start date configured in base""" + """Test we get a lot of data back based on the start date configured in + base.""" conn_id = self.create_connection() found_catalogs = menagerie.get_catalogs(conn_id) - incremental_streams = {key for key, value in self.expected_replication_method().items() - if value == self.INCREMENTAL} + incremental_streams = { + key for key, value in self.expected_replication_method().items() if value == self.INCREMENTAL + } - our_catalogs = [catalog for catalog in found_catalogs if - catalog.get('tap_stream_id') in incremental_streams] + our_catalogs = [catalog for catalog in found_catalogs if catalog.get("tap_stream_id") in incremental_streams] # Select all streams and all fields within streams self.select_all_streams_and_fields(conn_id, our_catalogs, select_all_fields=True) @@ -86,8 +76,7 @@ def test_run(self): # Select all streams and all fields within streams found_catalogs = menagerie.get_catalogs(conn_id) - our_catalogs = [catalog for catalog in found_catalogs if - catalog.get('tap_stream_id') in incremental_streams] + our_catalogs = [catalog for catalog in found_catalogs if catalog.get("tap_stream_id") in incremental_streams] self.select_all_streams_and_fields(conn_id, our_catalogs, select_all_fields=True) # Run a sync job using orchestrator @@ -107,7 +96,8 @@ def test_run(self): self.assertGreaterEqual( first_sync_record_count.get(stream, 0), second_sync_record_count.get(stream, 0), - msg="second had more records, start_date usage not verified") + msg="second had more records, start_date usage not verified", + ) # verify all data from 2nd sync >= start_date target_mark = second_min_bookmarks.get(stream, {"mark": None}) @@ -121,9 +111,7 @@ def test_run(self): # verify that the minimum bookmark sent to the target for the second sync # is greater than or equal to the start date - self.assertGreaterEqual(target_value, - self.local_to_utc(parse(self.start_date))) + 
self.assertGreaterEqual(target_value, self.local_to_utc(parse(self.start_date))) except (OverflowError, ValueError, TypeError): - print("bookmarks cannot be converted to dates, " - "can't test start_date for {}".format(stream)) + LOGGER.info("bookmarks cannot be converted to dates, " "can't test start_date for %s", stream) diff --git a/tests/unittests/test_auth_discovery.py b/tests/unittests/test_auth_discovery.py index 2ca1c10..52c173e 100644 --- a/tests/unittests/test_auth_discovery.py +++ b/tests/unittests/test_auth_discovery.py @@ -1,7 +1,9 @@ -from requests.exceptions import HTTPError -import tap_zendesk_chat import unittest from unittest import mock + +from requests.exceptions import HTTPError + +import tap_zendesk_chat from tap_zendesk_chat.http import Client @@ -33,13 +35,10 @@ def mock_200_account_endpoint_exception(*args, **kwargs): return MockResponse({}, 200) -class TestBasicAuthInDiscoverMode(unittest.TestCase): - +class TestDiscoverMode(unittest.TestCase): def test_basic_auth_no_access_401(self): - ''' - Verify exception is raised for no access(401) error code for basic auth - do the assertions inside exception block - ''' + """Verify exception is raised for no access(401) error code for basic + auth do the assertions inside exception block.""" args = Args() with self.assertRaises(HTTPError) as e: @@ -48,27 +47,29 @@ def test_basic_auth_no_access_401(self): expected_error_message = "401 Client Error: Unauthorized for url:" self.assertIn(expected_error_message, str(e.exception)) - @mock.patch('tap_zendesk_chat.utils', return_value=Args()) - @mock.patch('tap_zendesk_chat.discover') - def test_discovery_calls_on_200_access(self, mock_discover, mock_utils): - """ - tests if discovery method is getting called after mocking required_config_keys - """ - tap_zendesk_chat.main_impl() - self.assertEqual(mock_discover.call_count, 1) + @mock.patch("tap_zendesk_chat.utils", return_value=Args()) + @mock.patch("singer.catalog.Catalog.from_dict", return_value={"key": "value"}) + def test_discovery_no_config(self, mock_utils, mock_catalog): + """tests discovery method when config is None.""" + expected = {"key": "value"} + self.assertEqual(tap_zendesk_chat.discover(None), expected) + @mock.patch("tap_zendesk_chat.utils", return_value=Args()) + @mock.patch("singer.catalog.Catalog.from_dict", return_value={"key": "value"}) + @mock.patch("tap_zendesk_chat.http.Client.request") + def test_discovery(self, mock_utils, mock_catalog, mock_request): + """tests discovery method.""" + expected = {"key": "value"} + self.assertEqual(tap_zendesk_chat.discover(Args().config), expected) -class TestAccountEndpointAuthorized(unittest.TestCase): - - @mock.patch("requests.Session.send") - def test_is_account_endpoint_verified(self, mock_send): - """ - verify if is_account_endpoint_authorized fn returns True boolean on 200 status code - """ - args = Args() - client = Client(args.config) - mock_send.return_value = mock_200_account_endpoint_exception() - resp = tap_zendesk_chat.is_account_endpoint_authorized(client) - self.assertEqual(resp, True) +class TestAccountEndpointAuthorized(unittest.TestCase): + def test_is_account_not_authorized_404(self): + """tests if account_not_authorized method in discover raises http + 404.""" + client = Client(Args().config) + with self.assertRaises(HTTPError) as e: + client.request("xxxxxxx") + expected_error_message = "404 Client Error: Not Found for url:" + self.assertIn(expected_error_message, str(e.exception)) diff --git a/tests/unittests/test_context.py 
b/tests/unittests/test_context.py index ac94b94..69a7ebb 100644 --- a/tests/unittests/test_context.py +++ b/tests/unittests/test_context.py @@ -1,6 +1,7 @@ -from tap_zendesk_chat.context import Context import unittest +from tap_zendesk_chat.context import Context + class TestContextFunctions(unittest.TestCase): config = {"start_date": "2022-01-01", "access_token": ""} @@ -10,10 +11,8 @@ class TestContextFunctions(unittest.TestCase): context_client = Context(config, state, catalog) def test_bookmarks(self): - """ - tests bookmarks property for context module - returns {} with bookmarks key in state file - """ + """tests bookmarks property for context module returns {} with + bookmarks key in state file.""" self.assertEqual({}, self.context_client.bookmarks) self.context_client.state = {"bookmarks": {"account": {"start_date": self.config.get("start_date")}}} @@ -21,29 +20,30 @@ def test_bookmarks(self): self.assertEqual(1, len(self.context_client.bookmarks)) def test_get_bookmark(self): - """ - tests bookmark fn in context.py - """ - self.context_client.state = {"bookmarks": {"account": {"last_created": "2022-06-01"}, - "chats": {"chat.end_timestamp": "2022-06-01T15:00:00", - "offline_msg.timestamp": "2022-06-01T18:00:00"}}} + """tests bookmark fn in context.py.""" + self.context_client.state = { + "bookmarks": { + "account": {"last_created": "2022-06-01"}, + "chats": {"chat.end_timestamp": "2022-06-01T15:00:00", "offline_msg.timestamp": "2022-06-01T18:00:00"}, + } + } self.assertEqual("2022-06-01T18:00:00", self.context_client.bookmark(["chats", "offline_msg.timestamp"])) self.assertEqual({}, self.context_client.bookmark(["chats", "offline_msg.end_timestamp"])) self.assertEqual("2022-06-01T15:00:00", self.context_client.bookmark(["chats", "chat.end_timestamp"])) def test_set_bookmark(self): - """ - tests set_bookmark fn in context.py - set the bookmark using set_bookmark fn and assert the bookmark for stream in state json - """ - self.context_client.state = {"bookmarks": {"account": {"last_created": "2022-06-01"}, - "chats": {"chat.end_timestamp": "2022-06-01T15:00:00", - "offline_msg.timestamp": "2022-06-01T18:00:00"}}} + """tests set_bookmark fn in context.py set the bookmark using + set_bookmark fn and assert the bookmark for stream in state json.""" + self.context_client.state = { + "bookmarks": { + "account": {"last_created": "2022-06-01"}, + "chats": {"chat.end_timestamp": "2022-06-01T15:00:00", "offline_msg.timestamp": "2022-06-01T18:00:00"}, + } + } self.context_client.set_bookmark(["chats", "chat.end_timestamp"], "2022-07-01T01:00:00") self.assertEqual("2022-07-01T01:00:00", self.context_client.state["bookmarks"]["chats"]["chat.end_timestamp"]) self.context_client.set_bookmark(["account"], {"last_created": "2022-07-05"}) - self.assertEqual({"last_created": "2022-07-05"}, self.context_client.state["bookmarks"]["account"]) - + self.assertEqual({"last_created": "2022-07-05"}, self.context_client.state["bookmarks"]["account"]) diff --git a/tests/unittests/test_http_exceptions.py b/tests/unittests/test_http_exceptions.py index 61366a4..36195b4 100644 --- a/tests/unittests/test_http_exceptions.py +++ b/tests/unittests/test_http_exceptions.py @@ -1,6 +1,7 @@ -from tap_zendesk_chat.http import RateLimitException, Client -from unittest import mock import unittest +from unittest import mock + +from tap_zendesk_chat.http import Client, RateLimitException client = Client({"access_token": ""}) @@ -14,9 +15,7 @@ def __init__(self, resp, status_code, headers=None, raise_error=False): def 
mock_429_rate_limit_exception_response(*args, **kwargs): - """ - Mock the response with status code as 429 - """ + """Mock the response with status code as 429.""" return MockResponse({}, 429, headers={}, raise_error=True) @@ -29,27 +28,22 @@ class TestRateLimitExceptionRetry(unittest.TestCase): @mock.patch("requests.Session.send", side_effect=mock_429_rate_limit_exception_response) def test_rate_limit_429_error(self, mocked_send, mocked_sleep): - """ - verify the custom RateLimitException - Make sure API call gets retired for 10 times before raising RateLimitException - Verifying the retry is happening 10 times for the RateLimitException exception - """ + """verify the custom RateLimitException Make sure API call gets retired + for 10 times before raising RateLimitException Verifying the retry is + happening 10 times for the RateLimitException exception.""" with self.assertRaises(RateLimitException): client.request("departments") - self.assertEquals(mocked_send.call_count, 10) + self.assertEqual(mocked_send.call_count, 10) class TestBadGatewayExceptionRetry(unittest.TestCase): @mock.patch("time.sleep") @mock.patch("requests.Session.send", side_effect=mock_502_bad_gateway_exception_response) def test_rate_limit_502_error(self, mocked_send, mocked_sleep): - """ - verify the custom RateLimitException for 502 Bad Gateway exception - Make sure API call gets retired for 10 times before raising RateLimitException - Verifying the retry is happening 10 times for the RateLimitException exception - """ + """verify the custom RateLimitException for 502 Bad Gateway exception + Make sure API call gets retired for 10 times before raising + RateLimitException Verifying the retry is happening 10 times for the + RateLimitException exception.""" with self.assertRaises(RateLimitException): client.request("departments") - self.assertEquals(mocked_send.call_count, 10) - - + self.assertEqual(mocked_send.call_count, 10) diff --git a/tests/unittests/test_metadata.py b/tests/unittests/test_metadata.py deleted file mode 100644 index 1da89c2..0000000 --- a/tests/unittests/test_metadata.py +++ /dev/null @@ -1,75 +0,0 @@ -import tap_zendesk_chat -import unittest - - -class BaseMetadata: - """ - creates a Base class for metadata - """ - metadata = [{"breadcrumb": [], "metadata": {"valid-replication-keys": [], - "table-key-properties": ["id"], "selected": True}}, {"breadcrumb": ["properties", "create_date"], - "metadata": {"inclusion": "available"}}] - - -class Departments(BaseMetadata): - """ - Class for Departments stream - inherits BaseMetadata class - """ - stream = 'departments' - schema = {} - properties = ['description', 'name', 'id', 'enabled', 'members', 'settings'] - - -class Account(BaseMetadata): - """ - Class for Account stream - inherits BaseMetadata class - """ - stream = 'account' - properties = ['create_date', 'account_key', 'status', 'billing', 'plan'] - - -class Bans: - """ - Class for Bans stream - has its own metadata attribute - """ - stream = 'bans' - properties = [] - metadata = [{"breadcrumb": [], "metadata": {"valid-replication-keys": [], - "table-key-properties": ["id"], "selected": False}}, {"breadcrumb": ["properties", "create_date"], - "metadata": {"inclusion": "available"}}] - - -class TestMetadataFunctions(unittest.TestCase): - """ - Used to test metadata functions defined in tap_zendesk_chat/__init__.py file - """ - POSITIVE_TEST_STREAMS = [Account, Departments] - NEGATIVE_TEST_STREAM = [Bans] - - def test_is_selected(self): - """ - tests is_selected fn in tap_zendesk_chat/__init__.py 
file - checks if selected field is set as true in metadata - """ - for stream in self.POSITIVE_TEST_STREAMS: - self.assertEqual(True, tap_zendesk_chat.is_selected(stream)) - - for stream in self.NEGATIVE_TEST_STREAM: - self.assertEqual(False, tap_zendesk_chat.is_selected(stream)) - - def test_load_schema(self): - """ - tests load_schema fn in tap_zendesk_chat/__init__.py file - checks if length of properties attr equals with size of properties in loaded schema using load_schema fn - """ - for stream in self.POSITIVE_TEST_STREAMS: - self.assertEquals(len(stream.properties), len(tap_zendesk_chat.load_schema(stream.stream)['properties'])) - - for stream in self.NEGATIVE_TEST_STREAM: - self.assertNotEqual(len(stream.properties), len(tap_zendesk_chat.load_schema(stream.stream)['properties'])) - - - diff --git a/tests/unittests/test_streams.py b/tests/unittests/test_streams.py deleted file mode 100644 index c21c9c6..0000000 --- a/tests/unittests/test_streams.py +++ /dev/null @@ -1,14 +0,0 @@ -import pendulum -from tap_zendesk_chat.streams import break_into_intervals - - -def test_intervals(): - days = 30 - now = pendulum.parse("2018-02-14T10:30:20") - broken = break_into_intervals(days, "2018-01-02T18:14:33", now) - as_strs = [(x.isoformat(), y.isoformat()) for x, y in broken] - assert as_strs == [ - ("2018-01-02T18:14:33+00:00", "2018-02-01T18:14:33+00:00"), - ("2018-02-01T18:14:33+00:00", "2018-02-14T10:30:20+00:00"), - ] - diff --git a/tests/unittests/test_utils.py b/tests/unittests/test_utils.py new file mode 100644 index 0000000..02c3dbd --- /dev/null +++ b/tests/unittests/test_utils.py @@ -0,0 +1,72 @@ +import unittest + +from tap_zendesk_chat import utils + + +class BaseMetadata: + """creates a Base class for metadata.""" + + metadata = [ + { + "breadcrumb": [], + "metadata": {"valid-replication-keys": [], "table-key-properties": ["id"], "selected": True}, + }, + {"breadcrumb": ["properties", "create_date"], "metadata": {"inclusion": "available"}}, + ] + + +class Departments(BaseMetadata): + """Class for Departments stream inherits BaseMetadata class.""" + + stream = "departments" + schema = {} + properties = ["description", "name", "id", "enabled", "members", "settings"] + + +class Account(BaseMetadata): + """Class for Account stream inherits BaseMetadata class.""" + + stream = "account" + properties = ["create_date", "account_key", "status", "billing", "plan"] + + +class Bans: + """Class for Bans stream has its own metadata attribute.""" + + stream = "bans" + properties = [] + metadata = [ + { + "breadcrumb": [], + "metadata": {"valid-replication-keys": [], "table-key-properties": ["id"], "selected": False}, + }, + {"breadcrumb": ["properties", "create_date"], "metadata": {"inclusion": "available"}}, + ] + + +class TestMetadataFunctions(unittest.TestCase): + """Used to test metadata functions defined in tap_zendesk_chat/__init__.py + file.""" + + POSITIVE_TEST_STREAMS = [Account, Departments] + NEGATIVE_TEST_STREAM = [Bans] + + def test_load_schema(self): + """tests load_schema fn in tap_zendesk_chat/__init__.py file checks if + length of properties attr equals with size of properties in loaded + schema using load_schema fn.""" + for stream in self.POSITIVE_TEST_STREAMS: + self.assertEqual(len(stream.properties), len(utils.load_schema(stream.stream)["properties"])) + + for stream in self.NEGATIVE_TEST_STREAM: + self.assertNotEqual(len(stream.properties), len(utils.load_schema(stream.stream)["properties"])) + + def test_intervals(self): + days = 30 + now = 
utils.strptime_to_utc("2018-02-14T10:30:20")
+        broken = utils.break_into_intervals(days, "2018-01-02T18:14:33", now)
+        as_strs = [(x.isoformat(), y.isoformat()) for x, y in broken]
+        assert as_strs == [
+            ("2018-01-02T18:14:33+00:00", "2018-02-01T18:14:33+00:00"),
+            ("2018-02-01T18:14:33+00:00", "2018-02-14T10:30:20+00:00"),
+        ]
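
The test_intervals case above pins down the date-windowing behaviour that the chat_search_interval_days setting relies on. The helper itself is not shown in this patch, so the following is only a minimal sketch consistent with the assertions above, assuming strptime_to_utc follows singer-python semantics (naive timestamps are treated as UTC); it is not the tap's actual implementation:

from datetime import timedelta

from singer.utils import strptime_to_utc


def break_into_intervals(days, start_time: str, now):
    """Yield (start, end) windows of at most `days` length, from start_time up to now."""
    delta = timedelta(days=days)
    start = strptime_to_utc(start_time)  # naive input is coerced to UTC
    while start < now:
        end = min(start + delta, now)  # never step past `now`
        yield start, end
        start = end

With days=30 and the two timestamps used in the test, this sketch yields exactly the two windows asserted above.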
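
For readers tracing the interrupted-sync test earlier in this patch, the state that create_interrupt_sync_state hands to menagerie.set_state has roughly the shape sketched below. The stream and bookmark key names come from the tap; the timestamp values are invented purely for illustration:

# Sketch of a simulated interrupted-sync state (timestamps are illustrative only).
# "chats" is marked as currently syncing with backdated bookmarks, the bookmark
# entries for the pending streams have been popped, and any remaining streams
# keep whatever the first sync recorded for them.
interrupted_state = {
    "currently_syncing": "chats",
    "bookmarks": {
        "chats": {
            "chat.end_timestamp": "2021-06-01T00:00:00Z",
            "offline_msg.timestamp": "2021-05-15T00:00:00Z",
        },
    },
}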