Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Announcements and Monitoring of DST #269

Closed
wants to merge 13 commits into from
3 changes: 3 additions & 0 deletions cmd/dst/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/resonatehq/resonate/cmd/util"
"github.com/resonatehq/resonate/internal/aio"
"github.com/resonatehq/resonate/internal/announcements"
"github.com/resonatehq/resonate/internal/api"
"github.com/resonatehq/resonate/internal/app/coroutines"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/network"
Expand Down Expand Up @@ -70,6 +71,8 @@ func RunDSTCmd() *cobra.Command {
}))
slog.SetDefault(logger)

announcements.Initialize(announcements.Dst)
vaibhawvipul marked this conversation as resolved.
Show resolved Hide resolved

// instantiate metrics
reg := prometheus.NewRegistry()
metrics := metrics.New(reg)
Expand Down
94 changes: 94 additions & 0 deletions internal/announcements/announce.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package announcements

import (
"fmt"
"sync"
)

type Event struct {
Type string
Data map[string]interface{}
}

func NewEvent(eventType string, initialData ...map[string]interface{}) *Event {
var data map[string]interface{}
if len(initialData) > 0 {
data = initialData[0] // Use the first map provided if any.
} else {
data = make(map[string]interface{})
}

return &Event{
Type: eventType,
Data: data,
}
}

func (e *Event) Set(key string, value interface{}) {
e.Data[key] = value
}

func (e *Event) Get(key string) (interface{}, error) {
value, exists := e.Data[key]
if !exists {
return nil, fmt.Errorf("key '%s' does not exist in event of type '%s'", key, e.Type)
}
return value, nil
}

type Announcement interface {
Announce(event *Event)
}

type NopAnnouncement struct{}
vaibhawvipul marked this conversation as resolved.
Show resolved Hide resolved

type DstAnnouncement struct {
announcements []Event
mutex sync.Mutex // Mutex for thread safety
Copy link
Contributor

@susarlanikhilesh susarlanikhilesh May 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually a better practice to have the mutex at the start of the struct.
As the struct need not calculate the offset every time to find the mutex variable inside the struct.

}

var (
instance Announcement
once sync.Once
)

type EnvironmentType int

const (
Nop EnvironmentType = iota
Dst
)

func Initialize(envType EnvironmentType) {
once.Do(func() {
switch envType {
case Nop:
instance = &NopAnnouncement{}
case Dst:
instance = &DstAnnouncement{
announcements: make([]Event, 0, 100), // Preallocate capacity to prevent frequent reallocations
}
default:
panic("Invalid environment type.")
}
})
}

func GetInstance() Announcement {
// check if the instance has been initialized
if instance == nil {
// initialize with the default environment type
Initialize(Dst)
vaibhawvipul marked this conversation as resolved.
Show resolved Hide resolved
}
return instance
}

func (n *NopAnnouncement) Announce(event *Event) {
// Do nothing
}

func (d *DstAnnouncement) Announce(event *Event) {
d.mutex.Lock()
defer d.mutex.Unlock()
d.announcements = append(d.announcements, *event)
}
9 changes: 9 additions & 0 deletions internal/app/subsystems/aio/network/network_dst.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/http"

"github.com/resonatehq/resonate/internal/aio"
"github.com/resonatehq/resonate/internal/announcements"
"github.com/resonatehq/resonate/internal/kernel/bus"
"github.com/resonatehq/resonate/internal/kernel/t_aio"
"github.com/resonatehq/resonate/internal/util"
Expand Down Expand Up @@ -69,10 +70,18 @@ func (d *NetworkDSTDevice) Process(sqes []*bus.SQE[t_aio.Submission, t_aio.Compl
res = &http.Response{
StatusCode: http.StatusOK,
}

event := announcements.NewEvent("HTTPResponse")
vaibhawvipul marked this conversation as resolved.
Show resolved Hide resolved
event.Set("StatusCode", res.StatusCode)
announcements.GetInstance().Announce(event)
} else {
res = &http.Response{
StatusCode: http.StatusInternalServerError,
}

event := announcements.NewEvent("HTTPResponse")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good!!

event.Set("StatusCode", res.StatusCode)
announcements.GetInstance().Announce(event)
}

cqe.Completion = &t_aio.Completion{
Expand Down
1 change: 1 addition & 0 deletions internal/kernel/t_aio/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type NetworkSubmission struct {
}

func (s *NetworkSubmission) String() string {

switch s.Kind {
case Http:
return fmt.Sprintf("Network(http=Http(method=%s, url=%s))", s.Http.Method, s.Http.Url)
Expand Down
Loading