Skip to content

Commit

Permalink
Consolidate service worker channels and goroutines (#417)
Browse files Browse the repository at this point in the history
Instead of having every service worker report a failure error and the
done status of each goroutine within the service, use a single channel
to have all service report on their status and completion.

Fixes #415
  • Loading branch information
gammazero authored Nov 9, 2023
1 parent ead1a94 commit 56c253f
Show file tree
Hide file tree
Showing 18 changed files with 381 additions and 309 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/go-test-config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"skip32bit": true
}
7 changes: 6 additions & 1 deletion analytics/analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"gorm.io/gorm/clause"
)

const flushInterval = time.Hour

var Enabled = true

var logger = log.Logger("analytics")
Expand Down Expand Up @@ -184,11 +186,14 @@ func (c *Collector) Flush() error {
}

func (c *Collector) Start(ctx context.Context) {
timer := time.NewTimer(flushInterval)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return
case <-time.After(time.Hour):
case <-timer.C:
timer.Reset(flushInterval)
}
//nolint:contextcheck
c.Flush()
Expand Down
29 changes: 14 additions & 15 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,10 +402,10 @@ var logger = logging.Logger("api")
// 3. Completion of analytics event flushing.
// - A channel (service.Fail) that reports errors that occur while the server is running.
// - An error if there is an issue during the initialization phase, otherwise nil.
func (s Server) Start(ctx context.Context) ([]service.Done, service.Fail, error) {
func (s Server) Start(ctx context.Context, exitErr chan<- error) error {
err := analytics.Init(ctx, s.db)
if err != nil {
return nil, nil, errors.WithStack(err)
return errors.WithStack(err)
}
e := echo.New()
e.Debug = true
Expand Down Expand Up @@ -453,16 +453,18 @@ func (s Server) Start(ctx context.Context) ([]service.Done, service.Fail, error)
e.Listener = s.listener

done := make(chan struct{})
fail := make(chan error)
eventsFlushed := make(chan struct{})

go func() {
err := e.Start("")
if err != nil {
select {
case <-ctx.Done():
case fail <- err:
}
<-eventsFlushed
<-done

if exitErr != nil {
exitErr <- err
}
}()

go func() {
defer close(done)
<-ctx.Done()
Expand All @@ -477,21 +479,18 @@ func (s Server) Start(ctx context.Context) ([]service.Done, service.Fail, error)
if err != nil {
logger.Errorw("failed to close database connection", "err", err)
}
}()
hostDone := make(chan struct{})
go func() {
defer close(hostDone)
<-ctx.Done()

s.host.Close()
}()
eventsFlushed := make(chan struct{})

go func() {
defer close(eventsFlushed)
analytics.Default.Start(ctx)
//nolint:contextcheck
analytics.Default.Flush()
}()
return []service.Done{done, hostDone, eventsFlushed}, fail, nil

return nil
}

func isIntKind(kind reflect.Kind) bool {
Expand Down
9 changes: 8 additions & 1 deletion cmd/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ func CalculateCommp(t *testing.T, content []byte, targetPieceSize uint64) string
func WaitForServerReady(ctx context.Context, url string) error {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
var timer *time.Timer
for {
if ctx.Err() != nil {
return ctx.Err()
Expand All @@ -180,10 +181,16 @@ func WaitForServerReady(ctx context.Context, url string) error {
if err == nil && resp != nil && resp.StatusCode == http.StatusOK {
return nil
}
if timer == nil {
timer = time.NewTimer(100 * time.Millisecond)
defer timer.Stop()
} else {
timer.Reset(100 * time.Millisecond)
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(100 * time.Millisecond):
case <-timer.C:
}
}
}
Expand Down
13 changes: 6 additions & 7 deletions service/contentprovider/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"

"github.com/cockroachdb/errors"
"github.com/data-preservation-programs/singularity/service"
"github.com/data-preservation-programs/singularity/store"
"github.com/data-preservation-programs/singularity/util"
nilrouting "github.com/ipfs/go-ipfs-routing/none"
Expand Down Expand Up @@ -51,25 +50,25 @@ func (BitswapServer) Name() string {
// It sets up the necessary routing and networking components,
// and starts serving Bitswap requests.
// It returns channels that signal when the service has stopped or encountered an error.
func (s BitswapServer) Start(ctx context.Context) ([]service.Done, service.Fail, error) {
func (s BitswapServer) Start(ctx context.Context, exitErr chan<- error) error {
nilRouter, err := nilrouting.ConstructNilRouting(ctx, nil, nil, nil)
if err != nil {
return nil, nil, errors.WithStack(err)
return errors.WithStack(err)
}

net := bsnetwork.NewFromIpfsHost(s.host, nilRouter)
bs := &store.FileReferenceBlockStore{DBNoContext: s.dbNoContext}
bsserver := server.New(ctx, net, bs)
net.Start(bsserver)
done := make(chan struct{})
fail := make(chan error)

go func() {
defer close(done)
<-ctx.Done()
net.Stop()
bsserver.Close()
s.host.Close()
if exitErr != nil {
exitErr <- nil
}
}()
return []service.Done{done}, fail, nil
return nil
}
7 changes: 5 additions & 2 deletions service/contentprovider/bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,18 @@ func TestBitswapServer(t *testing.T) {
host: h,
}
require.Equal(t, "Bitswap", s.Name())

exitErr := make(chan error, 1)
ctx, cancel := context.WithCancel(ctx)
done, _, err := s.Start(ctx)
err = s.Start(ctx, exitErr)
require.NoError(t, err)
time.Sleep(200 * time.Millisecond)
cancel()
select {
case <-time.After(1 * time.Second):
t.Fatal("bitswap server did not stop")
case <-done[0]:
case err = <-exitErr:
require.NoError(t, err)
}
})
}
37 changes: 22 additions & 15 deletions service/contentprovider/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/errors/oserror"
"github.com/data-preservation-programs/singularity/model"
"github.com/data-preservation-programs/singularity/service"
"github.com/data-preservation-programs/singularity/storagesystem"
"github.com/data-preservation-programs/singularity/store"
"github.com/data-preservation-programs/singularity/util"
Expand Down Expand Up @@ -49,7 +48,7 @@ func (*HTTPServer) Name() string {
// - A Done channel slice that are closed when the server has stopped.
// - A Fail channel that receives an error if the server fails to start or stop.
// - An error if the server fails to start.
func (s *HTTPServer) Start(ctx context.Context) ([]service.Done, service.Fail, error) {
func (s *HTTPServer) Start(ctx context.Context, exitErr chan<- error) error {
e := echo.New()
e.Use(middleware.GzipWithConfig(middleware.GzipConfig{}))
e.Use(
Expand Down Expand Up @@ -98,27 +97,35 @@ func (s *HTTPServer) Start(ctx context.Context) ([]service.Done, service.Fail, e
e.GET("/health", func(c echo.Context) error {
return c.String(http.StatusOK, "ok")
})
done := make(chan struct{})
fail := make(chan error)

forceShutdown := make(chan struct{})
shutdownErr := make(chan error, 1)

go func() {
err := e.Start(s.bind)
if err != nil {
select {
case <-ctx.Done():
case fail <- err:
if errors.Is(err, http.ErrServerClosed) {
err = nil
}
close(forceShutdown)
closeErr := <-shutdownErr
if exitErr != nil {
if err == nil {
err = closeErr
}
exitErr <- err
}
}()

go func() {
defer close(done)
<-ctx.Done()
//nolint:contextcheck
err := e.Shutdown(context.Background())
if err != nil {
fail <- err
select {
case <-ctx.Done():
case <-forceShutdown:
}
//nolint:contextcheck
shutdownErr <- e.Shutdown(context.Background())
}()
return []service.Done{done}, fail, nil

return nil
}

func getPieceMetadata(ctx context.Context, db *gorm.DB, car model.Car) (*PieceMetadata, error) {
Expand Down
6 changes: 4 additions & 2 deletions service/contentprovider/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ func TestHTTPServerStart(t *testing.T) {
bind: "127.0.0.1:65432",
}
require.Equal(t, "HTTPServer", s.Name())
exitErr := make(chan error, 1)
ctx, cancel := context.WithCancel(ctx)
done, _, err := s.Start(ctx)
err := s.Start(ctx, exitErr)
require.NoError(t, err)
time.Sleep(200 * time.Millisecond)
gorequest.New().Get("http://127.0.0.1:65432/health").End()
Expand All @@ -37,7 +38,8 @@ func TestHTTPServerStart(t *testing.T) {
select {
case <-time.After(1 * time.Second):
t.Fatal("http server did not stop")
case <-done[0]:
case err = <-exitErr:
require.NoError(t, err)
}
})
}
Expand Down
Loading

0 comments on commit 56c253f

Please sign in to comment.