Skip to content

Commit

Permalink
refactor: added an injestor
Browse files Browse the repository at this point in the history
  • Loading branch information
sean9999 committed Mar 18, 2023
1 parent 254d634 commit f05316f
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 10 deletions.
3 changes: 2 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ func main() {
//rebecca := rebouncer.NewInotify(*watchDir, *flushPeriod)

rebecca := rebouncer.New(rebouncer.Config{
BufferSize: 1024,
BufferSize: rebouncer.DefaultBufferSize,
Quantizer: rebouncer.DefaultInotifyQuantizer(*flushPeriod),
Reducer: rebouncer.DefaultInotifyReducer,
Injestor: rebouncer.DefaultInotifyInjestor(*watchDir, rebouncer.DefaultBufferSize),
})
go rebecca.WatchDir(*watchDir)

Expand Down
24 changes: 24 additions & 0 deletions injest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package rebouncer

import "github.com/rjeczalik/notify"

// listens on a channel for events, formats those events as NiceEvents, and send them along as such
type Injestor func() chan NiceEvent

func DefaultInotifyInjestor(dir string, bufferSize int) Injestor {
var niceChan = make(chan NiceEvent, bufferSize)
var fsEvents = make(chan notify.EventInfo, bufferSize)
jester := func() chan NiceEvent {
err := notify.Watch(dir+"/...", fsEvents, WatchMask)
if err != nil {
panic(err)
}
go func() {
for fsEvent := range fsEvents {
niceChan <- NotifyToNiceEvent(fsEvent, dir)
}
}()
return niceChan
}
return jester
}
5 changes: 3 additions & 2 deletions inotify.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ func NotifyToNiceEvent(ei notify.EventInfo, path string) NiceEvent {
// launch an inotify process that converts to NiceEvents and appends to machinery.batch
func (m *machinery) WatchDir(dir string) {
var fsEvents = make(chan notify.EventInfo, DefaultBufferSize)
err := notify.Watch(dir+"/...", fsEvents, notify.All)
err := notify.Watch(dir+"/...", fsEvents, WatchMask)
if err != nil {
panic(err)
}
go func() {
for fsEvent := range fsEvents {
//m.batch = append(m.batch, NotifyToNiceEvent(fsEvent, dir))
m.Injest(NotifyToNiceEvent(fsEvent, dir))
m.Push(NotifyToNiceEvent(fsEvent, dir))
}
}()
}
Expand All @@ -81,6 +81,7 @@ func NewInotify(dir string, bouncePeriod int) StateMachine {
rebel := New(Config{
BufferSize: 1024,
Quantizer: DefaultInotifyQuantizer(bouncePeriod),
Reducer: DefaultInotifyReducer,
})
go rebel.WatchDir(dir)
return rebel
Expand Down
24 changes: 17 additions & 7 deletions statemachine.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@ type StateMachine interface {
Info() map[string]any
WatchDir(string)
Emit()
Injest(NiceEvent)
Push(NiceEvent)
Quantize(chan bool, *[]NiceEvent)
Reduce([]NiceEvent) []NiceEvent
}

type userFunctions struct {
quantizer Quantizer
reducer Reducer
injestor Injestor
}

// pointer to machinery implements StateMachine
Expand All @@ -37,6 +38,7 @@ type Config struct {
BufferSize int
Quantizer Quantizer
Reducer Reducer
Injestor Injestor
}

// The canonical way to create a new StateMachine
Expand All @@ -50,10 +52,20 @@ func New(config Config) StateMachine {
user: userFunctions{
quantizer: config.Quantizer,
reducer: config.Reducer,
injestor: config.Injestor,
},
}

// Emit() whenever we get true on readyChan
incomingEvents := m.user.injestor()

// listen to events emitted by Injestor
go func() {
for inEvent := range incomingEvents {
m.Push(inEvent)
}
}()

// Emit() whenever we get a 'true' value on readyChan
go func() {
for isReady := range m.readyChan {
if isReady {
Expand All @@ -76,20 +88,17 @@ func (m *machinery) Reduce(inEvents []NiceEvent) []NiceEvent {
return outEvents
}

// Injest takes a NiceEvent and either appends it to batchMap or ignores it
// Push takes a NiceEvent and either appends it to batchMap or ignores it
//
// Additionally, it decides whether to call Emit() or not
func (m *machinery) Injest(newEvent NiceEvent) {

func (m *machinery) Push(newEvent NiceEvent) {
m.batchArray = append(m.batchArray, newEvent)
m.batchArray = m.Reduce(m.batchArray)
go m.Quantize(m.readyChan, &m.batchArray)

}

// Quantize runs after Injest() and decides whether or not to call Emit()
func (m *machinery) Quantize(readyChannel chan bool, em *[]NiceEvent) {
fmt.Println("Quantize()")
fn := m.user.quantizer
go fn(readyChannel, em)
}
Expand All @@ -111,6 +120,7 @@ func (m *machinery) Version() string {
func (m *machinery) Info() map[string]any {
r := map[string]any{
"bufferSize": m.bufferSize,
"version": m.Version(),
}
return r
}

0 comments on commit f05316f

Please sign in to comment.