diff --git a/Dockerfile b/Dockerfile index f6168d60..6bedfb15 100644 --- a/Dockerfile +++ b/Dockerfile @@ -16,7 +16,8 @@ RUN groupadd -g 999 e2core && \ chown -R e2core /home/e2core && \ chmod -R 700 /home/e2core RUN apt-get update \ - && apt-get install -y ca-certificates + && apt-get install -y ca-certificates \ + && apt-get install -y curl # e2core binary COPY --from=builder /go/src/github.com/suborbital/e2core/.bin/e2core /usr/local/bin/ diff --git a/e2core/auth/access.go b/e2core/auth/access.go index 1e81148f..891e5673 100644 --- a/e2core/auth/access.go +++ b/e2core/auth/access.go @@ -13,6 +13,7 @@ import ( "github.com/suborbital/e2core/e2core/options" "github.com/suborbital/e2core/foundation/common" + "github.com/suborbital/e2core/foundation/tracing" "github.com/suborbital/systemspec/system" ) @@ -28,6 +29,11 @@ func AuthorizationMiddleware(opts *options.Options) echo.MiddlewareFunc { return func(next echo.HandlerFunc) echo.HandlerFunc { return func(c echo.Context) error { + ctx, span := tracing.Tracer.Start(c.Request().Context(), "authorization-middleware") + defer span.End() + + c.SetRequest(c.Request().WithContext(ctx)) + identifier := c.Param("ident") namespace := c.Param("namespace") name := c.Param("name") diff --git a/e2core/backend/satbackend/exec/exec.go b/e2core/backend/satbackend/exec/exec.go index a303d2c1..9accfd86 100644 --- a/e2core/backend/satbackend/exec/exec.go +++ b/e2core/backend/satbackend/exec/exec.go @@ -15,7 +15,7 @@ type WaitFunc func() error // Run runs a command, outputting to terminal and returning the full output and/or error // a channel is returned which, when sent on, will terminate the process that was started -func Run(cmd []string, env ...string) (string, context.CancelCauseFunc, WaitFunc, error) { +func Run(cmd []string, env ...string) (string, int, context.CancelCauseFunc, WaitFunc, error) { procUUID := uuid.New().String() uuidEnv := fmt.Sprintf("%s_UUID=%s", strings.ToUpper(cmd[0]), procUUID) env = append(env, uuidEnv) @@ 
-33,10 +33,10 @@ func Run(cmd []string, env ...string) (string, context.CancelCauseFunc, WaitFunc err := command.Start() if err != nil { - return "", nil, nil, errors.Wrap(err, "command.Start()") + return "", -1, nil, nil, errors.Wrap(err, "command.Start()") } - return procUUID, cxl, command.Wait, nil + return procUUID, command.Process.Pid, cxl, command.Wait, nil } // this is unused but we may want to do logging-to-speficig-directory some time in the diff --git a/e2core/backend/satbackend/orchestrator.go b/e2core/backend/satbackend/orchestrator.go index 938c95e8..0951284c 100644 --- a/e2core/backend/satbackend/orchestrator.go +++ b/e2core/backend/satbackend/orchestrator.go @@ -51,7 +51,7 @@ func (o *Orchestrator) Start() error { var err error - ticker := time.NewTicker(time.Second) + ticker := time.NewTicker(5 * time.Second) loop: for { select { @@ -144,11 +144,15 @@ func (o *Orchestrator) reconcileConstellation(syncer *syncer.Syncer) { } // repeat forever in case the command does error out - processUUID, cxl, wait, err := exec.Run( + processUUID, pid, cxl, wait, err := exec.Run( cmd, "SAT_HTTP_PORT="+port, "SAT_CONTROL_PLANE="+o.opts.ControlPlane, "SAT_CONNECTIONS="+connectionsEnv, + "SAT_TRACER_TYPE=collector", + "SAT_TRACER_SERVICENAME=e2core_bebby-"+port, + "SAT_TRACER_PROBABILITY=1", + "SAT_TRACER_COLLECTOR_ENDPOINT=collector:4317", ) if err != nil { ll.Err(err).Str("moduleFQMN", module.FQMN).Msg("exec.Run failed for sat instance") @@ -158,20 +162,41 @@ func (o *Orchestrator) reconcileConstellation(syncer *syncer.Syncer) { go func() { err := wait() if err != nil { - ll.Err(err).Str("moduleFQMN", module.FQMN).Str("port", port).Msg("calling waitfunc for the module failed") + ll.Err(err). + Str("moduleFQMN", module.FQMN). + Str("port", port). + Int("pid", pid). + Str("uuid", processUUID). + Msg("waitfunc returned with an error") } + ll.Info(). + Str("moduleFQMN", module.FQMN). + Str("port", port). + Int("pid", pid). + Str("uuid", processUUID). 
+ Msg("adding port to dead list") + err = satWatcher.addToDead(port) if err != nil { - ll.Err(err).Str("moduleFQMN", module.FQMN).Str("port", port).Msg("adding the port to the dead list") + ll.Err(err). + Str("moduleFQMN", module.FQMN). + Str("port", port). + Int("pid", pid). + Str("uuid", processUUID). + Msg("adding the port to the dead list failed") } - ll.Info().Str("moduleFQMN", module.FQMN).Str("port", port).Msg("added port to dead list") }() - satWatcher.add(module.FQMN, port, processUUID, cxl) + satWatcher.add(module.FQMN, port, processUUID, pid, cxl) - ll.Debug().Str("moduleFQMN", module.FQMN).Str("port", port).Msg("successfully started sat") + ll.Info(). + Str("moduleFQMN", module.FQMN). + Str("port", port). + Int("pid", pid). + Str("uuid", processUUID). + Msg("successfully started sat") } // we want to max out at 8 threads per instance diff --git a/e2core/backend/satbackend/watcher.go b/e2core/backend/satbackend/watcher.go index 10293ebb..aeea4e1a 100644 --- a/e2core/backend/satbackend/watcher.go +++ b/e2core/backend/satbackend/watcher.go @@ -41,6 +41,7 @@ type instance struct { fqmn string metrics *MetricsResponse uuid string + pid int cxl context.CancelCauseFunc } @@ -76,7 +77,7 @@ func (w *watcher) addToDead(port string) error { } // add inserts a new instance to the watched pool. 
-func (w *watcher) add(fqmn, port, uuid string, cxl context.CancelCauseFunc) { +func (w *watcher) add(fqmn, port, uuid string, pid int, cxl context.CancelCauseFunc) { w.log.Info().Str("port", port).Str("fqmn", fqmn).Msg("adding one to the waitgroup port") w.instancesRunning.Add(1) @@ -88,6 +89,7 @@ func (w *watcher) add(fqmn, port, uuid string, cxl context.CancelCauseFunc) { w.instances[port] = &instance{ fqmn: fqmn, uuid: uuid, + pid: pid, cxl: cxl, } } diff --git a/e2core/command/mod_start.go b/e2core/command/mod_start.go index 4033dc62..d2e53860 100644 --- a/e2core/command/mod_start.go +++ b/e2core/command/mod_start.go @@ -11,11 +11,14 @@ import ( "github.com/pkg/errors" "github.com/rs/zerolog" + "github.com/sethvargo/go-envconfig" "github.com/spf13/cobra" "github.com/suborbital/e2core/e2core/release" + "github.com/suborbital/e2core/foundation/tracing" "github.com/suborbital/e2core/sat/sat" "github.com/suborbital/e2core/sat/sat/metrics" + satOptions "github.com/suborbital/e2core/sat/sat/options" ) func ModStart() *cobra.Command { @@ -30,13 +33,23 @@ func ModStart() *cobra.Command { path = args[0] } + opts, err := satOptions.Resolve(envconfig.OsLookuper()) + if err != nil { + return errors.Wrap(err, "options.Resolve") + } + zerolog.TimeFieldFormat = zerolog.TimeFormatUnix l := zerolog.New(os.Stderr).With(). Timestamp(). - Str("command", "mod start"). - Logger().Level(zerolog.InfoLevel) - - config, err := sat.ConfigFromModuleArg(l, path) + Str("port", string(opts.Port)). + Str("procuuid", string(opts.ProcUUID)). + Int("pid", os.Getpid()). + Int("ppid", os.Getppid()). + Str("mode", "bebby"). + Str("fqmn", path). 
+ Logger() + + config, err := sat.ConfigFromModuleArg(l, opts, path) if err != nil { return errors.Wrap(err, "failed to ConfigFromModuleArg") } @@ -49,14 +62,18 @@ func ModStart() *cobra.Command { } if httpPort > 0 { config.Port = httpPort - l.Debug().Int("port", httpPort).Msg(fmt.Sprintf("Using port :%d for the sat backend", httpPort)) + l.Info().Int("port", httpPort).Msg(fmt.Sprintf("Using port :%d for the sat backend", httpPort)) } - traceProvider, err := sat.SetupTracing(config.TracerConfig, l) + l.Info().Interface("sdkTrace-config", config.TracerConfig).Msg("this is the sdkTrace config we're using") + + traceProvider, err := tracing.SetupTracing(config.TracerConfig, l) if err != nil { return errors.Wrap(err, "setup tracing") } + l.Info().Msg("successfully set up tracing") + mctx, mcancel := context.WithTimeout(context.Background(), 5*time.Second) defer mcancel() @@ -67,7 +84,7 @@ func ModStart() *cobra.Command { defer traceProvider.Shutdown(context.Background()) - satInstance, err := sat.New(config, l, traceProvider, mtx) + satInstance, err := sat.New(config, l, mtx) if err != nil { return errors.Wrap(err, "failed to sat.New") } diff --git a/e2core/command/start.go b/e2core/command/start.go index 66db9a57..c99958fb 100644 --- a/e2core/command/start.go +++ b/e2core/command/start.go @@ -26,7 +26,7 @@ import ( ) const ( - shutdownWaitTime = time.Second * 3 + shutdownWaitTime = time.Second * 10 ) func Start() *cobra.Command { @@ -116,8 +116,10 @@ func Start() *cobra.Command { return errors.Wrap(err, "srv.Shutdown") } - if err := sourceSrv.Shutdown(ctx); err != nil { - return errors.Wrap(err, "sourceSrv.Shutdown") + if sourceSrv != nil { + if err := sourceSrv.Shutdown(ctx); err != nil { + return errors.Wrap(err, "sourceSrv.Shutdown") + } } backend.Shutdown() @@ -141,7 +143,7 @@ func setupLogger() zerolog.Logger { logger := zerolog.New(os.Stderr).With(). Timestamp(). - Str("command", "start"). + Str("mode", "mothership"). Str("version", release.Version). 
Logger().Level(zerolog.InfoLevel) @@ -197,14 +199,14 @@ func setupSourceServer(logger zerolog.Logger, opts *options.Options) (*echo.Echo ll.Debug().Msg("creating sourceserver from bundle: " + opts.BundlePath) - server, err := sourceserver.FromBundle(opts.BundlePath) + sourceSrv, err := sourceserver.FromBundle(opts.BundlePath) if err != nil { return nil, errors.Wrap(err, "failed to sourceserver.FromBundle") } - server.HideBanner = true + sourceSrv.HideBanner = true - return server, nil + return sourceSrv, nil } // a nil server is ok if we don't need to run one diff --git a/e2core/options/options.go b/e2core/options/options.go index f79b6ab3..5539cd84 100644 --- a/e2core/options/options.go +++ b/e2core/options/options.go @@ -7,6 +7,8 @@ import ( "github.com/pkg/errors" "github.com/sethvargo/go-envconfig" + + "github.com/suborbital/e2core/foundation/tracing" ) const ( @@ -29,7 +31,8 @@ type Options struct { Domain string `env:"E2CORE_DOMAIN"` HTTPPort int `env:"E2CORE_HTTP_PORT,default=8080"` TLSPort int `env:"E2CORE_TLS_PORT,default=443"` - TracerConfig TracerConfig `env:",prefix=E2CORE_TRACER_"` + EnvTracerConfig TracerConfig `env:",prefix=E2CORE_TRACER_"` + TracerConfig tracing.Config } // TracerConfig holds values specific to setting up the tracer. It's only used in proxy mode. 
All configuration options @@ -149,11 +152,37 @@ func (o *Options) finalize() error { o.Features = envOpts.Features o.EnvironmentToken = "" - o.TracerConfig = TracerConfig{} + o.EnvTracerConfig = TracerConfig{} o.StaticPeers = envOpts.StaticPeers o.EnvironmentToken = envOpts.EnvironmentToken - o.TracerConfig = envOpts.TracerConfig + o.EnvTracerConfig = envOpts.EnvTracerConfig + + tc := tracing.Config{ + ServiceName: envOpts.EnvTracerConfig.ServiceName, + Probability: envOpts.EnvTracerConfig.Probability, + } + + switch envOpts.EnvTracerConfig.TracerType { + case "collector": + tc.Type = tracing.ExporterCollector + case "honeycomb": + tc.Type = tracing.ExporterHoneycomb + } + + if envOpts.EnvTracerConfig.HoneycombConfig != nil { + tc.Honeycomb = tracing.HoneycombConfig{ + Endpoint: envOpts.EnvTracerConfig.HoneycombConfig.Endpoint, + APIKey: envOpts.EnvTracerConfig.HoneycombConfig.APIKey, + Dataset: envOpts.EnvTracerConfig.HoneycombConfig.Dataset, + } + } + + if envOpts.EnvTracerConfig.Collector != nil { + tc.Collector = tracing.CollectorConfig{Endpoint: envOpts.EnvTracerConfig.Collector.Endpoint} + } + + o.TracerConfig = tc return nil } diff --git a/e2core/server/dispatcher.go b/e2core/server/dispatcher.go index 0488ea47..3d14098b 100644 --- a/e2core/server/dispatcher.go +++ b/e2core/server/dispatcher.go @@ -1,6 +1,7 @@ package server import ( + "context" "encoding/json" "fmt" "sync" @@ -11,6 +12,7 @@ import ( "github.com/suborbital/e2core/e2core/sequence" "github.com/suborbital/e2core/foundation/bus/bus" + "github.com/suborbital/e2core/foundation/tracing" ) const ( @@ -55,18 +57,27 @@ func newDispatcher(l zerolog.Logger, pod *bus.Pod) *dispatcher { // Execute returns the "final state" of a Sequence. If the state's err is not nil, it means a runnable returned an error, and the Directive indicates the Sequence should return. 
// if exec itself actually returns an error other than ErrSequenceRunErr, it means there was a problem executing the Sequence as described, and should be treated as such. -func (d *dispatcher) Execute(seq *sequence.Sequence) error { +func (d *dispatcher) Execute(ctx context.Context, seq *sequence.Sequence) error { + ctx, span := tracing.Tracer.Start(ctx, "dispatcher.execute") + defer span.End() + + ll := d.log.With().Str("requestID", seq.ParentID()).Logger() + ll.Info().Interface("dispatcher-pod", d.pod).Msg("created a sequence dispatcher") s := &sequenceDispatcher{ seq: seq, pod: d.pod, - log: d.log, + log: ll.With().Str("part", "sequenceDispatcher").Logger(), } + ll.Info().Msg("creating a result chan, and a callback function that takes in a result, and sends that result back into the resultchan.") + resultChan := make(chan *sequence.ExecResult) cb := func(result *sequence.ExecResult) { + ll.Info().Msg("callback: sending result to resultchan") resultChan <- result } + ll.Info().Msg("this callback is added to the sequence.parentID in the dispatcher. It's just a map. One sequence ID, one callback") d.addCallback(seq.ParentID(), cb) defer d.removeCallback(seq.ParentID()) @@ -75,7 +86,10 @@ func (d *dispatcher) Execute(seq *sequence.Sequence) error { return errors.New("sequence contains no steps") } - if err := s.dispatchSingle(firstStep, resultChan); err != nil { + ll.Info().Interface("first-step", firstStep).Msg("dispatchsingle gets called on the sequence dispatcher. 
Arguments are the results channel and the first step.") + + span.AddEvent("dispatching single") + if err := s.dispatchSingle(ctx, firstStep, resultChan); err != nil { return errors.Wrap(err, "failed to dispatchSingle") } @@ -88,7 +102,7 @@ func (d *dispatcher) Execute(seq *sequence.Sequence) error { if step == nil { break } else if step.IsSingle() { - if err := s.awaitResult(resultChan); err != nil { + if err := s.awaitResult(ctx, resultChan); err != nil { return errors.Wrap(err, "failed to awaitResult") } } else if step.IsGroup() { @@ -100,37 +114,58 @@ func (d *dispatcher) Execute(seq *sequence.Sequence) error { } // dispatchSingle executes a single plugin from a sequence step -func (s *sequenceDispatcher) dispatchSingle(step *sequence.Step, resultChan chan *sequence.ExecResult) error { +func (s *sequenceDispatcher) dispatchSingle(ctx context.Context, step *sequence.Step, resultChan chan *sequence.ExecResult) error { + ctx, span := tracing.Tracer.Start(ctx, "sequencedispatcher.dispatchsingle") + defer span.End() + data, err := s.seq.Request().ToJSON() if err != nil { return errors.Wrap(err, "failed to req.toJSON") } + s.log.Info().Str("data in dispatchSingle", string(data)).Msg("message about to be sent") + + span.AddEvent("created new message with parent id") msg := bus.NewMsgWithParentID(step.FQMN, s.seq.ParentID(), data) + msg.SetContext(ctx) + + s.log.Info().Interface("bus.Message", msg).Msg("bus msg. Next is pod.tunnel with step.fqmn with message.") - // find an appropriate peer and tunnel the first excution to them + // find an appropriate peer and tunnel the first execution to them if err := s.pod.Tunnel(step.FQMN, msg); err != nil { return errors.Wrap(err, "failed to Tunnel") } - s.log.Debug().Str("parentID", s.seq.ParentID()). + s.log.Info(). Str("msgUUID", msg.UUID()). 
Msg("dispatched execution for parent to peer with message") - return s.awaitResult(resultChan) + return s.awaitResult(ctx, resultChan) } -func (s *sequenceDispatcher) awaitResult(resultChan chan *sequence.ExecResult) error { +func (s *sequenceDispatcher) awaitResult(ctx context.Context, resultChan chan *sequence.ExecResult) error { + ctx, span := tracing.Tracer.Start(ctx, "awaitResult") + defer span.End() + select { case result := <-resultChan: + span.AddEvent("result came in the channel") + + s.log.Info().Msg("we have a message back from the result channel") if result.Response == nil { + s.log.Error().Msg("sadly the response was nil") return fmt.Errorf("recieved nil response for %s", result.FQMN) } + s.log.Info().Msg("handling the step results") if err := s.seq.HandleStepResults([]sequence.ExecResult{*result}); err != nil { + s.log.Err(err).Msg("something went wrong while handling the step results") return errors.Wrap(err, "failed to HandleStepResults") } case <-time.After(time.Second * 10): + span.AddEvent("10 seconds have passed, sad times") + + s.log.Warn().Msg("dispatchSingle timeout reached") return ErrDispatchTimeout } @@ -140,21 +175,28 @@ func (s *sequenceDispatcher) awaitResult(resultChan chan *sequence.ExecResult) e // onMsgHandler is called when a new message is received from the pod func (d *dispatcher) onMsgHandler() bus.MsgFunc { return func(msg bus.Message) error { + ll := d.log.With().Str("requestID", msg.ParentID()).Logger() d.lock.RLock() defer d.lock.RUnlock() + + ll.Info().Msg("message received to dispatcher.onMsgHandler") + // we only care about the messages related to our specific sequence callback, exists := d.callbacks[msg.ParentID()] if !exists { + ll.Warn().Str("uuid", msg.ParentID()).Msg("did not exist") return nil } result := &sequence.ExecResult{} if err := json.Unmarshal(msg.Data(), result); err != nil { - d.log.Err(err).Msg("json.Unmarshal message data failure") + ll.Err(err).Msg("json.Unmarshal message data failure") return nil } + 
ll.Info().Str("requestID", msg.ParentID()).Msg("calling the callback with the result") + callback(result) return nil diff --git a/e2core/server/handlers.go b/e2core/server/handlers.go index 5d8382eb..788c0245 100644 --- a/e2core/server/handlers.go +++ b/e2core/server/handlers.go @@ -5,14 +5,24 @@ import ( "net/http" "github.com/labstack/echo/v4" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/suborbital/e2core/e2core/sequence" + "github.com/suborbital/e2core/foundation/tracing" "github.com/suborbital/systemspec/request" "github.com/suborbital/systemspec/tenant" ) func (s *Server) executePluginByNameHandler() echo.HandlerFunc { return func(c echo.Context) error { + ctx, span := tracing.Tracer.Start(c.Request().Context(), "executePluginByNameHandler", trace.WithAttributes( + attribute.String("request_id", c.Response().Header().Get(echo.HeaderXRequestID)), + )) + defer span.End() + + c.SetRequest(c.Request().WithContext(ctx)) + // with the authorization middleware, this is going to be the uuid of the tenant specified by the path name in // the environment specified by the authorization token. ident := ReadParam(c, "ident") @@ -24,11 +34,17 @@ func (s *Server) executePluginByNameHandler() echo.HandlerFunc { name := ReadParam(c, "name") ll := s.logger.With(). + Str("requestID", c.Response().Header().Get(echo.HeaderXRequestID)). Str("ident", ident). Str("namespace", namespace). Str("fn", name). 
Logger() + span.AddEvent("grabbing module by name", trace.WithAttributes( + attribute.String("ident", ident), + attribute.String("namespace", namespace), + attribute.String("name", name), + )) mod := s.syncer.GetModuleByName(ident, namespace, name) if mod == nil { ll.Error().Msg("syncer did not find module by these details") @@ -51,13 +67,15 @@ func (s *Server) executePluginByNameHandler() echo.HandlerFunc { steps := []tenant.WorkflowStep{{FQMN: mod.FQMN}} + span.AddEvent("sequence.New from req") + // a sequence executes the handler's steps and manages its state. seq, err := sequence.New(steps, req) if err != nil { return echo.NewHTTPError(http.StatusInternalServerError, "failed to handle request").SetInternal(err) } - if err := s.dispatcher.Execute(seq); err != nil { + if err := s.dispatcher.Execute(c.Request().Context(), seq); err != nil { return echo.NewHTTPError(http.StatusInternalServerError, "failed to execute plugin").SetInternal(err) } diff --git a/e2core/server/init.go b/e2core/server/init.go index a5605858..abb4e431 100644 --- a/e2core/server/init.go +++ b/e2core/server/init.go @@ -1,61 +1 @@ package server - -import ( - "context" - - "github.com/pkg/errors" - "github.com/rs/zerolog" - - "github.com/suborbital/e2core/e2core/options" - "github.com/suborbital/go-kit/observability" -) - -// setupTracing configure open telemetry to be used with otel exporter. Returns a tracer closer func and an error. 
-func setupTracing(config options.TracerConfig, logger zerolog.Logger) (func(ctx context.Context) error, error) { - l := logger.With().Str("function", "setupTracing").Logger() - emptyShutdown := func(_ context.Context) error { return nil } - - switch config.TracerType { - case "honeycomb": - conn, err := observability.GrpcConnection(context.Background(), config.HoneycombConfig.Endpoint) - if err != nil { - return emptyShutdown, errors.Wrapf(err, "observability.GrpcConnection to %s", config.HoneycombConfig.Endpoint) - } - - tp, err := observability.HoneycombTracer(context.Background(), conn, observability.HoneycombTracingConfig{ - TracingConfig: observability.TracingConfig{ - Probability: config.Probability, - ServiceName: config.ServiceName, - }, - APIKey: config.HoneycombConfig.APIKey, - Dataset: config.HoneycombConfig.Dataset, - }) - if err != nil { - return emptyShutdown, errors.Wrap(err, "observability.HoneycombTracer") - } - - return tp.Shutdown, nil - case "collector": - conn, err := observability.GrpcConnection(context.Background(), config.Collector.Endpoint) - if err != nil { - return emptyShutdown, errors.Wrap(err, "observability.GrpcConnection") - } - - tp, err := observability.OtelTracer(context.Background(), conn, observability.TracingConfig{ - Probability: config.Probability, - ServiceName: config.ServiceName, - }) - - return tp.Shutdown, nil - default: - l.Warn().Str("tracerType", config.TracerType).Msg("unrecognised tracer type configuration. 
Defaulting to no tracer") - fallthrough - case "none", "": - tp, err := observability.NoopTracer() - if err != nil { - return emptyShutdown, errors.Wrap(err, "observability.NoopTracer") - } - - return tp.Shutdown, nil - } -} diff --git a/e2core/server/server.go b/e2core/server/server.go index 2ec27d92..43d461b4 100644 --- a/e2core/server/server.go +++ b/e2core/server/server.go @@ -16,6 +16,7 @@ import ( "github.com/suborbital/e2core/foundation/bus/bus" "github.com/suborbital/e2core/foundation/bus/discovery/local" "github.com/suborbital/e2core/foundation/bus/transport/websocket" + "github.com/suborbital/e2core/foundation/tracing" kitError "github.com/suborbital/go-kit/web/error" "github.com/suborbital/go-kit/web/mid" ) @@ -36,11 +37,11 @@ type Server struct { // New creates a new Server instance. func New(l zerolog.Logger, sync *syncer.Syncer, opts *options.Options) (*Server, error) { - ll := l.With().Str("module", "server").Logger() + ll := l.With().Str("module", "e2core-server").Logger() // @todo https://github.com/suborbital/e2core/issues/144, the first return value is a function that would close the // tracer in case of a shutdown. Usually that is put in a defer statement. Server doesn't have a graceful shutdown. - _, err := setupTracing(opts.TracerConfig, ll) + _, err := tracing.SetupTracing(opts.TracerConfig, ll) if err != nil { return nil, errors.Wrapf(err, "setupTracing(%s, %s, %f)", "e2core", "reporter_uri", 0.04) } @@ -48,6 +49,7 @@ func New(l zerolog.Logger, sync *syncer.Syncer, opts *options.Options) (*Server, busOpts := []bus.OptionsModifier{ bus.UseMeshTransport(websocket.New()), bus.UseDiscovery(local.New()), + bus.UseLogger(ll), } b := bus.New(busOpts...) 
@@ -58,6 +60,7 @@ func New(l zerolog.Logger, sync *syncer.Syncer, opts *options.Options) (*Server, e.Use( mid.UUIDRequestID(), + mid.Logger(ll, nil), otelecho.Middleware("e2core"), middleware.Recover(), ) diff --git a/e2core/syncer/syncer.go b/e2core/syncer/syncer.go index 29872b9d..35e4168f 100644 --- a/e2core/syncer/syncer.go +++ b/e2core/syncer/syncer.go @@ -1,6 +1,7 @@ package syncer import ( + "context" "sync" "github.com/pkg/errors" @@ -36,18 +37,17 @@ type syncJob struct { // New creates a syncer with the given SystemSource func New(opts *options.Options, logger zerolog.Logger, source system.Source) *Syncer { s := &Syncer{ - sched: scheduler.New(), + sched: scheduler.NewWithLogger(logger), opts: opts, - } - - s.job = &syncJob{ - systemSource: source, - state: &system.State{}, - tenantIdents: make(map[string]int64), - overviews: make(map[string]*system.TenantOverview), - modules: make(map[string]tenant.Module), - log: logger.With().Str("module", "syncJob").Logger(), - lock: &sync.RWMutex{}, + job: &syncJob{ + systemSource: source, + state: &system.State{}, + tenantIdents: make(map[string]int64), + overviews: make(map[string]*system.TenantOverview), + modules: make(map[string]tenant.Module), + log: logger.With().Str("module", "syncJob").Logger(), + lock: &sync.RWMutex{}, + }, } s.sched.Register("sync", s.job) @@ -62,11 +62,11 @@ func (s *Syncer) Start() error { } // sync once to seed the initial state - if _, err := s.sched.Do(scheduler.NewJob("sync", nil)).Then(); err != nil { + if _, err := s.sched.Do(scheduler.NewJob("sync", nil).WithContext(context.Background())).Then(); err != nil { return errors.Wrap(err, "failed to Do sync job") } - s.sched.Schedule(scheduler.Every(1, func() scheduler.Job { return scheduler.NewJob("sync", nil) })) + s.sched.Schedule(scheduler.Every(45, func() scheduler.Job { return scheduler.NewJob("sync", nil) })) return nil } diff --git a/foundation/bus/bus/bus.go b/foundation/bus/bus/bus.go index 19da6ae3..8a44bf9b 100644 --- 
a/foundation/bus/bus/bus.go +++ b/foundation/bus/bus/bus.go @@ -4,6 +4,10 @@ import ( "github.com/google/uuid" "github.com/pkg/errors" "github.com/rs/zerolog" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + + "github.com/suborbital/e2core/foundation/tracing" ) // ErrTransportNotConfigured represent package-level vars @@ -33,7 +37,7 @@ func New(opts ...OptionsModifier) *Bus { BelongsTo: options.BelongsTo, Interests: options.Interests, bus: newMessageBus(), - logger: options.Logger, + logger: options.Logger.With().Str("bus", "bus").Logger(), } // the hub handles coordinating the transport and discovery plugins @@ -71,7 +75,15 @@ func (b *Bus) ConnectBridgeTopic(topic string) error { // This bypasses the main Bus bus, which is why it isn't a method on Pod. // Messages are load balanced between the connections that advertise the capability in question. func (b *Bus) Tunnel(capability string, msg Message) error { - return b.hub.sendTunneledMessage(capability, msg) + ctx, span := tracing.Tracer.Start(msg.Context(), "bus.Tunnel", trace.WithAttributes( + attribute.String("capability", capability), + )) + defer span.End() + + msg.SetContext(ctx) + + b.logger.Info().Str("requestID", msg.ParentID()).Str("fqmn", capability).Msg("bus.Tunnel is happening (pod tunnelfunc?)") + return b.hub.sendTunneledMessage(ctx, capability, msg) } // Withdraw cancels discovery, sends withdraw messages to all peers, @@ -87,7 +99,8 @@ func (b *Bus) Stop() error { } func (b *Bus) connectWithOpts(opts *podOpts) *Pod { - pod := newPod(b.bus.busChan, b.Tunnel, opts) + b.logger.Info().Msg("creating a new pod with the bus's Tunnel method. 
That one takes the hub on the bus, and calls the sendTunneledMessage") + pod := newPod(b.bus.busChan, b.Tunnel, opts, b.logger.With().Str("component", "pod").Logger()) b.bus.addPod(pod) diff --git a/foundation/bus/bus/connectionhandler.go b/foundation/bus/bus/connectionhandler.go index f920c68c..229f1036 100644 --- a/foundation/bus/bus/connectionhandler.go +++ b/foundation/bus/bus/connectionhandler.go @@ -1,10 +1,14 @@ package bus import ( + "context" + "github.com/pkg/errors" "github.com/rs/zerolog" + "go.opentelemetry.io/otel" "github.com/suborbital/e2core/foundation/bus/bus/withdraw" + "github.com/suborbital/e2core/foundation/tracing" ) type connectionHandler struct { @@ -20,7 +24,7 @@ type connectionHandler struct { // Start starts up a listener to read messages from the connection into the Grav bus func (c *connectionHandler) Start() { - ll := c.Log.With().Str("method", "Start").Logger() + ll := c.Log.With().Str("method", "connectionHandler.Start").Logger() withdrawChan := c.Signaler.Listen() go func() { @@ -42,11 +46,12 @@ func (c *connectionHandler) Start() { for { msg, connWithdraw, err := c.Conn.ReadMsg() if err != nil { + // the error that happened is not an "I withdrew" or "my peer withdrew", it's a broken conn if !(c.Signaler.SelfWithdrawn() || c.Signaler.PeerWithdrawn()) { - ll.Err(err).Str("connectionUUID", c.UUID).Msg("failed to ReadMsg from connection") + ll.Err(err).Str("connectionUUID", c.UUID).Msg("failed to ReadMsg from connection, sending to errchan") c.ErrChan <- err } else { - ll.Debug().Msgf("failed to ReadMsg from withdrawn connection, ignoring: %s", err.Error()) + ll.Err(err).Msg("failed to ReadMsg from withdrawn connection, ignoring") } return @@ -60,23 +65,40 @@ func (c *connectionHandler) Start() { return } - ll.Debug().Str("messageUUID", msg.UUID()).Msg("received message") + ctx := otel.GetTextMapPropagator().Extract(context.Background(), msg) + ctx, span := tracing.Tracer.Start(ctx, "connectionHandler.ReadMsg") + + msg.SetContext(ctx) 
+ + ll.Debug().Str("messageUUID", msg.UUID()).Str("requestID", msg.ParentID()).Msg("received message") c.Pod.Send(msg) + + span.End() } }() } -func (c *connectionHandler) Send(msg Message) error { +func (c *connectionHandler) Send(ctx context.Context, msg Message) error { + ctx, span := tracing.Tracer.Start(ctx, "connectionHandler.send") + defer span.End() + + msg.SetContext(ctx) + + ll := c.Log.With().Str("requestID", msg.ParentID()).Logger() if c.Signaler.PeerWithdrawn() { + span.AddEvent("peer withdrawn") return ErrNodeWithdrawn } if err := c.Conn.SendMsg(msg); err != nil { + ll.Err(err).Msg("c.conn.sendmsg returned an error") c.ErrChan <- err return errors.Wrap(err, "failed to SendMsg") } + ll.Info().Msg("message sent successfully") + return nil } diff --git a/foundation/bus/bus/hub.go b/foundation/bus/bus/hub.go index 14b77fc7..54acd788 100644 --- a/foundation/bus/bus/hub.go +++ b/foundation/bus/bus/hub.go @@ -1,14 +1,18 @@ package bus import ( + "context" "sync" "time" "github.com/pkg/errors" "github.com/rs/zerolog" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/suborbital/e2core/foundation/bus/bus/tunnel" "github.com/suborbital/e2core/foundation/bus/bus/withdraw" + "github.com/suborbital/e2core/foundation/tracing" ) const tunnelRetryCount = 32 @@ -41,7 +45,7 @@ func initHub(nodeUUID string, options *Options, connectFunc func() *Pod) *hub { mesh: options.MeshTransport, bridge: options.BridgeTransport, discovery: options.Discovery, - log: options.Logger.With().Str("module", "hub").Logger().Level(zerolog.InfoLevel), + log: options.Logger.With().Str("module", "hub").Logger(), pod: connectFunc(), connectFunc: connectFunc, meshConnections: map[string]*connectionHandler{}, @@ -106,23 +110,41 @@ func initHub(nodeUUID string, options *Options, connectFunc func() *Pod) *hub { // messageHandler takes each message coming from the bus and sends it to currently active mesh connections func (h *hub) messageHandler(msg Message) error { 
+ ctx, span := tracing.Tracer.Start(msg.Context(), "hub messagehandler") + defer span.End() + + msg.SetContext(ctx) + h.lock.RLock() defer h.lock.RUnlock() + ll := h.log.With().Str("requestID", msg.ParentID()).Str("method", "hub.messageHandler").Logger() + + ll.Info().Msg("sending the message to all meshconnections") + // send the message to each. withdrawn connections will result in a no-op for uuid := range h.meshConnections { + ll.Info(). + Str("meshconnection-uuid", uuid). + Msg("sending the message to the handler at this uuid") + handler := h.meshConnections[uuid] - handler.Send(msg) + err := handler.Send(ctx, msg) + if err != nil { + ll.Err(err).Str("meshconnection-uuid", uuid). + Msg("send returned an error") + } } return nil } func (h *hub) discoveryHandler() func(endpoint string, uuid string) { + ll := h.log.With().Str("method", "discoveryHandler").Logger() + return func(endpoint string, uuid string) { - ll := h.log.With().Str("method", "discoveryHandler").Logger() if uuid == h.nodeUUID { - ll.Debug().Msg("discovered self, discarding") + ll.Debug().Str("uuid", uuid).Msg("discovered self, discarding") return } @@ -280,7 +302,7 @@ func (h *hub) addConnection(connection Connection, uuid, belongsTo string, inter Conn: connection, Pod: h.pod, Signaler: signaler, - ErrChan: make(chan error), + ErrChan: make(chan error, 1), BelongsTo: belongsTo, Interests: interests, Log: h.log, @@ -344,81 +366,116 @@ func (h *hub) connectionExists(uuid string) bool { // check for failed connections and clean them up func (h *hub) scanFailedMeshConnections() { ll := h.log.With().Str("method", "scanFailedMeshConnections").Logger() - for { - // we don't want to edit the `meshConnections` map while in the loop, so do it after - toRemove := make([]string, 0) - - // for each connection, check if it has errored or if its peer has withdrawn, - // and in either case close it and remove it from circulation - for _, conn := range h.meshConnections { - select { - case <-conn.ErrChan: - if 
err := conn.Close(); err != nil { - ll.Err(err).Str("connUUID", conn.UUID).Msg("failed to Close connection") - } - toRemove = append(toRemove, conn.UUID) - default: - if conn.Signaler.PeerWithdrawn() { + ll.Info().Msg("starting the loop to scan for failed mesh connections") + + ticker := time.NewTicker(time.Second) + + for { + select { + case <-ticker.C: + // ll.Info().Msg("starting loop") + // we don't want to edit the `meshConnections` map while in the loop, so do it after + toRemove := make([]string, 0) + + // for each connection, check if it has errored or if its peer has withdrawn, + // and in either case close it and remove it from circulation + for _, conn := range h.meshConnections { + select { + case <-conn.ErrChan: if err := conn.Close(); err != nil { - ll.Err(err).Str("connUUID", conn.UUID).Msg( - "failed to Close connection") + ll.Err(err).Str("connUUID", conn.UUID).Msg("failed to Close connection") } + ll.Warn().Str("conn-uuid", conn.UUID).Msg("adding this to removal") toRemove = append(toRemove, conn.UUID) + default: + // ll.Info().Str("conn-uuid", conn.UUID).Msg("no error came in, doing default") + if conn.Signaler.PeerWithdrawn() { + if err := conn.Close(); err != nil { + ll.Err(err).Str("connUUID", conn.UUID).Msg( + "failed to Close connection") + } + + ll.Warn().Str("conn-uuid", conn.UUID).Msg("peer has withdrawn, so removing it from here") + + toRemove = append(toRemove, conn.UUID) + } } } - } - for _, uuid := range toRemove { - h.removeMeshConnection(uuid) + for _, uuid := range toRemove { + ll.Info().Str("conn-uuid", uuid).Msg("removing mesh connection") + h.removeMeshConnection(uuid) + } } - - time.Sleep(time.Second) } } -func (h *hub) sendTunneledMessage(capability string, msg Message) error { - ll := h.log.With().Str("method", "sendTunneledMessage").Logger() +func (h *hub) sendTunneledMessage(ctx context.Context, capability string, msg Message) error { + ctx, span := tracing.Tracer.Start(ctx, "hub.sendtunneledmessage") + defer span.End() + + 
ll := h.log.With().Str("method", "sendTunneledMessage"). + Str("requestID", msg.ParentID()).Logger() + + ll.Info().Str("capability", capability).Msg("sending a message with cap. Checking the hub's capabilityBalancers map. It seems to be a list of UUIDs for ... things? belonging to the same capability.") balancer, exists := h.capabilityBalancers[capability] if !exists { return ErrTunnelNotEstablished } - // iterate a reasonable number of times to find a connection that's not removed or dead - for i := 0; i < tunnelRetryCount; i++ { + ll.Info().Interface("balancer", balancer).Str("capability", capability).Msg("balancer for capability") - // wrap this in a function to avoid any sloppy mutex issues - handler, err := func() (*connectionHandler, error) { - h.lock.RLock() - defer h.lock.RUnlock() + ll.Info().Int("tunnel-retry-count", tunnelRetryCount).Msg("starting iteration to check whether we can send a message to someplace") - uuid := balancer.Next() - if uuid == "" { - return nil, ErrTunnelNotEstablished - } + handlerFactory := func(ctx context.Context) (*connectionHandler, error) { + ctx, span := tracing.Tracer.Start(ctx, "handlerFactory") + defer span.End() - handler, exists := h.meshConnections[uuid] - if !exists { - return nil, ErrTunnelNotEstablished - } + h.lock.RLock() + defer h.lock.RUnlock() - return handler, nil - }() + uuid := balancer.Next() + if uuid == "" { + span.AddEvent("balancer doesn't exit") + return nil, ErrTunnelNotEstablished + } + + handler, exists := h.meshConnections[uuid] + if !exists { + span.AddEvent("handler doesn't exist for uuid", trace.WithAttributes( + attribute.String("uuid", uuid), + )) + return nil, ErrTunnelNotEstablished + } + + span.AddEvent("returning a handler for uuid", trace.WithAttributes( + attribute.String("uuid", uuid), + )) + return handler, nil + } + // iterate a reasonable number of times to find a connection that's not removed or dead + for i := 0; i < tunnelRetryCount; i++ { + // wrap this in a function to avoid any 
sloppy mutex issues + handler, err := handlerFactory(ctx) if err != nil { continue } if handler.Conn != nil { - if err := handler.Send(msg); err != nil { + if err := handler.Send(ctx, msg); err != nil { ll.Err(err).Msg("failed to SendMsg on tunneled connection, will remove") + return errors.Wrap(err, "handler.Send died") } else { - ll.Debug().Str("handlerUUID", handler.UUID).Msg("tunneled to handler") + ll.Info().Str("handlerUUID", handler.UUID).Msg("tunneled to handler") return nil } } + + ll.Info().Msg("handler connection was nil") } return ErrTunnelNotEstablished diff --git a/foundation/bus/bus/message.go b/foundation/bus/bus/message.go index a42982a2..1e0c03f8 100644 --- a/foundation/bus/bus/message.go +++ b/foundation/bus/bus/message.go @@ -1,12 +1,15 @@ package bus import ( + "context" "encoding/json" + "fmt" "io" "net/http" "time" "github.com/google/uuid" + "go.opentelemetry.io/otel/propagation" ) // MsgTypeDefault and other represent message consts @@ -23,19 +26,21 @@ type MsgChan chan Message // Message represents a message type Message interface { - // Unique ID for this message + propagation.TextMapCarrier + + // UUID is the unique ID for this message UUID() string - // ID of the parent event or request, such as HTTP request + // ParentID is the request ID of the parent event or request, such as HTTP request ParentID() string - // The UUID of the message being replied to, if any + // ReplyTo is the UUID of the message being replied to, if any ReplyTo() string - // Allow setting a message UUID that this message is a response to + // SetReplyTo allows setting a message UUID that this message is a response to SetReplyTo(string) // Type of message (application-specific) Type() string - // Time the message was sent + // Timestamp returns the time the message was sent Timestamp() time.Time - // Raw data of message + // Data returns raw data of message Data() []byte // Marshal the message itself to encoded bytes (JSON or otherwise) Marshal() ([]byte, error) @@ 
-45,6 +50,10 @@ type Message interface { MarshalMetadata() ([]byte, error) // UnmarshalMetadata encoded metadata into object UnmarshalMetadata([]byte) error + // Context will return the embedded context + Context() context.Context + // SetContext will set the new context on the message + SetContext(ctx context.Context) } // NewMsg creates a new Message with the built-in `_message` type @@ -117,6 +126,7 @@ func newMessage(msgType, parentID string, data []byte) Message { Payload: _payload{ Data: data, }, + TraceInfo: make(map[string]string), } return m @@ -126,10 +136,37 @@ func newMessage(msgType, parentID string, data []byte) Message { // most applications should define their own data structure // that implements the interface type _message struct { - Meta _meta `json:"meta"` - Payload _payload `json:"payload"` + Meta _meta `json:"meta"` + Payload _payload `json:"payload"` + ctx context.Context + TraceInfo map[string]string `json:"trace_info"` +} + +var _ Message = &_message{} + +func (m *_message) Get(key string) string { + fmt.Printf("\n\ngetting '%s' from message traceinfo\n\n", key) + return (*m).TraceInfo[key] +} + +func (m *_message) Set(key string, value string) { + fmt.Printf("\n\nsetting '%s' to '%s' in message traceinfo\n\n", key, value) + (*m).TraceInfo[key] = value +} + +func (m *_message) Keys() []string { + keys := make([]string, 0, len(m.TraceInfo)) + for k := range m.TraceInfo { + keys = append(keys, k) + } + + fmt.Printf("\n\ngetting keys from message and they are '%v'\n\n", keys) + + return keys } +var _ Message = &_message{} + type _meta struct { UUID string `json:"uuid"` ParentID string `json:"parent_id"` @@ -196,3 +233,11 @@ func (m *_message) MarshalMetadata() ([]byte, error) { func (m *_message) UnmarshalMetadata(bytes []byte) error { return json.Unmarshal(bytes, &m.Meta) } + +func (m *_message) SetContext(ctx context.Context) { + m.ctx = ctx +} + +func (m *_message) Context() context.Context { + return m.ctx +} diff --git 
a/foundation/bus/bus/messagebus.go b/foundation/bus/bus/messagebus.go index 96057198..9d2018a6 100644 --- a/foundation/bus/bus/messagebus.go +++ b/foundation/bus/bus/messagebus.go @@ -1,5 +1,9 @@ package bus +import ( + "github.com/suborbital/e2core/foundation/tracing" +) + const ( defaultBusChanSize = 256 ) @@ -38,6 +42,7 @@ func (b *messageBus) start() { // each connection until landing back at the beginning of the // ring, and repeat forever when each new message arrives for msg := range b.busChan { + for { // make sure the next pod is ready for messages if err := b.pool.prepareNext(b.buffer); err == nil { @@ -55,6 +60,11 @@ func (b *messageBus) start() { } func (b *messageBus) traverse(msg Message, start *podConnection) { + ctx, span := tracing.Tracer.Start(msg.Context(), "messagebus.traverse") + defer span.End() + + msg.SetContext(ctx) + startID := start.ID conn := start diff --git a/foundation/bus/bus/options.go b/foundation/bus/bus/options.go index 2859bc80..72d19c69 100644 --- a/foundation/bus/bus/options.go +++ b/foundation/bus/bus/options.go @@ -91,7 +91,7 @@ func defaultOptions() *Options { o := &Options{ BelongsTo: "*", Interests: []string{}, - Logger: zerolog.New(os.Stderr).With().Timestamp().Logger(), + Logger: zerolog.New(os.Stderr).With().Str("mode", "default-options").Timestamp().Logger(), Port: "8080", URI: "/meta/message", MeshTransport: nil, diff --git a/foundation/bus/bus/pod.go b/foundation/bus/bus/pod.go index 673ed534..72071f6d 100644 --- a/foundation/bus/bus/pod.go +++ b/foundation/bus/bus/pod.go @@ -4,6 +4,12 @@ import ( "errors" "sync" "sync/atomic" + + "github.com/rs/zerolog" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + + "github.com/suborbital/e2core/foundation/tracing" ) const ( @@ -41,6 +47,8 @@ type Pod struct { onFunc MsgFunc // the onFunc is called whenever a message is received onFuncLock sync.RWMutex + logger zerolog.Logger + messageChan MsgChan // messageChan is used to receive messages coming from 
the bus feedbackChan MsgChan // feedbackChan is used to send "feedback" to the bus about the pod's status busChan MsgChan // busChan is used to emit messages to the bus @@ -60,7 +68,7 @@ type podOpts struct { } // newPod creates a new Pod -func newPod(busChan MsgChan, tunnel func(string, Message) error, opts *podOpts) *Pod { +func newPod(busChan MsgChan, tunnel func(string, Message) error, opts *podOpts, l zerolog.Logger) *Pod { p := &Pod{ onFuncLock: sync.RWMutex{}, messageChan: make(chan Message, defaultPodChanSize), @@ -70,6 +78,7 @@ func newPod(busChan MsgChan, tunnel func(string, Message) error, opts *podOpts) tunnelFunc: tunnel, opts: opts, dead: &atomic.Value{}, + logger: l, } // do some "delayed setup" @@ -86,26 +95,51 @@ func newPod(busChan MsgChan, tunnel func(string, Message) error, opts *podOpts) // It is safe to call methods on a nil ticket, they will error with ErrNoTicket // This means error checking can be done on a chained call such as err := p.Send(msg).Wait(...) func (p *Pod) Send(msg Message) *MsgReceipt { + ctx, span := tracing.Tracer.Start(msg.Context(), "pod.Send") + defer span.End() + + msg.SetContext(ctx) + + ll := p.logger.With().Str("requestID", msg.ParentID()).Logger() + + ll.Info().Msg("sending message in pod.send") + // check to see if the pod has died (aka disconnected) if p.dead.Load().(bool) == true { + ll.Warn().Msg("pod has died") return nil } + ll.Info().Str("msg uuid", msg.UUID()).Msg("filtering message") + p.FilterUUID(msg.UUID(), false) // don't allow the same message to bounce back through this pod + ll.Info().Msg("sending message to the bus chan") + + span.AddEvent("sending message to the bus channel") p.busChan <- msg - t := &MsgReceipt{ + ll.Info().Msg("sent message to bus chan") + + ll.Info().Msg("returning a message receipt") + + return &MsgReceipt{ UUID: msg.UUID(), pod: p, } - - return t } // Tunnel bypasses the pod's normal 'Send' and uses the bus itself to tunnel to a specific peer // if a transport is enabled. 
If not, it's a no-op. func (p *Pod) Tunnel(capability string, msg Message) error { + ctx, span := tracing.Tracer.Start(msg.Context(), "pod.Tunnel", trace.WithAttributes( + attribute.String("capability", capability), + )) + defer span.End() + + msg.SetContext(ctx) + + p.logger.Info().Str("requestID", msg.ParentID()).Str("fqmn", capability).Msg("tunneling using the pod's tunnelfunc") return p.tunnelFunc(capability, msg) } diff --git a/foundation/bus/bus/podconnection.go b/foundation/bus/bus/podconnection.go index 8646aa2d..1892aa80 100644 --- a/foundation/bus/bus/podconnection.go +++ b/foundation/bus/bus/podconnection.go @@ -1,6 +1,11 @@ package bus -import "sync" +import ( + "context" + "sync" + + "github.com/suborbital/e2core/foundation/tracing" +) // podConnection is a connection to a pod via its messageChan // podConnection is also a circular linked list/ring of connections @@ -47,7 +52,17 @@ func newPodConnection(id int64, pod *Pod) *podConnection { // ordering to the messageChan if it becomes full is not guaranteed, this // is sacrificed to ensure that the bus does not block because of a delinquient pod func (p *podConnection) send(msg Message) { - go func() { + ctx, span := tracing.Tracer.Start(msg.Context(), "podconnection.send") + defer span.End() + + msg.SetContext(ctx) + + go func(gctx context.Context, gmsg Message) { + gctx, gspan := tracing.Tracer.Start(gctx, "go func inside podconnection.send") + defer gspan.End() + + gmsg.SetContext(gctx) + p.lock.RLock() defer p.lock.RUnlock() @@ -56,8 +71,8 @@ func (p *podConnection) send(msg Message) { return } - p.messageChan <- msg - }() + p.messageChan <- gmsg + }(ctx, msg) } // checkStatus checks the pod's feedback for any information or failed messages and drains the failures into the failed Message buffer diff --git a/foundation/bus/transport/kafka/tester/main.go b/foundation/bus/transport/kafka/tester/main.go index 426b1daa..f2a0c309 100644 --- a/foundation/bus/transport/kafka/tester/main.go +++ 
b/foundation/bus/transport/kafka/tester/main.go @@ -14,7 +14,7 @@ import ( ) func main() { - logger := zerolog.New(os.Stderr).With().Timestamp().Logger() + logger := zerolog.New(os.Stderr).With().Str("mode", "kafka-tester").Timestamp().Logger() knats, err := kafka.New("127.0.0.1:9092") if err != nil { diff --git a/foundation/bus/transport/nats/tester/main.go b/foundation/bus/transport/nats/tester/main.go index 5f0d208a..fcb9bb87 100644 --- a/foundation/bus/transport/nats/tester/main.go +++ b/foundation/bus/transport/nats/tester/main.go @@ -14,7 +14,7 @@ import ( ) func main() { - logger := zerolog.New(os.Stderr).With().Timestamp().Logger() + logger := zerolog.New(os.Stderr).With().Str("mode", "nats-tester").Timestamp().Logger() gnats, err := nats.New("nats://localhost:4222") if err != nil { diff --git a/foundation/bus/transport/websocket/tester/main.go b/foundation/bus/transport/websocket/tester/main.go index b4883664..f4aeac5b 100644 --- a/foundation/bus/transport/websocket/tester/main.go +++ b/foundation/bus/transport/websocket/tester/main.go @@ -14,7 +14,7 @@ import ( ) func main() { - logger := zerolog.New(os.Stderr).With().Timestamp().Logger() + logger := zerolog.New(os.Stderr).With().Str("mode", "websocket-tester").Timestamp().Logger() gwss := websocket.New() locald := local.New() diff --git a/foundation/bus/transport/websocket/transport.go b/foundation/bus/transport/websocket/transport.go index 18aef08b..6ee14579 100644 --- a/foundation/bus/transport/websocket/transport.go +++ b/foundation/bus/transport/websocket/transport.go @@ -11,8 +11,12 @@ import ( "github.com/gorilla/websocket" "github.com/pkg/errors" "github.com/rs/zerolog" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/suborbital/e2core/foundation/bus/bus" + "github.com/suborbital/e2core/foundation/tracing" ) const ( @@ -87,51 +91,80 @@ func (t *Transport) Connect(endpoint string) (bus.Connection, error) { // HTTPHandlerFunc returns 
an http.HandlerFunc for incoming connections func (t *Transport) HTTPHandlerFunc() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + ctx, span := tracing.Tracer.Start(r.Context(), "websocket.transport.httphanderfunc") + defer span.End() + + r = r.Clone(ctx) + if t.connectionFunc == nil { t.log.Error().Msg("incoming connection received, but no connFunc configured") w.WriteHeader(http.StatusInternalServerError) return } + t.log.Info().Msg("receiving a message I think") + + span.AddEvent("upgrading request to websocket connection") c, err := upgrader.Upgrade(w, r, nil) if err != nil { t.log.Err(err).Msg("could not upgrade connection to websocket") return } - t.log.Debug().Str("connectionURL", r.URL.String()).Msg("upgraded connection") + t.log.Info().Str("connectionURL", r.URL.String()).Msg("upgraded connection") conn := &Conn{ conn: c, log: t.log, } + t.log.Info().Interface("connectionfunc", t.connectionFunc).Msg("connection func is this, apparently, bus.Connect, again? request is in the conn.conn as an upgraded websocket connection") + + span.AddEvent("calling connection function") t.connectionFunc(conn) } } // SendMsg sends a message to the connection func (c *Conn) SendMsg(msg bus.Message) error { + ctx, span := tracing.Tracer.Start(msg.Context(), "conn.SendMsg", trace.WithAttributes( + attribute.String("request ID", msg.ParentID()), + )) + defer span.End() + + span.AddEvent("injecting the ctx into the message") + fmt.Printf("\n\n!!!!\n\ninjecting context into message\n") + otel.GetTextMapPropagator().Inject(ctx, msg) + fmt.Printf("\n\n---\n\ndone injecting context into message\n") + + ll := c.log.With().Str("requestID", msg.ParentID()). + Str("msg-uuid", msg.UUID()). + Str("node-uuid", c.nodeUUID).Logger() + + ll.Info().Strs("traceinfo-keys", msg.Keys()).Msg("uh what") + msgBytes, err := msg.Marshal() if err != nil { return errors.Wrap(err, "[transport-websocket] failed to Marshal message") } - c.log.Debug().Str("msgUUID", msg.UUID()). 
- Str("nodeUUID", c.nodeUUID).Msg("sending message to connection") + ll.Info().Str("messagebytes", string(msgBytes)).Msg("sending message to connection over binary") if err := c.WriteMessage(websocket.BinaryMessage, msgBytes); err != nil { if errors.Is(err, websocket.ErrCloseSent) { + ll.Err(err).Msg("websocket error close sent bla bla") return bus.ErrConnectionClosed } else if err == bus.ErrNodeWithdrawn { + ll.Err(err).Msg("node was withdrawn") return err } + ll.Err(err).Msg("some super different error with connection") + return errors.Wrap(err, "[transport-websocket] failed to WriteMessage") } - c.log.Debug().Str("msgUUID", msg.UUID()). - Str("nodeUUID", c.nodeUUID).Msg("sent message to connection") + ll.Info().Msg("sent message to connection") return nil } @@ -160,6 +193,7 @@ func (c *Conn) ReadMsg() (bus.Message, *bus.Withdraw, error) { } c.log.Debug(). + Str("requestID", msg.ParentID()). Str("msgUUID", msg.UUID()). Str("nodeUUID", c.nodeUUID). Msg("received message from node") diff --git a/foundation/scheduler/core.go b/foundation/scheduler/core.go index 410363ac..a5c851a6 100644 --- a/foundation/scheduler/core.go +++ b/foundation/scheduler/core.go @@ -6,6 +6,10 @@ import ( "github.com/pkg/errors" "github.com/rs/zerolog" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + + "github.com/suborbital/e2core/foundation/tracing" ) // coreDoFunc is an internal version of DoFunc that takes a @@ -36,21 +40,48 @@ func newCore(log zerolog.Logger) *core { return c } -func (c *core) do(job *Job) *Result { +func (c *core) do(incomingJob *Job) *Result { + ctx, span := tracing.Tracer.Start(incomingJob.Context(), "core.do") + defer span.End() + + job := incomingJob.WithContext(ctx) + result := newResult(job.UUID()) + span.AddEvent("created a new job", trace.WithAttributes( + attribute.String("job-uuid", job.UUID()))) + + rid := "no-request" + if job.Req() != nil { + rid = job.Req().ID + } + ll := c.log.With().Str("requestID", rid).Logger() + + 
ll.Info().Msg("core.do function got called") + + span.AddEvent("core.scaler.findWorder for job type", trace.WithAttributes( + attribute.String("jobType", job.jobType), + )) jobWorker := c.scaler.findWorker(job.jobType) if jobWorker == nil { result.sendErr(fmt.Errorf("failed to getWorker for jobType %q", job.jobType)) return result } - go func() { - job.result = result + go func(gjob Job) { + ctx, span := tracing.Tracer.Start(gjob.Context(), "go func inside core.do") + defer span.End() + + ggjob := gjob.WithContext(ctx) + + ggjob.result = result + + ll.Info().Msg("jobworker got a job scheduled") - jobWorker.schedule(job) - }() + jobWorker.schedule(&ggjob) + }(job) + ll.Info().Msg("returning result from core.do func") return result } diff --git a/foundation/scheduler/job.go b/foundation/scheduler/job.go index d98b6760..9225c354 100644 --- a/foundation/scheduler/job.go +++ b/foundation/scheduler/job.go @@ -1,6 +1,7 @@ package scheduler import ( + "context" "encoding/json" "errors" @@ -15,6 +16,7 @@ type Job struct { jobType string result *Result data interface{} + ctx context.Context req *request.CoordinatedRequest } @@ -88,3 +90,12 @@ func (j Job) Data() interface{} { func (j Job) Req() *request.CoordinatedRequest { return j.req } + +func (j Job) WithContext(ctx context.Context) Job { + j.ctx = ctx + return j +} + +func (j Job) Context() context.Context { + return j.ctx +} diff --git a/foundation/scheduler/scheduler.go b/foundation/scheduler/scheduler.go index 0f76abf1..41aa4b18 100644 --- a/foundation/scheduler/scheduler.go +++ b/foundation/scheduler/scheduler.go @@ -1,13 +1,17 @@ package scheduler import ( + "context" "encoding/json" "os" "github.com/pkg/errors" "github.com/rs/zerolog" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/suborbital/e2core/foundation/bus/bus" + "github.com/suborbital/e2core/foundation/tracing" ) // MsgTypeReactrJobErr and others are Grav message types used for Scheduler job @@ -49,7 +53,22 @@ func 
NewWithLogger(log zerolog.Logger) *Scheduler { } // Do schedules a job to be worked on and returns a result object -func (r *Scheduler) Do(job Job) *Result { +func (r *Scheduler) Do(incomingJob Job) *Result { + ctx, span := tracing.Tracer.Start(incomingJob.Context(), "scheduler do with job") + defer span.End() + + job := incomingJob.WithContext(ctx) + + if job.Req() == nil { + r.log.Info(). + Str("requestID", "no-request"). + Msg("scheduler.Do function got called, passing it on to core.do") + } else { + r.log.Info(). + Str("requestID", job.Req().ID). + Msg("scheduler.Do function got called, passing it on to core.do") + } + return r.core.do(&job) } @@ -125,8 +144,14 @@ func (r *Scheduler) Listen(pod *bus.Pod, msgType string) { // ListenAndRun subscribes Scheduler to a messageType and calls `run` for each job result func (r *Scheduler) ListenAndRun(pod *bus.Pod, msgType string, run func(bus.Message, interface{}, error)) { - helper := func(data interface{}) *Result { - job := NewJob(msgType, data) + helper := func(ctx context.Context, data interface{}) *Result { + ctx, span := tracing.Tracer.Start(ctx, "helper function") + defer span.End() + + span.AddEvent("new job from data and msg type", trace.WithAttributes( + attribute.String("msgType", msgType), + )) + job := NewJob(msgType, data).WithContext(ctx) return r.Do(job) } @@ -134,7 +159,19 @@ func (r *Scheduler) ListenAndRun(pod *bus.Pod, msgType string, run func(bus.Mess // each time a message is received with the associated type, // execute the associated job and pass the result to `run` pod.OnType(msgType, func(msg bus.Message) error { - result, err := helper(msg.Data()).Then() + ctx, span := tracing.Tracer.Start(msg.Context(), "scheduler.ListenAndRun", trace.WithAttributes( + attribute.String("msgType", msgType), + )) + defer span.End() + + msg.SetContext(ctx) + r.log.Info(). + Str("msgType", msgType). + Str("requestID", msg.ParentID()). 
+ Msg("scheduler.ListenAndRun called, msg turned into a job, and job passed to scheduler.Do function") + + span.AddEvent("sending msg.data to the helper") + result, err := helper(ctx, msg.Data()).Then() run(msg, result, err) diff --git a/foundation/scheduler/watcher.go b/foundation/scheduler/watcher.go index cf4a3d2a..1e573ef1 100644 --- a/foundation/scheduler/watcher.go +++ b/foundation/scheduler/watcher.go @@ -11,13 +11,13 @@ import ( // them for new jobs to send to the scheduler type watcher struct { schedules map[string]Schedule - scheduleFunc func(*Job) *Result + scheduleFunc coreDoFunc lock sync.RWMutex startOnce sync.Once } -func newWatcher(scheduleFunc func(*Job) *Result) *watcher { +func newWatcher(scheduleFunc coreDoFunc) *watcher { w := &watcher{ schedules: make(map[string]Schedule), scheduleFunc: scheduleFunc, @@ -43,13 +43,13 @@ func (w *watcher) watch(sched Schedule) { // loop forever and check each schedule for new jobs // repeating every second for { - remove := []string{} + remove := make([]string, 0) w.lock.RLock() - for uuid, s := range w.schedules { + for scheduledUUID, s := range w.schedules { if s.Done() { // set the schedule to be removed if it's done - remove = append(remove, uuid) + remove = append(remove, scheduledUUID) } else { if job := s.Check(); job != nil { // schedule the job and discard the result @@ -60,8 +60,8 @@ func (w *watcher) watch(sched Schedule) { w.lock.RUnlock() w.lock.Lock() - for _, uuid := range remove { - delete(w.schedules, uuid) + for _, uuidToRemove := range remove { + delete(w.schedules, uuidToRemove) } w.lock.Unlock() diff --git a/foundation/scheduler/worker.go b/foundation/scheduler/worker.go index b58db676..e46b3913 100644 --- a/foundation/scheduler/worker.go +++ b/foundation/scheduler/worker.go @@ -6,6 +6,8 @@ import ( "github.com/pkg/errors" "golang.org/x/sync/singleflight" + + "github.com/suborbital/e2core/foundation/tracing" ) const ( @@ -48,16 +50,28 @@ func newWorker(runner Runnable, doFunc coreDoFunc, 
opts workerOpts) *worker { return w } -func (w *worker) schedule(job *Job) { - go func() { +func (w *worker) schedule(incomingJob *Job) { + ctx, span := tracing.Tracer.Start(incomingJob.Context(), "worker.schedule") + defer span.End() + + job := incomingJob.WithContext(ctx) + + go func(incomingJob Job) { + ctx, span := tracing.Tracer.Start(incomingJob.Context(), "go func inside worker.schedule") + defer span.End() + + job := incomingJob.WithContext(ctx) + + span.AddEvent("reconciling pool size in worker") if err := w.reconcilePoolSize(); err != nil { job.result.sendErr(errors.Wrap(err, "failed to reconcilePoolSize")) return } - w.workChan <- job + span.AddEvent("adding job to the worker workchannel and incrementing the rate by one") + w.workChan <- &job w.rate.add() - }() + }(job) } // start ensures the worker is ready to receive jobs diff --git a/foundation/scheduler/workthread.go b/foundation/scheduler/workthread.go index d25b47fa..e0693498 100644 --- a/foundation/scheduler/workthread.go +++ b/foundation/scheduler/workthread.go @@ -3,6 +3,8 @@ package scheduler import ( "context" "time" + + "github.com/suborbital/e2core/foundation/tracing" ) type workThread struct { @@ -38,18 +40,22 @@ func (wt *workThread) run() { } // wait for the next job - job := <-wt.workChan + inJob := <-wt.workChan + ctx, span := tracing.Tracer.Start(inJob.Context(), "workthread.run in scheduler") + + job := inJob.WithContext(ctx) + var err error - ctx := newCtx(wt.doFunc) + workCtx := newCtx(wt.doFunc) var result interface{} if wt.timeoutSeconds == 0 { // we pass in a dereferenced job so that the Runner cannot modify it - result, err = wt.runner.Run(*job, ctx) + result, err = wt.runner.Run(job, workCtx) } else { - result, err = wt.runWithTimeout(job, ctx) + result, err = wt.runWithTimeout(&job, workCtx) } if err != nil { @@ -58,6 +64,8 @@ func (wt *workThread) run() { } job.result.sendResult(result) + + span.End() } }() } diff --git a/foundation/tracing/tracing.go 
b/foundation/tracing/tracing.go new file mode 100644 index 00000000..84ff2b6e --- /dev/null +++ b/foundation/tracing/tracing.go @@ -0,0 +1,114 @@ +package tracing + +import ( + "context" + "time" + + "github.com/pkg/errors" + "github.com/rs/zerolog" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" + sdkTrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/trace" + + "github.com/suborbital/go-kit/observability" +) + +const ( + ExporterCollector CollectorType = "collector" + ExporterHoneycomb CollectorType = "honeycomb" +) + +type CollectorType string + +type Config struct { + Type CollectorType + ServiceName string + Probability float64 + Collector CollectorConfig + Honeycomb HoneycombConfig +} + +type CollectorConfig struct { + Endpoint string +} + +type HoneycombConfig struct { + Endpoint string + APIKey string + Dataset string +} + +var Tracer trace.Tracer + +// SetupTracing configures OpenTelemetry to be used with an otel exporter. It returns a tracer provider and an error.
+func SetupTracing(config Config, logger zerolog.Logger) (*sdkTrace.TracerProvider, error) { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + var traceProvider *sdkTrace.TracerProvider + var err error + + ll := logger.With().Str("tracerType", string(config.Type)).Logger() + + switch config.Type { + case ExporterHoneycomb: + ll.Info().Msg("configuring honeycomb exporter for tracing") + + conn, err := observability.GrpcConnection(ctx, config.Honeycomb.Endpoint, nil) + if err != nil { + return nil, errors.Wrap(err, "honeycomb GrpcConnection") + } + + traceProvider, err = observability.HoneycombTracer(ctx, conn, observability.HoneycombTracingConfig{ + TracingConfig: observability.TracingConfig{ + Probability: config.Probability, + ServiceName: config.ServiceName, + }, + APIKey: config.Honeycomb.APIKey, + Dataset: config.Honeycomb.Dataset, + }) + if err != nil { + return nil, errors.Wrap(err, "observability.HoneycombTracer") + } + + ll.Info().Msg("created honeycomb sdkTrace exporter") + case ExporterCollector: + ll.Info().Msg("configuring collector exporter for tracing") + + conn, err := observability.GrpcConnection(ctx, config.Collector.Endpoint) + if err != nil { + ll.Err(err).Msg("observability.GrcpConnection failed") + return nil, errors.Wrap(err, "collector GrpcConnection") + } + + traceProvider, err = observability.OtelTracer(ctx, conn, observability.TracingConfig{ + Probability: config.Probability, + ServiceName: config.ServiceName, + }) + if err != nil { + ll.Err(err).Msg("observability.OtelTracer failed") + return nil, errors.Wrap(err, "observability.OtelTracer") + } + + ll.Info().Msg("created collector sdkTrace exporter") + default: + ll.Warn().Msg("unrecognised tracer type configuration. Defaulting to no tracer") + fallthrough + case "none", "": + // Create the most default sdkTrace provider and escape early. 
+ traceProvider, err = observability.NoopTracer() + if err != nil { + return nil, errors.Wrap(err, "noop Tracer") + } + + ll.Info().Msg("finished setting up default noop tracer") + } + + ll.Info().Msg("setting up a global tracer") + Tracer = traceProvider.Tracer("e2core-bebby-tracing") + + otel.SetTextMapPropagator(propagation.TraceContext{}) + + return traceProvider, nil +} diff --git a/sat/engine2/engine.go b/sat/engine2/engine.go index 513bab5b..b6db45b9 100644 --- a/sat/engine2/engine.go +++ b/sat/engine2/engine.go @@ -7,6 +7,7 @@ import ( "strings" "github.com/pkg/errors" + "github.com/rs/zerolog" "github.com/suborbital/e2core/foundation/scheduler" "github.com/suborbital/e2core/sat/engine2/api" @@ -19,9 +20,9 @@ type Engine struct { } // New creates a new Engine with the default API -func New(name string, ref *tenant.WasmModuleRef, api api.HostAPI) *Engine { +func New(name string, ref *tenant.WasmModuleRef, api api.HostAPI, logger zerolog.Logger) *Engine { e := &Engine{ - Scheduler: scheduler.New(), + Scheduler: scheduler.NewWithLogger(logger.With().Str("component", "engine.scheduler").Logger()), } runner := newRunnerFromRef(ref, api) diff --git a/sat/engine2/runtime/pool.go b/sat/engine2/runtime/pool.go index 0f737901..d729d338 100644 --- a/sat/engine2/runtime/pool.go +++ b/sat/engine2/runtime/pool.go @@ -1,12 +1,14 @@ package runtime import ( + "context" "sync" "github.com/bytecodealliance/wasmtime-go/v7" "github.com/pkg/errors" "github.com/suborbital/e2core/foundation/scheduler" + "github.com/suborbital/e2core/foundation/tracing" "github.com/suborbital/e2core/sat/engine2/api" "github.com/suborbital/e2core/sat/engine2/runtime/instance" "github.com/suborbital/systemspec/tenant" @@ -65,7 +67,10 @@ func (ip *InstancePool) RemoveInstance() error { } // UseInstance provides an instance from the environment's pool to be used by a callback function -func (ip *InstancePool) UseInstance(ctx *scheduler.Ctx, instFunc func(*instance.Instance, int32)) error { +func (ip 
*InstancePool) UseInstance(ctx *scheduler.Ctx, spanCtx context.Context, instFunc func(context.Context, *instance.Instance, int32)) error { + spanCtx, span := tracing.Tracer.Start(spanCtx, "instancePool.UseInstance") + defer span.End() + go func() { // prepare a new instance if err := ip.AddInstance(); err != nil { @@ -81,6 +86,7 @@ func (ip *InstancePool) UseInstance(ctx *scheduler.Ctx, instFunc func(*instance. it = nil }(inst) + span.AddEvent("instance.Store") // generate a random identifier as a reference to the instance in use to // easily allow the Wasm module to reference itself when calling back over the FFI ident, err := instance.Store(inst) @@ -88,11 +94,12 @@ func (ip *InstancePool) UseInstance(ctx *scheduler.Ctx, instFunc func(*instance. return errors.Wrap(err, "failed to setupNewIdentifier") } + span.AddEvent("instance.UseCtx") // setup the instance's temporary state inst.UseCtx(ctx) // do the actual call into the Wasm module - instFunc(inst, ident) + instFunc(spanCtx, inst, ident) // clear the instance's temporary state inst.UseCtx(nil) diff --git a/sat/engine2/wasmrunnable.go b/sat/engine2/wasmrunnable.go index 9f36735a..cd2e1355 100644 --- a/sat/engine2/wasmrunnable.go +++ b/sat/engine2/wasmrunnable.go @@ -1,10 +1,13 @@ package engine2 import ( + "context" + "github.com/pkg/errors" "github.com/suborbital/e2core/e2core/sequence" "github.com/suborbital/e2core/foundation/scheduler" + "github.com/suborbital/e2core/foundation/tracing" "github.com/suborbital/e2core/sat/engine2/api" "github.com/suborbital/e2core/sat/engine2/runtime" "github.com/suborbital/e2core/sat/engine2/runtime/instance" @@ -33,17 +36,20 @@ func newRunnerFromRef(ref *tenant.WasmModuleRef, api api.HostAPI) *wasmRunner { } // Run runs a wasmRunner -func (w *wasmRunner) Run(job scheduler.Job, ctx *scheduler.Ctx) (interface{}, error) { +func (w *wasmRunner) Run(incomingJob scheduler.Job, ctx *scheduler.Ctx) (interface{}, error) { + spanCtx, span := tracing.Tracer.Start(incomingJob.Context(), 
"wasmRunner.Run") + defer span.End() + + job := incomingJob.WithContext(spanCtx) + var jobBytes []byte var req *request.CoordinatedRequest // check to ensure the job is a CoordinatedRequest (pointer or bytes), and set up the WasmInstance if jobReq, ok := job.Data().(*request.CoordinatedRequest); ok { req = jobReq - } else if jobReq, err := request.FromJSON(job.Bytes()); err == nil { req = jobReq - } else { return nil, errors.New("job data is not a CoordinatedRequest") } @@ -71,13 +77,17 @@ func (w *wasmRunner) Run(job scheduler.Job, ctx *scheduler.Ctx) (interface{}, er var runErr error var callErr error - if err := w.pool.UseInstance(ctx, func(instance *instance.Instance, ident int32) { + if err := w.pool.UseInstance(ctx, spanCtx, func(ctx context.Context, instance *instance.Instance, ident int32) { + _, span := tracing.Tracer.Start(ctx, "instance function") + defer span.End() + inPointer, writeErr := instance.WriteMemory(jobBytes) if writeErr != nil { runErr = errors.Wrap(writeErr, "failed to instance.writeMemory") return } + span.AddEvent("instance.Call run_e") // execute the module's Run function, passing the input data and ident // set runErr but don't return because the ExecutionResult error should also be grabbed _, callErr = instance.Call("run_e", inPointer, int32(len(jobBytes)), ident) diff --git a/sat/main.go b/sat/main.go index b4417aa9..24aa77b3 100644 --- a/sat/main.go +++ b/sat/main.go @@ -10,20 +10,30 @@ import ( "github.com/pkg/errors" "github.com/rs/zerolog" + "github.com/sethvargo/go-envconfig" + "github.com/suborbital/e2core/foundation/tracing" "github.com/suborbital/e2core/sat/sat" "github.com/suborbital/e2core/sat/sat/metrics" + satOptions "github.com/suborbital/e2core/sat/sat/options" ) func main() { zerolog.TimeFieldFormat = zerolog.TimeFormatUnix + + opts, err := satOptions.Resolve(envconfig.OsLookuper()) + if err != nil { + log.Fatalf("options.Resolve: %s", err) + } + logger := zerolog.New(os.Stderr).With(). Timestamp(). 
Str("service", "sat-module"). + Str("port", string(opts.Port)). Str("version", sat.SatDotVersion). Logger() - conf, err := sat.ConfigFromArgs(logger) + conf, err := sat.ConfigFromArgs(logger, opts) if err != nil { log.Fatal(err) } @@ -36,7 +46,7 @@ func main() { // start starts up the Sat instance func start(logger zerolog.Logger, conf *sat.Config) error { - traceProvider, err := sat.SetupTracing(conf.TracerConfig, logger) + traceProvider, err := tracing.SetupTracing(conf.TracerConfig, logger) if err != nil { return errors.Wrap(err, "setup tracing") } @@ -51,7 +61,7 @@ func start(logger zerolog.Logger, conf *sat.Config) error { } // initialize Reactr, echo, and Bus and wrap them in a sat instance. - s, err := sat.New(conf, logger, traceProvider, mtx) + s, err := sat.New(conf, logger, mtx) if err != nil { return errors.Wrap(err, "sat.New") } diff --git a/sat/sat/config.go b/sat/sat/config.go index debce8a2..a00f2841 100644 --- a/sat/sat/config.go +++ b/sat/sat/config.go @@ -11,9 +11,9 @@ import ( "github.com/pkg/errors" "github.com/rs/zerolog" - "github.com/sethvargo/go-envconfig" "gopkg.in/yaml.v3" + "github.com/suborbital/e2core/foundation/tracing" satOptions "github.com/suborbital/e2core/sat/sat/options" "github.com/suborbital/systemspec/capabilities" "github.com/suborbital/systemspec/fqmn" @@ -34,11 +34,11 @@ type Config struct { ControlPlaneUrl string EnvToken string ProcUUID string - TracerConfig satOptions.TracerConfig + TracerConfig tracing.Config MetricsConfig satOptions.MetricsConfig } -func ConfigFromArgs(l zerolog.Logger) (*Config, error) { +func ConfigFromArgs(l zerolog.Logger, opts satOptions.Options) (*Config, error) { flag.Parse() args := flag.Args() @@ -48,17 +48,13 @@ func ConfigFromArgs(l zerolog.Logger) (*Config, error) { moduleArg := args[0] - return ConfigFromModuleArg(l, moduleArg) + return ConfigFromModuleArg(l, opts, moduleArg) } -func ConfigFromModuleArg(logger zerolog.Logger, moduleArg string) (*Config, error) { +func 
ConfigFromModuleArg(logger zerolog.Logger, opts satOptions.Options, moduleArg string) (*Config, error) { var module *tenant.Module var FQMN fqmn.FQMN - - opts, err := satOptions.Resolve(envconfig.OsLookuper()) - if err != nil { - return nil, errors.Wrap(err, "ConfigFromModuleArg options.Resolve") - } + var err error // first, determine if we need to connect to a control plane controlPlane := "" @@ -79,7 +75,6 @@ func ConfigFromModuleArg(logger zerolog.Logger, moduleArg string) (*Config, erro // next, handle the module arg being a URL, an FQMN, or a path on disk if isURL(moduleArg) { - logger.Debug().Msg("fetching module from URL") tmpFile, err := downloadFromURL(moduleArg) if err != nil { return nil, errors.Wrap(err, "failed to downloadFromURL") @@ -88,8 +83,6 @@ func ConfigFromModuleArg(logger zerolog.Logger, moduleArg string) (*Config, erro moduleArg = tmpFile } else if FQMN, err = fqmn.Parse(moduleArg); err == nil { if useControlPlane { - logger.Debug().Msg("fetching module from control plane") - cpModule, err := appClient.GetModule(moduleArg) if err != nil { return nil, errors.Wrap(err, "failed to GetModule") @@ -135,17 +128,6 @@ func ConfigFromModuleArg(logger zerolog.Logger, moduleArg string) (*Config, erro jobType = module.FQMN prettyName = fmt.Sprintf("%s-%s", jobType, opts.ProcUUID[:6]) - - logger = logger.With(). - Str("app", prettyName). - Str("jobType", jobType). - Str("tenant", FQMN.Tenant). 
- Logger() - - logger.Debug().Msg("configuring") - logger.Debug().Msg("joining tenant") - } else { - logger.Debug().Str("jobType", jobType).Msg("configuring") } conns := make([]tenant.Connection, 0) @@ -155,6 +137,30 @@ func ConfigFromModuleArg(logger zerolog.Logger, moduleArg string) (*Config, erro } } + tc := tracing.Config{ + ServiceName: opts.TracerConfig.ServiceName, + Probability: opts.TracerConfig.Probability, + } + + switch opts.TracerConfig.TracerType { + case "collector": + tc.Type = tracing.ExporterCollector + case "honeycomb": + tc.Type = tracing.ExporterHoneycomb + } + + if opts.TracerConfig.HoneycombConfig != nil { + tc.Honeycomb = tracing.HoneycombConfig{ + Endpoint: opts.TracerConfig.HoneycombConfig.Endpoint, + APIKey: opts.TracerConfig.HoneycombConfig.APIKey, + Dataset: opts.TracerConfig.HoneycombConfig.Dataset, + } + } + + if opts.TracerConfig.Collector != nil { + tc.Collector = tracing.CollectorConfig{Endpoint: opts.TracerConfig.Collector.Endpoint} + } + // finally, put it all together c := &Config{ ModuleArg: moduleArg, @@ -166,7 +172,7 @@ func ConfigFromModuleArg(logger zerolog.Logger, moduleArg string) (*Config, erro Connections: conns, Port: portInt, ControlPlaneUrl: controlPlane, - TracerConfig: opts.TracerConfig, + TracerConfig: tc, MetricsConfig: opts.MetricsConfig, ProcUUID: string(opts.ProcUUID), } diff --git a/sat/sat/embedded.go b/sat/sat/embedded.go index 982e089f..7536ef72 100644 --- a/sat/sat/embedded.go +++ b/sat/sat/embedded.go @@ -1,17 +1,22 @@ package sat import ( + "context" "net/http" "github.com/google/uuid" "github.com/pkg/errors" "github.com/suborbital/e2core/foundation/scheduler" + "github.com/suborbital/e2core/foundation/tracing" "github.com/suborbital/systemspec/request" ) // Exec takes input bytes, executes the loaded module, and returns the result -func (s *Sat) Exec(input []byte) (*request.CoordinatedResponse, error) { +func (s *Sat) Exec(ctx context.Context, input []byte) (*request.CoordinatedResponse, error) { + 
ctx, span := tracing.Tracer.Start(ctx, "sat.Exec") + defer span.End() + // construct a fake HTTP request from the input req := &request.CoordinatedRequest{ Method: http.MethodPost, @@ -24,7 +29,7 @@ func (s *Sat) Exec(input []byte) (*request.CoordinatedResponse, error) { State: map[string][]byte{}, } - result, err := s.engine.Do(scheduler.NewJob(s.config.JobType, req)).Then() + result, err := s.engine.Do(scheduler.NewJob(s.config.JobType, req).WithContext(ctx)).Then() if err != nil { return nil, errors.Wrap(err, "failed to exec") } diff --git a/sat/sat/handler.go b/sat/sat/handler.go index dbfef0ff..d8bbfcb4 100644 --- a/sat/sat/handler.go +++ b/sat/sat/handler.go @@ -10,6 +10,7 @@ import ( "go.opentelemetry.io/otel/trace" "github.com/suborbital/e2core/foundation/scheduler" + "github.com/suborbital/e2core/foundation/tracing" "github.com/suborbital/e2core/sat/engine2" "github.com/suborbital/e2core/sat/sat/metrics" "github.com/suborbital/systemspec/request" @@ -17,8 +18,8 @@ import ( func (s *Sat) handler(engine *engine2.Engine) echo.HandlerFunc { return func(c echo.Context) error { - spanCtx, span := s.tracer.Start(c.Request().Context(), "echoHandler", trace.WithAttributes( - attribute.String("request_id", c.Request().Header.Get("requestID")), + spanCtx, span := tracing.Tracer.Start(c.Request().Context(), "echoHandler", trace.WithAttributes( + attribute.String("requestID", c.Response().Header().Get(echo.HeaderXRequestID)), )) defer span.End() @@ -39,7 +40,7 @@ func (s *Sat) handler(engine *engine2.Engine) echo.HandlerFunc { return echo.NewHTTPError(http.StatusInternalServerError, "unknown error").SetInternal(fmt.Errorf("module %s is not registered", s.config.JobType)) } - result, err := engine.Do(scheduler.NewJob(s.config.JobType, req)).Then() + result, err := engine.Do(scheduler.NewJob(s.config.JobType, req).WithContext(spanCtx)).Then() if err != nil { if errors.As(err, &runErr) { // runErr would be an actual error returned from a function diff --git 
a/sat/sat/meshed.go b/sat/sat/meshed.go index 4ceb1ac5..4d803dc9 100644 --- a/sat/sat/meshed.go +++ b/sat/sat/meshed.go @@ -12,6 +12,7 @@ import ( "github.com/suborbital/e2core/e2core/server" "github.com/suborbital/e2core/foundation/bus/bus" "github.com/suborbital/e2core/foundation/scheduler" + "github.com/suborbital/e2core/foundation/tracing" "github.com/suborbital/systemspec/request" ) @@ -19,7 +20,8 @@ import ( // when a meshed peer sends us a job, it is executed by Reactr and then // the result is passed into this function for handling func (s *Sat) handleFnResult(msg bus.Message, result interface{}, fnErr error) { - ll := s.logger.With().Str("method", "handleFnResult").Logger() + ll := s.logger.With().Str("method", "handleFnResult"). + Str("requestID", msg.ParentID()).Logger() // first unmarshal the request and sequence information req, err := request.FromJSON(msg.Data()) @@ -30,7 +32,7 @@ func (s *Sat) handleFnResult(msg bus.Message, result interface{}, fnErr error) { ctx := context.WithValue(context.Background(), "requestID", req.ID) - spanCtx, span := s.tracer.Start(ctx, "handleFnResult", trace.WithAttributes( + spanCtx, span := tracing.Tracer.Start(ctx, "handleFnResult", trace.WithAttributes( attribute.String("request_id", req.ID), )) defer span.End() @@ -55,12 +57,15 @@ func (s *Sat) handleFnResult(msg bus.Message, result interface{}, fnErr error) { if fnErr != nil { if fnRunErr, isRunErr := fnErr.(scheduler.RunErr); isRunErr { + ll.Err(fnErr).Msg("it's a run error") // great, it's a runErr runErr = fnRunErr } else { + ll.Err(fnErr).Msg("it's an exec error") execErr = fnErr } } else { + ll.Info().Msg("result is a coordinated response, hopefully") resp = result.(*request.CoordinatedResponse) } @@ -78,6 +83,8 @@ func (s *Sat) handleFnResult(msg bus.Message, result interface{}, fnErr error) { }(), } + ll.Info().Msg("sending the fn result back") + if err = s.sendFnResult(fnr, spanCtx); err != nil { ll.Err(err).Msg("s.sendFnResult") return @@ -89,6 +96,8 @@ 
func (s *Sat) handleFnResult(msg bus.Message, result interface{}, fnErr error) { return } + ll.Info().Msg("dealing with exec result for fn result") + if err = seq.HandleStepResults([]sequence.ExecResult{*fnr}); err != nil { ll.Err(err).Msg("seq.HandleStepResults") return @@ -103,6 +112,7 @@ func (s *Sat) handleFnResult(msg bus.Message, result interface{}, fnErr error) { req.SequenceJSON = stepJSON + ll.Info().Msg("sending next step") s.sendNextStep(msg, seq, req, spanCtx) } @@ -123,6 +133,7 @@ func (s *Sat) sendFnResult(result *sequence.ExecResult, ctx context.Context) err respMsg := bus.NewMsgWithParentID(server.MsgTypeSuborbitalResult, reqID, fnrJSON) s.logger.Debug(). + Str("requestID", reqID). Str("method", "sendFnResult"). Str("function", s.config.JobType). Str("respUUID", respMsg.UUID()). @@ -135,12 +146,14 @@ func (s *Sat) sendFnResult(result *sequence.ExecResult, ctx context.Context) err return nil } -func (s *Sat) sendNextStep(_ bus.Message, seq *sequence.Sequence, req *request.CoordinatedRequest, ctx context.Context) { - ll := s.logger.With().Str("method", "sendNextStep").Logger() - - span := trace.SpanFromContext(ctx) +func (s *Sat) sendNextStep(msg bus.Message, seq *sequence.Sequence, req *request.CoordinatedRequest, ctx context.Context) { + ctx, span := tracing.Tracer.Start(msg.Context(), "sat.sendNextStep") defer span.End() + msg.SetContext(ctx) + + ll := s.logger.With().Str("method", "sendNextStep").Str("requestID", msg.ParentID()).Logger() + nextStep := seq.NextStep() if nextStep == nil { ll.Debug().Msg("sequence completed, no nextStep message to send") diff --git a/sat/sat/sat.go b/sat/sat/sat.go index 90318a49..fdbe0289 100644 --- a/sat/sat/sat.go +++ b/sat/sat/sat.go @@ -12,7 +12,7 @@ import ( "github.com/labstack/echo/v4/middleware" "github.com/pkg/errors" "github.com/rs/zerolog" - "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho" "github.com/suborbital/e2core/foundation/bus/bus" 
"github.com/suborbital/e2core/foundation/bus/discovery/local" @@ -34,13 +34,12 @@ type Sat struct { pod *bus.Pod transport *websocket.Transport engine *engine2.Engine - tracer trace.Tracer metrics metrics.Metrics } // New initializes a Sat instance // if traceProvider is nil, the default NoopTraceProvider will be used -func New(config *Config, logger zerolog.Logger, traceProvider trace.TracerProvider, mtx metrics.Metrics) (*Sat, error) { +func New(config *Config, logger zerolog.Logger, mtx metrics.Metrics) (*Sat, error) { var module *tenant.WasmModuleRef if config.Module != nil && config.Module.WasmRef != nil && len(config.Module.WasmRef.Data) > 0 { @@ -59,22 +58,18 @@ func New(config *Config, logger zerolog.Logger, traceProvider trace.TracerProvid return nil, errors.Wrap(err, "failed to NewWithConfig") } - engine := engine2.New(config.JobType, module, engineAPI) - - if traceProvider == nil { - traceProvider = trace.NewNoopTracerProvider() - } + engine := engine2.New(config.JobType, module, engineAPI, logger) sat := &Sat{ config: config, logger: logger, engine: engine, - tracer: traceProvider.Tracer("sat"), metrics: mtx, } sat.server = echo.New() sat.server.Use( + otelecho.Middleware("e2core-bebby"), middleware.Recover(), ) sat.server.HTTPErrorHandler = kitError.Handler(logger) @@ -82,11 +77,13 @@ func New(config *Config, logger zerolog.Logger, traceProvider trace.TracerProvid // if a "transport" is configured, enable bus and metrics endpoints, otherwise enable server mode if config.ControlPlaneUrl != "" { + logger.Info().Msg("controlplane url is present, creating the websocket for transport, and the meta/message and meta/metrics endpoints") sat.transport = websocket.New() sat.server.GET("/meta/message", echo.WrapHandler(sat.transport.HTTPHandlerFunc())) sat.server.GET("/meta/metrics", sat.workerMetricsHandler()) } else { + logger.Info().Msg("controlplane url is not present, pass anything to sat.handler") // allow any HTTP method sat.server.Any("*", 
sat.handler(engine)) } @@ -170,7 +167,7 @@ func (s *Sat) setupBus() { opts := []bus.OptionsModifier{ bus.UseBelongsTo(s.config.Tenant), bus.UseInterests(s.config.JobType), - bus.UseLogger(s.logger), + bus.UseLogger(s.logger.With().Str("source", "sat.setupBus").Logger()), bus.UseMeshTransport(s.transport), bus.UseDiscovery(local.New()), bus.UseEndpoint(fmt.Sprintf("%d", s.config.Port), "/meta/message"), diff --git a/sat/sat/sat_test.go b/sat/sat/sat_test.go index 7290df31..7c40d3d2 100644 --- a/sat/sat/sat_test.go +++ b/sat/sat/sat_test.go @@ -13,9 +13,11 @@ import ( "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/otel/sdk/trace" + sdkTrace "go.opentelemetry.io/otel/sdk/trace" + "github.com/suborbital/e2core/foundation/tracing" "github.com/suborbital/e2core/sat/sat/metrics" + "github.com/suborbital/e2core/sat/sat/options" ) func TestEchoRequest(t *testing.T) { @@ -96,18 +98,18 @@ func TestPanicRequest(t *testing.T) { `, string(body)) } -func satForFile(filepath string) (*Sat, *trace.TracerProvider, error) { - config, err := ConfigFromModuleArg(zerolog.Nop(), filepath) +func satForFile(filepath string) (*Sat, *sdkTrace.TracerProvider, error) { + config, err := ConfigFromModuleArg(zerolog.Nop(), options.Options{}, filepath) if err != nil { return nil, nil, err } - traceProvider, err := SetupTracing(config.TracerConfig, zerolog.Nop()) + traceProvider, err := tracing.SetupTracing(tracing.Config{}, zerolog.Nop()) if err != nil { return nil, nil, errors.Wrap(err, "setup tracing") } - sat, err := New(config, zerolog.Nop(), traceProvider, metrics.SetupNoopMetrics()) + sat, err := New(config, zerolog.Nop(), metrics.SetupNoopMetrics()) if err != nil { return nil, nil, err } diff --git a/sat/sat/tracing.go b/sat/sat/tracing.go deleted file mode 100644 index b7cbb8f2..00000000 --- a/sat/sat/tracing.go +++ /dev/null @@ -1,87 +0,0 @@ -package sat - -import ( - "context" - "time" - - "github.com/pkg/errors" 
- "github.com/rs/zerolog" - "go.opentelemetry.io/otel/sdk/trace" - - "github.com/suborbital/e2core/sat/sat/options" - "github.com/suborbital/go-kit/observability" -) - -// SetupTracing configure open telemetry to be used with otel exporter. Returns a tracer closer func and an error. -func SetupTracing(config options.TracerConfig, logger zerolog.Logger) (*trace.TracerProvider, error) { - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) - defer cancel() - - ll := logger.With().Str("tracerType", config.TracerType).Logger() - - switch config.TracerType { - case "honeycomb": - if config.HoneycombConfig == nil { - return nil, errors.New("missing honeycomb tracing config values") - } - - ll.Info().Msg("configuring honeycomb exporter for tracing") - - conn, err := observability.GrpcConnection(ctx, config.HoneycombConfig.Endpoint, nil) - if err != nil { - return nil, errors.Wrap(err, "honeycomb GrpcConnection") - } - - traceProvider, err := observability.HoneycombTracer(ctx, conn, observability.HoneycombTracingConfig{ - TracingConfig: observability.TracingConfig{ - Probability: config.Probability, - ServiceName: config.ServiceName, - }, - APIKey: config.HoneycombConfig.APIKey, - Dataset: config.HoneycombConfig.Dataset, - }) - if err != nil { - return nil, errors.Wrap(err, "observability.HoneycombTracer") - } - - ll.Info().Msg("created honeycomb trace exporter") - - return traceProvider, nil - case "collector": - if config.Collector == nil { - return nil, errors.New("missing collector tracing config values") - } - - ll.Info().Msg("configuring collector exporter for tracing") - - conn, err := observability.GrpcConnection(ctx, config.Collector.Endpoint) - if err != nil { - return nil, errors.Wrap(err, "collector GrpcConnection") - } - - traceProvider, err := observability.OtelTracer(ctx, conn, observability.TracingConfig{ - Probability: config.Probability, - ServiceName: config.ServiceName, - }) - if err != nil { - return nil, errors.Wrap(err, 
"observability.OtelTracer") - } - - ll.Info().Msg("created collector trace exporter") - - return traceProvider, nil - default: - ll.Warn().Msg("unrecognised tracer type configuration. Defaulting to no tracer") - fallthrough - case "none", "": - // Create the most default trace provider and escape early. - traceProvider, err := observability.NoopTracer() - if err != nil { - return nil, errors.Wrap(err, "noop Tracer") - } - - ll.Debug().Msg("finished setting up default noop tracer") - - return traceProvider, nil - } -}