Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 70 additions & 21 deletions notifications/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/lightninglabs/aperture/l402"
"github.com/lightninglabs/loop/swapserverrpc"
"github.com/lightningnetwork/lnd/lntypes"
"google.golang.org/grpc"
)

Expand All @@ -26,6 +27,13 @@ const (
NotificationTypeStaticLoopInSweepRequest
)

const (
// defaultMinAliveConnTime is the default minimum time that the
// connection to the server needs to be alive before we consider it a
// successful connection.
defaultMinAliveConnTime = time.Minute
)

// Client is the interface that the notification manager needs to implement in
// order to be able to subscribe to notifications.
type Client interface {
Expand All @@ -45,6 +53,10 @@ type Config struct {
// CurrentToken returns the token that is currently contained in the
// store or an l402.ErrNoToken error if there is none.
CurrentToken func() (*l402.Token, error)

// MinAliveConnTime is the minimum time that the connection to the
// server needs to be alive before we consider it a successful.
MinAliveConnTime time.Duration
}

// Manager is a manager for notifications that the swap server sends to the
Expand All @@ -60,6 +72,11 @@ type Manager struct {

// NewManager creates a new notification manager.
func NewManager(cfg *Config) *Manager {
// Set the default minimum alive connection time if it's not set.
if cfg.MinAliveConnTime == 0 {
cfg.MinAliveConnTime = defaultMinAliveConnTime
}

return &Manager{
cfg: cfg,
subscribers: make(map[NotificationType][]subscriber),
Expand Down Expand Up @@ -128,13 +145,18 @@ func (m *Manager) SubscribeStaticLoopInSweepRequests(ctx context.Context,
// close the readyChan to signal that the manager is ready.
func (m *Manager) Run(ctx context.Context) error {
// Initially we want to immediately try to connect to the server.
waitTime := time.Duration(0)
var (
waitTime time.Duration
backoff time.Duration
attempts int
)

// Start the notification runloop.
for {
timer := time.NewTimer(waitTime)
// Increase the wait time for the next iteration.
waitTime += time.Second * 1
backoff = waitTime + time.Duration(attempts)*time.Second
waitTime = 0
timer := time.NewTimer(backoff)

// Return if the context has been canceled.
select {
Expand All @@ -145,37 +167,66 @@ func (m *Manager) Run(ctx context.Context) error {
}

// In order to create a valid l402 we first are going to call
// the FetchL402 method. As a client might not have outbound capacity
// yet, we'll retry until we get a valid response.
// the FetchL402 method. As a client might not have outbound
// capacity yet, we'll retry until we get a valid response.
if !m.hasL402 {
_, err := m.cfg.CurrentToken()
token, err := m.cfg.CurrentToken()
if err != nil {
// We only log the error if it's not the case that we
// don't have a token yet to avoid spamming the logs.
// We only log the error if it's not the case
// that we don't have a token yet to avoid
// spamming the logs.
if err != l402.ErrNoToken {
log.Errorf("Error getting L402 from store: %v", err)
log.Errorf("Error getting L402 from "+
"the store: %v", err)
}
continue
}
m.hasL402 = true
}

connectedFunc := func() {
// Reset the wait time to 10 seconds.
waitTime = time.Second * 10
// If the preimage is empty, we don't have a valid L402
// yet so we'll continue to retry with the incremental
// backoff.
emptyPreimage := lntypes.Preimage{}
if token.Preimage == emptyPreimage {
attempts++
continue
}

attempts = 0
m.hasL402 = true
}

err := m.subscribeNotifications(ctx, connectedFunc)
connectAttempted := time.Now()
err := m.subscribeNotifications(ctx)
if err != nil {
log.Errorf("Error subscribing to notifications: %v", err)
log.Errorf("Error subscribing to notifications: %v",
err)
}
connectionAliveTime := time.Since(connectAttempted)

// Note that we may be able to connet to the stream but not
// able to use it if the client is unable to pay for their
// L402. In this case the subscription will fail on the first
// read immediately after connecting. We'll therefore only
// consider the connection successful if we were able to use
// the stream for at least the minimum alive connection time
// (which defaults to 1 minute).
if connectionAliveTime > m.cfg.MinAliveConnTime {
// Reset the backoff to 10 seconds and the connect
// attempts to zero if we were really connected for a
// considerable amount of time (1 minute).
waitTime = time.Second * 10
attempts = 0
} else {
// We either failed to connect or the stream
// disconnected immediately, so we just increase the
// backoff.
attempts++
}
}
}

// subscribeNotifications subscribes to the notifications from the server.
func (m *Manager) subscribeNotifications(ctx context.Context,
connectedFunc func()) error {

func (m *Manager) subscribeNotifications(ctx context.Context) error {
callCtx, cancel := context.WithCancel(ctx)
defer cancel()

Expand All @@ -186,8 +237,6 @@ func (m *Manager) subscribeNotifications(ctx context.Context,
return err
}

// Signal that we're connected to the server.
connectedFunc()
log.Debugf("Successfully subscribed to server notifications")

for {
Expand Down
Loading
Loading