Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
thomash-acinq committed Feb 10, 2025
1 parent a8787ab commit 3025a99
Show file tree
Hide file tree
Showing 40 changed files with 621 additions and 406 deletions.
2 changes: 1 addition & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ class Setup(val datadir: File,
}
dbEventHandler = system.actorOf(SimpleSupervisor.props(DbEventHandler.props(nodeParams), "db-event-handler", SupervisorStrategy.Resume))
register = system.actorOf(SimpleSupervisor.props(Register.props(), "register", SupervisorStrategy.Resume))
offerManager = system.spawn(Behaviors.supervise(OfferManager(nodeParams, router, paymentTimeout = 1 minute)).onFailure(typed.SupervisorStrategy.resume), name = "offer-manager")
offerManager = system.spawn(Behaviors.supervise(OfferManager(nodeParams, paymentTimeout = 1 minute)).onFailure(typed.SupervisorStrategy.resume), name = "offer-manager")
paymentHandler = system.actorOf(SimpleSupervisor.props(PaymentHandler.props(nodeParams, register, offerManager), "payment-handler", SupervisorStrategy.Resume))
triggerer = system.spawn(Behaviors.supervise(AsyncPaymentTriggerer()).onFailure(typed.SupervisorStrategy.resume), name = "async-payment-triggerer")
peerReadyManager = system.spawn(Behaviors.supervise(PeerReadyManager()).onFailure(typed.SupervisorStrategy.restart), name = "peer-ready-manager")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorL
PaymentMetrics.PaymentFailed.withTag(PaymentTags.Direction, PaymentTags.Directions.Sent).increment()

case e: PaymentReceived =>
PaymentMetrics.PaymentAmount.withTag(PaymentTags.Direction, PaymentTags.Directions.Received).record(e.amount.truncateToSatoshi.toLong)
PaymentMetrics.PaymentAmount.withTag(PaymentTags.Direction, PaymentTags.Directions.Received).record(e.realAmount.truncateToSatoshi.toLong)
PaymentMetrics.PaymentParts.withTag(PaymentTags.Direction, PaymentTags.Directions.Received).record(e.parts.length)
auditDb.add(e)
e.parts.foreach(p => channelsDb.updateChannelMeta(p.fromChannelId, ChannelEvent.EventType.PaymentReceived))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,14 +319,14 @@ case class DualPaymentsDb(primary: PaymentsDb, secondary: PaymentsDb) extends Pa
primary.addIncomingPayment(pr, preimage, paymentType)
}

override def receiveIncomingPayment(paymentHash: ByteVector32, amount: MilliSatoshi, receivedAt: TimestampMilli): Boolean = {
runAsync(secondary.receiveIncomingPayment(paymentHash, amount, receivedAt))
primary.receiveIncomingPayment(paymentHash, amount, receivedAt)
override def receiveIncomingPayment(paymentHash: ByteVector32, virtualAmount: MilliSatoshi, realAmount: MilliSatoshi, receivedAt: TimestampMilli): Boolean = {
runAsync(secondary.receiveIncomingPayment(paymentHash, virtualAmount, realAmount, receivedAt))
primary.receiveIncomingPayment(paymentHash, virtualAmount, realAmount, receivedAt)
}

override def receiveIncomingOfferPayment(pr: MinimalBolt12Invoice, preimage: ByteVector32, amount: MilliSatoshi, receivedAt: TimestampMilli, paymentType: String): Unit = {
runAsync(secondary.receiveIncomingOfferPayment(pr, preimage, amount, receivedAt, paymentType))
primary.receiveIncomingOfferPayment(pr, preimage, amount, receivedAt, paymentType)
override def receiveIncomingOfferPayment(pr: MinimalBolt12Invoice, preimage: ByteVector32, virtualAmount: MilliSatoshi, realAmount: MilliSatoshi, receivedAt: TimestampMilli, paymentType: String): Unit = {
runAsync(secondary.receiveIncomingOfferPayment(pr, preimage, virtualAmount, realAmount, receivedAt, paymentType))
primary.receiveIncomingOfferPayment(pr, preimage, virtualAmount, realAmount, receivedAt, paymentType)
}

override def getIncomingPayment(paymentHash: ByteVector32): Option[IncomingPayment] = {
Expand Down
11 changes: 6 additions & 5 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/PaymentsDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ trait IncomingPaymentsDb {
* Mark an incoming payment as received (paid). The received amount may exceed the invoice amount.
* If there was no matching invoice in the DB, this will return false.
*/
def receiveIncomingPayment(paymentHash: ByteVector32, amount: MilliSatoshi, receivedAt: TimestampMilli = TimestampMilli.now()): Boolean
def receiveIncomingPayment(paymentHash: ByteVector32, virtualAmount: MilliSatoshi, realAmount: MilliSatoshi, receivedAt: TimestampMilli = TimestampMilli.now()): Boolean

/**
* Add a new incoming offer payment as received.
* If the invoice is already paid, adds `amount` to the amount paid.
*/
def receiveIncomingOfferPayment(pr: MinimalBolt12Invoice, preimage: ByteVector32, amount: MilliSatoshi, receivedAt: TimestampMilli = TimestampMilli.now(), paymentType: String = PaymentType.Blinded): Unit
def receiveIncomingOfferPayment(pr: MinimalBolt12Invoice, preimage: ByteVector32, virtualAmount: MilliSatoshi, realAmount: MilliSatoshi, receivedAt: TimestampMilli = TimestampMilli.now(), paymentType: String = PaymentType.Blinded): Unit

/** Get information about the incoming payment (paid or not) for the given payment hash, if any. */
def getIncomingPayment(paymentHash: ByteVector32): Option[IncomingPayment]
Expand Down Expand Up @@ -150,10 +150,11 @@ object IncomingPaymentStatus {
/**
* Payment has been successfully received.
*
* @param amount amount of the payment received, in milli-satoshis (may exceed the invoice amount).
* @param receivedAt absolute time in milli-seconds since UNIX epoch when the payment was received.
* @param virtualAmount amount of the payment received, in milli-satoshis (may exceed the invoice amount).
* @param realAmount amount of the payment received, in milli-satoshis (may be less or more than the invoice amount).
* @param receivedAt absolute time in milli-seconds since UNIX epoch when the payment was received.
*/
case class Received(amount: MilliSatoshi, receivedAt: TimestampMilli) extends IncomingPaymentStatus
case class Received(virtualAmount: MilliSatoshi, realAmount: MilliSatoshi, receivedAt: TimestampMilli) extends IncomingPaymentStatus

}

Expand Down
31 changes: 22 additions & 9 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import javax.sql.DataSource

object PgAuditDb {
val DB_NAME = "audit"
val CURRENT_VERSION = 12
val CURRENT_VERSION = 13
}

class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
Expand Down Expand Up @@ -114,12 +114,20 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
statement.executeUpdate("CREATE INDEX transactions_published_channel_id_idx ON audit.transactions_published(channel_id)")
}

def migration1213(statement: Statement): Unit = {
statement.executeUpdate("ALTER TABLE audit.received RENAME TO received_old")
statement.executeUpdate("CREATE TABLE audit.received (virtual_amount_msat BIGINT NOT NULL, real_amount_msat BIGINT NOT NULL, payment_hash TEXT NOT NULL, from_channel_id TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("INSERT INTO audit.received SELECT amount_msat, amount_msat, payment_hash, from_channel_id, timestamp FROM audit.received_old")
statement.executeUpdate("DROP TABLE audit.received_old")
statement.executeUpdate("CREATE INDEX received_timestamp_idx ON audit.received(timestamp)")
}

getVersion(statement, DB_NAME) match {
case None =>
statement.executeUpdate("CREATE SCHEMA audit")

statement.executeUpdate("CREATE TABLE audit.sent (amount_msat BIGINT NOT NULL, fees_msat BIGINT NOT NULL, recipient_amount_msat BIGINT NOT NULL, payment_id TEXT NOT NULL, parent_payment_id TEXT NOT NULL, payment_hash TEXT NOT NULL, payment_preimage TEXT NOT NULL, recipient_node_id TEXT NOT NULL, to_channel_id TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE audit.received (amount_msat BIGINT NOT NULL, payment_hash TEXT NOT NULL, from_channel_id TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE audit.received (virtual_amount_msat BIGINT NOT NULL, real_amount_msat BIGINT NOT NULL, payment_hash TEXT NOT NULL, from_channel_id TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE audit.relayed (payment_hash TEXT NOT NULL, amount_msat BIGINT NOT NULL, channel_id TEXT NOT NULL, direction TEXT NOT NULL, relay_type TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE audit.relayed_trampoline (payment_hash TEXT NOT NULL, amount_msat BIGINT NOT NULL, next_node_id TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE audit.channel_events (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, capacity_sat BIGINT NOT NULL, is_funder BOOLEAN NOT NULL, is_private BOOLEAN NOT NULL, event TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
Expand Down Expand Up @@ -149,7 +157,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
statement.executeUpdate("CREATE INDEX transactions_published_channel_id_idx ON audit.transactions_published(channel_id)")
statement.executeUpdate("CREATE INDEX transactions_published_timestamp_idx ON audit.transactions_published(timestamp)")
statement.executeUpdate("CREATE INDEX transactions_confirmed_timestamp_idx ON audit.transactions_confirmed(timestamp)")
case Some(v@(4 | 5 | 6 | 7 | 8 | 9 | 10 | 11)) =>
case Some(v@(4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12)) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
if (v < 5) {
migration45(statement)
Expand All @@ -175,6 +183,9 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
if (v < 12) {
migration1112(statement)
}
if (v < 13) {
migration1213(statement)
}
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
Expand Down Expand Up @@ -220,12 +231,13 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {

override def add(e: PaymentReceived): Unit = withMetrics("audit/add-payment-received", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO audit.received VALUES (?, ?, ?, ?)")) { statement =>
using(pg.prepareStatement("INSERT INTO audit.received VALUES (?, ?, ?, ?, ?)")) { statement =>
e.parts.foreach(p => {
statement.setLong(1, p.amount.toLong)
statement.setString(2, e.paymentHash.toHex)
statement.setString(3, p.fromChannelId.toHex)
statement.setTimestamp(4, p.timestamp.toSqlTimestamp)
statement.setLong(1, p.virtualAmount.toLong)
statement.setLong(2, p.realAmount.toLong)
statement.setString(3, e.paymentHash.toHex)
statement.setString(4, p.fromChannelId.toHex)
statement.setTimestamp(5, p.timestamp.toSqlTimestamp)
statement.addBatch()
})
statement.executeBatch()
Expand Down Expand Up @@ -404,7 +416,8 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
.foldLeft(Map.empty[ByteVector32, PaymentReceived]) { (receivedByHash, rs) =>
val paymentHash = rs.getByteVector32FromHex("payment_hash")
val part = PaymentReceived.PartialPayment(
MilliSatoshi(rs.getLong("amount_msat")),
MilliSatoshi(rs.getLong("virtual_amount_msat")),
MilliSatoshi(rs.getLong("real_amount_msat")),
rs.getByteVector32FromHex("from_channel_id"),
TimestampMilli.fromSqlTimestamp(rs.getTimestamp("timestamp")))
val received = receivedByHash.get(paymentHash) match {
Expand Down
Loading

0 comments on commit 3025a99

Please sign in to comment.