Skip to content

Commit

Permalink
Fetch incoming payments in parallel (#1880)
Browse files Browse the repository at this point in the history
This is a simpler approach to completely parallelizing the handling of
payments, where we simply parallelize the fetch from the database.

This brings a ~30% performance improvement in performance in `PerformanceIntegrationSpec`.
  • Loading branch information
pm47 authored Jul 16, 2021
1 parent 5182402 commit b4183ed
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import fr.acinq.eclair.payment.{IncomingPacket, PaymentReceived, PaymentRequest}
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{Features, Logs, MilliSatoshi, NodeParams, randomBytes32}

import java.util.UUID
import scala.util.{Failure, Success, Try}

/**
Expand All @@ -57,12 +56,16 @@ class MultiPartHandler(nodeParams: NodeParams, register: ActorRef, db: IncomingP

override def handle(implicit ctx: ActorContext, log: DiagnosticLoggingAdapter): Receive = {
case receivePayment: ReceivePayment =>
val child = ctx.spawn(CreateInvoiceActor(nodeParams), name = UUID.randomUUID().toString)
val child = ctx.spawnAnonymous(CreateInvoiceActor(nodeParams))
child ! CreateInvoiceActor.CreatePaymentRequest(ctx.sender(), receivePayment)

case p: IncomingPacket.FinalPacket if doHandle(p.add.paymentHash) =>
val child = ctx.spawnAnonymous(GetIncomingPaymentActor(nodeParams))
child ! GetIncomingPaymentActor.GetIncomingPayment(ctx.self, p)

case ProcessPacket(p, payment_opt) if doHandle(p.add.paymentHash) =>
Logs.withMdc(log)(Logs.mdc(paymentHash_opt = Some(p.add.paymentHash))) {
db.getIncomingPayment(p.add.paymentHash) match {
payment_opt match {
case Some(record) => validatePayment(nodeParams, p, record) match {
case Some(cmdFail) =>
Metrics.PaymentFailed.withTag(Tags.Direction, Tags.Directions.Received).withTag(Tags.Failure, Tags.FailureType(cmdFail)).increment()
Expand Down Expand Up @@ -166,6 +169,7 @@ class MultiPartHandler(nodeParams: NodeParams, register: ActorRef, db: IncomingP
object MultiPartHandler {

// @formatter:off
case class ProcessPacket(packet: IncomingPacket.FinalPacket, payment_opt: Option[IncomingPayment])
case class DoFulfill(preimage: ByteVector32, success: MultiPartPaymentFSM.MultiPartPaymentSucceeded)

case object GetPendingPayments
Expand Down Expand Up @@ -228,6 +232,22 @@ object MultiPartHandler {
}
}

object GetIncomingPaymentActor {

// @formatter:off
sealed trait Command
case class GetIncomingPayment(replyTo: ActorRef, packet: IncomingPacket.FinalPacket) extends Command
// @formatter:on

def apply(nodeParams: NodeParams): Behavior[Command] = {
Behaviors.receiveMessage {
case GetIncomingPayment(replyTo, packet) =>
replyTo ! ProcessPacket(packet, nodeParams.db.payments.getIncomingPayment(packet.add.paymentHash))
Behaviors.stopped
}
}
}

private def validatePaymentStatus(payment: IncomingPacket.FinalPacket, record: IncomingPayment)(implicit log: LoggingAdapter): Boolean = {
if (record.status.isInstanceOf[IncomingPaymentStatus.Received]) {
log.warning("ignoring incoming payment for which has already been paid")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,9 +436,11 @@ class MultiPartHandlerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
val nodeParams = Alice.nodeParams.copy(multiPartPaymentExpiry = 250 millis, features = featuresWithMpp)
val handler = TestActorRef[PaymentHandler](PaymentHandler.props(nodeParams, f.register.ref))

f.sender.send(handler, ReceivePayment(Some(1000 msat), "1 coffee, no sugar"))
val preimage = randomBytes32()
f.sender.send(handler, ReceivePayment(Some(1000 msat), "1 coffee, no sugar", paymentPreimage_opt = Some(preimage)))
val pr = f.sender.expectMsgType[PaymentRequest]
assert(pr.features.allowMultiPart)
assert(pr.paymentHash == Crypto.sha256(preimage))

val add1 = UpdateAddHtlc(ByteVector32.One, 0, 800 msat, pr.paymentHash, f.defaultExpiry, TestConstants.emptyOnionPacket)
f.sender.send(handler, IncomingPacket.FinalPacket(add1, Onion.createMultiPartPayload(add1.amountMsat, 1000 msat, add1.cltvExpiry, pr.paymentSecret.get)))
Expand All @@ -453,14 +455,15 @@ class MultiPartHandlerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
val add3 = UpdateAddHtlc(ByteVector32.Zeroes, 5, 700 msat, pr.paymentHash, f.defaultExpiry, TestConstants.emptyOnionPacket)
f.sender.send(handler, IncomingPacket.FinalPacket(add3, Onion.createMultiPartPayload(add3.amountMsat, 1000 msat, add3.cltvExpiry, pr.paymentSecret.get)))

val cmd1 = f.register.expectMsgType[Register.Forward[CMD_FULFILL_HTLC]]
assert(cmd1.channelId === add2.channelId)
assert(cmd1.message.id === 2)
assert(Crypto.sha256(cmd1.message.r) === pr.paymentHash)
f.register.expectMsg(Register.Forward(ActorRef.noSender, add3.channelId, CMD_FULFILL_HTLC(5, cmd1.message.r, commit = true)))
// the fulfill are not necessarily in the same order as the commands
f.register.expectMsgAllOf(
Register.Forward(ActorRef.noSender, add2.channelId, CMD_FULFILL_HTLC(2, preimage, commit = true)),
Register.Forward(ActorRef.noSender, add3.channelId, CMD_FULFILL_HTLC(5, preimage, commit = true))
)

val paymentReceived = f.eventListener.expectMsgType[PaymentReceived]
assert(paymentReceived.copy(parts = paymentReceived.parts.map(_.copy(timestamp = 0))) === PaymentReceived(pr.paymentHash, PartialPayment(300 msat, ByteVector32.One, 0) :: PartialPayment(700 msat, ByteVector32.Zeroes, 0) :: Nil))
assert(paymentReceived.paymentHash === pr.paymentHash)
assert(paymentReceived.parts.map(_.copy(timestamp = 0)).toSet === Set(PartialPayment(300 msat, ByteVector32.One, 0), PartialPayment(700 msat, ByteVector32.Zeroes, 0)))
val received = nodeParams.db.payments.getIncomingPayment(pr.paymentHash)
assert(received.isDefined && received.get.status.isInstanceOf[IncomingPaymentStatus.Received])
assert(received.get.status.asInstanceOf[IncomingPaymentStatus.Received].amount === 1000.msat)
Expand Down

0 comments on commit b4183ed

Please sign in to comment.