From 88b3c9adfd3a4acdb89ac99fe7662a873e96f719 Mon Sep 17 00:00:00 2001 From: Nika Smilga <42929200+smilni@users.noreply.github.com> Date: Thu, 16 Nov 2023 12:48:36 +0300 Subject: [PATCH] =?UTF-8?q?fix:=20combined=20id=20of=20docs=20that=20are?= =?UTF-8?q?=20used=20together=20added=20to=20atts;=20fixed=20=E2=80=A6=20(?= =?UTF-8?q?#123)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: combined id of docs that are used together added to atts; fixed earlier generation retrieval for multiple docs * add check that there are multiple docs to get combination id; remove extra logs * concatenated ids of files -> list of ids of files * improve set_correct_type_and_id func for cases where no id is needed --- annotators/doc_processor/README.md | 16 +- annotators/doc_processor/server.py | 18 +- annotators/doc_processor/utils.py | 15 +- .../scenario/response.py | 163 ++++++++++-------- .../scenario/utils.py | 46 +++-- 5 files changed, 165 insertions(+), 93 deletions(-) diff --git a/annotators/doc_processor/README.md b/annotators/doc_processor/README.md index c18e2f23f..57f88a835 100644 --- a/annotators/doc_processor/README.md +++ b/annotators/doc_processor/README.md @@ -11,7 +11,19 @@ Here is an example of what Document Processor may add to the dialog state: { "human": { "attributes": { - "documents_in_use" = ["nlkr09lnvJ_7ed546db9846ba7661ceda123837f7fc", "kKmcdwiow9_7ed546db9846ba7661ceda123837f7fc"] + "documents_in_use" = ["nlkr09lnvJ_7ed546db9846ba7661ceda123837f7fc", "kKmcdwiow9_7ed546db9846ba7661ceda123837f7fc"], + "documents_combination_ids" = { + "LKNpck0nke_7ed546db9846ba7661ceda123837f7f": + [ + "nlkr09lnvJ_7ed546db9846ba7661ceda123837f7fc", + "kKmcdwiow9_7ed546db9846ba7661ceda123837f7fc" + ], + "kfmIOJkm9e_7ed546db9846ba7661ceda123837f7f": + [ + "nlkr09lnvJ_7ed546db9846ba7661ceda123837f7fc", + "lrfmovor99_jdcn096db9846ba681ceda398kewn93" + ] + } "processed_documents" = { "nlkr09lnvJ_7ed546db9846ba7661ceda123837f7fc": { @@ -39,6 +51,8 @@ Here is an example of what Document Processor may add to the dialog state: `documents_in_use` are the documents that are being discussed on this step of the dialog. These are typically the documents specified in the attributes of the last human utterance or the arguments of doc-processor docker container. +`documents_combination_ids` is a dictionary mapping combination id with ids of documents that this combination includes. Each docs (2 and more) that were ever used together in `documents_in_use` have their own combination id and are stored in `documents_combination_ids`. + `processed_documents` are all documents that were given by the user during the dialog and processed by system with all the information available about these documents. `processed_documents` always include `documents_in_use` and may include previously discussed documents if there are any. ## Parameters diff --git a/annotators/doc_processor/server.py b/annotators/doc_processor/server.py index 93ff8d2e7..c9ce649d7 100644 --- a/annotators/doc_processor/server.py +++ b/annotators/doc_processor/server.py @@ -38,6 +38,9 @@ def process_and_upload_doc(): bot_utts = dialog.get("bot_utterances", []) docs_in_atts = human_utts[-1].get("attributes", {}).get("documents", []) all_docs_info = dialog.get("human", {}).get("attributes", {}).get("processed_documents", {}) + docs_combination_ids_info = ( + dialog.get("human", {}).get("attributes", {}).get("documents_combination_ids", {}) + ) docs_in_use_info = dialog.get("human", {}).get("attributes", {}).get("documents_in_use", []) # even if we reset the dialog, we may still get some old files in docs_in_use # thus, for a new dialog, we manually reset docs_in_use @@ -46,17 +49,21 @@ def process_and_upload_doc(): # check if we got sth from attributes (docs_in_atts) or arguments (DOC_PATHS_OR_LINKS) # if these docs were not processed yet, process them and upload to file server # if these docs were already processed, just reset n_steps_discussed - new_docs_in_use_info, new_docs_info = upload_documents_save_info( + new_docs_in_use_info, new_docs_info, docs_combination_ids_new = upload_documents_save_info( docs_in_atts, DOC_PATHS_OR_LINKS, all_docs_info, docs_in_use_info, dialog_id ) - # update dicts to be used in human_attributes with new info - # for docs_in_use, if we got new docs, forget the old ones + # update dicts to be used in human_attributes with new info for docs_in_use + # and new combination id for these new docs_in_use + # if we got new docs, remove the old ones from docs_in_use_info if new_docs_in_use_info: docs_in_use_info = new_docs_in_use_info - all_docs_info.update(new_docs_info) + docs_combination_ids_info.update(docs_combination_ids_new) # only update attributes if we received some documents if new_docs_info: + all_docs_info.update(new_docs_info) logger.info("Received and processed new document(s).") + # if no new documents received, we can either leave the attributes as they are + # or in some cases clear active documents if we don't want to continue discussing them else: # check if document is being discussed for too long; if yes, clear docs_in_use_info # do not check that if we have any document in attributes @@ -73,12 +80,13 @@ def process_and_upload_doc(): docs_in_use_info.clear() logger.info( f"No skills using docs active for {N_TURNS_TO_KEEP_DOC} turns. \ - Remove all docs from active memory." +Remove all docs from active memory." ) human_atts = { "human_attributes": { "processed_documents": all_docs_info, "documents_in_use": docs_in_use_info, + "documents_combination_ids": docs_combination_ids_info, } } attributes_to_add.append(human_atts) diff --git a/annotators/doc_processor/utils.py b/annotators/doc_processor/utils.py index 29a6a2123..66d9a2eec 100644 --- a/annotators/doc_processor/utils.py +++ b/annotators/doc_processor/utils.py @@ -244,7 +244,7 @@ def get_docs_to_process(all_docs_to_check: List[str], all_docs_info: dict, docs_ def upload_documents_save_info( docs_in_atts: List[str], doc_paths_or_links: List[str], all_docs_info: dict, docs_in_use_info: dict, dialog_id: str -) -> Tuple[dict, dict]: +) -> Tuple[list, dict, dict]: """Processes the given documents to get plain text if they were not processed before, uploads them to file server and returns information about each. NB: If there are multiple documents, their text is concatenated and uploaded to server as one .txt file. @@ -260,6 +260,12 @@ def upload_documents_save_info( documents_in_use = ['nlkr09lnvJ_7ed546db9846ba7661ceda123837f7fc', 'kKmcdwiow9_7ed546db9846ba7661ceda123837f7fc'] + A dictionary mapping combination id with ids of files currently in use: + docs_combination_ids = { + 'LKNpck0nke_7ed546db9846ba7661ceda123837f7fc': + ['nlkr09lnvJ_7ed546db9846ba7661ceda123837f7fc-kKmcdwiow9_7ed546db9846ba7661ceda123837f7fc'] + } + Another one mapping ids of all files that were ever used and information about them, such as file source and link to the file with processed text: processed_documents = { @@ -284,7 +290,7 @@ def upload_documents_save_info( # (either fully unprocessed or processed sometime earlier but not yet present in current docs_in_use) all_docs_to_check = list(set(docs_in_atts + doc_paths_or_links)) docs_and_types = get_docs_to_process(all_docs_to_check, all_docs_info, docs_in_use_info) - all_docs_info_new = {} + all_docs_info_new, docs_combination_ids_new = {}, {} docs_in_use_info_new = [] # check if we need to process anything if docs_and_types: @@ -327,4 +333,7 @@ def upload_documents_save_info( all_docs_info_new[file_id]["processed_text_link"] = doc_text_link all_docs_info_new[file_id]["filename"] = filename docs_in_use_info_new.append(file_id) - return docs_in_use_info_new, all_docs_info_new + if len(docs_in_use_info_new) > 1: + doc_combination_id = generate_unique_file_id(10, dialog_id) + docs_combination_ids_new[doc_combination_id] = docs_in_use_info_new + return docs_in_use_info_new, all_docs_info_new, docs_combination_ids_new diff --git a/skills/dff_meeting_analysis_skill/scenario/response.py b/skills/dff_meeting_analysis_skill/scenario/response.py index 178c21bae..e58438e06 100644 --- a/skills/dff_meeting_analysis_skill/scenario/response.py +++ b/skills/dff_meeting_analysis_skill/scenario/response.py @@ -18,6 +18,7 @@ compose_and_upload_final_response, get_older_gen_response, get_name_and_text_from_file, + get_key_by_value, ) @@ -89,86 +90,108 @@ def gathering_responses(reply, confidence, human_attr, bot_attr, attr): related_files = bot_utts[-1].get("user", {}).get("attributes", {}).get("related_files", {}) # check if we already received such request before and saved hyp for it to server documents_in_use = context[-1].get("user", {}).get("attributes", {}).get("documents_in_use", []) + docs_combination_ids = ( + context[-1].get("user", {}).get("attributes", {}).get("documents_combination_ids", {}) + ) all_docs_info = context[-1].get("user", {}).get("attributes", {}).get("processed_documents", {}) sending_variables = compose_sending_variables({}, ENVVARS_TO_SEND, human_uttr_attributes) hyps_and_names_all_docs = [] if documents_in_use: _all_docs_have_summary = True - for document_in_use_id in documents_in_use: - if related_files.get(f"summary__{document_in_use_id}", None) is None: - _all_docs_have_summary = False - # if we need a weekly report, on this step we gather separate daily reports for each doc - # also here we change the type of summary prompt based on summary length request + _need_to_get_response_from_llm = True + # check if have final hypothesis for this request in case of multiple docs in use + if len(documents_in_use) > 1: + prompt_type_local, _ = set_correct_type_and_id(request, prompt_type_local) + curr_combination_id = get_key_by_value(docs_combination_ids, documents_in_use) + prompt_type_and_combination_id = f"{prompt_type_local}__{curr_combination_id}" + if prompt_type_and_combination_id in related_files.keys(): + _need_to_get_response_from_llm = False + hypotheses = [get_older_gen_response(prompt_type_and_combination_id, related_files)] + # check if have final hypothesis for this request in case of one doc in use + else: prompt_type_local, prompt_type_and_id = set_correct_type_and_id( - request, prompt_type_local, document_in_use_id + request, prompt_type_local, document_in_use_id=documents_in_use[0] ) - # we do not do anything unless we have the link to our file(s) in use - transcript_link = all_docs_info[document_in_use_id].get("processed_text_link", "") - if transcript_link: - # here we check if we already generated sth for the same request and the same doc - if prompt_type_and_id in related_files.keys(): - # in the future, it is better to store filenames in related_files - # to avoid extra requests to file server - filename, _ = get_name_and_text_from_file(transcript_link) - older_response = get_older_gen_response(prompt_type_and_id, related_files) - hyp_and_name_one_doc = [(filename, older_response)] - # if no, let's generate it - else: - logger.info( - f"No earlier {prompt_type_and_id} found. \ + if prompt_type_and_id in related_files.keys(): + _need_to_get_response_from_llm = False + hypotheses = [get_older_gen_response(prompt_type_and_id, related_files)] + + if _need_to_get_response_from_llm: + for document_in_use_id in documents_in_use: + if related_files.get(f"summary__{document_in_use_id}", None) is None: + _all_docs_have_summary = False + # if we need a weekly report, on this step we gather separate daily reports for each doc + # also here we change the type of summary prompt based on summary length request + prompt_type_local, prompt_type_and_id = set_correct_type_and_id( + request, prompt_type_local, document_in_use_id=document_in_use_id + ) + # we do not do anything unless we have the link to our file(s) in use + transcript_link = all_docs_info[document_in_use_id].get("processed_text_link", "") + if transcript_link: + # here we check if we already generated sth for the same request and the same doc + if prompt_type_and_id in related_files.keys(): + # in the future, it is better to store filenames in related_files + # to avoid extra requests to file server + filename, _ = get_name_and_text_from_file(transcript_link) + older_response = get_older_gen_response(prompt_type_and_id, related_files) + hyp_and_name_one_doc = [(filename, older_response)] + # if no, let's generate it + else: + logger.info( + f"No earlier {prompt_type_and_id} found. \ Sending request to generative model." - ) - try: - filename, orig_text = get_name_and_text_from_file(transcript_link) - hyp_one_doc, related_files, _n_requests = get_and_upload_response_for_one_doc( - orig_text, - prompt_type_and_id, - dialog_context, - sending_variables, - related_files, ) - hyp_and_name_one_doc = [(filename, hyp_one_doc)] - n_requests += _n_requests - except Exception as e: - sentry_sdk.capture_exception(e) - logger.exception(e) - hyp_and_name_one_doc = [] + try: + filename, orig_text = get_name_and_text_from_file(transcript_link) + hyp_one_doc, related_files, _n_requests = get_and_upload_response_for_one_doc( + orig_text, + prompt_type_and_id, + dialog_context, + sending_variables, + related_files, + ) + hyp_and_name_one_doc = [(filename, hyp_one_doc)] + n_requests += _n_requests + except Exception as e: + sentry_sdk.capture_exception(e) + logger.exception(e) + hyp_and_name_one_doc = [] + else: + hyp_and_name_one_doc = [] + hyps_and_names_all_docs += hyp_and_name_one_doc + + if prompt_type == "question_answering" and _all_docs_have_summary: + # if we are in `question_answering` node then + # the condition `go_to_question_answering` was requested once + n_requests += 1 + # having got responses for all docs, let's make one response from it + # just return the response if we have one document and one response + if len(hyps_and_names_all_docs) == 1 and prompt_type_local != "weekly_report": + hypotheses = [hyps_and_names_all_docs[0][1]] else: - hyp_and_name_one_doc = [] - hyps_and_names_all_docs += hyp_and_name_one_doc - - if prompt_type == "question_answering" and _all_docs_have_summary: - # if we are in `question_answering` node then - # the condition `go_to_question_answering` was requested once - n_requests += 1 - # having got responses for all docs, let's make one response from it - # just return the response if we have one document and one response - if len(hyps_and_names_all_docs) == 1 and prompt_type_local != "weekly_report": - hypotheses = [hyps_and_names_all_docs[0][1]] - else: - # earlier we set prompt_type_and_id for weekly_analysis to full report for each doc, - # now we need it to set it back - prompt_type_and_id = f"{prompt_type_local}__{document_in_use_id}" - try: - # now by default we are passing filenames to LLM together with hypothesis for each file - # you can choose to pass only hypotheses (no filenames) by setting use_filenames=False - # when calling compose_and_upload_final_response() - hypotheses, related_files, _n_requests = compose_and_upload_final_response( - hyps_and_names_all_docs, - prompt_type_and_id, - dialog_context, - sending_variables, - related_files, - ) - n_requests += _n_requests - except Exception as e: - sentry_sdk.capture_exception(e) - logger.exception(e) - hypotheses = [] - - # for full report and weekly report, add formatting - if prompt_type_local == "weekly_report" or prompt_type_local == "full_report": - hypotheses = postprocess_formatting(hypotheses, prompt_type=prompt_type_local) + # earlier we set prompt_type_and_id for weekly_analysis to full report for each doc, + # now we need it to set it back + prompt_type_and_id = f"{prompt_type_local}__{curr_combination_id}" + try: + # now by default we are passing filenames to LLM together with hypothesis for each file + # you can choose to pass only hypotheses (no filenames) by setting use_filenames=False + # when calling compose_and_upload_final_response() + hypotheses, related_files, _n_requests = compose_and_upload_final_response( + hyps_and_names_all_docs, + prompt_type_and_id, + dialog_context, + sending_variables, + related_files, + ) + n_requests += _n_requests + except Exception as e: + sentry_sdk.capture_exception(e) + logger.exception(e) + hypotheses = [] + # for full report and weekly report, add formatting + if prompt_type_local == "weekly_report" or prompt_type_local == "full_report": + hypotheses = postprocess_formatting(hypotheses, prompt_type=prompt_type_local) + # if there are docs in human utt attributes, but no processed docs in use were found elif docs_in_attributes: hyp_excuse = """Sorry, I failed to process the file you provided. \ diff --git a/skills/dff_meeting_analysis_skill/scenario/utils.py b/skills/dff_meeting_analysis_skill/scenario/utils.py index 60be5086d..ef208a0b6 100644 --- a/skills/dff_meeting_analysis_skill/scenario/utils.py +++ b/skills/dff_meeting_analysis_skill/scenario/utils.py @@ -4,7 +4,7 @@ import sentry_sdk import requests import os -from typing import List, Tuple +from typing import List, Tuple, Any from common.text_processing_for_prompts import ( check_token_number, split_transcript_into_chunks, @@ -75,10 +75,12 @@ def get_older_gen_response(item_type_and_id, bot_attrs_files): return old_response -def set_correct_type_and_id(request, prompt_type_local, document_in_use_id): +def set_correct_type_and_id(request, prompt_type_local, document_in_use_id=""): # if we need a weekly report, on this step we gather separate daily reports for each doc + prompt_type_and_id = "" if prompt_type_local == "weekly_report": - prompt_type_and_id = f"full_report__{document_in_use_id}" + if document_in_use_id: + prompt_type_and_id = f"full_report__{document_in_use_id}" else: if prompt_type_local == "summary": is_long_request = LONG_SUMMARY_REQUEST.search(request) @@ -87,7 +89,8 @@ def set_correct_type_and_id(request, prompt_type_local, document_in_use_id): prompt_type_local += "_long" elif is_short_request: prompt_type_local += "_short" - prompt_type_and_id = f"{prompt_type_local}__{document_in_use_id}" + if document_in_use_id: + prompt_type_and_id = f"{prompt_type_local}__{document_in_use_id}" return prompt_type_local, prompt_type_and_id @@ -176,8 +179,14 @@ def get_response_for_prompt_type( def upload_generated_item_return_link(hypothesis: str, prompt_type_and_id: str): - filename = f"{prompt_type_and_id}.txt" - uploaded_doc_link = upload_document(hypothesis, filename, FILE_SERVER_URL, FILE_SERVER_TIMEOUT, type_ref="text") + # we do not upload question_answering as questions may vary + # we do not upload combine_responses because combine_responses type is only used for internal processing + # the response generated in combine_responses will be uploaded later at its original name + uploaded_doc_link = "" + if "combine_responses" not in prompt_type_and_id and "question_answering" not in prompt_type_and_id: + logger.info(f"Saving {prompt_type_and_id} to related_files.") + filename = f"{prompt_type_and_id}.txt" + uploaded_doc_link = upload_document(hypothesis, filename, FILE_SERVER_URL, FILE_SERVER_TIMEOUT, type_ref="text") return uploaded_doc_link @@ -199,10 +208,13 @@ def compose_and_upload_final_response( info_to_provide_to_llm = [hyps[1] for hyps in hyps_and_names_all_docs] hyps_from_all_docs = SEP_FOR_DOC_RESPONSES.join(info_to_provide_to_llm) if "weekly_report" not in prompt_type_and_id: - prompt_type_and_id = f"combine_responses__{prompt_type_and_id.split('__')[1]}" + prompt_type_and_id_for_processing = f"combine_responses__{prompt_type_and_id.split('__')[1]}" hyp_combined, bot_attrs_files, n_requests = get_and_upload_response_for_one_doc( - hyps_from_all_docs, prompt_type_and_id, dialog_context, sending_variables, bot_attrs_files + hyps_from_all_docs, prompt_type_and_id_for_processing, dialog_context, sending_variables, bot_attrs_files ) + uploaded_doc_link = upload_generated_item_return_link(hyp_combined, prompt_type_and_id) + if uploaded_doc_link: + bot_attrs_files[prompt_type_and_id] = uploaded_doc_link return [hyp_combined], bot_attrs_files, n_requests @@ -242,7 +254,8 @@ def get_and_upload_response_for_one_doc( ) n_requests += _n_requests uploaded_doc_link = upload_generated_item_return_link(part_of_report, item_type_and_id) - bot_attrs_files[item_type_and_id] = uploaded_doc_link + if uploaded_doc_link: + bot_attrs_files[prompt_type_and_id] = uploaded_doc_link hypothesis += f"{part_of_report}\n\n" else: hypothesis, n_requests = get_response_for_prompt_type( @@ -250,16 +263,21 @@ def get_and_upload_response_for_one_doc( ) # we save each hyp to server under the name of the request and doc_in_use id - # except for question_answering and combine_responses which we don't save as questions may vary - if prompt_type != "question_answering" and prompt_type != "combine_responses": - logger.info(f"Saving {prompt_type_and_id} to related_files.") - uploaded_doc_link = upload_generated_item_return_link(hypothesis, prompt_type_and_id) + uploaded_doc_link = upload_generated_item_return_link(hypothesis, prompt_type_and_id) + if uploaded_doc_link: bot_attrs_files[prompt_type_and_id] = uploaded_doc_link return hypothesis, bot_attrs_files, n_requests -def get_name_and_text_from_file(transcript_link): +def get_name_and_text_from_file(transcript_link: str) -> Tuple[str, str]: orig_file = requests.get(transcript_link, timeout=FILE_SERVER_TIMEOUT) orig_text = orig_file.text filename = FILENAME.search(orig_text).group(1) return filename, orig_text + + +def get_key_by_value(dictionary: dict, value_to_find: Any) -> str: + for key, value in dictionary.items(): + if value == value_to_find: + return key + return ""