This repository has been archived by the owner on Sep 18, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 5
/
correlator.go
150 lines (136 loc) · 3.45 KB
/
correlator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package main
import (
"fmt"
"io/ioutil"
"net/http"
"strings"
"sync"
"time"
"github.com/google/uuid"
)
const (
timeout = time.Minute
)
var httpClient = http.DefaultClient
var replyChannels = newRepliesMap()
var claimChecks = make(map[string]string)
func main() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if r.Method == "POST" {
handlePost(w, r)
} else {
handleGet(w, r)
}
})
http.ListenAndServe(":8080", nil)
}
func handlePost(w http.ResponseWriter, r *http.Request) {
namespace, channel := parsePathToChannel(r.URL.Path)
if channel == "" { // handle reply
fmt.Printf("response headers: %v", r.Header)
correlationID := r.Header.Get("knative-correlation-id")
reply, err := ioutil.ReadAll(r.Body)
if err != nil {
// TODO
}
fmt.Printf("REPLY %s: %s\n", correlationID, reply)
replyChannel := replyChannels.Get(correlationID)
if replyChannel != nil {
replyChannel <- string(reply)
} else {
// timeout or non-blocking request
claimChecks[correlationID] = string(reply)
}
} else {
uuid, err := uuid.NewRandom()
if err != nil {
fmt.Print("failed to create correlation ID")
w.WriteHeader(500)
return
}
correlationID := uuid.String()
blocking := r.Header.Get("knative-blocking-request") == "true"
var replyChan chan string
if blocking {
replyChan = make(chan string, 1)
replyChannels.Put(correlationID, replyChan)
defer replyChannels.Delete(correlationID)
}
err = sendToChannel(namespace, channel, correlationID, r)
if err != nil {
fmt.Fprintf(w, "Failed to send to channel %v", err)
return
}
if !blocking {
w.Header().Add("knative-correlation-id", correlationID)
w.WriteHeader(202)
return
}
select {
case reply := <-replyChan:
w.Write([]byte(reply))
case <-time.After(timeout):
w.WriteHeader(404)
}
return
}
}
func handleGet(w http.ResponseWriter, r *http.Request) {
s := strings.Split(r.URL.Path, "/")
if len(s) != 2 {
w.WriteHeader(404)
return
}
key := s[1]
w.Write([]byte(claimChecks[key]))
}
func parsePathToChannel(path string) (namespace, channel string) {
s := strings.Split(path, "/")
if len(s) >= 3 {
namespace = s[1]
channel = s[2]
}
return
}
func sendToChannel(namespace, channel string, correlationID string, r *http.Request) error {
fmt.Printf("SENDING TO %s/%s\n", namespace, channel)
url := fmt.Sprintf("http://%s-channel.%s.svc.cluster.local", channel, namespace)
req, err := http.NewRequest(http.MethodPost, url, r.Body)
if err != nil {
return err
}
req.Header = r.Header
req.Header.Add("knative-correlation-id", correlationID)
fmt.Printf("headers: %v\n", req.Header)
res, err := httpClient.Do(req)
if err != nil {
return err
}
if res.StatusCode >= 400 {
return fmt.Errorf("status: %d", res.StatusCode)
}
return nil
}
// Type repliesMap implements a concurrent safe map of channels to send replies to, keyed by correlationIds
type repliesMap struct {
m map[string]chan<- string
lock sync.RWMutex
}
func (replies *repliesMap) Delete(key string) {
replies.lock.Lock()
defer replies.lock.Unlock()
delete(replies.m, key)
}
func (replies *repliesMap) Get(key string) chan<- string {
replies.lock.RLock()
defer replies.lock.RUnlock()
return replies.m[key]
}
func (replies *repliesMap) Put(key string, value chan<- string) {
replies.lock.Lock()
defer replies.lock.Unlock()
replies.m[key] = value
}
func newRepliesMap() *repliesMap {
return &repliesMap{make(map[string]chan<- string), sync.RWMutex{}}
}