Skip to content

Commit

Permalink
Simplified New* functions and added close to channel piper
Browse files Browse the repository at this point in the history
  • Loading branch information
migueleliasweb committed Jul 1, 2022
1 parent 262fe9e commit 5debbe4
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 9 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func runMetricsEndpoint() {
}

func main() {
pv := peevee.NewPeeVee("myWorkChannel", WithPromMetrics[int]())
pv := peevee.New("myWorkChannel", WithPromMetrics[int]())

// mimic an asynchronous channel writer
go func() {
Expand Down
4 changes: 3 additions & 1 deletion pkg/pv/channelpiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type CallbackChannelPiper[T any] struct {
callback func(T)
}

// Pipe pipe items between channels
// Pipe pipes items between the read and write channels
func (ccp *CallbackChannelPiper[T]) Pipe() {
for item := range ccp.WriteChan {
if ccp.callback != nil {
Expand All @@ -28,4 +28,6 @@ func (ccp *CallbackChannelPiper[T]) Pipe() {

ccp.ReadChan <- item
}

close(ccp.ReadChan)
}
4 changes: 2 additions & 2 deletions pkg/pv/channelpiper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

func TestCallbackChannelPiper(t *testing.T) {
pv := NewPeeVee("myintpeevee", WithCallback(func(i int) {}))
pv := New("myintpeevee", WithCallback(func(i int) {}))

go func() {
pv.GetWritableChan() <- 1234
Expand All @@ -28,7 +28,7 @@ func TestCallbackChannelPiper(t *testing.T) {
}

func TestDefaultChannelPiper(t *testing.T) {
pv := NewPeeVee[int]("myintpeevee")
pv := New[int]("myintpeevee")

go func() {
pv.GetWritableChan() <- 1234
Expand Down
2 changes: 1 addition & 1 deletion pkg/pv/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package peevee
// )

// func TestWithMetricsWriter(t *testing.T) {
// NewPeeVee("myintpeevee", WithMetricsWriter[int](WithMetricsWriterConfig{
// New("myintpeevee", WithMetricsWriter[int](WithMetricsWriterConfig{
// Writer: os.Stdout,
// }))
// }
4 changes: 2 additions & 2 deletions pkg/pv/prom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (
)

func TestWithPromMetricsSimple(t *testing.T) {
NewPeeVee("myintpeevee", WithPromMetrics[int]())
New("myintpeevee", WithPromMetrics[int]())
}

func TestWithPromMetrics(t *testing.T) {
r := prometheus.NewRegistry()

pv := NewPeeVee("myintpeevee", WithPromMetrics[int](WithPromMetricsConfig{
pv := New("myintpeevee", WithPromMetrics[int](WithPromMetricsConfig{
UseRegisterer: r,
}))

Expand Down
4 changes: 2 additions & 2 deletions pkg/pv/pv.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ func (pv *PeeVee[T]) GetWritableChan() chan<- T {
return pv.writeChan
}

//NewPeeVee Configures and returns a new PeeVee
func NewPeeVee[T any](PVname string, ops ...PVOptions[T]) PeeVee[T] {
//New Configures and returns a new PeeVee
func New[T any](PVname string, ops ...PVOptions[T]) PeeVee[T] {
pv := PeeVee[T]{
Name: PVname,
readChan: make(chan T),
Expand Down
41 changes: 41 additions & 0 deletions pkg/pv/pv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,44 @@ func TestNewPeeVeeReaderWrap(t *testing.T) {
}
}
}

func TestNewPeeVeeReaderWrapWithClose(t *testing.T) {
// this is a channel that gets returned
// from an SDK or a third party lib for example
channelYouDontControl := make(chan bool)

go func() {
// this would be the third party code sending items
// to the channel
channelYouDontControl <- true

// and this is the channel being closed by the same third party code
close(channelYouDontControl)
}()

pv := NewReaderWrap(
"boolwrap",
channelYouDontControl,
)

for {
select {
case i, more := <-pv.GetReadableChan():
// now, we purposedly don't return
// and instead stay in the loop waiting for the channel
// to be closed
if !more {
return
}

if i != true {
t.Error("got wrong item from channel:", i)
t.FailNow()
}

case <-time.After(time.Second * 5):
t.Error("timeout reading from channel")
t.FailNow()
}
}
}

0 comments on commit 5debbe4

Please sign in to comment.