Skip to content

Commit 3bf9a5d

Browse files
authored
wip: filter for events on handler
1 parent 3066569 commit 3bf9a5d

File tree

4 files changed

+112
-7
lines changed

4 files changed

+112
-7
lines changed

Diff for: events/handlers.go

+31-4
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,23 @@ import (
1010
"strings"
1111

1212
cloudevents "github.com/cloudevents/sdk-go"
13+
"github.com/zeiss/pkg/channels"
14+
"github.com/zeiss/pkg/slices"
1315
"github.com/zeiss/pkg/utilx"
1416
)
1517

18+
// FilterFunc is the filter for events.
19+
func FilterFunc(types ...string) func(cloudevents.Event) bool {
20+
return func(e cloudevents.Event) bool {
21+
return slices.In(e.Type(), types...)
22+
}
23+
}
24+
25+
// Filter is the filter for events.
26+
func Filter(input <-chan cloudevents.Event, types ...string) <-chan cloudevents.Event {
27+
return channels.Filter(input, FilterFunc(types...))
28+
}
29+
1630
// EventHandler is the handler for events.
1731
type EventHandler struct {
1832
events chan cloudevents.Event
@@ -33,6 +47,20 @@ func WithBufferSize(size int) Opt {
3347
}
3448
}
3549

50+
// WithEvents sets the events channel.
51+
func WithEvents(events chan cloudevents.Event) Opt {
52+
return func(h *EventHandler) {
53+
h.events = events
54+
}
55+
}
56+
57+
// Close closes the events channel.
58+
func (h *EventHandler) Close() {
59+
if h.events != nil {
60+
close(h.events)
61+
}
62+
}
63+
3664
// ServeHTTP is the handler for events.
3765
func (h *EventHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
3866
r.Body = http.MaxBytesReader(w, r.Body, 1048576)
@@ -61,10 +89,7 @@ func (h *EventHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
6189
case errors.Is(err, io.ErrUnexpectedEOF):
6290
http.Error(w, "Request body contains badly-formed JSON", http.StatusBadRequest)
6391

64-
// Catch any type errors, like trying to assign a string in the
65-
// JSON request body to a int field in our Person struct. We can
66-
// interpolate the relevant field name and position into the error
67-
// message to make it easier for the client to fix.
92+
// Catch any type errors, like trying to assign a string in the payload.
6893
case errors.As(err, &unmarshalTypeError):
6994
msg := fmt.Sprintf("Request body contains an invalid value for the %q field (at position %d)", unmarshalTypeError.Field, unmarshalTypeError.Offset)
7095
http.Error(w, msg, http.StatusBadRequest)
@@ -101,6 +126,8 @@ func (h *EventHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
101126
for _, e := range events {
102127
h.events <- e
103128
}
129+
130+
w.WriteHeader(http.StatusAccepted)
104131
}
105132

106133
// NewEventHandler creates a new event handler.

Diff for: events/handlers_test.go

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package events_test
2+
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
"net/http"
7+
"net/http/httptest"
8+
"testing"
9+
10+
cloudevents "github.com/cloudevents/sdk-go"
11+
"github.com/stretchr/testify/require"
12+
"github.com/zeiss/go-acs/events"
13+
)
14+
15+
func TestEventHandler_ServeHTTP(t *testing.T) {
16+
event := cloudevents.NewEvent()
17+
event.SetID("test")
18+
event.SetType("test")
19+
event.SetSource("test")
20+
21+
bb := new(bytes.Buffer)
22+
enc := json.NewEncoder(bb).Encode([]cloudevents.Event{event})
23+
require.NoError(t, enc)
24+
25+
req, err := http.NewRequest(http.MethodPost, "/events", bb)
26+
require.NoError(t, err)
27+
28+
in := make(chan cloudevents.Event, 1)
29+
30+
hh := events.NewEventHandler(events.WithEvents(in))
31+
rr := httptest.NewRecorder()
32+
33+
hh.ServeHTTP(rr, req)
34+
require.Equal(t, http.StatusAccepted, rr.Code)
35+
36+
e := <-in
37+
require.Equal(t, event, e)
38+
}
39+
40+
func TestFilterFunc(t *testing.T) {
41+
event := cloudevents.NewEvent()
42+
event.SetID("test")
43+
event.SetType("test")
44+
event.SetSource("test")
45+
46+
fn := events.FilterFunc("test")
47+
require.True(t, fn(event))
48+
49+
bb := new(bytes.Buffer)
50+
enc := json.NewEncoder(bb).Encode([]cloudevents.Event{event})
51+
require.NoError(t, enc)
52+
53+
req, err := http.NewRequest(http.MethodPost, "/events", bb)
54+
require.NoError(t, err)
55+
56+
in := make(chan cloudevents.Event, 1)
57+
58+
hh := events.NewEventHandler(events.WithEvents(in))
59+
rr := httptest.NewRecorder()
60+
61+
hh.ServeHTTP(rr, req)
62+
require.Equal(t, http.StatusAccepted, rr.Code)
63+
64+
e := <-events.Filter(in, "test")
65+
require.Equal(t, event, e)
66+
}

Diff for: go.mod

+5-1
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@ require (
88
github.com/cloudevents/sdk-go v1.2.0
99
github.com/cloudevents/sdk-go/v2 v2.15.2
1010
github.com/go-resty/resty/v2 v2.15.3
11+
github.com/stretchr/testify v1.9.0
1112
github.com/zeiss/carry v1.0.0
12-
github.com/zeiss/pkg v0.1.12
13+
github.com/zeiss/pkg v0.1.17
1314
)
1415

1516
require (
17+
github.com/davecgh/go-spew v1.1.1 // indirect
1618
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
1719
github.com/google/go-cmp v0.6.0 // indirect
1820
github.com/google/go-querystring v1.1.0 // indirect
@@ -22,8 +24,10 @@ require (
2224
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
2325
github.com/modern-go/reflect2 v1.0.2 // indirect
2426
github.com/pkg/errors v0.9.1 // indirect
27+
github.com/pmezard/go-difflib v1.0.0 // indirect
2528
go.opencensus.io v0.24.0 // indirect
2629
go.uber.org/multierr v1.10.0 // indirect
2730
go.uber.org/zap v1.27.0 // indirect
2831
golang.org/x/net v0.30.0 // indirect
32+
gopkg.in/yaml.v3 v3.0.1 // indirect
2933
)

Diff for: go.sum

+10-2
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,12 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxv
104104
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
105105
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
106106
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
107+
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
108+
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
107109
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
108110
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
111+
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
112+
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
109113
github.com/lightstep/tracecontext.go v0.0.0-20181129014701-1757c391b1ac h1:+2b6iGRJe3hvV/yVXrd41yVEjxuFHxasJqDhkIjS4gk=
110114
github.com/lightstep/tracecontext.go v0.0.0-20181129014701-1757c391b1ac/go.mod h1:Frd2bnT3w5FB5q49ENTfVlztJES+1k/7lyWX2+9gq/M=
111115
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
@@ -148,6 +152,8 @@ github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R
148152
github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
149153
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
150154
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
155+
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
156+
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
151157
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
152158
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
153159
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@@ -165,8 +171,8 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw
165171
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
166172
github.com/zeiss/carry v1.0.0 h1:tdmz4wSPyYFGrhhp1TzpaDoR7aCdSkwBMluXu7CyQno=
167173
github.com/zeiss/carry v1.0.0/go.mod h1:28sinlJ0JnPz51PA3GAUezxnZFwlnKlm5JWCdn4ThlI=
168-
github.com/zeiss/pkg v0.1.12 h1:3fydksqtda+rH5EIFZvVhdriBN2SCTVUwFq6t4khaQk=
169-
github.com/zeiss/pkg v0.1.12/go.mod h1:a/VpZ+rOSIet4wPu3MZVQ/H4h2JXV6hNe8xGUh91+6c=
174+
github.com/zeiss/pkg v0.1.17 h1:rDvBtaRUSD1ypeu66R3UHMtEphPSBaZ52484BQtPEVI=
175+
github.com/zeiss/pkg v0.1.17/go.mod h1:2k/MCcM0p8KiHJMdUG3Rnx90pE7UfzaGd0GIXm6V7/8=
170176
go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
171177
go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
172178
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
@@ -289,6 +295,8 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks
289295
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
290296
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
291297
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
298+
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
299+
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
292300
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
293301
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
294302
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=

0 commit comments

Comments
 (0)