diff --git a/cmd/root.go b/cmd/root.go index 0588cd5..a45e8c3 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -203,7 +203,7 @@ var testMiner = &cobra.Command{ ctx, cancel := context.WithCancel(context.Background()) exit.GlobalExitHandler.AddCancel(cancel) - client, err := stratum.NewClient("user", "miner", "password", "invitecode", "payoutaddress", config.CompiledInVersion) + client, err := stratum.NewClient("user", "miner", "password", "invitecode", "payoutaddress", config.CompiledInVersion, nil) if err != nil { panic(err) } diff --git a/mining/miner.go b/mining/miner.go index 485620c..74b1ebc 100644 --- a/mining/miner.go +++ b/mining/miner.go @@ -41,6 +41,7 @@ func InitLX() { const ( _ = iota BatchCommand + CurrentHashRate NewNoncePrefix NewOPRHash ResetRecords @@ -336,6 +337,14 @@ func (p *PegnetMiner) HandleCommand(c *MinerCommand) { for _, c := range commands { p.HandleCommand(c) } + case CurrentHashRate: + currentStats := *p.MiningState.stats + currentStats.Stop = time.Now() + w := c.Data.(chan *SingleMinerStats) + select { + case w <- ¤tStats: + default: + } case NewNoncePrefix: p.ID = c.Data.(uint32) p.ResetNonce() @@ -392,6 +401,11 @@ func BuildCommand() *CommandBuilder { return c } +func (b *CommandBuilder) CurrentHashRate(w chan *SingleMinerStats) *CommandBuilder { + b.commands = append(b.commands, &MinerCommand{Command: CurrentHashRate, Data: w}) + return b +} + func (b *CommandBuilder) SubmitStats(w chan *SingleMinerStats) *CommandBuilder { b.commands = append(b.commands, &MinerCommand{Command: SubmitStats, Data: w}) return b diff --git a/prosper-miner/main.go b/prosper-miner/main.go index 6d74c89..4057d87 100644 --- a/prosper-miner/main.go +++ b/prosper-miner/main.go @@ -125,7 +125,8 @@ var rootCmd = &cobra.Command{ return } - client, err := stratum.NewClient(username, minerid, password, invitecode, payoutaddress, config.CompiledInVersion) + notifications := stratum.NewNotificationChannels() + client, err := stratum.NewClient(username, minerid, password, invitecode, payoutaddress, config.CompiledInVersion, notifications) if err != nil { panic(err) } @@ -170,6 +171,7 @@ var rootCmd = &cobra.Command{ client.Handshake() go func() { + stopFeed := make(chan int) for { userCommand, _ := keyboardReader.ReadString('\n') words := strings.Fields(userCommand) @@ -185,6 +187,11 @@ var rootCmd = &cobra.Command{ if len(words) > 1 { client.SuggestTarget(words[1]) } + case "startfeed": + fmt.Println("Use 'stopfeed' to stop") + go startFeed(stopFeed, notifications) + case "stopfeed": + stopFeed <- 1 default: fmt.Println("Client command not supported: ", words[0]) } @@ -306,3 +313,16 @@ func initLogger(cmd *cobra.Command) { log.StandardLogger().Hooks.Add(&loghelp.ContextHook{}) } + +func startFeed(stop chan int, nc *stratum.NotificationChannels) { + for { + select { + case <-stop: + return + case i := <-nc.HashRateChannel: + fmt.Printf("Current hash rate: %.2f\n", i) + case <-nc.SubmissionChannel: + fmt.Println("A share was submitted") + } + } +} diff --git a/stratum/client.go b/stratum/client.go index 0833353..ebebfa6 100644 --- a/stratum/client.go +++ b/stratum/client.go @@ -40,6 +40,8 @@ type Client struct { miners []*ControlledMiner successes chan *mining.Winner totalSuccesses uint64 // Total submitted shares + notificationChannels *NotificationChannels + shutdown chan int subscriptions []Subscription requestsMade map[int32]func(Response) @@ -61,7 +63,20 @@ func (c *ControlledMiner) SendCommand(command *mining.MinerCommand) bool { } } -func NewClient(username, minername, password, invitecode, payoutaddress, version string) (*Client, error) { +type NotificationChannels struct { + HashRateChannel chan float64 + SubmissionChannel chan int +} + +func NewNotificationChannels() (*NotificationChannels) { + nc := &NotificationChannels { + HashRateChannel: make(chan float64), + SubmissionChannel: make(chan int), + } + return nc +} + +func NewClient(username, minername, password, invitecode, payoutaddress, version string, notificationChannels *NotificationChannels) (*Client, error) { c := new(Client) c.autoreconnect = true c.version = version @@ -77,6 +92,10 @@ func NewClient(username, minername, password, invitecode, payoutaddress, version successChannel := make(chan *mining.Winner, 100) c.successes = successChannel + c.notificationChannels = notificationChannels + // Increate the buffer size for the shutdown channel for each goroutine + // that will listen to the shutdown channel. + c.shutdown = make(chan int, 1) go c.ListenForSuccess() // @@ -94,6 +113,8 @@ func (c *Client) InitMiners(num int) { Miner: mining.NewPegnetMiner(uint32(i), commandChannel, c.successes), } } + // Once the miners are initialized, there can be a hash rate to report. + go c.ReportHashRate() } func (c *Client) SetFakeHashRate(rate int) { @@ -313,6 +334,10 @@ func (c *Client) SuggestTarget(preferredTarget string) error { func (c *Client) Close() error { c.autoreconnect = false + // Tell goroutines to shutdown + for i, j := 0, cap(c.shutdown); i < j; i++ { + c.shutdown <- 1 + } if !reflect.ValueOf(c.conn).IsNil() { log.Infof("shutting down stratum client") return c.conn.Close() @@ -523,6 +548,28 @@ func (c *Client) AggregateStats(job int32, stats chan *mining.SingleMinerStats, log.WithFields(groupStats.LogFields()).Info("job miner stats") } +func (c *Client) AggregateStatsAndNotify(job int32, stats chan *mining.SingleMinerStats, l int) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second * 3) + defer cancel() // Must clean up context to avoid a memory leak + groupStats := mining.NewGroupMinerStats(job) + + for i := 0; i < l; i ++ { + select { + case stat := <-stats: + groupStats.Miners[stat.ID] = stat + case <-ctx.Done(): + } + } + if c.notificationChannels != nil { + // Notify listeners. Do nothing if no goroutines are + // listening. + select { + case c.notificationChannels.HashRateChannel <- groupStats.TotalHashPower(): + default: + } + } +} + func (c *Client) HandleResponse(resp Response) { c.Lock() if funcToPerform, ok := c.requestsMade[resp.ID]; ok { @@ -546,11 +593,38 @@ func (c *Client) ListenForSuccess() { log.WithError(err).Error("failed to submit to server") } else { c.totalSuccesses++ + if c.notificationChannels != nil { + // Notify listeners. Do nothing if no + // goroutines are listening. + select { + case c.notificationChannels.SubmissionChannel <- 1: + default: + } + } } } } } +func (c *Client) ReportHashRate() { + ticker := time.NewTicker(time.Second * 10) + for { + select { + case <- c.shutdown: + ticker.Stop() + return + case <- ticker.C: + existingJobID, _ := strconv.ParseInt(c.currentJobID, 10, 64) + stats := make(chan *mining.SingleMinerStats, len(c.miners)) + command := mining.BuildCommand(). + CurrentHashRate(stats). + Build() + c.SendCommand(command) + go c.AggregateStatsAndNotify(int32(existingJobID), stats, len(c.miners)) + } + } +} + func (c *Client) TotalSuccesses() uint64 { return c.totalSuccesses }