-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
eventmgr.py
208 lines (167 loc) · 6.73 KB
/
eventmgr.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
from colorama import Style
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
import json
import logging
from logging.handlers import RotatingFileHandler
import threading
from typing import Any, Callable, List, Optional, TextIO
from uuid import uuid4
from dbt.events.base_types import BaseEvent, EventLevel, msg_from_base_event, EventMsg
# A Filter is a function which takes a BaseEvent and returns True if the event
# should be logged, False otherwise.
Filter = Callable[[EventMsg], bool]
# Default filter which logs every event
def NoFilter(_: EventMsg) -> bool:
return True
# A Scrubber removes secrets from an input string, returning a sanitized string.
Scrubber = Callable[[str], str]
# Provide a pass-through scrubber implementation, also used as a default
def NoScrubber(s: str) -> str:
return s
class LineFormat(Enum):
PlainText = 1
DebugText = 2
Json = 3
# Map from dbt event levels to python log levels
_log_level_map = {
EventLevel.DEBUG: 10,
EventLevel.TEST: 10,
EventLevel.INFO: 20,
EventLevel.WARN: 30,
EventLevel.ERROR: 40,
}
# We need this function for now because the numeric log severity levels in
# Python do not match those for logbook, so we have to explicitly call the
# correct function by name.
def send_to_logger(l, level: str, log_line: str):
if level == "test":
l.debug(log_line)
elif level == "debug":
l.debug(log_line)
elif level == "info":
l.info(log_line)
elif level == "warn":
l.warning(log_line)
elif level == "error":
l.error(log_line)
else:
raise AssertionError(
f"While attempting to log {log_line}, encountered the unhandled level: {level}"
)
@dataclass
class LoggerConfig:
name: str
filter: Filter = NoFilter
scrubber: Scrubber = NoScrubber
line_format: LineFormat = LineFormat.PlainText
level: EventLevel = EventLevel.WARN
use_colors: bool = False
output_stream: Optional[TextIO] = None
output_file_name: Optional[str] = None
logger: Optional[Any] = None
class _Logger:
def __init__(self, event_manager: "EventManager", config: LoggerConfig) -> None:
self.name: str = config.name
self.filter: Filter = config.filter
self.scrubber: Scrubber = config.scrubber
self.level: EventLevel = config.level
self.event_manager: EventManager = event_manager
self._python_logger: Optional[logging.Logger] = config.logger
self._stream: Optional[TextIO] = config.output_stream
if config.output_file_name:
log = logging.getLogger(config.name)
log.setLevel(_log_level_map[config.level])
handler = RotatingFileHandler(
filename=str(config.output_file_name),
encoding="utf8",
maxBytes=10 * 1024 * 1024, # 10 mb
backupCount=5,
)
handler.setFormatter(logging.Formatter(fmt="%(message)s"))
log.handlers.clear()
log.addHandler(handler)
self._python_logger = log
def create_line(self, msg: EventMsg) -> str:
raise NotImplementedError()
def write_line(self, msg: EventMsg):
line = self.create_line(msg)
python_level = _log_level_map[EventLevel(msg.info.level)]
if self._python_logger is not None:
send_to_logger(self._python_logger, msg.info.level, line)
elif self._stream is not None and _log_level_map[self.level] <= python_level:
self._stream.write(line + "\n")
def flush(self):
if self._python_logger is not None:
for handler in self._python_logger.handlers:
handler.flush()
elif self._stream is not None:
self._stream.flush()
class _TextLogger(_Logger):
def __init__(self, event_manager: "EventManager", config: LoggerConfig) -> None:
super().__init__(event_manager, config)
self.use_colors = config.use_colors
self.use_debug_format = config.line_format == LineFormat.DebugText
def create_line(self, msg: EventMsg) -> str:
return self.create_debug_line(msg) if self.use_debug_format else self.create_info_line(msg)
def create_info_line(self, msg: EventMsg) -> str:
ts: str = datetime.utcnow().strftime("%H:%M:%S")
scrubbed_msg: str = self.scrubber(msg.info.msg) # type: ignore
return f"{self._get_color_tag()}{ts} {scrubbed_msg}"
def create_debug_line(self, msg: EventMsg) -> str:
log_line: str = ""
# Create a separator if this is the beginning of an invocation
# TODO: This is an ugly hack, get rid of it if we can
if msg.info.name == "MainReportVersion":
separator = 30 * "="
log_line = (
f"\n\n{separator} {msg.info.ts} | {self.event_manager.invocation_id} {separator}\n"
)
ts: str = msg.info.ts.strftime("%H:%M:%S.%f")
scrubbed_msg: str = self.scrubber(msg.info.msg) # type: ignore
level = msg.info.level
log_line += (
f"{self._get_color_tag()}{ts} [{level:<5}]{self._get_thread_name()} {scrubbed_msg}"
)
return log_line
def _get_color_tag(self) -> str:
return "" if not self.use_colors else Style.RESET_ALL
def _get_thread_name(self) -> str:
thread_name = ""
if threading.current_thread().name:
thread_name = threading.current_thread().name
thread_name = thread_name[:10]
thread_name = thread_name.ljust(10, " ")
thread_name = f" [{thread_name}]:"
return thread_name
class _JsonLogger(_Logger):
def create_line(self, msg: EventMsg) -> str:
from dbt.events.functions import msg_to_dict
msg_dict = msg_to_dict(msg)
raw_log_line = json.dumps(msg_dict, sort_keys=True)
line = self.scrubber(raw_log_line) # type: ignore
return line
class EventManager:
def __init__(self) -> None:
self.loggers: List[_Logger] = []
self.callbacks: List[Callable[[EventMsg], None]] = []
self.invocation_id: str = str(uuid4())
def fire_event(self, e: BaseEvent, level: EventLevel = None) -> None:
msg = msg_from_base_event(e, level=level)
for logger in self.loggers:
if logger.filter(msg): # type: ignore
logger.write_line(msg)
for callback in self.callbacks:
callback(msg)
def add_logger(self, config: LoggerConfig):
logger = (
_JsonLogger(self, config)
if config.line_format == LineFormat.Json
else _TextLogger(self, config)
)
logger.event_manager = self
self.loggers.append(logger)
def flush(self):
for logger in self.loggers:
logger.flush()