
Fix Github check connection for organizations with large number of re… #8170

Merged
merged 3 commits into from
Nov 22, 2021
6 changes: 5 additions & 1 deletion .pre-commit-config.yaml
@@ -22,7 +22,11 @@ repos:
hooks:
- id: prettier
types_or: [yaml, json]
exclude: ^.github/
exclude: |
(?x)^.*(
.github/|
source_specs.yaml
).?$

- repo: https://gitlab.com/pycqa/flake8
rev: 3.8.4
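
The new `exclude` value is a verbose-mode (`(?x)`) regular expression, so the line breaks and indentation inside it are ignored. The sketch below assumes pre-commit applies the pattern with Python's `re.search` against each repository-relative path, and shows which paths the pattern, as written in this diff, would exclude. The sample paths are illustrative and not taken from the PR.

```python
import re

# The pattern as it appears in the diff; (?x) makes whitespace insignificant.
EXCLUDE = r"""(?x)^.*(
    .github/|
    source_specs.yaml
).?$
"""


def is_excluded(path: str) -> bool:
    """Return True when the exclude pattern matches the given file path."""
    return re.search(EXCLUDE, path) is not None


# Hypothetical sample paths, chosen only to exercise the pattern.
for path in (
    "config/seed/source_specs.yaml",
    ".github/workflows/ci.yaml",
    "docs/example.json",
):
    print(path, is_excluded(path))
```

Because the group is followed by `.?$`, at most one character may follow `.github/`, so a nested path such as `.github/workflows/ci.yaml` is not matched by the pattern as shown. If the intent is to keep excluding everything under `.github/`, the trailing `.?` would probably need to be `.*`.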
@@ -184,7 +184,7 @@
- name: GitHub
sourceDefinitionId: ef69ef6e-aa7f-4af1-a01d-ef775033524e
dockerRepository: airbyte/source-github
dockerImageTag: 0.2.4
dockerImageTag: 0.2.5
documentationUrl: https://docs.airbyte.io/integrations/sources/github
icon: github.svg
sourceType: api
@@ -1643,7 +1643,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-github:0.2.4"
- dockerImage: "airbyte/source-github:0.2.5"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/github"
connectionSpecification:
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-github/Dockerfile
@@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.4
LABEL io.airbyte.version=0.2.5
LABEL io.airbyte.name=airbyte/source-github
5 changes: 1 addition & 4 deletions airbyte-integrations/connectors/source-github/setup.py
@@ -10,10 +10,7 @@
"vcrpy==4.1.1",
]

TEST_REQUIREMENTS = [
"pytest~=6.1",
"source-acceptance-test",
]
TEST_REQUIREMENTS = ["pytest~=6.1", "source-acceptance-test", "responses==0.13.3"]

setup(
name="source_github",
@@ -44,24 +44,40 @@
)

TOKEN_SEPARATOR = ","
# To scan all the repos within an organization, the organization name can be
# specified with an asterisk, i.e. "airbytehq/*"
ORGANIZATION_PATTERN = re.compile("^.*/\\*$")


class SourceGithub(AbstractSource):
@staticmethod
def _generate_repositories(config: Mapping[str, Any], authenticator: MultipleTokenAuthenticator) -> List[str]:
def _generate_repositories(config: Mapping[str, Any], authenticator: MultipleTokenAuthenticator) -> Tuple[List[str], List[str]]:
"""
Parse repositories config line and produce two lists of repositories.
Args:
config (dict): Dict representing connector's config
authenticator(MultipleTokenAuthenticator): authenticator object
Returns:
Tuple[List[str], List[str]]: Tuple of two lists: first representing
repositories directly mentioned in config and second is
organization repositories from orgs/{org}/repos request.
"""
repositories = list(filter(None, config["repository"].split(" ")))

if not repositories:
raise Exception("Field `repository` required to be provided for connect to Github API")

repositories_list = [repo for repo in repositories if not re.match("^.*/\\*$", repo)]
repositories_list: set = {repo for repo in repositories if not ORGANIZATION_PATTERN.match(repo)}
organizations = [org.split("/")[0] for org in repositories if org not in repositories_list]
organisation_repos = set()
if organizations:
repos = Repositories(authenticator=authenticator, organizations=organizations)
Contributor review comment:

Please add a code comment stating that once data about an organization has been received successfully, there is no need to check each of its repositories individually. Also add this link: https://docs.github.com/en/developers/apps/building-oauth-apps/scopes-for-oauth-apps#available-scopes.

for stream in repos.stream_slices(sync_mode=SyncMode.full_refresh):
repositories_list += [r["full_name"] for r in repos.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream)]
organisation_repos = organisation_repos.union(
{r["full_name"] for r in repos.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream)}
)

return list(set(repositories_list))
return list(repositories_list), list(organisation_repos)

@staticmethod
def _get_authenticator(config: Dict[str, Any]):
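
The parsing step in `_generate_repositories` can be illustrated in isolation. The following is a minimal sketch, not the connector's code: `split_repository_line` is a hypothetical helper that mirrors the whitespace split and the `ORGANIZATION_PATTERN` check from the diff, without making any API requests.

```python
import re
from typing import List, Tuple

# Same pattern as in the diff: matches entries such as "airbytehq/*".
ORGANIZATION_PATTERN = re.compile("^.*/\\*$")


def split_repository_line(repository_line: str) -> Tuple[List[str], List[str]]:
    """Split a space-separated config value into repositories and organizations.

    Hypothetical helper for illustration; duplicates are dropped and both
    lists are sorted so the result is deterministic.
    """
    entries = [entry for entry in repository_line.split(" ") if entry]
    if not entries:
        raise ValueError("Field `repository` must not be empty")

    repositories = {e for e in entries if not ORGANIZATION_PATTERN.match(e)}
    organizations = {e.split("/")[0] for e in entries if ORGANIZATION_PATTERN.match(e)}
    return sorted(repositories), sorted(organizations)


repos, orgs = split_repository_line("airbyte/test airbyte/test2 airbytehq/* org/*")
print(repos)  # ['airbyte/test', 'airbyte/test2']
print(orgs)   # ['airbytehq', 'org']
```

In the PR itself, the organization names are then passed to the `Repositories` stream to fetch each organization's repository list, which is the part this sketch leaves out.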
@@ -114,21 +130,28 @@ def _get_branches_data(selected_branches: str, full_refresh_args: Dict[str, Any]
def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
try:
authenticator = self._get_authenticator(config)
repositories = self._generate_repositories(config=config, authenticator=authenticator)
# If the repository list for a given organization was retrieved
# successfully, there is no need to check stats for every repository within
# that organization.
# Since the "repo" scope is requested, it should grant access to private repos as well:
# https://docs.github.com/en/developers/apps/building-oauth-apps/scopes-for-oauth-apps#available-scopes
repositories, _ = self._generate_repositories(config=config, authenticator=authenticator)
Contributor review comment:

  • If the user provides only organization repos (so `repositories` would be empty?), does `check_connection` then check nothing?
  • If `repositories` is a very long list of repo names, don't we still have the same issue?

Wouldn't it be better to limit this for loop instead?

for stream_slice in repository_stats_stream.stream_slices(sync_mode=SyncMode.full_refresh):

Or at least limit the check to one public and one private repository from `repositories`, and maybe one for each organization?

Contributor Author reply:

> if the user provides only organization repos (repositories would be empty?), then the check_connection doesn't check anything?

No. In this case it would try to get the repo list from the organization; please look at the unit tests, there is a case for that.

> if repositories is a very long list of repo names, we still have the same issue?

Yes, but the issue here was with an organization that contains a lot of repos. A large list of explicitly provided repos could come from different accounts and organizations, and we should check all of them.

Contributor Author reply (@avida, Nov 22, 2021):

> Wouldn't it be better to limit the for loop instead? Or at least limit to one public and one private from repositories, and maybe one for each organization?

In that case, if the user provides a private repo they have no access to, the check could falsely succeed. In this commit I check every single repo and every organization (without checking the repos within an org, because the "repo" scope should let us access every repo of that org). This fix makes the minimal sufficient set of requests to verify that a subsequent read won't fail.

As an optimization we could group single repos by organization, but this defect is not about a long list of repos; it's about an organization with a lot of repos.
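
The grouping optimization mentioned in the reply is not part of this PR. As a rough sketch of what it could look like, the hypothetical helper `group_by_owner` below buckets explicitly listed `owner/repo` entries by owner, so that a later check could treat each owner as a unit. How such groups would actually be verified is left open here.

```python
from collections import defaultdict
from typing import Dict, Iterable, List


def group_by_owner(repositories: Iterable[str]) -> Dict[str, List[str]]:
    """Bucket "owner/repo" names by owner (hypothetical helper, not in the PR)."""
    groups: Dict[str, List[str]] = defaultdict(list)
    for full_name in repositories:
        owner, _, _name = full_name.partition("/")
        groups[owner].append(full_name)
    return dict(groups)


print(group_by_owner(["airbyte/test", "airbyte/test2", "org/repo"]))
# {'airbyte': ['airbyte/test', 'airbyte/test2'], 'org': ['org/repo']}
```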


repository_stats_stream = RepositoryStats(
authenticator=authenticator,
repositories=repositories,
)
for stream_slice in repository_stats_stream.stream_slices(sync_mode=SyncMode.full_refresh):
next(repository_stats_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice))
next(repository_stats_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice), None)
return True, None
except Exception as e:
return False, repr(e)

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
authenticator = self._get_authenticator(config)
repositories = self._generate_repositories(config=config, authenticator=authenticator)
repos, organization_repos = self._generate_repositories(config=config, authenticator=authenticator)
repositories = repos + organization_repos

organizations = list({org.split("/")[0] for org in repositories})
full_refresh_args = {"authenticator": authenticator, "repositories": repositories}
incremental_args = {**full_refresh_args, "start_date": config["start_date"]}
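
The change from `next(iterator)` to `next(iterator, None)` in `check_connection` matters when a stream slice yields no records. The standalone sketch below shows the difference; the generator `read_records` here is a stand-in, not the connector's method. Without a default, an exhausted iterator raises `StopIteration`, which a surrounding `except Exception` block like the one in `check_connection` would report as a failed check.

```python
from typing import Iterator, List


def read_records(records: List[dict]) -> Iterator[dict]:
    """Stand-in generator that yields whatever records it is given."""
    yield from records


def first_record_strict(records: List[dict]) -> str:
    """Old behavior: next() without a default raises on an empty stream."""
    try:
        next(read_records(records))
        return "ok"
    except Exception as error:  # StopIteration is a subclass of Exception
        return repr(error)


def first_record_lenient(records: List[dict]) -> str:
    """New behavior: a default of None means an empty stream is not an error."""
    next(read_records(records), None)
    return "ok"


print(first_record_strict([]))   # StopIteration()
print(first_record_lenient([]))  # ok
```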
@@ -0,0 +1,54 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from unittest.mock import MagicMock

import responses
from airbyte_cdk.models import AirbyteConnectionStatus, Status
from source_github.source import SourceGithub


def check_source(repo_line: str) -> AirbyteConnectionStatus:
source = SourceGithub()
config = {"access_token": "test_token", "repository": repo_line}
logger_mock = MagicMock()
return source.check(logger_mock, config)


@responses.activate
def test_check_connection_repos_only():
responses.add("GET", "https://api.github.com/repos/airbyte", json={})

status = check_source("airbyte airbyte airbyte")
assert not status.message
assert status.status == Status.SUCCEEDED
# Only one request since 3 repos have same name
assert len(responses.calls) == 1


@responses.activate
def test_check_connection_repos_and_org_repos():
repos = [{"name": f"name {i}", "full_name": f"full name {i}"} for i in range(1000)]
responses.add("GET", "https://api.github.com/repos/airbyte/test", json={})
responses.add("GET", "https://api.github.com/repos/airbyte/test2", json={})
responses.add("GET", "https://api.github.com/orgs/airbytehq/repos", json=repos)
responses.add("GET", "https://api.github.com/orgs/org/repos", json=repos)

status = check_source("airbyte/test airbyte/test2 airbytehq/* org/*")
assert not status.message
assert status.status == Status.SUCCEEDED
# Two requests for repos and two for organization
assert len(responses.calls) == 4


@responses.activate
def test_check_connection_org_only():
repos = [{"name": f"name {i}", "full_name": f"full name {i}"} for i in range(1000)]
responses.add("GET", "https://api.github.com/orgs/airbytehq/repos", json=repos)

status = check_source("airbytehq/*")
assert not status.message
assert status.status == Status.SUCCEEDED
# One request to check organization
assert len(responses.calls) == 1
1 change: 1 addition & 0 deletions docs/integrations/sources/github.md
@@ -92,6 +92,7 @@ Your token should have at least the `repo` scope. Depending on which streams you

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.2.5 | 2021-11-21 | [8170](https://github.com/airbytehq/airbyte/pull/8170) | Fix slow check connection for organizations with a lot of repos |
| 0.2.4 | 2021-11-11 | [7856](https://github.com/airbytehq/airbyte/pull/7856) | Resolve $ref fields in some stream schemas |
| 0.2.3 | 2021-10-06 | [6833](https://github.com/airbytehq/airbyte/pull/6833) | Fix config backward compatibility |
| 0.2.2 | 2021-10-05 | [6761](https://github.com/airbytehq/airbyte/pull/6761) | Add oauth workflow specification |