This repository has been archived by the owner on Nov 17, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 13
/
logs.go
130 lines (104 loc) · 2.81 KB
/
logs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package tugboat
import (
"bufio"
"bytes"
"encoding/json"
"io"
"strings"
"time"
)
// LogLine represents a line of log output.
type LogLine struct {
// A unique identifier for this log line.
ID string `db:"id"`
// The associated deployment.
DeploymentID string `db:"deployment_id"`
// The line of text from the log line.
Text string `db:"text"`
// The time that the line was recorded.
At time.Time `db:"at"`
}
// LogLinesCreate inserts a LogLine into the store.
func (s *store) LogLinesCreate(l *LogLine) error {
return s.db.Insert(l)
}
// LogLines returns a slice of all LogLines for a given Deployment.
func (s *store) LogLines(d *Deployment) ([]*LogLine, error) {
var lines []*LogLine
_, err := s.db.Select(&lines, `select * from logs where deployment_id = $1 order by at asc`, d.ID)
return lines, err
}
// logsService wraps the LogLinesCreate method.
type logsService interface {
LogLinesCreate(*LogLine) error
}
// newLogsService returns a new composed logsService.
func newLogsService(store *store, pusher Pusher) logsService {
return &pushedLogsService{
logsService: store,
pusher: pusher,
}
}
// pushedLogsService wraps a logsService to send events to pusher.
type pushedLogsService struct {
logsService
pusher Pusher
}
// LogLinesCreate sends a pusher event including the new LogLine then delegates
// to the wrapped logsService.
func (s *pushedLogsService) LogLinesCreate(l *LogLine) error {
channel := deploymentChannel(l.DeploymentID)
data := struct {
ID string `json:"id"`
Output string `json:"output"`
}{
ID: l.DeploymentID,
Output: l.Text,
}
raw, err := json.Marshal(&data)
if err != nil {
return err
}
if err := s.pusher.Publish(string(raw), "log_line", channel); err != nil {
return err
}
return s.logsService.LogLinesCreate(l)
}
// logWriter is an io.Writer implementation that writes log lines using a
// logsService.
type logWriter struct {
createLogLine func(*LogLine) error
// deployment is the deployment that the log lines will be associated
// with.
deploymentID string
}
// Write creates a new LogLine for each line from p.
func (w *logWriter) Write(p []byte) (int, error) {
r := bufio.NewReader(bytes.NewReader(p))
createLine := func(text string) error {
return w.createLogLine(&LogLine{
DeploymentID: w.deploymentID,
Text: text,
At: time.Now(),
})
}
read := len(p)
for {
b, err := r.ReadBytes('\n')
// Heroku may send a null character as a heartbeat signal. We
// want to strip out any null characters, as inserting them into
// postgres will cause an error.
line := strings.Replace(string(b), "\x00", "", -1)
if err != nil {
if err == io.EOF {
return read, createLine(line)
} else {
return read, err
}
}
if err := createLine(line); err != nil {
return read, err
}
}
return read, nil
}