diff --git a/.gitignore b/.gitignore index 296cef38..e3e04153 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,5 @@ kne_cli/kne_cli kne_cli/kne controller/server/server x/webhook/webhook +x/wire/client/client +x/wire/server/server diff --git a/go.mod b/go.mod index 02246343..b995101b 100644 --- a/go.mod +++ b/go.mod @@ -31,6 +31,7 @@ require ( github.com/wI2L/jsondiff v0.5.1 go.universe.tf/metallb v0.13.5 golang.org/x/oauth2 v0.14.0 + golang.org/x/sync v0.5.0 google.golang.org/api v0.149.0 google.golang.org/grpc v1.61.0 google.golang.org/protobuf v1.33.0 @@ -136,7 +137,6 @@ require ( golang.org/x/crypto v0.21.0 // indirect golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect golang.org/x/net v0.23.0 // indirect - golang.org/x/sync v0.5.0 // indirect golang.org/x/sys v0.18.0 // indirect golang.org/x/term v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/proto/generate.go b/proto/generate.go index 39b168e0..8a8e2458 100644 --- a/proto/generate.go +++ b/proto/generate.go @@ -4,4 +4,5 @@ package proto //go:generate protoc --go_out=./topo --go_opt=paths=source_relative ./topo.proto //go:generate protoc --go_out=./ceos --go_opt=paths=source_relative ./ceos.proto //go:generate protoc --go_out=./controller --go-grpc_out=./controller --go-grpc_opt=paths=source_relative --go_opt=paths=source_relative ./controller.proto +//go:generate protoc --go_out=./wire --go-grpc_out=./wire --go-grpc_opt=paths=source_relative --go_opt=paths=source_relative ./wire.proto //go:generate protoc --go_out=./event --go-grpc_out=./event --go-grpc_opt=paths=source_relative --go_opt=paths=source_relative ./event.proto diff --git a/proto/wire.proto b/proto/wire.proto new file mode 100644 index 00000000..91168d2a --- /dev/null +++ b/proto/wire.proto @@ -0,0 +1,30 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +syntax = "proto3"; + +package wire; + +option go_package = "github.com/openconfig/kne/proto/wire"; + +// Wire service definition. +service Wire { + // Transmit sets up a bidirectional Packet stream. + // Metadata can be configured to identify the destination. + rpc Transmit(stream Packet) returns (stream Packet); +} + +// Packet is a wrapper around bytes. +message Packet { + bytes data = 1; +} diff --git a/proto/wire/wire.pb.go b/proto/wire/wire.pb.go new file mode 100644 index 00000000..4f0ba709 --- /dev/null +++ b/proto/wire/wire.pb.go @@ -0,0 +1,163 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.21.12 +// source: wire.proto + +package wire + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// Packet is a wrapper around bytes. +type Packet struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` +} + +func (x *Packet) Reset() { + *x = Packet{} + if protoimpl.UnsafeEnabled { + mi := &file_wire_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Packet) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Packet) ProtoMessage() {} + +func (x *Packet) ProtoReflect() protoreflect.Message { + mi := &file_wire_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Packet.ProtoReflect.Descriptor instead. +func (*Packet) Descriptor() ([]byte, []int) { + return file_wire_proto_rawDescGZIP(), []int{0} +} + +func (x *Packet) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +var File_wire_proto protoreflect.FileDescriptor + +var file_wire_proto_rawDesc = []byte{ + 0x0a, 0x0a, 0x77, 0x69, 0x72, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x04, 0x77, 0x69, + 0x72, 0x65, 0x22, 0x1c, 0x0a, 0x06, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x12, 0x12, 0x0a, 0x04, + 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, + 0x32, 0x32, 0x0a, 0x04, 0x57, 0x69, 0x72, 0x65, 0x12, 0x2a, 0x0a, 0x08, 0x54, 0x72, 0x61, 0x6e, + 0x73, 0x6d, 0x69, 0x74, 0x12, 0x0c, 0x2e, 0x77, 0x69, 0x72, 0x65, 0x2e, 0x50, 0x61, 0x63, 0x6b, + 0x65, 0x74, 0x1a, 0x0c, 0x2e, 0x77, 0x69, 0x72, 0x65, 0x2e, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, + 0x28, 0x01, 0x30, 0x01, 0x42, 0x26, 0x5a, 0x24, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, + 0x6f, 0x6d, 0x2f, 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2f, 0x6b, 0x6e, + 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x77, 0x69, 0x72, 0x65, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_wire_proto_rawDescOnce sync.Once + file_wire_proto_rawDescData = file_wire_proto_rawDesc +) + +func file_wire_proto_rawDescGZIP() []byte { + file_wire_proto_rawDescOnce.Do(func() { + file_wire_proto_rawDescData = protoimpl.X.CompressGZIP(file_wire_proto_rawDescData) + }) + return file_wire_proto_rawDescData +} + +var file_wire_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_wire_proto_goTypes = []interface{}{ + (*Packet)(nil), // 0: wire.Packet +} +var file_wire_proto_depIdxs = []int32{ + 0, // 0: wire.Wire.Transmit:input_type -> wire.Packet + 0, // 1: wire.Wire.Transmit:output_type -> wire.Packet + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_wire_proto_init() } +func file_wire_proto_init() { + if File_wire_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_wire_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Packet); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_wire_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_wire_proto_goTypes, + DependencyIndexes: file_wire_proto_depIdxs, + MessageInfos: file_wire_proto_msgTypes, + }.Build() + File_wire_proto = out.File + file_wire_proto_rawDesc = nil + file_wire_proto_goTypes = nil + file_wire_proto_depIdxs = nil +} diff --git a/proto/wire/wire_grpc.pb.go b/proto/wire/wire_grpc.pb.go new file mode 100644 index 00000000..6174209d --- /dev/null +++ b/proto/wire/wire_grpc.pb.go @@ -0,0 +1,141 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.21.12 +// source: wire.proto + +package wire + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// WireClient is the client API for Wire service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type WireClient interface { + // Transmit sets up a bidirectional Packet stream. + // Metadata can be configured to identify the destination. + Transmit(ctx context.Context, opts ...grpc.CallOption) (Wire_TransmitClient, error) +} + +type wireClient struct { + cc grpc.ClientConnInterface +} + +func NewWireClient(cc grpc.ClientConnInterface) WireClient { + return &wireClient{cc} +} + +func (c *wireClient) Transmit(ctx context.Context, opts ...grpc.CallOption) (Wire_TransmitClient, error) { + stream, err := c.cc.NewStream(ctx, &Wire_ServiceDesc.Streams[0], "/wire.Wire/Transmit", opts...) + if err != nil { + return nil, err + } + x := &wireTransmitClient{stream} + return x, nil +} + +type Wire_TransmitClient interface { + Send(*Packet) error + Recv() (*Packet, error) + grpc.ClientStream +} + +type wireTransmitClient struct { + grpc.ClientStream +} + +func (x *wireTransmitClient) Send(m *Packet) error { + return x.ClientStream.SendMsg(m) +} + +func (x *wireTransmitClient) Recv() (*Packet, error) { + m := new(Packet) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// WireServer is the server API for Wire service. +// All implementations must embed UnimplementedWireServer +// for forward compatibility +type WireServer interface { + // Transmit sets up a bidirectional Packet stream. + // Metadata can be configured to identify the destination. + Transmit(Wire_TransmitServer) error + mustEmbedUnimplementedWireServer() +} + +// UnimplementedWireServer must be embedded to have forward compatible implementations. +type UnimplementedWireServer struct { +} + +func (UnimplementedWireServer) Transmit(Wire_TransmitServer) error { + return status.Errorf(codes.Unimplemented, "method Transmit not implemented") +} +func (UnimplementedWireServer) mustEmbedUnimplementedWireServer() {} + +// UnsafeWireServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to WireServer will +// result in compilation errors. +type UnsafeWireServer interface { + mustEmbedUnimplementedWireServer() +} + +func RegisterWireServer(s grpc.ServiceRegistrar, srv WireServer) { + s.RegisterService(&Wire_ServiceDesc, srv) +} + +func _Wire_Transmit_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(WireServer).Transmit(&wireTransmitServer{stream}) +} + +type Wire_TransmitServer interface { + Send(*Packet) error + Recv() (*Packet, error) + grpc.ServerStream +} + +type wireTransmitServer struct { + grpc.ServerStream +} + +func (x *wireTransmitServer) Send(m *Packet) error { + return x.ServerStream.SendMsg(m) +} + +func (x *wireTransmitServer) Recv() (*Packet, error) { + m := new(Packet) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// Wire_ServiceDesc is the grpc.ServiceDesc for Wire service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Wire_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "wire.Wire", + HandlerType: (*WireServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Transmit", + Handler: _Wire_Transmit_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "wire.proto", +} diff --git a/x/wire/client/main.go b/x/wire/client/main.go new file mode 100644 index 00000000..77fb25f7 --- /dev/null +++ b/x/wire/client/main.go @@ -0,0 +1,79 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package main provides an example wire client. +package main + +import ( + "context" + "flag" + + wpb "github.com/openconfig/kne/proto/wire" + "github.com/openconfig/kne/x/wire" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + log "k8s.io/klog/v2" +) + +var ( + addr = flag.String("addr", "localhost:50058", "Wire server address") +) + +func main() { + ctx := context.Background() + frw1, err := wire.NewFileReadWriter("testdata/fwdxx01sql17src.txt", "testdata/fwdxx01sql17dst.txt") + if err != nil { + log.Fatalf("Failed to create file based read/writer: %v", err) + } + defer frw1.Close() + frw2, err := wire.NewFileReadWriter("testdata/fwdxx02sql17src.txt", "testdata/fwdxx02sql17dst.txt") + if err != nil { + log.Fatalf("Failed to create file based read/writer: %v", err) + } + defer frw2.Close() + endpoints := map[*wire.PhysicalEndpoint]*wire.Wire{ + wire.NewPhysicalEndpoint("xx01.sql17", "Ethernet0/0/0/0"): wire.NewWire(frw1), + wire.NewPhysicalEndpoint("xx02.sql17", "Ethernet0/0/0/1"): wire.NewWire(frw2), + } + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + } + conn, err := grpc.DialContext(ctx, *addr, opts...) + if err != nil { + log.Fatalf("Failed to dial %q: %v", *addr, err) + } + defer conn.Close() + c := wpb.NewWireClient(conn) + g := new(errgroup.Group) + for e, w := range endpoints { + e := e + w := w + g.Go(func() error { + octx := e.NewContext(ctx) + stream, err := c.Transmit(octx) + if err != nil { + return err + } + defer func() { + stream.CloseSend() + }() + log.Infof("Transmitting endpoint %v over wire...", e) + return w.Transmit(ctx, stream) + }) + } + if err := g.Wait(); err != nil { + log.Fatalf("Failed to wait for wire transmits: %v", err) + } +} diff --git a/x/wire/client/testdata/fwdxx01sql17src.txt b/x/wire/client/testdata/fwdxx01sql17src.txt new file mode 100644 index 00000000..e49d61ba --- /dev/null +++ b/x/wire/client/testdata/fwdxx01sql17src.txt @@ -0,0 +1,3 @@ +this is a packet +this is a different packet +finally, packet 3 diff --git a/x/wire/client/testdata/fwdxx02sql17src.txt b/x/wire/client/testdata/fwdxx02sql17src.txt new file mode 100644 index 00000000..7438df1c --- /dev/null +++ b/x/wire/client/testdata/fwdxx02sql17src.txt @@ -0,0 +1 @@ +i would like this packet forwarded to xx02 please diff --git a/x/wire/server/main.go b/x/wire/server/main.go new file mode 100644 index 00000000..898e2f4d --- /dev/null +++ b/x/wire/server/main.go @@ -0,0 +1,85 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package main provides an example wire server. +package main + +import ( + "flag" + "fmt" + "net" + + log "github.com/golang/glog" + wpb "github.com/openconfig/kne/proto/wire" + "github.com/openconfig/kne/x/wire" + "google.golang.org/grpc" +) + +var ( + port = flag.Int("port", 50058, "Wire server port") +) + +type server struct { + wpb.UnimplementedWireServer + endpoints map[wire.PhysicalEndpoint]*wire.Wire +} + +func newServer(endpoints map[wire.PhysicalEndpoint]*wire.Wire) *server { + return &server{endpoints: endpoints} +} + +func (s *server) Transmit(stream wpb.Wire_TransmitServer) error { + pe, err := wire.ParsePhysicalEndpoint(stream.Context()) + if err != nil { + return fmt.Errorf("unable to parse physical endpoint from incoming stream context: %v", err) + } + log.Infof("New Transmit stream started for endpoint %v", pe) + w, ok := s.endpoints[*pe] + if !ok { + return fmt.Errorf("no endpoint found on server for request: %v", pe) + } + if err := w.Transmit(stream.Context(), stream); err != nil { + return fmt.Errorf("transmit failed: %v", err) + } + return nil +} + +func main() { + flag.Parse() + addr := fmt.Sprintf(":%d", *port) + lis, err := net.Listen("tcp6", addr) + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + s := grpc.NewServer() + frw1, err := wire.NewFileReadWriter("testdata/xx01sql17src.txt", "testdata/xx01sql17dst.txt") + if err != nil { + log.Fatalf("Failed to create file based read/writer: %v", err) + } + defer frw1.Close() + frw2, err := wire.NewFileReadWriter("testdata/xx02sql17src.txt", "testdata/xx02sql17dst.txt") + if err != nil { + log.Fatalf("Failed to create file based read/writer: %v", err) + } + defer frw2.Close() + endpoints := map[wire.PhysicalEndpoint]*wire.Wire{ + *wire.NewPhysicalEndpoint("xx01.sql17", "Ethernet0/0/0/0"): wire.NewWire(frw1), + *wire.NewPhysicalEndpoint("xx02.sql17", "Ethernet0/0/0/1"): wire.NewWire(frw2), + } + wpb.RegisterWireServer(s, newServer(endpoints)) + log.Infof("Wire server listening at %v", lis.Addr()) + if err := s.Serve(lis); err != nil { + log.Fatalf("failed to serve: %v", err) + } +} diff --git a/x/wire/server/testdata/xx01sql17src.txt b/x/wire/server/testdata/xx01sql17src.txt new file mode 100644 index 00000000..7218c691 --- /dev/null +++ b/x/wire/server/testdata/xx01sql17src.txt @@ -0,0 +1,2 @@ +here is a packet coming from the physical device +this is a second packet from the device diff --git a/x/wire/server/testdata/xx02sql17src.txt b/x/wire/server/testdata/xx02sql17src.txt new file mode 100644 index 00000000..49da9e43 --- /dev/null +++ b/x/wire/server/testdata/xx02sql17src.txt @@ -0,0 +1,3 @@ +xx02 reporting for duty +second packet form xx02 +last one from xx02 diff --git a/x/wire/wire.go b/x/wire/wire.go new file mode 100644 index 00000000..fafc1125 --- /dev/null +++ b/x/wire/wire.go @@ -0,0 +1,179 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package wire provides a generic wire transport. +package wire + +import ( + "bufio" + "context" + "fmt" + "io" + "os" + + wpb "github.com/openconfig/kne/proto/wire" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + log "k8s.io/klog/v2" +) + +type ReadWriter interface { + Read() ([]byte, error) + Write([]byte) error +} + +type FileReadWriter struct { + src *os.File + srcScanner *bufio.Scanner + dst *os.File +} + +func NewFileReadWriter(srcPath string, dstPath string) (*FileReadWriter, error) { + src, err := os.Open(srcPath) + if err != nil { + return nil, err + } + dst, err := os.Create(dstPath) + if err != nil { + return nil, err + } + f := &FileReadWriter{ + src: src, + srcScanner: bufio.NewScanner(src), + dst: dst, + } + return f, nil +} + +func (f *FileReadWriter) Read() ([]byte, error) { + if ok := f.srcScanner.Scan(); ok { + return append(f.srcScanner.Bytes(), []byte("\n")...), nil + } + if err := f.srcScanner.Err(); err != nil { + return nil, err + } + return nil, io.EOF +} + +func (f *FileReadWriter) Write(b []byte) error { + if _, err := f.dst.Write(b); err != nil { + return err + } + return nil +} + +func (f *FileReadWriter) Close() error { + if err := f.src.Close(); err != nil { + return err + } + if err := f.dst.Close(); err != nil { + return err + } + return nil +} + +type Stream interface { + Recv() (*wpb.Packet, error) + Send(*wpb.Packet) error +} + +type Wire struct { + src ReadWriter +} + +func NewWire(src ReadWriter) *Wire { + return &Wire{src: src} +} + +func (w *Wire) Transmit(ctx context.Context, stream Stream) error { + g := new(errgroup.Group) + g.Go(func() error { + for { + p, err := stream.Recv() + if err == io.EOF { + // read done. + log.Infof("EOF recv from stream") + return nil + } + if err != nil { + return fmt.Errorf("failed to receive packet: %v", err) + } + log.Infof("Recv packet: %q", string(p.Data)) + if err := w.src.Write(p.Data); err != nil { + return fmt.Errorf("failed to write packet: %v", err) + } + log.Infof("Wrote packet: %q", string(p.Data)) + } + }) + g.Go(func() error { + if cs, ok := stream.(grpc.ClientStream); ok { + defer cs.CloseSend() + } + for { + data, err := w.src.Read() + if err == io.EOF { + // read done. + log.Infof("EOF reading from src") + return nil + } + if err != nil { + return fmt.Errorf("failed to read packet: %v", err) + } + p := &wpb.Packet{Data: data} + log.Infof("Read packet: %q", string(p.Data)) + if err := stream.Send(p); err != nil { + return fmt.Errorf("failed to send packet: %v", err) + } + log.Infof("Sent packet: %q", string(p.Data)) + } + }) + return g.Wait() +} + +type PhysicalEndpoint struct { + device string + intf string +} + +func NewPhysicalEndpoint(device, intf string) *PhysicalEndpoint { + return &PhysicalEndpoint{device: device, intf: intf} +} + +func ParsePhysicalEndpoint(ctx context.Context) (*PhysicalEndpoint, error) { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return nil, fmt.Errorf("no metadata in incoming context") + } + p := &PhysicalEndpoint{} + vals := md.Get("device") + if len(vals) != 1 || vals[0] == "" { + return nil, fmt.Errorf("device key not found") + } + p.device = vals[0] + vals = md.Get("interface") + if len(vals) != 1 || vals[0] == "" { + return nil, fmt.Errorf("interface key not found") + } + p.intf = vals[0] + return p, nil +} + +func (p *PhysicalEndpoint) NewContext(ctx context.Context) context.Context { + md := metadata.New(map[string]string{ + "device": p.device, + "interface": p.intf, + }) + return metadata.NewOutgoingContext(ctx, md) +} diff --git a/x/wire/wire_test.go b/x/wire/wire_test.go new file mode 100644 index 00000000..ef5b3c47 --- /dev/null +++ b/x/wire/wire_test.go @@ -0,0 +1,112 @@ +package wire + +import ( + "context" + "io" + "testing" + + wpb "github.com/openconfig/kne/proto/wire" + "google.golang.org/grpc/metadata" +) + +type FakeStream struct { + incoming []*wpb.Packet + i int + outgoing []*wpb.Packet +} + +func NewFakeStream(packets []*wpb.Packet) *FakeStream { + return &FakeStream{incoming: packets, outgoing: []*wpb.Packet{}} +} + +func (f *FakeStream) Recv() (*wpb.Packet, error) { + if f.i >= len(f.incoming) { + return nil, io.EOF + } + p := f.incoming[f.i] + f.i++ + return p, nil +} + +func (f *FakeStream) Send(p *wpb.Packet) error { + f.outgoing = append(f.outgoing, p) + return nil +} + +type FakeReadWriter struct { + src [][]byte + i int + dst [][]byte +} + +func NewFakeReadWriter(b [][]byte) *FakeReadWriter { + return &FakeReadWriter{src: b, dst: [][]byte{}} +} + +func (f *FakeReadWriter) Read() ([]byte, error) { + if f.i >= len(f.src) { + return nil, io.EOF + } + b := f.src[f.i] + f.i++ + return b, nil +} + +func (f *FakeReadWriter) Write(b []byte) error { + f.dst = append(f.dst, b) + return nil +} + +func TestTransmit(t *testing.T) { + rwSrc := [][]byte{ + []byte("rwSrc1"), + []byte("rwSrc2"), + } + rw := NewFakeReadWriter(rwSrc) + sSrc := []*wpb.Packet{ + &wpb.Packet{Data: []byte("sSrc1")}, + &wpb.Packet{Data: []byte("sSrc2")}, + &wpb.Packet{Data: []byte("sSrc3")}, + } + s := NewFakeStream(sSrc) + w := NewWire(rw) + if err := w.Transmit(context.Background(), s); err != nil { + t.Fatalf("wire.Transmit() unexpected error: %v", err) + } + if len(rw.dst) != len(s.incoming) { + t.Fatalf("Got %v []bytes written, want %v", len(rw.dst), len(s.incoming)) + } + for i := range rw.dst { + if string(rw.dst[i]) != string(s.incoming[i].Data) { + t.Errorf("Got data at index %v: %v, want %v", i, string(rw.dst[i]), string(s.incoming[i].Data)) + } + } + if len(s.outgoing) != len(rw.src) { + t.Fatalf("Got %v packets written, want %v", len(s.outgoing), len(rw.src)) + } + for i := range s.outgoing { + if string(s.outgoing[i].Data) != string(rw.src[i]) { + t.Errorf("Got data at index %v: %v, want %v", i, string(s.outgoing[i].Data), string(rw.src[i])) + } + } +} + +func TestPhysicalEndpoint(t *testing.T) { + pe := NewPhysicalEndpoint("dev1", "intf1") + ctx := pe.NewContext(context.Background()) + md, ok := metadata.FromOutgoingContext(ctx) + if !ok { + t.Fatalf("Unable to get metadata from context") + } + ctx = metadata.NewIncomingContext(context.Background(), md) + ppe, err := ParsePhysicalEndpoint(ctx) + if err != nil { + t.Fatalf("ParsePhysicalEndpoint() unexpected error: %v", err) + } + if ppe.device != pe.device { + t.Errorf("ParsePhysicalEndpoint() got device %v, want device %v", ppe.device, pe.device) + } + if ppe.intf != pe.intf { + t.Errorf("ParsePhysicalEndpoint() got intf %v, want intf %v", ppe.intf, pe.intf) + } +}