-
Notifications
You must be signed in to change notification settings - Fork 26
/
transaction_handler.go
337 lines (302 loc) · 10.5 KB
/
transaction_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
package mongo
import (
"context"
"github.com/go-oauth2/oauth2/v4"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"log"
"time"
)
// transactionData is the object saved in the TxnCName db
type transactionData struct {
ID string `bson:"_id"`
TxnID string `bson:"TxnID"`
Collection string `bson:"Collection"`
Service string `bson:"Service"`
CreatedAt time.Time `bson:"CreatedAt"`
}
type transactionHandler struct {
tcfg *TokenConfig
tw TransactionWorker
}
func NewTransactionHandler(client *mongo.Client, tcfg *TokenConfig) *transactionHandler {
return &transactionHandler{
tcfg: tcfg,
tw: NewTransactionWorker(tcfg, client),
}
}
// runTransactionCreate run the transaction
func (th *transactionHandler) runTransactionCreate(ctx context.Context, info oauth2.TokenInfo, basicData basicData, accessData tokenData, id string, rexp time.Time) (errRET error) {
ctxReq, cancel := th.tcfg.storeConfig.setRequestContext()
defer cancel()
if ctxReq != nil {
ctx = ctxReq
}
// create id transaction idTXN
txnID := primitive.NewObjectID().Hex()
// T1
basicTxnData := transactionData{
ID: basicData.ID,
TxnID: txnID,
Collection: th.tcfg.BasicCName,
Service: th.tcfg.storeConfig.service,
CreatedAt: time.Now(),
}
var err error
errRET = th.tw.insertBasicTransactionData(ctx, basicTxnData)
if errRET != nil {
log.Println("T1: Failed add basicData to TxnCName: ", errRET)
return
} else {
errRET = th.tw.insertBasicData(ctx, basicData)
if errRET != nil {
log.Println("T1: Failed add basicData to BasicCName: ", errRET)
err = th.tw.removeTransactionData(ctx, basicData.ID)
if err != nil {
// basicTxnData from will be remove when service restart
log.Println("T2: Failed remove basicData from TxnCName: ", err)
}
return
}
}
// T2
acccessTxnData := transactionData{
ID: info.GetAccess(),
TxnID: txnID,
Collection: th.tcfg.AccessCName,
Service: th.tcfg.storeConfig.service,
CreatedAt: time.Now(),
}
errRET = th.tw.insertTokenTransactionData(ctx, acccessTxnData)
if errRET != nil {
log.Println("T2: Failed insert accessData to TxnCName: ", errRET)
err = th.tw.removeBasicData(ctx, basicData.ID)
if err != nil {
// basicData will be remove when service restart
log.Println("T2: Failed remove basicData from BasicCName: ", err)
} else {
// basicData has been removed, then removed it in TxnCName
err = th.tw.removeTransactionData(ctx, basicData.ID)
if err != nil {
log.Println("T2: Failed remove basicData from TxnCName: ", err)
}
}
return
} else {
errRET = th.tw.insertTokenData(ctx, accessData, th.tcfg.AccessCName)
if errRET != nil {
log.Println("T2: Failed insert accessData to AccessCName: ", errRET)
err = th.tw.removeBasicData(ctx, basicData.ID)
if err != nil {
// basicData from will be remove when service restart
log.Println("T2: Failed remove basicData from BasicCName: ", err)
} else {
err = th.tw.removeTransactionData(ctx, basicData.ID)
if err != nil {
// basicTxnData from will be remove when service restart
log.Println("T2: Failed remove basicData from TxnCName: ", err)
}
}
err = th.tw.removeTransactionData(ctx, accessData.ID)
if err != nil {
log.Println("T2: Failed remove txnData from TxnCName: ", err)
}
return
}
}
// T3
refresh := info.GetRefresh()
if refresh != "" {
refreshData := tokenData{
ID: refresh,
BasicID: id,
ExpiredAt: rexp,
}
errRET = th.tw.insertTokenData(ctx, refreshData, th.tcfg.RefreshCName)
if errRET != nil {
log.Println("T3: Failed insert refreshData to RefreshCName: ", err)
err = th.tw.removeBasicData(ctx, basicData.ID)
if err != nil {
// basicData will be remove when service restart
log.Println("T3: Failed remove basicData from BasicCName: ", err)
} else {
err = th.tw.removeTransactionData(ctx, basicData.ID)
if err != nil {
// basicData will be remove when service restart
log.Println("T3: Failed remove basicData from TxnCName: ", err)
}
}
err = th.tw.removeTokenData(ctx, accessData.ID, th.tcfg.AccessCName)
if err != nil {
// accessData will be remove when service restart
log.Println("T3: Failed remove accessData from AccessCName: ", err)
} else {
err = th.tw.removeTransactionData(ctx, accessData.ID)
if err != nil {
log.Println("T3: Failed remove txnData from TxnCName: ", err)
}
}
return
}
}
// case all is fine, finally delete all txnDatas
err = th.tw.removeTransactionData(ctx, basicData.ID)
if err != nil {
// basicTxnData will be remove when service restart
log.Println("EndTnx cleanup: Failed remove basicData from TxnCName: ", err)
}
err = th.tw.removeTransactionData(ctx, accessData.ID)
if err != nil {
// accessTxnData will be remove when service restart
log.Println("EndTxn cleanup: Failed remove txnData from TxnCName: ", err)
}
return nil
}
type TransactionWorker interface {
insertBasicData(ctx context.Context, basicData basicData) error
removeBasicData(ctx context.Context, basicDataID string) error
insertBasicTransactionData(ctx context.Context, txnData transactionData) error
insertTokenData(ctx context.Context, tokenData tokenData, collectionName string) error
removeTokenData(ctx context.Context, tokenDataID, collectionName string) error
insertTokenTransactionData(ctx context.Context, txnData transactionData) error
removeTransactionData(ctx context.Context, tokenDataID string) error
cleanupTransactionsData(ctx context.Context, service string) error
}
// transactionWorker execute transaction's actions
type transactionWorker struct {
tc *TokenConfig
client *mongo.Client
}
func NewTransactionWorker(t *TokenConfig, cl *mongo.Client) *transactionWorker {
return &transactionWorker{
tc: t,
client: cl,
}
}
func (tw *transactionWorker) getCollection(collName string) *mongo.Collection {
return tw.client.Database(tw.tc.storeConfig.db).Collection(collName)
}
func (tw *transactionWorker) insertBasicData(ctx context.Context, basicData basicData) error {
_, err := tw.getCollection(tw.tc.BasicCName).InsertOne(ctx, basicData)
if err != nil {
if !mongo.IsDuplicateKeyError(err) {
log.Println("Err insertBasicData into BasicCname: ", err)
} else {
// in case of retry, the tuple may have already been inserted
// we like to carry on
log.Println("Err insertBasicData duplicated _id: ", err)
return nil
}
}
return err
}
func (tw *transactionWorker) removeBasicData(ctx context.Context, basicDataID string) error {
_, err := tw.getCollection(tw.tc.BasicCName).DeleteOne(ctx, bson.D{{Key: "_id", Value: basicDataID}})
if err != nil {
log.Println("Err removeBasicData from BasicCname: ", err)
return err
}
return err
}
func (tw *transactionWorker) insertBasicTransactionData(ctx context.Context, txnData transactionData) error {
_, err := tw.getCollection(tw.tc.TxnCName).InsertOne(ctx, txnData)
if err != nil {
if !mongo.IsDuplicateKeyError(err) {
log.Println("Err insertBasicTransactionData to basicCname: ", err)
} else {
log.Println("Err insertBasicTransactionData duplicated _id: ", err)
return nil
}
}
return err
}
// insertTokenData insert accessData and refreshData
func (tw *transactionWorker) insertTokenData(ctx context.Context, tokenData tokenData, collectionName string) error {
_, err := tw.getCollection(collectionName).InsertOne(ctx, tokenData)
if err != nil {
if !mongo.IsDuplicateKeyError(err) {
log.Printf("Err insertTokenData into %v: %v", collectionName, err)
} else {
log.Println("Err insertTokenData duplicated _id: ", err)
return nil
}
}
return err
}
func (tw *transactionWorker) removeTokenData(ctx context.Context, tokenDataID, collectionName string) error {
_, err := tw.getCollection(collectionName).DeleteOne(ctx, bson.D{{Key: "_id", Value: tokenDataID}})
if err != nil {
log.Printf("Err removeTransactionData from %v: %v", collectionName, err)
return err
}
return err
}
// insertTokenTransactionData insert accessData and refreshData to the TxnCName db
func (tw *transactionWorker) insertTokenTransactionData(ctx context.Context, txnData transactionData) error {
_, err := tw.getCollection(tw.tc.TxnCName).InsertOne(ctx, txnData)
if err != nil {
if !mongo.IsDuplicateKeyError(err) {
log.Println("Err insertTokenTransactionData into TxnCname: ", err)
} else {
log.Println("Err insertTokenTransactionData duplicated _id: ", err)
return nil
}
}
return err
}
// removeTransactionData remove transaction's tuple
func (tw *transactionWorker) removeTransactionData(ctx context.Context, tokenDataID string) error {
_, err := tw.getCollection(tw.tc.TxnCName).DeleteOne(ctx, bson.D{{Key: "_id", Value: tokenDataID}})
if err != nil {
log.Println("Err removeTransactionData from TxnCName: ", err)
return err
}
return err
}
/*
* cleanupTransactionsData is called when the service start
* if some entries remain in the txn db it means some transaction failed without having been cleaned
* in this case clean the entries in the basicToken or/and accessToken then clean the TxnCName
**/
func (tw *transactionWorker) cleanupTransactionsData(ctx context.Context, service string) (err error) {
filter := bson.M{"Service": service}
cursor, err := tw.getCollection(tw.tc.TxnCName).Find(ctx, filter)
if err != nil {
log.Println("Err cleanupTransactionsData findAll TxnCName: ", err)
return
}
// Iterate over the cursor to get all documents
var txnsData []transactionData
if err := cursor.All(ctx, &txnsData); err != nil {
log.Println("Err removeTransactionsData when iterate cursor: ", err)
}
if len(txnsData) > 0 {
for _, txn := range txnsData {
if txn.Collection == tw.tc.BasicCName {
err = tw.removeBasicData(ctx, txn.ID)
if err != nil {
log.Println("Err cleanupTransactionsData removeBasicData id: ", txn.ID)
continue
}
err = tw.removeTransactionData(ctx, txn.ID)
if err != nil {
log.Println("Err cleanupTransactionsData removeTransactionData(basic) id: ", txn.ID)
}
} else if txn.Collection == tw.tc.AccessCName {
err = tw.removeTokenData(ctx, txn.ID, txn.Collection)
if err != nil {
log.Println("Err cleanupTransactionsData removeAccessData id: ", txn.ID)
continue
}
err = tw.removeTransactionData(ctx, txn.ID)
if err != nil {
log.Println("Err cleanupTransactionsData removeTransactionData(access) id: ", txn.ID)
}
} else {
log.Println("Err cleanupTransactionsData unfound collection: ", txn.Collection)
}
}
}
return
}