🐛 Source GitHub: fix bug with IssueEvents stream and add handling for rate limiting #4708

Merged (14 commits) on Jul 15, 2021
@@ -23,10 +23,10 @@
#


from typing import Any, List, Mapping, Tuple
from typing import Any, Iterator, List, Mapping, MutableMapping, Tuple

from airbyte_cdk import AirbyteLogger
from airbyte_cdk.models import SyncMode
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
@@ -43,7 +43,8 @@
IssueMilestones,
Issues,
Projects,
PullRequests,
PullRequestsAsc,
PullRequestsDesc,
Releases,
Reviews,
Stargazers,
@@ -52,6 +53,17 @@


class SourceGithub(AbstractSource):
def __init__(self):
self._first_run_for_pull_requests_stream = True

def read(
self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None
) -> Iterator[AirbyteMessage]:
if "pull_requests" in state and state["pull_requests"].get(config["repository"]) is not None:
self._first_run_for_pull_requests_stream = False

yield from super().read(logger=logger, config=config, catalog=catalog, state=state)

def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
try:
authenticator = TokenAuthenticator(token=config["access_token"], auth_method="token")
@@ -65,6 +77,11 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
authenticator = TokenAuthenticator(token=config["access_token"], auth_method="token")
full_refresh_args = {"authenticator": authenticator, "repository": config["repository"]}
incremental_args = {"authenticator": authenticator, "repository": config["repository"], "start_date": config["start_date"]}

pull_requests_class = PullRequestsAsc if self._first_run_for_pull_requests_stream else PullRequestsDesc
pull_requests_stream = pull_requests_class(**incremental_args)
pull_requests_stream.name = "pull_requests"

return [
Assignees(**full_refresh_args),
Reviews(**full_refresh_args),
@@ -74,7 +91,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
Releases(**incremental_args),
Events(**incremental_args),
Comments(**incremental_args),
PullRequests(**incremental_args),
pull_requests_stream,
CommitComments(**incremental_args),
IssueMilestones(**incremental_args),
Commits(**incremental_args),
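An illustrative aside, not part of the diff: the stream-class choice above depends on whether the persisted state already holds a `pull_requests` entry for the configured repository. A rough sketch of that check, using a hypothetical state payload and repository name:

state = {"pull_requests": {"airbytehq/airbyte": {"updated_at": "2021-07-01T00:00:00Z"}}}  # hypothetical state
config = {"repository": "airbytehq/airbyte"}  # hypothetical config

# Mirrors the check in SourceGithub.read(): a saved cursor means this is not the first sync.
first_run = not ("pull_requests" in state and state["pull_requests"].get(config["repository"]) is not None)
stream_class = "PullRequestsAsc" if first_run else "PullRequestsDesc"  # resolves to PullRequestsDesc here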
176 changes: 147 additions & 29 deletions airbyte-integrations/connectors/source-github/source_github/streams.py
@@ -24,11 +24,11 @@


import tempfile
import time
from abc import ABC
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

import requests
import vcr
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http import HttpStream
from requests.exceptions import HTTPError
@@ -62,10 +62,32 @@ def path(self, **kwargs) -> str:
return f"repos/{self.repository}/{self.name}"

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
response_data = response.json()
if response_data and len(response_data) == self.page_size:
self._page += 1
return {"page": self._page}
link = response.headers.get("Link")
if link and 'rel="next"' in link:
response_data = response.json()
if response_data and len(response_data) == self.page_size:
self._page += 1
return {"page": self._page}

def should_retry(self, response: requests.Response) -> bool:
# We don't call `super()` here because the GitHub API sometimes returns unusual errors, and `read_records()`
# already has custom error handling, so calling `super()` here isn't required.
return response.headers.get("X-RateLimit-Remaining") == "0" or response.status_code in (
500,
502,
)

def backoff_time(self, response: requests.Response) -> Optional[Union[int, float]]:
# This method is called when we hit the rate limit. GitHub limits requests to 5000 per hour and provides the
# `X-RateLimit-Reset` header, which contains the time when the current window ends and the limit resets,
# giving us another 5000 requests for the next hour.

if response.status_code == 502:
return 0.5

reset_time = int(response.headers.get("X-RateLimit-Reset", time.time() + 60))
time_now = int(time.time())
return reset_time - time_now
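As an aside (not part of the diff): the backoff above is simply the number of seconds left until the reset timestamp GitHub reports in `X-RateLimit-Reset`. A small worked sketch with made-up header values:

import time

headers = {"X-RateLimit-Remaining": "0", "X-RateLimit-Reset": str(int(time.time()) + 1200)}  # fabricated values
reset_time = int(headers.get("X-RateLimit-Reset", time.time() + 60))
backoff_seconds = reset_time - int(time.time())  # roughly 1200, i.e. sleep until the rate-limit window resets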

def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:
try:
@@ -75,11 +97,17 @@ def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:

# This try/except in `read_records()` isn't great, but `self._send_request()` currently calls
# `response.raise_for_status()`, so we don't have much choice in how to handle errors.
# We added this try/except code because for private repositories `Teams` stream is not available and we get
# "404 Client Error: Not Found for url: https://api.github.com/orgs/sherifnada/teams?per_page=100" error.
# Blocked on https://github.com/airbytehq/airbyte/issues/3514.
if "/teams?" in error_msg:
error_msg = f"Syncing Team stream isn't available for repository {self.repository}"
# Blocked on https://github.com/airbytehq/airbyte/issues/3514.
if e.response.status_code == 403:
error_msg = (
f"Syncing `{self.__class__.__name__}` stream isn't available for repository "
f"`{self.repository}` and your `access_token`, seems like you don't have permissions for "
f"this stream."
)
elif e.response.status_code == 404 and "/teams?" in error_msg:
# For private repositories `Teams` stream is not available and we get "404 Client Error: Not Found for
# url: https://api.github.com/orgs/sherifnada/teams?per_page=100" error.
error_msg = f"Syncing `Team` stream isn't available for repository `{self.repository}`."

self.logger.warn(error_msg)

@@ -168,7 +196,8 @@ class SemiIncrementalGithubStream(GithubStream):
# we should stop processing records when possible. If `sort` is set to `updated` and `direction` is set to `desc`,
# the latest records appear at the beginning of the response, so once those latest records have been processed
# we can simply stop without processing the rest. This increases the speed of each incremental stream
# which supports those 2 request parameters.
# which supports those 2 request parameters. Currently only the `IssueMilestones` and `PullRequests` streams
# support this.
is_sorted_descending = False
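An illustrative aside, not from the PR: with `sort=updated` and `direction=desc` the newest records come first, so as soon as a record older than the saved cursor appears, the rest of the response can be skipped. A rough sketch of that early break, with hypothetical records and cursor value:

cursor = "2021-06-01T00:00:00Z"  # hypothetical saved state
records = [  # hypothetical response, newest first because direction=desc
    {"updated_at": "2021-07-10T00:00:00Z"},
    {"updated_at": "2021-06-15T00:00:00Z"},
    {"updated_at": "2021-05-01T00:00:00Z"},
]
for record in records:
    if record["updated_at"] < cursor:
        break  # everything from here on is older than the cursor, so stop early
    print(record)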

def __init__(self, start_date: str, **kwargs):
@@ -223,10 +252,16 @@ def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMa


class Assignees(GithubStream):
pass
"""
API docs: https://docs.github.com/en/rest/reference/issues#list-assignees
"""


class Reviews(GithubStream):
"""
API docs: https://docs.github.com/en/rest/reference/pulls#list-reviews-for-a-pull-request
"""

fields_to_minimize = ("user",)

def path(
@@ -236,21 +271,31 @@
return f"repos/{self.repository}/pulls/{pull_request_number}/reviews"

def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
pull_requests_stream = PullRequests(authenticator=self.authenticator, repository=self.repository, start_date="")
pull_requests_stream = PullRequestsAsc(authenticator=self.authenticator, repository=self.repository, start_date="")
for pull_request in pull_requests_stream.read_records(sync_mode=SyncMode.full_refresh):
yield {"pull_request_number": pull_request["number"]}


class Collaborators(GithubStream):
pass
"""
API docs: https://docs.github.com/en/rest/reference/repos#list-repository-collaborators
"""


class IssueLabels(GithubStream):
"""
API docs: https://docs.github.com/en/free-pro-team@latest/rest/reference/issues#list-labels-for-a-repository
"""

def path(self, **kwargs) -> str:
return f"repos/{self.repository}/labels"


class Teams(GithubStream):
"""
API docs: https://docs.github.com/en/rest/reference/teams#list-teams
"""

def path(self, **kwargs) -> str:
owner, _ = self.repository.split("/")
return f"orgs/{owner}/teams"
@@ -260,6 +305,10 @@ def path(self, **kwargs) -> str:


class Releases(SemiIncrementalGithubStream):
"""
API docs: https://docs.github.com/en/rest/reference/repos#list-releases
"""

cursor_field = "created_at"
fields_to_minimize = ("author",)

Expand All @@ -275,6 +324,10 @@ def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]:


class Events(SemiIncrementalGithubStream):
"""
API docs: https://docs.github.com/en/rest/reference/activity#list-repository-events
"""

cursor_field = "created_at"
fields_to_minimize = (
"actor",
Expand All @@ -283,7 +336,13 @@ class Events(SemiIncrementalGithubStream):
)


class PullRequests(SemiIncrementalGithubStream):
class PullRequestsBase(SemiIncrementalGithubStream):
"""
API docs: https://docs.github.com/en/rest/reference/pulls#list-pull-requests
"""

name = "pull_requests"
page_size = 50
fields_to_minimize = (
"user",
"milestone",
@@ -293,15 +352,13 @@ class PullRequests(SemiIncrementalGithubStream):
"requested_reviewers",
"requested_teams",
)
stream_base_params = {
"state": "all",
"sort": "updated",
"direction": "desc",
}

def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:
with vcr.use_cassette(cache_file.name, record_mode="new_episodes", serializer="json"):
yield from super().read_records(**kwargs)
# TODO Fix vcr error:
# UnicodeDecodeError: 'utf-8' codec can't decode byte 0x72 in position 1: invalid start byte. Does this HTTP
# interaction contain binary data? If so, use a different serializer (like the yaml serializer) for this request?
# def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:
# with vcr.use_cassette(cache_file.name, record_mode="new_episodes", serializer="json"):
# yield from super().read_records(**kwargs)

def path(self, **kwargs) -> str:
return f"repos/{self.repository}/pulls"
@@ -324,14 +381,50 @@ def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]:
return record


class PullRequestsAsc(PullRequestsBase):
"""
API docs: https://docs.github.com/en/rest/reference/pulls#list-pull-requests
This class is used when this is the first sync (we don't have state yet).
"""

state_checkpoint_interval = PullRequestsBase.page_size
stream_base_params = {
"state": "all",
"sort": "updated",
}


class PullRequestsDesc(PullRequestsBase):
"""
API docs: https://docs.github.com/en/rest/reference/pulls#list-pull-requests
This class is used when this is not the first sync (we already have state).
"""

is_sorted_descending = True
stream_base_params = {
"state": "all",
"sort": "updated",
"direction": "desc",
}
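A brief aside on the design choice (my reading, not stated in the PR): the ascending variant can checkpoint state after every page because its cursor only moves forward, while the descending variant sees the newest records first, so its cursor is only safe to persist once the whole sync has finished. A toy illustration with fabricated timestamps:

# Fabricated page contents, for illustration only.
asc_pages = [["2021-01-01", "2021-02-01"], ["2021-03-01", "2021-04-01"]]
# Ascending: after each page, the newest timestamp seen so far is already a valid cursor -> checkpoint per page.
desc_pages = [["2021-04-01", "2021-03-01"], ["2021-02-01", "2021-01-01"]]
# Descending: the first record carries the final cursor; persisting it early could skip older pages
# if the sync were interrupted -> only persist state once the sync completes.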


class CommitComments(SemiIncrementalGithubStream):
"""
API docs: https://docs.github.com/en/rest/reference/repos#list-commit-comments-for-a-repository
"""

fields_to_minimize = ("user",)

def path(self, **kwargs) -> str:
return f"repos/{self.repository}/comments"


class IssueMilestones(SemiIncrementalGithubStream):
"""
API docs: https://docs.github.com/en/rest/reference/issues#list-milestones
"""

state_checkpoint_interval = SemiIncrementalGithubStream.page_size
is_sorted_descending = True
fields_to_minimize = ("creator",)
stream_base_params = {
@@ -345,6 +438,10 @@ def path(self, **kwargs) -> str:


class Stargazers(SemiIncrementalGithubStream):
"""
API docs: https://docs.github.com/en/rest/reference/activity#list-stargazers
"""

primary_key = "user_id"
cursor_field = "starred_at"
fields_to_minimize = ("user",)
@@ -358,6 +455,10 @@ def request_headers(self, **kwargs) -> Mapping[str, Any]:


class Projects(SemiIncrementalGithubStream):
"""
API docs: https://docs.github.com/en/rest/reference/projects#list-repository-projects
"""

fields_to_minimize = ("creator",)
stream_base_params = {
"state": "all",
@@ -372,6 +473,11 @@ def request_headers(self, **kwargs) -> Mapping[str, Any]:


class IssueEvents(SemiIncrementalGithubStream):
"""
API docs: https://docs.github.com/en/rest/reference/issues#list-issue-events-for-a-repository
"""

cursor_field = "created_at"
fields_to_minimize = (
"actor",
"issue",
@@ -385,17 +491,23 @@ def path(self, **kwargs) -> str:


class Comments(IncrementalGithubStream):
"""
API docs: https://docs.github.com/en/rest/reference/issues#list-issue-comments-for-a-repository
"""

fields_to_minimize = ("user",)
stream_base_params = {
"sort": "updated",
"direction": "desc",
}
page_size = 30 # `comments` is a large stream, so it's better to use a smaller page size.
state_checkpoint_interval = page_size

def path(self, **kwargs) -> str:
return f"repos/{self.repository}/issues/comments"


class Commits(IncrementalGithubStream):
"""
API docs: https://docs.github.com/en/rest/reference/repos#list-commits
"""

primary_key = "sha"
cursor_field = "created_at"
fields_to_minimize = (
@@ -416,6 +528,12 @@ def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]:


class Issues(IncrementalGithubStream):
"""
API docs: https://docs.github.com/en/rest/reference/issues#list-repository-issues
"""

page_size = 50 # `issues` is a large stream, so it's better to use a smaller page size.
state_checkpoint_interval = page_size
fields_to_minimize = (
"user",
"assignee",
Expand All @@ -426,5 +544,5 @@ class Issues(IncrementalGithubStream):
stream_base_params = {
"state": "all",
"sort": "updated",
"direction": "desc",
"direction": "asc",
}