Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consolidate service worker channels and goroutines #417

Merged
merged 11 commits into from
Nov 9, 2023
3 changes: 3 additions & 0 deletions .github/workflows/go-test-config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"skip32bit": true
}
7 changes: 6 additions & 1 deletion analytics/analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"gorm.io/gorm/clause"
)

const flushInterval = time.Hour
gammazero marked this conversation as resolved.
Show resolved Hide resolved

var Enabled = true

var logger = log.Logger("analytics")
Expand Down Expand Up @@ -184,11 +186,14 @@ func (c *Collector) Flush() error {
}

func (c *Collector) Start(ctx context.Context) {
timer := time.NewTimer(flushInterval)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return
case <-time.After(time.Hour):
case <-timer.C:
timer.Reset(flushInterval)
}
//nolint:contextcheck
c.Flush()
Expand Down
29 changes: 14 additions & 15 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,10 +402,10 @@ var logger = logging.Logger("api")
// 3. Completion of analytics event flushing.
// - A channel (service.Fail) that reports errors that occur while the server is running.
// - An error if there is an issue during the initialization phase, otherwise nil.
func (s Server) Start(ctx context.Context) ([]service.Done, service.Fail, error) {
func (s Server) Start(ctx context.Context, exitErr chan<- error) error {
err := analytics.Init(ctx, s.db)
if err != nil {
return nil, nil, errors.WithStack(err)
return errors.WithStack(err)
}
e := echo.New()
e.Debug = true
Expand Down Expand Up @@ -453,16 +453,18 @@ func (s Server) Start(ctx context.Context) ([]service.Done, service.Fail, error)
e.Listener = s.listener

done := make(chan struct{})
fail := make(chan error)
eventsFlushed := make(chan struct{})

go func() {
err := e.Start("")
if err != nil {
select {
case <-ctx.Done():
case fail <- err:
}
<-eventsFlushed
<-done

if exitErr != nil {
exitErr <- err
}
}()

go func() {
defer close(done)
<-ctx.Done()
Expand All @@ -477,21 +479,18 @@ func (s Server) Start(ctx context.Context) ([]service.Done, service.Fail, error)
if err != nil {
logger.Errorw("failed to close database connection", "err", err)
}
}()
hostDone := make(chan struct{})
go func() {
defer close(hostDone)
<-ctx.Done()

s.host.Close()
}()
eventsFlushed := make(chan struct{})

go func() {
defer close(eventsFlushed)
analytics.Default.Start(ctx)
//nolint:contextcheck
analytics.Default.Flush()
}()
return []service.Done{done, hostDone, eventsFlushed}, fail, nil

return nil
}

func isIntKind(kind reflect.Kind) bool {
Expand Down
9 changes: 8 additions & 1 deletion cmd/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ func CalculateCommp(t *testing.T, content []byte, targetPieceSize uint64) string
func WaitForServerReady(ctx context.Context, url string) error {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
var timer *time.Timer
for {
if ctx.Err() != nil {
return ctx.Err()
Expand All @@ -180,10 +181,16 @@ func WaitForServerReady(ctx context.Context, url string) error {
if err == nil && resp != nil && resp.StatusCode == http.StatusOK {
return nil
}
if timer == nil {
timer = time.NewTimer(100 * time.Millisecond)
defer timer.Stop()
} else {
timer.Reset(100 * time.Millisecond)
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(100 * time.Millisecond):
case <-timer.C:
}
}
}
Expand Down
13 changes: 6 additions & 7 deletions service/contentprovider/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"

"github.com/cockroachdb/errors"
"github.com/data-preservation-programs/singularity/service"
"github.com/data-preservation-programs/singularity/store"
"github.com/data-preservation-programs/singularity/util"
nilrouting "github.com/ipfs/go-ipfs-routing/none"
Expand Down Expand Up @@ -51,25 +50,25 @@ func (BitswapServer) Name() string {
// It sets up the necessary routing and networking components,
// and starts serving Bitswap requests.
// It returns channels that signal when the service has stopped or encountered an error.
func (s BitswapServer) Start(ctx context.Context) ([]service.Done, service.Fail, error) {
func (s BitswapServer) Start(ctx context.Context, exitErr chan<- error) error {
nilRouter, err := nilrouting.ConstructNilRouting(ctx, nil, nil, nil)
if err != nil {
return nil, nil, errors.WithStack(err)
return errors.WithStack(err)
}

net := bsnetwork.NewFromIpfsHost(s.host, nilRouter)
bs := &store.FileReferenceBlockStore{DBNoContext: s.dbNoContext}
bsserver := server.New(ctx, net, bs)
net.Start(bsserver)
done := make(chan struct{})
fail := make(chan error)

go func() {
defer close(done)
<-ctx.Done()
net.Stop()
bsserver.Close()
s.host.Close()
if exitErr != nil {
exitErr <- nil
}
}()
return []service.Done{done}, fail, nil
return nil
}
7 changes: 5 additions & 2 deletions service/contentprovider/bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,18 @@ func TestBitswapServer(t *testing.T) {
host: h,
}
require.Equal(t, "Bitswap", s.Name())

exitErr := make(chan error, 1)
ctx, cancel := context.WithCancel(ctx)
done, _, err := s.Start(ctx)
err = s.Start(ctx, exitErr)
require.NoError(t, err)
time.Sleep(200 * time.Millisecond)
cancel()
select {
case <-time.After(1 * time.Second):
t.Fatal("bitswap server did not stop")
case <-done[0]:
case err = <-exitErr:
require.NoError(t, err)
}
})
}
37 changes: 22 additions & 15 deletions service/contentprovider/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/errors/oserror"
"github.com/data-preservation-programs/singularity/model"
"github.com/data-preservation-programs/singularity/service"
"github.com/data-preservation-programs/singularity/storagesystem"
"github.com/data-preservation-programs/singularity/store"
"github.com/data-preservation-programs/singularity/util"
Expand Down Expand Up @@ -49,7 +48,7 @@ func (*HTTPServer) Name() string {
// - A Done channel slice that are closed when the server has stopped.
// - A Fail channel that receives an error if the server fails to start or stop.
// - An error if the server fails to start.
func (s *HTTPServer) Start(ctx context.Context) ([]service.Done, service.Fail, error) {
func (s *HTTPServer) Start(ctx context.Context, exitErr chan<- error) error {
e := echo.New()
e.Use(middleware.GzipWithConfig(middleware.GzipConfig{}))
e.Use(
Expand Down Expand Up @@ -98,27 +97,35 @@ func (s *HTTPServer) Start(ctx context.Context) ([]service.Done, service.Fail, e
e.GET("/health", func(c echo.Context) error {
return c.String(http.StatusOK, "ok")
})
done := make(chan struct{})
fail := make(chan error)

forceShutdown := make(chan struct{})
shutdownErr := make(chan error, 1)

go func() {
err := e.Start(s.bind)
if err != nil {
select {
case <-ctx.Done():
case fail <- err:
if errors.Is(err, http.ErrServerClosed) {
err = nil
}
close(forceShutdown)
closeErr := <-shutdownErr
if exitErr != nil {
if err == nil {
err = closeErr
}
exitErr <- err
}
}()

go func() {
defer close(done)
<-ctx.Done()
//nolint:contextcheck
err := e.Shutdown(context.Background())
if err != nil {
fail <- err
select {
case <-ctx.Done():
case <-forceShutdown:
}
//nolint:contextcheck
shutdownErr <- e.Shutdown(context.Background())
}()
return []service.Done{done}, fail, nil

return nil
}

func getPieceMetadata(ctx context.Context, db *gorm.DB, car model.Car) (*PieceMetadata, error) {
Expand Down
6 changes: 4 additions & 2 deletions service/contentprovider/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ func TestHTTPServerStart(t *testing.T) {
bind: "127.0.0.1:65432",
}
require.Equal(t, "HTTPServer", s.Name())
exitErr := make(chan error, 1)
ctx, cancel := context.WithCancel(ctx)
done, _, err := s.Start(ctx)
err := s.Start(ctx, exitErr)
require.NoError(t, err)
time.Sleep(200 * time.Millisecond)
gorequest.New().Get("http://127.0.0.1:65432/health").End()
Expand All @@ -37,7 +38,8 @@ func TestHTTPServerStart(t *testing.T) {
select {
case <-time.After(1 * time.Second):
t.Fatal("http server did not stop")
case <-done[0]:
case err = <-exitErr:
require.NoError(t, err)
}
})
}
Expand Down
Loading
Loading