Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for markers in description #273

Merged
merged 1 commit into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pr_agent/settings/configuration.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ automatic_review=true
extra_instructions = ""

[pr_description] # /describe #
publish_labels=true
publish_description_as_comment=false
add_original_user_description=false
keep_original_user_title=false
use_description_markers=false
include_generated_by_header=true
extra_instructions = ""

[pr_questions] # /ask #
Expand Down
116 changes: 85 additions & 31 deletions pr_agent/tools/pr_description.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import copy
import json
import re
import logging
from typing import List, Tuple

Expand Down Expand Up @@ -28,6 +29,7 @@ def __init__(self, pr_url: str, args: list = None):
self.main_pr_language = get_main_pr_language(
self.git_provider.get_languages(), self.git_provider.get_files()
)
self.pr_id = f"{self.git_provider.repo}/{self.git_provider.pr_num}"

# Initialize the AI handler
self.ai_handler = AiHandler()
Expand Down Expand Up @@ -61,22 +63,34 @@ async def run(self):
"""
Generates a PR description using an AI model and publishes it to the PR.
"""
logging.info('Generating a PR description...')
logging.info(f"Generating a PR description {self.pr_id}")
if get_settings().config.publish_output:
self.git_provider.publish_comment("Preparing pr description...", is_temporary=True)

await retry_with_fallback_models(self._prepare_prediction)

logging.info('Preparing answer...')
pr_title, pr_body, pr_types, markdown_text = self._prepare_pr_answer()


logging.info(f"Preparing answer {self.pr_id}")
if self.prediction:
self._prepare_data()
else:
return None

if get_settings().pr_description.publish_labels:
pr_types = self._prepare_types()

if get_settings().pr_description.use_description_markers:
logging.info(f"Using description marker replacements {self.pr_id}")
pr_title, pr_body = self._prepare_marker_replacements()
else:
pr_title, pr_body, markdown_text = self._prepare_pr_answer()

if get_settings().config.publish_output:
logging.info('Pushing answer...')
logging.info(f"Pushing answer {self.pr_id}")
if get_settings().pr_description.publish_description_as_comment:
self.git_provider.publish_comment(markdown_text)
else:
self.git_provider.publish_description(pr_title, pr_body)
if self.git_provider.is_supported("get_labels"):
if get_settings().pr_description.publish_labels and self.git_provider.is_supported("get_labels"):
current_labels = self.git_provider.get_labels()
if current_labels is None:
current_labels = []
Expand All @@ -99,9 +113,12 @@ async def _prepare_prediction(self, model: str) -> None:
Any exceptions raised by the 'get_pr_diff' and '_get_prediction' functions.

"""
logging.info('Getting PR diff...')
if get_settings().pr_description.use_description_markers and 'pr_agent:' not in self.user_description:
return None

logging.info(f"Getting PR diff {self.pr_id}")
self.patches_diff = get_pr_diff(self.git_provider, self.token_handler, model)
logging.info('Getting AI prediction...')
logging.info(f"Getting AI prediction {self.pr_id}")
self.prediction = await self._get_prediction(model)

async def _get_prediction(self, model: str) -> str:
Expand Down Expand Up @@ -134,34 +151,71 @@ async def _get_prediction(self, model: str) -> str:

return response

def _prepare_pr_answer(self) -> Tuple[str, str, List[str], str]:
"""
Prepare the PR description based on the AI prediction data.

Returns:
- title: a string containing the PR title.
- pr_body: a string containing the PR body in a markdown format.
- pr_types: a list of strings containing the PR types.
- markdown_text: a string containing the AI prediction data in a markdown format. used for publishing a comment
"""
def _prepare_data(self):
# Load the AI prediction data into a dictionary
data = load_yaml(self.prediction.strip())
self.data = load_yaml(self.prediction.strip())

if get_settings().pr_description.add_original_user_description and self.user_description:
data["User Description"] = self.user_description
self.data["User Description"] = self.user_description


# Initialization
def _prepare_types(self) -> List[str]:
pr_types = []

# If the 'PR Type' key is present in the dictionary, split its value by comma and assign it to 'pr_types'
if 'PR Type' in data:
if type(data['PR Type']) == list:
pr_types = data['PR Type']
elif type(data['PR Type']) == str:
pr_types = data['PR Type'].split(',')
if 'PR Type' in self.data:
if type(self.data['PR Type']) == list:
pr_types = self.data['PR Type']
elif type(self.data['PR Type']) == str:
pr_types = self.data['PR Type'].split(',')

return pr_types

def _prepare_marker_replacements(self) -> Tuple[str, str]:
title = self.vars["title"]
body = self.user_description
if get_settings().pr_description.include_generated_by_header:
ai_header = f"### 🤖 Generated by PR Agent at {self.git_provider.last_commit_id.sha}\n\n"
else:
ai_header = ""

ai_summary = self.data.get('PR Description')
if ai_summary and not re.search(r'<!--\s*pr_agent:summary\s*-->', body):
summary = f"{ai_header}{ai_summary}"
body = body.replace('pr_agent:summary', summary)

if not re.search(r'<!--\s*pr_agent:walkthrough\s*-->', body):
ai_walkthrough = self.data.get('PR Main Files Walkthrough')
if ai_walkthrough:
walkthrough = str(ai_header)
for file in ai_walkthrough:
filename = file['filename'].replace("'", "`")
description = file['changes in file'].replace("'", "`")
walkthrough += f'- `{filename}`: {description}\n'

body = body.replace('pr_agent:walkthrough', walkthrough)

return title, body

def _prepare_pr_answer(self) -> Tuple[str, str, str]:
"""
Prepare the PR description based on the AI prediction data.

Returns:
- title: a string containing the PR title.
- pr_body: a string containing the PR description body in a markdown format.
- markdown_text: a string containing the AI prediction data in a markdown format. used for publishing a comment
"""

# Iterate over the dictionary items and append the key and value to 'markdown_text' in a markdown format
markdown_text = ""
for key, value in self.data.items():
markdown_text += f"## {key}\n\n"
markdown_text += f"{value}\n\n"

# Remove the 'PR Title' key from the dictionary
ai_title = data.pop('PR Title')
ai_title = self.data.pop('PR Title', self.vars["title"])
if get_settings().pr_description.keep_original_user_title:
# Assign the original PR title to the 'title' variable
title = self.vars["title"]
Expand All @@ -172,7 +226,7 @@ def _prepare_pr_answer(self) -> Tuple[str, str, List[str], str]:
# Iterate over the remaining dictionary items and append the key and value to 'pr_body' in a markdown format,
# except for the items containing the word 'walkthrough'
pr_body = ""
for idx, (key, value) in enumerate(data.items()):
for idx, (key, value) in enumerate(self.data.items()):
pr_body += f"## {key}:\n"
if 'walkthrough' in key.lower():
# for filename, description in value.items():
Expand All @@ -185,12 +239,12 @@ def _prepare_pr_answer(self) -> Tuple[str, str, List[str], str]:
if type(value) == list:
value = ', '.join(v for v in value)
pr_body += f"{value}\n"
if idx < len(data) - 1:
if idx < len(self.data) - 1:
pr_body += "\n___\n"

markdown_text = f"## Title\n\n{title}\n\n___\n{pr_body}"

if get_settings().config.verbosity_level >= 2:
logging.info(f"title:\n{title}\n{pr_body}")

return title, pr_body, pr_types, markdown_text
return title, pr_body, markdown_text
Loading