
cloud availability updater: automatic PR creation #22568

Merged
Changes from 2 commits
12 changes: 7 additions & 5 deletions .github/workflows/run-qa-engine.yml
@@ -2,10 +2,10 @@ name: Run QA Engine

on:
workflow_dispatch:
schedule:
# 1pm UTC is 6am PDT.
# same time as Generate Build Report
- cron: "0 13 * * *"
# schedule:
## 1pm UTC is 6am PDT.
## same time as Generate Build Report
# - cron: "0 13 * * *"
Comment on lines +5 to +8 (Contributor): 👍 disabled for now

jobs:
run-qa-engine:
@@ -28,5 +28,7 @@ jobs:
run: pip install --quiet -e ./tools/ci_connector_ops
- name: Run QA Engine
env:
QA_ENGINE_AIRBYTE_DATA_PROD_SA: '${{ secrets.QA_ENGINE_AIRBYTE_DATA_PROD_SA }}'
LOGLEVEL: INFO
QA_ENGINE_AIRBYTE_DATA_PROD_SA: "${{ secrets.QA_ENGINE_AIRBYTE_DATA_PROD_SA }}"
GITHUB_API_TOKEN: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
run: run-qa-engine
tools/ci_connector_ops/ci_connector_ops/qa_engine/cloud_availability_updater.py
@@ -1,47 +1,58 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#


import os
import logging
from pathlib import Path
import shutil
import subprocess
from typing import Optional
import tempfile
from pathlib import Path
from typing import Iterable, Optional

import git
import requests

from .models import ConnectorQAReport
from .constants import (
AIRBYTE_CLOUD_GITHUB_REPO_URL,
AIRBYTE_CLOUD_MAIN_BRANCH_NAME
AIRBYTE_PLATFORM_INTERNAL_GITHUB_REPO_URL,
AIRBYTE_PLATFORM_INTERNAL_MAIN_BRANCH_NAME,
AIRBYTE_PLATFORM_INTERNAL_PR_ENDPOINT,
AIRBYTE_PLATFORM_INTERNAL_REPO_OWNER,
GITHUB_API_COMMON_HEADERS,
)
from .models import ConnectorQAReport

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def clone_airbyte_cloud_repo(local_repo_path: Path) -> git.Repo:
logging.info(f"Cloning {AIRBYTE_CLOUD_GITHUB_REPO_URL} to {local_repo_path}")
return git.Repo.clone_from(AIRBYTE_CLOUD_GITHUB_REPO_URL, local_repo_path, branch=AIRBYTE_CLOUD_MAIN_BRANCH_NAME)
logger.info(f"Cloning {AIRBYTE_PLATFORM_INTERNAL_GITHUB_REPO_URL} to {local_repo_path}")
return git.Repo.clone_from(
AIRBYTE_PLATFORM_INTERNAL_GITHUB_REPO_URL, local_repo_path, branch=AIRBYTE_PLATFORM_INTERNAL_MAIN_BRANCH_NAME
)
Comment on lines +30 to +32 (Contributor): Yeah, I guess we have to clone the platform repo to open a PR



def get_definitions_mask_path(local_repo_path, definition_type: str) -> Path:
definitions_mask_path = local_repo_path / f"cloud-config/cloud-config-seed/src/main/resources/seed/{definition_type}_definitions_mask.yaml"
definitions_mask_path = (
local_repo_path / f"cloud-config/cloud-config-seed/src/main/resources/seed/{definition_type}_definitions_mask.yaml"
)
if not definitions_mask_path.exists():
raise FileNotFoundError(f"Can't find the {definition_type} definitions mask")
return definitions_mask_path


def checkout_new_branch(airbyte_cloud_repo: git.Repo, new_branch_name: str) -> git.Head:
new_branch = airbyte_cloud_repo.create_head(new_branch_name)
new_branch.checkout()
logging.info(f"Checked out branch {new_branch_name}.")
logger.info(f"Checked out branch {new_branch_name}.")
return new_branch


def update_definitions_mask(connector: ConnectorQAReport, definitions_mask_path: Path) -> Optional[Path]:
with open(definitions_mask_path, "r") as definition_mask:
connector_already_in_mask = connector.connector_definition_id in definition_mask.read()
if connector_already_in_mask:
logging.warning(f"{connector.connector_name}'s definition id is already in {definitions_mask_path}.")
logger.warning(f"{connector.connector_name}'s definition id is already in {definitions_mask_path}.")
return None

to_append = f"""# {connector.connector_name} (from cloud availability updater)
@@ -50,31 +61,64 @@ def update_definitions_mask(connector: ConnectorQAReport, definitions_mask_path:

with open(definitions_mask_path, "a") as f:
f.write(to_append)
logging.info(f"Updated {definitions_mask_path} with {connector.connector_name}'s definition id.")
logger.info(f"Updated {definitions_mask_path} with {connector.connector_name}'s definition id.")
return definitions_mask_path
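
Since the hunk above truncates the appended template, here is a hedged illustration of the kind of block `update_definitions_mask` appends to the mask file; the exact field name is an assumption, not confirmed by this diff:

```python
# Hypothetical rendering of an appended mask entry; the key name
# (sourceDefinitionId) and the UUID are placeholders, since the
# template is truncated in the hunk above.
example_entry = (
    "# Source Foo (from cloud availability updater)\n"
    "- sourceDefinitionId: 00000000-0000-0000-0000-000000000000\n"
)
```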


def run_generate_cloud_connector_catalog(airbyte_cloud_repo_path: Path) -> str:
result = subprocess.check_output(
f"cd {airbyte_cloud_repo_path} && ./gradlew :cloud-config:cloud-config-seed:generateCloudConnectorCatalog",
shell=True
)
logging.info("Ran generateCloudConnectorCatalog Gradle Task")
f"cd {airbyte_cloud_repo_path} && ./gradlew :cloud-config:cloud-config-seed:generateCloudConnectorCatalog", shell=True
)
logger.info("Ran generateCloudConnectorCatalog Gradle Task")
return result.decode()
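
The Gradle invocation builds a `cd … && ./gradlew …` string and runs it with `shell=True`. A sketch of an equivalent call that avoids the shell by using `subprocess`'s `cwd` parameter (an alternative, not what this PR ships):

```python
import subprocess
from pathlib import Path


def run_generate_cloud_connector_catalog_no_shell(airbyte_cloud_repo_path: Path) -> str:
    # Same Gradle task, but executed directly in the repo directory,
    # so no shell string interpolation is needed.
    result = subprocess.check_output(
        ["./gradlew", ":cloud-config:cloud-config-seed:generateCloudConnectorCatalog"],
        cwd=airbyte_cloud_repo_path,
    )
    return result.decode()
```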


def commit_all_files(airbyte_cloud_repo: git.Repo, commit_message: str):
airbyte_cloud_repo.git.add('--all')
airbyte_cloud_repo.git.add("--all")
airbyte_cloud_repo.git.commit(m=commit_message)
logging.info(f"Committed file changes.")
logger.info("Committed file changes.")


def push_branch(airbyte_cloud_repo: git.Repo, branch:str):
airbyte_cloud_repo.git.push("--set-upstream", "origin", branch)
logging.info(f"Pushed branch {branch} to origin")
def push_branch(airbyte_cloud_repo: git.Repo, branch: str):
airbyte_cloud_repo.git.push("--force", "--set-upstream", "origin", branch)
logger.info(f"Pushed branch {branch} to origin")

def deploy_new_connector_to_cloud_repo(
airbyte_cloud_repo_path: Path,
airbyte_cloud_repo: git.Repo,
connector: ConnectorQAReport
):

def pr_already_created_for_branch(head_branch: str) -> bool:
response = requests.get(
AIRBYTE_PLATFORM_INTERNAL_PR_ENDPOINT,
headers=GITHUB_API_COMMON_HEADERS,
params={"head": f"{AIRBYTE_PLATFORM_INTERNAL_REPO_OWNER}:{head_branch}", "state": "open"},
)
response.raise_for_status()
return len(response.json()) > 0
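
A minimal test sketch for this idempotency check, mocking the GitHub API call; the module path is assumed from this repo's layout:

```python
from unittest import mock

from ci_connector_ops.qa_engine import cloud_availability_updater


def test_pr_already_created_for_branch():
    with mock.patch.object(cloud_availability_updater.requests, "get") as get_mock:
        # One open PR with this head branch -> the check should be truthy.
        get_mock.return_value.json.return_value = [{"number": 1}]
        assert cloud_availability_updater.pr_already_created_for_branch("my-branch")
        # No open PR -> the check should be falsy.
        get_mock.return_value.json.return_value = []
        assert not cloud_availability_updater.pr_already_created_for_branch("my-branch")
```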


def create_pr(connector: ConnectorQAReport, branch: str) -> Optional[requests.Response]:
body = f"""The Cloud Availability Updater decided that it's the right time to make {connector.connector_name} available on Cloud!
Technical name: {connector.connector_technical_name}
Version: {connector.connector_version}
Definition ID: {connector.connector_definition_id}
OSS sync success rate: {connector.sync_success_rate}
OSS number of connections: {connector.number_of_connections}
"""
data = {
"title": f"🤖 Add {connector.connector_technical_name} to cloud",
"body": body,
"head": branch,
"base": AIRBYTE_PLATFORM_INTERNAL_MAIN_BRANCH_NAME,
}
if not pr_already_created_for_branch(branch):
response = requests.post(AIRBYTE_PLATFORM_INTERNAL_PR_ENDPOINT, headers=GITHUB_API_COMMON_HEADERS, json=data)
response.raise_for_status()
pr_url = response.json().get("url")
logger.info(f"A PR was opened for {connector.connector_technical_name}: {pr_url}")
return response
else:
logger.warning(f"A PR already exists for branch {branch}")
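
Because `create_pr` returns `None` when an open PR already exists for the branch, callers should guard before reading the response. A usage sketch, assuming `connector` is a `ConnectorQAReport` and the branch name is hypothetical:

```python
# create_pr is idempotent per branch: it returns a Response for a newly
# opened PR and None when one is already open.
response = create_pr(connector, "cloud-availability-updater/deploy-source-foo")
if response is not None:
    print(response.json()["url"])  # API URL of the new pull request
```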


def deploy_new_connector_to_cloud_repo(airbyte_cloud_repo_path: Path, airbyte_cloud_repo: git.Repo, connector: ConnectorQAReport):
"""Updates the local definitions mask on Airbyte cloud repo.
Calls the generateCloudConnectorCatalog gradle task.
Commits these changes on a new branch.
@@ -85,15 +129,22 @@ def deploy_new_connector_to_cloud_repo(
airbyte_cloud_repo (git.Repo): The Airbyte Cloud repo instance.
connector (ConnectorQAReport): The connector to add to a definitions mask.
"""
airbyte_cloud_repo.git.checkout(AIRBYTE_CLOUD_MAIN_BRANCH_NAME)
airbyte_cloud_repo.git.checkout(AIRBYTE_PLATFORM_INTERNAL_MAIN_BRANCH_NAME)
new_branch_name = f"cloud-availability-updater/deploy-{connector.connector_technical_name}"
checkout_new_branch(airbyte_cloud_repo, new_branch_name)
definitions_mask_path = get_definitions_mask_path(airbyte_cloud_repo_path, connector.connector_type)
update_definitions_mask(connector, definitions_mask_path)
run_generate_cloud_connector_catalog(airbyte_cloud_repo_path)
commit_all_files(
airbyte_cloud_repo,
f"🤖 Add {connector.connector_name} connector to cloud"
)
push_branch(airbyte_cloud_repo, new_branch_name)
airbyte_cloud_repo.git.checkout(AIRBYTE_CLOUD_MAIN_BRANCH_NAME)
updated_files = update_definitions_mask(connector, definitions_mask_path)
if updated_files:
run_generate_cloud_connector_catalog(airbyte_cloud_repo_path)
commit_all_files(airbyte_cloud_repo, f"🤖 Add {connector.connector_name} connector to cloud")
push_branch(airbyte_cloud_repo, new_branch_name)
create_pr(connector, new_branch_name)
airbyte_cloud_repo.git.checkout(AIRBYTE_PLATFORM_INTERNAL_MAIN_BRANCH_NAME)


def deploy_eligible_connectors_to_cloud_repo(eligible_connectors: Iterable):
cloud_repo_path = Path(tempfile.mkdtemp())
airbyte_cloud_repo = clone_airbyte_cloud_repo(cloud_repo_path)
for connector in eligible_connectors:
deploy_new_connector_to_cloud_repo(cloud_repo_path, airbyte_cloud_repo, connector)
shutil.rmtree(cloud_repo_path)
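
Note that `shutil.rmtree` is skipped if any deploy step raises. A sketch using a context manager so the temporary clone is always cleaned up (an alternative, not part of this PR):

```python
import tempfile
from pathlib import Path


def deploy_eligible_connectors_to_cloud_repo_safe(eligible_connectors):
    # TemporaryDirectory removes the clone even when a deploy step fails.
    with tempfile.TemporaryDirectory() as tmp_dir:
        cloud_repo_path = Path(tmp_dir)
        airbyte_cloud_repo = clone_airbyte_cloud_repo(cloud_repo_path)
        for connector in eligible_connectors:
            deploy_new_connector_to_cloud_repo(cloud_repo_path, airbyte_cloud_repo, connector)
```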
26 changes: 20 additions & 6 deletions tools/ci_connector_ops/ci_connector_ops/qa_engine/constants.py
@@ -1,17 +1,31 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
import os

CONNECTOR_BUILD_OUTPUT_URL = "https://dnsgjos7lj2fu.cloudfront.net/tests/history/connectors"
CLOUD_CATALOG_URL = "https://storage.googleapis.com/prod-airbyte-cloud-connector-metadata-service/cloud_catalog.json"
OSS_CATALOG_URL = "https://storage.googleapis.com/prod-airbyte-cloud-connector-metadata-service/oss_catalog.json"

INAPPROPRIATE_FOR_CLOUD_USE_CONNECTORS = [
"8be1cf83-fde1-477f-a4ad-318d23c9f3c6", # Local CSV
"a625d593-bba5-4a1c-a53d-2d246268a816", # Local JSON
"b76be0a6-27dc-4560-95f6-2623da0bd7b6" # Local SQL Lite
"8be1cf83-fde1-477f-a4ad-318d23c9f3c6", # Local CSV
"a625d593-bba5-4a1c-a53d-2d246268a816", # Local JSON
"b76be0a6-27dc-4560-95f6-2623da0bd7b6", # Local SQL Lite
]

GCS_QA_REPORT_PATH = "gs://prod-airbyte-cloud-connector-metadata-service/qa_report.json"
AIRBYTE_CLOUD_GITHUB_REPO_URL = "https://github.com/airbytehq/airbyte-cloud.git"
AIRBYTE_CLOUD_MAIN_BRANCH_NAME = "master"
AIRBYTE_PLATFORM_INTERNAL_REPO_OWNER = "airbytehq"
AIRBYTE_PLATFORM_INTERNAL_REPO_NAME = "airbyte-platform-internal"
AIRBYTE_PLATFORM_INTERNAL_GITHUB_REPO_URL = (
f"https://github.com/{AIRBYTE_PLATFORM_INTERNAL_REPO_OWNER}/{AIRBYTE_PLATFORM_INTERNAL_REPO_NAME}.git"
)
AIRBYTE_PLATFORM_INTERNAL_MAIN_BRANCH_NAME = "master"
AIRBYTE_PLATFORM_INTERNAL_PR_ENDPOINT = (
f"https://api.github.com/repos/{AIRBYTE_PLATFORM_INTERNAL_REPO_OWNER}/{AIRBYTE_PLATFORM_INTERNAL_REPO_NAME}/pulls"
)
GITHUB_API_TOKEN = os.environ.get("GITHUB_API_TOKEN")
GITHUB_API_COMMON_HEADERS = {
"Accept": "application/vnd.github+json",
"X-GitHub-Api-Version": "2022-11-28",
"Authorization": f"Bearer {GITHUB_API_TOKEN}",
}
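
One subtlety: `GITHUB_API_TOKEN` is resolved at import time, so if the variable is unset the header silently becomes `Authorization: Bearer None`. A hedged fail-fast sketch (hypothetical helper, not in this PR):

```python
import os


def get_github_api_token() -> str:
    # Fail fast instead of sending "Bearer None" to the GitHub API.
    token = os.environ.get("GITHUB_API_TOKEN")
    if not token:
        raise EnvironmentError("GITHUB_API_TOKEN must be set to call the GitHub API")
    return token
```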
26 changes: 18 additions & 8 deletions tools/ci_connector_ops/ci_connector_ops/qa_engine/main.py
@@ -1,19 +1,29 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import logging

from . import cloud_availability_updater, enrichments, inputs, validations
from .constants import CLOUD_CATALOG_URL, OSS_CATALOG_URL
from . import enrichments, inputs, validations

logging.basicConfig(level=logging.INFO)

logger = logging.getLogger(__name__)


def main():
logger.info("Fetch the OSS connectors catalog.")
oss_catalog = inputs.fetch_remote_catalog(OSS_CATALOG_URL)
logger.info("Fetch the Cloud connectors catalog.")
cloud_catalog = inputs.fetch_remote_catalog(CLOUD_CATALOG_URL)
logger.info("Fetch adoption metrics.")
adoption_metrics_per_connector_version = inputs.fetch_adoption_metrics_per_connector_version()
enriched_catalog = enrichments.get_enriched_catalog(
oss_catalog,
cloud_catalog,
adoption_metrics_per_connector_version
)
validations.get_qa_report(enriched_catalog, len(oss_catalog))
logger.info("Start the enriched catalog generation.")
enriched_catalog = enrichments.get_enriched_catalog(oss_catalog, cloud_catalog, adoption_metrics_per_connector_version)
logger.info("Start the QA report generation.")
qa_report = validations.get_qa_report(enriched_catalog, len(oss_catalog))
logger.info("Start the eligible connectors selection.")
eligible_connectors = validations.get_connectors_eligible_for_cloud(qa_report)
logger.info("Start eligible connectors deployment to Cloud.")
cloud_availability_updater.deploy_eligible_connectors_to_cloud_repo(eligible_connectors)
43 changes: 24 additions & 19 deletions tools/ci_connector_ops/ci_connector_ops/qa_engine/validations.py
@@ -1,47 +1,50 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import logging
from datetime import datetime
from typing import Iterable
from typing import List

import pandas as pd
import requests

from .constants import INAPPROPRIATE_FOR_CLOUD_USE_CONNECTORS
from .inputs import BUILD_STATUSES, fetch_latest_build_status_for_connector_version
from .models import ConnectorQAReport, QAReport
from .inputs import fetch_latest_build_status_for_connector_version, BUILD_STATUSES

TRUTHY_COLUMNS_TO_BE_ELIGIBLE = [
"documentation_is_available",
"is_appropriate_for_cloud_use",
"latest_build_is_successful"
]
logger = logging.getLogger(__name__)


TRUTHY_COLUMNS_TO_BE_ELIGIBLE = ["documentation_is_available", "is_appropriate_for_cloud_use", "latest_build_is_successful"]


class QAReportGenerationError(Exception):
pass


def url_is_reachable(url: str) -> bool:
response = requests.get(url)
return response.status_code == 200


def is_appropriate_for_cloud_use(definition_id: str) -> bool:
return definition_id not in INAPPROPRIATE_FOR_CLOUD_USE_CONNECTORS


def is_eligible_for_promotion_to_cloud(connector_qa_data: pd.Series) -> bool:
if connector_qa_data["is_on_cloud"]:
return False
return all([
connector_qa_data[col]
for col in TRUTHY_COLUMNS_TO_BE_ELIGIBLE
])
if connector_qa_data["is_on_cloud"]:
return False
return all([connector_qa_data[col] for col in TRUTHY_COLUMNS_TO_BE_ELIGIBLE])
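
A worked example of the eligibility gate with hypothetical values: a connector already on Cloud is never promoted, regardless of the other flags.

```python
import pandas as pd

row = pd.Series(
    {
        "is_on_cloud": False,
        "documentation_is_available": True,
        "is_appropriate_for_cloud_use": True,
        "latest_build_is_successful": True,
    }
)
assert is_eligible_for_promotion_to_cloud(row)      # all gates pass
row["is_on_cloud"] = True
assert not is_eligible_for_promotion_to_cloud(row)  # short-circuits to False
```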


def latest_build_is_successful(connector_qa_data: pd.Series) -> bool:
connector_technical_name = connector_qa_data["connector_technical_name"]
connector_version = connector_qa_data["connector_version"]
latest_build_status = fetch_latest_build_status_for_connector_version(connector_technical_name, connector_version)
return latest_build_status == BUILD_STATUSES.SUCCESS


def get_qa_report(enriched_catalog: pd.DataFrame, oss_catalog_length: int) -> pd.DataFrame:
"""Perform validation steps on top of the enriched catalog.
Adds the following columns:
@@ -74,13 +77,15 @@ def get_qa_report(enriched_catalog: pd.DataFrame, oss_catalog_length: int) -> pd
qa_report["report_generation_datetime"] = datetime.utcnow()

# Only select dataframe columns defined in the ConnectorQAReport model.
qa_report= qa_report[[field.name for field in ConnectorQAReport.__fields__.values()]]
qa_report = qa_report[[field.name for field in ConnectorQAReport.__fields__.values()]]
# Validate the report structure with pydantic QAReport model.
QAReport(connectors_qa_report=qa_report.to_dict(orient="records"))
if len(qa_report) != oss_catalog_length:
raise QAReportGenerationError("The QA report does not contain all the connectors defined in the OSS catalog.")
raise QAReportGenerationError("The QA report does not contain all the connectors defined in the OSS catalog.")
return qa_report

def get_connectors_eligible_for_cloud(qa_report: pd.DataFrame) -> Iterable[ConnectorQAReport]:
for _, row in qa_report[qa_report["is_eligible_for_promotion_to_cloud"]].iterrows():
yield ConnectorQAReport(**row)

def get_connectors_eligible_for_cloud(qa_report: pd.DataFrame) -> List[ConnectorQAReport]:
eligible_connectors = [ConnectorQAReport(**row) for _, row in qa_report[qa_report["is_eligible_for_promotion_to_cloud"]].iterrows()]
logger.info(f"{len(eligible_connectors)} connectors are eligible for Cloud.")
return eligible_connectors
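
Returning a materialized list (instead of the previous generator) lets the function log `len(...)` and lets callers re-iterate the result or truth-test it. A usage sketch mirroring `main.py`, assuming `qa_report` is the DataFrame produced by `get_qa_report` above:

```python
eligible = get_connectors_eligible_for_cloud(qa_report)
if eligible:  # a generator could not be sized or truth-tested like this
    cloud_availability_updater.deploy_eligible_connectors_to_cloud_repo(eligible)
```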
6 changes: 3 additions & 3 deletions tools/ci_connector_ops/setup.py
@@ -12,7 +12,7 @@
"pandas-gbq~=0.19.0",
"pydantic~=1.10.4",
"fsspec~=2023.1.0",
"gcsfs~=2023.1.0"
"gcsfs~=2023.1.0",
]

TEST_REQUIREMENTS = [
@@ -21,7 +21,7 @@
]

setup(
version="0.1.10",
version="0.1.11",
name="ci_connector_ops",
description="Packaged maintained by the connector operations team to perform CI for connectors",
author="Airbyte",
@@ -40,7 +40,7 @@
"print-mandatory-reviewers = ci_connector_ops.acceptance_test_config_checks:print_mandatory_reviewers",
"allowed-hosts-checks = ci_connector_ops.allowed_hosts_checks:check_allowed_hosts",
"run-qa-engine = ci_connector_ops.qa_engine.main:main",
"run-qa-checks = ci_connector_ops.qa_checks:run_qa_checks"
"run-qa-checks = ci_connector_ops.qa_checks:run_qa_checks",
],
},
)