-
-
Notifications
You must be signed in to change notification settings - Fork 366
/
mqingest.go
122 lines (103 loc) · 2.72 KB
/
mqingest.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package main
import (
"bufio"
"encoding/json"
"flag"
"fmt"
"github.com/sa7mon/s3scanner/bucket"
"github.com/streadway/amqp"
"log"
"os"
"strings"
)
// failOnError is a no-op when err is nil. Otherwise it logs msg together with
// err and terminates the process with exit status 1.
func failOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Printf("%v - %v\n", msg, err)
	os.Exit(1)
}
// printUsage writes a one-line description of the tool to standard output and
// then prints the defaults of every registered command-line flag.
func printUsage() {
	const description = "mqingest takes in a file of bucket names, one per line, and publishes them to a RabbitMQ queue"

	fmt.Println(description)
	flag.PrintDefaults()
}
// BucketMessage holds a single bucket name, serialized under the JSON key
// "bucket_name".
//
// The name of the queue should indicate the endpoint, so there is no need to
// put the endpoint in the message.
//
// NOTE(review): main currently marshals bucket.Bucket rather than this type;
// the only reference to BucketMessage in this file is commented out.
type BucketMessage struct {
BucketName string `json:"bucket_name"`
}
// main reads bucket names from a file, one per line, and publishes each one
// as a persistent JSON message to a durable RabbitMQ queue.
//
// Flags:
//
//	-file   path of the file containing bucket names (required)
//	-queue  name of the queue to publish to (required)
//	-url    AMQP URI of the RabbitMQ server
//
// A companion queue named "<queue>_dead" is declared and wired up as the
// dead-letter target of the main queue. Lines that are empty after trimming
// whitespace are skipped so that no message with an empty bucket name is
// published. Every failure is reported through failOnError, which logs the
// error and exits with status 1.
func main() {
	var (
		filename  string
		url       string
		queueName string
	)
	flag.StringVar(&filename, "file", "", "File name of buckets to send to MQ")
	flag.StringVar(&url, "url", "amqp://guest:guest@localhost:5672/", "AMQP URI of RabbitMQ server")
	flag.StringVar(&queueName, "queue", "", "Name of message queue to publish buckets to")
	flag.Parse()

	if filename == "" || queueName == "" {
		fmt.Println("Flags 'file' and 'queue' are required")
		printUsage()
		os.Exit(1)
	}

	// NOTE: failOnError terminates via os.Exit, so the deferred Close calls
	// below only run when main returns normally.
	conn, err := amqp.Dial(url)
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// Declare the dead letter queue first; its name is needed as the
	// dead-letter routing key of the main queue.
	dlq, err := ch.QueueDeclare(
		queueName+"_dead", // name
		true,              // durable
		false,             // delete when unused
		false,             // exclusive
		false,             // no-wait
		nil,               // arguments
	)
	failOnError(err, "Failed to declare dead letter queue")

	q, err := ch.QueueDeclare(
		queueName, // name
		true,      // durable
		false,     // delete when unused
		false,     // exclusive
		false,     // no-wait
		amqp.Table{
			// The empty exchange name is the default exchange, so dead
			// letters are routed by queue name straight to dlq.
			"x-dead-letter-exchange":    "",
			"x-dead-letter-routing-key": dlq.Name,
		},
	)
	failOnError(err, "Failed to declare a queue")

	// NOTE(review): prefetch settings govern delivery to consumers; this
	// program only publishes, so this call is likely unnecessary. Kept to
	// preserve the existing channel setup.
	err = ch.Qos(
		1,     // prefetch count
		0,     // prefetch size
		false, // global
	)
	failOnError(err, "Failed to set QoS on channel")

	file, err := os.Open(filename)
	failOnError(err, "Failed to open file")
	defer file.Close()

	msgsPublished := 0
	fileScanner := bufio.NewScanner(file)
	for fileScanner.Scan() {
		bucketName := strings.TrimSpace(fileScanner.Text())
		if bucketName == "" {
			// Blank or whitespace-only line: there is no bucket to publish.
			continue
		}

		bucketBytes, err := json.Marshal(bucket.Bucket{Name: bucketName})
		failOnError(err, "Failed to marshal bucket msg")

		err = ch.Publish(
			"",     // exchange (default)
			q.Name, // routing key
			false,  // mandatory
			false,  // immediate
			amqp.Publishing{Body: bucketBytes, DeliveryMode: amqp.Persistent},
		)
		failOnError(err, "Failed to publish to channel")
		msgsPublished++
	}
	failOnError(fileScanner.Err(), "fileScanner failed")

	log.Printf("%v bucket names published to queue %v\n", msgsPublished, queueName)
}