From f2871405c273645a7283db3974e9808ed149164b Mon Sep 17 00:00:00 2001 From: Eric Forte <119343520+eric-forte-elastic@users.noreply.github.com> Date: Wed, 9 Oct 2024 17:19:59 -0400 Subject: [PATCH] [FR][DAC] Import Rules Verbose Message (#4093) * Draft Verbose Message * Fix Linting * Made more descriptive * Updated for readability (cherry picked from commit 4edef2ea80b7cc24fe34f11af2f0ad282015e857) --- detection_rules/kbwrap.py | 57 +++++++++++++++++++++++---------------- 1 file changed, 34 insertions(+), 23 deletions(-) diff --git a/detection_rules/kbwrap.py b/detection_rules/kbwrap.py index d83b0d66a6d..5f4719b5f66 100644 --- a/detection_rules/kbwrap.py +++ b/detection_rules/kbwrap.py @@ -94,28 +94,9 @@ def kibana_import_rules(ctx: click.Context, rules: RuleCollection, overwrite: Op overwrite_exceptions: Optional[bool] = False, overwrite_action_connectors: Optional[bool] = False) -> (dict, List[RuleResource]): """Import custom rules into Kibana.""" - kibana = ctx.obj['kibana'] - rule_dicts = [r.contents.to_api_format() for r in rules] - with kibana: - cl = GenericCollection.default() - exception_dicts = [ - d.contents.to_api_format() for d in cl.items if isinstance(d.contents, TOMLExceptionContents) - ] - action_connectors_dicts = [ - d.contents.to_api_format() for d in cl.items if isinstance(d.contents, TOMLActionConnectorContents) - ] - response, successful_rule_ids, results = RuleResource.import_rules( - rule_dicts, - exception_dicts, - action_connectors_dicts, - overwrite=overwrite, - overwrite_exceptions=overwrite_exceptions, - overwrite_action_connectors=overwrite_action_connectors - ) - - def handle_response_errors(response: dict): + def _handle_response_errors(response: dict): """Handle errors from the import response.""" - def parse_list_id(s: str): + def _parse_list_id(s: str): """Parse the list ID from the error message.""" match = re.search(r'list_id: "(.*?)"', s) return match.group(1) if match else None @@ -132,7 +113,7 @@ def parse_list_id(s: str): click.echo(f' - {error["rule_id"]}: ({error["error"]["status_code"]}) {error["error"]["message"]}') if "references a non existent exception list" in error["error"]["message"]: - list_id = parse_list_id(error["error"]["message"]) + list_id = _parse_list_id(error["error"]["message"]) if list_id in all_exception_list_ids: workaround_errors.append(error["rule_id"]) @@ -144,12 +125,42 @@ def parse_list_id(s: str): click.echo(' '.join(f'-id {rule_id}' for rule_id in workaround_errors)) click.echo() + def _process_imported_items(imported_items_list, item_type_description, item_key): + """Displays appropriately formatted success message that all items imported successfully.""" + all_ids = {item[item_key] for sublist in imported_items_list for item in sublist} + if all_ids: + click.echo(f'{len(all_ids)} {item_type_description} successfully imported') + ids_str = '\n - '.join(all_ids) + click.echo(f' - {ids_str}') + + kibana = ctx.obj['kibana'] + rule_dicts = [r.contents.to_api_format() for r in rules] + with kibana: + cl = GenericCollection.default() + exception_dicts = [ + d.contents.to_api_format() for d in cl.items if isinstance(d.contents, TOMLExceptionContents) + ] + action_connectors_dicts = [ + d.contents.to_api_format() for d in cl.items if isinstance(d.contents, TOMLActionConnectorContents) + ] + response, successful_rule_ids, results = RuleResource.import_rules( + rule_dicts, + exception_dicts, + action_connectors_dicts, + overwrite=overwrite, + overwrite_exceptions=overwrite_exceptions, + overwrite_action_connectors=overwrite_action_connectors + ) + if successful_rule_ids: click.echo(f'{len(successful_rule_ids)} rule(s) successfully imported') rule_str = '\n - '.join(successful_rule_ids) click.echo(f' - {rule_str}') if response['errors']: - handle_response_errors(response) + _handle_response_errors(response) + else: + _process_imported_items(exception_dicts, 'exception list(s)', 'list_id') + _process_imported_items(action_connectors_dicts, 'action connector(s)', 'id') return response, results