Skip to content

Commit

Permalink
fix goroutines
Browse files Browse the repository at this point in the history
  • Loading branch information
alexmasi committed Jul 18, 2024
1 parent b3d2385 commit 9389156
Showing 1 changed file with 28 additions and 37 deletions.
65 changes: 28 additions & 37 deletions x/wire/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"

wpb "github.com/openconfig/kne/proto/wire"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
log "k8s.io/klog/v2"
Expand Down Expand Up @@ -82,58 +83,48 @@ func NewWire(src ReadWriter) *Wire {
}

func (w *Wire) Transmit(ctx context.Context, stream Stream) error {
errs := make(chan error)
go func() {
<-ctx.Done()
if err := ctx.Err(); err != nil {
errs <- err
}
close(errs)
}()
go func() {
g := new(errgroup.Group)
g.Go(func() error {
for {
p, err := stream.Recv()
if err == io.EOF {
// read done.
log.Infof("EOF recv from stream")
close(errs)
return
return nil
}
if err != nil {
errs <- fmt.Errorf("failed to receive a packet: %v", err)
close(errs)
return
return fmt.Errorf("failed to receive packet: %v", err)
}
log.Infof("Recv packet: %q", string(p.Data))
if err := w.src.Write(p.Data); err != nil {
errs <- fmt.Errorf("failed to write the received packet: %v", err)
close(errs)
return
return fmt.Errorf("failed to write packet: %v", err)
}
log.Infof("Wrote packet: %q", string(p.Data))
}
}()
for {
data, err := w.src.Read()
if err == io.EOF {
// read done.
log.Infof("EOF reading from src")
break
}
if err != nil {
return err
})
g.Go(func() error {
if cs, ok := stream.(grpc.ClientStream); ok {
defer cs.CloseSend()
}
p := &wpb.Packet{Data: data}
log.Infof("Read packet: %q", string(p.Data))
if err := stream.Send(p); err != nil {
return fmt.Errorf("failed to send packet: %v", err)
for {
data, err := w.src.Read()
if err == io.EOF {
// read done.
log.Infof("EOF reading from src")
return nil
}
if err != nil {
return fmt.Errorf("failed to read packet: %v", err)
}
p := &wpb.Packet{Data: data}
log.Infof("Read packet: %q", string(p.Data))
if err := stream.Send(p); err != nil {
return fmt.Errorf("failed to send packet: %v", err)
}
log.Infof("Sent packet: %q", string(p.Data))
}
log.Infof("Sent packet: %q", string(p.Data))
}
if cs, ok := stream.(grpc.ClientStream); ok {
cs.CloseSend()
}
return <-errs
})
return g.Wait()
}

type PhysicalEndpoint struct {
Expand Down

0 comments on commit 9389156

Please sign in to comment.