-
Notifications
You must be signed in to change notification settings - Fork 425
/
elasticsearchdb.py
183 lines (154 loc) · 6.94 KB
/
elasticsearchdb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# Copyright (C) 2017 Marirs.
# This file is part of Cuckoo Sandbox - http://www.cuckoosandbox.org
# See the file 'docs/LICENSE' for copying permission.
import gc
import json
import logging
from contextlib import suppress
from datetime import datetime
from lib.cuckoo.common.abstracts import Report
from lib.cuckoo.common.config import Config
from lib.cuckoo.common.exceptions import CuckooDependencyError, CuckooReportError
from modules.reporting.report_doc import ensure_valid_utf8, get_json_document, insert_calls
repconf = Config("reporting")

# Start from "unavailable" so the flag is always bound, including when the
# module is disabled in the reporting configuration and the guarded imports
# below are skipped entirely.
HAVE_ELASTICSEARCH = False

if repconf.elasticsearchdb.enabled:
    try:
        from elasticsearch.exceptions import AuthorizationException, ConnectionError, RequestError

        from dev_utils.elasticsearchdb import (
            ANALYSIS_INDEX_MAPPING_SETTINGS,
            daily_analysis_index_exists,
            daily_calls_index_exists,
            delete_analysis_and_related_calls,
            elastic_handler,
            get_daily_analysis_index,
            get_daily_calls_index,
        )

        HAVE_ELASTICSEARCH = True
    except ImportError:
        # The reporting module reports the missing dependency when it runs,
        # not at import time.
        HAVE_ELASTICSEARCH = False

log = logging.getLogger(__name__)
# Only let ERROR (and above) records from the "elasticsearch" logger through.
logging.getLogger("elasticsearch").setLevel("ERROR")
class ElasticSearchDB(Report):
    """Stores report in ElasticSearchDB."""

    # Single timestamp layout used for every date string this module parses.
    _DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

    def __init__(self):
        # The client is bound by connect(); nothing can be indexed before that.
        self.es = None

    def connect(self):
        """Connects to Elasticsearch database, loads options and set connectors.
        @raise CuckooReportError: if unable to connect.
        """
        # NOTE(review): binding the shared handler is a plain assignment, so
        # these handlers look unreachable here; kept for compatibility in case
        # the handler becomes lazily constructed - confirm.
        try:
            self.es = elastic_handler
        except TypeError as e:
            raise CuckooReportError("Elasticsearch connection port must be integer") from e
        except ConnectionError as e:
            raise CuckooReportError("Cannot connect to ElasticsearchDB") from e

    def index_report(self, report):
        """Indexes the report document into the daily analysis index.
        @param report: report dictionary to store.
        """
        self.es.index(index=get_daily_analysis_index(), body=report)

    def check_analysis_index(self):
        """Creates the daily analysis index, with its mapping, if it is missing.
        @raise CuckooDependencyError: if the index cannot be created.
        """
        try:
            log.debug("Check if the index exists")
            if not daily_analysis_index_exists():
                self.es.indices.create(
                    index=get_daily_analysis_index(),
                    body=ANALYSIS_INDEX_MAPPING_SETTINGS,
                )
        except (RequestError, AuthorizationException) as e:
            raise CuckooDependencyError(f"Unable to create Elasticsearch index {e}") from e

    def check_calls_index(self):
        """Creates the daily calls index if it is missing.
        @raise CuckooDependencyError: if the index cannot be created.
        """
        try:
            log.debug("Check if the index exists")
            if not daily_calls_index_exists():
                self.es.indices.create(index=get_daily_calls_index())
        except (RequestError, AuthorizationException) as e:
            raise CuckooDependencyError(f"Unable to create Elasticsearch index {e}") from e

    def _parse_date(self, value):
        """Returns value parsed as a datetime if it is a string, unchanged otherwise.
        @param value: date string in _DATE_FORMAT, or an already converted value.
        @raise ValueError: if value is a string that does not match _DATE_FORMAT.
        """
        if isinstance(value, str):
            return datetime.strptime(value, self._DATE_FORMAT)
        return value

    def format_dates(self, report):
        """Converts the known date fields of the report to datetime objects.
        @param report: report dictionary, modified in place.
        """
        info = report["info"]
        for key in ("started", "ended"):
            info[key] = self._parse_date(info[key])

        machine = info["machine"]
        for key in ("started_on", "shutdown_on"):
            machine[key] = self._parse_date(machine[key])

        # run() has already passed the report through date_hook, so the PE
        # timestamp may be a datetime by now; only parse it while it is still a
        # string, exactly like the info fields above.
        for dropped in report.get("dropped") or []:
            pe = dropped.get("pe")
            if isinstance(pe, dict) and "timestamp" in pe:
                pe["timestamp"] = self._parse_date(pe["timestamp"])

    def fix_signature_results(self, report):
        """Fix signatures from string to list in order to have a common mapping.
        @param report: report dictionary, modified in place.
        """
        for signature in report.get("signatures") or []:
            for entry in signature.get("data") or []:
                # Only values of existing keys are replaced, so the dict size
                # never changes while it is being iterated.
                for key, val in entry.items():
                    if isinstance(val, (str, bool)):
                        entry[key] = {"name": str(val)}
                    elif key == "file" and isinstance(val, list):
                        val[:] = [{"name": item} for item in val]

    def fix_suricata_http_status(self, report):
        """Replaces the literal string "None" in Suricata HTTP statuses with None.
        @param report: report dictionary, modified in place.
        """
        # The report may carry no Suricata section at all; skip it then.
        suricata = report.get("suricata") or {}
        for http in suricata.get("http") or []:
            if http.get("status") == "None":
                http["status"] = None

    def fix_cape_payloads(self, report):
        """Replaces a False tlsh value in CAPE payloads with None.
        @param report: report dictionary, modified in place.
        """
        cape = report.get("CAPE")
        if not isinstance(cape, dict):
            return
        for payload in cape.get("payloads") or []:
            if payload.get("tlsh") is False:
                payload["tlsh"] = None

    def convert_procdump_strings_to_str(self, report):
        """Converts every entry of each procdump "strings" list to str.
        @param report: report dictionary, modified in place.
        """
        for item in report.get("procdump") or []:
            strings = item.get("strings")
            if isinstance(strings, list):
                # Slice assignment keeps the same list object, as before.
                strings[:] = [str(string) for string in strings]

    def fix_fields(self, report):
        """Applies all field normalizations to the report.
        @param report: report dictionary, modified in place.
        """
        self.fix_suricata_http_status(report)
        self.fix_signature_results(report)
        self.fix_cape_payloads(report)
        self.convert_procdump_strings_to_str(report)

    def date_hook(self, json_dict):
        """JSON object hook converting date-formatted string values to datetime.
        @param json_dict: decoded JSON object, modified in place.
        @return: the same dictionary.
        """
        for key, value in json_dict.items():
            if not isinstance(value, str):
                continue
            # Strings that are not dates are left untouched.
            with suppress(ValueError):
                json_dict[key] = datetime.strptime(value, self._DATE_FORMAT)
        return json_dict

    def run(self, results):
        """Writes report.
        @param results: analysis results dictionary.
        @raise CuckooDependencyError: if the elasticsearch client is missing or
            a daily index cannot be created.
        @raise CuckooReportError: if fails to connect to Elasticsearch DB.
        Failures while indexing the final report are logged, not raised.
        """
        # We put the raise here and not at the import because it would
        # otherwise trigger even if the module is not enabled in the config.
        if not HAVE_ELASTICSEARCH:
            raise CuckooDependencyError("Unable to import elasticsearch (install with `pip3 install elasticsearch`)")

        self.connect()

        # Check if the daily index exists.
        self.check_analysis_index()
        self.check_calls_index()

        # Create a copy of the dictionary. This is done in order to not modify
        # the original dictionary and possibly compromise the following
        # reporting modules.
        report = get_json_document(results, self.analysis_path)

        self.fix_fields(report)

        # Round-trip through JSON: non-serializable values become strings and
        # date-formatted strings become datetime objects via date_hook.
        report = json.loads(json.dumps(report, default=str), object_hook=self.date_hook)

        new_processes = insert_calls(report, elastic_db=elastic_handler)

        # Store the results in the report.
        report["behavior"] = dict(report["behavior"])
        report["behavior"]["processes"] = new_processes

        delete_analysis_and_related_calls(report["info"]["id"])
        self.format_dates(report)
        ensure_valid_utf8(report)
        gc.collect()

        # Store the report and retrieve its object id.
        try:
            self.index_report(report)
        except Exception as e:
            # Best effort: keep the other reporting modules running, but
            # record the traceback so the failure can be diagnosed.
            log.exception(e)
            return