generated from atomicgo/template
-
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat!: performance improvements and better thread safety
- Loading branch information
1 parent
30a45be
commit cd7fd04
Showing
2 changed files
with
59 additions
and
48 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,5 @@ | ||
/* | ||
Package event provides a generic event system for Go. | ||
*/ | ||
// Package event provides a generic and thread-safe event system for Go. | ||
// It allows multiple listeners to subscribe to events carrying data of any type. | ||
// Listeners can be added and notified when events are triggered, and the event | ||
// can be closed to prevent further operations. | ||
package event |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,74 +1,84 @@ | ||
package event | ||
|
||
import "sync" | ||
import ( | ||
"errors" | ||
"sync" | ||
) | ||
|
||
// Event represents an event system that can handle multiple listeners. | ||
// ErrEventClosed is returned when an operation is attempted on a closed event. | ||
var ErrEventClosed = errors.New("event is closed") | ||
|
||
// Event represents a generic, thread-safe event system that can handle multiple listeners. | ||
// The type parameter T specifies the type of data that the event carries when triggered. | ||
type Event[T any] struct { | ||
listeners []chan T | ||
mu sync.Mutex | ||
listeners []func(T) | ||
mu sync.RWMutex | ||
closed bool | ||
} | ||
|
||
// New creates a new event. | ||
// New creates and returns a new Event instance for the specified type T. | ||
func New[T any]() *Event[T] { | ||
// Create a new event | ||
return &Event[T]{ | ||
listeners: []chan T{}, | ||
} | ||
return &Event[T]{} | ||
} | ||
|
||
// Trigger triggers the event and notifies all listeners. | ||
func (e *Event[T]) Trigger(value T) { | ||
e.mu.Lock() | ||
defer e.mu.Unlock() | ||
// Trigger notifies all registered listeners by invoking their callback functions with the provided value. | ||
// It runs each listener in a separate goroutine and waits for all listeners to complete. | ||
// Returns ErrEventClosed if the event has been closed. | ||
func (e *Event[T]) Trigger(value T) error { | ||
e.mu.RLock() | ||
if e.closed { | ||
e.mu.RUnlock() | ||
return ErrEventClosed | ||
} | ||
|
||
// Copy the listeners to avoid holding the lock during execution. | ||
// This ensures that triggering the event is thread-safe even if listeners are added or removed concurrently. | ||
listeners := make([]func(T), len(e.listeners)) | ||
copy(listeners, e.listeners) | ||
e.mu.RUnlock() | ||
|
||
var wg sync.WaitGroup | ||
for _, listener := range listeners { | ||
wg.Add(1) | ||
|
||
for _, listener := range e.listeners { | ||
go func(l chan T) { | ||
if !e.closed { | ||
l <- value | ||
} | ||
go func(f func(T)) { | ||
defer wg.Done() | ||
f(value) | ||
}(listener) | ||
} | ||
|
||
wg.Wait() | ||
|
||
return nil | ||
} | ||
|
||
// Listen gets called when the event is triggered. | ||
func (e *Event[T]) Listen(f func(T)) { | ||
// Check if the event is closed | ||
if e.closed { | ||
return | ||
} | ||
// Listen registers a new listener callback function for the event. | ||
// The listener will be invoked with the event's data whenever Trigger is called. | ||
// Returns ErrEventClosed if the event has been closed. | ||
func (e *Event[T]) Listen(f func(T)) error { | ||
e.mu.Lock() | ||
defer e.mu.Unlock() | ||
|
||
// Create listener slice if it doesn't exist | ||
if e.listeners == nil { | ||
e.listeners = []chan T{} | ||
if e.closed { | ||
return ErrEventClosed | ||
} | ||
|
||
// Create a new channel | ||
ch := make(chan T) | ||
e.listeners = append(e.listeners, f) | ||
|
||
e.mu.Lock() | ||
e.listeners = append(e.listeners, ch) | ||
e.mu.Unlock() | ||
|
||
go func() { | ||
for v := range ch { | ||
if !e.closed { | ||
f(v) | ||
} | ||
} | ||
}() | ||
return nil | ||
} | ||
|
||
// Close closes the event and all its listeners. | ||
// After calling this method, the event can't be used anymore and new listeners can't be added. | ||
// Close closes the event system, preventing any new listeners from being added or events from being triggered. | ||
// After calling Close, any subsequent calls to Trigger or Listen will return ErrEventClosed. | ||
// Existing listeners are removed, and resources are cleaned up. | ||
func (e *Event[T]) Close() { | ||
e.mu.Lock() | ||
defer e.mu.Unlock() | ||
|
||
for _, listener := range e.listeners { | ||
close(listener) | ||
if e.closed { | ||
return | ||
} | ||
|
||
e.listeners = nil | ||
e.closed = true | ||
e.listeners = nil // Release references to listener functions | ||
} |