Skip to content

Commit 1a3422f

Browse files
dmitryaxcrobert-1
andauthored
Restore protocol/signalfx.SetupChain (#144)
* Restore protocol/signalfx.SetupChain It's still being used in the signalfx-forwarder monitor * Update protocol/zipper/zipper.go Co-authored-by: Curtis Robert <[email protected]> --------- Co-authored-by: Curtis Robert <[email protected]>
1 parent 2779273 commit 1a3422f

File tree

4 files changed

+224
-1
lines changed

4 files changed

+224
-1
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ lint: ; $(info $(M) running linting on $(CURDIR))
3636

3737
ARGS = -race -timeout=60s -failfast
3838
COVERAGE_MODE = atomic
39-
REQUIRED_COVERAGE = 93.0
39+
REQUIRED_COVERAGE = 92.0
4040
COVERAGE_DIR = $(CURDIR)/coverage.$(shell date -u +"%Y_%m_%dT%H_%M_%SZ")
4141
COVERAGE_P_FILE = $(COVERAGE_DIR)/coverage/parallel/coverage.out
4242
COVERAGE_S_FILE = $(COVERAGE_DIR)/coverage/serialized/coverage.out

protocol/signalfx/datapoint.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@ import (
2424
"github.com/signalfx/golib/v3/errors"
2525
"github.com/signalfx/golib/v3/log"
2626
"github.com/signalfx/golib/v3/sfxclient"
27+
"github.com/signalfx/golib/v3/web"
2728
"github.com/signalfx/ingest-protocols/logkey"
2829
"github.com/signalfx/ingest-protocols/protocol"
2930
signalfxformat "github.com/signalfx/ingest-protocols/protocol/signalfx/format"
31+
"github.com/signalfx/ingest-protocols/protocol/zipper"
3032
)
3133

3234
// MericTypeGetter is an old metric interface that returns the type of a metric name
@@ -262,3 +264,70 @@ func SetupJSONV1Paths(r *mux.Router, handler http.Handler) {
262264
SetupJSONByPaths(r, handler, "/datapoint")
263265
SetupJSONByPaths(r, handler, "/v1/datapoint")
264266
}
267+
268+
// ErrorTrackerHandler behaves like a http handler, but tracks error returns from a ErrorReader
269+
type ErrorTrackerHandler struct {
270+
TotalErrors int64
271+
reader ErrorReader
272+
Logger log.Logger
273+
}
274+
275+
// Datapoints gets TotalErrors stats
276+
func (e *ErrorTrackerHandler) Datapoints() []*datapoint.Datapoint {
277+
return []*datapoint.Datapoint{
278+
sfxclient.Cumulative("total_errors", nil, atomic.LoadInt64(&e.TotalErrors)),
279+
}
280+
}
281+
282+
func addTokenToContext(ctx context.Context, req *http.Request) context.Context {
283+
head := req.Header.Get(sfxclient.TokenHeaderName)
284+
if head != "" {
285+
ctx = context.WithValue(ctx, sfxclient.TokenHeaderName, head)
286+
}
287+
return ctx
288+
}
289+
290+
// ServeHTTPC will serve the wrapped ErrorReader and return the error (if any) to rw if ErrorReader
291+
// fails
292+
func (e *ErrorTrackerHandler) ServeHTTPC(ctx context.Context, rw http.ResponseWriter, req *http.Request) {
293+
ctx = addTokenToContext(ctx, req)
294+
if err := e.reader.Read(ctx, req); err != nil {
295+
atomic.AddInt64(&e.TotalErrors, 1)
296+
rw.WriteHeader(http.StatusBadRequest)
297+
_, err = rw.Write([]byte(err.Error()))
298+
log.IfErr(e.Logger, err)
299+
return
300+
}
301+
_, err := rw.Write([]byte(`"OK"`))
302+
log.IfErr(e.Logger, err)
303+
}
304+
305+
// SetupChain wraps the reader returned by getReader in an http.Handler along
306+
// with some middleware that calculates internal metrics about requests.
307+
func SetupChain(ctx context.Context, sink Sink, chainType string, getReader func(Sink) ErrorReader, httpChain web.NextConstructor, logger log.Logger, counter *dpsink.Counter, moreConstructors ...web.Constructor) (http.Handler, sfxclient.Collector) {
308+
zippers := zipper.NewZipper()
309+
310+
ucount := UnifyNextSinkWrap(counter)
311+
finalSink := FromChain(sink, NextWrap(ucount))
312+
errReader := getReader(finalSink)
313+
errorTracker := ErrorTrackerHandler{
314+
reader: errReader,
315+
Logger: logger,
316+
}
317+
metricTracking := web.RequestCounter{}
318+
handler := web.NewHandler(ctx, &errorTracker).Add(web.NextHTTP(metricTracking.ServeHTTP)).Add(httpChain)
319+
for _, c := range moreConstructors {
320+
handler.Add(c)
321+
}
322+
st := &sfxclient.WithDimensions{
323+
Collector: sfxclient.NewMultiCollector(
324+
&metricTracking,
325+
&errorTracker,
326+
zippers,
327+
),
328+
Dimensions: map[string]string{
329+
"protocol": "sfx_" + chainType,
330+
},
331+
}
332+
return zippers.GzipHandler(handler), st
333+
}

protocol/zipper/zipper.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package zipper
2+
3+
import (
4+
"bytes"
5+
"compress/gzip"
6+
"io"
7+
"net/http"
8+
"sync"
9+
"sync/atomic"
10+
11+
"github.com/signalfx/golib/v3/datapoint"
12+
"github.com/signalfx/golib/v3/log"
13+
"github.com/signalfx/golib/v3/sfxclient"
14+
)
15+
16+
// ReadZipper creates a Pool that contains previously used Readers and can create new ones if we run out.
17+
type ReadZipper struct {
18+
zippers sync.Pool
19+
Log log.Logger
20+
NewCount int64
21+
HitCount int64
22+
MissCount int64
23+
ErrCount int64
24+
}
25+
26+
// GzipHandler transparently decodes your possibly gzipped request
27+
func (z *ReadZipper) GzipHandler(h http.Handler) http.Handler {
28+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
29+
var err error
30+
if r.Header.Get("Content-Encoding") == "gzip" {
31+
gzi := z.zippers.Get()
32+
if gzi != nil {
33+
gz := gzi.(*gzip.Reader)
34+
// put it back
35+
defer z.zippers.Put(gz)
36+
err = gz.Reset(r.Body)
37+
if err == nil {
38+
defer log.IfErr(z.Log, gz.Close())
39+
// nasty? could construct another object but seems expensive
40+
r.Body = gz
41+
atomic.AddInt64(&z.HitCount, 1)
42+
h.ServeHTTP(w, r)
43+
return
44+
}
45+
}
46+
}
47+
if err != nil {
48+
atomic.AddInt64(&z.ErrCount, 1)
49+
w.WriteHeader(http.StatusBadRequest)
50+
_, err = w.Write([]byte("error handling gzip compressed request " + err.Error()))
51+
log.IfErr(z.Log, err)
52+
return
53+
}
54+
atomic.AddInt64(&z.MissCount, 1)
55+
h.ServeHTTP(w, r)
56+
})
57+
}
58+
59+
// Datapoints implements Collector interface and returns metrics
60+
func (z *ReadZipper) Datapoints() []*datapoint.Datapoint {
61+
return []*datapoint.Datapoint{
62+
sfxclient.CumulativeP("zipper.hitCount", nil, &z.HitCount),
63+
sfxclient.CumulativeP("zipper.missCount", nil, &z.MissCount),
64+
sfxclient.CumulativeP("zipper.newCount", nil, &z.NewCount),
65+
sfxclient.CumulativeP("zipper.errCount", nil, &z.ErrCount),
66+
}
67+
}
68+
69+
// NewZipper gives you a ReadZipper
70+
func NewZipper() *ReadZipper {
71+
return newZipper(gzip.NewReader)
72+
}
73+
74+
func newZipper(zipperFunc func(r io.Reader) (*gzip.Reader, error)) *ReadZipper {
75+
z := &ReadZipper{}
76+
z.zippers = sync.Pool{New: func() interface{} {
77+
atomic.AddInt64(&z.NewCount, 1)
78+
// This is just the header of an empty gzip, unlike NewWriter, it can't pass in nil to empty bytes
79+
g, err := zipperFunc(bytes.NewBuffer([]byte{31, 139, 8, 0, 0, 0, 0, 0, 0, 255}))
80+
if err == nil {
81+
return g
82+
}
83+
return nil
84+
}}
85+
return z
86+
}

protocol/zipper/zipper_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package zipper
2+
3+
import (
4+
"bytes"
5+
"compress/gzip"
6+
"context"
7+
"io"
8+
"net/http"
9+
"net/http/httptest"
10+
"testing"
11+
12+
"github.com/signalfx/golib/v3/errors"
13+
"github.com/stretchr/testify/assert"
14+
"github.com/stretchr/testify/require"
15+
)
16+
17+
type foo struct{}
18+
19+
func (f *foo) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
20+
buf := new(bytes.Buffer)
21+
if _, err := buf.ReadFrom(req.Body); err == nil {
22+
if buf.String() == "OK" {
23+
rw.WriteHeader(http.StatusOK)
24+
}
25+
}
26+
rw.WriteHeader(http.StatusBadRequest)
27+
}
28+
29+
func TestZipper(t *testing.T) {
30+
zippers := NewZipper()
31+
badZippers := newZipper(func(io.Reader) (*gzip.Reader, error) {
32+
return new(gzip.Reader), errors.New("nope")
33+
})
34+
f := new(foo)
35+
zipped := new(bytes.Buffer)
36+
w := gzip.NewWriter(zipped)
37+
_, err := w.Write([]byte("OK"))
38+
require.NoError(t, err)
39+
require.NoError(t, w.Close())
40+
tests := []struct {
41+
zipper *ReadZipper
42+
name string
43+
data []byte
44+
status int
45+
headers map[string]string
46+
}{
47+
{zippers, "test non gzipped", []byte("OK"), http.StatusOK, map[string]string{}},
48+
{zippers, "test gzipped", zipped.Bytes(), http.StatusOK, map[string]string{"Content-Encoding": "gzip"}},
49+
{zippers, "test gzipped but bad", zipped.Bytes()[:5], http.StatusBadRequest, map[string]string{"Content-Encoding": "gzip"}},
50+
{badZippers, "test gzipped failure", zipped.Bytes(), http.StatusBadRequest, map[string]string{"Content-Encoding": "gzip"}},
51+
}
52+
for _, test := range tests {
53+
t.Run(test.name, func(t *testing.T) {
54+
req, err := http.NewRequestWithContext(context.Background(), "GET", "/health-check", bytes.NewBuffer(test.data))
55+
for k, v := range test.headers {
56+
req.Header.Set(k, v)
57+
}
58+
require.NoError(t, err)
59+
rr := httptest.NewRecorder()
60+
g := test.zipper.GzipHandler(f)
61+
g.ServeHTTP(rr, req)
62+
assert.Equal(t, test.status, rr.Code)
63+
})
64+
}
65+
t.Run("check datapoints", func(t *testing.T) {
66+
assert.Equal(t, 4, len(zippers.Datapoints()))
67+
})
68+
}

0 commit comments

Comments
 (0)