From 074f9583d5b302e8eefbf4be4d490c50b6006964 Mon Sep 17 00:00:00 2001 From: Daniel Date: Tue, 14 Jan 2025 10:07:57 -0300 Subject: [PATCH] feat(statement-distribution):Implement FanIn approach to statement distribution (#4406) --- .../statement_distribution.go | 103 ++++++++++-------- .../statement_distribution_test.go | 11 ++ dot/parachain/util/util.go | 2 + 3 files changed, 68 insertions(+), 48 deletions(-) create mode 100644 dot/parachain/statement-distribution/statement_distribution_test.go diff --git a/dot/parachain/statement-distribution/statement_distribution.go b/dot/parachain/statement-distribution/statement_distribution.go index ff4ae40d8c..7cde0429ba 100644 --- a/dot/parachain/statement-distribution/statement_distribution.go +++ b/dot/parachain/statement-distribution/statement_distribution.go @@ -2,69 +2,76 @@ package statementdistribution import ( "context" + "fmt" + "time" + parachainutil "github.com/ChainSafe/gossamer/dot/parachain/util" "github.com/ChainSafe/gossamer/internal/log" - - statementedistributionmessages "github.com/ChainSafe/gossamer/dot/parachain/statement-distribution/messages" - parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types" ) var logger = log.NewFromGlobal(log.AddContext("pkg", "statement-distribution")) type StatementDistribution struct { + SubSystemToOverseer chan<- any } -func (s StatementDistribution) Run(ctx context.Context, overseerToSubSystem <-chan any) { - for { - select { - case msg, ok := <-overseerToSubSystem: - if !ok { - return - } - err := s.processMessage(msg) - if err != nil { - logger.Errorf("processing overseer message: %w", err) - } - case <-ctx.Done(): - if err := ctx.Err(); err != nil { - logger.Errorf("ctx error: %v\n", err) - } - } - } +type MuxedMessage interface { + isMuxedMessage() } -func (s StatementDistribution) processMessage(msg any) error { - switch msg := msg.(type) { - case statementedistributionmessages.Backed: - // TODO #4171 - case statementedistributionmessages.Share: - // TODO #4170 - // case statementedistributionmessages.NetworkBridgeUpdate - // TODO #4172 this above case would need to wait until network bridge receiver side is merged - case parachaintypes.ActiveLeavesUpdateSignal: - return s.ProcessActiveLeavesUpdateSignal(msg) - case parachaintypes.BlockFinalizedSignal: - return s.ProcessBlockFinalizedSignal(msg) - - default: - return parachaintypes.ErrUnknownOverseerMessage - } - - return nil +type overseerMessage struct { + inner any } -func (s StatementDistribution) Name() parachaintypes.SubSystemName { - return parachaintypes.StatementDistribution -} +func (*overseerMessage) isMuxedMessage() {} -func (s StatementDistribution) ProcessActiveLeavesUpdateSignal(signal parachaintypes.ActiveLeavesUpdateSignal) error { - // TODO #4173 - return nil +type responderMessage struct { + inner any // should be replaced with AttestedCandidateRequest type } -func (s StatementDistribution) ProcessBlockFinalizedSignal(signal parachaintypes.BlockFinalizedSignal) error { - // nothing to do here - return nil +func (*responderMessage) isMuxedMessage() {} + +type reputationChangeMessage struct{} + +func (*reputationChangeMessage) isMuxedMessage() {} + +// Run just receives the ctx and a channel from the overseer to subsystem +func (s *StatementDistribution) Run(ctx context.Context, overseerToSubSystem <-chan any) { + // Inside the method Run, we spawn a goroutine to handle network incoming requests + // TODO: https://github.com/ChainSafe/gossamer/issues/4285 + responderCh := make(chan any, 1) + go taskResponder(responderCh) + + // Timer for reputation aggregator trigger + reputationDelay := time.NewTicker(parachainutil.ReputationChangeInterval) // Adjust the duration as needed + defer reputationDelay.Stop() + + for { + message := s.awaitMessageFrom(overseerToSubSystem, responderCh, reputationDelay.C) + + switch innerMessage := message.(type) { + case *reputationChangeMessage: + logger.Info("Reputation change triggered.") + default: + logger.Warn("Unhandled message type: " + fmt.Sprintf("%v", innerMessage)) + } + } } -func (s StatementDistribution) Stop() {} +func taskResponder(responderCh chan any) {} + +// awaitMessageFrom waits for messages from either the overseerToSubSystem, responderCh, or reputationDelay +func (s *StatementDistribution) awaitMessageFrom( + overseerToSubSystem <-chan any, + responderCh chan any, + reputationDelay <-chan time.Time, +) MuxedMessage { + select { + case msg := <-overseerToSubSystem: + return &overseerMessage{inner: msg} + case msg := <-responderCh: + return &responderMessage{inner: msg} + case <-reputationDelay: + return &reputationChangeMessage{} + } +} diff --git a/dot/parachain/statement-distribution/statement_distribution_test.go b/dot/parachain/statement-distribution/statement_distribution_test.go new file mode 100644 index 0000000000..682de9cf51 --- /dev/null +++ b/dot/parachain/statement-distribution/statement_distribution_test.go @@ -0,0 +1,11 @@ +package statementdistribution + +// //nolint +func createStatementDistribution() (*StatementDistribution, chan any) { + + subSystemToOverseer := make(chan any, 10) + + return &StatementDistribution{ + SubSystemToOverseer: subSystemToOverseer, + }, subSystemToOverseer +} diff --git a/dot/parachain/util/util.go b/dot/parachain/util/util.go index 82afe41a3f..dec86af337 100644 --- a/dot/parachain/util/util.go +++ b/dot/parachain/util/util.go @@ -64,6 +64,8 @@ type UnifiedReputationChange struct { Reason string } +const ReputationChangeInterval = 30 * time.Second + // CostOrBenefit returns the cost or benefit of the reputation change. func (u UnifiedReputationChange) CostOrBenefit() int32 { switch u.Type {