Skip to content

Commit

Permalink
Tests cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Aug 23, 2022
1 parent 90cc541 commit 39af208
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 168 deletions.
95 changes: 95 additions & 0 deletions tests/core/conftest.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,102 @@
"""Tap, target and stream test fixtures."""

from __future__ import annotations

from typing import Any, Iterable

import pendulum
import pytest

from singer_sdk import Stream, Tap
from singer_sdk.typing import (
DateTimeType,
IntegerType,
PropertiesList,
Property,
StringType,
)


class SimpleTestStream(Stream):
"""Test stream class."""

name = "test"
schema = PropertiesList(
Property("id", IntegerType, required=True),
Property("value", StringType, required=True),
Property("updatedAt", DateTimeType, required=True),
).to_dict()
replication_key = "updatedAt"

def __init__(self, tap: Tap):
"""Create a new stream."""
super().__init__(tap, schema=self.schema, name=self.name)

def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]:
"""Generate records."""
yield {"id": 1, "value": "Egypt"}
yield {"id": 2, "value": "Germany"}
yield {"id": 3, "value": "India"}


class UnixTimestampIncrementalStream(SimpleTestStream):
name = "unix_ts"
schema = PropertiesList(
Property("id", IntegerType, required=True),
Property("value", StringType, required=True),
Property("updatedAt", IntegerType, required=True),
).to_dict()
replication_key = "updatedAt"


class UnixTimestampIncrementalStream2(UnixTimestampIncrementalStream):
name = "unix_ts_override"

def compare_start_date(self, value: str, start_date_value: str) -> str:
"""Compare a value to a start date value."""

start_timestamp = pendulum.parse(start_date_value).format("X")
return max(value, start_timestamp, key=float)


class SimpleTestTap(Tap):
"""Test tap class."""

name = "test-tap"
config_jsonschema = PropertiesList(
Property("username", StringType, required=True),
Property("password", StringType, required=True),
Property("start_date", DateTimeType),
additional_properties=False,
).to_dict()

def discover_streams(self) -> list[Stream]:
"""List all streams."""
return [
SimpleTestStream(self),
UnixTimestampIncrementalStream(self),
UnixTimestampIncrementalStream2(self),
]


@pytest.fixture
def tap_class():
"""Return the tap class."""
return SimpleTestTap


@pytest.fixture
def tap() -> SimpleTestTap:
"""Tap instance."""
return SimpleTestTap(
config={
"username": "utest",
"password": "ptest",
"start_date": "2021-01-01",
},
parse_env_config=False,
)


@pytest.fixture
def csv_config(outdir: str) -> dict:
Expand Down
183 changes: 15 additions & 168 deletions tests/core/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,78 +2,25 @@

from __future__ import annotations

import json
import logging
from typing import Any, Iterable, cast
from typing import TYPE_CHECKING, Any

import pendulum
import pytest
import requests
from click.testing import CliRunner

from singer_sdk.exceptions import ConfigValidationError
from singer_sdk.helpers._classproperty import classproperty
from singer_sdk.helpers._singer import Catalog, CatalogEntry, MetadataMapping
from singer_sdk.helpers._singer import Catalog, MetadataMapping
from singer_sdk.helpers.jsonpath import _compile_jsonpath
from singer_sdk.streams.core import (
REPLICATION_FULL_TABLE,
REPLICATION_INCREMENTAL,
Stream,
)
from singer_sdk.streams.core import REPLICATION_FULL_TABLE, REPLICATION_INCREMENTAL
from singer_sdk.streams.graphql import GraphQLStream
from singer_sdk.streams.rest import RESTStream
from singer_sdk.tap_base import Tap
from singer_sdk.typing import (
DateTimeType,
IntegerType,
PropertiesList,
Property,
StringType,
)
from singer_sdk.typing import IntegerType, PropertiesList, Property, StringType

CONFIG_START_DATE = "2021-01-01"


class SimpleTestStream(Stream):
"""Test stream class."""

name = "test"
schema = PropertiesList(
Property("id", IntegerType, required=True),
Property("value", StringType, required=True),
Property("updatedAt", DateTimeType, required=True),
).to_dict()
replication_key = "updatedAt"

def __init__(self, tap: Tap):
"""Create a new stream."""
super().__init__(tap, schema=self.schema, name=self.name)

def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]:
"""Generate records."""
yield {"id": 1, "value": "Egypt"}
yield {"id": 2, "value": "Germany"}
yield {"id": 3, "value": "India"}


class UnixTimestampIncrementalStream(SimpleTestStream):
name = "unix_ts"
schema = PropertiesList(
Property("id", IntegerType, required=True),
Property("value", StringType, required=True),
Property("updatedAt", IntegerType, required=True),
).to_dict()
replication_key = "updatedAt"


class UnixTimestampIncrementalStream2(UnixTimestampIncrementalStream):
name = "unix_ts_override"

def compare_start_date(self, value: str, start_date_value: str) -> str:
"""Compare a value to a start date value."""

start_timestamp = pendulum.parse(start_date_value).format("X")
return max(value, start_timestamp, key=float)
if TYPE_CHECKING:
from singer_sdk import Stream, Tap


class RestTestStream(RESTStream):
Expand Down Expand Up @@ -102,52 +49,13 @@ class GraphqlTestStream(GraphQLStream):
replication_key = "updatedAt"


class SimpleTestTap(Tap):
"""Test tap class."""

name = "test-tap"
config_jsonschema = PropertiesList(
Property("username", StringType, required=True),
Property("password", StringType, required=True),
Property("start_date", DateTimeType),
additional_properties=False,
).to_dict()

def discover_streams(self) -> list[Stream]:
"""List all streams."""
return [
SimpleTestStream(self),
UnixTimestampIncrementalStream(self),
UnixTimestampIncrementalStream2(self),
]


@pytest.fixture
def tap() -> SimpleTestTap:
"""Tap instance."""
return SimpleTestTap(
config={
"username": "utest",
"password": "ptest",
"start_date": "2021-01-01",
},
parse_env_config=False,
)


@pytest.fixture
def stream(tap: SimpleTestTap) -> SimpleTestStream:
def stream(tap):
"""Create a new stream instance."""
return cast(SimpleTestStream, tap.load_streams()[0])
return tap.load_streams()[0]


@pytest.fixture
def unix_timestamp_stream(tap: SimpleTestTap) -> UnixTimestampIncrementalStream:
"""Create a new stream instance."""
return cast(UnixTimestampIncrementalStream, tap.load_streams()[1])


def test_stream_apply_catalog(tap: SimpleTestTap, stream: SimpleTestStream):
def test_stream_apply_catalog(stream: Stream):
"""Applying a catalog to a stream should overwrite fields."""
assert stream.primary_keys == []
assert stream.replication_key == "updatedAt"
Expand Down Expand Up @@ -238,7 +146,7 @@ def test_stream_apply_catalog(tap: SimpleTestTap, stream: SimpleTestStream):
],
)
def test_stream_starting_timestamp(
tap: SimpleTestTap,
tap: Tap,
stream_name: str,
bookmark_value: str,
expected_starting_value: Any,
Expand Down Expand Up @@ -322,9 +230,7 @@ def test_stream_starting_timestamp(
"nested_values",
],
)
def test_jsonpath_rest_stream(
tap: SimpleTestTap, path: str, content: str, result: list[dict]
):
def test_jsonpath_rest_stream(tap: Tap, path: str, content: str, result: list[dict]):
"""Validate records are extracted correctly from the API response."""
fake_response = requests.Response()
fake_response._content = str.encode(content)
Expand All @@ -337,7 +243,7 @@ def test_jsonpath_rest_stream(
assert list(rows) == result


def test_jsonpath_graphql_stream_default(tap: SimpleTestTap):
def test_jsonpath_graphql_stream_default(tap: Tap):
"""Validate graphql JSONPath, defaults to the stream name."""
content = """{
"data": {
Expand All @@ -357,7 +263,7 @@ def test_jsonpath_graphql_stream_default(tap: SimpleTestTap):
assert list(rows) == [{"id": 1, "value": "abc"}, {"id": 2, "value": "def"}]


def test_jsonpath_graphql_stream_override(tap: SimpleTestTap):
def test_jsonpath_graphql_stream_override(tap: Tap):
"""Validate graphql jsonpath can be updated."""
content = """[
{"id": 1, "value": "abc"},
Expand Down Expand Up @@ -444,7 +350,7 @@ def records_jsonpath(cls):
],
)
def test_next_page_token_jsonpath(
tap: SimpleTestTap, path: str, content: str, headers: dict, result: str
tap: Tap, path: str, content: str, headers: dict, result: str
):
"""Validate pagination token is extracted correctly from API response."""
fake_response = requests.Response()
Expand All @@ -469,7 +375,7 @@ def test_cached_jsonpath():
assert recompiled is compiled


def test_sync_costs_calculation(tap: SimpleTestTap, caplog):
def test_sync_costs_calculation(tap: Tap, caplog):
"""Test sync costs are added up correctly."""
fake_request = requests.PreparedRequest()
fake_response = requests.Response()
Expand All @@ -496,62 +402,3 @@ def calculate_test_cost(
for record in caplog.records:
assert record.levelname == "INFO"
assert f"Total Sync costs for stream {stream.name}" in record.message


@pytest.mark.parametrize(
"config_dict,errors",
[
(
{},
["'username' is a required property", "'password' is a required property"],
),
(
{"username": "utest"},
["'password' is a required property"],
),
(
{"username": "utest", "password": "ptest", "extra": "not valid"},
["Additional properties are not allowed ('extra' was unexpected)"],
),
],
ids=[
"missing_username",
"missing_password",
"extra_property",
],
)
def test_config_errors(config_dict: dict, errors: list[str]):
with pytest.raises(ConfigValidationError, match="Config validation failed") as exc:
SimpleTestTap(config_dict, validate_config=True)

assert exc.value.errors == errors


def test_cli(tmp_path):
"""Test the CLI."""
runner = CliRunner(mix_stderr=False)
result = runner.invoke(SimpleTestTap.cli, ["--help"])
assert result.exit_code == 0
assert "Show this message and exit." in result.output

config_path = tmp_path / "config.json"
config_path.write_text(json.dumps({}))
result = runner.invoke(SimpleTestTap.cli, ["--config", str(config_path)])
assert result.exit_code == 1
assert result.stdout == ""
assert "'username' is a required property" in result.stderr
assert "'password' is a required property" in result.stderr

config_path = tmp_path / "config.json"
config_path.write_text(json.dumps({}))
result = runner.invoke(
SimpleTestTap.cli,
[
"--config",
str(config_path),
"--discover",
],
)
assert result.exit_code == 0
assert "streams" in json.loads(result.stdout)
assert result.stderr == ""
Loading

0 comments on commit 39af208

Please sign in to comment.