Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ var testMiner = &cobra.Command{
ctx, cancel := context.WithCancel(context.Background())
exit.GlobalExitHandler.AddCancel(cancel)

client, err := stratum.NewClient("user", "miner", "password", "invitecode", "payoutaddress", config.CompiledInVersion)
client, err := stratum.NewClient("user", "miner", "password", "invitecode", "payoutaddress", config.CompiledInVersion, nil)
if err != nil {
panic(err)
}
Expand Down
14 changes: 14 additions & 0 deletions mining/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func InitLX() {
const (
_ = iota
BatchCommand
CurrentHashRate
NewNoncePrefix
NewOPRHash
ResetRecords
Expand Down Expand Up @@ -336,6 +337,14 @@ func (p *PegnetMiner) HandleCommand(c *MinerCommand) {
for _, c := range commands {
p.HandleCommand(c)
}
case CurrentHashRate:
currentStats := *p.MiningState.stats
currentStats.Stop = time.Now()
w := c.Data.(chan *SingleMinerStats)
select {
case w <- &currentStats:
default:
}
case NewNoncePrefix:
p.ID = c.Data.(uint32)
p.ResetNonce()
Expand Down Expand Up @@ -392,6 +401,11 @@ func BuildCommand() *CommandBuilder {
return c
}

func (b *CommandBuilder) CurrentHashRate(w chan *SingleMinerStats) *CommandBuilder {
b.commands = append(b.commands, &MinerCommand{Command: CurrentHashRate, Data: w})
return b
}

func (b *CommandBuilder) SubmitStats(w chan *SingleMinerStats) *CommandBuilder {
b.commands = append(b.commands, &MinerCommand{Command: SubmitStats, Data: w})
return b
Expand Down
22 changes: 21 additions & 1 deletion prosper-miner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ var rootCmd = &cobra.Command{
return
}

client, err := stratum.NewClient(username, minerid, password, invitecode, payoutaddress, config.CompiledInVersion)
notifications := stratum.NewNotificationChannels()
client, err := stratum.NewClient(username, minerid, password, invitecode, payoutaddress, config.CompiledInVersion, notifications)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -170,6 +171,7 @@ var rootCmd = &cobra.Command{
client.Handshake()

go func() {
stopFeed := make(chan int)
for {
userCommand, _ := keyboardReader.ReadString('\n')
words := strings.Fields(userCommand)
Expand All @@ -185,6 +187,11 @@ var rootCmd = &cobra.Command{
if len(words) > 1 {
client.SuggestTarget(words[1])
}
case "startfeed":
fmt.Println("Use 'stopfeed' to stop")
go startFeed(stopFeed, notifications)
case "stopfeed":
stopFeed <- 1
default:
fmt.Println("Client command not supported: ", words[0])
}
Expand Down Expand Up @@ -306,3 +313,16 @@ func initLogger(cmd *cobra.Command) {

log.StandardLogger().Hooks.Add(&loghelp.ContextHook{})
}

func startFeed(stop chan int, nc *stratum.NotificationChannels) {
for {
select {
case <-stop:
return
case i := <-nc.HashRateChannel:
fmt.Printf("Current hash rate: %.2f\n", i)
case <-nc.SubmissionChannel:
fmt.Println("A share was submitted")
}
}
}
76 changes: 75 additions & 1 deletion stratum/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type Client struct {
miners []*ControlledMiner
successes chan *mining.Winner
totalSuccesses uint64 // Total submitted shares
notificationChannels *NotificationChannels
shutdown chan int

subscriptions []Subscription
requestsMade map[int32]func(Response)
Expand All @@ -61,7 +63,20 @@ func (c *ControlledMiner) SendCommand(command *mining.MinerCommand) bool {
}
}

func NewClient(username, minername, password, invitecode, payoutaddress, version string) (*Client, error) {
type NotificationChannels struct {
HashRateChannel chan float64
SubmissionChannel chan int
}

func NewNotificationChannels() (*NotificationChannels) {
nc := &NotificationChannels {
HashRateChannel: make(chan float64),
SubmissionChannel: make(chan int),
}
return nc
}

func NewClient(username, minername, password, invitecode, payoutaddress, version string, notificationChannels *NotificationChannels) (*Client, error) {
c := new(Client)
c.autoreconnect = true
c.version = version
Expand All @@ -77,6 +92,10 @@ func NewClient(username, minername, password, invitecode, payoutaddress, version

successChannel := make(chan *mining.Winner, 100)
c.successes = successChannel
c.notificationChannels = notificationChannels
// Increate the buffer size for the shutdown channel for each goroutine
// that will listen to the shutdown channel.
c.shutdown = make(chan int, 1)

go c.ListenForSuccess()
//
Expand All @@ -94,6 +113,8 @@ func (c *Client) InitMiners(num int) {
Miner: mining.NewPegnetMiner(uint32(i), commandChannel, c.successes),
}
}
// Once the miners are initialized, there can be a hash rate to report.
go c.ReportHashRate()
}

func (c *Client) SetFakeHashRate(rate int) {
Expand Down Expand Up @@ -313,6 +334,10 @@ func (c *Client) SuggestTarget(preferredTarget string) error {

func (c *Client) Close() error {
c.autoreconnect = false
// Tell goroutines to shutdown
for i, j := 0, cap(c.shutdown); i < j; i++ {
c.shutdown <- 1
}
if !reflect.ValueOf(c.conn).IsNil() {
log.Infof("shutting down stratum client")
return c.conn.Close()
Expand Down Expand Up @@ -523,6 +548,28 @@ func (c *Client) AggregateStats(job int32, stats chan *mining.SingleMinerStats,
log.WithFields(groupStats.LogFields()).Info("job miner stats")
}

func (c *Client) AggregateStatsAndNotify(job int32, stats chan *mining.SingleMinerStats, l int) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second * 3)
defer cancel() // Must clean up context to avoid a memory leak
groupStats := mining.NewGroupMinerStats(job)

for i := 0; i < l; i ++ {
select {
case stat := <-stats:
groupStats.Miners[stat.ID] = stat
case <-ctx.Done():
}
}
if c.notificationChannels != nil {
// Notify listeners. Do nothing if no goroutines are
// listening.
select {
case c.notificationChannels.HashRateChannel <- groupStats.TotalHashPower():
default:
}
}
}

func (c *Client) HandleResponse(resp Response) {
c.Lock()
if funcToPerform, ok := c.requestsMade[resp.ID]; ok {
Expand All @@ -546,11 +593,38 @@ func (c *Client) ListenForSuccess() {
log.WithError(err).Error("failed to submit to server")
} else {
c.totalSuccesses++
if c.notificationChannels != nil {
// Notify listeners. Do nothing if no
// goroutines are listening.
select {
case c.notificationChannels.SubmissionChannel <- 1:
default:
}
}
}
}
}
}

func (c *Client) ReportHashRate() {
ticker := time.NewTicker(time.Second * 10)
for {
select {
case <- c.shutdown:
ticker.Stop()
return
case <- ticker.C:
existingJobID, _ := strconv.ParseInt(c.currentJobID, 10, 64)
stats := make(chan *mining.SingleMinerStats, len(c.miners))
command := mining.BuildCommand().
CurrentHashRate(stats).
Build()
c.SendCommand(command)
go c.AggregateStatsAndNotify(int32(existingJobID), stats, len(c.miners))
}
}
}

func (c *Client) TotalSuccesses() uint64 {
return c.totalSuccesses
}