diff --git a/cmd/router/main.go b/cmd/router/main.go index 237cb4f..c3b98a8 100644 --- a/cmd/router/main.go +++ b/cmd/router/main.go @@ -35,6 +35,11 @@ func main() { log.Fatal().Msgf("Failed to create client connections with shard node servers; %v", err) } + parameters, err := config.ReadParameters(path.Join(*configsPath, "parameters.yaml")) + if err != nil { + log.Fatal().Msgf("Failed to read parameters from yaml file; %v", err) + } + tracingProvider, err := tracing.NewProvider(context.Background(), "router", "localhost:4317") if err != nil { log.Fatal().Msgf("Failed to create tracing provider; %v", err) @@ -45,5 +50,5 @@ func main() { } defer stopTracingProvider(context.Background()) - router.StartRPCServer(*ip, rpcClients, *routerID, *port) + router.StartRPCServer(*ip, rpcClients, *routerID, *port, parameters) } diff --git a/configs/parameters.yaml b/configs/parameters.yaml index 7e6fbac..813222a 100644 --- a/configs/parameters.yaml +++ b/configs/parameters.yaml @@ -1,3 +1,4 @@ max-blocks-to-send: 5 # The maximum number of blocks to send from each shard node to the oram node during evictions eviction-rate: 40 # How many ReadPath operations before eviction -batch-size: 1 # Size of each batch of requests \ No newline at end of file +batch-size: 2 # Size of each batch of requests +epoch-time: 10 # How many milliseconds between each epoch \ No newline at end of file diff --git a/pkg/config/config.go b/pkg/config/config.go index d70b816..97ab859 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -43,6 +43,7 @@ type Parameters struct { MaxBlocksToSend int `yaml:"max-blocks-to-send"` EvictionRate int `yaml:"eviction-rate"` BatchSize int `yaml:"batch-size"` + EpochTime int `yaml:"epoch-time"` } func ReadRouterEndpoints(path string) ([]RouterEndpoint, error) { diff --git a/pkg/e2e/e2e_test.go b/pkg/e2e/e2e_test.go index e5be63e..1544d49 100644 --- a/pkg/e2e/e2e_test.go +++ b/pkg/e2e/e2e_test.go @@ -22,7 +22,7 @@ func startRouter() { if err != nil { log.Fatal().Msgf("Failed to create client connections with shard node servers; %v", err) } - router.StartRPCServer("localhost", rpcClients, 0, 8745) + router.StartRPCServer("localhost", rpcClients, 0, 8745, config.Parameters{EpochTime: 10}) } func startShardNode(replicaID int, rpcPort int, raftPort int, joinAddr string) { diff --git a/pkg/router/server.go b/pkg/router/server.go index 6ad2ecb..79ae25f 100644 --- a/pkg/router/server.go +++ b/pkg/router/server.go @@ -7,6 +7,7 @@ import ( "time" pb "github.com/dsg-uwaterloo/oblishard/api/router" + "github.com/dsg-uwaterloo/oblishard/pkg/config" "github.com/dsg-uwaterloo/oblishard/pkg/rpc" "github.com/rs/zerolog/log" "go.opentelemetry.io/otel" @@ -57,14 +58,14 @@ func (r *routerServer) Write(ctx context.Context, writeRequest *pb.WriteRequest) return &pb.WriteReply{Success: writeResponse.success}, nil } -func StartRPCServer(ip string, shardNodeRPCClients map[int]ReplicaRPCClientMap, routerID int, port int) { +func StartRPCServer(ip string, shardNodeRPCClients map[int]ReplicaRPCClientMap, routerID int, port int, parameters config.Parameters) { lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", ip, port)) if err != nil { log.Fatal().Msgf("failed to listen: %v", err) } grpcServer := grpc.NewServer(grpc.UnaryInterceptor(rpc.ContextPropagationUnaryServerInterceptor())) - epochManager := newEpochManager(shardNodeRPCClients, 10*time.Millisecond) + epochManager := newEpochManager(shardNodeRPCClients, time.Duration(parameters.EpochTime)*time.Millisecond) go epochManager.run() routerServer := newRouterServer(routerID, epochManager) pb.RegisterRouterServer(grpcServer, &routerServer)