Commit faf8536

feat: add fetch of stacks state (#383)

gfyrag committed Jul 7, 2023
1 parent 0df34a4 commit faf8536

Showing 5 changed files with 473 additions and 225 deletions.
54 changes: 41 additions & 13 deletions components/agent/internal/grpc/client.go
@@ -20,13 +20,15 @@ import (
"google.golang.org/grpc/metadata"
controllererrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
)

type K8SClient interface {
Get(ctx context.Context, name string, options metav1.GetOptions) (*v1beta3.Stack, error)
Create(ctx context.Context, stack *v1beta3.Stack) (*v1beta3.Stack, error)
Update(ctx context.Context, stack *v1beta3.Stack) (*v1beta3.Stack, error)
Delete(ctx context.Context, name string) error
Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
}

type Authenticator interface {
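The interface gains a Watch method so the agent can subscribe to stack changes cluster-side. A quick way to see that contract in action is the fake watcher that ships with k8s.io/apimachinery; the test below is an illustrative sketch, not part of this commit, and the v1beta3 import path is an assumption.

// Illustrative sketch, not part of this commit.
package grpc

import (
    "testing"

    "github.com/formancehq/operator/apis/stack/v1beta3" // import path assumed
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/watch"
)

func TestWatchDeliversStackEvents(t *testing.T) {
    fake := watch.NewFake()
    defer fake.Stop()

    // FakeWatcher delivers events synchronously, so feed it from a goroutine.
    go fake.Modify(&v1beta3.Stack{
        ObjectMeta: metav1.ObjectMeta{Name: "stack-1"},
    })

    event := <-fake.ResultChan()
    if event.Type != watch.Modified {
        t.Fatalf("expected Modified, got %s", event.Type)
    }
    if stack := event.Object.(*v1beta3.Stack); stack.Name != "stack-1" {
        t.Fatalf("unexpected stack %q", stack.Name)
    }
}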
@@ -94,16 +96,22 @@ func (client *client) Connect(ctx context.Context) error {
         return errors.Wrap(err, "authenticating client")
     }

+    md.Append("id", client.id)
+    md.Append("baseUrl", client.baseUrl.String())
+    md.Append("production", func() string {
+        if client.production {
+            return "true"
+        }
+        return "false"
+    }())
+
     connectContext := metadata.NewOutgoingContext(client.connectContext, md)
-    connectClient, err := client.grpcClient.Connect(connectContext, &generated.ConnectRequest{
-        Id:         client.id,
-        BaseUrl:    client.baseUrl.String(),
-        Production: client.production,
-    })
+    connectClient, err := client.grpcClient.Join(connectContext)
     if err != nil {
         return err
     }
     client.connectClient = connectClient

     return nil
 }
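The connection parameters (id, baseUrl, production) now travel as gRPC metadata on the stream instead of as fields of a ConnectRequest, and the RPC becomes Join rather than Connect. A server handling Join would recover them roughly as below; this is a sketch under the assumption of a standard grpc-go handler, and only metadata.FromIncomingContext, md.Get, and the status/codes helpers are taken from the real grpc-go API.

// Sketch, not part of this commit: reading back the keys that Connect
// appends ("id", "baseUrl", "production"). gRPC normalizes metadata keys
// to lowercase, but MD.Get is case-insensitive, so this still works.
package server

import (
    "context"

    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/metadata"
    "google.golang.org/grpc/status"
)

func connectionParams(ctx context.Context) (id, baseUrl string, production bool, err error) {
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        return "", "", false, status.Error(codes.InvalidArgument, "missing metadata")
    }
    first := func(key string) string {
        if values := md.Get(key); len(values) > 0 {
            return values[0]
        }
        return ""
    }
    return first("id"), first("baseUrl"), first("production") == "true", nil
}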

@@ -149,14 +157,20 @@ func (client *client) Start(ctx context.Context) error {
     ctx, cancel := context.WithCancel(ctx)
     defer cancel()

+    watcher, err := client.k8sClient.Watch(ctx, metav1.ListOptions{})
+    if err != nil {
+        return err
+    }
+    defer watcher.Stop()
+
     var (
         closed = false
         errCh  = make(chan error, 1)
-        msgs   = make(chan *generated.ConnectResponse)
+        msgs   = make(chan *generated.Order)
     )
     go func() {
         for {
-            msg := &generated.ConnectResponse{}
+            msg := &generated.Order{}
             if err := client.connectClient.RecvMsg(msg); err != nil {
                 if err == io.EOF {
                     if !closed {
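The goroutine above is a pump: it owns the blocking RecvMsg loop and forwards orders and the terminal error into channels, so the select further down can multiplex server orders with watcher events and cancellation. Condensed into a helper, the idiom looks like this (illustrative only; "stream" stands in for client.connectClient):

// Sketch of the pump idiom used above; not part of this commit.
func pumpOrders(stream interface{ RecvMsg(any) error }) (<-chan *generated.Order, <-chan error) {
    msgs := make(chan *generated.Order)
    errCh := make(chan error, 1) // buffered: the final send never blocks
    go func() {
        for {
            msg := &generated.Order{}
            if err := stream.RecvMsg(msg); err != nil {
                errCh <- err
                return
            }
            msgs <- msg
        }
    }()
    return msgs, errCh
}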
Expand Down Expand Up @@ -188,7 +202,7 @@ func (client *client) Start(ctx context.Context) error {
             }
             client.connectCancel()
             for {
-                msg := &generated.ConnectResponse{}
+                msg := &generated.Order{}
                 if err := client.connectClient.RecvMsg(msg); err != nil { // Drain messages
                     break
                 }
@@ -199,9 +213,23 @@ func (client *client) Start(ctx context.Context) error {
         case err := <-errCh:
             sharedlogging.FromContext(ctx).Errorf("Stream closed with error: %s", err)
             return err
+        case k8sUpdate := <-watcher.ResultChan():
+            stack := k8sUpdate.Object.(*v1beta3.Stack)
+            sharedlogging.FromContext(ctx).Infof("Got update for stack '%s'", stack.Name)
+            if err := client.connectClient.SendMsg(&generated.StatusChanged{
+                StackId: stack.Name,
+                Status: func() generated.StackStatus {
+                    if stack.IsReady() {
+                        return generated.StackStatus_Ready
+                    }
+                    return generated.StackStatus_Progressing
+                }(),
+            }); err != nil {
+                sharedlogging.FromContext(ctx).Errorf("Unable to send stack status to server: %s", err)
+            }
         case msg := <-msgs:
             switch msg := msg.Message.(type) {
-            case *generated.ConnectResponse_ExistingStack:
+            case *generated.Order_ExistingStack:
                 crd := client.k8sStackFromProtobuf(msg.ExistingStack)
                 existingStack, err := client.k8sClient.Get(ctx, crd.Name, metav1.GetOptions{})
                 if err != nil {
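The new case reports every stack the watcher surfaces as either Ready or Progressing, derived from IsReady. The inlined closure could equally be a named helper; the refactor below is illustrative, not part of the commit.

// Illustrative helper mirroring the status mapping inlined above.
func stackStatus(stack *v1beta3.Stack) generated.StackStatus {
    if stack.IsReady() {
        return generated.StackStatus_Ready
    }
    return generated.StackStatus_Progressing
}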
@@ -222,12 +250,12 @@ func (client *client) Start(ctx context.Context) error {
                 }
                 sharedlogging.FromContext(ctx).Infof("stack %s updated", crd.Name)

-            case *generated.ConnectResponse_DeletedStack:
+            case *generated.Order_DeletedStack:
                 if err := client.k8sClient.Delete(ctx, msg.DeletedStack.ClusterName); err != nil {
                     sharedlogging.FromContext(ctx).Errorf("creating deleting cluster side: %s", err)
                 }
                 sharedlogging.FromContext(ctx).Infof("stack %s deleted", msg.DeletedStack.ClusterName)
-            case *generated.ConnectResponse_DisabledStack:
+            case *generated.Order_DisabledStack:
                 existingStack, err := client.k8sClient.Get(ctx, msg.DisabledStack.ClusterName, metav1.GetOptions{})
                 if err != nil {
                     if controllererrors.IsNotFound(err) {
@@ -243,7 +271,7 @@ func (client *client) Start(ctx context.Context) error {
                     continue
                 }
                 sharedlogging.FromContext(ctx).Infof("stack %s disabled", msg.DisabledStack.ClusterName)
-            case *generated.ConnectResponse_EnabledStack:
+            case *generated.Order_EnabledStack:
                 existingStack, err := client.k8sClient.Get(ctx, msg.EnabledStack.ClusterName, metav1.GetOptions{})
                 if err != nil {
                     if controllererrors.IsNotFound(err) {
@@ -259,7 +287,7 @@ func (client *client) Start(ctx context.Context) error {
                     continue
                 }
                 sharedlogging.FromContext(ctx).Infof("stack %s enabled", msg.EnabledStack.ClusterName)
-            case *generated.ConnectResponse_UpdateUsageReport:
+            case *generated.Order_UpdateUsageReport:
                 total, err := CountDocument(msg.UpdateUsageReport.ClusterName)
                 if err != nil {
                     sharedlogging.FromContext(ctx).Errorf("counting documents: %s", err)
(The diffs of the remaining 4 changed files did not load and are not shown.)

1 comment on commit faf8536

@vercel bot commented on faf8536, Jul 7, 2023