Skip to content

Commit

Permalink
fixup! Address @thomash-acinq comments
Browse files Browse the repository at this point in the history
  • Loading branch information
t-bast committed Feb 12, 2025
1 parent aacfecc commit f5dbd02
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 24 deletions.
20 changes: 10 additions & 10 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPeersDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,14 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg
def migration45(statement: Statement): Unit = {
statement.executeUpdate("ALTER TABLE local.peers RENAME COLUMN data TO node_address")
statement.executeUpdate("ALTER TABLE local.peers ALTER COLUMN node_address DROP NOT NULL")
statement.executeUpdate("ALTER TABLE local.peers ADD COLUMN node_features BYTEA")
statement.executeUpdate("ALTER TABLE local.peers ADD COLUMN init_features BYTEA")
}

using(pg.createStatement()) { statement =>
getVersion(statement, DB_NAME) match {
case None =>
statement.executeUpdate("CREATE SCHEMA IF NOT EXISTS local")
statement.executeUpdate("CREATE TABLE local.peers (node_id TEXT NOT NULL PRIMARY KEY, node_address BYTEA, node_features BYTEA)")
statement.executeUpdate("CREATE TABLE local.peers (node_id TEXT NOT NULL PRIMARY KEY, node_address BYTEA, init_features BYTEA)")
statement.executeUpdate("CREATE TABLE local.relay_fees (node_id TEXT NOT NULL PRIMARY KEY, fee_base_msat BIGINT NOT NULL, fee_proportional_millionths BIGINT NOT NULL)")
statement.executeUpdate("CREATE TABLE local.peer_storage (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL, last_updated_at TIMESTAMP WITH TIME ZONE NOT NULL, removed_peer_at TIMESTAMP WITH TIME ZONE)")

Expand Down Expand Up @@ -101,10 +101,10 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg
val encodedFeatures = CommonCodecs.initFeaturesCodec.encode(features).require.toByteArray
using(pg.prepareStatement(
"""
| INSERT INTO local.peers (node_id, node_address, node_features)
| INSERT INTO local.peers (node_id, node_address, init_features)
| VALUES (?, ?, ?)
| ON CONFLICT (node_id)
| DO UPDATE SET node_address = EXCLUDED.node_address, node_features = EXCLUDED.node_features
| DO UPDATE SET node_address = EXCLUDED.node_address, init_features = EXCLUDED.init_features
|""".stripMargin)) { statement =>
statement.setString(1, nodeId.value.toHex)
statement.setBytes(2, encodedAddress)
Expand All @@ -119,10 +119,10 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg
val encodedFeatures = CommonCodecs.initFeaturesCodec.encode(features).require.toByteArray
using(pg.prepareStatement(
"""
| INSERT INTO local.peers (node_id, node_address, node_features)
| INSERT INTO local.peers (node_id, node_address, init_features)
| VALUES (?, NULL, ?)
| ON CONFLICT (node_id)
| DO UPDATE SET node_features = EXCLUDED.node_features
| DO UPDATE SET init_features = EXCLUDED.init_features
|""".stripMargin)) { statement =>
statement.setString(1, nodeId.value.toHex)
statement.setBytes(2, encodedFeatures)
Expand Down Expand Up @@ -156,11 +156,11 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg

override def getPeer(nodeId: PublicKey): Option[NodeInfo] = withMetrics("peers/get", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("SELECT node_address, node_features FROM local.peers WHERE node_id=?")) { statement =>
using(pg.prepareStatement("SELECT node_address, init_features FROM local.peers WHERE node_id=?")) { statement =>
statement.setString(1, nodeId.value.toHex)
statement.executeQuery().map { rs =>
val nodeAddress_opt = rs.getBitVectorOpt("node_address").map(CommonCodecs.nodeaddress.decode(_).require.value)
val nodeFeatures_opt = rs.getBitVectorOpt("node_features").map(CommonCodecs.initFeaturesCodec.decode(_).require.value)
val nodeFeatures_opt = rs.getBitVectorOpt("init_features").map(CommonCodecs.initFeaturesCodec.decode(_).require.value)
NodeInfo(nodeFeatures_opt.getOrElse(Features.empty), nodeAddress_opt)
}.headOption
}
Expand All @@ -169,12 +169,12 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg

override def listPeers(): Map[PublicKey, NodeInfo] = withMetrics("peers/list", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("SELECT node_id, node_address, node_features FROM local.peers")) { statement =>
using(pg.prepareStatement("SELECT node_id, node_address, init_features FROM local.peers")) { statement =>
statement.executeQuery()
.map { rs =>
val nodeId = PublicKey(rs.getByteVectorFromHex("node_id"))
val nodeAddress_opt = rs.getBitVectorOpt("node_address").map(CommonCodecs.nodeaddress.decode(_).require.value)
val nodeFeatures_opt = rs.getBitVectorOpt("node_features").map(CommonCodecs.initFeaturesCodec.decode(_).require.value)
val nodeFeatures_opt = rs.getBitVectorOpt("init_features").map(CommonCodecs.initFeaturesCodec.decode(_).require.value)
nodeId -> NodeInfo(nodeFeatures_opt.getOrElse(Features.empty), nodeAddress_opt)
}.toMap
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ class SqlitePeersDb(val sqlite: Connection) extends PeersDb with Logging {
}

def migration34(statement: Statement): Unit = {
statement.executeUpdate("CREATE TABLE peers_v4 (node_id BLOB NOT NULL PRIMARY KEY, node_address BLOB, node_features BLOB)")
statement.executeUpdate("INSERT INTO peers_v4 (node_id, node_address, node_features) SELECT node_id, data, NULL FROM peers")
statement.executeUpdate("CREATE TABLE peers_v4 (node_id BLOB NOT NULL PRIMARY KEY, node_address BLOB, init_features BLOB)")
statement.executeUpdate("INSERT INTO peers_v4 (node_id, node_address, init_features) SELECT node_id, data, NULL FROM peers")
statement.executeUpdate("DROP TABLE peers")
statement.executeUpdate("ALTER TABLE peers_v4 RENAME TO peers")
}

getVersion(statement, DB_NAME) match {
case None =>
statement.executeUpdate("CREATE TABLE peers (node_id BLOB NOT NULL PRIMARY KEY, node_address BLOB, node_features BLOB)")
statement.executeUpdate("CREATE TABLE peers (node_id BLOB NOT NULL PRIMARY KEY, node_address BLOB, init_features BLOB)")
statement.executeUpdate("CREATE TABLE relay_fees (node_id BLOB NOT NULL PRIMARY KEY, fee_base_msat INTEGER NOT NULL, fee_proportional_millionths INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE peer_storage (node_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, last_updated_at INTEGER NOT NULL, removed_peer_at INTEGER)")

Expand All @@ -85,7 +85,7 @@ class SqlitePeersDb(val sqlite: Connection) extends PeersDb with Logging {
override def addOrUpdatePeer(nodeId: Crypto.PublicKey, address: NodeAddress, features: Features[InitFeature]): Unit = withMetrics("peers/add-or-update", DbBackends.Sqlite) {
val encodedFeatures = CommonCodecs.initFeaturesCodec.encode(features).require.toByteArray
val encodedAddress = CommonCodecs.nodeaddress.encode(address).require.toByteArray
using(sqlite.prepareStatement("UPDATE peers SET node_address=?, node_features=? WHERE node_id=?")) { update =>
using(sqlite.prepareStatement("UPDATE peers SET node_address=?, init_features=? WHERE node_id=?")) { update =>
update.setBytes(1, encodedAddress)
update.setBytes(2, encodedFeatures)
update.setBytes(3, nodeId.value.toArray)
Expand All @@ -102,7 +102,7 @@ class SqlitePeersDb(val sqlite: Connection) extends PeersDb with Logging {

override def addOrUpdatePeerFeatures(nodeId: Crypto.PublicKey, features: Features[InitFeature]): Unit = withMetrics("peers/add-or-update", DbBackends.Sqlite) {
val encodedFeatures = CommonCodecs.initFeaturesCodec.encode(features).require.toByteArray
using(sqlite.prepareStatement("UPDATE peers SET node_features=? WHERE node_id=?")) { update =>
using(sqlite.prepareStatement("UPDATE peers SET init_features=? WHERE node_id=?")) { update =>
update.setBytes(1, encodedFeatures)
update.setBytes(2, nodeId.value.toArray)
if (update.executeUpdate() == 0) {
Expand Down Expand Up @@ -135,22 +135,22 @@ class SqlitePeersDb(val sqlite: Connection) extends PeersDb with Logging {
}

override def getPeer(nodeId: PublicKey): Option[NodeInfo] = withMetrics("peers/get", DbBackends.Sqlite) {
using(sqlite.prepareStatement("SELECT node_address, node_features FROM peers WHERE node_id=?")) { statement =>
using(sqlite.prepareStatement("SELECT node_address, init_features FROM peers WHERE node_id=?")) { statement =>
statement.setBytes(1, nodeId.value.toArray)
statement.executeQuery().map { rs =>
val nodeAddress_opt = rs.getBitVectorOpt("node_address").map(CommonCodecs.nodeaddress.decode(_).require.value)
val nodeFeatures_opt = rs.getBitVectorOpt("node_features").map(CommonCodecs.initFeaturesCodec.decode(_).require.value)
val nodeFeatures_opt = rs.getBitVectorOpt("init_features").map(CommonCodecs.initFeaturesCodec.decode(_).require.value)
NodeInfo(nodeFeatures_opt.getOrElse(Features.empty), nodeAddress_opt)
}.headOption
}
}

override def listPeers(): Map[PublicKey, NodeInfo] = withMetrics("peers/list", DbBackends.Sqlite) {
using(sqlite.prepareStatement("SELECT node_id, node_address, node_features FROM peers")) { statement =>
using(sqlite.prepareStatement("SELECT node_id, node_address, init_features FROM peers")) { statement =>
statement.executeQuery().map { rs =>
val nodeId = PublicKey(rs.getByteVector("node_id"))
val nodeAddress_opt = rs.getBitVectorOpt("node_address").map(CommonCodecs.nodeaddress.decode(_).require.value)
val nodeFeatures_opt = rs.getBitVectorOpt("node_features").map(CommonCodecs.initFeaturesCodec.decode(_).require.value)
val nodeFeatures_opt = rs.getBitVectorOpt("init_features").map(CommonCodecs.initFeaturesCodec.decode(_).require.value)
nodeId -> NodeInfo(nodeFeatures_opt.getOrElse(Features.empty), nodeAddress_opt)
}.toMap
}
Expand Down
8 changes: 3 additions & 5 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes
import fr.acinq.eclair.router.Router
import fr.acinq.eclair.wire.protocol
import fr.acinq.eclair.wire.protocol.FailureMessageCodecs.createBadOnionFailure
import fr.acinq.eclair.wire.protocol.{AddFeeCredit, ChannelTlv, CurrentFeeCredit, Error, FailureReason, HasChannelId, HasTemporaryChannelId, LightningMessage, LiquidityAds, NodeAddress, NodeInfo, OnTheFlyFundingFailureMessage, OnionMessage, OnionRoutingPacket, PeerStorageRetrieval, PeerStorageStore, RecommendedFeerates, RoutingMessage, SpliceInit, TlvStream, TxAbort, UnknownMessage, Warning, WillAddHtlc, WillFailHtlc, WillFailMalformedHtlc}
import fr.acinq.eclair.wire.protocol.{AddFeeCredit, ChannelTlv, CurrentFeeCredit, Error, FailureReason, HasChannelId, HasTemporaryChannelId, LightningMessage, LiquidityAds, NodeAddress, OnTheFlyFundingFailureMessage, OnionMessage, OnionRoutingPacket, PeerStorageRetrieval, PeerStorageStore, RecommendedFeerates, RoutingMessage, SpliceInit, TlvStream, TxAbort, UnknownMessage, Warning, WillAddHtlc, WillFailHtlc, WillFailMalformedHtlc}
import scodec.bits.ByteVector

/**
Expand Down Expand Up @@ -620,10 +620,8 @@ class Peer(val nodeParams: NodeParams,
stay()
case d: DisconnectedData =>
// If we haven't reconnected since our last restart, we fetch the latest remote features from our DB.
val remoteFeatures_opt = d.remoteFeatures_opt match {
case Some(remoteFeatures) => Some(remoteFeatures)
case None => nodeParams.db.peers.getPeer(remoteNodeId).map(nodeInfo => LastRemoteFeatures(nodeInfo.features, written = true))
}
val remoteFeatures_opt = d.remoteFeatures_opt
.orElse(nodeParams.db.peers.getPeer(remoteNodeId).map(nodeInfo => LastRemoteFeatures(nodeInfo.features, written = true)))
replyTo ! PeerInfo(self, remoteNodeId, stateName, remoteFeatures_opt.map(_.features), None, d.channels.values.toSet)
stay() using d.copy(remoteFeatures_opt = remoteFeatures_opt)
case _ =>
Expand Down

0 comments on commit f5dbd02

Please sign in to comment.