-
Notifications
You must be signed in to change notification settings - Fork 0
/
clientConnection.go
executable file
·217 lines (169 loc) · 5.4 KB
/
clientConnection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
package hz
import (
"net"
"fmt"
"errors"
"strconv"
"encoding/binary"
"sync"
"time"
)
// ILogging is the logging sink used by the connection for feedback. The
// implementation is provided externally. Every method takes a printf-style
// format string followed by the values to substitute into it; the severity
// of the entry is conveyed by the method name.
type ILogging interface {
	Trace(format string, args ...interface{})
	Info(format string, args ...interface{})
	Warn(format string, args ...interface{})
	Error(format string, args ...interface{})
	Fatal(format string, args ...interface{})
}
// ResponseCallback is the per-correlation-id registration consulted by the
// read loop. All responses are asynchronous, so a caller waits on
// NotifyChannel instead of receiving a return value from the send.
type ResponseCallback struct {
// NotifyChannel receives every message whose correlation id matches this registration.
NotifyChannel chan *ClientMessage
// autoRemove, when true, makes the read loop delete the registration from
// the responses map when it dispatches a matching message.
autoRemove bool
}
// ClientConnection is a single TCP connection to one address, together with
// the table of callbacks waiting for responses on that connection.
type ClientConnection struct {
// Address is the remote endpoint; Connect dials Address.Host and Address.Port.
Address Address
// readBuffer is allocated empty by NewClientConnection and is not otherwise used in this file.
readBuffer []byte
// cid is the last correlation id handed out by NextCorrelationId.
cid uint64
// partitionCount is not read or written anywhere in this file.
partitionCount int32
// socketMutex is held by ExchangeWithTimeout while it registers a callback and writes a message.
socketMutex *sync.Mutex
// socket is the connection assigned by Connect.
socket net.Conn
// responsesMutex guards the responses map.
responsesMutex *sync.Mutex
// responses maps a correlation id to the callback awaiting that response.
responses map[int64]*ResponseCallback
// Logger is not set by NewClientConnection; it must be assigned before
// calling any method that logs.
Logger ILogging
// Closed is set to true by Close and to false by a successful Connect.
Closed bool
// QueueSerializerId is initialised to 0 by NewClientConnection and is not otherwise used in this file.
QueueSerializerId uint32
}
const (
// DEFAULT_EXCHANGE_TIMEOUT_MILLIS is the timeout, expressed as a count of
// milliseconds, that Exchange passes to ExchangeWithTimeout.
DEFAULT_EXCHANGE_TIMEOUT_MILLIS = 1000 * 60 * 2 // 2 mins
)
// NewClientConnection returns an unconnected ClientConnection for address.
// The mutexes and the response table are allocated and the correlation
// counter starts at 1. Logger is left nil and no socket is opened; the caller
// assigns Logger and calls Connect separately.
func NewClientConnection(address Address) *ClientConnection {
	return &ClientConnection{
		Address:           address,
		readBuffer:        []byte{}, //todo
		cid:               1,
		socketMutex:       new(sync.Mutex),
		responsesMutex:    new(sync.Mutex),
		responses:         map[int64]*ResponseCallback{},
		QueueSerializerId: 0,
	}
}
// NextCorrelationId advances the connection's correlation counter by one and
// returns the new value, so that each message exchange can carry its own id.
//
// The increment is a plain read-modify-write with no locking; callers that
// invoke it from several goroutines must serialise the calls themselves.
func (this *ClientConnection) NextCorrelationId() uint64 {
	next := this.cid + 1
	this.cid = next
	return next
}
// Close closes the underlying socket, if there is one, and marks the
// connection as closed.
//
// It is safe to call when no socket was ever established (for example after
// a failed Connect); in that case only the Closed flag is updated. An error
// returned by the socket's Close is logged as a warning, not dropped.
// Logger must be non-nil.
func (this *ClientConnection) Close() {
	this.Logger.Trace("Closing connection: %v", this.socket)
	// A nil socket means nothing was ever dialled; calling Close on the nil
	// interface would panic.
	if nil != this.socket {
		if err := this.socket.Close(); nil != err {
			this.Logger.Warn("Error closing connection: %v", err)
		}
	}
	this.Closed = true
}
// Connect dials the connection's Address over TCP in a background goroutine
// and then writes the CLIENT_BINARY_NEW protocol header to the new socket.
//
// The returned Promise has buffered channels of capacity one:
//   - SuccessChannel receives this connection once the socket is open and the
//     header has been written;
//   - FailureChannel receives an error if dialling or the header write fails.
//
// The socket field is only replaced after both steps succeed, so a failed
// attempt never leaves a nil or half-initialised socket behind.
//
// NOTE(review): the address parameter is not used; the endpoint dialled is
// always this.Address. Confirm with callers before changing that.
func (this *ClientConnection) Connect(address Address) *Promise {
	result := new(Promise)
	result.SuccessChannel = make(chan interface{}, 1)
	result.FailureChannel = make(chan error, 1)
	go func() {
		socket, err := net.Dial("tcp", this.Address.Host+":"+strconv.Itoa(this.Address.Port))
		if nil != err {
			result.FailureChannel <- errors.New(fmt.Sprintf("Could not connect to address: %s, err(%v)", this.Address.String(), err))
			return
		}
		// The header write is part of establishing the connection: if it
		// fails the socket is unusable, so release it and report failure.
		if _, err = socket.Write([]byte(CLIENT_BINARY_NEW)); nil != err {
			socket.Close()
			result.FailureChannel <- errors.New(fmt.Sprintf("Could not send protocol header to address: %s, err(%v)", this.Address.String(), err))
			return
		}
		this.socket = socket
		this.Closed = false
		result.SuccessChannel <- this
	}()
	return result
}
// InitReadLoop starts the single goroutine that reads frames from the socket
// and hands each one to the callback registered for its correlation id.
//
// Each frame starts with a little-endian uint32 frame length that counts the
// length prefix itself. The prefix and the remainder of the frame are read
// completely (short reads are retried until the buffer is full) and stored
// together in ClientMessage.Buffer.
//
// The loop ends, after logging an error, when a read fails or when a frame
// length smaller than the prefix is received. Messages whose correlation id
// has no registration are logged and dropped. Callbacks flagged autoRemove
// are unregistered when their message is dispatched. Delivery to
// NotifyChannel happens on a separate goroutine so that a slow receiver does
// not stall the read loop.
func (this *ClientConnection) InitReadLoop() {
	go func() {
		// readFull keeps reading until buf is full. An error is only
		// reported when the buffer could not be filled, because a Read may
		// return its final bytes together with an error.
		readFull := func(buf []byte) error {
			for filled := 0; filled < len(buf); {
				n, err := this.socket.Read(buf[filled:])
				filled += n
				if nil != err && filled < len(buf) {
					return err
				}
			}
			return nil
		}

		flBuffer := make([]byte, INT_SIZE_IN_BYTES)
		for {
			if err := readFull(flBuffer); nil != err {
				this.Logger.Error("Unexpected error reading frame length! %v - read loop aborted!", err)
				return
			}

			frameLength := binary.LittleEndian.Uint32(flBuffer)
			// The length includes the prefix; anything smaller would
			// underflow the unsigned subtraction used to size the body.
			if frameLength < uint32(len(flBuffer)) {
				this.Logger.Error("Invalid frame length: %d - read loop aborted!", frameLength)
				return
			}

			// One allocation per message: prefix first, body read in place.
			frame := make([]byte, frameLength)
			copy(frame, flBuffer)
			if err := readFull(frame[len(flBuffer):]); nil != err {
				this.Logger.Error("Unexpected error reading message! %v - read loop aborted!", err)
				return
			}

			msg := new(ClientMessage)
			msg.Buffer = frame
			cid := msg.GetCorrelationId()

			this.responsesMutex.Lock()
			cb, ok := this.responses[cid]
			removed := ok && cb.autoRemove
			if removed {
				delete(this.responses, cid)
			}
			this.responsesMutex.Unlock()

			if !ok {
				this.Logger.Error("Failed to find correlation id: %d using response message of type: 0x%04x! Message receiver is now BLOCKED!!", cid, msg.GetMessageType())
				continue
			}
			if removed {
				this.Logger.Trace("Removed correlation id from responses map: %d", cid)
			}
			go func(target *ResponseCallback, response *ClientMessage) {
				target.NotifyChannel <- response
			}(cb, msg)
		}
	}()
}
// Exchange sends msg and waits for its response, using
// DEFAULT_EXCHANGE_TIMEOUT_MILLIS as the timeout. It is a convenience wrapper
// around ExchangeWithTimeout and returns whatever that call returns.
func (this *ClientConnection) Exchange(msg *ClientMessage) (*ClientMessage, error) {
	const timeout = DEFAULT_EXCHANGE_TIMEOUT_MILLIS
	return this.ExchangeWithTimeout(msg, timeout)
}
// ExchangeWithTimeout writes msg to the socket and blocks until the response
// carrying the same correlation id arrives or the timeout elapses.
//
// timeout is interpreted as a COUNT OF MILLISECONDS even though its type is
// time.Duration: the value is multiplied by time.Millisecond before use.
//
// Registration of the response callback and the socket write happen under
// socketMutex. On a write error or an incomplete write the connection is
// closed and an error is returned. On timeout an error is returned and the
// connection is left open. In every failure case the callback registration
// is removed again so the responses map does not accumulate dead entries.
func (this *ClientConnection) ExchangeWithTimeout(msg *ClientMessage, timeout time.Duration) (*ClientMessage, error) {
	cid := msg.GetCorrelationId()
	// unregister drops the callback for this exchange; deleting an entry the
	// read loop has already removed is a no-op.
	unregister := func() {
		this.responsesMutex.Lock()
		delete(this.responses, cid)
		this.responsesMutex.Unlock()
	}

	this.socketMutex.Lock()
	cb := this.Register(cid)
	cb.autoRemove = true
	this.Logger.Trace("====> Sending: cid=%d, type=0x%02x, partitionid=%d, framelength=%d, flags=0x%02x, dataoffset=%d", msg.GetCorrelationId(), msg.GetMessageType(), msg.GetPartitionId(), msg.GetFrameLength(), msg.GetFlags(), msg.GetDataOffset())
	n, err := this.socket.Write(msg.Buffer)
	this.socketMutex.Unlock()

	if nil != err {
		unregister()
		this.Logger.Error("Fatal socket error on write: %v\n", err)
		this.Close()
		return nil, err
	}
	if n != len(msg.Buffer) {
		unregister()
		this.Close()
		return nil, errors.New(fmt.Sprintf("Fatal socket error: Incomplete write to socket! buffer size=%d, written=%d\r\n", len(msg.Buffer), n))
	}

	// An explicit timer is stopped as soon as the response arrives, so it
	// does not stay pending for the rest of the timeout.
	timer := time.NewTimer(time.Millisecond * timeout)
	defer timer.Stop()

	select {
	case response := <-cb.NotifyChannel:
		return response, nil
	case <-timer.C:
		// call timed out
		unregister()
		return nil, errors.New(fmt.Sprintf("Message exchange timeout. No response received in: %d millis", timeout))
	}
}
// Register creates a ResponseCallback for correlationId, stores it in the
// responses map under responsesMutex (replacing any existing entry for the
// same id) and returns it. autoRemove is left false; callers that want a
// one-shot registration set it themselves.
//
// NotifyChannel has a buffer of one so that a response can be delivered even
// when nobody is receiving any more (for example after the waiter timed
// out); with an unbuffered channel that delivery would block forever.
func (this *ClientConnection) Register(correlationId int64) *ResponseCallback {
	responseCallback := &ResponseCallback{
		NotifyChannel: make(chan *ClientMessage, 1),
	}

	this.responsesMutex.Lock()
	this.responses[correlationId] = responseCallback
	this.responsesMutex.Unlock()

	return responseCallback
}