diff --git a/pr_agent/settings/configuration.toml b/pr_agent/settings/configuration.toml index f8abd555a..e9c59f31b 100644 --- a/pr_agent/settings/configuration.toml +++ b/pr_agent/settings/configuration.toml @@ -25,9 +25,12 @@ automatic_review=true extra_instructions = "" [pr_description] # /describe # +publish_labels=true publish_description_as_comment=false add_original_user_description=false keep_original_user_title=false +use_description_markers=false +include_generated_by_header=true extra_instructions = "" [pr_questions] # /ask # diff --git a/pr_agent/tools/pr_description.py b/pr_agent/tools/pr_description.py index c45917f4e..585eb2619 100644 --- a/pr_agent/tools/pr_description.py +++ b/pr_agent/tools/pr_description.py @@ -1,5 +1,6 @@ import copy import json +import re import logging from typing import List, Tuple @@ -28,6 +29,7 @@ def __init__(self, pr_url: str, args: list = None): self.main_pr_language = get_main_pr_language( self.git_provider.get_languages(), self.git_provider.get_files() ) + self.pr_id = f"{self.git_provider.repo}/{self.git_provider.pr_num}" # Initialize the AI handler self.ai_handler = AiHandler() @@ -61,22 +63,34 @@ async def run(self): """ Generates a PR description using an AI model and publishes it to the PR. """ - logging.info('Generating a PR description...') + logging.info(f"Generating a PR description {self.pr_id}") if get_settings().config.publish_output: self.git_provider.publish_comment("Preparing pr description...", is_temporary=True) - + await retry_with_fallback_models(self._prepare_prediction) - - logging.info('Preparing answer...') - pr_title, pr_body, pr_types, markdown_text = self._prepare_pr_answer() - + + logging.info(f"Preparing answer {self.pr_id}") + if self.prediction: + self._prepare_data() + else: + return None + + if get_settings().pr_description.publish_labels: + pr_types = self._prepare_types() + + if get_settings().pr_description.use_description_markers: + logging.info(f"Using description marker replacements {self.pr_id}") + pr_title, pr_body = self._prepare_marker_replacements() + else: + pr_title, pr_body, markdown_text = self._prepare_pr_answer() + if get_settings().config.publish_output: - logging.info('Pushing answer...') + logging.info(f"Pushing answer {self.pr_id}") if get_settings().pr_description.publish_description_as_comment: self.git_provider.publish_comment(markdown_text) else: self.git_provider.publish_description(pr_title, pr_body) - if self.git_provider.is_supported("get_labels"): + if get_settings().pr_description.publish_labels and self.git_provider.is_supported("get_labels"): current_labels = self.git_provider.get_labels() if current_labels is None: current_labels = [] @@ -99,9 +113,12 @@ async def _prepare_prediction(self, model: str) -> None: Any exceptions raised by the 'get_pr_diff' and '_get_prediction' functions. """ - logging.info('Getting PR diff...') + if get_settings().pr_description.use_description_markers and 'pr_agent:' not in self.user_description: + return None + + logging.info(f"Getting PR diff {self.pr_id}") self.patches_diff = get_pr_diff(self.git_provider, self.token_handler, model) - logging.info('Getting AI prediction...') + logging.info(f"Getting AI prediction {self.pr_id}") self.prediction = await self._get_prediction(model) async def _get_prediction(self, model: str) -> str: @@ -134,34 +151,71 @@ async def _get_prediction(self, model: str) -> str: return response - def _prepare_pr_answer(self) -> Tuple[str, str, List[str], str]: - """ - Prepare the PR description based on the AI prediction data. - Returns: - - title: a string containing the PR title. - - pr_body: a string containing the PR body in a markdown format. - - pr_types: a list of strings containing the PR types. - - markdown_text: a string containing the AI prediction data in a markdown format. used for publishing a comment - """ + def _prepare_data(self): # Load the AI prediction data into a dictionary - data = load_yaml(self.prediction.strip()) + self.data = load_yaml(self.prediction.strip()) if get_settings().pr_description.add_original_user_description and self.user_description: - data["User Description"] = self.user_description + self.data["User Description"] = self.user_description + - # Initialization + def _prepare_types(self) -> List[str]: pr_types = [] # If the 'PR Type' key is present in the dictionary, split its value by comma and assign it to 'pr_types' - if 'PR Type' in data: - if type(data['PR Type']) == list: - pr_types = data['PR Type'] - elif type(data['PR Type']) == str: - pr_types = data['PR Type'].split(',') + if 'PR Type' in self.data: + if type(self.data['PR Type']) == list: + pr_types = self.data['PR Type'] + elif type(self.data['PR Type']) == str: + pr_types = self.data['PR Type'].split(',') + + return pr_types + + def _prepare_marker_replacements(self) -> Tuple[str, str]: + title = self.vars["title"] + body = self.user_description + if get_settings().pr_description.include_generated_by_header: + ai_header = f"### 🤖 Generated by PR Agent at {self.git_provider.last_commit_id.sha}\n\n" + else: + ai_header = "" + + ai_summary = self.data.get('PR Description') + if ai_summary and not re.search(r'', body): + summary = f"{ai_header}{ai_summary}" + body = body.replace('pr_agent:summary', summary) + + if not re.search(r'', body): + ai_walkthrough = self.data.get('PR Main Files Walkthrough') + if ai_walkthrough: + walkthrough = str(ai_header) + for file in ai_walkthrough: + filename = file['filename'].replace("'", "`") + description = file['changes in file'].replace("'", "`") + walkthrough += f'- `{filename}`: {description}\n' + + body = body.replace('pr_agent:walkthrough', walkthrough) + + return title, body + + def _prepare_pr_answer(self) -> Tuple[str, str, str]: + """ + Prepare the PR description based on the AI prediction data. + + Returns: + - title: a string containing the PR title. + - pr_body: a string containing the PR description body in a markdown format. + - markdown_text: a string containing the AI prediction data in a markdown format. used for publishing a comment + """ + + # Iterate over the dictionary items and append the key and value to 'markdown_text' in a markdown format + markdown_text = "" + for key, value in self.data.items(): + markdown_text += f"## {key}\n\n" + markdown_text += f"{value}\n\n" # Remove the 'PR Title' key from the dictionary - ai_title = data.pop('PR Title') + ai_title = self.data.pop('PR Title', self.vars["title"]) if get_settings().pr_description.keep_original_user_title: # Assign the original PR title to the 'title' variable title = self.vars["title"] @@ -172,7 +226,7 @@ def _prepare_pr_answer(self) -> Tuple[str, str, List[str], str]: # Iterate over the remaining dictionary items and append the key and value to 'pr_body' in a markdown format, # except for the items containing the word 'walkthrough' pr_body = "" - for idx, (key, value) in enumerate(data.items()): + for idx, (key, value) in enumerate(self.data.items()): pr_body += f"## {key}:\n" if 'walkthrough' in key.lower(): # for filename, description in value.items(): @@ -185,7 +239,7 @@ def _prepare_pr_answer(self) -> Tuple[str, str, List[str], str]: if type(value) == list: value = ', '.join(v for v in value) pr_body += f"{value}\n" - if idx < len(data) - 1: + if idx < len(self.data) - 1: pr_body += "\n___\n" markdown_text = f"## Title\n\n{title}\n\n___\n{pr_body}" @@ -193,4 +247,4 @@ def _prepare_pr_answer(self) -> Tuple[str, str, List[str], str]: if get_settings().config.verbosity_level >= 2: logging.info(f"title:\n{title}\n{pr_body}") - return title, pr_body, pr_types, markdown_text \ No newline at end of file + return title, pr_body, markdown_text