Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

invoices: create package #2355

Merged
merged 1 commit into from
Jan 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 49 additions & 42 deletions invoiceregistry.go → invoices/invoiceregistry.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package main
package invoices

import (
"bytes"
"crypto/sha256"
"fmt"
"github.com/btcsuite/btcd/chaincfg"
"sync"
"sync/atomic"
"time"
Expand All @@ -18,29 +19,30 @@ import (
)

var (
// debugPre is the default debug preimage which is inserted into the
// DebugPre is the default debug preimage which is inserted into the
// invoice registry if the --debughtlc flag is activated on start up.
// All nodes initialized with the flag active will immediately settle
// any incoming HTLC whose rHash corresponds with the debug
// preimage.
debugPre, _ = chainhash.NewHash(bytes.Repeat([]byte{1}, 32))
DebugPre, _ = chainhash.NewHash(bytes.Repeat([]byte{1}, 32))

debugHash = chainhash.Hash(sha256.Sum256(debugPre[:]))
// DebugHash is the hash of the default preimage.
DebugHash = chainhash.Hash(sha256.Sum256(DebugPre[:]))
)

// invoiceRegistry is a central registry of all the outstanding invoices
// InvoiceRegistry is a central registry of all the outstanding invoices
// created by the daemon. The registry is a thin wrapper around a map in order
// to ensure that all updates/reads are thread safe.
type invoiceRegistry struct {
type InvoiceRegistry struct {
sync.RWMutex

cdb *channeldb.DB

clientMtx sync.Mutex
nextClientID uint32
notificationClients map[uint32]*invoiceSubscription
notificationClients map[uint32]*InvoiceSubscription

newSubscriptions chan *invoiceSubscription
newSubscriptions chan *InvoiceSubscription
subscriptionCancels chan uint32
invoiceEvents chan *invoiceEvent

Expand All @@ -49,28 +51,33 @@ type invoiceRegistry struct {
// that *all* nodes are able to fully settle.
debugInvoices map[chainhash.Hash]*channeldb.Invoice

activeNetParams *chaincfg.Params

wg sync.WaitGroup
quit chan struct{}
}

// newInvoiceRegistry creates a new invoice registry. The invoice registry
// NewRegistry creates a new invoice registry. The invoice registry
// wraps the persistent on-disk invoice storage with an additional in-memory
// layer. The in-memory layer is in place such that debug invoices can be added
// which are volatile yet available system wide within the daemon.
func newInvoiceRegistry(cdb *channeldb.DB) *invoiceRegistry {
return &invoiceRegistry{
func NewRegistry(cdb *channeldb.DB,
activeNetParams *chaincfg.Params) *InvoiceRegistry {

return &InvoiceRegistry{
cdb: cdb,
debugInvoices: make(map[chainhash.Hash]*channeldb.Invoice),
notificationClients: make(map[uint32]*invoiceSubscription),
newSubscriptions: make(chan *invoiceSubscription),
notificationClients: make(map[uint32]*InvoiceSubscription),
newSubscriptions: make(chan *InvoiceSubscription),
subscriptionCancels: make(chan uint32),
invoiceEvents: make(chan *invoiceEvent, 100),
activeNetParams: activeNetParams,
quit: make(chan struct{}),
}
}

// Start starts the registry and all goroutines it needs to carry out its task.
func (i *invoiceRegistry) Start() error {
func (i *InvoiceRegistry) Start() error {
i.wg.Add(1)

go i.invoiceEventNotifier()
Expand All @@ -79,7 +86,7 @@ func (i *invoiceRegistry) Start() error {
}

// Stop signals the registry for a graceful shutdown.
func (i *invoiceRegistry) Stop() {
func (i *InvoiceRegistry) Stop() {
close(i.quit)

i.wg.Wait()
Expand All @@ -96,7 +103,7 @@ type invoiceEvent struct {
// invoiceEventNotifier is the dedicated goroutine responsible for accepting
// new notification subscriptions, cancelling old subscriptions, and
// dispatching new invoice events.
func (i *invoiceRegistry) invoiceEventNotifier() {
func (i *InvoiceRegistry) invoiceEventNotifier() {
defer i.wg.Done()

for {
Expand All @@ -110,11 +117,11 @@ func (i *invoiceRegistry) invoiceEventNotifier() {
// invoice events.
err := i.deliverBacklogEvents(newClient)
if err != nil {
ltndLog.Errorf("unable to deliver backlog invoice "+
log.Errorf("unable to deliver backlog invoice "+
"notifications: %v", err)
}

ltndLog.Infof("New invoice subscription "+
log.Infof("New invoice subscription "+
"client: id=%v", newClient.id)

// With the backlog notifications delivered (if any),
Expand All @@ -125,7 +132,7 @@ func (i *invoiceRegistry) invoiceEventNotifier() {
// A client no longer wishes to receive invoice notifications.
// So we'll remove them from the set of active clients.
case clientID := <-i.subscriptionCancels:
ltndLog.Infof("Cancelling invoice subscription for "+
log.Infof("Cancelling invoice subscription for "+
"client=%v", clientID)

delete(i.notificationClients, clientID)
Expand Down Expand Up @@ -157,14 +164,14 @@ func (i *invoiceRegistry) invoiceEventNotifier() {
// instance.
case event.state == channeldb.ContractOpen &&
client.addIndex+1 != invoice.AddIndex:
ltndLog.Warnf("client=%v for invoice "+
log.Warnf("client=%v for invoice "+
"notifications missed an update, "+
"add_index=%v, new add event index=%v",
clientID, client.addIndex,
invoice.AddIndex)
case event.state == channeldb.ContractSettled &&
client.settleIndex+1 != invoice.SettleIndex:
ltndLog.Warnf("client=%v for invoice "+
log.Warnf("client=%v for invoice "+
"notifications missed an update, "+
"settle_index=%v, new settle event index=%v",
clientID, client.settleIndex,
Expand Down Expand Up @@ -192,7 +199,7 @@ func (i *invoiceRegistry) invoiceEventNotifier() {
case channeldb.ContractOpen:
client.addIndex = invoice.AddIndex
default:
ltndLog.Errorf("unknown invoice "+
log.Errorf("unknown invoice "+
"state: %v", event.state)
}
}
Expand All @@ -205,7 +212,7 @@ func (i *invoiceRegistry) invoiceEventNotifier() {

// deliverBacklogEvents will attempts to query the invoice database for any
// notifications that the client has missed since it reconnected last.
func (i *invoiceRegistry) deliverBacklogEvents(client *invoiceSubscription) error {
func (i *InvoiceRegistry) deliverBacklogEvents(client *InvoiceSubscription) error {
// First, we'll query the database to see if based on the provided
// addIndex and settledIndex we need to deliver any backlog
// notifications.
Expand Down Expand Up @@ -257,7 +264,7 @@ func (i *invoiceRegistry) deliverBacklogEvents(client *invoiceSubscription) erro
// by the passed preimage. Once this invoice is added, subsystems within the
// daemon add/forward HTLCs that are able to obtain the proper preimage
// required for redemption in the case that we're the final destination.
func (i *invoiceRegistry) AddDebugInvoice(amt btcutil.Amount, preimage chainhash.Hash) {
func (i *InvoiceRegistry) AddDebugInvoice(amt btcutil.Amount, preimage chainhash.Hash) {
paymentHash := chainhash.Hash(sha256.Sum256(preimage[:]))

invoice := &channeldb.Invoice{
Expand All @@ -272,7 +279,7 @@ func (i *invoiceRegistry) AddDebugInvoice(amt btcutil.Amount, preimage chainhash
i.debugInvoices[paymentHash] = invoice
i.Unlock()

ltndLog.Debugf("Adding debug invoice %v", newLogClosure(func() string {
log.Debugf("Adding debug invoice %v", newLogClosure(func() string {
return spew.Sdump(invoice)
}))
}
Expand All @@ -284,11 +291,11 @@ func (i *invoiceRegistry) AddDebugInvoice(amt btcutil.Amount, preimage chainhash
// redemption in the case that we're the final destination. We also return the
// addIndex of the newly created invoice which monotonically increases for each
// new invoice added.
func (i *invoiceRegistry) AddInvoice(invoice *channeldb.Invoice) (uint64, error) {
func (i *InvoiceRegistry) AddInvoice(invoice *channeldb.Invoice) (uint64, error) {
i.Lock()
defer i.Unlock()

ltndLog.Debugf("Adding invoice %v", newLogClosure(func() string {
log.Debugf("Adding invoice %v", newLogClosure(func() string {
return spew.Sdump(invoice)
}))

Expand All @@ -311,7 +318,7 @@ func (i *invoiceRegistry) AddInvoice(invoice *channeldb.Invoice) (uint64, error)
// according to the cltv delta.
//
// TODO(roasbeef): ignore if settled?
func (i *invoiceRegistry) LookupInvoice(rHash chainhash.Hash) (channeldb.Invoice, uint32, error) {
func (i *InvoiceRegistry) LookupInvoice(rHash chainhash.Hash) (channeldb.Invoice, uint32, error) {
// First check the in-memory debug invoice index to see if this is an
// existing invoice added for debugging.
i.RLock()
Expand All @@ -331,7 +338,7 @@ func (i *invoiceRegistry) LookupInvoice(rHash chainhash.Hash) (channeldb.Invoice
}

payReq, err := zpay32.Decode(
string(invoice.PaymentRequest), activeNetParams.Params,
string(invoice.PaymentRequest), i.activeNetParams,
)
if err != nil {
return channeldb.Invoice{}, 0, err
Expand All @@ -343,13 +350,13 @@ func (i *invoiceRegistry) LookupInvoice(rHash chainhash.Hash) (channeldb.Invoice
// SettleInvoice attempts to mark an invoice as settled. If the invoice is a
// debug invoice, then this method is a noop as debug invoices are never fully
// settled.
func (i *invoiceRegistry) SettleInvoice(rHash chainhash.Hash,
func (i *InvoiceRegistry) SettleInvoice(rHash chainhash.Hash,
amtPaid lnwire.MilliSatoshi) error {

i.Lock()
defer i.Unlock()

ltndLog.Debugf("Settling invoice %x", rHash[:])
log.Debugf("Settling invoice %x", rHash[:])

// First check the in-memory debug invoice index to see if this is an
// existing invoice added for debugging.
Expand All @@ -366,7 +373,7 @@ func (i *invoiceRegistry) SettleInvoice(rHash chainhash.Hash,
return err
}

ltndLog.Infof("Payment received: %v", spew.Sdump(invoice))
log.Infof("Payment received: %v", spew.Sdump(invoice))

i.notifyClients(invoice, channeldb.ContractSettled)

Expand All @@ -375,7 +382,7 @@ func (i *invoiceRegistry) SettleInvoice(rHash chainhash.Hash,

// notifyClients notifies all currently registered invoice notification clients
// of a newly added/settled invoice.
func (i *invoiceRegistry) notifyClients(invoice *channeldb.Invoice,
func (i *InvoiceRegistry) notifyClients(invoice *channeldb.Invoice,
state channeldb.ContractState) {

event := &invoiceEvent{
Expand All @@ -389,12 +396,12 @@ func (i *invoiceRegistry) notifyClients(invoice *channeldb.Invoice,
}
}

// invoiceSubscription represents an intent to receive updates for newly added
// InvoiceSubscription represents an intent to receive updates for newly added
// or settled invoices. For each newly added invoice, a copy of the invoice
// will be sent over the NewInvoices channel. Similarly, for each newly settled
// invoice, a copy of the invoice will be sent over the SettledInvoices
// channel.
type invoiceSubscription struct {
type InvoiceSubscription struct {
cancelled uint32 // To be used atomically.

// NewInvoices is a channel that we'll use to send all newly created
Expand Down Expand Up @@ -424,16 +431,16 @@ type invoiceSubscription struct {

id uint32

inv *invoiceRegistry
inv *InvoiceRegistry

cancelChan chan struct{}

wg sync.WaitGroup
}

// Cancel unregisters the invoiceSubscription, freeing any previously allocated
// Cancel unregisters the InvoiceSubscription, freeing any previously allocated
// resources.
func (i *invoiceSubscription) Cancel() {
func (i *InvoiceSubscription) Cancel() {
if !atomic.CompareAndSwapUint32(&i.cancelled, 0, 1) {
return
}
Expand All @@ -449,13 +456,13 @@ func (i *invoiceSubscription) Cancel() {
i.wg.Wait()
}

// SubscribeNotifications returns an invoiceSubscription which allows the
// SubscribeNotifications returns an InvoiceSubscription which allows the
// caller to receive async notifications when any invoices are settled or
// added. The invoiceIndex parameter is a streaming "checkpoint". We'll start
// by first sending out all new events with an invoice index _greater_ than
// this value. Afterwards, we'll send out real-time notifications.
func (i *invoiceRegistry) SubscribeNotifications(addIndex, settleIndex uint64) *invoiceSubscription {
client := &invoiceSubscription{
func (i *InvoiceRegistry) SubscribeNotifications(addIndex, settleIndex uint64) *InvoiceSubscription {
client := &InvoiceSubscription{
NewInvoices: make(chan *channeldb.Invoice),
SettledInvoices: make(chan *channeldb.Invoice),
addIndex: addIndex,
Expand Down Expand Up @@ -495,7 +502,7 @@ func (i *invoiceRegistry) SubscribeNotifications(addIndex, settleIndex uint64) *
case channeldb.ContractSettled:
targetChan = client.SettledInvoices
default:
ltndLog.Errorf("unknown invoice "+
log.Errorf("unknown invoice "+
"state: %v", invoiceEvent.state)

continue
Expand Down
45 changes: 45 additions & 0 deletions invoices/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package invoices

import (
"github.com/btcsuite/btclog"
"github.com/lightningnetwork/lnd/build"
)

// log is a logger that is initialized with no output filters. This
// means the package will not perform any logging by default until the caller
// requests it.
var log btclog.Logger

// The default amount of logging is none.
func init() {
UseLogger(build.NewSubLogger("INVC", nil))
}

// DisableLog disables all library log output. Logging output is disabled
// by default until UseLogger is called.
func DisableLog() {
UseLogger(btclog.Disabled)
}

// UseLogger uses a specified Logger to output package logging info.
// This should be used in preference to SetLogWriter if the caller is also
// using btclog.
func UseLogger(logger btclog.Logger) {
log = logger
}

// logClosure is used to provide a closure over expensive logging operations so
// don't have to be performed when the logging level doesn't warrant it.
type logClosure func() string

// String invokes the underlying function and returns the result.
func (c logClosure) String() string {
return c()
}

// newLogClosure returns a new closure over a function that returns a string
// which itself provides a Stringer interface so that it can be used with the
// logging system.
func newLogClosure(c func() string) logClosure {
return logClosure(c)
}
4 changes: 4 additions & 0 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"fmt"
"github.com/lightningnetwork/lnd/invoices"
"io"
"os"
"path/filepath"
Expand Down Expand Up @@ -71,6 +72,7 @@ var (
sgnrLog = build.NewSubLogger("SGNR", backendLog.Logger)
wlktLog = build.NewSubLogger("WLKT", backendLog.Logger)
arpcLog = build.NewSubLogger("ARPC", backendLog.Logger)
invcLog = build.NewSubLogger("INVC", backendLog.Logger)
)

// Initialize package-global logger variables.
Expand All @@ -91,6 +93,7 @@ func init() {
signrpc.UseLogger(sgnrLog)
walletrpc.UseLogger(wlktLog)
autopilotrpc.UseLogger(arpcLog)
invoices.UseLogger(invcLog)
}

// subsystemLoggers maps each subsystem identifier to its associated logger.
Expand All @@ -117,6 +120,7 @@ var subsystemLoggers = map[string]btclog.Logger{
"SGNR": sgnrLog,
"WLKT": wlktLog,
"ARPC": arpcLog,
"INVC": invcLog,
}

// initLogRotator initializes the logging rotator to write logs to logFile and
Expand Down
Loading