-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.go
428 lines (373 loc) · 11.9 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
package reservation
import (
"context"
"fmt"
"net"
"strconv"
"strings"
"sync"
"time"
"github.com/bradfitz/gomemcache/memcache"
"github.com/delimitrou/DeathStarBench/hotelreservation/registry"
pb "github.com/delimitrou/DeathStarBench/hotelreservation/services/reservation/proto"
"github.com/delimitrou/DeathStarBench/hotelreservation/tls"
"github.com/google/uuid"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
"github.com/opentracing/opentracing-go"
"github.com/rs/zerolog/log"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)
const name = "srv-reservation"
// Server implements the user service
type Server struct {
Tracer opentracing.Tracer
Port int
IpAddr string
MongoClient *mongo.Client
Registry *registry.Client
MemcClient *memcache.Client
uuid string
}
// Run starts the server
func (s *Server) Run() error {
opentracing.SetGlobalTracer(s.Tracer)
if s.Port == 0 {
return fmt.Errorf("server port must be set")
}
s.uuid = uuid.New().String()
opts := []grpc.ServerOption{
grpc.KeepaliveParams(keepalive.ServerParameters{
Timeout: 120 * time.Second,
}),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
PermitWithoutStream: true,
}),
grpc.UnaryInterceptor(
otgrpc.OpenTracingServerInterceptor(s.Tracer),
),
}
if tlsopt := tls.GetServerOpt(); tlsopt != nil {
opts = append(opts, tlsopt)
}
srv := grpc.NewServer(opts...)
pb.RegisterReservationServer(srv, s)
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", s.Port))
if err != nil {
log.Fatal().Msgf("failed to listen: %v", err)
}
log.Trace().Msgf("In reservation s.IpAddr = %s, port = %d", s.IpAddr, s.Port)
err = s.Registry.Register(name, s.uuid, s.IpAddr, s.Port)
if err != nil {
return fmt.Errorf("failed register: %v", err)
}
log.Info().Msg("Successfully registered in consul")
return srv.Serve(lis)
}
// Shutdown cleans up any processes
func (s *Server) Shutdown() {
s.Registry.Deregister(s.uuid)
}
// MakeReservation makes a reservation based on given information
func (s *Server) MakeReservation(ctx context.Context, req *pb.Request) (*pb.Result, error) {
// [create thread]
res := new(pb.Result)
res.HotelId = make([]string, 0)
database := s.MongoClient.Database("reservation-db")
resCollection := database.Collection("reservation")
numCollection := database.Collection("number")
inDate, _ := time.Parse(
time.RFC3339,
req.InDate+"T12:00:00+00:00")
outDate, _ := time.Parse(
time.RFC3339,
req.OutDate+"T12:00:00+00:00")
hotelId := req.HotelId[0]
indate := inDate.String()[0:10]
memc_date_num_map := make(map[string]int)
// [capacity-lookup-loop]
for inDate.Before(outDate) {
// check reservations
count := 0
inDate = inDate.AddDate(0, 0, 1)
outdate := inDate.String()[0:10]
// first check memc
memc_key := hotelId + "_" + inDate.String()[0:10] + "_" + outdate
item, err := s.MemcClient.Get(memc_key)
// [branch-on-occupancy-cache-result]
if err == nil {
// memcached hit
count, _ = strconv.Atoi(string(item.Value))
log.Trace().Msgf("memcached hit %s = %d", memc_key, count)
memc_date_num_map[memc_key] = count + int(req.RoomNumber)
} else if err == memcache.ErrCacheMiss {
// memcached miss
log.Trace().Msgf("memcached miss")
var reserve []reservation
filter := bson.D{{"hotelId", hotelId}, {"inDate", indate}, {"outDate", outdate}}
curr, err := resCollection.Find(context.TODO(), filter)
if err != nil {
log.Error().Msgf("Failed get reservation data: ", err)
}
curr.All(context.TODO(), &reserve)
if err != nil {
log.Panic().Msgf("Tried to find hotelId [%v] from date [%v] to date [%v], but got error", hotelId, indate, outdate, err.Error())
}
for _, r := range reserve {
count += r.Number
}
memc_date_num_map[memc_key] = count + int(req.RoomNumber)
} else {
log.Panic().Msgf("Tried to get memc_key [%v], but got memmcached error = %s", memc_key, err)
}
// check capacity
// check memc capacity
// [check cap cache]
memc_cap_key := hotelId + "_cap"
item, err = s.MemcClient.Get(memc_cap_key)
hotel_cap := 0
if err == nil {
// memcached hit
hotel_cap, _ = strconv.Atoi(string(item.Value))
log.Trace().Msgf("memcached hit %s = %d", memc_cap_key, hotel_cap)
} else if err == memcache.ErrCacheMiss {
// memcached miss
var num number
err = numCollection.FindOne(context.TODO(), &bson.D{{"hotelId", hotelId}}).Decode(&num)
if err != nil {
log.Panic().Msgf("Tried to find hotelId [%v], but got error", hotelId, err.Error())
}
hotel_cap = int(num.Number)
// write to memcache
s.MemcClient.Set(&memcache.Item{Key: memc_cap_key, Value: []byte(strconv.Itoa(hotel_cap))})
} else {
log.Panic().Msgf("Tried to get memc_cap_key [%v], but got memmcached error = %s", memc_cap_key, err)
}
if count+int(req.RoomNumber) > hotel_cap {
return res, nil
}
indate = outdate
}
// only update reservation number cache after check succeeds
// [cache-write-loop]
for key, val := range memc_date_num_map {
s.MemcClient.Set(&memcache.Item{Key: key, Value: []byte(strconv.Itoa(val))})
}
inDate, _ = time.Parse(
time.RFC3339,
req.InDate+"T12:00:00+00:00")
indate = inDate.String()[0:10]
for inDate.Before(outDate) {
inDate = inDate.AddDate(0, 0, 1)
outdate := inDate.String()[0:10]
_, err := resCollection.InsertOne(
context.TODO(),
reservation{
HotelId: hotelId,
CustomerName: req.CustomerName,
InDate: indate,
OutDate: outdate,
Number: int(req.RoomNumber),
},
)
if err != nil {
log.Panic().Msgf("Tried to insert hotel [hotelId %v], but got error", hotelId, err.Error())
}
indate = outdate
}
res.HotelId = append(res.HotelId, hotelId)
return res, nil
}
// CheckAvailability checks if given information is available
func (s *Server) CheckAvailability(ctx context.Context, req *pb.Request) (*pb.Result, error) {
res := new(pb.Result)
res.HotelId = make([]string, 0)
hotelMemKeys := []string{}
keysMap := make(map[string]struct{})
resMap := make(map[string]bool)
// cache capacity since it will not change
for _, hotelId := range req.HotelId {
hotelMemKeys = append(hotelMemKeys, hotelId+"_cap")
resMap[hotelId] = true
keysMap[hotelId+"_cap"] = struct{}{}
}
capMemSpan, _ := opentracing.StartSpanFromContext(ctx, "memcached_capacity_get_multi_number")
capMemSpan.SetTag("span.kind", "client")
cacheMemRes, err := s.MemcClient.GetMulti(hotelMemKeys)
capMemSpan.Finish()
numCollection := s.MongoClient.Database("reservation-db").Collection("number")
misKeys := []string{}
// gather cache miss key to query in mongodb
if err == memcache.ErrCacheMiss {
for key := range keysMap {
if _, ok := cacheMemRes[key]; !ok {
misKeys = append(misKeys, key)
}
}
} else if err != nil {
log.Panic().Msgf("Tried to get memc_cap_key [%v], but got memmcached error = %s", hotelMemKeys, err)
}
// store whole capacity result in cacheCap
cacheCap := make(map[string]int)
for k, v := range cacheMemRes {
hotelCap, _ := strconv.Atoi(string(v.Value))
cacheCap[k] = hotelCap
}
if len(misKeys) > 0 {
queryMissKeys := []string{}
for _, k := range misKeys {
queryMissKeys = append(queryMissKeys, strings.Split(k, "_")[0])
}
var nums []number
capMongoSpan, _ := opentracing.StartSpanFromContext(ctx, "mongodb_capacity_get_multi_number")
capMongoSpan.SetTag("span.kind", "client")
curr, err := numCollection.Find(context.TODO(), bson.D{{"$in", queryMissKeys}})
if err != nil {
log.Error().Msgf("Failed get reservation number data: ", err)
}
curr.All(context.TODO(), &nums)
if err != nil {
log.Error().Msgf("Failed get reservation number data: ", err)
}
capMongoSpan.Finish()
if err != nil {
log.Panic().Msgf("Tried to find hotelId [%v], but got error", misKeys, err.Error())
}
for _, num := range nums {
cacheCap[num.HotelId] = num.Number
// we don't care set successfully or not
go s.MemcClient.Set(&memcache.Item{Key: num.HotelId + "_cap", Value: []byte(strconv.Itoa(num.Number))})
}
}
reqCommand := []string{}
queryMap := make(map[string]map[string]string)
for _, hotelId := range req.HotelId {
log.Trace().Msgf("reservation check hotel %s", hotelId)
inDate, _ := time.Parse(
time.RFC3339,
req.InDate+"T12:00:00+00:00")
outDate, _ := time.Parse(
time.RFC3339,
req.OutDate+"T12:00:00+00:00")
for inDate.Before(outDate) {
indate := inDate.String()[:10]
inDate = inDate.AddDate(0, 0, 1)
outDate := inDate.String()[:10]
memcKey := hotelId + "_" + outDate + "_" + outDate
reqCommand = append(reqCommand, memcKey)
queryMap[memcKey] = map[string]string{
"hotelId": hotelId,
"startDate": indate,
"endDate": outDate,
}
}
}
type taskRes struct {
hotelId string
checkRes bool
}
reserveMemSpan, _ := opentracing.StartSpanFromContext(ctx, "memcached_reserve_get_multi_number")
ch := make(chan taskRes)
reserveMemSpan.SetTag("span.kind", "client")
// check capacity in memcached and mongodb
if itemsMap, err := s.MemcClient.GetMulti(reqCommand); err != nil && err != memcache.ErrCacheMiss {
reserveMemSpan.Finish()
log.Panic().Msgf("Tried to get memc_key [%v], but got memmcached error = %s", reqCommand, err)
} else {
reserveMemSpan.Finish()
// go through reservation count from memcached
go func() {
for k, v := range itemsMap {
id := strings.Split(k, "_")[0]
val, _ := strconv.Atoi(string(v.Value))
var res bool
if val+int(req.RoomNumber) <= cacheCap[id] {
res = true
}
ch <- taskRes{
hotelId: id,
checkRes: res,
}
}
if err == nil {
close(ch)
}
}()
// use miss reservation to get data from mongo
// rever string to indata and outdate
if err == memcache.ErrCacheMiss {
var wg sync.WaitGroup
for k := range itemsMap {
delete(queryMap, k)
}
wg.Add(len(queryMap))
go func() {
wg.Wait()
close(ch)
}()
for command := range queryMap {
go func(comm string) {
defer wg.Done()
var reserve []reservation
queryItem := queryMap[comm]
resCollection := s.MongoClient.Database("reservation-db").Collection("reservation")
filter := bson.D{{"hotelId", queryItem["hotelId"]}, {"inDate", queryItem["startDate"]}, {"outDate", queryItem["endDate"]}}
reserveMongoSpan, _ := opentracing.StartSpanFromContext(ctx, "mongodb_capacity_get_multi_number"+comm)
reserveMongoSpan.SetTag("span.kind", "client")
curr, err := resCollection.Find(context.TODO(), filter)
if err != nil {
log.Error().Msgf("Failed get reservation data: ", err)
}
curr.All(context.TODO(), &reserve)
if err != nil {
log.Error().Msgf("Failed get reservation data: ", err)
}
reserveMongoSpan.Finish()
if err != nil {
log.Panic().Msgf("Tried to find hotelId [%v] from date [%v] to date [%v], but got error",
queryItem["hotelId"], queryItem["startDate"], queryItem["endDate"], err.Error())
}
var count int
for _, r := range reserve {
log.Trace().Msgf("reservation check reservation number = %d", queryItem["hotelId"])
count += r.Number
}
// update memcached
go s.MemcClient.Set(&memcache.Item{Key: comm, Value: []byte(strconv.Itoa(count))})
var res bool
if count+int(req.RoomNumber) <= cacheCap[queryItem["hotelId"]] {
res = true
}
ch <- taskRes{
hotelId: queryItem["hotelId"],
checkRes: res,
}
}(command)
}
}
}
for task := range ch {
if !task.checkRes {
resMap[task.hotelId] = false
}
}
for k, v := range resMap {
if v {
res.HotelId = append(res.HotelId, k)
}
}
return res, nil
}
type reservation struct {
HotelId string `bson:"hotelId"`
CustomerName string `bson:"customerName"`
InDate string `bson:"inDate"`
OutDate string `bson:"outDate"`
Number int `bson:"number"`
}
type number struct {
HotelId string `bson:"hotelId"`
Number int `bson:"numberOfRoom"`
}