Commit

addressed feedback; added keyword option for search
terrancedejesus committed Sep 30, 2024
1 parent b5dff80 commit 76192b0
Showing 8 changed files with 133 additions and 47 deletions.
1 change: 1 addition & 0 deletions .github/PULL_REQUEST_GUIDELINES/hunt_new_guidelines.md
Expand Up @@ -20,6 +20,7 @@ Welcome to the `hunting` folder within the `detection-rules` repository! This di
- [ ] `notes` includes additional information regarding data collected from the hunting query.
- [ ] `mitre` matches appropriate technique and sub-technique IDs that the hunting query collects data for.
- [ ] `references` are valid URL links that include information relevant to the hunt or threat.
- [ ] `license`

### Testing and Validation

2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -28,7 +28,7 @@ Detection Rules contains more than just static rule files. This repository also
|------------------------------------------------ |------------------------------------------------------------------------------------ |
| [`detection_rules/`](detection_rules) | Python module for rule parsing, validating and packaging |
| [`etc/`](detection_rules/etc) | Miscellaneous files, such as ECS and Beats schemas |
| [`hunting/`](./hunting/) | Root directory where threat hunting package and queries are stored |
| [`kibana/`](lib/kibana) | Python library for handling the API calls to Kibana and the Detection Engine |
| [`kql/`](lib/kql) | Python library for parsing and validating Kibana Query Language |
| [`rta/`](rta) | Red Team Automation code used to emulate attacker techniques, used for rule testing |
18 changes: 12 additions & 6 deletions hunting/README.md
Expand Up @@ -52,7 +52,7 @@ Follow the standard [contributing guide](../CONTRIBUTING.md). Please remember to

## Commands

The `hunting` folder is a modularized library with its own CLI via the use of [click](https://pypi.org/project/click/). All commands can be run from the root of the `detection-rules` repository as such: `python -m hunting COMMAND`.
The `hunting` folder is an executable package with its own CLI using [click](https://pypi.org/project/click/). All commands can be run from the root of the `detection-rules` repository as such: `python -m hunting COMMAND`.

- **generate-markdown**:
- This will generate Markdown files for each TOML file specified and update the `index.yml` and `index.md`.
Expand All @@ -65,6 +65,7 @@ The `hunting` folder is a modularized library with it's own CLI via the user of
- **search**:
- This command enables users to filter for queries based on MITRE ATT&CK information, specifically tactic, technique, or sub-technique IDs. The `--tactic`, `--technique`, and `--sub-technique` parameters can be used to search for hunting queries that have been tagged with these respective IDs.
- All hunting queries are required to include MITRE mappings. Additionally, the `--data-source` parameter can be used with or without MITRE filters to scope to a specific data source (e.g., `python -m hunting search --tactic TA0006 --data-source aws` would show all credential access related hunting queries for AWS).
- The `--keyword` parameter enables more open-ended searches across a hunt's name, description, notes, and references, and can be used with or without `--data-source`.
- **run-query**: **NOTE** - This command requires the `.detection-rules-cfg.yaml` to be populated. Please refer to the [CLI docs](../CLI.md) for optional parameters.
- This command enables users to load a TOML file, select a hunting query, and run it against their Elasticsearch instance. The `--uuid` and `--file-path` parameters can be used to select which hunting query(s) to run.
- Users can select which query to run from the TOML file if multiple are available.
Expand All @@ -80,28 +81,33 @@ The `hunting` folder is a modularized library with it's own CLI via the user of

To contribute to the `hunting` folder or add new hunting queries, follow these steps:

1. **Create a TOML File**
1. **Clone (or fork) and Install Dependencies**
- `git clone git@github.com:elastic/detection-rules.git` to clone the repository
- Set up your own virtual environment if not already established
- `pip install ".[hunting]"`

2. **Create a TOML File**
- Navigate to the respective folder (e.g., `aws/queries`, `macos/queries`) and create a new TOML file for your query.
- Ensure that the file is named descriptively, reflecting the purpose of the hunt (e.g., `credential_access_detection.toml`).

2. **Add Relevant and Required Hunting Information**
3. **Add Relevant and Required Hunting Information**
- Fill out the necessary fields in your TOML file. Be sure to include information such as the author, description, query language, actual queries, MITRE technique mappings, and any notes or references. This ensures the hunt query is complete and provides valuable context for threat hunters.

3. **Generate the Markdown File**
4. **Generate the Markdown File**
- Once the TOML file is ready, use the following command to generate the corresponding Markdown file:
```bash
python -m hunting generate-markdown
```
- This will create a Markdown file in the `docs` folder under the respective integration, which can be used for documentation or sharing.

4. **Refresh the Indexes**
5. **Refresh the Indexes**
- After generating the Markdown, run the `refresh-index` command to update the `index.yml` and `index.md` files:
```bash
python -m hunting refresh-index
```
- This ensures that the new hunt query is reflected in the overall index and is available for searching.

5. **Open a Pull Request (PR) for Contributions**
6. **Open a Pull Request (PR) for Contributions**
- If you're contributing the query to the project, submit a Pull Request (PR) with your changes. Be sure to include a description of your query and any relevant details to facilitate the review process.
By following this workflow, you can ensure that your hunt queries are properly formatted, documented, and integrated into the Elastic hunting library.
32 changes: 17 additions & 15 deletions hunting/__main__.py
Expand Up @@ -3,7 +3,9 @@
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.

import json
import textwrap
from dataclasses import asdict
from pathlib import Path

import click
Expand All @@ -15,7 +17,8 @@
from .markdown import MarkdownGenerator
from .run import QueryRunner
from .search import QueryIndex
from .utils import filter_elasticsearch_params, get_hunt_path, load_toml, load_all_toml, update_index_yml
from .utils import (filter_elasticsearch_params, get_hunt_path, load_all_toml,
load_toml, update_index_yml)


@click.group()
Expand All @@ -26,7 +29,7 @@ def hunting():

@hunting.command('generate-markdown')
@click.argument('path', required=False)
def generate_markdown(path):
def generate_markdown(path: click.Path):
"""Convert TOML hunting queries to Markdown format."""
markdown_generator = MarkdownGenerator(HUNTING_DIR)

Expand Down Expand Up @@ -63,23 +66,24 @@ def refresh_index():
@click.option('--technique', type=str, default=None, help="Search by MITRE technique ID (e.g., T1078)")
@click.option('--sub-technique', type=str, default=None, help="Search by MITRE sub-technique ID (e.g., T1078.001)")
@click.option('--data-source', type=str, default=None, help="Filter by data_source like 'aws', 'macos', or 'linux'")
def search_queries(tactic: str, technique: str, sub_technique: str, data_source: str):
@click.option('--keyword', type=str, default=None, help="Search by keyword in name, description, notes, and references")
def search_queries(tactic: str, technique: str, sub_technique: str, data_source: str, keyword: str):
"""Search for queries based on MITRE tactic, technique, sub-technique, or data_source."""

if not any([tactic, technique, sub_technique, data_source]):
if not any([tactic, technique, sub_technique, data_source, keyword]):
raise click.UsageError("""Please provide at least one filter (tactic, technique, sub-technique,
or data_source) to search queries.""")
data_source or keyword) to search queries.""")

click.echo("Searching for queries based on provided filters...")

# Filter out None values from the MITRE filter tuple
mitre_filters = tuple(filter(None, (tactic, technique, sub_technique)))

# Create an instance of the QueryIndex class
query_index = QueryIndex(HUNTING_DIR)

# Call the search method of QueryIndex with the provided MITRE filters and data_source
results = query_index.search(mitre_filter=mitre_filters, data_source=data_source)
# Filter out None values from the MITRE filter tuple
mitre_filters = tuple(filter(None, (tactic, technique, sub_technique)))

# Call the search method of QueryIndex with the provided MITRE filters, data_source, and keyword
results = query_index.search(mitre_filter=mitre_filters, data_source=data_source, keyword=keyword)

if results:
click.secho(f"\nFound {len(results)} matching queries:\n", fg="green", bold=True)
Expand Down Expand Up @@ -133,15 +137,13 @@ def view_hunt(uuid: str, path: str, output_format: str, query_only: bool):
if output_format == 'toml':
click.echo(hunt_path.read_text())
elif output_format == 'json':
import json

# Convert the hunt object to a dictionary, assuming it's a dataclass
hunt_dict = hunt.__dict__
hunt_dict = asdict(hunt)
click.echo(json.dumps(hunt_dict, indent=4))


@hunting.command('hunt-summary')
@click.option('--breakdown', type=click.Choice(['platform', 'integration', 'language'], case_sensitive=False), default='platform',
@click.option('--breakdown', type=click.Choice(['platform', 'integration', 'language'],
case_sensitive=False), default='platform',
help="Specify how to break down the summary: 'platform', 'integration', or 'language'.")
def hunt_summary(breakdown: str):
"""
2 changes: 1 addition & 1 deletion hunting/definitions.py
Expand Up @@ -32,5 +32,5 @@ class Hunt:
license: str
query: list[str]
notes: Optional[list[str]] = field(default_factory=list)
mitre: Optional[list[str]] = field(default_factory=list)
mitre: list[str] = field(default_factory=list)
references: Optional[list[str]] = field(default_factory=list)
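
The JSON output path in `view_hunt` changes from `hunt.__dict__` to `asdict(hunt)` in this commit. The sketch below illustrates the practical difference between the two on a simplified stand-in class; `ExampleHunt` and its values are hypothetical and are not the real `Hunt` definition.

```python
import json
from dataclasses import asdict, dataclass, field


@dataclass
class ExampleHunt:
    """Simplified stand-in for the Hunt dataclass (hypothetical)."""
    name: str
    license: str
    query: list[str]
    mitre: list[str] = field(default_factory=list)


hunt = ExampleHunt(name="Example", license="Elastic License v2", query=["from logs-* | limit 1"])

copied = asdict(hunt)   # new dict; list values are rebuilt, nested dataclasses are converted
live = hunt.__dict__    # the instance's own attribute dict, not a copy

copied["mitre"].append("T1078")  # does not touch the instance
live["name"] = "Renamed"         # changes the instance itself

print(hunt.mitre)  # prints []
print(hunt.name)   # prints Renamed
print(json.dumps(copied, indent=4))
```

Because `asdict` returns an independent structure, code that post-processes the dictionary before serializing it cannot accidentally modify the loaded hunt.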
1 change: 1 addition & 0 deletions hunting/markdown.py
Expand Up @@ -10,6 +10,7 @@


class MarkdownGenerator:
"""Class to generate or update Markdown documentation from TOML or YAML files."""
def __init__(self, base_path: Path):
"""Initialize with the base path and load the hunting index."""
self.base_path = base_path
122 changes: 99 additions & 23 deletions hunting/search.py
Expand Up @@ -5,11 +5,9 @@


from pathlib import Path

import click
from detection_rules.attack import tactics_map, technique_lookup

from .utils import load_index_file
from .utils import load_index_file, load_all_toml


class QueryIndex:
Expand Down Expand Up @@ -48,40 +46,118 @@ def _process_technique_id(self, filter_item):
}
self.mitre_technique_ids.update(sub_techniques)

def search(self, mitre_filter: tuple = (), data_source: str = None) -> list:
"""Search the index based on MITRE techniques or data source."""
# Process the MITRE filter
def search(self, mitre_filter: tuple = (), data_source: str = None, keyword: str = None) -> list:
"""Search the index based on MITRE techniques, data source, or keyword."""
results = []

# Step 1: If data source is provided, filter by data source first
if data_source:
click.echo(f"Filtering by data source: {data_source}")
results = self._filter_by_data_source(data_source)

# Step 2: If MITRE filter is provided, process the filter
if mitre_filter:
click.echo(f"Searching for MITRE techniques: {mitre_filter}")
self.process_mitre_filter(mitre_filter)
if results:
# Filter existing results further by MITRE if data source results already exist
results = [result for result in results if
any(tech in self.mitre_technique_ids for tech in result['mitre'])]
else:
# Otherwise, perform a fresh search based on MITRE filter
results = self._search_index(mitre_filter)

# Step 3: If keyword is provided, search for it in name, description, and notes
if keyword:
click.echo(f"Searching for keyword: {keyword}")
if results:
# Filter existing results further by keyword
results = [result for result in results if self._matches_keyword(result, keyword)]
else:
# Perform a fresh search by keyword
results = self._search_keyword(keyword)

return self._handle_no_results(results, mitre_filter, data_source, keyword)

def _search_index(self, mitre_filter: tuple = ()) -> list:
"""Private method to search the index based on MITRE filter."""
results = []
# Load all TOML data for detailed fields
hunting_content = load_all_toml(self.base_path)

for hunt_content, file_path in hunting_content:
query_techniques = hunt_content.mitre
if mitre_filter and not any(tech in self.mitre_technique_ids for tech in query_techniques):
continue

# Prepare the result with full hunt content fields
matches = hunt_content.__dict__.copy()
matches['mitre'] = hunt_content.mitre
matches['data_source'] = hunt_content.integration
matches['uuid'] = hunt_content.uuid
matches['path'] = file_path
results.append(matches)

# Perform search and return results
return self._search_index(mitre_filter, data_source)
return results

def _search_index(self, mitre_filter: tuple, data_source: str) -> list:
"""Private method to search the index based on filters."""
def _search_keyword(self, keyword: str) -> list:
"""Private method to search description, name, notes, and references fields for a keyword."""
results = []
hunting_content = load_all_toml(self.base_path)

for folder, queries in self.hunting_index.items():
if data_source and folder != data_source:
continue
for hunt_content, file_path in hunting_content:
# Assign blank if notes or references are missing
notes = '::'.join(hunt_content.notes) if hunt_content.notes else ''
references = '::'.join(hunt_content.references) if hunt_content.references else ''

# Combine name, description, notes, and references for the search
combined_content = f"{hunt_content.name}::{hunt_content.description}::{notes}::{references}"

for uuid, query in queries.items():
query_techniques = query.get('mitre', [])
if mitre_filter and not any(tech in self.mitre_technique_ids for tech in query_techniques):
continue
if keyword.lower() in combined_content.lower():
# Copy hunt_content data and prepare the result
matches = hunt_content.__dict__.copy()
matches['mitre'] = hunt_content.mitre
matches['data_source'] = hunt_content.integration
matches['uuid'] = hunt_content.uuid
matches['path'] = file_path
results.append(matches)

return results

def _filter_by_data_source(self, data_source: str) -> list:
"""Filter the index by data source."""
results = []
# Load all TOML data for detailed fields
hunting_content = load_all_toml(self.base_path)

for hunt_content, file_path in hunting_content:
if data_source in hunt_content.integration:
# Prepare the result with full hunt content fields
matches = hunt_content.__dict__.copy()
matches['mitre'] = hunt_content.mitre
matches['data_source'] = hunt_content.integration
matches['uuid'] = hunt_content.uuid
matches['path'] = file_path
results.append(matches)

return results

query_with_data_source = query.copy()
query_with_data_source['data_source'] = folder
query_with_data_source['uuid'] = uuid
results.append(query_with_data_source)
def _matches_keyword(self, result: dict, keyword: str) -> bool:
"""Check if the result matches the keyword in name, description, or notes."""
# Combine relevant fields for keyword search
notes = '::'.join(result.get('notes', [])) if 'notes' in result else ''
references = '::'.join(result.get('references', [])) if 'references' in result else ''
combined_content = f"{result['name']}::{result['description']}::{notes}::{references}"

return self._handle_no_results(results, mitre_filter, data_source)
return keyword.lower() in combined_content.lower()

def _handle_no_results(self, results, mitre_filter, data_source):
def _handle_no_results(self, results: list, mitre_filter=None, data_source=None, keyword=None) -> list:
"""Handle cases where no results are found."""
if not results:
if mitre_filter and not self.mitre_technique_ids:
click.echo(f"No MITRE techniques found for the provided filter: {mitre_filter}.")
if data_source:
click.echo(f"No matching queries found for data source: {data_source}")
if keyword:
click.echo(f"No matches found for keyword: {keyword}")
return results
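
As written, `search` appears to start a fresh search whenever the running `results` list is empty, so a data source that matches nothing, followed by a MITRE or keyword filter, could return hunts from other data sources. The sketch below shows one alternative in which every filter narrows the same candidate list. It is an illustration under stated assumptions, not the commit's implementation: `narrow`, `matches_keyword`, and the sample records are hypothetical, and only the `::`-joined, case-insensitive keyword match mirrors the diff.

```python
def matches_keyword(hunt: dict, keyword: str) -> bool:
    """Case-insensitive substring match over name, description, notes, and references."""
    notes = "::".join(hunt.get("notes") or [])
    references = "::".join(hunt.get("references") or [])
    combined = f"{hunt['name']}::{hunt['description']}::{notes}::{references}"
    return keyword.lower() in combined.lower()


def narrow(hunts: list[dict], data_source=None, techniques=(), keyword=None) -> list[dict]:
    """Apply each provided filter to the running candidate list (hypothetical helper)."""
    candidates = list(hunts)
    if data_source:
        candidates = [h for h in candidates if data_source in h["data_source"]]
    if techniques:
        candidates = [h for h in candidates if any(t in techniques for t in h["mitre"])]
    if keyword:
        candidates = [h for h in candidates if matches_keyword(h, keyword)]
    return candidates


# Illustrative records; all values are made up for the example.
HUNTS = [
    {"name": "S3 Bucket Enumeration", "description": "Lists buckets", "data_source": "aws",
     "mitre": ["T1580"], "notes": ["Review source IPs"], "references": []},
    {"name": "Suspicious Login", "description": "Unusual console login", "data_source": "okta",
     "mitre": ["T1078"], "notes": None, "references": None},
]

# Data source and keyword both match the first record.
print([h["name"] for h in narrow(HUNTS, data_source="aws", keyword="bucket")])
# An unmatched data source keeps the result empty even though the technique exists elsewhere.
print([h["name"] for h in narrow(HUNTS, data_source="gcp", techniques=("T1078",))])
```

Using `hunt.get("notes") or []` also tolerates fields that are present but set to `None`, which a plain `'::'.join(...)` would reject.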
2 changes: 1 addition & 1 deletion hunting/utils.py
Expand Up @@ -61,7 +61,7 @@ def load_toml(source: Union[Path, str]) -> Hunt:
raise FileNotFoundError(f"TOML file not found: {source}")
contents = source.read_text(encoding="utf-8")
else:
contents = source # Assuming it's a TOML string
contents = source

toml_dict = tomllib.loads(contents)

