diff --git a/api/oramnode/oramnode.pb.go b/api/oramnode/oramnode.pb.go index ba01d57..f39969a 100644 --- a/api/oramnode/oramnode.pb.go +++ b/api/oramnode/oramnode.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v4.24.3 // source: oramnode.proto package oramnode diff --git a/api/oramnode/oramnode_grpc.pb.go b/api/oramnode/oramnode_grpc.pb.go index 0b6cee8..eb26ab0 100644 --- a/api/oramnode/oramnode_grpc.pb.go +++ b/api/oramnode/oramnode_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.2.0 -// - protoc v4.23.4 +// - protoc-gen-go-grpc v1.3.0 +// - protoc v4.24.3 // source: oramnode.proto package oramnode @@ -18,6 +18,11 @@ import ( // Requires gRPC-Go v1.32.0 or later. const _ = grpc.SupportPackageIsVersion7 +const ( + OramNode_ReadPath_FullMethodName = "/oramnode.OramNode/ReadPath" + OramNode_JoinRaftVoter_FullMethodName = "/oramnode.OramNode/JoinRaftVoter" +) + // OramNodeClient is the client API for OramNode service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. @@ -36,7 +41,7 @@ func NewOramNodeClient(cc grpc.ClientConnInterface) OramNodeClient { func (c *oramNodeClient) ReadPath(ctx context.Context, in *ReadPathRequest, opts ...grpc.CallOption) (*ReadPathReply, error) { out := new(ReadPathReply) - err := c.cc.Invoke(ctx, "/oramnode.OramNode/ReadPath", in, out, opts...) + err := c.cc.Invoke(ctx, OramNode_ReadPath_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -45,7 +50,7 @@ func (c *oramNodeClient) ReadPath(ctx context.Context, in *ReadPathRequest, opts func (c *oramNodeClient) JoinRaftVoter(ctx context.Context, in *JoinRaftVoterRequest, opts ...grpc.CallOption) (*JoinRaftVoterReply, error) { out := new(JoinRaftVoterReply) - err := c.cc.Invoke(ctx, "/oramnode.OramNode/JoinRaftVoter", in, out, opts...) + err := c.cc.Invoke(ctx, OramNode_JoinRaftVoter_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -94,7 +99,7 @@ func _OramNode_ReadPath_Handler(srv interface{}, ctx context.Context, dec func(i } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/oramnode.OramNode/ReadPath", + FullMethod: OramNode_ReadPath_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(OramNodeServer).ReadPath(ctx, req.(*ReadPathRequest)) @@ -112,7 +117,7 @@ func _OramNode_JoinRaftVoter_Handler(srv interface{}, ctx context.Context, dec f } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/oramnode.OramNode/JoinRaftVoter", + FullMethod: OramNode_JoinRaftVoter_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(OramNodeServer).JoinRaftVoter(ctx, req.(*JoinRaftVoterRequest)) diff --git a/api/router/router.pb.go b/api/router/router.pb.go index e9d8373..40aa525 100644 --- a/api/router/router.pb.go +++ b/api/router/router.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. 
// versions: -// protoc-gen-go v1.28.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v4.24.3 // source: router.proto package router diff --git a/api/router/router_grpc.pb.go b/api/router/router_grpc.pb.go index 693c4db..601f944 100644 --- a/api/router/router_grpc.pb.go +++ b/api/router/router_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.2.0 -// - protoc v4.23.4 +// - protoc-gen-go-grpc v1.3.0 +// - protoc v4.24.3 // source: router.proto package router @@ -18,6 +18,11 @@ import ( // Requires gRPC-Go v1.32.0 or later. const _ = grpc.SupportPackageIsVersion7 +const ( + Router_Read_FullMethodName = "/router.Router/Read" + Router_Write_FullMethodName = "/router.Router/Write" +) + // RouterClient is the client API for Router service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. @@ -36,7 +41,7 @@ func NewRouterClient(cc grpc.ClientConnInterface) RouterClient { func (c *routerClient) Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadReply, error) { out := new(ReadReply) - err := c.cc.Invoke(ctx, "/router.Router/Read", in, out, opts...) + err := c.cc.Invoke(ctx, Router_Read_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -45,7 +50,7 @@ func (c *routerClient) Read(ctx context.Context, in *ReadRequest, opts ...grpc.C func (c *routerClient) Write(ctx context.Context, in *WriteRequest, opts ...grpc.CallOption) (*WriteReply, error) { out := new(WriteReply) - err := c.cc.Invoke(ctx, "/router.Router/Write", in, out, opts...) + err := c.cc.Invoke(ctx, Router_Write_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -94,7 +99,7 @@ func _Router_Read_Handler(srv interface{}, ctx context.Context, dec func(interfa } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/router.Router/Read", + FullMethod: Router_Read_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(RouterServer).Read(ctx, req.(*ReadRequest)) @@ -112,7 +117,7 @@ func _Router_Write_Handler(srv interface{}, ctx context.Context, dec func(interf } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/router.Router/Write", + FullMethod: Router_Write_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(RouterServer).Write(ctx, req.(*WriteRequest)) diff --git a/api/shardnode.proto b/api/shardnode.proto index 5bb75aa..86c77a3 100644 --- a/api/shardnode.proto +++ b/api/shardnode.proto @@ -5,28 +5,41 @@ option go_package = "github.com/dsg-uwaterloo/oblishard/api/shardnode"; package shardnode; service ShardNode { - rpc Read (ReadRequest) returns (ReadReply) {} - rpc Write (WriteRequest) returns (WriteReply) {} + rpc BatchQuery (RequestBatch) returns (ReplyBatch) {} rpc SendBlocks(SendBlocksRequest) returns (SendBlocksReply) {} rpc AckSentBlocks(AckSentBlocksRequest) returns (AckSentBlocksReply) {} rpc JoinRaftVoter (JoinRaftVoterRequest) returns (JoinRaftVoterReply) {} } +message RequestBatch { + repeated ReadRequest read_requests = 1; + repeated WriteRequest write_requests = 2; +} + +message ReplyBatch { + repeated ReadReply read_replies = 1; + repeated WriteReply write_replies = 2; +} + message ReadRequest { - string block = 1; + string request_id = 1; + string block = 2; } message ReadReply { - string value = 1; + string request_id = 1; + string value = 2; } message 
WriteRequest { - string block = 1; - string value = 2; + string request_id = 1; + string block = 2; + string value = 3; } message WriteReply { - bool success = 1; + string request_id = 1; + bool success = 2; } message JoinRaftVoterRequest { diff --git a/api/shardnode/shardnode.pb.go b/api/shardnode/shardnode.pb.go index 9a1b0c0..3b5469b 100644 --- a/api/shardnode/shardnode.pb.go +++ b/api/shardnode/shardnode.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v4.24.3 // source: shardnode.proto package shardnode @@ -20,18 +20,129 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +type RequestBatch struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ReadRequests []*ReadRequest `protobuf:"bytes,1,rep,name=read_requests,json=readRequests,proto3" json:"read_requests,omitempty"` + WriteRequests []*WriteRequest `protobuf:"bytes,2,rep,name=write_requests,json=writeRequests,proto3" json:"write_requests,omitempty"` +} + +func (x *RequestBatch) Reset() { + *x = RequestBatch{} + if protoimpl.UnsafeEnabled { + mi := &file_shardnode_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RequestBatch) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RequestBatch) ProtoMessage() {} + +func (x *RequestBatch) ProtoReflect() protoreflect.Message { + mi := &file_shardnode_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RequestBatch.ProtoReflect.Descriptor instead. +func (*RequestBatch) Descriptor() ([]byte, []int) { + return file_shardnode_proto_rawDescGZIP(), []int{0} +} + +func (x *RequestBatch) GetReadRequests() []*ReadRequest { + if x != nil { + return x.ReadRequests + } + return nil +} + +func (x *RequestBatch) GetWriteRequests() []*WriteRequest { + if x != nil { + return x.WriteRequests + } + return nil +} + +type ReplyBatch struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ReadReplies []*ReadReply `protobuf:"bytes,1,rep,name=read_replies,json=readReplies,proto3" json:"read_replies,omitempty"` + WriteReplies []*WriteReply `protobuf:"bytes,2,rep,name=write_replies,json=writeReplies,proto3" json:"write_replies,omitempty"` +} + +func (x *ReplyBatch) Reset() { + *x = ReplyBatch{} + if protoimpl.UnsafeEnabled { + mi := &file_shardnode_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ReplyBatch) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReplyBatch) ProtoMessage() {} + +func (x *ReplyBatch) ProtoReflect() protoreflect.Message { + mi := &file_shardnode_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReplyBatch.ProtoReflect.Descriptor instead. 
+func (*ReplyBatch) Descriptor() ([]byte, []int) { + return file_shardnode_proto_rawDescGZIP(), []int{1} +} + +func (x *ReplyBatch) GetReadReplies() []*ReadReply { + if x != nil { + return x.ReadReplies + } + return nil +} + +func (x *ReplyBatch) GetWriteReplies() []*WriteReply { + if x != nil { + return x.WriteReplies + } + return nil +} + type ReadRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Block string `protobuf:"bytes,1,opt,name=block,proto3" json:"block,omitempty"` + RequestId string `protobuf:"bytes,1,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"` + Block string `protobuf:"bytes,2,opt,name=block,proto3" json:"block,omitempty"` } func (x *ReadRequest) Reset() { *x = ReadRequest{} if protoimpl.UnsafeEnabled { - mi := &file_shardnode_proto_msgTypes[0] + mi := &file_shardnode_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -44,7 +155,7 @@ func (x *ReadRequest) String() string { func (*ReadRequest) ProtoMessage() {} func (x *ReadRequest) ProtoReflect() protoreflect.Message { - mi := &file_shardnode_proto_msgTypes[0] + mi := &file_shardnode_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -57,7 +168,14 @@ func (x *ReadRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use ReadRequest.ProtoReflect.Descriptor instead. func (*ReadRequest) Descriptor() ([]byte, []int) { - return file_shardnode_proto_rawDescGZIP(), []int{0} + return file_shardnode_proto_rawDescGZIP(), []int{2} +} + +func (x *ReadRequest) GetRequestId() string { + if x != nil { + return x.RequestId + } + return "" } func (x *ReadRequest) GetBlock() string { @@ -72,13 +190,14 @@ type ReadReply struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Value string `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"` + RequestId string `protobuf:"bytes,1,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"` + Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` } func (x *ReadReply) Reset() { *x = ReadReply{} if protoimpl.UnsafeEnabled { - mi := &file_shardnode_proto_msgTypes[1] + mi := &file_shardnode_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -91,7 +210,7 @@ func (x *ReadReply) String() string { func (*ReadReply) ProtoMessage() {} func (x *ReadReply) ProtoReflect() protoreflect.Message { - mi := &file_shardnode_proto_msgTypes[1] + mi := &file_shardnode_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -104,7 +223,14 @@ func (x *ReadReply) ProtoReflect() protoreflect.Message { // Deprecated: Use ReadReply.ProtoReflect.Descriptor instead. 
func (*ReadReply) Descriptor() ([]byte, []int) { - return file_shardnode_proto_rawDescGZIP(), []int{1} + return file_shardnode_proto_rawDescGZIP(), []int{3} +} + +func (x *ReadReply) GetRequestId() string { + if x != nil { + return x.RequestId + } + return "" } func (x *ReadReply) GetValue() string { @@ -119,14 +245,15 @@ type WriteRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Block string `protobuf:"bytes,1,opt,name=block,proto3" json:"block,omitempty"` - Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + RequestId string `protobuf:"bytes,1,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"` + Block string `protobuf:"bytes,2,opt,name=block,proto3" json:"block,omitempty"` + Value string `protobuf:"bytes,3,opt,name=value,proto3" json:"value,omitempty"` } func (x *WriteRequest) Reset() { *x = WriteRequest{} if protoimpl.UnsafeEnabled { - mi := &file_shardnode_proto_msgTypes[2] + mi := &file_shardnode_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -139,7 +266,7 @@ func (x *WriteRequest) String() string { func (*WriteRequest) ProtoMessage() {} func (x *WriteRequest) ProtoReflect() protoreflect.Message { - mi := &file_shardnode_proto_msgTypes[2] + mi := &file_shardnode_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -152,7 +279,14 @@ func (x *WriteRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use WriteRequest.ProtoReflect.Descriptor instead. func (*WriteRequest) Descriptor() ([]byte, []int) { - return file_shardnode_proto_rawDescGZIP(), []int{2} + return file_shardnode_proto_rawDescGZIP(), []int{4} +} + +func (x *WriteRequest) GetRequestId() string { + if x != nil { + return x.RequestId + } + return "" } func (x *WriteRequest) GetBlock() string { @@ -174,13 +308,14 @@ type WriteReply struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Success bool `protobuf:"varint,1,opt,name=success,proto3" json:"success,omitempty"` + RequestId string `protobuf:"bytes,1,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"` + Success bool `protobuf:"varint,2,opt,name=success,proto3" json:"success,omitempty"` } func (x *WriteReply) Reset() { *x = WriteReply{} if protoimpl.UnsafeEnabled { - mi := &file_shardnode_proto_msgTypes[3] + mi := &file_shardnode_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -193,7 +328,7 @@ func (x *WriteReply) String() string { func (*WriteReply) ProtoMessage() {} func (x *WriteReply) ProtoReflect() protoreflect.Message { - mi := &file_shardnode_proto_msgTypes[3] + mi := &file_shardnode_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -206,7 +341,14 @@ func (x *WriteReply) ProtoReflect() protoreflect.Message { // Deprecated: Use WriteReply.ProtoReflect.Descriptor instead. 
func (*WriteReply) Descriptor() ([]byte, []int) { - return file_shardnode_proto_rawDescGZIP(), []int{3} + return file_shardnode_proto_rawDescGZIP(), []int{5} +} + +func (x *WriteReply) GetRequestId() string { + if x != nil { + return x.RequestId + } + return "" } func (x *WriteReply) GetSuccess() bool { @@ -228,7 +370,7 @@ type JoinRaftVoterRequest struct { func (x *JoinRaftVoterRequest) Reset() { *x = JoinRaftVoterRequest{} if protoimpl.UnsafeEnabled { - mi := &file_shardnode_proto_msgTypes[4] + mi := &file_shardnode_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -241,7 +383,7 @@ func (x *JoinRaftVoterRequest) String() string { func (*JoinRaftVoterRequest) ProtoMessage() {} func (x *JoinRaftVoterRequest) ProtoReflect() protoreflect.Message { - mi := &file_shardnode_proto_msgTypes[4] + mi := &file_shardnode_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -254,7 +396,7 @@ func (x *JoinRaftVoterRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use JoinRaftVoterRequest.ProtoReflect.Descriptor instead. func (*JoinRaftVoterRequest) Descriptor() ([]byte, []int) { - return file_shardnode_proto_rawDescGZIP(), []int{4} + return file_shardnode_proto_rawDescGZIP(), []int{6} } func (x *JoinRaftVoterRequest) GetNodeId() int32 { @@ -282,7 +424,7 @@ type JoinRaftVoterReply struct { func (x *JoinRaftVoterReply) Reset() { *x = JoinRaftVoterReply{} if protoimpl.UnsafeEnabled { - mi := &file_shardnode_proto_msgTypes[5] + mi := &file_shardnode_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -295,7 +437,7 @@ func (x *JoinRaftVoterReply) String() string { func (*JoinRaftVoterReply) ProtoMessage() {} func (x *JoinRaftVoterReply) ProtoReflect() protoreflect.Message { - mi := &file_shardnode_proto_msgTypes[5] + mi := &file_shardnode_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -308,7 +450,7 @@ func (x *JoinRaftVoterReply) ProtoReflect() protoreflect.Message { // Deprecated: Use JoinRaftVoterReply.ProtoReflect.Descriptor instead. func (*JoinRaftVoterReply) Descriptor() ([]byte, []int) { - return file_shardnode_proto_rawDescGZIP(), []int{5} + return file_shardnode_proto_rawDescGZIP(), []int{7} } func (x *JoinRaftVoterReply) GetSuccess() bool { @@ -331,7 +473,7 @@ type SendBlocksRequest struct { func (x *SendBlocksRequest) Reset() { *x = SendBlocksRequest{} if protoimpl.UnsafeEnabled { - mi := &file_shardnode_proto_msgTypes[6] + mi := &file_shardnode_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -344,7 +486,7 @@ func (x *SendBlocksRequest) String() string { func (*SendBlocksRequest) ProtoMessage() {} func (x *SendBlocksRequest) ProtoReflect() protoreflect.Message { - mi := &file_shardnode_proto_msgTypes[6] + mi := &file_shardnode_proto_msgTypes[8] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -357,7 +499,7 @@ func (x *SendBlocksRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use SendBlocksRequest.ProtoReflect.Descriptor instead. 
func (*SendBlocksRequest) Descriptor() ([]byte, []int) { - return file_shardnode_proto_rawDescGZIP(), []int{6} + return file_shardnode_proto_rawDescGZIP(), []int{8} } func (x *SendBlocksRequest) GetMaxBlocks() int32 { @@ -393,7 +535,7 @@ type Block struct { func (x *Block) Reset() { *x = Block{} if protoimpl.UnsafeEnabled { - mi := &file_shardnode_proto_msgTypes[7] + mi := &file_shardnode_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -406,7 +548,7 @@ func (x *Block) String() string { func (*Block) ProtoMessage() {} func (x *Block) ProtoReflect() protoreflect.Message { - mi := &file_shardnode_proto_msgTypes[7] + mi := &file_shardnode_proto_msgTypes[9] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -419,7 +561,7 @@ func (x *Block) ProtoReflect() protoreflect.Message { // Deprecated: Use Block.ProtoReflect.Descriptor instead. func (*Block) Descriptor() ([]byte, []int) { - return file_shardnode_proto_rawDescGZIP(), []int{7} + return file_shardnode_proto_rawDescGZIP(), []int{9} } func (x *Block) GetBlock() string { @@ -447,7 +589,7 @@ type SendBlocksReply struct { func (x *SendBlocksReply) Reset() { *x = SendBlocksReply{} if protoimpl.UnsafeEnabled { - mi := &file_shardnode_proto_msgTypes[8] + mi := &file_shardnode_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -460,7 +602,7 @@ func (x *SendBlocksReply) String() string { func (*SendBlocksReply) ProtoMessage() {} func (x *SendBlocksReply) ProtoReflect() protoreflect.Message { - mi := &file_shardnode_proto_msgTypes[8] + mi := &file_shardnode_proto_msgTypes[10] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -473,7 +615,7 @@ func (x *SendBlocksReply) ProtoReflect() protoreflect.Message { // Deprecated: Use SendBlocksReply.ProtoReflect.Descriptor instead. func (*SendBlocksReply) Descriptor() ([]byte, []int) { - return file_shardnode_proto_rawDescGZIP(), []int{8} + return file_shardnode_proto_rawDescGZIP(), []int{10} } func (x *SendBlocksReply) GetBlocks() []*Block { @@ -496,7 +638,7 @@ type Ack struct { func (x *Ack) Reset() { *x = Ack{} if protoimpl.UnsafeEnabled { - mi := &file_shardnode_proto_msgTypes[9] + mi := &file_shardnode_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -509,7 +651,7 @@ func (x *Ack) String() string { func (*Ack) ProtoMessage() {} func (x *Ack) ProtoReflect() protoreflect.Message { - mi := &file_shardnode_proto_msgTypes[9] + mi := &file_shardnode_proto_msgTypes[11] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -522,7 +664,7 @@ func (x *Ack) ProtoReflect() protoreflect.Message { // Deprecated: Use Ack.ProtoReflect.Descriptor instead. 
func (*Ack) Descriptor() ([]byte, []int) { - return file_shardnode_proto_rawDescGZIP(), []int{9} + return file_shardnode_proto_rawDescGZIP(), []int{11} } func (x *Ack) GetBlock() string { @@ -550,7 +692,7 @@ type AckSentBlocksRequest struct { func (x *AckSentBlocksRequest) Reset() { *x = AckSentBlocksRequest{} if protoimpl.UnsafeEnabled { - mi := &file_shardnode_proto_msgTypes[10] + mi := &file_shardnode_proto_msgTypes[12] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -563,7 +705,7 @@ func (x *AckSentBlocksRequest) String() string { func (*AckSentBlocksRequest) ProtoMessage() {} func (x *AckSentBlocksRequest) ProtoReflect() protoreflect.Message { - mi := &file_shardnode_proto_msgTypes[10] + mi := &file_shardnode_proto_msgTypes[12] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -576,7 +718,7 @@ func (x *AckSentBlocksRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use AckSentBlocksRequest.ProtoReflect.Descriptor instead. func (*AckSentBlocksRequest) Descriptor() ([]byte, []int) { - return file_shardnode_proto_rawDescGZIP(), []int{10} + return file_shardnode_proto_rawDescGZIP(), []int{12} } func (x *AckSentBlocksRequest) GetAcks() []*Ack { @@ -597,7 +739,7 @@ type AckSentBlocksReply struct { func (x *AckSentBlocksReply) Reset() { *x = AckSentBlocksReply{} if protoimpl.UnsafeEnabled { - mi := &file_shardnode_proto_msgTypes[11] + mi := &file_shardnode_proto_msgTypes[13] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -610,7 +752,7 @@ func (x *AckSentBlocksReply) String() string { func (*AckSentBlocksReply) ProtoMessage() {} func (x *AckSentBlocksReply) ProtoReflect() protoreflect.Message { - mi := &file_shardnode_proto_msgTypes[11] + mi := &file_shardnode_proto_msgTypes[13] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -623,7 +765,7 @@ func (x *AckSentBlocksReply) ProtoReflect() protoreflect.Message { // Deprecated: Use AckSentBlocksReply.ProtoReflect.Descriptor instead. 
func (*AckSentBlocksReply) Descriptor() ([]byte, []int) { - return file_shardnode_proto_rawDescGZIP(), []int{11} + return file_shardnode_proto_rawDescGZIP(), []int{13} } func (x *AckSentBlocksReply) GetSuccess() bool { @@ -637,76 +779,98 @@ var File_shardnode_proto protoreflect.FileDescriptor var file_shardnode_proto_rawDesc = []byte{ 0x0a, 0x0f, 0x73, 0x68, 0x61, 0x72, 0x64, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x12, 0x09, 0x73, 0x68, 0x61, 0x72, 0x64, 0x6e, 0x6f, 0x64, 0x65, 0x22, 0x23, 0x0a, 0x0b, - 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x62, - 0x6c, 0x6f, 0x63, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x62, 0x6c, 0x6f, 0x63, - 0x6b, 0x22, 0x21, 0x0a, 0x09, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x14, - 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, - 0x61, 0x6c, 0x75, 0x65, 0x22, 0x3a, 0x0a, 0x0c, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x05, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, - 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, - 0x22, 0x26, 0x0a, 0x0a, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x18, - 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, - 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x22, 0x4c, 0x0a, 0x14, 0x4a, 0x6f, 0x69, 0x6e, - 0x52, 0x61, 0x66, 0x74, 0x56, 0x6f, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x12, 0x17, 0x0a, 0x07, 0x6e, 0x6f, 0x64, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x05, 0x52, 0x06, 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x6e, 0x6f, 0x64, - 0x65, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6e, 0x6f, - 0x64, 0x65, 0x41, 0x64, 0x64, 0x72, 0x22, 0x2e, 0x0a, 0x12, 0x4a, 0x6f, 0x69, 0x6e, 0x52, 0x61, - 0x66, 0x74, 0x56, 0x6f, 0x74, 0x65, 0x72, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x18, 0x0a, 0x07, - 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, - 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x22, 0x66, 0x0a, 0x11, 0x53, 0x65, 0x6e, 0x64, 0x42, 0x6c, - 0x6f, 0x63, 0x6b, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x6d, - 0x61, 0x78, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x09, - 0x6d, 0x61, 0x78, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x70, 0x61, 0x74, - 0x68, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x05, 0x52, 0x05, 0x70, 0x61, 0x74, 0x68, 0x73, 0x12, - 0x1d, 0x0a, 0x0a, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, - 0x01, 0x28, 0x05, 0x52, 0x09, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x49, 0x64, 0x22, 0x33, - 0x0a, 0x05, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x12, 0x14, 0x0a, 0x05, 0x62, 0x6c, 0x6f, 0x63, 0x6b, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x12, 0x14, 0x0a, - 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, - 0x6c, 0x75, 0x65, 0x22, 0x3b, 0x0a, 0x0f, 0x53, 0x65, 0x6e, 0x64, 0x42, 0x6c, 0x6f, 0x63, 0x6b, - 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x28, 0x0a, 0x06, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x73, - 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x73, 0x68, 0x61, 0x72, 
0x64, 0x6e, 0x6f, - 0x64, 0x65, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x52, 0x06, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x73, - 0x22, 0x32, 0x0a, 0x03, 0x41, 0x63, 0x6b, 0x12, 0x14, 0x0a, 0x05, 0x62, 0x6c, 0x6f, 0x63, 0x6b, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x12, 0x15, 0x0a, - 0x06, 0x69, 0x73, 0x5f, 0x61, 0x63, 0x6b, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x69, - 0x73, 0x41, 0x63, 0x6b, 0x22, 0x3a, 0x0a, 0x14, 0x41, 0x63, 0x6b, 0x53, 0x65, 0x6e, 0x74, 0x42, - 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x22, 0x0a, 0x04, - 0x61, 0x63, 0x6b, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x73, 0x68, 0x61, - 0x72, 0x64, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x41, 0x63, 0x6b, 0x52, 0x04, 0x61, 0x63, 0x6b, 0x73, - 0x22, 0x2e, 0x0a, 0x12, 0x41, 0x63, 0x6b, 0x53, 0x65, 0x6e, 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, - 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, - 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, - 0x32, 0xee, 0x02, 0x0a, 0x09, 0x53, 0x68, 0x61, 0x72, 0x64, 0x4e, 0x6f, 0x64, 0x65, 0x12, 0x36, - 0x0a, 0x04, 0x52, 0x65, 0x61, 0x64, 0x12, 0x16, 0x2e, 0x73, 0x68, 0x61, 0x72, 0x64, 0x6e, 0x6f, - 0x64, 0x65, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, - 0x2e, 0x73, 0x68, 0x61, 0x72, 0x64, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x52, - 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x39, 0x0a, 0x05, 0x57, 0x72, 0x69, 0x74, 0x65, 0x12, - 0x17, 0x2e, 0x73, 0x68, 0x61, 0x72, 0x64, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x57, 0x72, 0x69, 0x74, - 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x73, 0x68, 0x61, 0x72, 0x64, - 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, - 0x00, 0x12, 0x48, 0x0a, 0x0a, 0x53, 0x65, 0x6e, 0x64, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x12, - 0x1c, 0x2e, 0x73, 0x68, 0x61, 0x72, 0x64, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x53, 0x65, 0x6e, 0x64, - 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, - 0x73, 0x68, 0x61, 0x72, 0x64, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x42, 0x6c, - 0x6f, 0x63, 0x6b, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x51, 0x0a, 0x0d, 0x41, - 0x63, 0x6b, 0x53, 0x65, 0x6e, 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x12, 0x1f, 0x2e, 0x73, - 0x68, 0x61, 0x72, 0x64, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x41, 0x63, 0x6b, 0x53, 0x65, 0x6e, 0x74, - 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, - 0x73, 0x68, 0x61, 0x72, 0x64, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x41, 0x63, 0x6b, 0x53, 0x65, 0x6e, - 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x51, - 0x0a, 0x0d, 0x4a, 0x6f, 0x69, 0x6e, 0x52, 0x61, 0x66, 0x74, 0x56, 0x6f, 0x74, 0x65, 0x72, 0x12, - 0x1f, 0x2e, 0x73, 0x68, 0x61, 0x72, 0x64, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x4a, 0x6f, 0x69, 0x6e, - 0x52, 0x61, 0x66, 0x74, 0x56, 0x6f, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x1d, 0x2e, 0x73, 0x68, 0x61, 0x72, 0x64, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x4a, 0x6f, 0x69, - 0x6e, 0x52, 0x61, 0x66, 0x74, 0x56, 0x6f, 0x74, 0x65, 0x72, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, - 0x00, 0x42, 0x32, 0x5a, 0x30, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, - 0x64, 0x73, 0x67, 0x2d, 0x75, 0x77, 0x61, 0x74, 0x65, 0x72, 0x6c, 0x6f, 0x6f, 0x2f, 0x6f, 0x62, - 0x6c, 
0x69, 0x73, 0x68, 0x61, 0x72, 0x64, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x68, 0x61, 0x72, - 0x64, 0x6e, 0x6f, 0x64, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6f, 0x12, 0x09, 0x73, 0x68, 0x61, 0x72, 0x64, 0x6e, 0x6f, 0x64, 0x65, 0x22, 0x8b, 0x01, 0x0a, + 0x0c, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x42, 0x61, 0x74, 0x63, 0x68, 0x12, 0x3b, 0x0a, + 0x0d, 0x72, 0x65, 0x61, 0x64, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x73, 0x18, 0x01, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x73, 0x68, 0x61, 0x72, 0x64, 0x6e, 0x6f, 0x64, 0x65, + 0x2e, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x52, 0x0c, 0x72, 0x65, + 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x73, 0x12, 0x3e, 0x0a, 0x0e, 0x77, 0x72, + 0x69, 0x74, 0x65, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x73, 0x18, 0x02, 0x20, 0x03, + 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x73, 0x68, 0x61, 0x72, 0x64, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x57, + 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x52, 0x0d, 0x77, 0x72, 0x69, + 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x73, 0x22, 0x81, 0x01, 0x0a, 0x0a, 0x52, + 0x65, 0x70, 0x6c, 0x79, 0x42, 0x61, 0x74, 0x63, 0x68, 0x12, 0x37, 0x0a, 0x0c, 0x72, 0x65, 0x61, + 0x64, 0x5f, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, + 0x14, 0x2e, 0x73, 0x68, 0x61, 0x72, 0x64, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x52, 0x65, 0x61, 0x64, + 0x52, 0x65, 0x70, 0x6c, 0x79, 0x52, 0x0b, 0x72, 0x65, 0x61, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x69, + 0x65, 0x73, 0x12, 0x3a, 0x0a, 0x0d, 0x77, 0x72, 0x69, 0x74, 0x65, 0x5f, 0x72, 0x65, 0x70, 0x6c, + 0x69, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x73, 0x68, 0x61, 0x72, + 0x64, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x70, 0x6c, 0x79, + 0x52, 0x0c, 0x77, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x65, 0x73, 0x22, 0x42, + 0x0a, 0x0b, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, + 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x14, 0x0a, 0x05, + 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x62, 0x6c, 0x6f, + 0x63, 0x6b, 0x22, 0x40, 0x0a, 0x09, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, + 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x14, + 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x22, 0x59, 0x0a, 0x0c, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, + 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x49, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x05, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, + 0x75, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, + 0x45, 0x0a, 0x0a, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x1d, 0x0a, + 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 
0x73, 0x74, 0x49, 0x64, 0x12, 0x18, 0x0a, 0x07, + 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, + 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x22, 0x4c, 0x0a, 0x14, 0x4a, 0x6f, 0x69, 0x6e, 0x52, 0x61, + 0x66, 0x74, 0x56, 0x6f, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x17, + 0x0a, 0x07, 0x6e, 0x6f, 0x64, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, + 0x06, 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x6e, 0x6f, 0x64, 0x65, 0x5f, + 0x61, 0x64, 0x64, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6e, 0x6f, 0x64, 0x65, + 0x41, 0x64, 0x64, 0x72, 0x22, 0x2e, 0x0a, 0x12, 0x4a, 0x6f, 0x69, 0x6e, 0x52, 0x61, 0x66, 0x74, + 0x56, 0x6f, 0x74, 0x65, 0x72, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, + 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x75, 0x63, + 0x63, 0x65, 0x73, 0x73, 0x22, 0x66, 0x0a, 0x11, 0x53, 0x65, 0x6e, 0x64, 0x42, 0x6c, 0x6f, 0x63, + 0x6b, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x6d, 0x61, 0x78, + 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x09, 0x6d, 0x61, + 0x78, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x70, 0x61, 0x74, 0x68, 0x73, + 0x18, 0x02, 0x20, 0x03, 0x28, 0x05, 0x52, 0x05, 0x70, 0x61, 0x74, 0x68, 0x73, 0x12, 0x1d, 0x0a, + 0x0a, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x05, 0x52, 0x09, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x49, 0x64, 0x22, 0x33, 0x0a, 0x05, + 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x12, 0x14, 0x0a, 0x05, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x12, 0x14, 0x0a, 0x05, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, + 0x65, 0x22, 0x3b, 0x0a, 0x0f, 0x53, 0x65, 0x6e, 0x64, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x52, + 0x65, 0x70, 0x6c, 0x79, 0x12, 0x28, 0x0a, 0x06, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x18, 0x01, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x73, 0x68, 0x61, 0x72, 0x64, 0x6e, 0x6f, 0x64, 0x65, + 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x52, 0x06, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x22, 0x32, + 0x0a, 0x03, 0x41, 0x63, 0x6b, 0x12, 0x14, 0x0a, 0x05, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x12, 0x15, 0x0a, 0x06, 0x69, + 0x73, 0x5f, 0x61, 0x63, 0x6b, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x69, 0x73, 0x41, + 0x63, 0x6b, 0x22, 0x3a, 0x0a, 0x14, 0x41, 0x63, 0x6b, 0x53, 0x65, 0x6e, 0x74, 0x42, 0x6c, 0x6f, + 0x63, 0x6b, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x22, 0x0a, 0x04, 0x61, 0x63, + 0x6b, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x73, 0x68, 0x61, 0x72, 0x64, + 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x41, 0x63, 0x6b, 0x52, 0x04, 0x61, 0x63, 0x6b, 0x73, 0x22, 0x2e, + 0x0a, 0x12, 0x41, 0x63, 0x6b, 0x53, 0x65, 0x6e, 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x52, + 0x65, 0x70, 0x6c, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x32, 0xbb, + 0x02, 0x0a, 0x09, 0x53, 0x68, 0x61, 0x72, 0x64, 0x4e, 0x6f, 0x64, 0x65, 0x12, 0x3e, 0x0a, 0x0a, + 0x42, 0x61, 0x74, 0x63, 0x68, 0x51, 0x75, 0x65, 0x72, 0x79, 0x12, 0x17, 0x2e, 0x73, 0x68, 0x61, + 0x72, 0x64, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 
0x73, 0x74, 0x42, 0x61, + 0x74, 0x63, 0x68, 0x1a, 0x15, 0x2e, 0x73, 0x68, 0x61, 0x72, 0x64, 0x6e, 0x6f, 0x64, 0x65, 0x2e, + 0x52, 0x65, 0x70, 0x6c, 0x79, 0x42, 0x61, 0x74, 0x63, 0x68, 0x22, 0x00, 0x12, 0x48, 0x0a, 0x0a, + 0x53, 0x65, 0x6e, 0x64, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x12, 0x1c, 0x2e, 0x73, 0x68, 0x61, + 0x72, 0x64, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x42, 0x6c, 0x6f, 0x63, 0x6b, + 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x73, 0x68, 0x61, 0x72, 0x64, + 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x52, + 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x51, 0x0a, 0x0d, 0x41, 0x63, 0x6b, 0x53, 0x65, 0x6e, + 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x12, 0x1f, 0x2e, 0x73, 0x68, 0x61, 0x72, 0x64, 0x6e, + 0x6f, 0x64, 0x65, 0x2e, 0x41, 0x63, 0x6b, 0x53, 0x65, 0x6e, 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, + 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x73, 0x68, 0x61, 0x72, 0x64, + 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x41, 0x63, 0x6b, 0x53, 0x65, 0x6e, 0x74, 0x42, 0x6c, 0x6f, 0x63, + 0x6b, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x51, 0x0a, 0x0d, 0x4a, 0x6f, 0x69, + 0x6e, 0x52, 0x61, 0x66, 0x74, 0x56, 0x6f, 0x74, 0x65, 0x72, 0x12, 0x1f, 0x2e, 0x73, 0x68, 0x61, + 0x72, 0x64, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x4a, 0x6f, 0x69, 0x6e, 0x52, 0x61, 0x66, 0x74, 0x56, + 0x6f, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x73, 0x68, + 0x61, 0x72, 0x64, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x4a, 0x6f, 0x69, 0x6e, 0x52, 0x61, 0x66, 0x74, + 0x56, 0x6f, 0x74, 0x65, 0x72, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x42, 0x32, 0x5a, 0x30, + 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x64, 0x73, 0x67, 0x2d, 0x75, + 0x77, 0x61, 0x74, 0x65, 0x72, 0x6c, 0x6f, 0x6f, 0x2f, 0x6f, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x61, + 0x72, 0x64, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x68, 0x61, 0x72, 0x64, 0x6e, 0x6f, 0x64, 0x65, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -721,39 +885,43 @@ func file_shardnode_proto_rawDescGZIP() []byte { return file_shardnode_proto_rawDescData } -var file_shardnode_proto_msgTypes = make([]protoimpl.MessageInfo, 12) +var file_shardnode_proto_msgTypes = make([]protoimpl.MessageInfo, 14) var file_shardnode_proto_goTypes = []interface{}{ - (*ReadRequest)(nil), // 0: shardnode.ReadRequest - (*ReadReply)(nil), // 1: shardnode.ReadReply - (*WriteRequest)(nil), // 2: shardnode.WriteRequest - (*WriteReply)(nil), // 3: shardnode.WriteReply - (*JoinRaftVoterRequest)(nil), // 4: shardnode.JoinRaftVoterRequest - (*JoinRaftVoterReply)(nil), // 5: shardnode.JoinRaftVoterReply - (*SendBlocksRequest)(nil), // 6: shardnode.SendBlocksRequest - (*Block)(nil), // 7: shardnode.Block - (*SendBlocksReply)(nil), // 8: shardnode.SendBlocksReply - (*Ack)(nil), // 9: shardnode.Ack - (*AckSentBlocksRequest)(nil), // 10: shardnode.AckSentBlocksRequest - (*AckSentBlocksReply)(nil), // 11: shardnode.AckSentBlocksReply + (*RequestBatch)(nil), // 0: shardnode.RequestBatch + (*ReplyBatch)(nil), // 1: shardnode.ReplyBatch + (*ReadRequest)(nil), // 2: shardnode.ReadRequest + (*ReadReply)(nil), // 3: shardnode.ReadReply + (*WriteRequest)(nil), // 4: shardnode.WriteRequest + (*WriteReply)(nil), // 5: shardnode.WriteReply + (*JoinRaftVoterRequest)(nil), // 6: shardnode.JoinRaftVoterRequest + (*JoinRaftVoterReply)(nil), // 7: shardnode.JoinRaftVoterReply + (*SendBlocksRequest)(nil), // 8: shardnode.SendBlocksRequest + (*Block)(nil), // 9: shardnode.Block + 
(*SendBlocksReply)(nil), // 10: shardnode.SendBlocksReply + (*Ack)(nil), // 11: shardnode.Ack + (*AckSentBlocksRequest)(nil), // 12: shardnode.AckSentBlocksRequest + (*AckSentBlocksReply)(nil), // 13: shardnode.AckSentBlocksReply } var file_shardnode_proto_depIdxs = []int32{ - 7, // 0: shardnode.SendBlocksReply.blocks:type_name -> shardnode.Block - 9, // 1: shardnode.AckSentBlocksRequest.acks:type_name -> shardnode.Ack - 0, // 2: shardnode.ShardNode.Read:input_type -> shardnode.ReadRequest - 2, // 3: shardnode.ShardNode.Write:input_type -> shardnode.WriteRequest - 6, // 4: shardnode.ShardNode.SendBlocks:input_type -> shardnode.SendBlocksRequest - 10, // 5: shardnode.ShardNode.AckSentBlocks:input_type -> shardnode.AckSentBlocksRequest - 4, // 6: shardnode.ShardNode.JoinRaftVoter:input_type -> shardnode.JoinRaftVoterRequest - 1, // 7: shardnode.ShardNode.Read:output_type -> shardnode.ReadReply - 3, // 8: shardnode.ShardNode.Write:output_type -> shardnode.WriteReply - 8, // 9: shardnode.ShardNode.SendBlocks:output_type -> shardnode.SendBlocksReply - 11, // 10: shardnode.ShardNode.AckSentBlocks:output_type -> shardnode.AckSentBlocksReply - 5, // 11: shardnode.ShardNode.JoinRaftVoter:output_type -> shardnode.JoinRaftVoterReply - 7, // [7:12] is the sub-list for method output_type - 2, // [2:7] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name + 2, // 0: shardnode.RequestBatch.read_requests:type_name -> shardnode.ReadRequest + 4, // 1: shardnode.RequestBatch.write_requests:type_name -> shardnode.WriteRequest + 3, // 2: shardnode.ReplyBatch.read_replies:type_name -> shardnode.ReadReply + 5, // 3: shardnode.ReplyBatch.write_replies:type_name -> shardnode.WriteReply + 9, // 4: shardnode.SendBlocksReply.blocks:type_name -> shardnode.Block + 11, // 5: shardnode.AckSentBlocksRequest.acks:type_name -> shardnode.Ack + 0, // 6: shardnode.ShardNode.BatchQuery:input_type -> shardnode.RequestBatch + 8, // 7: shardnode.ShardNode.SendBlocks:input_type -> shardnode.SendBlocksRequest + 12, // 8: shardnode.ShardNode.AckSentBlocks:input_type -> shardnode.AckSentBlocksRequest + 6, // 9: shardnode.ShardNode.JoinRaftVoter:input_type -> shardnode.JoinRaftVoterRequest + 1, // 10: shardnode.ShardNode.BatchQuery:output_type -> shardnode.ReplyBatch + 10, // 11: shardnode.ShardNode.SendBlocks:output_type -> shardnode.SendBlocksReply + 13, // 12: shardnode.ShardNode.AckSentBlocks:output_type -> shardnode.AckSentBlocksReply + 7, // 13: shardnode.ShardNode.JoinRaftVoter:output_type -> shardnode.JoinRaftVoterReply + 10, // [10:14] is the sub-list for method output_type + 6, // [6:10] is the sub-list for method input_type + 6, // [6:6] is the sub-list for extension type_name + 6, // [6:6] is the sub-list for extension extendee + 0, // [0:6] is the sub-list for field type_name } func init() { file_shardnode_proto_init() } @@ -763,7 +931,7 @@ func file_shardnode_proto_init() { } if !protoimpl.UnsafeEnabled { file_shardnode_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ReadRequest); i { + switch v := v.(*RequestBatch); i { case 0: return &v.state case 1: @@ -775,7 +943,7 @@ func file_shardnode_proto_init() { } } file_shardnode_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ReadReply); i { + switch v := v.(*ReplyBatch); i { case 0: return &v.state case 1: @@ -787,7 +955,7 @@ func file_shardnode_proto_init() 
{ } } file_shardnode_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*WriteRequest); i { + switch v := v.(*ReadRequest); i { case 0: return &v.state case 1: @@ -799,7 +967,7 @@ func file_shardnode_proto_init() { } } file_shardnode_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*WriteReply); i { + switch v := v.(*ReadReply); i { case 0: return &v.state case 1: @@ -811,7 +979,7 @@ func file_shardnode_proto_init() { } } file_shardnode_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*JoinRaftVoterRequest); i { + switch v := v.(*WriteRequest); i { case 0: return &v.state case 1: @@ -823,7 +991,7 @@ func file_shardnode_proto_init() { } } file_shardnode_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*JoinRaftVoterReply); i { + switch v := v.(*WriteReply); i { case 0: return &v.state case 1: @@ -835,7 +1003,7 @@ func file_shardnode_proto_init() { } } file_shardnode_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*SendBlocksRequest); i { + switch v := v.(*JoinRaftVoterRequest); i { case 0: return &v.state case 1: @@ -847,7 +1015,7 @@ func file_shardnode_proto_init() { } } file_shardnode_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Block); i { + switch v := v.(*JoinRaftVoterReply); i { case 0: return &v.state case 1: @@ -859,7 +1027,7 @@ func file_shardnode_proto_init() { } } file_shardnode_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*SendBlocksReply); i { + switch v := v.(*SendBlocksRequest); i { case 0: return &v.state case 1: @@ -871,7 +1039,7 @@ func file_shardnode_proto_init() { } } file_shardnode_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Ack); i { + switch v := v.(*Block); i { case 0: return &v.state case 1: @@ -883,7 +1051,7 @@ func file_shardnode_proto_init() { } } file_shardnode_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*AckSentBlocksRequest); i { + switch v := v.(*SendBlocksReply); i { case 0: return &v.state case 1: @@ -895,6 +1063,30 @@ func file_shardnode_proto_init() { } } file_shardnode_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Ack); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_shardnode_proto_msgTypes[12].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*AckSentBlocksRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_shardnode_proto_msgTypes[13].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*AckSentBlocksReply); i { case 0: return &v.state @@ -913,7 +1105,7 @@ func file_shardnode_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_shardnode_proto_rawDesc, NumEnums: 0, - NumMessages: 12, + NumMessages: 14, NumExtensions: 0, NumServices: 1, }, diff --git a/api/shardnode/shardnode_grpc.pb.go b/api/shardnode/shardnode_grpc.pb.go index d15e6f1..66dd316 100644 --- a/api/shardnode/shardnode_grpc.pb.go +++ b/api/shardnode/shardnode_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. 
// versions: -// - protoc-gen-go-grpc v1.2.0 -// - protoc v4.23.4 +// - protoc-gen-go-grpc v1.3.0 +// - protoc v4.24.3 // source: shardnode.proto package shardnode @@ -18,12 +18,18 @@ import ( // Requires gRPC-Go v1.32.0 or later. const _ = grpc.SupportPackageIsVersion7 +const ( + ShardNode_BatchQuery_FullMethodName = "/shardnode.ShardNode/BatchQuery" + ShardNode_SendBlocks_FullMethodName = "/shardnode.ShardNode/SendBlocks" + ShardNode_AckSentBlocks_FullMethodName = "/shardnode.ShardNode/AckSentBlocks" + ShardNode_JoinRaftVoter_FullMethodName = "/shardnode.ShardNode/JoinRaftVoter" +) + // ShardNodeClient is the client API for ShardNode service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type ShardNodeClient interface { - Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadReply, error) - Write(ctx context.Context, in *WriteRequest, opts ...grpc.CallOption) (*WriteReply, error) + BatchQuery(ctx context.Context, in *RequestBatch, opts ...grpc.CallOption) (*ReplyBatch, error) SendBlocks(ctx context.Context, in *SendBlocksRequest, opts ...grpc.CallOption) (*SendBlocksReply, error) AckSentBlocks(ctx context.Context, in *AckSentBlocksRequest, opts ...grpc.CallOption) (*AckSentBlocksReply, error) JoinRaftVoter(ctx context.Context, in *JoinRaftVoterRequest, opts ...grpc.CallOption) (*JoinRaftVoterReply, error) @@ -37,18 +43,9 @@ func NewShardNodeClient(cc grpc.ClientConnInterface) ShardNodeClient { return &shardNodeClient{cc} } -func (c *shardNodeClient) Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadReply, error) { - out := new(ReadReply) - err := c.cc.Invoke(ctx, "/shardnode.ShardNode/Read", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *shardNodeClient) Write(ctx context.Context, in *WriteRequest, opts ...grpc.CallOption) (*WriteReply, error) { - out := new(WriteReply) - err := c.cc.Invoke(ctx, "/shardnode.ShardNode/Write", in, out, opts...) +func (c *shardNodeClient) BatchQuery(ctx context.Context, in *RequestBatch, opts ...grpc.CallOption) (*ReplyBatch, error) { + out := new(ReplyBatch) + err := c.cc.Invoke(ctx, ShardNode_BatchQuery_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -57,7 +54,7 @@ func (c *shardNodeClient) Write(ctx context.Context, in *WriteRequest, opts ...g func (c *shardNodeClient) SendBlocks(ctx context.Context, in *SendBlocksRequest, opts ...grpc.CallOption) (*SendBlocksReply, error) { out := new(SendBlocksReply) - err := c.cc.Invoke(ctx, "/shardnode.ShardNode/SendBlocks", in, out, opts...) + err := c.cc.Invoke(ctx, ShardNode_SendBlocks_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -66,7 +63,7 @@ func (c *shardNodeClient) SendBlocks(ctx context.Context, in *SendBlocksRequest, func (c *shardNodeClient) AckSentBlocks(ctx context.Context, in *AckSentBlocksRequest, opts ...grpc.CallOption) (*AckSentBlocksReply, error) { out := new(AckSentBlocksReply) - err := c.cc.Invoke(ctx, "/shardnode.ShardNode/AckSentBlocks", in, out, opts...) + err := c.cc.Invoke(ctx, ShardNode_AckSentBlocks_FullMethodName, in, out, opts...) 
if err != nil { return nil, err } @@ -75,7 +72,7 @@ func (c *shardNodeClient) AckSentBlocks(ctx context.Context, in *AckSentBlocksRe func (c *shardNodeClient) JoinRaftVoter(ctx context.Context, in *JoinRaftVoterRequest, opts ...grpc.CallOption) (*JoinRaftVoterReply, error) { out := new(JoinRaftVoterReply) - err := c.cc.Invoke(ctx, "/shardnode.ShardNode/JoinRaftVoter", in, out, opts...) + err := c.cc.Invoke(ctx, ShardNode_JoinRaftVoter_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -86,8 +83,7 @@ func (c *shardNodeClient) JoinRaftVoter(ctx context.Context, in *JoinRaftVoterRe // All implementations must embed UnimplementedShardNodeServer // for forward compatibility type ShardNodeServer interface { - Read(context.Context, *ReadRequest) (*ReadReply, error) - Write(context.Context, *WriteRequest) (*WriteReply, error) + BatchQuery(context.Context, *RequestBatch) (*ReplyBatch, error) SendBlocks(context.Context, *SendBlocksRequest) (*SendBlocksReply, error) AckSentBlocks(context.Context, *AckSentBlocksRequest) (*AckSentBlocksReply, error) JoinRaftVoter(context.Context, *JoinRaftVoterRequest) (*JoinRaftVoterReply, error) @@ -98,11 +94,8 @@ type ShardNodeServer interface { type UnimplementedShardNodeServer struct { } -func (UnimplementedShardNodeServer) Read(context.Context, *ReadRequest) (*ReadReply, error) { - return nil, status.Errorf(codes.Unimplemented, "method Read not implemented") -} -func (UnimplementedShardNodeServer) Write(context.Context, *WriteRequest) (*WriteReply, error) { - return nil, status.Errorf(codes.Unimplemented, "method Write not implemented") +func (UnimplementedShardNodeServer) BatchQuery(context.Context, *RequestBatch) (*ReplyBatch, error) { + return nil, status.Errorf(codes.Unimplemented, "method BatchQuery not implemented") } func (UnimplementedShardNodeServer) SendBlocks(context.Context, *SendBlocksRequest) (*SendBlocksReply, error) { return nil, status.Errorf(codes.Unimplemented, "method SendBlocks not implemented") @@ -126,38 +119,20 @@ func RegisterShardNodeServer(s grpc.ServiceRegistrar, srv ShardNodeServer) { s.RegisterService(&ShardNode_ServiceDesc, srv) } -func _ShardNode_Read_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(ReadRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ShardNodeServer).Read(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/shardnode.ShardNode/Read", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ShardNodeServer).Read(ctx, req.(*ReadRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _ShardNode_Write_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(WriteRequest) +func _ShardNode_BatchQuery_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(RequestBatch) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(ShardNodeServer).Write(ctx, in) + return srv.(ShardNodeServer).BatchQuery(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/shardnode.ShardNode/Write", + FullMethod: ShardNode_BatchQuery_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return 
srv.(ShardNodeServer).Write(ctx, req.(*WriteRequest)) + return srv.(ShardNodeServer).BatchQuery(ctx, req.(*RequestBatch)) } return interceptor(ctx, in, info, handler) } @@ -172,7 +147,7 @@ func _ShardNode_SendBlocks_Handler(srv interface{}, ctx context.Context, dec fun } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/shardnode.ShardNode/SendBlocks", + FullMethod: ShardNode_SendBlocks_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(ShardNodeServer).SendBlocks(ctx, req.(*SendBlocksRequest)) @@ -190,7 +165,7 @@ func _ShardNode_AckSentBlocks_Handler(srv interface{}, ctx context.Context, dec } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/shardnode.ShardNode/AckSentBlocks", + FullMethod: ShardNode_AckSentBlocks_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(ShardNodeServer).AckSentBlocks(ctx, req.(*AckSentBlocksRequest)) @@ -208,7 +183,7 @@ func _ShardNode_JoinRaftVoter_Handler(srv interface{}, ctx context.Context, dec } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/shardnode.ShardNode/JoinRaftVoter", + FullMethod: ShardNode_JoinRaftVoter_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(ShardNodeServer).JoinRaftVoter(ctx, req.(*JoinRaftVoterRequest)) @@ -224,12 +199,8 @@ var ShardNode_ServiceDesc = grpc.ServiceDesc{ HandlerType: (*ShardNodeServer)(nil), Methods: []grpc.MethodDesc{ { - MethodName: "Read", - Handler: _ShardNode_Read_Handler, - }, - { - MethodName: "Write", - Handler: _ShardNode_Write_Handler, + MethodName: "BatchQuery", + Handler: _ShardNode_BatchQuery_Handler, }, { MethodName: "SendBlocks", diff --git a/pkg/oramnode/server_test.go b/pkg/oramnode/server_test.go index 5db84f0..a652192 100644 --- a/pkg/oramnode/server_test.go +++ b/pkg/oramnode/server_test.go @@ -22,6 +22,10 @@ type mockShardNodeClient struct { ackSentBlocksReply func() (*shardnodepb.AckSentBlocksReply, error) } +func (m *mockShardNodeClient) BatchQuery(ctx context.Context, in *shardnodepb.RequestBatch, opts ...grpc.CallOption) (*shardnodepb.ReplyBatch, error) { + return nil, nil +} + func (m *mockShardNodeClient) Read(ctx context.Context, in *shardnodepb.ReadRequest, opts ...grpc.CallOption) (*shardnodepb.ReadReply, error) { return nil, nil } diff --git a/pkg/router/epoch.go b/pkg/router/epoch.go index 05de4c1..fc1eb29 100644 --- a/pkg/router/epoch.go +++ b/pkg/router/epoch.go @@ -15,8 +15,8 @@ import ( type epochManager struct { shardNodeRPCClients map[int]ReplicaRPCClientMap - requests map[int][]*request // map of epoch round to requests - reponseChans map[int]map[*request]chan any // map of epoch round to map of request to response channel + requests map[int][]*request // map of epoch round to requests + reponseChans map[int]map[string]chan any // map of epoch round to map of request id to response channel currentEpoch int epochDuration time.Duration hasher utils.Hasher @@ -27,7 +27,7 @@ func newEpochManager(shardNodeRPCClients map[int]ReplicaRPCClientMap, epochDurat return &epochManager{ shardNodeRPCClients: shardNodeRPCClients, requests: make(map[int][]*request), - reponseChans: make(map[int]map[*request]chan any), + reponseChans: make(map[int]map[string]chan any), currentEpoch: 0, epochDuration: epochDuration, hasher: utils.Hasher{KnownHashes: make(map[string]uint32)}, @@ -41,6 +41,7 @@ const ( type request struct { ctx context.Context + requestId string operationType int block string value 
string @@ -58,10 +59,10 @@ func (e *epochManager) addRequestToCurrentEpoch(r *request) chan any { }() e.requests[e.currentEpoch] = append(e.requests[e.currentEpoch], r) if _, exists := e.reponseChans[e.currentEpoch]; !exists { - e.reponseChans[e.currentEpoch] = make(map[*request]chan any) + e.reponseChans[e.currentEpoch] = make(map[string]chan any) } - e.reponseChans[e.currentEpoch][r] = make(chan any) - return e.reponseChans[e.currentEpoch][r] + e.reponseChans[e.currentEpoch][r.requestId] = make(chan any) + return e.reponseChans[e.currentEpoch][r.requestId] } func (e *epochManager) whereToForward(block string) (shardNodeID int) { @@ -79,99 +80,88 @@ type writeResponse struct { err error } -type requestResponse struct { - req *request - response any +type batchResponse struct { + readResponses []*shardnodepb.ReadReply + writeResponses []*shardnodepb.WriteReply + err error } -func (e *epochManager) sendReadRequest(req *request, responseChannel chan requestResponse) { - log.Debug().Msgf("Sending read request %v", req) - whereToForward := e.whereToForward(req.block) - shardNodeRPCClient := e.shardNodeRPCClients[whereToForward] - +func (e *epochManager) sendBatch(ctx context.Context, shardnodeClient ReplicaRPCClientMap, requestBatch *shardnodepb.RequestBatch, batchResponseChan chan batchResponse) { + log.Debug().Msgf("Sending batch of requests %v to shardnode", requestBatch) var replicaFuncs []rpc.CallFunc var clients []any - for _, c := range shardNodeRPCClient { + for _, client := range shardnodeClient { replicaFuncs = append(replicaFuncs, func(ctx context.Context, client any, request any, opts ...grpc.CallOption) (any, error) { - return client.(ShardNodeRPCClient).ClientAPI.Read(ctx, request.(*shardnodepb.ReadRequest), opts...) + return client.(ShardNodeRPCClient).ClientAPI.BatchQuery(ctx, request.(*shardnodepb.RequestBatch), opts...) }, ) - clients = append(clients, c) + clients = append(clients, client) } - reply, err := rpc.CallAllReplicas(req.ctx, clients, replicaFuncs, &shardnodepb.ReadRequest{Block: req.block}) + reply, err := rpc.CallAllReplicas(ctx, clients, replicaFuncs, requestBatch) if err != nil { - log.Error().Msgf("Error sending read request %v", err) - responseChannel <- requestResponse{req: req, response: readResponse{err: err}} - } else { - shardNodeReply := reply.(*shardnodepb.ReadReply) - log.Debug().Msgf("Received read reply %v", shardNodeReply) - responseChannel <- requestResponse{req: req, response: readResponse{value: shardNodeReply.Value, err: err}} - log.Debug().Msgf("Sent read reply %v", shardNodeReply) + batchResponseChan <- batchResponse{err: err} + return } + log.Debug().Msgf("Received batch of requests from shardnode; reply: %v", reply) + batchResponseChan <- batchResponse{readResponses: reply.(*shardnodepb.ReplyBatch).ReadReplies, writeResponses: reply.(*shardnodepb.ReplyBatch).WriteReplies, err: nil} } -func (e *epochManager) sendWriteRequest(req *request, responseChannel chan requestResponse) { - log.Debug().Msgf("Sending write request %v", req) - whereToForward := e.whereToForward(req.block) - shardNodeRPCClient := e.shardNodeRPCClients[whereToForward] - - var replicaFuncs []rpc.CallFunc - var clients []any - for _, c := range shardNodeRPCClient { - replicaFuncs = append(replicaFuncs, - func(ctx context.Context, client any, request any, opts ...grpc.CallOption) (any, error) { - return client.(ShardNodeRPCClient).ClientAPI.Write(ctx, request.(*shardnodepb.WriteRequest), opts...) 
- }, - ) - clients = append(clients, c) - } - - reply, err := rpc.CallAllReplicas(req.ctx, clients, replicaFuncs, &shardnodepb.WriteRequest{Block: req.block, Value: req.value}) - if err != nil { - log.Error().Msgf("Error sending write request %v", err) - responseChannel <- requestResponse{req: req, response: writeResponse{err: err}} - } else { - shardNodeReply := reply.(*shardnodepb.WriteReply) - log.Debug().Msgf("Received write reply %v", shardNodeReply) - responseChannel <- requestResponse{req: req, response: writeResponse{success: shardNodeReply.Success, err: err}} +func (e *epochManager) getShardnodeBatches(requests []*request) map[int]*shardnodepb.RequestBatch { + requestBatches := make(map[int]*shardnodepb.RequestBatch) + for _, r := range requests { + shardNodeID := e.whereToForward(r.block) + if _, exists := requestBatches[shardNodeID]; !exists { + requestBatches[shardNodeID] = &shardnodepb.RequestBatch{} + } + if r.operationType == Read { + requestBatches[shardNodeID].ReadRequests = append(requestBatches[shardNodeID].ReadRequests, &shardnodepb.ReadRequest{RequestId: r.requestId, Block: r.block}) + } else { + requestBatches[shardNodeID].WriteRequests = append(requestBatches[shardNodeID].WriteRequests, &shardnodepb.WriteRequest{RequestId: r.requestId, Block: r.block, Value: r.value}) + } } + return requestBatches } // This function waits for all the responses then answers all of the requests. // It can time out since a request may have failed. -func (e *epochManager) sendEpochRequestsAndAnswerThem(epochNumber int, requests []*request, responseChans map[*request]chan any) { - responseChannel := make(chan requestResponse) +func (e *epochManager) sendEpochRequestsAndAnswerThem(epochNumber int, requests []*request, responseChans map[string]chan any) { requestsCount := len(requests) if requestsCount == 0 { return } log.Debug().Msgf("Sending epoch requests and answering them for epoch %d with %d requests", epochNumber, requestsCount) - for _, r := range requests { - if r.operationType == Read { - go e.sendReadRequest(r, responseChannel) - } else if r.operationType == Write { - go e.sendWriteRequest(r, responseChannel) + batchRequests := e.getShardnodeBatches(requests) + batchResponseChan := make(chan batchResponse) + waitingCount := 0 + for shardNodeID, shardNodeRequests := range batchRequests { + if len(shardNodeRequests.ReadRequests) == 0 && len(shardNodeRequests.WriteRequests) == 0 { + continue } + waitingCount++ + go e.sendBatch(context.Background(), e.shardNodeRPCClients[shardNodeID], shardNodeRequests, batchResponseChan) } - timeout := time.After(10 * time.Second) // TODO: make this a parameter - responsesReceived := make(map[*request]any) - - for { - if len(responsesReceived) == requestsCount { - break - } + timeout := time.After(10 * time.Second) + for i := 0; i < waitingCount; i++ { select { case <-timeout: + log.Error().Msgf("Timed out while waiting for batch response") return - case requestResponse := <-responseChannel: - responsesReceived[requestResponse.req] = requestResponse.response + case reply := <-batchResponseChan: + if reply.err != nil { + log.Error().Msgf("Error while sending batch of requests; %s", reply.err) + continue + } + log.Debug().Msgf("Received batch reply %v", reply) + log.Debug().Msgf("Answering epoch requests for epoch %d", epochNumber) + for _, r := range reply.readResponses { + responseChans[r.RequestId] <- readResponse{value: r.Value} + } + for _, r := range reply.writeResponses { + responseChans[r.RequestId] <- writeResponse{success: r.Success} + } } } 
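// Illustrative sketch only (not part of this change): because a failed or timed-out
// batch in the loop above is skipped with `continue`, the per-request channels that
// belong to that batch are never written to. A caller holding one of the channels
// returned by addRequestToCurrentEpoch could guard its receive with a timeout,
// roughly like the helper below; the helper name and the 15-second bound are
// assumptions for illustration, not values taken from this change.
func awaitResponse(responseChannel chan any) (response any, ok bool) {
	select {
	case response = <-responseChannel:
		return response, true
	case <-time.After(15 * time.Second):
		return nil, false
	}
}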
- log.Debug().Msgf("Answering epoch requests for epoch %d", epochNumber) - for req, response := range responsesReceived { - responseChans[req] <- response - } } // This function runs the epochManger forever. diff --git a/pkg/router/epoch_test.go b/pkg/router/epoch_test.go index fdc904e..d0f9498 100644 --- a/pkg/router/epoch_test.go +++ b/pkg/router/epoch_test.go @@ -13,12 +13,12 @@ import ( func TestAddRequestToCurrentEpochAddsRequestAndChannel(t *testing.T) { e := newEpochManager(make(map[int]ReplicaRPCClientMap), time.Second) e.currentEpoch = 12 - req := &request{ctx: context.Background(), operationType: Read, block: "a", value: "value"} + req := &request{ctx: context.Background(), requestId: "test_request_id", operationType: Read, block: "a", value: "value"} e.addRequestToCurrentEpoch(req) - if len(e.requests[12]) != 1 || e.requests[12][0].block != "a" || e.requests[12][0].value != "value" || e.requests[12][0].operationType != Read { + if len(e.requests[12]) != 1 || e.requests[12][0].requestId != "test_request_id" || e.requests[12][0].block != "a" || e.requests[12][0].value != "value" || e.requests[12][0].operationType != Read { t.Errorf("Expected request to be added to current epoch requests") } - _, exists := e.reponseChans[12][req] + _, exists := e.reponseChans[12]["test_request_id"] if len(e.reponseChans[12]) != 1 || !exists { t.Errorf("Expected request to be added to the channel map") } @@ -64,16 +64,59 @@ func TestWhereToForward(t *testing.T) { } } -type mockShardNodeClient struct { - readReply func() (*shardnodepb.ReadReply, error) - writeReply func() (*shardnodepb.WriteReply, error) +func TestGetShardnodeBatchesAddsEachRequestToCorrectBatch(t *testing.T) { + e := createTestEpochManager(3) + requests := []*request{ + {ctx: context.Background(), requestId: "1", operationType: Read, block: "a", value: "value"}, + {ctx: context.Background(), requestId: "2", operationType: Write, block: "b", value: "value"}, + {ctx: context.Background(), requestId: "3", operationType: Read, block: "c", value: "value"}, + {ctx: context.Background(), requestId: "4", operationType: Write, block: "d", value: "value"}, + {ctx: context.Background(), requestId: "5", operationType: Write, block: "e", value: "value"}, + } + expectedBatchs := map[int]*shardnodepb.RequestBatch{ + 0: {}, + 1: { + ReadRequests: []*shardnodepb.ReadRequest{ + {RequestId: "1", Block: "a"}, + }, + WriteRequests: []*shardnodepb.WriteRequest{ + {RequestId: "2", Block: "b", Value: "value"}, + {RequestId: "4", Block: "d", Value: "value"}, + }, + }, + 2: { + ReadRequests: []*shardnodepb.ReadRequest{ + {RequestId: "3", Block: "c"}, + }, + WriteRequests: []*shardnodepb.WriteRequest{ + {RequestId: "5", Block: "e", Value: "value"}, + }, + }, + } + batches := e.getShardnodeBatches(requests) + for shardNodeID, batch := range batches { + if len(batch.ReadRequests) != len(expectedBatchs[shardNodeID].ReadRequests) || len(batch.WriteRequests) != len(expectedBatchs[shardNodeID].WriteRequests) { + t.Errorf("Expected to see %d read requests and %d write requests for shard node %d", len(expectedBatchs[shardNodeID].ReadRequests), len(expectedBatchs[shardNodeID].WriteRequests), shardNodeID) + } + for i := 0; i < len(batch.ReadRequests); i++ { + if batch.ReadRequests[i].RequestId != expectedBatchs[shardNodeID].ReadRequests[i].RequestId || batch.ReadRequests[i].Block != expectedBatchs[shardNodeID].ReadRequests[i].Block { + t.Errorf("Expected to see read request %v at index %d for shard node %d", expectedBatchs[shardNodeID].ReadRequests[i], i, shardNodeID) 
+ } + } + for i := 0; i < len(batch.WriteRequests); i++ { + if batch.WriteRequests[i].RequestId != expectedBatchs[shardNodeID].WriteRequests[i].RequestId || batch.WriteRequests[i].Block != expectedBatchs[shardNodeID].WriteRequests[i].Block || batch.WriteRequests[i].Value != expectedBatchs[shardNodeID].WriteRequests[i].Value { + t.Errorf("Expected to see write request %v at index %d for shard node %d", expectedBatchs[shardNodeID].WriteRequests[i], i, shardNodeID) + } + } + } } -func (m *mockShardNodeClient) Read(ctx context.Context, in *shardnodepb.ReadRequest, opts ...grpc.CallOption) (*shardnodepb.ReadReply, error) { - return m.readReply() +type mockShardNodeClient struct { + batchReply func() (*shardnodepb.ReplyBatch, error) } -func (m *mockShardNodeClient) Write(ctx context.Context, in *shardnodepb.WriteRequest, opts ...grpc.CallOption) (*shardnodepb.WriteReply, error) { - return m.writeReply() + +func (m *mockShardNodeClient) BatchQuery(ctx context.Context, in *shardnodepb.RequestBatch, opts ...grpc.CallOption) (*shardnodepb.ReplyBatch, error) { + return m.batchReply() } func (m *mockShardNodeClient) SendBlocks(ctx context.Context, in *shardnodepb.SendBlocksRequest, opts ...grpc.CallOption) (*shardnodepb.SendBlocksReply, error) { return nil, nil @@ -90,21 +133,38 @@ func getMockShardNodeClients() map[int]ReplicaRPCClientMap { 0: map[int]ShardNodeRPCClient{ 0: { ClientAPI: &mockShardNodeClient{ - readReply: func() (*shardnodepb.ReadReply, error) { - return &shardnodepb.ReadReply{Value: "valA"}, nil - }, - writeReply: func() (*shardnodepb.WriteReply, error) { - return &shardnodepb.WriteReply{Success: true}, nil + batchReply: func() (*shardnodepb.ReplyBatch, error) { + return &shardnodepb.ReplyBatch{ + ReadReplies: []*shardnodepb.ReadReply{ + {RequestId: "a", Value: "123"}, + }, + WriteReplies: []*shardnodepb.WriteReply{ + {RequestId: "c", Success: true}, + }, + }, nil }, }, }, 1: { ClientAPI: &mockShardNodeClient{ - readReply: func() (*shardnodepb.ReadReply, error) { + batchReply: func() (*shardnodepb.ReplyBatch, error) { return nil, fmt.Errorf("not the leader") }, - writeReply: func() (*shardnodepb.WriteReply, error) { - return nil, fmt.Errorf("not the leader") + }, + }, + }, + 1: map[int]ShardNodeRPCClient{ + 0: { + ClientAPI: &mockShardNodeClient{ + batchReply: func() (*shardnodepb.ReplyBatch, error) { + return &shardnodepb.ReplyBatch{ + ReadReplies: []*shardnodepb.ReadReply{ + {RequestId: "b", Value: "123"}, + }, + WriteReplies: []*shardnodepb.WriteReply{ + {RequestId: "d", Success: true}, + }, + }, nil }, }, }, @@ -115,22 +175,28 @@ func getMockShardNodeClients() map[int]ReplicaRPCClientMap { func TestSendEpochRequestsAndAnswerThemReturnsAllResponses(t *testing.T) { e := newEpochManager(getMockShardNodeClients(), time.Second) e.currentEpoch = 2 - request1 := &request{ctx: context.Background(), operationType: Write, block: "a", value: "123"} - request2 := &request{ctx: context.Background(), operationType: Read, block: "b"} + request1 := &request{ctx: context.Background(), requestId: "a", operationType: Read, block: "a"} + request2 := &request{ctx: context.Background(), requestId: "c", operationType: Write, block: "b", value: "123"} + request3 := &request{ctx: context.Background(), requestId: "b", operationType: Read, block: "c"} + request4 := &request{ctx: context.Background(), requestId: "d", operationType: Write, block: "d", value: "123"} e.requests[1] = []*request{ - request1, request2, + request1, request2, request3, request4, } - e.reponseChans[1] = make(map[*request]chan any) + 
e.reponseChans[1] = make(map[string]chan any) chan1 := make(chan any) chan2 := make(chan any) - e.reponseChans[1][request1] = chan1 - e.reponseChans[1][request2] = chan2 + chan3 := make(chan any) + chan4 := make(chan any) + e.reponseChans[1]["a"] = chan1 + e.reponseChans[1]["c"] = chan2 + e.reponseChans[1]["b"] = chan3 + e.reponseChans[1]["d"] = chan4 go e.sendEpochRequestsAndAnswerThem(1, e.requests[1], e.reponseChans[1]) timeout := time.After(5 * time.Second) responseCount := 0 for { - if responseCount == 2 { + if responseCount == 4 { return } select { @@ -141,6 +207,10 @@ func TestSendEpochRequestsAndAnswerThemReturnsAllResponses(t *testing.T) { responseCount++ case <-chan2: responseCount++ + case <-chan3: + responseCount++ + case <-chan4: + responseCount++ } } } diff --git a/pkg/router/server.go b/pkg/router/server.go index 79ae25f..e9712ad 100644 --- a/pkg/router/server.go +++ b/pkg/router/server.go @@ -9,6 +9,7 @@ import ( pb "github.com/dsg-uwaterloo/oblishard/api/router" "github.com/dsg-uwaterloo/oblishard/pkg/config" "github.com/dsg-uwaterloo/oblishard/pkg/rpc" + "github.com/google/uuid" "github.com/rs/zerolog/log" "go.opentelemetry.io/otel" "google.golang.org/grpc" @@ -32,7 +33,7 @@ func (r *routerServer) Read(ctx context.Context, readRequest *pb.ReadRequest) (* log.Debug().Msgf("Received read request for block %s", readRequest.Block) tracer := otel.Tracer("") ctx, span := tracer.Start(ctx, "router read request") - responseChannel := r.epochManager.addRequestToCurrentEpoch(&request{ctx: rpc.GetContextWithRequestID(ctx), operationType: Read, block: readRequest.Block}) + responseChannel := r.epochManager.addRequestToCurrentEpoch(&request{ctx: ctx, requestId: uuid.New().String(), operationType: Read, block: readRequest.Block}) response := <-responseChannel readResponse := response.(readResponse) if readResponse.err != nil { @@ -47,7 +48,7 @@ func (r *routerServer) Write(ctx context.Context, writeRequest *pb.WriteRequest) log.Debug().Msgf("Received write request for block %s", writeRequest.Block) tracer := otel.Tracer("") ctx, span := tracer.Start(ctx, "router write request") - responseChannel := r.epochManager.addRequestToCurrentEpoch(&request{ctx: rpc.GetContextWithRequestID(ctx), operationType: Write, block: writeRequest.Block, value: writeRequest.Value}) + responseChannel := r.epochManager.addRequestToCurrentEpoch(&request{ctx: ctx, requestId: uuid.New().String(), operationType: Write, block: writeRequest.Block, value: writeRequest.Value}) response := <-responseChannel writeResponse := response.(writeResponse) if writeResponse.err != nil { diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index 485b76a..57cd4a4 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -2,10 +2,7 @@ package rpc import ( "context" - "fmt" - "github.com/google/uuid" - "github.com/rs/zerolog/log" "go.opentelemetry.io/otel" "google.golang.org/grpc" "google.golang.org/grpc/metadata" @@ -48,23 +45,3 @@ func ContextPropagationUnaryServerInterceptor() grpc.UnaryServerInterceptor { return handler(ctx, req) } } - -func GetContextWithRequestID(ctx context.Context) context.Context { - requestID := uuid.New().String() - ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("requestid", requestID)) - log.Debug().Msgf("Added request id %s to context", requestID) - return ctx -} - -func GetRequestIDFromContext(ctx context.Context) (string, error) { - tracer := otel.Tracer("") - ctx, span := tracer.Start(ctx, "get request id from context") - md, _ := metadata.FromIncomingContext(ctx) - requestID, exists := 
md["requestid"] - if !exists || len(requestID) == 0 { - return "", fmt.Errorf("requestid not found in the request metadata") - } - span.End() - log.Debug().Msgf("Got request id %s from context", requestID[0]) - return requestID[0], nil -} diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go index dd4fe69..f8f29aa 100644 --- a/pkg/rpc/context_test.go +++ b/pkg/rpc/context_test.go @@ -9,24 +9,6 @@ import ( "google.golang.org/grpc/metadata" ) -func TestGetContextWithRequestIDHasUUIDInMetadata(t *testing.T) { - ctx := rpc.GetContextWithRequestID(context.Background()) - md, _ := metadata.FromOutgoingContext(ctx) - if len(md["requestid"]) != 1 { - t.Errorf("Expected a value in the requestid metadata for the request but got metadata: %v", md) - } -} - -func TestGetContextWithRequestIDWhenCalledMultipleTimesGeneratesDistinctRequestIDs(t *testing.T) { - ctx1 := rpc.GetContextWithRequestID(context.Background()) - ctx2 := rpc.GetContextWithRequestID(context.Background()) - md1, _ := metadata.FromOutgoingContext(ctx1) - md2, _ := metadata.FromOutgoingContext(ctx2) - if md1["requestid"][0] == md2["requestid"][0] { - t.Errorf("Expected different values for requestid for different contexts but got: %s", md1["requestid"][0]) - } -} - func TestContextPropagationUnaryServerInterceptorSendsIngoingContextToOutgoingContext(t *testing.T) { initCtx := metadata.NewIncomingContext(context.Background(), metadata.Pairs("test", "test_metadata")) @@ -41,21 +23,3 @@ func TestContextPropagationUnaryServerInterceptorSendsIngoingContextToOutgoingCo t.Errorf("Expected to see metadata on outgoing context") } } - -func TestGetRequestIDFromContextReturnsRequestIDFromMetadata(t *testing.T) { - ctx := metadata.NewIncomingContext(context.Background(), metadata.Pairs("requestid", "1238394")) - requestID, err := rpc.GetRequestIDFromContext(ctx) - if err != nil { - t.Errorf("Expected no error in call to GetContextWithRequestID when requestid is available in the metadata") - } - if requestID != "1238394" { - t.Errorf("Expected requestID to be 1238394 but got: %s", requestID) - } -} - -func TestGetRequestIDFromContextWithNoRequestIDReturnsError(t *testing.T) { - _, err := rpc.GetRequestIDFromContext(context.Background()) - if err == nil { - t.Errorf("GetRequestIDFromContext should return error when no request id is available") - } -} diff --git a/pkg/shardnode/raft.go b/pkg/shardnode/raft.go index e0aef13..76cf406 100644 --- a/pkg/shardnode/raft.go +++ b/pkg/shardnode/raft.go @@ -80,19 +80,23 @@ func (fsm *shardNodeFSM) String() string { return out } -func (fsm *shardNodeFSM) handleReplicateRequestAndPathAndStorage(requestID string, r ReplicateRequestAndPathAndStoragePayload) (isFirst bool) { - fsm.requestLog[r.RequestedBlock] = append(fsm.requestLog[r.RequestedBlock], requestID) - fsm.pathMap[requestID] = r.Path - fsm.storageIDMap[requestID] = r.StorageID - if len(fsm.requestLog[r.RequestedBlock]) == 1 { - isFirst = true - } else { - isFirst = false +func (fsm *shardNodeFSM) handleBatchReplicateRequestAndPathAndStorage(r BatchReplicateRequestAndPathAndStoragePayload) (isFirstMap map[string]bool) { + isFirstMap = make(map[string]bool) + for _, r := range r.Requests { + fsm.requestLog[r.RequestedBlock] = append(fsm.requestLog[r.RequestedBlock], r.RequestID) + fsm.pathMap[r.RequestID] = r.Path + fsm.storageIDMap[r.RequestID] = r.StorageID + if len(fsm.requestLog[r.RequestedBlock]) == 1 { + isFirstMap[r.RequestID] = true + } else { + isFirstMap[r.RequestID] = false + } } - return isFirst + return isFirstMap } -func (fsm 
*shardNodeFSM) handleReplicateResponse(requestID string, r ReplicateResponsePayload) string { +func (fsm *shardNodeFSM) handleReplicateResponse(r ReplicateResponsePayload) string { + requestID := r.RequestID log.Debug().Msgf("Aquiring lock for shardNodeFSM in handleReplicateResponse") start := time.Now() fsm.stashMu.Lock() @@ -129,7 +133,10 @@ func (fsm *shardNodeFSM) handleReplicateResponse(requestID string, r ReplicateRe for i := len(fsm.requestLog[r.RequestedBlock]) - 1; i >= 1; i-- { // We don't need to send the response to the first request log.Debug().Msgf("Sending response to concurrent request number %d in requestLog for block %s", i, r.RequestedBlock) timeout := time.After(5 * time.Second) // TODO: think about this in the batching scenario - responseChan, _ := fsm.responseChannel.Load(fsm.requestLog[r.RequestedBlock][i]) + responseChan, exists := fsm.responseChannel.Load(fsm.requestLog[r.RequestedBlock][i]) + if !exists { + log.Fatal().Msgf("response channel for request %s does not exist", fsm.requestLog[r.RequestedBlock][i]) + } select { case <-timeout: log.Error().Msgf("timeout in sending response to concurrent request number %d in requestLog for block %s", i, r.RequestedBlock) @@ -210,15 +217,14 @@ func (fsm *shardNodeFSM) Apply(rLog *raft.Log) interface{} { if err != nil { return fmt.Errorf("could not unmarshall the command; %s", err) } - requestID := command.RequestID - if command.Type == ReplicateRequestAndPathAndStorageCommand { + if command.Type == BatchReplicateRequestAndPathAndStorageCommand { log.Debug().Msgf("got replication command for replicate request") - var requestReplicationPayload ReplicateRequestAndPathAndStoragePayload + var requestReplicationPayload BatchReplicateRequestAndPathAndStoragePayload err := msgpack.Unmarshal(command.Payload, &requestReplicationPayload) if err != nil { return fmt.Errorf("could not unmarshall the request replication command; %s", err) } - return fsm.handleReplicateRequestAndPathAndStorage(requestID, requestReplicationPayload) + return fsm.handleBatchReplicateRequestAndPathAndStorage(requestReplicationPayload) } else if command.Type == ReplicateResponseCommand { log.Debug().Msgf("got replication command for replicate response") var responseReplicationPayload ReplicateResponsePayload @@ -226,7 +232,7 @@ func (fsm *shardNodeFSM) Apply(rLog *raft.Log) interface{} { if err != nil { return fmt.Errorf("could not unmarshall the response replication command; %s", err) } - return fsm.handleReplicateResponse(requestID, responseReplicationPayload) + return fsm.handleReplicateResponse(responseReplicationPayload) } else if command.Type == ReplicateSentBlocksCommand { log.Debug().Msgf("got replication command for replicate sent blocks") var replicateSentBlocksPayload ReplicateSentBlocksPayload diff --git a/pkg/shardnode/raft_msg.go b/pkg/shardnode/raft_msg.go index a160be7..618fcee 100644 --- a/pkg/shardnode/raft_msg.go +++ b/pkg/shardnode/raft_msg.go @@ -9,40 +9,41 @@ import ( type CommandType int const ( - ReplicateRequestAndPathAndStorageCommand CommandType = iota + BatchReplicateRequestAndPathAndStorageCommand CommandType = iota ReplicateResponseCommand ReplicateSentBlocksCommand ReplicateAcksNacksCommand ) type Command struct { - Type CommandType - RequestID string - Payload []byte + Type CommandType + Payload []byte } type ReplicateRequestAndPathAndStoragePayload struct { RequestedBlock string Path int StorageID int + RequestID string } -func newRequestReplicationCommand(block string, requestID string, path int, storageID int) ([]byte, error) 
{ - requestReplicationPayload, err := msgpack.Marshal( - &ReplicateRequestAndPathAndStoragePayload{ - RequestedBlock: block, - Path: path, - StorageID: storageID, +type BatchReplicateRequestAndPathAndStoragePayload struct { + Requests []ReplicateRequestAndPathAndStoragePayload +} + +func newRequestReplicationCommand(requests []ReplicateRequestAndPathAndStoragePayload) ([]byte, error) { + batchRequestReplicationPayload, err := msgpack.Marshal( + &BatchReplicateRequestAndPathAndStoragePayload{ + Requests: requests, }, ) if err != nil { - return nil, fmt.Errorf("could not marshal the request, path, storage replication payload %s", err) + return nil, fmt.Errorf("could not marshal the batch request, path, storage replication payload %s", err) } requestReplicationCommand, err := msgpack.Marshal( &Command{ - Type: ReplicateRequestAndPathAndStorageCommand, - RequestID: requestID, - Payload: requestReplicationPayload, + Type: BatchReplicateRequestAndPathAndStorageCommand, + Payload: batchRequestReplicationPayload, }, ) if err != nil { @@ -57,6 +58,7 @@ type ReplicateResponsePayload struct { Response string NewValue string OpType OperationType + RequestID string } func newResponseReplicationCommand(response string, requestID string, block string, newValue string, opType OperationType) ([]byte, error) { @@ -66,6 +68,7 @@ func newResponseReplicationCommand(response string, requestID string, block stri RequestedBlock: block, NewValue: newValue, OpType: opType, + RequestID: requestID, }, ) if err != nil { @@ -73,9 +76,8 @@ func newResponseReplicationCommand(response string, requestID string, block stri } responseReplicationCommand, err := msgpack.Marshal( &Command{ - Type: ReplicateResponseCommand, - RequestID: requestID, - Payload: responseReplicationPayload, + Type: ReplicateResponseCommand, + Payload: responseReplicationPayload, }, ) if err != nil { diff --git a/pkg/shardnode/raft_test.go b/pkg/shardnode/raft_test.go index 17cb590..a210673 100644 --- a/pkg/shardnode/raft_test.go +++ b/pkg/shardnode/raft_test.go @@ -8,26 +8,41 @@ import ( "github.com/hashicorp/raft" ) -func createTestReplicateRequestAndPathAndStoragePayload(block string, path int, storageID int) ReplicateRequestAndPathAndStoragePayload { - return ReplicateRequestAndPathAndStoragePayload{ - RequestedBlock: block, - Path: path, - StorageID: storageID, - } -} - -func TestHandleReplicateRequestAndPathAndStorageToEmptyFSM(t *testing.T) { +func TestHandleBatchReplicateRequestAndPathAndStorageToEmptyFSM(t *testing.T) { shardNodeFSM := newShardNodeFSM() - payload := createTestReplicateRequestAndPathAndStoragePayload("block", 11, 12) - shardNodeFSM.handleReplicateRequestAndPathAndStorage("request1", payload) - if len(shardNodeFSM.requestLog["block"]) != 1 || shardNodeFSM.requestLog["block"][0] != "request1" { - t.Errorf("Expected request1 to be in the requestLog, but the array is equal to %v", shardNodeFSM.requestLog["block"]) + payload := BatchReplicateRequestAndPathAndStoragePayload{ + Requests: []ReplicateRequestAndPathAndStoragePayload{ + {RequestedBlock: "block1", Path: 1, StorageID: 2, RequestID: "request1"}, + {RequestedBlock: "block2", Path: 3, StorageID: 4, RequestID: "request2"}, + }, } - if shardNodeFSM.pathMap["request1"] != 11 { - t.Errorf("Expected path for request1 to be equal to 11, but the path is equal to %d", shardNodeFSM.pathMap["request1"]) + isFirstMap := shardNodeFSM.handleBatchReplicateRequestAndPathAndStorage(payload) + expectedIsFirstMap := map[string]bool{"request1": true, "request2": true} + if len(isFirstMap) != 
len(expectedIsFirstMap) { + t.Errorf("Expected isFirstMap to have length %d, but it has length %d", len(expectedIsFirstMap), len(isFirstMap)) } - if shardNodeFSM.storageIDMap["request1"] != 12 { - t.Errorf("Expected storage id for request1 to be equal to 12, but the storage id is equal to %d", shardNodeFSM.storageIDMap["request1"]) + for key, val := range isFirstMap { + if val != expectedIsFirstMap[key] { + t.Errorf("Expected isFirstMap[%s] to be %t, but it's %t", key, expectedIsFirstMap[key], val) + } + } + if len(shardNodeFSM.requestLog["block1"]) != 1 || shardNodeFSM.requestLog["block1"][0] != "request1" { + t.Errorf("Expected request1 to be in the requestLog, but the array is equal to %v", shardNodeFSM.requestLog["block1"]) + } + if len(shardNodeFSM.requestLog["block2"]) != 1 || shardNodeFSM.requestLog["block2"][0] != "request2" { + t.Errorf("Expected request2 to be in the requestLog, but the array is equal to %v", shardNodeFSM.requestLog["block2"]) + } + if shardNodeFSM.pathMap["request1"] != 1 { + t.Errorf("Expected path for request1 to be equal to 1, but the path is equal to %d", shardNodeFSM.pathMap["request1"]) + } + if shardNodeFSM.pathMap["request2"] != 3 { + t.Errorf("Expected path for request2 to be equal to 3, but the path is equal to %d", shardNodeFSM.pathMap["request2"]) + } + if shardNodeFSM.storageIDMap["request1"] != 2 { + t.Errorf("Expected storage id for request1 to be equal to 2, but the storage id is equal to %d", shardNodeFSM.storageIDMap["request1"]) + } + if shardNodeFSM.storageIDMap["request2"] != 4 { + t.Errorf("Expected storage id for request2 to be equal to 4, but the storage id is equal to %d", shardNodeFSM.storageIDMap["request2"]) } } @@ -36,12 +51,27 @@ func TestHandleReplicateRequestAndPathAndStorageToWithValueFSM(t *testing.T) { shardNodeFSM.requestLog["block"] = []string{"randomrequest"} shardNodeFSM.pathMap["request1"] = 20 shardNodeFSM.storageIDMap["request1"] = 30 - payload := createTestReplicateRequestAndPathAndStoragePayload("block", 11, 12) - shardNodeFSM.handleReplicateRequestAndPathAndStorage("request1", payload) - if len(shardNodeFSM.requestLog["block"]) != 2 || + payload := BatchReplicateRequestAndPathAndStoragePayload{ + Requests: []ReplicateRequestAndPathAndStoragePayload{ + {RequestedBlock: "block", Path: 11, StorageID: 12, RequestID: "request1"}, + {RequestedBlock: "block", Path: 3, StorageID: 4, RequestID: "request2"}, + }, + } + isFirstMap := shardNodeFSM.handleBatchReplicateRequestAndPathAndStorage(payload) + expectedIsFirstMap := map[string]bool{"request1": false, "request2": false} + if len(isFirstMap) != len(expectedIsFirstMap) { + t.Errorf("Expected isFirstMap to have length %d, but it has length %d", len(expectedIsFirstMap), len(isFirstMap)) + } + for key, val := range isFirstMap { + if val != expectedIsFirstMap[key] { + t.Errorf("Expected isFirstMap[%s] to be %t, but it's %t", key, expectedIsFirstMap[key], val) + } + } + if len(shardNodeFSM.requestLog["block"]) != 3 || shardNodeFSM.requestLog["block"][0] != "randomrequest" || - shardNodeFSM.requestLog["block"][1] != "request1" { - t.Errorf("Expected request1 to be in the second position of requestLog, but the array is equal to %v", shardNodeFSM.requestLog["block"]) + shardNodeFSM.requestLog["block"][1] != "request1" || + shardNodeFSM.requestLog["block"][2] != "request2" { + t.Errorf("Expected request1 and request2 to be in the requestLog, but the array is equal to %v", shardNodeFSM.requestLog["block"]) } if shardNodeFSM.pathMap["request1"] != 11 { t.Errorf("Expected path for 
request1 to be equal to 11, but the path is equal to %d", shardNodeFSM.pathMap["request1"]) @@ -49,11 +79,18 @@ func TestHandleReplicateRequestAndPathAndStorageToWithValueFSM(t *testing.T) { if shardNodeFSM.storageIDMap["request1"] != 12 { t.Errorf("Expected storage id for request1 to be equal to 12, but the storage id is equal to %d", shardNodeFSM.storageIDMap["request1"]) } + if shardNodeFSM.pathMap["request2"] != 3 { + t.Errorf("Expected path for request2 to be equal to 3, but the path is equal to %d", shardNodeFSM.pathMap["request2"]) + } + if shardNodeFSM.storageIDMap["request2"] != 4 { + t.Errorf("Expected storage id for request2 to be equal to 4, but the storage id is equal to %d", shardNodeFSM.storageIDMap["request2"]) + } } -func createTestReplicateResponsePayload(block string, response string, value string, op OperationType) ReplicateResponsePayload { +func createTestReplicateResponsePayload(block string, requestID string, response string, value string, op OperationType) ReplicateResponsePayload { return ReplicateResponsePayload{ RequestedBlock: block, + RequestID: requestID, Response: response, NewValue: value, OpType: op, @@ -125,8 +162,8 @@ func TestHandleReplicateResponseWhenValueInStashReturnsCorrectReadValueToAllWait shardNodeFSM.responseChannel.Store("request3", make(chan string)) shardNodeFSM.stash["block"] = stashState{value: "test_value"} - payload := createTestReplicateResponsePayload("block", "response", "value", Read) - go shardNodeFSM.handleReplicateResponse("request1", payload) + payload := createTestReplicateResponsePayload("block", "request1", "response", "value", Read) + go shardNodeFSM.handleReplicateResponse(payload) checkWaitingChannelsHelper(t, shardNodeFSM.responseChannel, "test_value") } @@ -139,8 +176,8 @@ func TestHandleReplicateResponseWhenValueInStashReturnsCorrectWriteValueToAllWai shardNodeFSM.responseChannel.Store("request3", make(chan string)) shardNodeFSM.stash["block"] = stashState{value: "test_value"} - payload := createTestReplicateResponsePayload("block", "response", "value_write", Write) - go shardNodeFSM.handleReplicateResponse("request1", payload) + payload := createTestReplicateResponsePayload("block", "request1", "response", "value_write", Write) + go shardNodeFSM.handleReplicateResponse(payload) checkWaitingChannelsHelper(t, shardNodeFSM.responseChannel, "value_write") @@ -156,8 +193,8 @@ func TestHandleReplicateResponseWhenValueNotInStashReturnsResponseToAllWaitingRe shardNodeFSM.responseChannel.Store("request2", make(chan string)) shardNodeFSM.responseChannel.Store("request3", make(chan string)) - payload := createTestReplicateResponsePayload("block", "response_from_oramnode", "", Read) - go shardNodeFSM.handleReplicateResponse("request1", payload) + payload := createTestReplicateResponsePayload("block", "request1", "response_from_oramnode", "", Read) + go shardNodeFSM.handleReplicateResponse(payload) checkWaitingChannelsHelper(t, shardNodeFSM.responseChannel, "response_from_oramnode") @@ -173,8 +210,8 @@ func TestHandleReplicateResponseWhenValueNotInStashReturnsWriteResponseToAllWait shardNodeFSM.responseChannel.Store("request2", make(chan string)) shardNodeFSM.responseChannel.Store("request3", make(chan string)) - payload := createTestReplicateResponsePayload("block", "response", "write_val", Write) - go shardNodeFSM.handleReplicateResponse("request1", payload) + payload := createTestReplicateResponsePayload("block", "request1", "response", "write_val", Write) + go shardNodeFSM.handleReplicateResponse(payload) 
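// Illustrative sketch of how the leader-side pieces above fit together: the batch of
// per-request payloads is marshalled into a single
// BatchReplicateRequestAndPathAndStorageCommand, applied to Raft, and the FSM's Apply
// (via handleBatchReplicateRequestAndPathAndStorage) returns the map[string]bool that
// tells the caller whether each request is the first one for its block. The helper
// name and the zero Apply timeout are assumptions for illustration; queryBatch in
// pkg/shardnode/server.go performs the same steps inline.
func replicateBatch(r *raft.Raft, requests []ReplicateRequestAndPathAndStoragePayload) (map[string]bool, error) {
	cmd, err := newRequestReplicationCommand(requests)
	if err != nil {
		return nil, err
	}
	future := r.Apply(cmd, 0)
	if err := future.Error(); err != nil {
		return nil, err
	}
	// The FSM returns the per-request "is first for its block" map.
	return future.Response().(map[string]bool), nil
}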
checkWaitingChannelsHelper(t, shardNodeFSM.responseChannel, "write_val") @@ -191,8 +228,8 @@ func TestHandleReplicateResponseWhenNotLeaderDoesNotWriteOnChannels(t *testing.T shardNodeFSM.responseChannel.Store("request2", make(chan string)) shardNodeFSM.stash["block"] = stashState{value: "test_value"} - payload := createTestReplicateResponsePayload("block", "response", "", Read) - go shardNodeFSM.handleReplicateResponse("request1", payload) + payload := createTestReplicateResponsePayload("block", "request1", "response", "", Read) + go shardNodeFSM.handleReplicateResponse(payload) for { ch1Any, _ := shardNodeFSM.responseChannel.Load("request1") diff --git a/pkg/shardnode/server.go b/pkg/shardnode/server.go index df2134e..0af325c 100644 --- a/pkg/shardnode/server.go +++ b/pkg/shardnode/server.go @@ -68,10 +68,17 @@ func (s *shardNodeServer) getWhatToSendBasedOnRequest(ctx context.Context, block } // It creates a channel for receiving the response from the raft FSM for the current requestID. -func (s *shardNodeServer) createResponseChannelForRequestID(requestID string) chan string { - ch := make(chan string) - s.shardNodeFSM.responseChannel.Store(requestID, ch) - return ch +func (s *shardNodeServer) createResponseChannelForBatch(readRequests []*pb.ReadRequest, writeRequests []*pb.WriteRequest) map[string]chan string { + channelMap := make(map[string]chan string) + for _, req := range readRequests { + channelMap[req.RequestId] = make(chan string) + s.shardNodeFSM.responseChannel.Store(req.RequestId, channelMap[req.RequestId]) + } + for _, req := range writeRequests { + channelMap[req.RequestId] = make(chan string) + s.shardNodeFSM.responseChannel.Store(req.RequestId, channelMap[req.RequestId]) + } + return channelMap } // It periodically sends batches. @@ -112,7 +119,7 @@ func (s *shardNodeServer) sendCurrentBatches() { } waitingBatchCount++ oramNodeReplicaMap := s.oramNodeClients[s.storageORAMNodeMap[storageID]] - go s.batchManager.asyncBatchRequests(requests[0].ctx, storageID, requests, oramNodeReplicaMap, batchRequestResponseChan) + go s.batchManager.asyncBatchRequests(context.Background(), storageID, requests, oramNodeReplicaMap, batchRequestResponseChan) } for i := 0; i < waitingBatchCount; i++ { @@ -139,82 +146,127 @@ func (s *shardNodeServer) sendCurrentBatches() { } } -func (s *shardNodeServer) query(ctx context.Context, op OperationType, block string, value string) (string, error) { - if s.raftNode.State() != raft.Leader { - return "", fmt.Errorf(commonerrs.NotTheLeaderError) +func (s *shardNodeServer) getRequestReplicationBlocks(readRequests []*pb.ReadRequest, writeRequests []*pb.WriteRequest) (requestReplicationBlocks []ReplicateRequestAndPathAndStoragePayload) { + for _, readRequest := range readRequests { + newPath, newStorageID := storage.GetRandomPathAndStorageID(s.storageTreeHeight, len(s.storageORAMNodeMap)) + requestReplicationBlocks = append(requestReplicationBlocks, ReplicateRequestAndPathAndStoragePayload{ + RequestedBlock: readRequest.Block, + RequestID: readRequest.RequestId, + Path: newPath, + StorageID: newStorageID, + }) } - tracer := otel.Tracer("") - ctx, querySpan := tracer.Start(ctx, "shardnode query") - requestID, err := rpc.GetRequestIDFromContext(ctx) - if err != nil { - return "", fmt.Errorf("unable to read requestid from request; %s", err) + for _, writeRequest := range writeRequests { + newPath, newStorageID := storage.GetRandomPathAndStorageID(s.storageTreeHeight, len(s.storageORAMNodeMap)) + requestReplicationBlocks = append(requestReplicationBlocks, 
ReplicateRequestAndPathAndStoragePayload{ + RequestedBlock: writeRequest.Block, + RequestID: writeRequest.RequestId, + Path: newPath, + StorageID: newStorageID, + }) } + return requestReplicationBlocks +} - responseChannel := s.createResponseChannelForRequestID(requestID) - newPath, newStorageID := storage.GetRandomPathAndStorageID(s.storageTreeHeight, len(s.storageORAMNodeMap)) - requestReplicationCommand, err := newRequestReplicationCommand(block, requestID, newPath, newStorageID) - if err != nil { - return "", fmt.Errorf("could not create request replication command; %s", err) - } - _, requestReplicationSpan := tracer.Start(ctx, "apply request replication") - requestApplyFuture := s.raftNode.Apply(requestReplicationCommand, 0) - err = requestApplyFuture.Error() - requestReplicationSpan.End() - if err != nil { - return "", fmt.Errorf("could not apply log to the FSM; %s", err) - } - isFirst := requestApplyFuture.Response().(bool) +type finalResponse struct { + requestId string + value string + opType OperationType + err error +} - blockToRequest, path, storageID := s.getWhatToSendBasedOnRequest(ctx, block, requestID, isFirst) +func (s *shardNodeServer) query(ctx context.Context, block string, requestID string, isFirst bool, newVal string, opType OperationType, raftResponseChannel chan string, finalResponseChannel chan finalResponse) { + tracer := otel.Tracer("") + blockToRequest, path, storageID := s.getWhatToSendBasedOnRequest(ctx, block, requestID, isFirst) var replyValue string _, waitOnReplySpan := tracer.Start(ctx, "wait on reply") - - log.Debug().Msgf("Adding request to storage queue and waiting for block %s", block) + log.Debug().Msgf("Adding request to storage queue and waiting for block %s", blockToRequest) oramReplyChan := s.batchManager.addRequestToStorageQueueAndWait(blockRequest{ctx: ctx, block: blockToRequest, path: path}, storageID) replyValue = <-oramReplyChan - log.Debug().Msgf("Got reply from oram node channel for block %s; value: %s", block, replyValue) - + log.Debug().Msgf("Got reply from oram node channel for block %s; value: %s", blockToRequest, replyValue) waitOnReplySpan.End() + if isFirst { - log.Debug().Msgf("Adding response to response channel for block %s", block) - responseReplicationCommand, err := newResponseReplicationCommand(replyValue, requestID, block, value, op) + log.Debug().Msgf("Adding response to response channel for block %s", blockToRequest) + responseReplicationCommand, err := newResponseReplicationCommand(replyValue, requestID, block, newVal, opType) if err != nil { - return "", fmt.Errorf("could not create response replication command; %s", err) + finalResponseChannel <- finalResponse{requestId: requestID, value: "", opType: opType, err: fmt.Errorf("could not create response replication command; %s", err)} + return } _, responseReplicationSpan := tracer.Start(ctx, "apply response replication") responseApplyFuture := s.raftNode.Apply(responseReplicationCommand, 0) - responseReplicationSpan.End() err = responseApplyFuture.Error() + responseReplicationSpan.End() if err != nil { - return "", fmt.Errorf("could not apply log to the FSM; %s", err) + finalResponseChannel <- finalResponse{requestId: requestID, value: "", opType: opType, err: fmt.Errorf("could not apply log to the FSM; %s", err)} + return } response := responseApplyFuture.Response().(string) log.Debug().Msgf("Got is first response from response channel for block %s; value: %s", block, response) - return response, nil + finalResponseChannel <- finalResponse{requestId: requestID, value: 
response, opType: opType, err: nil} + return } - responseValue := <-responseChannel + responseValue := <-raftResponseChannel log.Debug().Msgf("Got response from response channel for block %s; value: %s", block, responseValue) - querySpan.End() - return responseValue, nil + finalResponseChannel <- finalResponse{requestId: requestID, value: responseValue, opType: opType, err: nil} } -func (s *shardNodeServer) Read(ctx context.Context, readRequest *pb.ReadRequest) (*pb.ReadReply, error) { - log.Debug().Msgf("Received read request for block %s", readRequest.Block) - val, err := s.query(ctx, Read, readRequest.Block, "") +func (s *shardNodeServer) queryBatch(ctx context.Context, request *pb.RequestBatch) (reply *pb.ReplyBatch, err error) { + if s.raftNode.State() != raft.Leader { + return nil, fmt.Errorf(commonerrs.NotTheLeaderError) + } + tracer := otel.Tracer("") + ctx, querySpan := tracer.Start(ctx, "shardnode query") + + responseChannel := s.createResponseChannelForBatch(request.ReadRequests, request.WriteRequests) + requestReplicationBlocks := s.getRequestReplicationBlocks(request.ReadRequests, request.WriteRequests) + requestReplicationCommand, err := newRequestReplicationCommand(requestReplicationBlocks) if err != nil { - return nil, err + return nil, fmt.Errorf("could not create request replication command; %s", err) + } + _, requestReplicationSpan := tracer.Start(ctx, "apply request replication") + requestApplyFuture := s.raftNode.Apply(requestReplicationCommand, 0) + err = requestApplyFuture.Error() + requestReplicationSpan.End() + if err != nil { + return nil, fmt.Errorf("could not apply log to the FSM; %s", err) + } + isFirstMap := requestApplyFuture.Response().(map[string]bool) + + finalResponseChan := make(chan finalResponse) + for _, readRequest := range request.ReadRequests { + go s.query(ctx, readRequest.Block, readRequest.RequestId, isFirstMap[readRequest.RequestId], "", Read, responseChannel[readRequest.RequestId], finalResponseChan) + } + for _, writeRequest := range request.WriteRequests { + go s.query(ctx, writeRequest.Block, writeRequest.RequestId, isFirstMap[writeRequest.RequestId], writeRequest.Value, Write, responseChannel[writeRequest.RequestId], finalResponseChan) + } + + var readReplies []*pb.ReadReply + var writeReplies []*pb.WriteReply + for i := 0; i < len(request.ReadRequests)+len(request.WriteRequests); i++ { + response := <-finalResponseChan + if response.err != nil { + return nil, fmt.Errorf("could not get response from the oramnode; %s", response.err) + } + if response.opType == Read { + readReplies = append(readReplies, &pb.ReadReply{RequestId: response.requestId, Value: response.value}) + } else { + writeReplies = append(writeReplies, &pb.WriteReply{RequestId: response.requestId, Success: true}) + } } - return &pb.ReadReply{Value: val}, nil + querySpan.End() + return &pb.ReplyBatch{ReadReplies: readReplies, WriteReplies: writeReplies}, nil } -func (s *shardNodeServer) Write(ctx context.Context, writeRequest *pb.WriteRequest) (*pb.WriteReply, error) { - log.Debug().Msgf("Received write request for block %s", writeRequest.Block) - val, err := s.query(ctx, Write, writeRequest.Block, writeRequest.Value) +func (s *shardNodeServer) BatchQuery(ctx context.Context, request *pb.RequestBatch) (*pb.ReplyBatch, error) { + log.Debug().Msgf("Received request batch request %v", request) + reply, err := s.queryBatch(ctx, request) if err != nil { return nil, err } - return &pb.WriteReply{Success: val == writeRequest.Value}, nil + log.Debug().Msgf("Returning request batch 
reply %v", reply) + return reply, nil } // It gets maxBlocks from the stash to send to the requesting oram node. diff --git a/pkg/shardnode/server_test.go b/pkg/shardnode/server_test.go index f6243e7..f104e70 100644 --- a/pkg/shardnode/server_test.go +++ b/pkg/shardnode/server_test.go @@ -10,7 +10,6 @@ import ( shardnodepb "github.com/dsg-uwaterloo/oblishard/api/shardnode" "github.com/hashicorp/raft" "github.com/phayes/freeport" - "google.golang.org/grpc/metadata" ) func TestGetPathAndStorageBasedOnRequestWhenInitialRequestReturnsRealBlockAndPathAndStorage(t *testing.T) { @@ -30,17 +29,34 @@ func TestGetPathAndStorageBasedOnRequestWhenInitialRequestReturnsRealBlockAndPat } } -func TestCreateResponseChannelForRequestIDAddsChannelToResponseChannel(t *testing.T) { +func TestCreateResponseChannelForBatchAddsChannelToResponseChannel(t *testing.T) { s := newShardNodeServer(0, 0, &raft.Raft{}, newShardNodeFSM(), nil, map[int]int{0: 0, 1: 1, 2: 2, 3: 3}, 5, newBatchManager(1)) - s.createResponseChannelForRequestID("req1") - if _, exists := s.shardNodeFSM.responseChannel.Load("req1"); !exists { - t.Errorf("Expected a new channel for key req1 but nothing found!") + readRequests := []*shardnodepb.ReadRequest{ + {Block: "a", RequestId: "req1"}, + {Block: "b", RequestId: "req2"}, + {Block: "c", RequestId: "req3"}, + } + writeRequests := []*shardnodepb.WriteRequest{ + {Block: "a", RequestId: "req1", Value: "val1"}, + {Block: "b", RequestId: "req2", Value: "val2"}, + {Block: "c", RequestId: "req3", Value: "val3"}, + } + s.createResponseChannelForBatch(readRequests, writeRequests) + for _, request := range readRequests { + if _, exists := s.shardNodeFSM.responseChannel.Load(request.RequestId); !exists { + t.Errorf("Expected a new channel for key %s but nothing found!", request.RequestId) + } + } + for _, request := range writeRequests { + if _, exists := s.shardNodeFSM.responseChannel.Load(request.RequestId); !exists { + t.Errorf("Expected a new channel for key %s but nothing found!", request.RequestId) + } } } -func TestQueryReturnsErrorForNonLeaderRaftPeer(t *testing.T) { +func TestQueryBatchReturnsErrorForNonLeaderRaftPeer(t *testing.T) { s := newShardNodeServer(0, 0, &raft.Raft{}, newShardNodeFSM(), nil, map[int]int{0: 0, 1: 1, 2: 2, 3: 3}, 5, newBatchManager(1)) - _, err := s.query(context.Background(), Read, "block", "") + _, err := s.queryBatch(context.Background(), nil) if err == nil { t.Errorf("A non-leader raft peer should return error after call to query.") } @@ -177,29 +193,64 @@ func TestSendCurrentBatchesIgnoresEmptyQueues(t *testing.T) { } } -func TestQueryReturnsResponseRecievedFromOramNode(t *testing.T) { +func TestQueryBatchReturnsResponseRecievedFromOramNode(t *testing.T) { s := startLeaderRaftNodeServer(t, 1, false) - ctx := metadata.NewIncomingContext(context.Background(), metadata.Pairs("requestid", "request1")) - response, err := s.query(ctx, Read, "a", "") - if response != "response_from_leader" { - t.Errorf("expected the response to be \"response_from_leader\" but it is: %s", response) + readRequests := []*shardnodepb.ReadRequest{ + {Block: "a", RequestId: "request1"}, + {Block: "b", RequestId: "request2"}, + } + writeRequests := []*shardnodepb.WriteRequest{ + {Block: "c", RequestId: "request3", Value: "val1"}, + } + + response, err := s.queryBatch(context.Background(), &shardnodepb.RequestBatch{ReadRequests: readRequests, WriteRequests: writeRequests}) + if response == nil { + t.Errorf("expected a response from the queryBatch call") } if err != nil { - t.Errorf("expected no error 
in call to query") + t.Errorf("expected no error in call to queryBatch") + } + expectedReadReplies := map[string]bool{ + "request1": true, + "request2": true, + } + expectedWriteReplies := map[string]bool{ + "request3": true, + } + for _, readResponse := range response.ReadReplies { + if _, exists := expectedReadReplies[readResponse.RequestId]; !exists { + t.Errorf("expected the request id to be in the expectedReadReplies") + } + if readResponse.Value != "response_from_leader" { + t.Errorf("expected the response to be \"response_from_leader\" but it is: %s", readResponse.Value) + } + } + for _, writeResponse := range response.WriteReplies { + if _, exists := expectedWriteReplies[writeResponse.RequestId]; !exists { + t.Errorf("expected the request id to be in the expectedWriteReplies") + } + if !writeResponse.Success { + t.Errorf("expected the write to be successful") + } } } -func TestQueryReturnsResponseRecievedFromOramNodeWithBatching(t *testing.T) { +func TestQueryBatchReturnsResponseRecievedFromOramNodeWithBatching(t *testing.T) { s := startLeaderRaftNodeServer(t, 3, true) responseChan := make(chan string) for _, el := range []string{"a", "b", "c"} { go func(block string) { - ctx := metadata.NewIncomingContext(context.Background(), metadata.Pairs("requestid", fmt.Sprintf("request:%s", block))) - response, err := s.query(ctx, Read, block, "") + requestBatch := &shardnodepb.RequestBatch{ + ReadRequests: []*shardnodepb.ReadRequest{ + {Block: block, RequestId: "request1"}, + }, + WriteRequests: []*shardnodepb.WriteRequest{}, + } + response, err := s.queryBatch(context.Background(), requestBatch) if err == nil { - responseChan <- response + responseChan <- response.ReadReplies[0].Value } }(el) } @@ -219,14 +270,19 @@ func TestQueryReturnsResponseRecievedFromOramNodeWithBatching(t *testing.T) { } } -func TestQueryPrioritizesStashValueToOramNodeResponse(t *testing.T) { +func TestQueryBatchPrioritizesStashValueToOramNodeResponse(t *testing.T) { s := startLeaderRaftNodeServer(t, 1, false) - ctx := metadata.NewIncomingContext(context.Background(), metadata.Pairs("requestid", "request1")) s.shardNodeFSM.stashMu.Lock() s.shardNodeFSM.stash["a"] = stashState{value: "stash_value", logicalTime: 0, waitingStatus: false} s.shardNodeFSM.stashMu.Unlock() - response, err := s.query(ctx, Read, "a", "") - if response != "stash_value" { + requestBatch := &shardnodepb.RequestBatch{ + ReadRequests: []*shardnodepb.ReadRequest{ + {Block: "a", RequestId: "request1"}, + }, + WriteRequests: []*shardnodepb.WriteRequest{}, + } + response, err := s.queryBatch(context.Background(), requestBatch) + if response.ReadReplies[0].Value != "stash_value" { t.Errorf("expected the response to be \"stash_value\" but it is: %s", response) } if err != nil { @@ -234,40 +290,43 @@ func TestQueryPrioritizesStashValueToOramNodeResponse(t *testing.T) { } } -func TestQueryReturnsSentValueForWriteRequests(t *testing.T) { +func TestQueryBatchCleansTempValuesInFSMAfterExecution(t *testing.T) { s := startLeaderRaftNodeServer(t, 1, false) - ctx := metadata.NewIncomingContext(context.Background(), metadata.Pairs("requestid", "request1")) - response, err := s.query(ctx, Write, "a", "val") - if response != "val" { - t.Errorf("expected the response to be \"val\" but it is: %s", response) + requestBatch := &shardnodepb.RequestBatch{ + ReadRequests: []*shardnodepb.ReadRequest{ + {Block: "a", RequestId: "request1"}, + {Block: "b", RequestId: "request2"}, + }, + WriteRequests: []*shardnodepb.WriteRequest{ + {Block: "c", RequestId: "request3", Value: 
"val1"}, + }, } - if err != nil { - t.Errorf("expected no error in call to query") + s.queryBatch(context.Background(), requestBatch) + for _, request := range []string{"request1", "request2", "request3"} { + if _, exists := s.shardNodeFSM.pathMap[request]; exists { + t.Errorf("query should remove the request from the pathMap after successful execution.") + } + if _, exists := s.shardNodeFSM.storageIDMap[request]; exists { + t.Errorf("query should remove the request from the storageIDMap after successful execution.") + } + if _, exists := s.shardNodeFSM.requestLog[request]; exists { + t.Errorf("query should remove the request from the requestLog after successful execution.") + } + if _, exists := s.shardNodeFSM.responseChannel.Load(request); exists { + t.Errorf("query should remove the request from the responseChannel after successful execution.") + } } } -func TestQueryCleansTempValuesInFSMAfterExecution(t *testing.T) { +func TestQueryBatchAddsReadValueToStash(t *testing.T) { s := startLeaderRaftNodeServer(t, 1, false) - ctx := metadata.NewIncomingContext(context.Background(), metadata.Pairs("requestid", "request1")) - s.query(ctx, Write, "a", "val") - if _, exists := s.shardNodeFSM.pathMap["request1"]; exists { - t.Errorf("query should remove the request from the pathMap after successful execution.") - } - if _, exists := s.shardNodeFSM.storageIDMap["request1"]; exists { - t.Errorf("query should remove the request from the storageIDMap after successful execution.") - } - if _, exists := s.shardNodeFSM.requestLog["request1"]; exists { - t.Errorf("query should remove the request from the requestLog after successful execution.") - } - if _, exists := s.shardNodeFSM.responseChannel.Load("request1"); exists { - t.Errorf("query should remove the request from the responseChannel after successful execution.") + requestBatch := &shardnodepb.RequestBatch{ + ReadRequests: []*shardnodepb.ReadRequest{ + {Block: "a", RequestId: "request1"}, + }, + WriteRequests: []*shardnodepb.WriteRequest{}, } -} - -func TestQueryAddsReadValueToStash(t *testing.T) { - s := startLeaderRaftNodeServer(t, 1, false) - ctx := metadata.NewIncomingContext(context.Background(), metadata.Pairs("requestid", "request1")) - s.query(ctx, Read, "a", "") + s.queryBatch(context.Background(), requestBatch) s.shardNodeFSM.stashMu.Lock() defer s.shardNodeFSM.stashMu.Unlock() if s.shardNodeFSM.stash["a"].value != "response_from_leader" { @@ -275,22 +334,32 @@ func TestQueryAddsReadValueToStash(t *testing.T) { } } -func TestQueryAddsWriteValueToStash(t *testing.T) { +func TestQueryBatchAddsWriteValueToStash(t *testing.T) { s := startLeaderRaftNodeServer(t, 1, false) - ctx := metadata.NewIncomingContext(context.Background(), metadata.Pairs("requestid", "request1")) - s.query(ctx, Write, "a", "valW") + requestBatch := &shardnodepb.RequestBatch{ + ReadRequests: []*shardnodepb.ReadRequest{}, + WriteRequests: []*shardnodepb.WriteRequest{ + {Block: "a", RequestId: "request1", Value: "val1"}, + }, + } + s.queryBatch(context.Background(), requestBatch) s.shardNodeFSM.stashMu.Lock() defer s.shardNodeFSM.stashMu.Unlock() - if s.shardNodeFSM.stash["a"].value != "valW" { + if s.shardNodeFSM.stash["a"].value != "val1" { t.Errorf("The write value should be added to the stash") } } -func TestQueryUpdatesPositionMap(t *testing.T) { +func TestQueryBatchUpdatesPositionMap(t *testing.T) { s := startLeaderRaftNodeServer(t, 1, false) - ctx := metadata.NewIncomingContext(context.Background(), metadata.Pairs("requestid", "request1")) 
s.shardNodeFSM.positionMap["a"] = positionState{path: 13423432, storageID: 3223113} - s.query(ctx, Write, "a", "valW") + requestBatch := &shardnodepb.RequestBatch{ + ReadRequests: []*shardnodepb.ReadRequest{}, + WriteRequests: []*shardnodepb.WriteRequest{ + {Block: "a", RequestId: "request1", Value: "val1"}, + }, + } + s.queryBatch(context.Background(), requestBatch) s.shardNodeFSM.positionMapMu.Lock() defer s.shardNodeFSM.positionMapMu.Unlock() if s.shardNodeFSM.positionMap["a"].path == 13423432 || s.shardNodeFSM.positionMap["a"].storageID == 3223113 { @@ -298,31 +367,6 @@ func TestQueryUpdatesPositionMap(t *testing.T) { } } -func TestQueryReturnsResponseToAllWaitingRequests(t *testing.T) { - s := startLeaderRaftNodeServer(t, 1, false) - responseChannel := make(chan string) - for i := 0; i < 3; i++ { - go func(idx int) { - ctx := metadata.NewIncomingContext(context.Background(), metadata.Pairs("requestid", fmt.Sprintf("request%d", idx))) - response, _ := s.query(ctx, Read, "a", "") - responseChannel <- response - }(i) - } - responseCount := 0 - timout := time.After(10 * time.Second) - for { - if responseCount == 2 { - break - } - select { - case <-responseChannel: - responseCount++ - case <-timout: - t.Errorf("timeout before receiving all responses") - } - } -} - func TestGetBlocksForSendReturnsAtMostMaxBlocksFromTheStash(t *testing.T) { s := newShardNodeServer(0, 0, &raft.Raft{}, newShardNodeFSM(), make(RPCClientMap), map[int]int{0: 0, 1: 1, 2: 2, 3: 3}, 5, newBatchManager(1)) s.shardNodeFSM.stash = map[string]stashState{