diff --git a/errors.go b/errors.go index efc26af..a33d8c3 100644 --- a/errors.go +++ b/errors.go @@ -16,6 +16,8 @@ const ( ErrTypeRegistry ErrType = 1 << 61 // ErrTypeGodim happens in internal godim failure ErrTypeGodim ErrType = 1 << 60 + // ErrTypeEvent happens in internal event switch + ErrTypeEvent ErrType = 1 << 59 // ErrTypeAny for any other kind of errors ErrTypeAny ErrType = 1 << 1 ) diff --git a/event.go b/event.go index 46cb111..c440ce0 100644 --- a/event.go +++ b/event.go @@ -2,76 +2,339 @@ package godim import ( "errors" + "fmt" "log" - "runtime" + "sync" +) + +// EventState will give the status of an event throu the application +type EventState int + +const ( + // ESEmitted means he is currently being processed amongst switch, interceptor and receiver + ESEmitted EventState = iota + // ESResolved means every interceptor and receiver have finished their job on this event + ESResolved + // ESAborted can be set by an interceptor, it will stop event processing + ESAborted + // ESError means there is 1 or more failure during treatment + ESError ) // Event an Event can be transmitted from an Emitter to any Receiver // // they're not yet immutable but needs to be considered as they can be concucrrently accessed +// +// The id is set internally and can be override only if another generator is used. +// +// metadatas can be set through the AddMetadata(key string, value interface{}) and retrieve throu GetMetadata(key string) +// +// an event is locked after "interceptor" phase. it means his metadata cannot be modified throu previous methods +// type Event struct { - Type string - Payload map[string]interface{} + Type string + Payload map[string]interface{} + metadata eventMetadata +} + +type eventMetadata struct { + id uint64 + state EventState + datas map[string]interface{} + lock bool + states map[string]EventState + mu *sync.Mutex + totalListeners int + finalizer EventFinalizer + abortReason string +} + +// GetID Retrieve the id of an event +func (event *Event) GetID() uint64 { + return event.metadata.id +} + +// Abort can be used to abort event dispatching after the current interceptor +// +// a locked event cannot be aborted +func (event *Event) Abort(current EventInterceptor, reason string) { + if event.metadata.lock { + return + } + event.setState(current.Key(), ESAborted) + event.metadata.abortReason = reason +} + +func (event *Event) setID(id uint64) { + event.metadata.id = id +} + +// AddMetadata add a key value pair of metadata to the event +// +// an error will occur if the event is locked or if the key is already set +func (event *Event) AddMetadata(key string, value interface{}) error { + if event.metadata.lock { + return newError(fmt.Errorf("event id %v already lock", event.metadata.id)).SetErrType(ErrTypeEvent) + } + if _, ok := event.metadata.datas[key]; ok { + return newError(fmt.Errorf("unable to set metadata key %s. already setted", key)).SetErrType(ErrTypeEvent) + } + event.metadata.datas[key] = value + return nil +} + +// GetMetadata retrieve a metadata +func (event *Event) GetMetadata(key string) interface{} { + return event.metadata.datas[key] +} + +// GetState return the current state +func (event *Event) GetState() EventState { + return event.metadata.state +} + +// GetStates return a copy of current states +// +// can be accessed by multiple go routine +func (e eventMetadata) GetStates() map[string]EventState { + e.mu.Lock() + states := make(map[string]EventState) + for k, v := range e.states { + states[k] = v + } + e.mu.Unlock() + return states +} + +func (event *Event) setStateIf(key string, ifstate, state EventState) { + e := event.metadata + e.mu.Lock() + if e.states[key] == ifstate { + e.mu.Unlock() + event.setState(key, state) + } else { + e.mu.Unlock() + } +} + +func (event *Event) setState(key string, state EventState) { + event.metadata.mu.Lock() + event.metadata.states[key] = state + nberr := 0 + nbres := 0 + nbemit := 0 + for _, s := range event.metadata.states { + switch s { + case ESAborted: + event.metadata.state = ESAborted + event.metadata.mu.Unlock() + event.runFinalizer() + return + case ESEmitted: + nbemit = nbemit + 1 + // we can stop verifying states as we still have something running + break + case ESResolved: + nbres = nbres + 1 + case ESError: + nberr = nberr + 1 + } + } + if nbemit == 0 && event.metadata.totalListeners == nberr+nbres { + if nberr > 0 { + event.metadata.state = ESError + } else { + event.metadata.state = ESResolved + } + event.metadata.mu.Unlock() + event.runFinalizer() + return + } + event.metadata.mu.Unlock() } // Emitter The base interface of an emitter type Emitter interface { - InitChan(chan *Event) + initEmitter(chan *Event, IDGenerator, EventFinalizer) + prepareRun(map[string]int) Emit(*Event) } // EventEmitter event emitter type EventEmitter struct { - eventChan chan *Event + idGenerator IDGenerator + eventChan chan *Event + listenersCount map[string]int + finalizer EventFinalizer } -// InitChan initialize event channel -func (ee *EventEmitter) InitChan(e chan *Event) { +// initEmitter initialize event channel and id gen +func (ee *EventEmitter) initEmitter(e chan *Event, idGen IDGenerator, f EventFinalizer) { + ee.idGenerator = idGen ee.eventChan = e + ee.finalizer = f +} + +func (ee *EventEmitter) prepareRun(lc map[string]int) { + ee.listenersCount = lc } // Emit emits an event func (ee *EventEmitter) Emit(event *Event) { + event.metadata.id = ee.idGenerator.NextID() + event.metadata.state = ESEmitted + event.metadata.mu = &sync.Mutex{} + event.metadata.datas = make(map[string]interface{}) + event.metadata.totalListeners = ee.listenersCount[event.Type] + event.metadata.states = make(map[string]EventState, event.metadata.totalListeners) + event.metadata.finalizer = ee.finalizer ee.eventChan <- event } -// EventReceiver event receiver +// EventReceiver event receiver is like observer, +// +// all receiveEvent are received in a choregraphic pattern, asynchronously +// +// the HandleEventType filter the events this Receiver wants to receive type EventReceiver interface { - ReceiveEvent(*Event) + Identifier + ReceiveEvent(*Event) error HandleEventTypes() []string } +// EventInterceptor interceptor interface +// +// an Interceptor +// +// - will see all event goes throu intercept method +// +// - can call Abort on an Event +// +// - has a priority : there can't be 2 interceptors at the same priority +type EventInterceptor interface { + Identifier + Intercept(*Event) error + InterceptPriority() int +} + +// EventFinalizer event finalizer can be declared +type EventFinalizer interface { + Finalize(*Event) +} + +// IDGenerator handle id generation +type IDGenerator interface { + SetSeed(uint64) error + NextID() uint64 +} + +type defaultIDGenerator struct { + seed uint64 + mutex *sync.Mutex + next uint64 +} + +func (dig *defaultIDGenerator) SetSeed(seed uint64) error { + if dig.seed != 0 { + return newError(errors.New("default generator seed can be changed only once")) + } + dig.seed = seed + return nil +} + +func (dig *defaultIDGenerator) NextID() uint64 { + dig.mutex.Lock() + next := dig.next + dig.next = dig.next + 1 + dig.mutex.Unlock() + return next +} + +func newGenerator() IDGenerator { + dig := &defaultIDGenerator{ + seed: 0, + mutex: &sync.Mutex{}, + next: 0, + } + return dig +} + // EventSwitch this is not a hub, we want a switch type EventSwitch struct { - mainChain chan *Event - close chan struct{} - receivers map[string][]EventReceiver - running bool + mainChain chan *Event + close chan struct{} + emitters []Emitter + receivers map[string][]EventReceiver + interceptors map[int]EventInterceptor + listenerCount map[string]int + orderedIntercept []int + running bool + hasInterceptor bool + idGenerator IDGenerator + eventFinalizer EventFinalizer } // NewEventSwitch build a new event switch func NewEventSwitch(bufferSize int) *EventSwitch { return &EventSwitch{ - mainChain: make(chan *Event, bufferSize), - close: make(chan struct{}), - receivers: make(map[string][]EventReceiver), - running: false, + mainChain: make(chan *Event, bufferSize), + close: make(chan struct{}), + receivers: make(map[string][]EventReceiver), + interceptors: make(map[int]EventInterceptor), + listenerCount: make(map[string]int), + orderedIntercept: nil, + running: false, + hasInterceptor: true, + idGenerator: newGenerator(), + eventFinalizer: nil, + } +} + +// WithIDGenerator change the id generator used. +// +// if the switch is running, do nothing +func (es *EventSwitch) WithIDGenerator(idg IDGenerator) *EventSwitch { + if es.running { + return es + } + es.idGenerator = idg + return es +} + +// WithSeed change the seed of the generator +// +// if the switch is running, do nothing +func (es *EventSwitch) WithSeed(seed uint64) *EventSwitch { + if es.running { + return es + } + err := es.idGenerator.SetSeed(seed) + if err != nil { + log.Println("[Godim EventSwitch]Trying to change the seed with a running switch") } + return es +} + +// WithEventFinalizer declare an event finalizer that will be called at the end of an event management +func (es *EventSwitch) WithEventFinalizer(f EventFinalizer) *EventSwitch { + es.eventFinalizer = f + return es } // AddEmitter add an emitter func (es *EventSwitch) AddEmitter(e Emitter) error { if es.running { - return errors.New("Can't add an emitter on the fly yet") + return newError(errors.New("Can't add an emitter on the fly yet")).SetErrType(ErrTypeEvent) } - e.InitChan(es.mainChain) + e.initEmitter(es.mainChain, es.idGenerator, es.eventFinalizer) + es.emitters = append(es.emitters, e) return nil } // AddReceiver add a receiver func (es *EventSwitch) AddReceiver(e EventReceiver) error { if es.running { - return errors.New("Can't add a receiver on the fly yet") + return newError(errors.New("Can't add a receiver on the fly yet")).SetErrType(ErrTypeEvent) } for _, et := range e.HandleEventTypes() { es.receivers[et] = append(es.receivers[et], e) @@ -79,11 +342,38 @@ func (es *EventSwitch) AddReceiver(e EventReceiver) error { return nil } +// AddInterceptor add an interceptor. Interceptor must be prioritized through InterceptPriority(), only one interceptor can run at the same priority +func (es *EventSwitch) AddInterceptor(e EventInterceptor) error { + if es.running { + return newError(errors.New("Can't add an interceptor on the fly")).SetErrType(ErrTypeEvent) + } + if _, ok := es.interceptors[e.InterceptPriority()]; ok { + return newError(fmt.Errorf("another interceptor already declared on priority %v", e.InterceptPriority())).SetErrType(ErrTypeEvent) + } + es.interceptors[e.InterceptPriority()] = e + es.hasInterceptor = true + return nil +} + // Start will start the event switch func (es *EventSwitch) Start() { if es.running { return } + nbInterceptor := 0 + if es.hasInterceptor { + es.orderedIntercept = make([]int, len(es.interceptors)) + for prio := range es.interceptors { + es.orderedIntercept[nbInterceptor] = prio + nbInterceptor++ + } + } + for k, v := range es.receivers { + es.listenerCount[k] = nbInterceptor + len(v) + } + for _, e := range es.emitters { + e.prepareRun(es.listenerCount) + } es.running = true go es.run() } @@ -107,12 +397,14 @@ func (es *EventSwitch) Close() { func (es *EventSwitch) run() { for { select { - case event := <-es.mainChain: + case event, valid := <-es.mainChain: if !es.running { break } + if !valid { + break + } go es.switchEvent(event) - continue case <-es.close: return } @@ -120,24 +412,67 @@ func (es *EventSwitch) run() { } func (es *EventSwitch) switchEvent(event *Event) { + if es.hasInterceptor { + for _, prio := range es.orderedIntercept { + interceptor := es.interceptors[prio] + es.runInterceptorWithRecover(interceptor, event) + if event.metadata.state == ESAborted { + log.Printf("[Godim EventSwitch] aborting event id [%v]\n", event.metadata.id) + return + } + } + } + event.metadata.lock = true rs, ok := es.receivers[event.Type] if ok { for _, receiver := range rs { go es.runObserverWithRecover(receiver, event) } - } else { - log.Println("an event is declared with no subscribers :", event.Type) } } func (es *EventSwitch) runObserverWithRecover(receiver EventReceiver, event *Event) { - defer func() { - if rec := recover(); rec != nil { - const size = 64 << 10 - buffer := make([]byte, size) - buffer = buffer[:runtime.Stack(buffer, false)] - log.Printf("cron: panic running job: %v\n%s", rec, buffer) - } - }() - receiver.ReceiveEvent(event) + event.setState(receiver.Key(), ESEmitted) + defer internalRecover(event, receiver) + err := receiver.ReceiveEvent(event) + if err != nil { + event.setState(receiver.Key(), ESError) + } +} + +func (es *EventSwitch) runInterceptorWithRecover(interceptor EventInterceptor, event *Event) { + event.setState(interceptor.Key(), ESEmitted) + defer internalRecover(event, interceptor) + err := interceptor.Intercept(event) + if err != nil { + event.setState(interceptor.Key(), ESError) + } +} + +func internalRecover(event *Event, identifier Identifier) { + if rec := recover(); rec != nil { + event.setState(identifier.Key(), ESError) + dumpRec(rec, "godim: panic receiving event") + } else { + event.setStateIf(identifier.Key(), ESEmitted, ESResolved) + } +} + +func dumpRec(rec interface{}, msg string) { + // const size = 64 << 10 + // buffer := make([]byte, size) + // buffer = buffer[:runtime.Stack(buffer, false)] + // log.Printf("%s: %v\n%s", msg, rec, buffer) + log.Printf("%s: %v\n", msg, rec) +} + +func (event *Event) runFinalizer() { + if event.metadata.finalizer != nil { + defer func() { + if rec := recover(); rec != nil { + dumpRec(rec, "godim: panic finalizing event") + } + }() + event.metadata.finalizer.Finalize(event) + } } diff --git a/event_test.go b/event_test.go index 8bd9008..938b2b9 100644 --- a/event_test.go +++ b/event_test.go @@ -1,8 +1,8 @@ package godim import ( - "fmt" "log" + "sync" "testing" "time" ) @@ -11,50 +11,14 @@ type Testeur struct { EventEmitter } -func TestEmitter(t *testing.T) { - tester := new(Testeur) - myChan := make(chan *Event, 100) - tester.eventChan = myChan - tester2 := new(Testeur) - tester2.eventChan = myChan - - results := make(map[string]*Event) - - go func() { - for { - select { - case event := <-myChan: - go managerEvent(results, event) - } - } - }() - for i := 0; i < 10; i++ { - e := &Event{ - Type: fmt.Sprintln("my ", i), - } - tester.Emit(e) - time.Sleep(2 * time.Millisecond) - e2 := &Event{ - Type: fmt.Sprintln("sec ", i), - } - tester2.Emit(e2) - time.Sleep(2 * time.Millisecond) - } - - time.Sleep(1 * time.Second) - if len(results) != 20 { - t.Fatal("not the right number of events : ", len(results)) - } -} - -func managerEvent(results map[string]*Event, e *Event) { - log.Println("Event received : ", e.Type) - results[e.Type] = e - -} - type Rec1 struct { nbReceived int + ids map[uint64]bool + mu *sync.Mutex +} + +func (rec1 *Rec1) Key() string { + return "Rec1" } func (rec1 *Rec1) HandleEventTypes() []string { @@ -64,8 +28,12 @@ func (rec1 *Rec1) HandleEventTypes() []string { } } -func (rec1 *Rec1) ReceiveEvent(e *Event) { +func (rec1 *Rec1) ReceiveEvent(e *Event) error { + rec1.mu.Lock() + rec1.ids[e.metadata.id] = true + rec1.mu.Unlock() rec1.nbReceived = rec1.nbReceived + 1 + return nil } func TestSwitch(t *testing.T) { @@ -73,6 +41,8 @@ func TestSwitch(t *testing.T) { t1 := new(Testeur) es.AddEmitter(t1) r1 := new(Rec1) + r1.ids = make(map[uint64]bool) + r1.mu = &sync.Mutex{} es.AddReceiver(r1) es.Start() @@ -88,9 +58,214 @@ func TestSwitch(t *testing.T) { if r1.nbReceived != 100 { t.Fatal("Received events : ", r1.nbReceived) } + if len(r1.ids) != 100 { + t.Fatal("received ids ", len(r1.ids)) + } es.Close() if es.running { t.Fatal("switch still running") } } + +type Panicker struct { +} + +func (rec1 *Panicker) Key() string { + return "Panicker" +} + +func (rec1 *Panicker) HandleEventTypes() []string { + return []string{ + "a", + "b", + } +} + +func (rec1 *Panicker) ReceiveEvent(e *Event) error { + panic("this is my role") + +} + +func TestPanic(t *testing.T) { + es := NewEventSwitch(10) + t1 := new(Testeur) + es.AddEmitter(t1) + r1 := new(Rec1) + r1.ids = make(map[uint64]bool) + r1.mu = &sync.Mutex{} + es.AddReceiver(r1) + r2 := new(Panicker) + es.AddReceiver(r2) + + es.Start() + e := &Event{ + Type: "a", + } + t1.Emit(e) + time.Sleep(2 * time.Millisecond) + if r1.nbReceived != 1 { + t.Fatal("event not received during a riot") + } + f := &Event{ + Type: "b", + } + t1.Emit(f) + time.Sleep(2 * time.Millisecond) + if r1.nbReceived != 2 { + t.Fatal("event not received during a riot") + } + if f.metadata.states["Panicker"] != ESError { + t.Fatal("state not correct", f.metadata.states["Panicker"]) + } +} + +type Producer struct { + EventEmitter + nbEmitted int + typ string + closed chan struct{} +} + +func (p *Producer) EmitWhile() { + for { + payload := make(map[string]interface{}) + p.nbEmitted = p.nbEmitted + 1 + payload["nb"] = p.nbEmitted + e := &Event{ + Type: p.typ, + Payload: payload, + } + p.Emit(e) + time.Sleep(3 * time.Millisecond) + var ok bool + select { + case <-p.closed: + ok = true + // log.Println("closing producer") + default: + ok = false + } + if ok { + break + } + } +} + +type GlobalRec struct { + mu *sync.Mutex + received int + typ string +} + +func (gr *GlobalRec) Key() string { + return "GR" +} + +func (gr *GlobalRec) HandleEventTypes() []string { + return []string{ + gr.typ, + } +} + +func (gr *GlobalRec) ReceiveEvent(e *Event) error { + gr.mu.Lock() + if e.GetID()%10 == 0 { + log.Println("should not have received this event") + } + gr.received = gr.received + 1 + gr.mu.Unlock() + return nil +} + +type Inter struct { + received int + mu *sync.Mutex + aborted int +} + +func (i *Inter) Key() string { + return "inter" +} +func (i *Inter) InterceptPriority() int { + return -1 +} +func (i *Inter) Intercept(e *Event) error { + i.mu.Lock() + i.received = i.received + 1 + + if e.GetID()%10 == 0 { + e.Abort(i, "abort every 10") + i.aborted = i.aborted + 1 + } + i.mu.Unlock() + return nil +} + +type Fin struct { + nb int + mu *sync.Mutex +} + +func (f *Fin) Finalize(e *Event) { + f.mu.Lock() + f.nb = f.nb + 1 + f.mu.Unlock() +} + +func TestMassiveEvents(t *testing.T) { + nbProducer := 50 + nbReceiver := 10 + producers := make(map[int]*Producer, nbProducer) + fin := &Fin{mu: &sync.Mutex{}} + es := NewEventSwitch(10).WithEventFinalizer(fin) + + for i := 0; i < nbProducer; i++ { + p := &Producer{ + closed: make(chan struct{}), + typ: "a", + } + es.AddEmitter(p) + producers[i] = p + } + receivers := make(map[int]*GlobalRec, nbReceiver) + gr := &GlobalRec{ + typ: "a", + mu: &sync.Mutex{}, + } + receivers[0] = gr + es.AddReceiver(gr) + in := &Inter{ + mu: &sync.Mutex{}, + } + es.AddInterceptor(in) + + es.Start() + + for i := 0; i < nbProducer; i++ { + go producers[i].EmitWhile() + } + + time.Sleep(10 * time.Millisecond) + for i := 0; i < nbProducer; i++ { + producers[i].closed <- struct{}{} + } + es.Stop() + time.Sleep(1 * time.Millisecond) + total := 0 + for i := 0; i < nbProducer; i++ { + total = total + producers[i].nbEmitted + } + if fin.nb != total { + t.Fatal("finalize did not received all revents ", fin.nb, "-", total) + } + if in.received != total { + t.Fatal("Multiproducers failed : ", gr.received, " - ", total) + } + if gr.received == total { + t.Fatal("should have some aborted event") + } + if gr.received+in.aborted != total { + t.Fatal("total must equal to received and aborted") + } +} diff --git a/godim_test.go b/godim_test.go index ee1617c..4ec5dd5 100644 --- a/godim_test.go +++ b/godim_test.go @@ -176,6 +176,10 @@ type RA struct { nbReceived int } +func (ra *RA) Key() string { + return "RA" +} + func (ra *RA) OnInit() error { ra.nbReceived = 0 return nil @@ -186,8 +190,26 @@ func (ra *RA) HandleEventTypes() []string { "aa", } } -func (ra *RA) ReceiveEvent(e *Event) { +func (ra *RA) ReceiveEvent(e *Event) error { ra.nbReceived = ra.nbReceived + 1 + return nil +} + +type IA struct { + nbIntercept int +} + +func (ia *IA) Key() string { + return "IA" +} + +func (ia *IA) InterceptPriority() int { + return -10 +} + +func (ia *IA) Intercept(e *Event) error { + ia.nbIntercept = ia.nbIntercept + 1 + return nil } func TestWithEventSwitch(t *testing.T) { @@ -197,7 +219,8 @@ func TestWithEventSwitch(t *testing.T) { Build() ea := new(EA) ra := new(RA) - err := g.Declare("service", ea, ra) + ia := new(IA) + err := g.Declare("service", ea, ra, ia) if err != nil { t.Fatal("Error while declaring service:", err) } @@ -216,6 +239,9 @@ func TestWithEventSwitch(t *testing.T) { if ra.nbReceived != 100 { t.Fatal("wrong number of event received", ra.nbReceived) } + if ia.nbIntercept != 100 { + t.Fatal("wrong number of event intercepted", ia.nbIntercept) + } g.CloseApp() time.Sleep(1 * time.Millisecond) if g.eventSwitch.running { diff --git a/registry.go b/registry.go index 2851e68..9bb9996 100644 --- a/registry.go +++ b/registry.go @@ -100,12 +100,13 @@ func (registry *Registry) declare(label string, o interface{}) error { } var ( - initType = reflect.TypeOf((*Initializer)(nil)).Elem() - closeType = reflect.TypeOf((*Closer)(nil)).Elem() - keyType = reflect.TypeOf((*Identifier)(nil)).Elem() - prioType = reflect.TypeOf((*Prioritizer)(nil)).Elem() - emitType = reflect.TypeOf((*Emitter)(nil)).Elem() - recType = reflect.TypeOf((*EventReceiver)(nil)).Elem() + initType = reflect.TypeOf((*Initializer)(nil)).Elem() + closeType = reflect.TypeOf((*Closer)(nil)).Elem() + keyType = reflect.TypeOf((*Identifier)(nil)).Elem() + prioType = reflect.TypeOf((*Prioritizer)(nil)).Elem() + emitType = reflect.TypeOf((*Emitter)(nil)).Elem() + recType = reflect.TypeOf((*EventReceiver)(nil)).Elem() + interceptType = reflect.TypeOf((*EventInterceptor)(nil)).Elem() ) func getKey(typ reflect.Type, o interface{}) string { @@ -151,6 +152,9 @@ func (registry *Registry) declareInterfaces(o interface{}, typ reflect.Type) err if ptyp.Implements(recType) { registry.eventSwitch.AddReceiver(o.(EventReceiver)) } + if ptyp.Implements(interceptType) { + registry.eventSwitch.AddInterceptor(o.(EventInterceptor)) + } } return nil }