Skip to content

Commit

Permalink
fix: webhook workers on rebalance (#1162)
Browse files Browse the repository at this point in the history
* fix: log ui

* fix: partition handling and unregister

* fix: concurrent cleanup

* feat: op pool

* fix: run or continue partition id

* fix: return false out of check
  • Loading branch information
grutt authored Jan 7, 2025
1 parent 775feea commit e921468
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 19 deletions.
14 changes: 10 additions & 4 deletions frontend/app/src/components/cloud/logging/logs.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,12 @@ const LoggingComponent: React.FC<LogProps> = ({
</div>
)}
{sortedLogs.map((log, i) => {
const sanitizedHtml = DOMPurify.sanitize(convert.toHtml(log.line), {
USE_PROFILES: { html: true },
});
const sanitizedHtml = DOMPurify.sanitize(
convert.toHtml(log.line || ''),
{
USE_PROFILES: { html: true },
},
);

const logHash = log.timestamp + generateHash(log.line);

Expand Down Expand Up @@ -188,7 +191,10 @@ const LoggingComponent: React.FC<LogProps> = ({
);
};

// generateHash derives a short string from a log line; the caller appends it
// to the log timestamp to build a per-log hash.
//   - Empty or undefined input: returns a random base-36 string, so the result
//     is not stable across calls for such lines.
//   - Otherwise: hashes at most the first 50 characters with cyrb53 and returns
//     the result as a string.
// NOTE(review): the two signature lines below look like the pre-change and
// post-change versions from the diff view — confirm which one applies.
const generateHash = (input: string): string => {
const generateHash = (input: string | undefined): string => {
if (!input) {
// Nothing usable to hash; fall back to a random identifier.
return Math.random().toString(36).substring(2, 15);
}
// Only the first 50 characters feed the hash.
const trimmedInput = input.substring(0, 50);
return cyrb53(trimmedInput) + '';
};
Expand Down
59 changes: 51 additions & 8 deletions internal/services/webhooks/webhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync"
"time"

"github.com/hatchet-dev/hatchet/internal/queueutils"
"github.com/hatchet-dev/hatchet/internal/services/partition"
"github.com/hatchet-dev/hatchet/internal/whrequest"
"github.com/hatchet-dev/hatchet/pkg/config/server"
Expand All @@ -26,15 +27,21 @@ type WebhooksController struct {
cleanups map[string]func() error
p *partition.Partition
mu sync.Mutex // Add a mutex for concurrent map access
checkOps *queueutils.OperationPool
}

// New builds a WebhooksController for the given server config and partition.
// The controller starts with empty registeredWorkerIds and cleanups maps, and
// gets an operation pool named "check webhooks" whose operation is wc.check.
func New(sc *server.ServerConfig, p *partition.Partition) *WebhooksController {
// NOTE(review): the next line looks like the pre-change form from the diff
// view; the statements below it appear to supersede it — confirm.
return &WebhooksController{

wc := &WebhooksController{
sc: sc,
registeredWorkerIds: map[string]bool{},
cleanups: map[string]func() error{},
p: p,
}

// The pool is created after wc because it needs the wc.check method value.
// NOTE(review): time.Second*5 is presumably a per-operation timeout — confirm
// against queueutils.NewOperationPool.
wc.checkOps = queueutils.NewOperationPool(sc.Logger, time.Second*5, "check webhooks", wc.check)

return wc
}

func (c *WebhooksController) Start() (func() error, error) {
Expand All @@ -45,9 +52,7 @@ func (c *WebhooksController) Start() (func() error, error) {
for {
select {
case <-ticker.C:
if err := c.check(); err != nil {
c.sc.Logger.Warn().Err(err).Msgf("error checking webhooks")
}
c.checkOps.RunOrContinue(c.p.GetWorkerPartitionId())
case <-ctx.Done():
ticker.Stop()
return
Expand All @@ -70,25 +75,44 @@ func (c *WebhooksController) Start() (func() error, error) {
}, nil
}

func (c *WebhooksController) check() error {
wws, err := c.sc.EngineRepository.WebhookWorker().ListWebhookWorkersByPartitionId(context.Background(), c.p.GetWorkerPartitionId())
func (c *WebhooksController) check(ctx context.Context, id string) (bool, error) {
wws, err := c.sc.EngineRepository.WebhookWorker().ListWebhookWorkersByPartitionId(
ctx,
c.p.GetWorkerPartitionId(),
)

if err != nil {
return fmt.Errorf("could not get webhook workers: %w", err)
return false, fmt.Errorf("could not get webhook workers: %w", err)
}

currentRegisteredWorkerIds := map[string]bool{}

var wg sync.WaitGroup
for _, ww := range wws {
ww := ww // Create a new variable for each goroutine
wg.Add(1)
go func() {
defer wg.Done()
c.processWebhookWorker(ww)
currentRegisteredWorkerIds[sqlchelpers.UUIDToStr(ww.ID)] = true
}()
}
wg.Wait()

return nil
// cleanup workers that have been moved to a different partition
var cleanupWG sync.WaitGroup
for id := range c.registeredWorkerIds {
if !currentRegisteredWorkerIds[id] {
cleanupWG.Add(1)
go func(id string) {
defer cleanupWG.Done()
c.cleanupMovedPartitionWorker(id)
}(id)
}
}
cleanupWG.Wait()

return false, nil
}

func (c *WebhooksController) processWebhookWorker(ww *dbsqlc.WebhookWorker) {
Expand Down Expand Up @@ -138,6 +162,25 @@ func (c *WebhooksController) processWebhookWorker(ww *dbsqlc.WebhookWorker) {
}
}

// cleanupMovedPartitionWorker stops tracking the webhook worker with the given
// id. If a cleanup function is registered for the id it is invoked, and a
// failure is logged rather than returned. The id is then removed from both the
// cleanups and registeredWorkerIds maps, whether or not a cleanup function
// existed or succeeded.
func (c *WebhooksController) cleanupMovedPartitionWorker(id string) {
	// Look up the cleanup function under the lock, but run it after unlocking
	// so the mutex is not held for the duration of the cleanup call.
	c.mu.Lock()
	stop, registered := c.cleanups[id]
	c.mu.Unlock()

	if registered {
		err := stop()
		if err != nil {
			c.sc.Logger.Err(err).Msgf("error cleaning up webhook worker %s", id)
		}
	}

	// Drop every trace of the worker from the controller's bookkeeping.
	c.mu.Lock()
	delete(c.cleanups, id)
	delete(c.registeredWorkerIds, id)
	c.mu.Unlock()

	c.sc.Logger.Debug().Msgf("webhook worker %s has been removed from partition", id)
}

func (c *WebhooksController) cleanupDeletedWorker(id, tenantId string) {
c.mu.Lock()
cleanup, ok := c.cleanups[id]
Expand Down
7 changes: 0 additions & 7 deletions pkg/worker/worker_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,6 @@ func (w *Worker) StartWebhook(ww WebhookWorkerOpts) (func() error, error) {
cleanup := func() error {
cancel()

w.l.Debug().Msgf("worker %s is stopping...", w.name)

err := listener.Unregister()
if err != nil {
return fmt.Errorf("could not unregister worker: %w", err)
}

w.l.Debug().Msgf("worker %s stopped", w.name)

return nil
Expand Down

0 comments on commit e921468

Please sign in to comment.