diff --git a/api/api.go b/api/api.go index dacabce79..4a2d2923d 100644 --- a/api/api.go +++ b/api/api.go @@ -83,7 +83,7 @@ func New( Mount(router, "/debug") node.New(nw). Mount(router, "/node") - subs := subscriptions.New(repo, origins, backtraceLimit) + subs := subscriptions.New(repo, origins, backtraceLimit, txPool) subs.Mount(router, "/subscriptions") if pprofOn { diff --git a/api/subscriptions/pending_tx.go b/api/subscriptions/pending_tx.go new file mode 100644 index 000000000..f1967ace4 --- /dev/null +++ b/api/subscriptions/pending_tx.go @@ -0,0 +1,70 @@ +// Copyright (c) 2023 The VeChainThor developers + +// Distributed under the GNU Lesser General Public License v3.0 software license, see the accompanying +// file LICENSE or +package subscriptions + +import ( + "sync" + + "github.com/vechain/thor/tx" + "github.com/vechain/thor/txpool" +) + +type pendingTx struct { + txPool *txpool.TxPool + listeners map[chan *tx.Transaction]struct{} + mu sync.RWMutex +} + +func newPendingTx(txPool *txpool.TxPool) *pendingTx { + p := &pendingTx{ + txPool: txPool, + listeners: make(map[chan *tx.Transaction]struct{}), + } + + return p +} + +func (p *pendingTx) Subscribe(ch chan *tx.Transaction) { + p.mu.Lock() + defer p.mu.Unlock() + + p.listeners[ch] = struct{}{} +} + +func (p *pendingTx) Unsubscribe(ch chan *tx.Transaction) { + p.mu.Lock() + defer p.mu.Unlock() + + delete(p.listeners, ch) +} + +func (p *pendingTx) DispatchLoop(done <-chan struct{}) { + txCh := make(chan *txpool.TxEvent) + sub := p.txPool.SubscribeTxEvent(txCh) + defer sub.Unsubscribe() + + for { + select { + case txEv := <-txCh: + if txEv.Executable == nil || !*txEv.Executable { + continue + } + p.mu.RLock() + func() { + for lsn := range p.listeners { + select { + case lsn <- txEv.Tx: + case <-done: + return + default: // broadcast in a non-blocking manner, so there's no guarantee that all subscriber receives it + } + } + }() + p.mu.RUnlock() + case <-done: + return + } + } +} diff --git a/api/subscriptions/subscriptions.go b/api/subscriptions/subscriptions.go index 603fcbdb5..b012a5cdb 100644 --- a/api/subscriptions/subscriptions.go +++ b/api/subscriptions/subscriptions.go @@ -18,12 +18,17 @@ import ( "github.com/vechain/thor/block" "github.com/vechain/thor/chain" "github.com/vechain/thor/thor" + "github.com/vechain/thor/tx" + "github.com/vechain/thor/txpool" ) +const txQueueSize = 20 + type Subscriptions struct { backtraceLimit uint32 repo *chain.Repository upgrader *websocket.Upgrader + pendingTx *pendingTx done chan struct{} wg sync.WaitGroup } @@ -43,8 +48,8 @@ const ( pingPeriod = (pongWait * 7) / 10 ) -func New(repo *chain.Repository, allowedOrigins []string, backtraceLimit uint32) *Subscriptions { - return &Subscriptions{ +func New(repo *chain.Repository, allowedOrigins []string, backtraceLimit uint32, txpool *txpool.TxPool) *Subscriptions { + sub := &Subscriptions{ backtraceLimit: backtraceLimit, repo: repo, upgrader: &websocket.Upgrader{ @@ -62,8 +67,17 @@ func New(repo *chain.Repository, allowedOrigins []string, backtraceLimit uint32) return false }, }, - done: make(chan struct{}), + pendingTx: newPendingTx(txpool), + done: make(chan struct{}), } + + sub.wg.Add(1) + go func() { + defer sub.wg.Done() + + sub.pendingTx.DispatchLoop(sub.done) + }() + return sub } func (s *Subscriptions) handleBlockReader(w http.ResponseWriter, req *http.Request) (*blockReader, error) { @@ -188,33 +202,63 @@ func (s *Subscriptions) handleSubject(w http.ResponseWriter, req *http.Request) return utils.HTTPError(errors.New("not found"), http.StatusNotFound) } - conn, err := s.upgrader.Upgrade(w, req, nil) + conn, closed, err := s.setupConn(w, req) + // since the conn is hijacked here, no error should be returned in lines below + if err != nil { + log.Debug("upgrade to websocket", "err", err) + return nil + } + + err = s.pipe(conn, reader, closed) + s.closeConn(conn, err) + return nil +} + +func (s *Subscriptions) handlePendingTransactions(w http.ResponseWriter, req *http.Request) error { + s.wg.Add(1) + defer s.wg.Done() + + conn, closed, err := s.setupConn(w, req) // since the conn is hijacked here, no error should be returned in lines below if err != nil { log.Debug("upgrade to websocket", "err", err) return nil } + defer s.closeConn(conn, err) + + pingTicker := time.NewTicker(pingPeriod) + defer pingTicker.Stop() + txCh := make(chan *tx.Transaction, txQueueSize) + s.pendingTx.Subscribe(txCh) defer func() { - if err := conn.Close(); err != nil { - log.Debug("close websocket", "err", err) - } + s.pendingTx.Unsubscribe(txCh) + close(txCh) }() - var closeMsg []byte - if err := s.pipe(conn, reader); err != nil { - closeMsg = websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error()) - } else { - closeMsg = websocket.FormatCloseMessage(websocket.CloseGoingAway, "") + for { + select { + case tx := <-txCh: + err = conn.WriteJSON(&PendingTxIDMessage{ID: tx.ID()}) + if err != nil { + return nil + } + case <-s.done: + return nil + case <-closed: + return nil + case <-pingTicker.C: + conn.WriteMessage(websocket.PingMessage, nil) + } } +} - if err := conn.WriteMessage(websocket.CloseMessage, closeMsg); err != nil { - log.Debug("write close message", "err", err) +func (s *Subscriptions) setupConn(w http.ResponseWriter, req *http.Request) (*websocket.Conn, chan struct{}, error) { + conn, err := s.upgrader.Upgrade(w, req, nil) + if err != nil { + return nil, nil, err } - return nil -} -func (s *Subscriptions) pipe(conn *websocket.Conn, reader msgReader) error { closed := make(chan struct{}) // start read loop to handle close event s.wg.Add(1) @@ -233,6 +277,28 @@ func (s *Subscriptions) pipe(conn *websocket.Conn, reader msgReader) error { } } }() + + return conn, closed, nil +} + +func (s *Subscriptions) closeConn(conn *websocket.Conn, err error) { + var closeMsg []byte + if err != nil { + closeMsg = websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error()) + } else { + closeMsg = websocket.FormatCloseMessage(websocket.CloseGoingAway, "") + } + + if err := conn.WriteMessage(websocket.CloseMessage, closeMsg); err != nil { + log.Debug("write close message", "err", err) + } + + if err := conn.Close(); err != nil { + log.Debug("close websocket", "err", err) + } +} + +func (s *Subscriptions) pipe(conn *websocket.Conn, reader msgReader, closed chan struct{}) error { ticker := s.repo.NewTicker() pingTicker := time.NewTicker(pingPeriod) defer pingTicker.Stop() @@ -316,4 +382,5 @@ func (s *Subscriptions) Mount(root *mux.Router, pathPrefix string) { sub := root.PathPrefix(pathPrefix).Subrouter() sub.Path("/{subject}").Methods("Get").HandlerFunc(utils.WrapHandlerFunc(s.handleSubject)) + sub.Path("/txpool/pending").Methods("Get").HandlerFunc(utils.WrapHandlerFunc(s.handlePendingTransactions)) } diff --git a/api/subscriptions/types.go b/api/subscriptions/types.go index 69daf5ad3..762a0bb45 100644 --- a/api/subscriptions/types.go +++ b/api/subscriptions/types.go @@ -219,3 +219,7 @@ type Beat2Message struct { K uint8 `json:"k"` Obsolete bool `json:"obsolete"` } + +type PendingTxIDMessage struct { + ID thor.Bytes32 `json:"id"` +} diff --git a/api/utils/http.go b/api/utils/http.go index 28eed5ba2..652c3e408 100644 --- a/api/utils/http.go +++ b/api/utils/http.go @@ -69,8 +69,7 @@ func WrapHandlerFunc(f HandlerFunc) http.HandlerFunc { // content types const ( - JSONContentType = "application/json; charset=utf-8" - OctetStreamContentType = "application/octet-stream" + JSONContentType = "application/json; charset=utf-8" ) // ParseJSON parse a JSON object using strict mode.