Skip to content

Commit

Permalink
first commit
Browse files Browse the repository at this point in the history
  • Loading branch information
paulolimarb committed Nov 14, 2019
0 parents commit a31d871
Show file tree
Hide file tree
Showing 23 changed files with 1,650 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Created by .ignore support plugin (hsz.mobi)

.idea/**
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Splunk Writer

TODO
129 changes: 129 additions & 0 deletions buffer/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package buffer

import (
"fmt"
"sync"
"time"
)

const (
DefaultCapacity = 100
DefaultOnWait = 100
DefaultExpiration = 60000
DefaultBackoff = 10000
)

type Buffer interface {
Write(item interface{})
}

type buffer struct {
sync.Locker
cap int
size int
expiration time.Duration
chunks chan entry
items []interface{}
backoff time.Duration
}

func (b *buffer) Write(item interface{}) {
b.Lock()
defer b.Unlock()
b.items[b.size] = item
b.size++
if b.size >= b.cap {
b.clear()
}
}

func (b *buffer) clear() {
if b.size > 0 {
events := b.items[:b.size]
b.size = 0
b.items = make([]interface{}, b.cap)
go func() {
b.chunks <- entry{
items: events,
retries: cap(b.chunks),
}
}()
}
}

func (b *buffer) watcher() {
defer func() {
err := recover()
if err != nil {
fmt.Printf("%v\n", err)
}
}()
for {
time.Sleep(b.expiration)
b.Lock()
b.clear()
b.Unlock()
}
}

type Config struct {
Cap int
OnWait int
Expiration time.Duration
BackOff time.Duration
OnOverflow func([]interface{}) error
}

type entry struct {
items []interface{}
retries int
}

func New(c Config) Buffer {
if c.Cap == 0 {
c.Cap = DefaultCapacity
}
if c.Expiration == 0 {
c.Expiration = DefaultExpiration
}
if c.BackOff == 0 {
c.BackOff = DefaultBackoff
}
if c.OnWait == 0 {
c.OnWait = DefaultOnWait
}

b := &buffer{
Locker: &sync.Mutex{},
size: 0,
cap: c.Cap,
expiration: c.Expiration,
chunks: make(chan entry, c.OnWait),
items: make([]interface{}, c.Cap),
backoff: c.BackOff,
}
go b.watcher()
go b.consumer(c)
return b
}

func (b *buffer) consumer(c Config) {
defer func() {
err := recover()
if err != nil {
fmt.Printf("%v\n", err)
}
}()
for events := range b.chunks {
err := c.OnOverflow(events.items)
if err != nil {
go func(events entry) {
events.retries--
if events.retries >= 0 {
time.Sleep(b.backoff)
b.chunks <- events
}
}(events)
}
}
}
180 changes: 180 additions & 0 deletions buffer/buffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
// +build unit

package buffer

import (
"errors"
"github.com/stretchr/testify/assert"
"sync"
"testing"
"time"
)

func TestBuffer_Write(t *testing.T) {
t.Parallel()
t.Run("when the buffer is not full", func(t *testing.T) {
t.Parallel()
is := assert.New(t)
subject := &buffer{
Locker: &sync.Mutex{},
size: 0,
cap: 10,
items: make([]interface{}, 10),
chunks: make(chan []interface{}, 10),
}
subject.Write("something")
is.Equal(1, subject.size, "it should increment the size of the buffer in one unit")
is.Equal([]interface{}{"something", nil, nil, nil, nil, nil, nil, nil, nil, nil}, subject.items, "it should change the buffer's inner slice")
})
t.Run("when the buffer is full", func(t *testing.T) {
t.Parallel()
is := assert.New(t)
subject := &buffer{
Locker: &sync.Mutex{},
size: 0,
cap: 1,
items: make([]interface{}, 1),
chunks: make(chan []interface{}, 10),
}
subject.Write("something")
is.Equal(0, subject.size, "it should remain zero")
is.Equal([]interface{}{nil}, subject.items, "it should clean the buffer's inner slice")
timeout := time.NewTimer(10 * time.Millisecond)
select {
case actual := <-subject.chunks:
is.Equal([]interface{}{"something"}, actual, "it should read the expected slice")
case <-timeout.C:
is.Fail("nothing was published")
}
})
}

func TestNew(t *testing.T) {
t.Parallel()
t.Run("when the buffer expires", func(t *testing.T) {
t.Parallel()
t.Run("but the consumer returns an error", func(t *testing.T) {
t.Parallel()
is := assert.New(t)
called := make(chan []interface{}, 1)
err := errors.New("")
subject := New(Config{
OnOverflow: func(items []interface{}) error {
called <- items
err1 := err
err = nil
return err1
},
BackOff: 10 * time.Millisecond,
Expiration: 10 * time.Millisecond,
Cap: 10,
OnWait: 10,
})
subject.Write(1)
subject.Write(2)
subject.Write(3)
timeout := time.NewTimer(20 * time.Millisecond)
select {
case items := <-called:
is.Equal([]interface{}{1, 2, 3}, items, "it should return the expected array")
case <-timeout.C:
is.Fail("nothing was published")
}
timeout = time.NewTimer(20 * time.Millisecond)
select {
case items := <-called:
is.Equal([]interface{}{1, 2, 3}, items, "it should return the expected array")
case <-timeout.C:
is.Fail("nothing was published")
}
})
t.Run("and the consumer returns no error", func(t *testing.T) {
t.Parallel()
is := assert.New(t)
called := make(chan []interface{}, 1)
subject := New(Config{
OnOverflow: func(items []interface{}) error {
called <- items
return nil
},
BackOff: 10 * time.Millisecond,
Expiration: 10 * time.Millisecond,
Cap: 10,
OnWait: 10,
})
subject.Write(1)
subject.Write(2)
subject.Write(3)
timeout := time.NewTimer(20 * time.Millisecond)
select {
case items := <-called:
is.Equal([]interface{}{1, 2, 3}, items, "it should return the expected array")
case <-timeout.C:
is.Fail("nothing was published")
}
})

})
t.Run("when the buffer overflow", func(t *testing.T) {
t.Run("but the consumer returns an error", func(t *testing.T) {
t.Parallel()
is := assert.New(t)
called := make(chan []interface{}, 1)
err := errors.New("")
subject := New(Config{
OnOverflow: func(items []interface{}) error {
called <- items
err1 := err
err = nil
return err1
},
BackOff: 10 * time.Millisecond,
Expiration: 100 * time.Millisecond,
Cap: 3,
OnWait: 10,
})
subject.Write(1)
subject.Write(2)
subject.Write(3)
timeout := time.NewTimer(20 * time.Millisecond)
select {
case items := <-called:
is.Equal([]interface{}{1, 2, 3}, items, "it should return the expected array")
case <-timeout.C:
is.Fail("nothing was published")
}
timeout = time.NewTimer(20 * time.Millisecond)
select {
case items := <-called:
is.Equal([]interface{}{1, 2, 3}, items, "it should return the expected array")
case <-timeout.C:
is.Fail("nothing was published")
}
})
t.Run("and the consumer returns no error", func(t *testing.T) {
t.Parallel()
is := assert.New(t)
called := make(chan []interface{}, 1)
subject := New(Config{
OnOverflow: func(items []interface{}) error {
called <- items
return nil
},
BackOff: 10 * time.Millisecond,
Expiration: 100 * time.Millisecond,
Cap: 3,
OnWait: 10,
})
subject.Write(1)
subject.Write(2)
subject.Write(3)
timeout := time.NewTimer(20 * time.Millisecond)
select {
case items := <-called:
is.Equal([]interface{}{1, 2, 3}, items, "it should return the expected array")
case <-timeout.C:
is.Fail("nothing was published")
}
})
})
}
11 changes: 11 additions & 0 deletions buffer/mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package buffer

import "github.com/stretchr/testify/mock"

type Mock struct {
mock.Mock
}

func (m *Mock) Write(item interface{}) {
m.Called(item)
}
66 changes: 66 additions & 0 deletions entry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package splunk

import (
"fmt"
"reflect"
)

type Entry map[string]interface{}

func (log Entry) Add(name string, value interface{}) Entry {
log[name] = value
return log
}

func NewEntry(p ...interface{}) Entry {
if len(p) == 1 {
c, ok := p[0].([]interface{})
if ok {
p = c
}
}
normalized := Entry{}
for _, item := range p {
if item == nil {
continue
}
itemType := reflect.TypeOf(item)
v := reflect.ValueOf(item)
inner := Entry{}
switch itemType.Kind() {
case reflect.Map:
for _, key := range v.MapKeys() {
inner[fmt.Sprint(key.Interface())] = v.MapIndex(key).Interface()
}
case reflect.Ptr, reflect.Interface:
if !v.IsNil() {
inner = NewEntry(v.Elem().Interface())
}
default:
inner[itemType.Name()] = item
}
normalized = Merge(normalized, inner)
}
return normalized
}

func Merge(np Entry, other Entry) Entry {
collisions := map[string]int{}
r := Entry{}
for key, value := range np {
r[key] = value
}
for key, value := range other {
if _, ok := r[key]; ok {
var index int
if index, ok = collisions[key]; !ok {
index = 0
}
index += 1
collisions[key] = index
key = fmt.Sprintf("%s%d", key, index)
}
r[key] = value
}
return r
}
Loading

0 comments on commit a31d871

Please sign in to comment.