interface.py
import csv
import importlib
import importlib.util
import sys
import typing
from dataclasses import dataclass, field, fields
from pathlib import Path
from urllib.parse import urlparse

from .aws import AwsClient
from .models import (
    LogFormat,
    LogFormatType,
)
from .parser import to_python
from .util import batcher

@dataclass
class AwsLogParser:

    log_type: LogFormat

    # Optional settings.
    region: typing.Optional[str] = None
    profile: typing.Optional[str] = None
    file_suffix: str = ".log"
    verbose: bool = False
    plugin_paths: typing.List[typing.Union[str, Path]] = field(default_factory=list)
    plugins: typing.List[str] = field(default_factory=list)

    def __post_init__(self):
        self.aws_client = AwsClient(
            region=self.region, profile=self.profile, verbose=self.verbose
        )
        # Note: only the first entry in ``plugin_paths`` is searched when
        # loading plugins.
        self.plugins_loaded = [
            self.load_plugin(plugin, self.plugin_paths[0])
            for plugin in self.plugins
        ]

    def load_plugin(self, plugin, plugin_path):
        plugin_module, plugin_class = plugin.split(":")
        spec = importlib.util.spec_from_file_location(
            plugin_module, f"{plugin_path}/{plugin_module}.py"
        )
        if spec is None:
            raise ValueError(f"{plugin} not found")
        module = importlib.util.module_from_spec(spec)
        sys.modules[plugin_module] = module
        spec.loader.exec_module(module)  # type: ignore
        return getattr(module, plugin_class)(aws_client=self.aws_client)
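
    # For example, a plugin spec of ``"instance_id:InstanceIdPlugin"``
    # (hypothetical names) loads the class ``InstanceIdPlugin`` from
    # ``<plugin_path>/instance_id.py``.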

    def run_plugin(self, plugin, log_entries):
        for batch in batcher(log_entries, plugin.batch_size):
            yield from plugin.augment(batch)
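
    # A minimal plugin sketch (an assumed shape, inferred from the calls
    # above; the project's real plugin base class may differ). ``load_plugin``
    # instantiates the class with ``aws_client=`` and ``run_plugin`` expects a
    # ``batch_size`` attribute and an ``augment`` method that yields the
    # (possibly enriched) log entries:
    #
    #     class ExamplePlugin:
    #         batch_size = 100
    #
    #         def __init__(self, aws_client):
    #             self.aws_client = aws_client
    #
    #         def augment(self, log_entries):
    #             yield from log_entries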

    def parse_csv(self, content):
        model_fields = fields(self.log_type.model)
        assert self.log_type.delimiter
        for row in csv.reader(content, delimiter=self.log_type.delimiter):
            # Skip blank lines and comment headers such as "#Version".
            if row and not row[0].startswith("#"):
                yield self.log_type.model(
                    *[
                        to_python(value, field)
                        for value, field in zip(row, model_fields)
                    ]
                )

    def parse_json(self, records):
        for record in records:
            yield self.log_type.model.from_json(record)  # type: ignore

    def parse(self, content):
        parse_func = (
            self.parse_json
            if self.log_type.type == LogFormatType.JSON
            else self.parse_csv
        )
        log_entries = parse_func(content)
        # Each plugin wraps the previous generator, forming a lazy pipeline.
        for plugin in self.plugins_loaded:
            log_entries = self.run_plugin(plugin, log_entries)
        yield from log_entries

    def read_file(self, path):
        """
        Yield parsed log entries from the given file.

        Low level function used by ``read_files``.

        :param path: The path to the file.
        :type path: str
        :return: Parsed log entries.
        :rtype: Dependent on log_type.
        """
        if self.verbose:
            print(f"Reading file://{path}")
        with open(path) as log_data:
            yield from self.parse(log_data.readlines())

    def read_files(self, pathname):
        """
        Yield parsed log entries from the file(s) at the given path. A
        directory is searched recursively for files ending in ``file_suffix``.

        Low level function used by ``read_url``.

        :param pathname: The path to the file or directory.
        :type pathname: str
        :return: Parsed log entries.
        :rtype: Dependent on log_type.
        """
        path = Path(pathname)
        if path.is_file():
            yield from self.read_file(path)
        else:
            for p in path.glob(f"**/*{self.file_suffix}"):
                yield from self.read_file(p)
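
    # For example, ``read_files("/var/log/elb")`` (a hypothetical path) parses
    # every ``*.log`` file under that directory tree, while passing a single
    # file path parses just that file.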

    def read_s3(self, bucket, prefix, endswith=None):
        """
        Yield parsed log entries from the given S3 bucket and prefix.

        Low level function used by ``read_url``.

        :param bucket: The S3 bucket.
        :type bucket: str
        :param prefix: The S3 prefix.
        :type prefix: str
        :param endswith: Only read keys ending with this suffix.
        :type endswith: str
        :return: Parsed log entries.
        :rtype: Dependent on log_type.
        """
        yield from self.parse(
            self.aws_client.s3_service.read_keys(bucket, prefix, endswith=endswith)
        )
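
    # For example (hypothetical bucket and prefix):
    #
    #     entries = parser.read_s3("my-bucket", "AWSLogs/", endswith=".log")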

    def read_url(self, url):
        """
        Yield parsed log entries from the given url. The file:// and s3://
        schemes are currently supported.

        :param url: The url to read from. Partial paths are supported
            for s3 urls. For example::

                s3://bucket/prefix/

            or you can pass the full path to the file::

                s3://bucket/prefix/logfile.log

        :type url: str
        :raise ValueError: If the url scheme is not known.
        :return: Parsed log entries.
        :rtype: Dependent on log_type.
        """
        parsed = urlparse(url)
        if parsed.scheme == "file":
            yield from self.read_files(parsed.path)
        elif parsed.scheme == "s3":
            yield from self.read_s3(
                parsed.netloc,
                parsed.path.lstrip("/"),
                endswith=self.file_suffix,
            )
        else:
            raise ValueError(f"Unknown scheme {parsed.scheme}")
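
# Example usage (a sketch; ``LogFormat.CloudFront`` is an assumed member of
# the formats defined in .models, and the bucket and prefix are hypothetical):
#
#     parser = AwsLogParser(log_type=LogFormat.CloudFront)
#     for entry in parser.read_url("s3://my-bucket/cf-logs/"):
#         print(entry)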