Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Announcements and Monitoring of DST #269

Closed
wants to merge 13 commits into from
8 changes: 8 additions & 0 deletions cmd/dst/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/resonatehq/resonate/cmd/util"
"github.com/resonatehq/resonate/internal/aio"
"github.com/resonatehq/resonate/internal/announcements"
"github.com/resonatehq/resonate/internal/announcements/monitors"
"github.com/resonatehq/resonate/internal/api"
"github.com/resonatehq/resonate/internal/app/coroutines"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/network"
Expand Down Expand Up @@ -72,6 +74,12 @@ func RunDSTCmd() *cobra.Command {
}))
slog.SetDefault(logger)

announcements.Initialize(announcements.Dst, []announcements.Monitors{})

// register monitors
taskMonitor := monitors.NewTaskMonitor()
announcements.GetInstance().Register(taskMonitor)

// instantiate metrics
reg := prometheus.NewRegistry()
metrics := metrics.New(reg)
Expand Down
110 changes: 110 additions & 0 deletions internal/announcements/announce.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package announcements

import (
"fmt"
"sync"
)

type Event struct {
Type string
Data map[string]interface{}
}

func NewEvent(eventType string, initialData ...map[string]interface{}) *Event {
var data map[string]interface{}
if len(initialData) > 0 {
data = initialData[0] // Use the first map provided if any.
} else {
data = make(map[string]interface{})
}

return &Event{
Type: eventType,
Data: data,
}
}

func (e *Event) Set(key string, value interface{}) {
e.Data[key] = value
}

func (e *Event) Get(key string) (interface{}, error) {
value, exists := e.Data[key]
if !exists {
return nil, fmt.Errorf("key '%s' does not exist in event of type '%s'", key, e.Type)

Check warning on line 34 in internal/announcements/announce.go

View check run for this annotation

Codecov / codecov/patch

internal/announcements/announce.go#L31-L34

Added lines #L31 - L34 were not covered by tests
}
return value, nil

Check warning on line 36 in internal/announcements/announce.go

View check run for this annotation

Codecov / codecov/patch

internal/announcements/announce.go#L36

Added line #L36 was not covered by tests
}

type Announcement interface {
Announce(event *Event)
Register(monitor Monitors)
}

type NoopAnnouncement struct{}

type DstAnnouncement struct {
announcements []Event
monitors []Monitors
mutex sync.Mutex // Mutex for thread safety
Copy link
Contributor

@susarlanikhilesh susarlanikhilesh May 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually a better practice to have the mutex at the start of the struct.
As the struct need not calculate the offset every time to find the mutex variable inside the struct.

}

// Register implements Announcement.
func (d *NoopAnnouncement) Register(monitor Monitors) {

Check warning on line 53 in internal/announcements/announce.go

View check run for this annotation

Codecov / codecov/patch

internal/announcements/announce.go#L53

Added line #L53 was not covered by tests
// Do nothing
}

func (d *DstAnnouncement) Register(monitor Monitors) {
d.mutex.Lock()
defer d.mutex.Unlock()
d.monitors = append(d.monitors, monitor)
}

var (
instance Announcement
once sync.Once
)

type EnvironmentType int

const (
Noop EnvironmentType = iota
Dst
)

func Initialize(envType EnvironmentType, monitors []Monitors) {
once.Do(func() {
switch envType {
case Noop:
instance = &NoopAnnouncement{}

Check warning on line 79 in internal/announcements/announce.go

View check run for this annotation

Codecov / codecov/patch

internal/announcements/announce.go#L78-L79

Added lines #L78 - L79 were not covered by tests
case Dst:
instance = &DstAnnouncement{
announcements: make([]Event, 0, 100), // Preallocate capacity to prevent frequent reallocations
monitors: monitors,
}
default:
panic("Invalid environment type.")

Check warning on line 86 in internal/announcements/announce.go

View check run for this annotation

Codecov / codecov/patch

internal/announcements/announce.go#L85-L86

Added lines #L85 - L86 were not covered by tests
}
})
}

func GetInstance() Announcement {
// check if the instance has been initialized
return instance
}

func (n *NoopAnnouncement) Announce(event *Event) {

Check warning on line 96 in internal/announcements/announce.go

View check run for this annotation

Codecov / codecov/patch

internal/announcements/announce.go#L96

Added line #L96 was not covered by tests
// Do nothing
}

func (d *DstAnnouncement) Announce(event *Event) {
d.mutex.Lock()
defer d.mutex.Unlock()
d.announcements = append(d.announcements, *event)
// Print the announcement
fmt.Println("Announcement:", event.Type, event.Data)
// Apply the all the registered monitors
for _, monitor := range d.monitors {
monitor.Apply(*event)
}
}
8 changes: 8 additions & 0 deletions internal/announcements/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// monitor.go

package announcements

type Monitors interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rename the interface to Monitor instead of Monitors (to the object not the verb)

Apply(event Event)
Status() []error
}
37 changes: 37 additions & 0 deletions internal/announcements/monitors/taskmonitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package monitors

import (
"fmt"
"sync"

"github.com/resonatehq/resonate/internal/announcements"
)

type taskMonitor struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TaskMonitor must not part of the core package for announcements and monitors, but part of the test/dst -- it's application specific

eventTypeCount map[string]int // Map to count occurrences of each event type
mutex sync.Mutex // Mutex for thread safety
}

func NewTaskMonitor() *taskMonitor {
return &taskMonitor{
eventTypeCount: make(map[string]int),
}
}

func (tm *taskMonitor) Apply(event announcements.Event) {
tm.mutex.Lock()
defer tm.mutex.Unlock()

// Increment event type count
tm.eventTypeCount[event.Type]++
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the right idea: When an event happens, modify a counter. When we check the monitors at the end of the run, here we want taskCreated == taskCompleted. If that check is too aggressive, at least we want to check that taskCreated >= taskCompleted

if event.Type == "TaskCreated" {
  tm.taskCreated ++
}
if event.Type == "TaskCompleted" {
  tm.taskCompleted ++
}


// If the event type is TaskCreated or TaskCompleted, print the event
if event.Type == "TaskCreated" || event.Type == "TaskCompleted" {
fmt.Println("Received event:", event.Type, event.Data)
}
}

// Status returns any errors or status information associated with the taskMonitor.
func (tm *taskMonitor) Status() []error {
return nil

Check warning on line 36 in internal/announcements/monitors/taskmonitor.go

View check run for this annotation

Codecov / codecov/patch

internal/announcements/monitors/taskmonitor.go#L35-L36

Added lines #L35 - L36 were not covered by tests
}
13 changes: 13 additions & 0 deletions internal/app/subsystems/aio/network/network_dst.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/http"

"github.com/resonatehq/resonate/internal/aio"
"github.com/resonatehq/resonate/internal/announcements"
"github.com/resonatehq/resonate/internal/kernel/bus"
"github.com/resonatehq/resonate/internal/kernel/t_aio"
"github.com/resonatehq/resonate/internal/util"
Expand Down Expand Up @@ -69,10 +70,22 @@ func (d *NetworkDSTDevice) Process(sqes []*bus.SQE[t_aio.Submission, t_aio.Compl
res = &http.Response{
StatusCode: http.StatusOK,
}

event := announcements.NewEvent("HTTPResponse")
vaibhawvipul marked this conversation as resolved.
Show resolved Hide resolved
event.Set("StatusCode", res.StatusCode)
event.Set("URL", sqe.Submission.Network.Http.Url)
event.Set("Method", sqe.Submission.Network.Http.Method)
announcements.GetInstance().Announce(event)
} else {
res = &http.Response{
StatusCode: http.StatusInternalServerError,
}

event := announcements.NewEvent("HTTPResponse")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good!!

event.Set("StatusCode", res.StatusCode)
event.Set("URL", sqe.Submission.Network.Http.Url)
event.Set("Method", sqe.Submission.Network.Http.Method)
announcements.GetInstance().Announce(event)
}

cqe.Completion = &t_aio.Completion{
Expand Down
1 change: 1 addition & 0 deletions internal/kernel/t_aio/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type NetworkSubmission struct {
}

func (s *NetworkSubmission) String() string {

switch s.Kind {
case Http:
return fmt.Sprintf("Network(http=Http(method=%s, url=%s))", s.Http.Method, s.Http.Url)
Expand Down
9 changes: 9 additions & 0 deletions test/dst/dst.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"strconv"

"github.com/resonatehq/resonate/internal/aio"
"github.com/resonatehq/resonate/internal/announcements"
"github.com/resonatehq/resonate/internal/announcements/monitors"
"github.com/resonatehq/resonate/internal/api"
"github.com/resonatehq/resonate/internal/kernel/bus"
"github.com/resonatehq/resonate/internal/kernel/metadata"
Expand Down Expand Up @@ -64,6 +66,13 @@ func (d *DST) Run(r *rand.Rand, api api.API, aio aio.AIO, system *system.System,
// model
model := NewModel(d.config.Scenario)

// setup announcements
announcements.Initialize(announcements.Dst, []announcements.Monitors{})

// register monitors
taskMonitor := monitors.NewTaskMonitor()
announcements.GetInstance().Register(taskMonitor)

// add req/res
for _, req := range reqs {
switch req {
Expand Down
10 changes: 10 additions & 0 deletions test/dst/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math/rand" // nosemgrep
"strconv"

"github.com/resonatehq/resonate/internal/announcements"
"github.com/resonatehq/resonate/internal/kernel/t_api"
"github.com/resonatehq/resonate/pkg/idempotency"
"github.com/resonatehq/resonate/pkg/promise"
Expand Down Expand Up @@ -396,6 +397,11 @@ func (g *Generator) GenerateClaimTask(r *rand.Rand, t int64) *t_api.Request {
executionId := g.idSet[r.Intn(len(g.idSet))]
expiryInMilliseconds := RangeInt63n(r, t, g.ticks*g.timeElapsedPerTick)

// claim task event announcement
event := announcements.NewEvent("TaskClaimed", map[string]interface{}{"taskId": taskId, "counter": counter, "processId": processId, "executionId": executionId, "expiryInMilliseconds": expiryInMilliseconds})

announcements.GetInstance().Announce(event)

return &t_api.Request{
Kind: t_api.ClaimTask,
ClaimTask: &t_api.ClaimTaskRequest{
Expand All @@ -420,6 +426,10 @@ func (g *Generator) GenerateCompleteTask(r *rand.Rand, t int64) *t_api.Request {
state = promise.Rejected
}

// complete task event announcement
event := announcements.NewEvent("TaskCompleted", map[string]interface{}{"taskId": taskId, "counter": counter, "executionId": executionId, "state": state, "headers": headers, "data": data})
announcements.GetInstance().Announce(event)

return &t_api.Request{
Kind: t_api.CompleteTask,
CompleteTask: &t_api.CompleteTaskRequest{
Expand Down