forked from jjeffery/stomp
-
Notifications
You must be signed in to change notification settings - Fork 96
/
transaction.go
178 lines (149 loc) · 5.12 KB
/
transaction.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
package stomp
import (
"github.com/go-stomp/stomp/v3/frame"
)
// A Transaction applies to the sending of messages to the STOMP server,
// and the acknowledgement of messages received from the STOMP server.
// All messages sent and and acknowledged in the context of a transaction
// are processed atomically by the STOMP server.
//
// Transactions are committed with the Commit method. When a transaction is
// committed, all sent messages, acknowledgements and negative acknowledgements,
// are processed by the STOMP server. Alternatively transactions can be aborted,
// in which case all sent messages, acknowledgements and negative
// acknowledgements are discarded by the STOMP server.
type Transaction struct {
id string
conn *Conn
completed bool
}
// Id returns the unique identifier for the transaction.
func (tx *Transaction) Id() string {
return tx.id
}
// Conn returns the connection associated with this transaction.
func (tx *Transaction) Conn() *Conn {
return tx.conn
}
// Abort will abort the transaction. Any calls to Send, SendWithReceipt,
// Ack and Nack on this transaction will be discarded.
// This function does not wait for the server to process the ABORT frame.
// See AbortWithReceipt if you want to ensure the ABORT is processed.
func (tx *Transaction) Abort() error {
return tx.abort(false)
}
// Abort will abort the transaction. Any calls to Send, SendWithReceipt,
// Ack and Nack on this transaction will be discarded.
func (tx *Transaction) AbortWithReceipt() error {
return tx.abort(true)
}
func (tx *Transaction) abort(receipt bool) error {
if tx.completed {
return ErrCompletedTransaction
}
f := frame.New(frame.ABORT, frame.Transaction, tx.id)
if receipt {
id := allocateId()
f.Header.Set(frame.Receipt, id)
}
err := tx.conn.sendFrame(f)
if err != nil {
return err
}
tx.completed = true
return nil
}
// Commit will commit the transaction. All messages and acknowledgements
// sent to the STOMP server on this transaction will be processed atomically.
// This function does not wait for the server to process the COMMIT frame.
// See CommitWithReceipt if you want to ensure the COMMIT is processed.
func (tx *Transaction) Commit() error {
return tx.commit(false)
}
// Commit will commit the transaction. All messages and acknowledgements
// sent to the STOMP server on this transaction will be processed atomically.
func (tx *Transaction) CommitWithReceipt() error {
return tx.commit(true)
}
func (tx *Transaction) commit(receipt bool) error {
if tx.completed {
return ErrCompletedTransaction
}
f := frame.New(frame.COMMIT, frame.Transaction, tx.id)
if receipt {
id := allocateId()
f.Header.Set(frame.Receipt, id)
}
err := tx.conn.sendFrame(f)
if err != nil {
return err
}
tx.completed = true
return nil
}
// Send sends a message to the STOMP server as part of a transaction. The server will not process the
// message until the transaction is committed.
// This method returns without confirming that the STOMP server has received the message. If the STOMP server
// does fail to receive the message for any reason, the connection will close.
//
// The content type should be specified, according to the STOMP specification, but if contentType is an empty
// string, the message will be delivered without a content type header entry. The body array contains the
// message body, and its content should be consistent with the specified content type.
//
// TODO: document opts
func (tx *Transaction) Send(destination, contentType string, body []byte, opts ...func(*frame.Frame) error) error {
if tx.completed {
return ErrCompletedTransaction
}
f, err := createSendFrame(destination, contentType, body, opts)
if err != nil {
return err
}
f.Header.Set(frame.Transaction, tx.id)
return tx.conn.sendFrame(f)
}
// Ack sends an acknowledgement for the message to the server. The STOMP
// server will not process the acknowledgement until the transaction
// has been committed. If the subscription has an AckMode of AckAuto, calling
// this function has no effect.
func (tx *Transaction) Ack(msg *Message) error {
if tx.completed {
return ErrCompletedTransaction
}
f, err := tx.conn.createAckNackFrame(msg, true)
if err != nil {
return err
}
if f != nil {
f.Header.Set(frame.Transaction, tx.id)
err := tx.conn.sendFrame(f)
if err != nil {
return err
}
}
return nil
}
// Nack sends a negative acknowledgement for the message to the server,
// indicating that this client cannot or will not process the message and
// that it should be processed elsewhere. The STOMP server will not process
// the negative acknowledgement until the transaction has been committed.
// It is an error to call this method if the subscription has an AckMode
// of AckAuto, because the STOMP server will not be expecting any kind
// of acknowledgement (positive or negative) for this message.
func (tx *Transaction) Nack(msg *Message) error {
if tx.completed {
return ErrCompletedTransaction
}
f, err := tx.conn.createAckNackFrame(msg, false)
if err != nil {
return err
}
if f != nil {
f.Header.Set(frame.Transaction, tx.id)
err := tx.conn.sendFrame(f)
if err != nil {
return err
}
}
return nil
}