Skip to content

Commit

Permalink
Source Monday: fix schema loader; delete old files (#20996)
Browse files Browse the repository at this point in the history
* Source Monday: fix schema loader; delete old files

* Source Monday: fix schema loader; delete old files

* Source Monday: fix tests

* auto-bump connector version

Co-authored-by: Octavia Squidington III <[email protected]>
  • Loading branch information
artem1205 and octavia-squidington-iii authored Jan 4, 2023
1 parent 10d7bea commit f6cc98f
Show file tree
Hide file tree
Showing 11 changed files with 24 additions and 243 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -982,7 +982,7 @@
- name: Monday
sourceDefinitionId: 80a54ea2-9959-4040-aac1-eee42423ec9b
dockerRepository: airbyte/source-monday
dockerImageTag: 0.2.1
dockerImageTag: 0.2.2
documentationUrl: https://docs.airbyte.com/integrations/sources/monday
icon: monday.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8530,7 +8530,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-monday:0.2.1"
- dockerImage: "airbyte/source-monday:0.2.2"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/monday"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-monday/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_monday ./source_monday
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.name=airbyte/source-monday
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#


from .dpath_string_extractor import DpathStringExtractor
from .graphql_request_options_provider import GraphQLRequestOptionsProvider

# from .source import SourceMonday
from .source_lc import SourceMonday
from .source import SourceMonday

__all__ = ["SourceMonday", "GraphQLRequestOptionsProvider", "DpathStringExtractor"]
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import json
import os
from dataclasses import dataclass
from typing import Any, Mapping, MutableMapping, Optional, Type, Union

from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.request_options import InterpolatedRequestOptionsProvider
from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader
from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState


Expand All @@ -33,10 +32,9 @@ def _ensure_type(self, t: Type, o: Any):
raise TypeError(f"{type(o)} {o} is not of type {t}")

def _get_schema_root_properties(self):
schema_path = os.path.join(os.path.abspath(os.curdir), "source_monday", f"schemas/{self.name}.json")
with open(schema_path) as f:
schema_dict = json.load(f)
return schema_dict["properties"]
schema_loader = JsonFileSchemaLoader(config=self.config, options={"name": self.name})
schema = schema_loader.get_json_schema()
return schema["properties"]

def _get_object_arguments(self, **object_arguments) -> str:
return ",".join([f"{argument}:{value}" for argument, value in object_arguments.items() if value is not None])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: "0.1.0"

definitions:
schema_loader:
type: JsonSchema
type: JsonFileSchemaLoader
file_path: "./source_monday/schemas/{{ options['name'] }}.json"
selector:
type: RecordSelector
Expand All @@ -18,6 +18,7 @@ definitions:
type: BearerAuthenticator
api_token: "{{ config['credentials']['api_token'] if config['credentials']['auth_type'] == 'api_token' else config['credentials']['access_token'] if config['credentials']['auth_type'] == 'oauth2.0' else config.get('api_token', '') }}"
request_options_provider:
type: InterpolatedRequestOptionsProvider
class_name: "source_monday.GraphQLRequestOptionsProvider"
limit: "{{ options['items_per_page'] }}"
error_handler:
Expand Down Expand Up @@ -59,7 +60,7 @@ definitions:
record_selector:
type: RecordSelector
extractor:
type: DpathExtractor
type: CustomRecordExtractor
field_pointer: "/data/boards/*/items/*"
class_name: "source_monday.DpathStringExtractor"
paginator:
Expand Down
185 changes: 10 additions & 175 deletions airbyte-integrations/connectors/source-monday/source_monday/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,181 +2,16 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource

import json
import os
from abc import ABC
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Tuple
"""
This file provides the necessary constructs to interpret a provided declarative YAML configuration file into
source connector.
WARNING: Do not modify this file.
"""

import requests
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer


# Basic full refresh stream
class MondayStream(HttpStream, ABC):
url_base: str = "https://api.monday.com/v2"
primary_key: str = "id"
page: int = 1
limit: Optional[int] = None
transformer: TypeTransformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
json_response = response.json().get("data", {})
records = json_response.get(self.name.lower(), [])
self.page += 1
if records:
return {"page": self.page}

def load_schema(self):
"""
Load schema from file and make a GraphQL query
"""
script_dir = os.path.dirname(__file__)
schema_path = os.path.join(script_dir, f"schemas/{self.name.lower()}.json")
with open(schema_path) as f:
schema_dict = json.load(f)
schema = schema_dict["properties"]
graphql_schema = []
for col in schema:
if "properties" in schema[col]:
nested_ids = ",".join(schema[col]["properties"])
graphql_schema.append(f"{col}{{{nested_ids}}}")
else:
graphql_schema.append(col)
return ",".join(graphql_schema)

def should_retry(self, response: requests.Response) -> bool:
# Monday API return code 200 with and errors key if complexity is too high.
# https://api.developer.monday.com/docs/complexity-queries
is_complex_query = response.json().get("errors")
if is_complex_query:
self.logger.error(response.text)
return response.status_code == 429 or 500 <= response.status_code < 600 or is_complex_query

@property
def retry_factor(self) -> int:
return 15

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
graphql_params = {}
if self.limit:
graphql_params["limit"] = self.limit
if next_page_token:
graphql_params.update(next_page_token)

graphql_query = ",".join([f"{k}:{v}" for k, v in graphql_params.items()])
# Monday uses a query string to pass in environments
params = {"query": f"query {{ {self.name.lower()} ({graphql_query}) {{ {self.load_schema()} }} }}"}
return params

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
json_response = response.json().get("data", {})
records = json_response.get(self.name.lower(), [])
yield from records

def path(
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
return ""


class Items(MondayStream):
"""
API Documentation: https://api.developer.monday.com/docs/items-queries
"""

limit = 100

@property
def retry_factor(self) -> int:
# this stream has additional rate limits, please see https://api.developer.monday.com/docs/items-queries#additional-rate-limit
return 30


class Boards(MondayStream):
"""
API Documentation: https://api.developer.monday.com/docs/groups-queries#groups-queries
"""


class Teams(MondayStream):
"""
API Documentation: https://api.developer.monday.com/docs/teams-queries
"""

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
# Stream teams doesn't support pagination
params = {"query": f"query {{ {self.name.lower()} () {{ {self.load_schema()} }} }}"}
return params

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
return {}


class Updates(MondayStream):
"""
API Documentation: https://api.developer.monday.com/docs/updates-queries
"""


class Users(MondayStream):
"""
API Documentation: https://api.developer.monday.com/docs/users-queries-1
"""

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
# Stream Users doesn't support pagination
return


class MondayAuthentication:
"""Provides the authentication capabilities for both old and new methods."""

def __init__(self, config: Dict):
self.config = config

def get_token(self):
# the old config supports for backward capability
token = self.config.get("api_token")
if not token:
auth_type = self.config["credentials"]["auth_type"]
if auth_type == "oauth2.0":
token = self.config["credentials"]["access_token"]
if auth_type == "api_token":
token = self.config["credentials"]["api_token"]
return token

def get_auth(self) -> TokenAuthenticator:
"""Return the TokenAuthenticator object with access or api token."""
return TokenAuthenticator(token=self.get_token())


class SourceMonday(AbstractSource):
def check_connection(self, logger, config) -> Tuple[bool, any]:
url = "https://api.monday.com/v2"
params = {"query": "query { me { is_guest created_at name id}}"}
auth_header = MondayAuthentication(config).get_auth().get_auth_header()
try:
response = requests.post(url, params=params, headers=auth_header)
response.raise_for_status()
return True, None
except requests.exceptions.RequestException as e:
return False, e

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
auth = MondayAuthentication(config).get_auth()
return [
Items(authenticator=auth),
Boards(authenticator=auth),
Teams(authenticator=auth),
Updates(authenticator=auth),
Users(authenticator=auth),
]
# Declarative Source
class SourceMonday(YamlDeclarativeSource):
def __init__(self):
super().__init__(**{"path_to_yaml": "monday.yaml"})

This file was deleted.

This file was deleted.

This file was deleted.

5 changes: 3 additions & 2 deletions docs/integrations/sources/monday.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ The Monday connector should not run into Monday API limitations under normal usa

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------|
| 0.2.1 | 2022-12-15 | [20533](https://github.com/airbytehq/airbyte/pull/20533) | Bump CDK version|
| 0.2.0 | 2022-12-13 | [19586](https://github.com/airbytehq/airbyte/pull/19586) | Migrate to low-code |
| 0.2.2 | 2023-01-04 | [20996](https://github.com/airbytehq/airbyte/pull/20996) | Fix json schema loader |
| 0.2.1 | 2022-12-15 | [20533](https://github.com/airbytehq/airbyte/pull/20533) | Bump CDK version |
| 0.2.0 | 2022-12-13 | [19586](https://github.com/airbytehq/airbyte/pull/19586) | Migrate to low-code |
| 0.1.4 | 2022-06-06 | [14443](https://github.com/airbytehq/airbyte/pull/14443) | Increase retry_factor for Items stream |
| 0.1.3 | 2021-12-23 | [8172](https://github.com/airbytehq/airbyte/pull/8172) | Add oauth2.0 support |
| 0.1.2 | 2021-12-07 | [8429](https://github.com/airbytehq/airbyte/pull/8429) | Update titles and descriptions |
Expand Down

0 comments on commit f6cc98f

Please sign in to comment.