Skip to content

Commit

Permalink
feat: support periodic ping to principal for keeping connection alive
Browse files Browse the repository at this point in the history
Signed-off-by: Jayendra Parsai <[email protected]>
  • Loading branch information
Jayendra Parsai committed Feb 4, 2025
1 parent bc9011d commit d823bdd
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 0 deletions.
2 changes: 2 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ type AgentOptions struct {
//
// However, as of this writing, this feature is not available.
namespaces []string

pingInterval int
}

type AgentOption func(*Agent) error
Expand Down
13 changes: 13 additions & 0 deletions agent/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,19 @@ func (a *Agent) sender(stream eventstreamapi.EventStream_SubscribeClient) error
if q == nil {
return fmt.Errorf("no send queue found for the remote principal")
}

go func() {
if a.options.pingInterval > 0 {
logCtx.Debugf(fmt.Sprintf("Agent ping to principal is enabled, agent will send a ping event after every %d minutes.", a.options.pingInterval))
for {
time.Sleep(time.Minute * time.Duration(a.options.pingInterval))
a.eventWriter.Add(event.NewEventSource(fmt.Sprintf("agent://%s", a.mode)).PingEvent(event.Ping))
}
} else {
logCtx.Debugf("Agent ping to principal is disabled")
}
}()

// Get() is blocking until there is at least one item in the
// queue.
logCtx.Tracef("Waiting to grab an item from queue as it appears")
Expand Down
7 changes: 7 additions & 0 deletions agent/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,10 @@ func WithMode(mode string) AgentOption {
return nil
}
}

func WithPingInterval(t int) AgentOption {
return func(a *Agent) error {
a.options.pingInterval = t
return nil
}
}
6 changes: 6 additions & 0 deletions cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func NewAgentRunCommand() *cobra.Command {
tlsClientCrt string
tlsClientKey string
enableWebSocket bool
pingInterval int
)
command := &cobra.Command{
Short: "Run the argocd-agent agent component",
Expand Down Expand Up @@ -119,6 +120,8 @@ func NewAgentRunCommand() *cobra.Command {
}
agentOpts = append(agentOpts, agent.WithRemote(remote))
agentOpts = append(agentOpts, agent.WithMode(agentMode))
agentOpts = append(agentOpts, agent.WithPingInterval(pingInterval))

ag, err := agent.NewAgent(ctx, kubeConfig, namespace, agentOpts...)
if err != nil {
cmdutil.Fatal("Could not create a new agent instance: %v", err)
Expand Down Expand Up @@ -166,6 +169,9 @@ func NewAgentRunCommand() *cobra.Command {
command.Flags().BoolVar(&enableWebSocket, "enable-websocket",
env.BoolWithDefault("ARGOCD_AGENT_ENABLE_WEBSOCKET", false),
"Agent will rely on gRPC over WebSocket to stream events to the Principal")
command.Flags().IntVar(&pingInterval, "agent-ping-interval",
env.NumWithDefault("AGENT_PING_INTERVAL", nil, 0),
"Time interval for agent to send a ping event to principal for keeping connection alive")

command.Flags().StringVar(&kubeConfig, "kubeconfig", "", "Path to a kubeconfig file to use")
command.Flags().StringVar(&kubeContext, "kubecontext", "", "Override the default kube context")
Expand Down
12 changes: 12 additions & 0 deletions internal/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ const (
TargetAppProject EventTarget = "appproject"
TargetEventAck EventTarget = "eventProcessed"
TargetResource EventTarget = "resource"
TargetPing EventTarget = "ping"
)

const (
Expand Down Expand Up @@ -256,6 +257,15 @@ func (evs EventSource) ProcessedEvent(evType EventType, ev *Event) *cloudevents.
return &cev
}

func (evs EventSource) PingEvent(evType EventType) *cloudevents.Event {
cev := cloudevents.NewEvent()
cev.SetSource(evs.source)
cev.SetSpecVersion(cloudEventSpecVersion)
cev.SetType(evType.String())
cev.SetDataSchema(TargetPing.String())
return &cev
}

// FromWire validates an event from the wire in protobuf format, converts it
// into an Event object and returns it. If the event on the wire is invalid,
// or could not be converted for another reason, FromWire returns an error.
Expand Down Expand Up @@ -283,6 +293,8 @@ func Target(raw *cloudevents.Event) EventTarget {
return TargetResource
case TargetEventAck.String():
return TargetEventAck
case TargetPing.String():
return TargetPing
}
return ""
}
Expand Down
8 changes: 8 additions & 0 deletions principal/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ func (s *Server) processRecvQueue(ctx context.Context, agentName string, q workq
err = s.processAppProjectEvent(ctx, agentName, ev)
case event.TargetResource:
err = s.processResourceEventResponse(ctx, agentName, ev)
case event.TargetPing:
err = s.processPingEvent()
default:
err = fmt.Errorf("unknown target: '%s'", target)
}
Expand Down Expand Up @@ -252,6 +254,12 @@ func (s *Server) processResourceEventResponse(ctx context.Context, agentName str
return err
}

// processPingEvent processes an incoming ping event
func (s *Server) processPingEvent() error {
// Do nothing and return nil since this event is sent just to keep connection alive
return nil
}

// eventProcessor is the main loop to process event from the receiver queue,
// i.e. events coming from the connect agents. It will process events from
// different agents in parallel, but it will not parallelize processing of
Expand Down

0 comments on commit d823bdd

Please sign in to comment.