Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add pending tx subscription API #590

Merged
merged 1 commit into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func New(
Mount(router, "/debug")
node.New(nw).
Mount(router, "/node")
subs := subscriptions.New(repo, origins, backtraceLimit)
subs := subscriptions.New(repo, origins, backtraceLimit, txPool)
subs.Mount(router, "/subscriptions")

if pprofOn {
Expand Down
70 changes: 70 additions & 0 deletions api/subscriptions/pending_tx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright (c) 2023 The VeChainThor developers

// Distributed under the GNU Lesser General Public License v3.0 software license, see the accompanying
// file LICENSE or <https://www.gnu.org/licenses/lgpl-3.0.html>
package subscriptions

import (
"sync"

"github.com/vechain/thor/tx"
"github.com/vechain/thor/txpool"
)

type pendingTx struct {
txPool *txpool.TxPool
listeners map[chan *tx.Transaction]struct{}
mu sync.RWMutex
}

func newPendingTx(txPool *txpool.TxPool) *pendingTx {
p := &pendingTx{
txPool: txPool,
listeners: make(map[chan *tx.Transaction]struct{}),
}

return p
}

func (p *pendingTx) Subscribe(ch chan *tx.Transaction) {
p.mu.Lock()
defer p.mu.Unlock()

p.listeners[ch] = struct{}{}
}

func (p *pendingTx) Unsubscribe(ch chan *tx.Transaction) {
p.mu.Lock()
defer p.mu.Unlock()

delete(p.listeners, ch)
}

func (p *pendingTx) DispatchLoop(done <-chan struct{}) {
txCh := make(chan *txpool.TxEvent)
sub := p.txPool.SubscribeTxEvent(txCh)
defer sub.Unsubscribe()

for {
select {
case txEv := <-txCh:
if txEv.Executable == nil || !*txEv.Executable {
continue
}
p.mu.RLock()
func() {
for lsn := range p.listeners {
select {
case lsn <- txEv.Tx:
case <-done:
return
default: // broadcast in a non-blocking manner, so there's no guarantee that all subscriber receives it
}
}
}()
p.mu.RUnlock()
case <-done:
return
}
}
}
101 changes: 84 additions & 17 deletions api/subscriptions/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@ import (
"github.com/vechain/thor/block"
"github.com/vechain/thor/chain"
"github.com/vechain/thor/thor"
"github.com/vechain/thor/tx"
"github.com/vechain/thor/txpool"
)

const txQueueSize = 20

type Subscriptions struct {
backtraceLimit uint32
repo *chain.Repository
upgrader *websocket.Upgrader
pendingTx *pendingTx
done chan struct{}
wg sync.WaitGroup
}
Expand All @@ -43,8 +48,8 @@ const (
pingPeriod = (pongWait * 7) / 10
)

func New(repo *chain.Repository, allowedOrigins []string, backtraceLimit uint32) *Subscriptions {
return &Subscriptions{
func New(repo *chain.Repository, allowedOrigins []string, backtraceLimit uint32, txpool *txpool.TxPool) *Subscriptions {
sub := &Subscriptions{
backtraceLimit: backtraceLimit,
repo: repo,
upgrader: &websocket.Upgrader{
Expand All @@ -62,8 +67,17 @@ func New(repo *chain.Repository, allowedOrigins []string, backtraceLimit uint32)
return false
},
},
done: make(chan struct{}),
pendingTx: newPendingTx(txpool),
done: make(chan struct{}),
}

sub.wg.Add(1)
go func() {
defer sub.wg.Done()

sub.pendingTx.DispatchLoop(sub.done)
}()
return sub
}

func (s *Subscriptions) handleBlockReader(w http.ResponseWriter, req *http.Request) (*blockReader, error) {
Expand Down Expand Up @@ -188,33 +202,63 @@ func (s *Subscriptions) handleSubject(w http.ResponseWriter, req *http.Request)
return utils.HTTPError(errors.New("not found"), http.StatusNotFound)
}

conn, err := s.upgrader.Upgrade(w, req, nil)
conn, closed, err := s.setupConn(w, req)
// since the conn is hijacked here, no error should be returned in lines below
if err != nil {
log.Debug("upgrade to websocket", "err", err)
return nil
}

err = s.pipe(conn, reader, closed)
s.closeConn(conn, err)
return nil
}

func (s *Subscriptions) handlePendingTransactions(w http.ResponseWriter, req *http.Request) error {
s.wg.Add(1)
defer s.wg.Done()

conn, closed, err := s.setupConn(w, req)
// since the conn is hijacked here, no error should be returned in lines below
if err != nil {
log.Debug("upgrade to websocket", "err", err)
return nil
}
defer s.closeConn(conn, err)

pingTicker := time.NewTicker(pingPeriod)
defer pingTicker.Stop()

txCh := make(chan *tx.Transaction, txQueueSize)
s.pendingTx.Subscribe(txCh)
defer func() {
if err := conn.Close(); err != nil {
log.Debug("close websocket", "err", err)
}
s.pendingTx.Unsubscribe(txCh)
close(txCh)
}()

var closeMsg []byte
if err := s.pipe(conn, reader); err != nil {
closeMsg = websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())
} else {
closeMsg = websocket.FormatCloseMessage(websocket.CloseGoingAway, "")
for {
select {
case tx := <-txCh:
err = conn.WriteJSON(&PendingTxIDMessage{ID: tx.ID()})
if err != nil {
return nil
}
case <-s.done:
return nil
case <-closed:
return nil
case <-pingTicker.C:
conn.WriteMessage(websocket.PingMessage, nil)
}
}
}

if err := conn.WriteMessage(websocket.CloseMessage, closeMsg); err != nil {
log.Debug("write close message", "err", err)
func (s *Subscriptions) setupConn(w http.ResponseWriter, req *http.Request) (*websocket.Conn, chan struct{}, error) {
conn, err := s.upgrader.Upgrade(w, req, nil)
if err != nil {
return nil, nil, err
}
return nil
}

func (s *Subscriptions) pipe(conn *websocket.Conn, reader msgReader) error {
closed := make(chan struct{})
// start read loop to handle close event
s.wg.Add(1)
Expand All @@ -233,6 +277,28 @@ func (s *Subscriptions) pipe(conn *websocket.Conn, reader msgReader) error {
}
}
}()

return conn, closed, nil
}

func (s *Subscriptions) closeConn(conn *websocket.Conn, err error) {
var closeMsg []byte
if err != nil {
closeMsg = websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())
} else {
closeMsg = websocket.FormatCloseMessage(websocket.CloseGoingAway, "")
}

if err := conn.WriteMessage(websocket.CloseMessage, closeMsg); err != nil {
log.Debug("write close message", "err", err)
}

if err := conn.Close(); err != nil {
log.Debug("close websocket", "err", err)
}
}

func (s *Subscriptions) pipe(conn *websocket.Conn, reader msgReader, closed chan struct{}) error {
ticker := s.repo.NewTicker()
pingTicker := time.NewTicker(pingPeriod)
defer pingTicker.Stop()
Expand Down Expand Up @@ -316,4 +382,5 @@ func (s *Subscriptions) Mount(root *mux.Router, pathPrefix string) {
sub := root.PathPrefix(pathPrefix).Subrouter()

sub.Path("/{subject}").Methods("Get").HandlerFunc(utils.WrapHandlerFunc(s.handleSubject))
sub.Path("/txpool/pending").Methods("Get").HandlerFunc(utils.WrapHandlerFunc(s.handlePendingTransactions))
}
4 changes: 4 additions & 0 deletions api/subscriptions/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,7 @@ type Beat2Message struct {
K uint8 `json:"k"`
Obsolete bool `json:"obsolete"`
}

type PendingTxIDMessage struct {
ID thor.Bytes32 `json:"id"`
}
3 changes: 1 addition & 2 deletions api/utils/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ func WrapHandlerFunc(f HandlerFunc) http.HandlerFunc {

// content types
const (
JSONContentType = "application/json; charset=utf-8"
OctetStreamContentType = "application/octet-stream"
JSONContentType = "application/json; charset=utf-8"
)

// ParseJSON parse a JSON object using strict mode.
Expand Down