import logging

import iocextract as i

from api_app.analyzers_manager.classes import FileAnalyzer

logger = logging.getLogger(__name__)


class IocExtract(FileAnalyzer):
    """File analyzer that pulls IOCs out of a text sample with ``iocextract``.

    The class attributes below are the analyzer's tunable options; the values
    given here are the fallbacks used when nothing overrides them.
    """

    # Options forwarded to the iocextract extractor functions.
    refang: bool = False
    defang: bool = False
    strip: bool = False
    # Per-type switches; only consulted when ``extract_iocs`` is False.
    extract_urls: bool = False
    extract_ips: bool = False
    extract_emails: bool = False
    extract_hashes: bool = False
    extract_yara_rules: bool = False
    extract_telephone_nums: bool = False
    # When True, run the combined extractor and ignore the per-type switches.
    extract_iocs: bool = True

    def update(self):
        # Nothing to refresh for this analyzer.
        pass

    def run(self):
        """Extract IOCs from the analyzed file.

        Returns:
            dict: ``{"all_iocs": [...]}`` when ``extract_iocs`` is set;
            otherwise one list per enabled per-type switch, keyed by
            ``urls``, ``ips``, ``emails``, ``hashes``, ``yara_rules`` and
            ``telephone_nums``. The dict is empty if no switch is enabled.
        """
        logger.info(
            "Running IocExtract on %s with md5: %s", self.filename, self.md5
        )
        # Samples tagged text/plain are not guaranteed to be valid UTF-8.
        # Replace undecodable bytes instead of raising UnicodeDecodeError so
        # that the decodable part of the file is still scanned.
        text_data = self.read_file_bytes().decode("utf-8", errors="replace")

        if self.extract_iocs:
            return {
                "all_iocs": list(
                    i.extract_iocs(text_data, refang=self.refang, strip=self.strip)
                )
            }

        # (result key, enabled?, lazy extractor call) - the lambdas keep the
        # disabled extractors from being invoked at all.
        extractors = (
            (
                "urls",
                self.extract_urls,
                lambda: i.extract_urls(
                    text_data,
                    refang=self.refang,
                    strip=self.strip,
                    defang=self.defang,
                ),
            ),
            (
                "ips",
                self.extract_ips,
                lambda: i.extract_ips(text_data, refang=self.refang),
            ),
            (
                "emails",
                self.extract_emails,
                lambda: i.extract_emails(text_data, refang=self.refang),
            ),
            ("hashes", self.extract_hashes, lambda: i.extract_hashes(text_data)),
            (
                "yara_rules",
                self.extract_yara_rules,
                lambda: i.extract_yara_rules(text_data),
            ),
            (
                "telephone_nums",
                self.extract_telephone_nums,
                lambda: i.extract_telephone_nums(text_data),
            ),
        )
        # The extractors return iterators; materialize them into lists.
        return {
            key: list(extract()) for key, enabled, extract in extractors if enabled
        }
from django.db import migrations
from django.db.models.fields.related_descriptors import (
    ForwardManyToOneDescriptor,
    ForwardOneToOneDescriptor,
    ManyToManyDescriptor,
)

# Serialized AnalyzerConfig row for the IocExtract analyzer. The "model" key
# is not a model field: it names the model the remaining keys belong to.
plugin = {
    "python_module": {
        "health_check_schedule": None,
        "update_schedule": None,
        "module": "iocextract.IocExtract",
        "base_path": "api_app.analyzers_manager.file_analyzers",
    },
    "name": "IocExtract",
    "description": '[IocExtract](https://github.com/InQuest/iocextract) package is a library and command line interface (CLI) for extracting URLs, IP addresses, MD5/SHA hashes, email addresses, and YARA rules from text corpora. It allows for you to extract encoded and "defanged" IOCs and optionally decode or refang them.',
    "disabled": False,
    "soft_time_limit": 60,
    "routing_key": "default",
    "health_check_status": True,
    "type": "file",
    "docker_based": False,
    "maximum_tlp": "RED",
    "observable_supported": [],
    "supported_filetypes": ["text/plain"],
    "run_hash": False,
    "run_hash_type": "",
    "not_supported_filetypes": [],
    "model": "analyzers_manager.AnalyzerConfig",
}

# Serialized Parameter rows attached to the analyzer's python module.
params = [
    {
        "python_module": {
            "module": "iocextract.IocExtract",
            "base_path": "api_app.analyzers_manager.file_analyzers",
        },
        "name": "extract_urls",
        "type": "bool",
        "description": "Extract URLs!",
        "is_secret": False,
        "required": False,
    },
    {
        "python_module": {
            "module": "iocextract.IocExtract",
            "base_path": "api_app.analyzers_manager.file_analyzers",
        },
        "name": "extract_ips",
        "type": "bool",
        "description": "Extract IP addresses!\r\n\r\nIncludes both IPv4 and IPv6 addresses.",
        "is_secret": False,
        "required": False,
    },
    {
        "python_module": {
            "module": "iocextract.IocExtract",
            "base_path": "api_app.analyzers_manager.file_analyzers",
        },
        "name": "extract_emails",
        "type": "bool",
        "description": "Extract email addresses!",
        "is_secret": False,
        "required": False,
    },
    {
        "python_module": {
            "module": "iocextract.IocExtract",
            "base_path": "api_app.analyzers_manager.file_analyzers",
        },
        "name": "extract_hashes",
        "type": "bool",
        "description": "Extract MD5/SHA hashes!",
        "is_secret": False,
        "required": False,
    },
    {
        "python_module": {
            "module": "iocextract.IocExtract",
            "base_path": "api_app.analyzers_manager.file_analyzers",
        },
        "name": "extract_yara_rules",
        "type": "bool",
        "description": "Extract YARA rules!",
        "is_secret": False,
        "required": False,
    },
    {
        "python_module": {
            "module": "iocextract.IocExtract",
            "base_path": "api_app.analyzers_manager.file_analyzers",
        },
        "name": "extract_telephone_nums",
        "type": "bool",
        "description": "Extract telephone numbers!",
        "is_secret": False,
        "required": False,
    },
    {
        "python_module": {
            "module": "iocextract.IocExtract",
            "base_path": "api_app.analyzers_manager.file_analyzers",
        },
        "name": "refang",
        "type": "bool",
        "description": "Refang output",
        "is_secret": False,
        "required": False,
    },
    {
        "python_module": {
            "module": "iocextract.IocExtract",
            "base_path": "api_app.analyzers_manager.file_analyzers",
        },
        "name": "strip",
        "type": "bool",
        "description": "Strip possible garbage from the end of URLs",
        "is_secret": False,
        "required": False,
    },
    {
        "python_module": {
            "module": "iocextract.IocExtract",
            "base_path": "api_app.analyzers_manager.file_analyzers",
        },
        "name": "defang",
        "type": "bool",
        "description": "Extract non-defanged IOCs",
        "is_secret": False,
        "required": False,
    },
    {
        "python_module": {
            "module": "iocextract.IocExtract",
            "base_path": "api_app.analyzers_manager.file_analyzers",
        },
        "name": "extract_iocs",
        "type": "bool",
        "description": "Extract all IOCs!",
        "is_secret": False,
        "required": False,
    },
]

# Serialized PluginConfig rows; this analyzer ships none.
values = []


def _get_real_obj(Model, field, value):
    """Turn the serialized ``value`` of ``Model.field`` into model instance(s).

    Forward foreign-key / one-to-one fields yield a single related instance,
    many-to-many fields yield a list of them; any other field (or a falsy
    value) is returned unchanged.
    """

    def _get_obj(Model, other_model, value):
        if isinstance(value, dict):
            # Nested serialized object: resolve its own fields recursively,
            # then fetch or create the related row.
            real_vals = {}
            for key, real_val in value.items():
                real_vals[key] = _get_real_obj(other_model, key, real_val)
            value = other_model.objects.get_or_create(**real_vals)[0]
        # it is just the primary key serialized
        else:
            if isinstance(value, int):
                if Model.__name__ == "PluginConfig":
                    # For PluginConfig an integer is looked up by this
                    # plugin's name rather than by primary key.
                    value = other_model.objects.get(name=plugin["name"])
                else:
                    value = other_model.objects.get(pk=value)
            else:
                value = other_model.objects.get(name=value)
        return value

    if (
        type(getattr(Model, field))
        in [ForwardManyToOneDescriptor, ForwardOneToOneDescriptor]
        and value
    ):
        other_model = getattr(Model, field).get_queryset().model
        value = _get_obj(Model, other_model, value)
    elif type(getattr(Model, field)) in [ManyToManyDescriptor] and value:
        other_model = getattr(Model, field).rel.model
        value = [_get_obj(Model, other_model, val) for val in value]
    return value


def _create_object(Model, data):
    """Create the ``Model`` row described by ``data`` unless it already exists.

    Returns:
        bool: True if a row matching every non many-to-many field was already
        present (nothing is written), False if a new row was created.
    """
    mtm, no_mtm = {}, {}
    for field, value in data.items():
        value = _get_real_obj(Model, field, value)
        if type(getattr(Model, field)) is ManyToManyDescriptor:
            mtm[field] = value
        else:
            no_mtm[field] = value
    try:
        o = Model.objects.get(**no_mtm)
    except Model.DoesNotExist:
        o = Model(**no_mtm)
        o.full_clean()
        o.save()
        # Many-to-many relations can only be set once the row has been saved.
        for field, value in mtm.items():
            attribute = getattr(o, field)
            if value is not None:
                attribute.set(value)
        return False
    return True


def migrate(apps, schema_editor):
    """Insert the IocExtract analyzer config, its parameters and its values."""
    Parameter = apps.get_model("api_app", "Parameter")
    PluginConfig = apps.get_model("api_app", "PluginConfig")
    # Read "model" without popping it: the module-level dict must stay intact
    # so that migrate/reverse_migrate can both run in the same process.
    python_path = plugin["model"]
    plugin_fields = {key: val for key, val in plugin.items() if key != "model"}
    Model = apps.get_model(*python_path.split("."))
    if not Model.objects.filter(name=plugin_fields["name"]).exists():
        exists = _create_object(Model, plugin_fields)
        if not exists:
            for param in params:
                _create_object(Parameter, param)
            for value in values:
                _create_object(PluginConfig, value)


def reverse_migrate(apps, schema_editor):
    """Delete the IocExtract analyzer config created by ``migrate``."""
    # Same as in migrate: look the key up instead of popping it.
    python_path = plugin["model"]
    Model = apps.get_model(*python_path.split("."))
    Model.objects.get(name=plugin["name"]).delete()


class Migration(migrations.Migration):
    atomic = False
    dependencies = [
        ("api_app", "0062_alter_parameter_python_module"),
        ("analyzers_manager", "0107_analyzer_config_apivoid"),
    ]

    operations = [migrations.RunPython(migrate, reverse_migrate)]
##### Observable analyzers (ip, domain, url, hash) diff --git a/requirements/project-requirements.txt b/requirements/project-requirements.txt index 8a3669b829..6ad3afb5eb 100644 --- a/requirements/project-requirements.txt +++ b/requirements/project-requirements.txt @@ -79,6 +79,7 @@ blint==2.1.5 hfinger==0.2.2 permhash==0.1.4 ail_typo_squatting==2.7.4 +iocextract==1.16.1 # this is required because XLMMacroDeobfuscator does not pin the following packages pyxlsb2==0.0.8 xlrd2==1.3.4