Skip to content

Commit

Permalink
feat(statement-distribution):Implement FanIn approach to statement di…
Browse files Browse the repository at this point in the history
…stribution (#4406)
  • Loading branch information
DanielDDHM authored Jan 14, 2025
1 parent e9f3d8d commit 074f958
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 48 deletions.
103 changes: 55 additions & 48 deletions dot/parachain/statement-distribution/statement_distribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,69 +2,76 @@ package statementdistribution

import (
"context"
"fmt"
"time"

parachainutil "github.com/ChainSafe/gossamer/dot/parachain/util"
"github.com/ChainSafe/gossamer/internal/log"

statementedistributionmessages "github.com/ChainSafe/gossamer/dot/parachain/statement-distribution/messages"
parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types"
)

var logger = log.NewFromGlobal(log.AddContext("pkg", "statement-distribution"))

type StatementDistribution struct {
SubSystemToOverseer chan<- any
}

func (s StatementDistribution) Run(ctx context.Context, overseerToSubSystem <-chan any) {
for {
select {
case msg, ok := <-overseerToSubSystem:
if !ok {
return
}
err := s.processMessage(msg)
if err != nil {
logger.Errorf("processing overseer message: %w", err)
}
case <-ctx.Done():
if err := ctx.Err(); err != nil {
logger.Errorf("ctx error: %v\n", err)
}
}
}
type MuxedMessage interface {
isMuxedMessage()
}

func (s StatementDistribution) processMessage(msg any) error {
switch msg := msg.(type) {
case statementedistributionmessages.Backed:
// TODO #4171
case statementedistributionmessages.Share:
// TODO #4170
// case statementedistributionmessages.NetworkBridgeUpdate
// TODO #4172 this above case would need to wait until network bridge receiver side is merged
case parachaintypes.ActiveLeavesUpdateSignal:
return s.ProcessActiveLeavesUpdateSignal(msg)
case parachaintypes.BlockFinalizedSignal:
return s.ProcessBlockFinalizedSignal(msg)

default:
return parachaintypes.ErrUnknownOverseerMessage
}

return nil
type overseerMessage struct {
inner any
}

func (s StatementDistribution) Name() parachaintypes.SubSystemName {
return parachaintypes.StatementDistribution
}
func (*overseerMessage) isMuxedMessage() {}

func (s StatementDistribution) ProcessActiveLeavesUpdateSignal(signal parachaintypes.ActiveLeavesUpdateSignal) error {
// TODO #4173
return nil
type responderMessage struct {
inner any // should be replaced with AttestedCandidateRequest type
}

func (s StatementDistribution) ProcessBlockFinalizedSignal(signal parachaintypes.BlockFinalizedSignal) error {
// nothing to do here
return nil
func (*responderMessage) isMuxedMessage() {}

type reputationChangeMessage struct{}

func (*reputationChangeMessage) isMuxedMessage() {}

// Run just receives the ctx and a channel from the overseer to subsystem
func (s *StatementDistribution) Run(ctx context.Context, overseerToSubSystem <-chan any) {
// Inside the method Run, we spawn a goroutine to handle network incoming requests
// TODO: https://github.com/ChainSafe/gossamer/issues/4285
responderCh := make(chan any, 1)
go taskResponder(responderCh)

// Timer for reputation aggregator trigger
reputationDelay := time.NewTicker(parachainutil.ReputationChangeInterval) // Adjust the duration as needed
defer reputationDelay.Stop()

for {
message := s.awaitMessageFrom(overseerToSubSystem, responderCh, reputationDelay.C)

switch innerMessage := message.(type) {
case *reputationChangeMessage:
logger.Info("Reputation change triggered.")
default:
logger.Warn("Unhandled message type: " + fmt.Sprintf("%v", innerMessage))
}
}
}

func (s StatementDistribution) Stop() {}
func taskResponder(responderCh chan any) {}

// awaitMessageFrom waits for messages from either the overseerToSubSystem, responderCh, or reputationDelay
func (s *StatementDistribution) awaitMessageFrom(
overseerToSubSystem <-chan any,
responderCh chan any,
reputationDelay <-chan time.Time,
) MuxedMessage {
select {
case msg := <-overseerToSubSystem:
return &overseerMessage{inner: msg}
case msg := <-responderCh:
return &responderMessage{inner: msg}
case <-reputationDelay:
return &reputationChangeMessage{}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package statementdistribution

// //nolint
func createStatementDistribution() (*StatementDistribution, chan any) {

subSystemToOverseer := make(chan any, 10)

return &StatementDistribution{
SubSystemToOverseer: subSystemToOverseer,
}, subSystemToOverseer
}
2 changes: 2 additions & 0 deletions dot/parachain/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type UnifiedReputationChange struct {
Reason string
}

const ReputationChangeInterval = 30 * time.Second

// CostOrBenefit returns the cost or benefit of the reputation change.
func (u UnifiedReputationChange) CostOrBenefit() int32 {
switch u.Type {
Expand Down

0 comments on commit 074f958

Please sign in to comment.