-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmessages.go
171 lines (146 loc) · 3.95 KB
/
messages.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
// Copyright © 2017 Pennock Tech, LLC.
// All rights reserved, except as granted under license.
// Licensed per file LICENSE.txt
package main
import (
"fmt"
"os"
"sync"
"sync/atomic"
)
func debugf(spec string, args ...interface{}) {
if !opts.debug {
return
}
fmt.Fprintf(os.Stderr, "DEBUG: "+spec, args...)
}
// emitOutputMessages prints the messages it receives on the channel.
//
// We scan N hostnames, each of M IPs, in parallel.
// We emit messages and don't want them stomping on each other. So we're
// a simple demultiplexer.
func emitOutputMessages(messages <-chan string, shuttingDown *sync.WaitGroup) {
for {
msg, ok := <-messages
if !ok {
break
}
fmt.Println(msg)
}
shuttingDown.Done()
}
func (s *programStatus) AddErr() {
_ = atomic.AddUint32(&s.errorCount, 1)
}
func (s *programStatus) AddWarning() {
_ = atomic.AddUint32(&s.warningCount, 1)
}
func (s *programStatus) Messagef(spec string, args ...interface{}) {
s.Message(fmt.Sprintf(spec, args...))
}
func (s *programStatus) Message(msg string) {
s.output <- msg
}
func (s *programStatus) Waffle(msg string) {
if !opts.terse {
s.Message(msg)
}
}
func (s *programStatus) Wafflef(spec string, args ...interface{}) {
if !opts.terse {
s.Message(fmt.Sprintf(spec, args...))
}
}
func (s *programStatus) Error(msg string) {
s.Message(ColorRed(msg))
s.AddErr()
}
func (s *programStatus) Errorf(spec string, args ...interface{}) {
s.Error(fmt.Sprintf(spec, args...))
}
func (s *programStatus) Warn(msg string) {
s.Message(ColorYellow(msg))
s.AddWarning()
}
func (s *programStatus) Warnf(spec string, args ...interface{}) {
s.Error(fmt.Sprintf(spec, args...))
}
func (s *programStatus) Successf(spec string, args ...interface{}) {
s.Success(fmt.Sprintf(spec, args...))
}
func (s *programStatus) Success(msg string) {
s.output <- ColorGreen(msg)
}
func (s *programStatus) ChildBatcher(label1, label2 string) (new *programStatus) {
messages := make(chan string, 10)
var label string
if label2 != "" {
label = fmt.Sprintf("%s[%s]", label1, label2)
} else {
label = label1
}
if s.label != "" {
label = fmt.Sprintf("%s→%s", s.label, label)
}
new = &programStatus{
probing: s.probing,
shuttingDown: s.shuttingDown,
batchChildren: &sync.WaitGroup{},
output: messages,
label: label,
}
s.probing.Add(1) // shared
s.batchChildren.Add(1) // unshared, deliberately the old one (parent)
go batchedEmitMessages(messages, new, s)
return new
}
func (s *programStatus) BatchFinished() {
label := s.label
if label == "" {
label = "main(smtpdane)"
}
// wait for any of _our_ children to finish using us
debugf("%s: we're finished\n", label)
s.batchChildren.Wait()
debugf("%s: and our children are done too\n", label)
// then close ours
close(s.output)
s.probing.Done()
}
func batchedEmitMessages(src <-chan string, this, sink *programStatus) {
batch := make([]string, 0, 50)
for {
msg, ok := <-src
if !ok {
break
}
if opts.debugFast {
sink.Message(msg)
} else {
batch = append(batch, msg)
}
}
// closed, so BatchFinished, so we _and_ our children are finished.
errored := atomic.LoadUint32(&this.errorCount) != 0
if errored {
// race-safety: children exited, so nothing modifying our our
// errorCount any more, could safely just reference it directly, so
// there's no race between errored check above and this load here.
//
// We could just use .AddErr() but we want an accurate count preserved.
_ = atomic.AddUint32(&sink.errorCount, atomic.LoadUint32(&this.errorCount))
}
_ = atomic.AddUint32(&sink.warningCount, atomic.LoadUint32(&this.warningCount))
if errored || !opts.quiet {
for i := range batch {
sink.Message(batch[i])
}
}
// let parent know that we're done as a child user of its messaging channel, letting
// it close output when ready to propagate.
//
// has had BatchFinished called, then the parent can close too.
sink.batchChildren.Done()
// top-level "things are in flight"
sink.probing.Done()
}