diff --git a/proto/generate.go b/proto/generate.go index 39b168e0..8a8e2458 100644 --- a/proto/generate.go +++ b/proto/generate.go @@ -4,4 +4,5 @@ package proto //go:generate protoc --go_out=./topo --go_opt=paths=source_relative ./topo.proto //go:generate protoc --go_out=./ceos --go_opt=paths=source_relative ./ceos.proto //go:generate protoc --go_out=./controller --go-grpc_out=./controller --go-grpc_opt=paths=source_relative --go_opt=paths=source_relative ./controller.proto +//go:generate protoc --go_out=./wire --go-grpc_out=./wire --go-grpc_opt=paths=source_relative --go_opt=paths=source_relative ./wire.proto //go:generate protoc --go_out=./event --go-grpc_out=./event --go-grpc_opt=paths=source_relative --go_opt=paths=source_relative ./event.proto diff --git a/proto/topo.proto b/proto/topo.proto index edcd1998..8080196e 100644 --- a/proto/topo.proto +++ b/proto/topo.proto @@ -43,6 +43,7 @@ enum Vendor { OPENCONFIG = 10; ALPINE = 11; DRIVENETS = 12; + EXTERNAL = 13; } // Node is a single container inside the topology diff --git a/proto/topo/topo.pb.go b/proto/topo/topo.pb.go index 7e1aa954..1a904e3b 100644 --- a/proto/topo/topo.pb.go +++ b/proto/topo/topo.pb.go @@ -53,6 +53,7 @@ const ( Vendor_OPENCONFIG Vendor = 10 Vendor_ALPINE Vendor = 11 Vendor_DRIVENETS Vendor = 12 + Vendor_EXTERNAL Vendor = 13 ) // Enum value maps for Vendor. @@ -71,6 +72,7 @@ var ( 10: "OPENCONFIG", 11: "ALPINE", 12: "DRIVENETS", + 13: "EXTERNAL", } Vendor_value = map[string]int32{ "UNKNOWN": 0, @@ -86,6 +88,7 @@ var ( "OPENCONFIG": 10, "ALPINE": 11, "DRIVENETS": 12, + "EXTERNAL": 13, } ) @@ -1471,7 +1474,7 @@ var file_topo_proto_rawDesc = []byte{ 0x08, 0x69, 0x6e, 0x73, 0x69, 0x64, 0x65, 0x49, 0x70, 0x12, 0x1b, 0x0a, 0x09, 0x6e, 0x6f, 0x64, 0x65, 0x5f, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x6e, 0x6f, 0x64, 0x65, 0x50, 0x6f, 0x72, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x18, - 0x07, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x2a, 0xa7, 0x01, 0x0a, + 0x07, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x2a, 0xb5, 0x01, 0x0a, 0x06, 0x56, 0x65, 0x6e, 0x64, 0x6f, 0x72, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x48, 0x4f, 0x53, 0x54, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x52, 0x49, 0x53, 0x54, 0x41, 0x10, 0x02, 0x12, 0x09, 0x0a, 0x05, 0x43, 0x49, @@ -1482,10 +1485,11 @@ var file_topo_proto_rawDesc = []byte{ 0x12, 0x09, 0x0a, 0x05, 0x4e, 0x4f, 0x4b, 0x49, 0x41, 0x10, 0x09, 0x12, 0x0e, 0x0a, 0x0a, 0x4f, 0x50, 0x45, 0x4e, 0x43, 0x4f, 0x4e, 0x46, 0x49, 0x47, 0x10, 0x0a, 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x4c, 0x50, 0x49, 0x4e, 0x45, 0x10, 0x0b, 0x12, 0x0d, 0x0a, 0x09, 0x44, 0x52, 0x49, 0x56, 0x45, - 0x4e, 0x45, 0x54, 0x53, 0x10, 0x0c, 0x42, 0x26, 0x5a, 0x24, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, - 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2f, - 0x6b, 0x6e, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x74, 0x6f, 0x70, 0x6f, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x4e, 0x45, 0x54, 0x53, 0x10, 0x0c, 0x12, 0x0c, 0x0a, 0x08, 0x45, 0x58, 0x54, 0x45, 0x52, 0x4e, + 0x41, 0x4c, 0x10, 0x0d, 0x42, 0x26, 0x5a, 0x24, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, + 0x6f, 0x6d, 0x2f, 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2f, 0x6b, 0x6e, + 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x74, 0x6f, 0x70, 0x6f, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/proto/wire.proto b/proto/wire.proto new file mode 100644 index 00000000..d6187cc8 --- /dev/null +++ b/proto/wire.proto @@ -0,0 +1,27 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +syntax = "proto3"; + +package wire; + +option go_package = "github.com/openconfig/kne/proto/wire"; + +// Wire service definition. +service Wire { + rpc Transmit(stream Packet) returns (stream Packet); +} + +message Packet { + bytes data = 1; +} diff --git a/proto/wire/wire.pb.go b/proto/wire/wire.pb.go new file mode 100644 index 00000000..01b5c721 --- /dev/null +++ b/proto/wire/wire.pb.go @@ -0,0 +1,162 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.21.12 +// source: wire.proto + +package wire + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Packet struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` +} + +func (x *Packet) Reset() { + *x = Packet{} + if protoimpl.UnsafeEnabled { + mi := &file_wire_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Packet) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Packet) ProtoMessage() {} + +func (x *Packet) ProtoReflect() protoreflect.Message { + mi := &file_wire_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Packet.ProtoReflect.Descriptor instead. +func (*Packet) Descriptor() ([]byte, []int) { + return file_wire_proto_rawDescGZIP(), []int{0} +} + +func (x *Packet) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +var File_wire_proto protoreflect.FileDescriptor + +var file_wire_proto_rawDesc = []byte{ + 0x0a, 0x0a, 0x77, 0x69, 0x72, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x04, 0x77, 0x69, + 0x72, 0x65, 0x22, 0x1c, 0x0a, 0x06, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x12, 0x12, 0x0a, 0x04, + 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, + 0x32, 0x32, 0x0a, 0x04, 0x57, 0x69, 0x72, 0x65, 0x12, 0x2a, 0x0a, 0x08, 0x54, 0x72, 0x61, 0x6e, + 0x73, 0x6d, 0x69, 0x74, 0x12, 0x0c, 0x2e, 0x77, 0x69, 0x72, 0x65, 0x2e, 0x50, 0x61, 0x63, 0x6b, + 0x65, 0x74, 0x1a, 0x0c, 0x2e, 0x77, 0x69, 0x72, 0x65, 0x2e, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, + 0x28, 0x01, 0x30, 0x01, 0x42, 0x26, 0x5a, 0x24, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, + 0x6f, 0x6d, 0x2f, 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2f, 0x6b, 0x6e, + 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x77, 0x69, 0x72, 0x65, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_wire_proto_rawDescOnce sync.Once + file_wire_proto_rawDescData = file_wire_proto_rawDesc +) + +func file_wire_proto_rawDescGZIP() []byte { + file_wire_proto_rawDescOnce.Do(func() { + file_wire_proto_rawDescData = protoimpl.X.CompressGZIP(file_wire_proto_rawDescData) + }) + return file_wire_proto_rawDescData +} + +var file_wire_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_wire_proto_goTypes = []interface{}{ + (*Packet)(nil), // 0: wire.Packet +} +var file_wire_proto_depIdxs = []int32{ + 0, // 0: wire.Wire.Transmit:input_type -> wire.Packet + 0, // 1: wire.Wire.Transmit:output_type -> wire.Packet + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_wire_proto_init() } +func file_wire_proto_init() { + if File_wire_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_wire_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Packet); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_wire_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_wire_proto_goTypes, + DependencyIndexes: file_wire_proto_depIdxs, + MessageInfos: file_wire_proto_msgTypes, + }.Build() + File_wire_proto = out.File + file_wire_proto_rawDesc = nil + file_wire_proto_goTypes = nil + file_wire_proto_depIdxs = nil +} diff --git a/proto/wire/wire_grpc.pb.go b/proto/wire/wire_grpc.pb.go new file mode 100644 index 00000000..e455d063 --- /dev/null +++ b/proto/wire/wire_grpc.pb.go @@ -0,0 +1,137 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.21.12 +// source: wire.proto + +package wire + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// WireClient is the client API for Wire service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type WireClient interface { + Transmit(ctx context.Context, opts ...grpc.CallOption) (Wire_TransmitClient, error) +} + +type wireClient struct { + cc grpc.ClientConnInterface +} + +func NewWireClient(cc grpc.ClientConnInterface) WireClient { + return &wireClient{cc} +} + +func (c *wireClient) Transmit(ctx context.Context, opts ...grpc.CallOption) (Wire_TransmitClient, error) { + stream, err := c.cc.NewStream(ctx, &Wire_ServiceDesc.Streams[0], "/wire.Wire/Transmit", opts...) + if err != nil { + return nil, err + } + x := &wireTransmitClient{stream} + return x, nil +} + +type Wire_TransmitClient interface { + Send(*Packet) error + Recv() (*Packet, error) + grpc.ClientStream +} + +type wireTransmitClient struct { + grpc.ClientStream +} + +func (x *wireTransmitClient) Send(m *Packet) error { + return x.ClientStream.SendMsg(m) +} + +func (x *wireTransmitClient) Recv() (*Packet, error) { + m := new(Packet) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// WireServer is the server API for Wire service. +// All implementations must embed UnimplementedWireServer +// for forward compatibility +type WireServer interface { + Transmit(Wire_TransmitServer) error + mustEmbedUnimplementedWireServer() +} + +// UnimplementedWireServer must be embedded to have forward compatible implementations. +type UnimplementedWireServer struct { +} + +func (UnimplementedWireServer) Transmit(Wire_TransmitServer) error { + return status.Errorf(codes.Unimplemented, "method Transmit not implemented") +} +func (UnimplementedWireServer) mustEmbedUnimplementedWireServer() {} + +// UnsafeWireServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to WireServer will +// result in compilation errors. +type UnsafeWireServer interface { + mustEmbedUnimplementedWireServer() +} + +func RegisterWireServer(s grpc.ServiceRegistrar, srv WireServer) { + s.RegisterService(&Wire_ServiceDesc, srv) +} + +func _Wire_Transmit_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(WireServer).Transmit(&wireTransmitServer{stream}) +} + +type Wire_TransmitServer interface { + Send(*Packet) error + Recv() (*Packet, error) + grpc.ServerStream +} + +type wireTransmitServer struct { + grpc.ServerStream +} + +func (x *wireTransmitServer) Send(m *Packet) error { + return x.ServerStream.SendMsg(m) +} + +func (x *wireTransmitServer) Recv() (*Packet, error) { + m := new(Packet) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// Wire_ServiceDesc is the grpc.ServiceDesc for Wire service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Wire_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "wire.Wire", + HandlerType: (*WireServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Transmit", + Handler: _Wire_Transmit_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "wire.proto", +} diff --git a/wire/client/client b/wire/client/client new file mode 100755 index 00000000..cdb59123 Binary files /dev/null and b/wire/client/client differ diff --git a/wire/client/main.go b/wire/client/main.go new file mode 100644 index 00000000..9e550d47 --- /dev/null +++ b/wire/client/main.go @@ -0,0 +1,65 @@ +package main + +import ( + "context" + "flag" + "io" + "fmt" + + log "github.com/golang/glog" + wpb "github.com/openconfig/kne/proto/wire" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/credentials/insecure" +) + +var ( + addr = flag.String("addr", "localhost:50058", "Wire server address") +) + +func main() { + ctx := context.Background() + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + } + conn, err := grpc.DialContext(ctx, *addr, opts...) + if err != nil { + log.Fatalf("Failed to dial %q: %v", *addr, err) + } + defer conn.Close() + c := wpb.NewWireClient(conn) + mctx := metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{ + "device": "xx01.sql17", + "intf": "HundredGigE0/0/0/0", + })) + stream, err := c.Transmit(mctx) + waitc := make(chan struct{}) + if err != nil { + log.Fatalf("Failed to create transmit stream: %v", err) + } + go func() { + for { + in, err := stream.Recv() + if err == io.EOF { + // read done. + close(waitc) + return + } + if err != nil { + log.Fatalf("Failed to receive a packet: %v", err) + } + log.Infof("Recv packet: %v", in.Data) + } + }() + for i := 0; i < 10; i++ { + p := &wpb.Packet{ + Data: []byte(fmt.Sprintf("packet %v", i)), + } + if err := stream.Send(p); err != nil { + log.Fatalf("Failed to send packet %v: %v", i, err) + } + log.Infof("Sent packet: %v", p.Data) + } + stream.CloseSend() + <-waitc +} diff --git a/wire/main.go b/wire/main.go new file mode 100644 index 00000000..b33f208e --- /dev/null +++ b/wire/main.go @@ -0,0 +1,65 @@ +package main + +import ( + "flag" + "fmt" + "net" + "io" + + log "github.com/golang/glog" + wpb "github.com/openconfig/kne/proto/wire" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +var ( + port = flag.Int("port", 50058, "Wire server port") +) + +type server struct { + wpb.UnimplementedWireServer +} + +func newServer() *server { + return &server{} +} + +func (s *server) Transmit(stream wpb.Wire_TransmitServer) error { + log.Infof("New Transmit stream started") + md, ok := metadata.FromIncomingContext(stream.Context()) + if !ok { + log.Infof("Missing metadata") + return fmt.Errorf("missing metadata") + } + log.Infof("Got metadata: %v", md) + for { + in, err := stream.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return err + } + log.Infof("Recv packet: %v", string(in.Data)) + if err := stream.Send(in); err != nil { + return err + } + log.Infof("Sent packet: %v", string(in.Data)) + } + return nil +} + +func main() { + flag.Parse() + addr := fmt.Sprintf(":%d", *port) + lis, err := net.Listen("tcp6", addr) + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + s := grpc.NewServer() + wpb.RegisterWireServer(s, newServer()) + log.Infof("Wire server listening at %v", lis.Addr()) + if err := s.Serve(lis); err != nil { + log.Fatalf("failed to serve: %v", err) + } +} diff --git a/wire/wire b/wire/wire new file mode 100755 index 00000000..9a8b1fc3 Binary files /dev/null and b/wire/wire differ