Skip to content

Commit

Permalink
Now featuring bulk inserts and metadata
Browse files Browse the repository at this point in the history
Added elasticsearch bulk insert and metadata features
  • Loading branch information
dgunter authored Aug 6, 2018
1 parent ccbd7a7 commit be7ab91
Showing 1 changed file with 50 additions and 5 deletions.
55 changes: 50 additions & 5 deletions evtxtoelk.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,24 @@

from Evtx.Evtx import FileHeader
from Evtx.Views import evtx_file_xml_view
from elasticsearch import Elasticsearch
from elasticsearch import Elasticsearch, helpers
import xmltodict
import sys


class EvtxToElk:
@staticmethod
def evtx_to_elk(filename, elk_ip):
def bulk_to_elasticsearch(es, bulk_queue):
try:
helpers.bulk(es, bulk_queue)
return True
except:
print(traceback.print_exc())
return False

@staticmethod
def evtx_to_elk(filename, elk_ip, elk_index="hostlogs", bulk_queue_len_threshold=500, metadata={}):
bulk_queue = []
es = Elasticsearch([elk_ip])
with open(filename) as infile:
with contextlib.closing(mmap.mmap(infile.fileno(), 0, access=mmap.ACCESS_READ)) as buf:
Expand Down Expand Up @@ -74,22 +85,56 @@ def evtx_to_elk(filename, elk_ip):
else:
pass

# Insert data into ElasticSearch
es.index(index="hostlogs", doc_type="hostlogs", body=json.loads(json.dumps(log_line)))
# Insert data into queue
#event_record = json.loads(json.dumps(log_line))
#event_record.update({
# "_index": elk_index,
# "_type": elk_index,
# "metadata": metadata
#})
#bulk_queue.append(event_record)
bulk_queue.append({
"_index": elk_index,
"_type": elk_index,
"body": json.loads(json.dumps(log_line)),
"metadata": metadata
})

if len(bulk_queue) == bulk_queue_len_threshold:
print('Bulkingrecords to ES: ' + str(len(bulk_queue)))
# start parallel bulking to ElasticSearch, default 500 chunks;
if EvtxToElk.bulk_to_elasticsearch(es, bulk_queue):
bulk_queue = []
else:
print('Failed to bulk data to Elasticsearch')
sys.exit(1)

except:
print("***********")
print("Parsing Exception")
print(traceback.print_exc())
print(json.dumps(log_line, indent=2))
print("***********")

# Check for any remaining records in the bulk queue
if len(bulk_queue) > 0:
print('Bulking final set of records to ES: ' + str(len(bulk_queue)))
if EvtxToElk.bulk_to_elasticsearch(es, bulk_queue):
bulk_queue = []
else:
print('Failed to bulk data to Elasticsearch')
sys.exit(1)


if __name__ == "__main__":
    # Command-line entry point: parse arguments and hand off to EvtxToElk.
    parser = argparse.ArgumentParser()
    # Add arguments
    parser.add_argument('evtxfile', help="Evtx file to parse")
    parser.add_argument('elk_ip', default="localhost", help="IP (and port) of ELK instance")
    parser.add_argument('-i', default="hostlogs", help="ELK index to load data into")
    # type=int makes argparse validate the value and report a proper usage
    # error on non-numeric input, instead of deferring to an int() cast (and
    # a ValueError traceback) at the call site.
    parser.add_argument('-s', default=500, type=int, help="Size of queue")
    parser.add_argument('-meta', default={}, type=json.loads, help="Metadata to add to records")
    # Parse arguments and call evtx to elk class
    args = parser.parse_args()
    EvtxToElk.evtx_to_elk(args.evtxfile, args.elk_ip, elk_index=args.i, bulk_queue_len_threshold=args.s, metadata=args.meta)

0 comments on commit be7ab91

Please sign in to comment.