
[WIP] Announcements and Monitoring of DST #269

Closed
wants to merge 13 commits
9 changes: 5 additions & 4 deletions cmd/dst/run.go
@@ -13,6 +13,7 @@ import (
"github.com/resonatehq/resonate/cmd/util"
"github.com/resonatehq/resonate/internal/aio"
"github.com/resonatehq/resonate/internal/announcements"
"github.com/resonatehq/resonate/internal/announcements/monitors"
"github.com/resonatehq/resonate/internal/api"
"github.com/resonatehq/resonate/internal/app/coroutines"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/network"
@@ -73,11 +74,11 @@ func RunDSTCmd() *cobra.Command {
}))
slog.SetDefault(logger)

monitor := announcements.NewMonitor()
monitor.RegisterEventHandler("TaskClaimed", announcements.TaskClaimedHandler)
monitor.RegisterEventHandler("TaskCompleted", announcements.TaskCompletedHandler)
announcements.Initialize(announcements.Dst, []announcements.Monitors{})

announcements.Initialize(announcements.Dst, monitor)
// register monitors
taskMonitor := monitors.NewTaskMonitor()
announcements.GetInstance().Register(taskMonitor)

// instantiate metrics
reg := prometheus.NewRegistry()
31 changes: 13 additions & 18 deletions internal/announcements/announce.go
@@ -28,34 +28,36 @@
e.Data[key] = value
}

func (e *Event) Get(key string) (interface{}, error) {
value, exists := e.Data[key]
if !exists {
return nil, fmt.Errorf("key '%s' does not exist in event of type '%s'", key, e.Type)
}
return value, nil
}

type Announcement interface {
Announce(event *Event)
Register(monitor *Monitor)
Register(monitor Monitors)
}

type NoopAnnouncement struct{}

type DstAnnouncement struct {
announcements []Event
monitor *Monitor
monitors []Monitors
mutex sync.Mutex // Mutex for thread safety
Review comment from @susarlanikhilesh (Contributor), May 7, 2024:
It is usually better practice to put the mutex at the start of the struct, so the struct does not need to calculate the offset each time to find the mutex field.
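A minimal sketch of the suggested field ordering, assuming the Event and Monitors types already defined in this package (the PR as written keeps the mutex last):

type DstAnnouncement struct {
	mutex         sync.Mutex // guards the fields below
	announcements []Event
	monitors      []Monitors
}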

}

// Register implements Announcement.
func (d *NoopAnnouncement) Register(monitor *Monitor) {
func (d *NoopAnnouncement) Register(monitor Monitors) {
// Do nothing
}

func (d *DstAnnouncement) Register(monitor *Monitor) {
panic("unimplemented")
func (d *DstAnnouncement) Register(monitor Monitors) {
d.mutex.Lock()
defer d.mutex.Unlock()
d.monitors = append(d.monitors, monitor)
}

var (
@@ -70,18 +72,18 @@
Dst
)

func Initialize(envType EnvironmentType, monitor *Monitor) {
func Initialize(envType EnvironmentType, monitors []Monitors) {
once.Do(func() {
switch envType {
case Noop:
instance = &NoopAnnouncement{}
case Dst:
instance = &DstAnnouncement{
announcements: make([]Event, 0, 100), // Preallocate capacity to prevent frequent reallocations
monitor: monitor,
monitors: monitors,
}
default:
panic("Invalid environment type.")
}
})
}
@@ -91,7 +93,7 @@
return instance
}

func (n *NoopAnnouncement) Announce(event *Event) {
// Do nothing
}

@@ -101,15 +103,8 @@
d.announcements = append(d.announcements, *event)
// Print the announcement
fmt.Println("Announcement:", event.Type, event.Data)
// Apply the event to the monitor
d.monitor.Apply(*event)
// Apply the event to all the registered monitors
for _, monitor := range d.monitors {
monitor.Apply(*event)
}
}

// func (d *DstAnnouncement) GetAnnouncements() []Event {
// d.mutex.Lock()
// defer d.mutex.Unlock()
// // Return a copy of the announcements slice to ensure thread safety
// announcementsCopy := make([]Event, len(d.announcements))
// copy(announcementsCopy, d.announcements)
// return announcementsCopy
// }
92 changes: 5 additions & 87 deletions internal/announcements/monitor.go
@@ -1,90 +1,8 @@
package announcements

import (
"fmt"
"sync"
)

type EventHandler func(event Event, monitor *Monitor)

type Monitor struct {
errors []error
eventHandlers map[string]EventHandler // Map to store event handlers by event type
mutex sync.Mutex // Mutex for thread safety
}

func NewMonitor() *Monitor {
return &Monitor{
eventHandlers: make(map[string]EventHandler),
}
}

func (m *Monitor) Apply(event Event) {
m.mutex.Lock()
defer m.mutex.Unlock()

handler, ok := m.eventHandlers[event.Type]
if !ok {
m.errors = append(m.errors, fmt.Errorf("no handler found for event type: %s", event.Type))
return
}

handler(event, m)
}

func (m *Monitor) Status() []error {
return m.errors
}

func (m *Monitor) RegisterEventHandler(eventType string, handler EventHandler) {
m.mutex.Lock()
defer m.mutex.Unlock()

m.eventHandlers[eventType] = handler
}

// ----------------------------------------------------------------------------------

// Task Cache
type TaskCache struct {
tasks map[string]Event
mutex sync.Mutex
}

var taskCache = TaskCache{
tasks: make(map[string]Event),
}

// TaskClaimedHandler is an event handler for the "TaskClaimed" event type.
// It checks if the corresponding "TaskCompleted" event is received within a certain timeframe.
// If not, it raises an alert.
func TaskClaimedHandler(event Event, monitor *Monitor) {
taskID, err := event.Get("taskId")
// fmt.Println("-------------------------Monitor: Task claimed:", taskID)
if err != nil {
monitor.errors = append(monitor.errors, fmt.Errorf("error getting task ID: %v", err))
return
}

// Check if the task is already in the cache
// If not, add it to the cache
if _, ok := taskCache.tasks[taskID.(string)]; !ok {
taskCache.mutex.Lock()
taskCache.tasks[taskID.(string)] = event
taskCache.mutex.Unlock()
}
}

func TaskCompletedHandler(event Event, monitor *Monitor) {
// Task completion event received, can perform any necessary actions
// For example, log the event or update some state
// fmt.Println("-------------------------Monitor: Task completed:", event)
if taskID, err := event.Get("taskId"); err == nil {
taskCache.mutex.Lock()
delete(taskCache.tasks, taskID.(string))
taskCache.mutex.Unlock()
} else {
fmt.Println("Monitor: Error getting task ID:", err)
monitor.errors = append(monitor.errors, fmt.Errorf("error getting task ID: %v", err))
}
}
type Monitors interface {
Apply(event Event)
Review comment (Contributor):
I would rename the interface to Monitor instead of Monitors (name it after the object, not the verb). A sketch of the rename follows the interface below.

Status() []error
}
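A sketch of the rename suggested in the comment above (hypothetical; the PR keeps the name Monitors):

type Monitor interface {
	Apply(event Event)
	Status() []error
}

Call sites such as DstAnnouncement.Register would then read as registering a single Monitor.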
37 changes: 37 additions & 0 deletions internal/announcements/monitors/taskmonitor.go
@@ -0,0 +1,37 @@
package monitors

import (
"fmt"
"sync"

"github.com/resonatehq/resonate/internal/announcements"
)

type taskMonitor struct {
Review comment (Contributor):
TaskMonitor must not be part of the core announcements/monitors package; it is application specific and belongs in test/dst.

eventTypeCount map[string]int // Map to count occurrences of each event type
mutex sync.Mutex // Mutex for thread safety
}

func NewTaskMonitor() *taskMonitor {
return &taskMonitor{
eventTypeCount: make(map[string]int),
}
}

func (tm *taskMonitor) Apply(event announcements.Event) {
tm.mutex.Lock()
defer tm.mutex.Unlock()

// Increment event type count
tm.eventTypeCount[event.Type]++
Review comment (Contributor):
This is the right idea: when an event happens, modify a counter. When we check the monitors at the end of the run, we want taskCreated == taskCompleted here. If that check is too aggressive, at least check that taskCreated >= taskCompleted.

if event.Type == "TaskCreated" {
  tm.taskCreated++
}
if event.Type == "TaskCompleted" {
  tm.taskCompleted++
}
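A minimal sketch of the end-of-run check described above, assuming hypothetical taskCreated and taskCompleted counter fields on taskMonitor (not part of this PR):

func (tm *taskMonitor) Status() []error {
	tm.mutex.Lock()
	defer tm.mutex.Unlock()

	// taskCreated and taskCompleted are hypothetical counters incremented in Apply.
	var errs []error
	// By the end of the run every created task should have completed;
	// relax to taskCreated >= taskCompleted if strict equality is too aggressive.
	if tm.taskCreated != tm.taskCompleted {
		errs = append(errs, fmt.Errorf("task count mismatch: created=%d, completed=%d", tm.taskCreated, tm.taskCompleted))
	}
	return errs
}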


// If the event type is TaskCreated or TaskCompleted, print the event
if event.Type == "TaskCreated" || event.Type == "TaskCompleted" {
fmt.Println("Received event:", event.Type, event.Data)
}
}

// Status returns any errors or status information associated with the taskMonitor.
func (tm *taskMonitor) Status() []error {
return nil
}
11 changes: 6 additions & 5 deletions test/dst/dst.go
@@ -8,6 +8,7 @@ import (

"github.com/resonatehq/resonate/internal/aio"
"github.com/resonatehq/resonate/internal/announcements"
"github.com/resonatehq/resonate/internal/announcements/monitors"
"github.com/resonatehq/resonate/internal/api"
"github.com/resonatehq/resonate/internal/kernel/bus"
"github.com/resonatehq/resonate/internal/kernel/metadata"
@@ -65,12 +66,12 @@ func (d *DST) Run(r *rand.Rand, api api.API, aio aio.AIO, system *system.System,
// model
model := NewModel(d.config.Scenario)

monitor := announcements.NewMonitor()
monitor.RegisterEventHandler("TaskClaimed", announcements.TaskClaimedHandler)
monitor.RegisterEventHandler("TaskCompleted", announcements.TaskCompletedHandler)

// setup announcements
announcements.Initialize(announcements.Dst, monitor)
announcements.Initialize(announcements.Dst, []announcements.Monitors{})

// register monitors
taskMonitor := monitors.NewTaskMonitor()
announcements.GetInstance().Register(taskMonitor)

// add req/res
for _, req := range reqs {
40 changes: 20 additions & 20 deletions test/dst/dst_test.go
@@ -99,26 +99,26 @@ func test(t *testing.T, scenario *Scenario) {
// specify reqs to enable
reqs := []t_api.Kind{
// PROMISE
// t_api.ReadPromise,
// t_api.SearchPromises,
// t_api.CreatePromise,
// t_api.CompletePromise,

// // SCHEDULE
// t_api.ReadSchedule,
// t_api.SearchSchedules,
// t_api.CreateSchedule,
// t_api.DeleteSchedule,

// // SUBSCRIPTION
// t_api.ReadSubscriptions,
// t_api.CreateSubscription,
// t_api.DeleteSubscription,

// // LOCK
// t_api.AcquireLock,
// t_api.HeartbeatLocks,
// t_api.ReleaseLock,
t_api.ReadPromise,
t_api.SearchPromises,
t_api.CreatePromise,
t_api.CompletePromise,

// SCHEDULE
t_api.ReadSchedule,
t_api.SearchSchedules,
t_api.CreateSchedule,
t_api.DeleteSchedule,

// SUBSCRIPTION
t_api.ReadSubscriptions,
t_api.CreateSubscription,
t_api.DeleteSubscription,

// LOCK
t_api.AcquireLock,
t_api.HeartbeatLocks,
t_api.ReleaseLock,

// TASK
t_api.ClaimTask,