From 7ab8ca65a579e9e31922cfdd3456a336a0d2d912 Mon Sep 17 00:00:00 2001 From: Nicolas Prud'homme Date: Sat, 2 Mar 2019 21:02:26 +0100 Subject: [PATCH] improve event switching with metadatas around events introduce interceptor for the orchestration part and left receiver for choregraphy this version can allow replay of events events can abort during orchestration phase --- errors.go | 2 + event.go | 401 +++++++++++++++++++++++++++++++++++++++++++++----- event_test.go | 263 +++++++++++++++++++++++++++------ godim_test.go | 30 +++- registry.go | 16 +- 5 files changed, 627 insertions(+), 85 deletions(-) diff --git a/errors.go b/errors.go index efc26af..a33d8c3 100644 --- a/errors.go +++ b/errors.go @@ -16,6 +16,8 @@ const ( ErrTypeRegistry ErrType = 1 << 61 // ErrTypeGodim happens in internal godim failure ErrTypeGodim ErrType = 1 << 60 + // ErrTypeEvent happens in internal event switch + ErrTypeEvent ErrType = 1 << 59 // ErrTypeAny for any other kind of errors ErrTypeAny ErrType = 1 << 1 ) diff --git a/event.go b/event.go index 46cb111..c440ce0 100644 --- a/event.go +++ b/event.go @@ -2,76 +2,339 @@ package godim import ( "errors" + "fmt" "log" - "runtime" + "sync" +) + +// EventState will give the status of an event throu the application +type EventState int + +const ( + // ESEmitted means he is currently being processed amongst switch, interceptor and receiver + ESEmitted EventState = iota + // ESResolved means every interceptor and receiver have finished their job on this event + ESResolved + // ESAborted can be set by an interceptor, it will stop event processing + ESAborted + // ESError means there is 1 or more failure during treatment + ESError ) // Event an Event can be transmitted from an Emitter to any Receiver // // they're not yet immutable but needs to be considered as they can be concucrrently accessed +// +// The id is set internally and can be override only if another generator is used. +// +// metadatas can be set through the AddMetadata(key string, value interface{}) and retrieve throu GetMetadata(key string) +// +// an event is locked after "interceptor" phase. it means his metadata cannot be modified throu previous methods +// type Event struct { - Type string - Payload map[string]interface{} + Type string + Payload map[string]interface{} + metadata eventMetadata +} + +type eventMetadata struct { + id uint64 + state EventState + datas map[string]interface{} + lock bool + states map[string]EventState + mu *sync.Mutex + totalListeners int + finalizer EventFinalizer + abortReason string +} + +// GetID Retrieve the id of an event +func (event *Event) GetID() uint64 { + return event.metadata.id +} + +// Abort can be used to abort event dispatching after the current interceptor +// +// a locked event cannot be aborted +func (event *Event) Abort(current EventInterceptor, reason string) { + if event.metadata.lock { + return + } + event.setState(current.Key(), ESAborted) + event.metadata.abortReason = reason +} + +func (event *Event) setID(id uint64) { + event.metadata.id = id +} + +// AddMetadata add a key value pair of metadata to the event +// +// an error will occur if the event is locked or if the key is already set +func (event *Event) AddMetadata(key string, value interface{}) error { + if event.metadata.lock { + return newError(fmt.Errorf("event id %v already lock", event.metadata.id)).SetErrType(ErrTypeEvent) + } + if _, ok := event.metadata.datas[key]; ok { + return newError(fmt.Errorf("unable to set metadata key %s. already setted", key)).SetErrType(ErrTypeEvent) + } + event.metadata.datas[key] = value + return nil +} + +// GetMetadata retrieve a metadata +func (event *Event) GetMetadata(key string) interface{} { + return event.metadata.datas[key] +} + +// GetState return the current state +func (event *Event) GetState() EventState { + return event.metadata.state +} + +// GetStates return a copy of current states +// +// can be accessed by multiple go routine +func (e eventMetadata) GetStates() map[string]EventState { + e.mu.Lock() + states := make(map[string]EventState) + for k, v := range e.states { + states[k] = v + } + e.mu.Unlock() + return states +} + +func (event *Event) setStateIf(key string, ifstate, state EventState) { + e := event.metadata + e.mu.Lock() + if e.states[key] == ifstate { + e.mu.Unlock() + event.setState(key, state) + } else { + e.mu.Unlock() + } +} + +func (event *Event) setState(key string, state EventState) { + event.metadata.mu.Lock() + event.metadata.states[key] = state + nberr := 0 + nbres := 0 + nbemit := 0 + for _, s := range event.metadata.states { + switch s { + case ESAborted: + event.metadata.state = ESAborted + event.metadata.mu.Unlock() + event.runFinalizer() + return + case ESEmitted: + nbemit = nbemit + 1 + // we can stop verifying states as we still have something running + break + case ESResolved: + nbres = nbres + 1 + case ESError: + nberr = nberr + 1 + } + } + if nbemit == 0 && event.metadata.totalListeners == nberr+nbres { + if nberr > 0 { + event.metadata.state = ESError + } else { + event.metadata.state = ESResolved + } + event.metadata.mu.Unlock() + event.runFinalizer() + return + } + event.metadata.mu.Unlock() } // Emitter The base interface of an emitter type Emitter interface { - InitChan(chan *Event) + initEmitter(chan *Event, IDGenerator, EventFinalizer) + prepareRun(map[string]int) Emit(*Event) } // EventEmitter event emitter type EventEmitter struct { - eventChan chan *Event + idGenerator IDGenerator + eventChan chan *Event + listenersCount map[string]int + finalizer EventFinalizer } -// InitChan initialize event channel -func (ee *EventEmitter) InitChan(e chan *Event) { +// initEmitter initialize event channel and id gen +func (ee *EventEmitter) initEmitter(e chan *Event, idGen IDGenerator, f EventFinalizer) { + ee.idGenerator = idGen ee.eventChan = e + ee.finalizer = f +} + +func (ee *EventEmitter) prepareRun(lc map[string]int) { + ee.listenersCount = lc } // Emit emits an event func (ee *EventEmitter) Emit(event *Event) { + event.metadata.id = ee.idGenerator.NextID() + event.metadata.state = ESEmitted + event.metadata.mu = &sync.Mutex{} + event.metadata.datas = make(map[string]interface{}) + event.metadata.totalListeners = ee.listenersCount[event.Type] + event.metadata.states = make(map[string]EventState, event.metadata.totalListeners) + event.metadata.finalizer = ee.finalizer ee.eventChan <- event } -// EventReceiver event receiver +// EventReceiver event receiver is like observer, +// +// all receiveEvent are received in a choregraphic pattern, asynchronously +// +// the HandleEventType filter the events this Receiver wants to receive type EventReceiver interface { - ReceiveEvent(*Event) + Identifier + ReceiveEvent(*Event) error HandleEventTypes() []string } +// EventInterceptor interceptor interface +// +// an Interceptor +// +// - will see all event goes throu intercept method +// +// - can call Abort on an Event +// +// - has a priority : there can't be 2 interceptors at the same priority +type EventInterceptor interface { + Identifier + Intercept(*Event) error + InterceptPriority() int +} + +// EventFinalizer event finalizer can be declared +type EventFinalizer interface { + Finalize(*Event) +} + +// IDGenerator handle id generation +type IDGenerator interface { + SetSeed(uint64) error + NextID() uint64 +} + +type defaultIDGenerator struct { + seed uint64 + mutex *sync.Mutex + next uint64 +} + +func (dig *defaultIDGenerator) SetSeed(seed uint64) error { + if dig.seed != 0 { + return newError(errors.New("default generator seed can be changed only once")) + } + dig.seed = seed + return nil +} + +func (dig *defaultIDGenerator) NextID() uint64 { + dig.mutex.Lock() + next := dig.next + dig.next = dig.next + 1 + dig.mutex.Unlock() + return next +} + +func newGenerator() IDGenerator { + dig := &defaultIDGenerator{ + seed: 0, + mutex: &sync.Mutex{}, + next: 0, + } + return dig +} + // EventSwitch this is not a hub, we want a switch type EventSwitch struct { - mainChain chan *Event - close chan struct{} - receivers map[string][]EventReceiver - running bool + mainChain chan *Event + close chan struct{} + emitters []Emitter + receivers map[string][]EventReceiver + interceptors map[int]EventInterceptor + listenerCount map[string]int + orderedIntercept []int + running bool + hasInterceptor bool + idGenerator IDGenerator + eventFinalizer EventFinalizer } // NewEventSwitch build a new event switch func NewEventSwitch(bufferSize int) *EventSwitch { return &EventSwitch{ - mainChain: make(chan *Event, bufferSize), - close: make(chan struct{}), - receivers: make(map[string][]EventReceiver), - running: false, + mainChain: make(chan *Event, bufferSize), + close: make(chan struct{}), + receivers: make(map[string][]EventReceiver), + interceptors: make(map[int]EventInterceptor), + listenerCount: make(map[string]int), + orderedIntercept: nil, + running: false, + hasInterceptor: true, + idGenerator: newGenerator(), + eventFinalizer: nil, + } +} + +// WithIDGenerator change the id generator used. +// +// if the switch is running, do nothing +func (es *EventSwitch) WithIDGenerator(idg IDGenerator) *EventSwitch { + if es.running { + return es + } + es.idGenerator = idg + return es +} + +// WithSeed change the seed of the generator +// +// if the switch is running, do nothing +func (es *EventSwitch) WithSeed(seed uint64) *EventSwitch { + if es.running { + return es + } + err := es.idGenerator.SetSeed(seed) + if err != nil { + log.Println("[Godim EventSwitch]Trying to change the seed with a running switch") } + return es +} + +// WithEventFinalizer declare an event finalizer that will be called at the end of an event management +func (es *EventSwitch) WithEventFinalizer(f EventFinalizer) *EventSwitch { + es.eventFinalizer = f + return es } // AddEmitter add an emitter func (es *EventSwitch) AddEmitter(e Emitter) error { if es.running { - return errors.New("Can't add an emitter on the fly yet") + return newError(errors.New("Can't add an emitter on the fly yet")).SetErrType(ErrTypeEvent) } - e.InitChan(es.mainChain) + e.initEmitter(es.mainChain, es.idGenerator, es.eventFinalizer) + es.emitters = append(es.emitters, e) return nil } // AddReceiver add a receiver func (es *EventSwitch) AddReceiver(e EventReceiver) error { if es.running { - return errors.New("Can't add a receiver on the fly yet") + return newError(errors.New("Can't add a receiver on the fly yet")).SetErrType(ErrTypeEvent) } for _, et := range e.HandleEventTypes() { es.receivers[et] = append(es.receivers[et], e) @@ -79,11 +342,38 @@ func (es *EventSwitch) AddReceiver(e EventReceiver) error { return nil } +// AddInterceptor add an interceptor. Interceptor must be prioritized through InterceptPriority(), only one interceptor can run at the same priority +func (es *EventSwitch) AddInterceptor(e EventInterceptor) error { + if es.running { + return newError(errors.New("Can't add an interceptor on the fly")).SetErrType(ErrTypeEvent) + } + if _, ok := es.interceptors[e.InterceptPriority()]; ok { + return newError(fmt.Errorf("another interceptor already declared on priority %v", e.InterceptPriority())).SetErrType(ErrTypeEvent) + } + es.interceptors[e.InterceptPriority()] = e + es.hasInterceptor = true + return nil +} + // Start will start the event switch func (es *EventSwitch) Start() { if es.running { return } + nbInterceptor := 0 + if es.hasInterceptor { + es.orderedIntercept = make([]int, len(es.interceptors)) + for prio := range es.interceptors { + es.orderedIntercept[nbInterceptor] = prio + nbInterceptor++ + } + } + for k, v := range es.receivers { + es.listenerCount[k] = nbInterceptor + len(v) + } + for _, e := range es.emitters { + e.prepareRun(es.listenerCount) + } es.running = true go es.run() } @@ -107,12 +397,14 @@ func (es *EventSwitch) Close() { func (es *EventSwitch) run() { for { select { - case event := <-es.mainChain: + case event, valid := <-es.mainChain: if !es.running { break } + if !valid { + break + } go es.switchEvent(event) - continue case <-es.close: return } @@ -120,24 +412,67 @@ func (es *EventSwitch) run() { } func (es *EventSwitch) switchEvent(event *Event) { + if es.hasInterceptor { + for _, prio := range es.orderedIntercept { + interceptor := es.interceptors[prio] + es.runInterceptorWithRecover(interceptor, event) + if event.metadata.state == ESAborted { + log.Printf("[Godim EventSwitch] aborting event id [%v]\n", event.metadata.id) + return + } + } + } + event.metadata.lock = true rs, ok := es.receivers[event.Type] if ok { for _, receiver := range rs { go es.runObserverWithRecover(receiver, event) } - } else { - log.Println("an event is declared with no subscribers :", event.Type) } } func (es *EventSwitch) runObserverWithRecover(receiver EventReceiver, event *Event) { - defer func() { - if rec := recover(); rec != nil { - const size = 64 << 10 - buffer := make([]byte, size) - buffer = buffer[:runtime.Stack(buffer, false)] - log.Printf("cron: panic running job: %v\n%s", rec, buffer) - } - }() - receiver.ReceiveEvent(event) + event.setState(receiver.Key(), ESEmitted) + defer internalRecover(event, receiver) + err := receiver.ReceiveEvent(event) + if err != nil { + event.setState(receiver.Key(), ESError) + } +} + +func (es *EventSwitch) runInterceptorWithRecover(interceptor EventInterceptor, event *Event) { + event.setState(interceptor.Key(), ESEmitted) + defer internalRecover(event, interceptor) + err := interceptor.Intercept(event) + if err != nil { + event.setState(interceptor.Key(), ESError) + } +} + +func internalRecover(event *Event, identifier Identifier) { + if rec := recover(); rec != nil { + event.setState(identifier.Key(), ESError) + dumpRec(rec, "godim: panic receiving event") + } else { + event.setStateIf(identifier.Key(), ESEmitted, ESResolved) + } +} + +func dumpRec(rec interface{}, msg string) { + // const size = 64 << 10 + // buffer := make([]byte, size) + // buffer = buffer[:runtime.Stack(buffer, false)] + // log.Printf("%s: %v\n%s", msg, rec, buffer) + log.Printf("%s: %v\n", msg, rec) +} + +func (event *Event) runFinalizer() { + if event.metadata.finalizer != nil { + defer func() { + if rec := recover(); rec != nil { + dumpRec(rec, "godim: panic finalizing event") + } + }() + event.metadata.finalizer.Finalize(event) + } } diff --git a/event_test.go b/event_test.go index 8bd9008..938b2b9 100644 --- a/event_test.go +++ b/event_test.go @@ -1,8 +1,8 @@ package godim import ( - "fmt" "log" + "sync" "testing" "time" ) @@ -11,50 +11,14 @@ type Testeur struct { EventEmitter } -func TestEmitter(t *testing.T) { - tester := new(Testeur) - myChan := make(chan *Event, 100) - tester.eventChan = myChan - tester2 := new(Testeur) - tester2.eventChan = myChan - - results := make(map[string]*Event) - - go func() { - for { - select { - case event := <-myChan: - go managerEvent(results, event) - } - } - }() - for i := 0; i < 10; i++ { - e := &Event{ - Type: fmt.Sprintln("my ", i), - } - tester.Emit(e) - time.Sleep(2 * time.Millisecond) - e2 := &Event{ - Type: fmt.Sprintln("sec ", i), - } - tester2.Emit(e2) - time.Sleep(2 * time.Millisecond) - } - - time.Sleep(1 * time.Second) - if len(results) != 20 { - t.Fatal("not the right number of events : ", len(results)) - } -} - -func managerEvent(results map[string]*Event, e *Event) { - log.Println("Event received : ", e.Type) - results[e.Type] = e - -} - type Rec1 struct { nbReceived int + ids map[uint64]bool + mu *sync.Mutex +} + +func (rec1 *Rec1) Key() string { + return "Rec1" } func (rec1 *Rec1) HandleEventTypes() []string { @@ -64,8 +28,12 @@ func (rec1 *Rec1) HandleEventTypes() []string { } } -func (rec1 *Rec1) ReceiveEvent(e *Event) { +func (rec1 *Rec1) ReceiveEvent(e *Event) error { + rec1.mu.Lock() + rec1.ids[e.metadata.id] = true + rec1.mu.Unlock() rec1.nbReceived = rec1.nbReceived + 1 + return nil } func TestSwitch(t *testing.T) { @@ -73,6 +41,8 @@ func TestSwitch(t *testing.T) { t1 := new(Testeur) es.AddEmitter(t1) r1 := new(Rec1) + r1.ids = make(map[uint64]bool) + r1.mu = &sync.Mutex{} es.AddReceiver(r1) es.Start() @@ -88,9 +58,214 @@ func TestSwitch(t *testing.T) { if r1.nbReceived != 100 { t.Fatal("Received events : ", r1.nbReceived) } + if len(r1.ids) != 100 { + t.Fatal("received ids ", len(r1.ids)) + } es.Close() if es.running { t.Fatal("switch still running") } } + +type Panicker struct { +} + +func (rec1 *Panicker) Key() string { + return "Panicker" +} + +func (rec1 *Panicker) HandleEventTypes() []string { + return []string{ + "a", + "b", + } +} + +func (rec1 *Panicker) ReceiveEvent(e *Event) error { + panic("this is my role") + +} + +func TestPanic(t *testing.T) { + es := NewEventSwitch(10) + t1 := new(Testeur) + es.AddEmitter(t1) + r1 := new(Rec1) + r1.ids = make(map[uint64]bool) + r1.mu = &sync.Mutex{} + es.AddReceiver(r1) + r2 := new(Panicker) + es.AddReceiver(r2) + + es.Start() + e := &Event{ + Type: "a", + } + t1.Emit(e) + time.Sleep(2 * time.Millisecond) + if r1.nbReceived != 1 { + t.Fatal("event not received during a riot") + } + f := &Event{ + Type: "b", + } + t1.Emit(f) + time.Sleep(2 * time.Millisecond) + if r1.nbReceived != 2 { + t.Fatal("event not received during a riot") + } + if f.metadata.states["Panicker"] != ESError { + t.Fatal("state not correct", f.metadata.states["Panicker"]) + } +} + +type Producer struct { + EventEmitter + nbEmitted int + typ string + closed chan struct{} +} + +func (p *Producer) EmitWhile() { + for { + payload := make(map[string]interface{}) + p.nbEmitted = p.nbEmitted + 1 + payload["nb"] = p.nbEmitted + e := &Event{ + Type: p.typ, + Payload: payload, + } + p.Emit(e) + time.Sleep(3 * time.Millisecond) + var ok bool + select { + case <-p.closed: + ok = true + // log.Println("closing producer") + default: + ok = false + } + if ok { + break + } + } +} + +type GlobalRec struct { + mu *sync.Mutex + received int + typ string +} + +func (gr *GlobalRec) Key() string { + return "GR" +} + +func (gr *GlobalRec) HandleEventTypes() []string { + return []string{ + gr.typ, + } +} + +func (gr *GlobalRec) ReceiveEvent(e *Event) error { + gr.mu.Lock() + if e.GetID()%10 == 0 { + log.Println("should not have received this event") + } + gr.received = gr.received + 1 + gr.mu.Unlock() + return nil +} + +type Inter struct { + received int + mu *sync.Mutex + aborted int +} + +func (i *Inter) Key() string { + return "inter" +} +func (i *Inter) InterceptPriority() int { + return -1 +} +func (i *Inter) Intercept(e *Event) error { + i.mu.Lock() + i.received = i.received + 1 + + if e.GetID()%10 == 0 { + e.Abort(i, "abort every 10") + i.aborted = i.aborted + 1 + } + i.mu.Unlock() + return nil +} + +type Fin struct { + nb int + mu *sync.Mutex +} + +func (f *Fin) Finalize(e *Event) { + f.mu.Lock() + f.nb = f.nb + 1 + f.mu.Unlock() +} + +func TestMassiveEvents(t *testing.T) { + nbProducer := 50 + nbReceiver := 10 + producers := make(map[int]*Producer, nbProducer) + fin := &Fin{mu: &sync.Mutex{}} + es := NewEventSwitch(10).WithEventFinalizer(fin) + + for i := 0; i < nbProducer; i++ { + p := &Producer{ + closed: make(chan struct{}), + typ: "a", + } + es.AddEmitter(p) + producers[i] = p + } + receivers := make(map[int]*GlobalRec, nbReceiver) + gr := &GlobalRec{ + typ: "a", + mu: &sync.Mutex{}, + } + receivers[0] = gr + es.AddReceiver(gr) + in := &Inter{ + mu: &sync.Mutex{}, + } + es.AddInterceptor(in) + + es.Start() + + for i := 0; i < nbProducer; i++ { + go producers[i].EmitWhile() + } + + time.Sleep(10 * time.Millisecond) + for i := 0; i < nbProducer; i++ { + producers[i].closed <- struct{}{} + } + es.Stop() + time.Sleep(1 * time.Millisecond) + total := 0 + for i := 0; i < nbProducer; i++ { + total = total + producers[i].nbEmitted + } + if fin.nb != total { + t.Fatal("finalize did not received all revents ", fin.nb, "-", total) + } + if in.received != total { + t.Fatal("Multiproducers failed : ", gr.received, " - ", total) + } + if gr.received == total { + t.Fatal("should have some aborted event") + } + if gr.received+in.aborted != total { + t.Fatal("total must equal to received and aborted") + } +} diff --git a/godim_test.go b/godim_test.go index ee1617c..4ec5dd5 100644 --- a/godim_test.go +++ b/godim_test.go @@ -176,6 +176,10 @@ type RA struct { nbReceived int } +func (ra *RA) Key() string { + return "RA" +} + func (ra *RA) OnInit() error { ra.nbReceived = 0 return nil @@ -186,8 +190,26 @@ func (ra *RA) HandleEventTypes() []string { "aa", } } -func (ra *RA) ReceiveEvent(e *Event) { +func (ra *RA) ReceiveEvent(e *Event) error { ra.nbReceived = ra.nbReceived + 1 + return nil +} + +type IA struct { + nbIntercept int +} + +func (ia *IA) Key() string { + return "IA" +} + +func (ia *IA) InterceptPriority() int { + return -10 +} + +func (ia *IA) Intercept(e *Event) error { + ia.nbIntercept = ia.nbIntercept + 1 + return nil } func TestWithEventSwitch(t *testing.T) { @@ -197,7 +219,8 @@ func TestWithEventSwitch(t *testing.T) { Build() ea := new(EA) ra := new(RA) - err := g.Declare("service", ea, ra) + ia := new(IA) + err := g.Declare("service", ea, ra, ia) if err != nil { t.Fatal("Error while declaring service:", err) } @@ -216,6 +239,9 @@ func TestWithEventSwitch(t *testing.T) { if ra.nbReceived != 100 { t.Fatal("wrong number of event received", ra.nbReceived) } + if ia.nbIntercept != 100 { + t.Fatal("wrong number of event intercepted", ia.nbIntercept) + } g.CloseApp() time.Sleep(1 * time.Millisecond) if g.eventSwitch.running { diff --git a/registry.go b/registry.go index 2851e68..9bb9996 100644 --- a/registry.go +++ b/registry.go @@ -100,12 +100,13 @@ func (registry *Registry) declare(label string, o interface{}) error { } var ( - initType = reflect.TypeOf((*Initializer)(nil)).Elem() - closeType = reflect.TypeOf((*Closer)(nil)).Elem() - keyType = reflect.TypeOf((*Identifier)(nil)).Elem() - prioType = reflect.TypeOf((*Prioritizer)(nil)).Elem() - emitType = reflect.TypeOf((*Emitter)(nil)).Elem() - recType = reflect.TypeOf((*EventReceiver)(nil)).Elem() + initType = reflect.TypeOf((*Initializer)(nil)).Elem() + closeType = reflect.TypeOf((*Closer)(nil)).Elem() + keyType = reflect.TypeOf((*Identifier)(nil)).Elem() + prioType = reflect.TypeOf((*Prioritizer)(nil)).Elem() + emitType = reflect.TypeOf((*Emitter)(nil)).Elem() + recType = reflect.TypeOf((*EventReceiver)(nil)).Elem() + interceptType = reflect.TypeOf((*EventInterceptor)(nil)).Elem() ) func getKey(typ reflect.Type, o interface{}) string { @@ -151,6 +152,9 @@ func (registry *Registry) declareInterfaces(o interface{}, typ reflect.Type) err if ptyp.Implements(recType) { registry.eventSwitch.AddReceiver(o.(EventReceiver)) } + if ptyp.Implements(interceptType) { + registry.eventSwitch.AddInterceptor(o.(EventInterceptor)) + } } return nil }