Skip to content

Commit

Permalink
make it impossible to listent to / emit an event via an operation tha…
Browse files Browse the repository at this point in the history
…t does not emit it
  • Loading branch information
RomainMuller committed Nov 30, 2023
1 parent cc75f71 commit 3d6b5e6
Show file tree
Hide file tree
Showing 12 changed files with 135 additions and 136 deletions.
12 changes: 6 additions & 6 deletions contrib/graphql-go/graphql/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ func Benchmark(b *testing.B) {
},
}

b.Run("baseline", func(b *testing.B) {
b.Run("version=baseline", func(b *testing.B) {
for name, tc := range testCases {
b.Run(name, func(b *testing.B) {
b.Run(fmt.Sprintf("scenario=%s", name), func(b *testing.B) {

b.StopTimer()
b.ReportAllocs()
Expand All @@ -161,9 +161,9 @@ func Benchmark(b *testing.B) {
}
})

b.Run("dyngo-v1", func(b *testing.B) {
b.Run("version=v1", func(b *testing.B) {
for name, tc := range testCases {
b.Run(name, func(b *testing.B) {
b.Run(fmt.Sprintf("scenario=%s", name), func(b *testing.B) {
b.StopTimer()
b.ReportAllocs()

Expand Down Expand Up @@ -197,9 +197,9 @@ func Benchmark(b *testing.B) {
}
})

b.Run("refactored", func(b *testing.B) {
b.Run("version=v2", func(b *testing.B) {
for name, tc := range testCases {
b.Run(name, func(b *testing.B) {
b.Run(fmt.Sprintf("scenario=%s", name), func(b *testing.B) {
b.StopTimer()
b.ReportAllocs()

Expand Down
68 changes: 37 additions & 31 deletions internal/appsec/dyngo/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ type Operation interface {
self() *operation
}

type ArgOf[O Operation] interface {
IsArgOf(O)
}
type ResultOf[O Operation] interface {
IsResultOf(O)
}

// EventListener interface allowing to identify the Go type listened to and
// dispatch calls to the underlying event listener function.
type EventListener[O Operation, T any] func(O, T)
Expand All @@ -64,7 +71,7 @@ func SwapRootOperation(new Operation) {
// bubble-up the operation stack, which allows listening to future events that
// might happen in the operation lifetime.
type operation struct {
parent Operation
parent *operation
eventRegister
dataBroadcaster

Expand Down Expand Up @@ -118,17 +125,20 @@ func NewOperation(parent Operation) Operation {
parent = *ptr
}
}
return &operation{parent: parent}
var parentOp *operation
if parent != nil {
parentOp = parent.self()
}
return &operation{parent: parentOp}
}

// StartOperation starts a new operation along with its arguments and emits a
// start event with the operation arguments.
func StartOperation[O Operation, T any](op O, args T) {

func StartOperation[O Operation, E ArgOf[O]](op O, args E) {
// Bubble-up the start event starting from the parent operation as you can't
// listen for your own start event
for current := op.Parent(); current != nil; current = current.Parent() {
emitEvent(&current.self().eventRegister, op, args)
for current := op.self().parent; current != nil; current = current.parent {
emitEvent(&current.eventRegister, op, args)
}
}

Expand All @@ -139,36 +149,40 @@ func newOperation(parent Operation) *operation {
// Finish finishes the operation along with its results and emits a
// finish event with the operation results.
// The operation is then disabled and its event listeners removed.
func Finish[O Operation, T any](op O, results T) {
finish(op.self(), op, results)
}
func Finish[O Operation, E ResultOf[O]](op O, results E) {
o := op.self()
defer o.disable() // This will need the RLock below to be released...

func finish[O Operation, T any](o *operation, op O, results T) {
// Defer the call to o.disable() first so that the RWMutex gets unlocked first
defer o.disable()
o.mu.RLock()
defer o.mu.RUnlock() // Deferred and stacked on top of the previously deferred call to o.disable()

if o.disabled {
return
}
for current := Operation(op); current != nil; current = current.Parent() {
emitEvent(&current.self().eventRegister, op, results)

for current := o; current != nil; current = current.parent {
emitEvent(&current.eventRegister, op, results)
}
}

// Disable the operation and remove all its event listeners.
func (o *operation) disable() {
o.mu.Lock()
defer o.mu.Unlock()

if o.disabled {
return
}

o.disabled = true
o.eventRegister.clear()
}

// Add the given event listeners to the operation.
func addListener[O Operation, T any](o *operation, l EventListener[O, T]) {
// On registers and event listener that will be called when the operation
// begins.
func On[O Operation, E ArgOf[O]](op Operation, l EventListener[O, E]) {
o := op.self()

o.mu.RLock()
defer o.mu.RUnlock()
if o.disabled {
Expand All @@ -177,8 +191,9 @@ func addListener[O Operation, T any](o *operation, l EventListener[O, T]) {
addEventListener(&o.eventRegister, l)
}

// On registers the event listener.
func On[O Operation, T any](op Operation, l EventListener[O, T]) {
// OnFinish registers an event listener that will be called when the operation
// finishes.
func OnFinish[O Operation, E ResultOf[O]](op Operation, l EventListener[O, E]) {
o := op.self()

o.mu.RLock()
Expand Down Expand Up @@ -211,8 +226,8 @@ func EmitData[T any](op Operation, data T) {
// Bubble up the data to the stack of operations. Contrary to events,
// we also send the data to ourselves since SDK operations are leaf operations
// that both emit and listen for data (errors).
for current := op; current != nil; current = current.Parent() {
emitData(&current.self().dataBroadcaster, data)
for current := o; current != nil; current = current.parent {
emitData(&current.dataBroadcaster, data)
}
}

Expand Down Expand Up @@ -283,7 +298,7 @@ func addEventListener[O Operation, T any](r *eventRegister, l EventListener[O, T
r.mu.Lock()
defer r.mu.Unlock()
if r.listeners == nil {
r.listeners = make(eventListenerMap, 1)
r.listeners = make(eventListenerMap)
}

key := typeID[EventListener[O, T]]{}
Expand All @@ -305,16 +320,7 @@ func emitEvent[O Operation, T any](r *eventRegister, op O, v T) {
r.mu.RLock()
defer r.mu.RUnlock()

key := typeID[EventListener[O, T]]{}
for _, listener := range r.listeners[key] {
for _, listener := range r.listeners[typeID[EventListener[O, T]]{}] {
listener.(EventListener[O, T])(op, v)
}

// Allow registering listeners on dyngo.Operation
genericKey := typeID[EventListener[Operation, T]]{}
if generic, found := r.listeners[genericKey]; found && key.String() != genericKey.String() {
for _, listener := range generic {
listener.(EventListener[Operation, T])(op, v)
}
}
}
Loading

0 comments on commit 3d6b5e6

Please sign in to comment.