Skip to content

Commit

Permalink
Add logging as parameter, Fix redis mappings, Fix sequential batch re…
Browse files Browse the repository at this point in the history
…quests
  • Loading branch information
aminst committed Nov 28, 2023
1 parent 0e9c5fa commit e81d547
Show file tree
Hide file tree
Showing 14 changed files with 125 additions and 124 deletions.
16 changes: 8 additions & 8 deletions cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"os"
"path"
"time"

Expand Down Expand Up @@ -62,7 +63,11 @@ func main() {
logPath := flag.String("logpath", "", "path to write logs")
configsPath := flag.String("conf", "../../configs/default", "configs directory path")
flag.Parse()
utils.InitLogging(false, *logPath)
parameters, err := config.ReadParameters(path.Join(*configsPath, "parameters.yaml"))
if err != nil {
os.Exit(1)
}
utils.InitLogging(parameters.Log, *logPath)

routerEndpoints, err := config.ReadRouterEndpoints(path.Join(*configsPath, "router_endpoints.yaml"))
if err != nil {
Expand All @@ -79,13 +84,6 @@ func main() {
log.Fatal().Msgf("Failed to read trace file; %v", err)
}

parameters, err := config.ReadParameters(path.Join(*configsPath, "parameters.yaml"))
if err != nil {
log.Fatal().Msgf("Failed to read parameters from yaml file; %v", err)
}

routerRPCClient := rpcClients.GetRandomRouter()

tracingProvider, err := tracing.NewProvider(context.Background(), "client", "localhost:4317", !parameters.Trace)
if err != nil {
log.Fatal().Msgf("Failed to create tracing provider; %v", err)
Expand All @@ -106,6 +104,7 @@ func main() {
rateLimit.Start()
startTime := time.Now()
for _, request := range requests {
routerRPCClient := rpcClients.GetRandomRouter()
if request.OperationType == client.Read {
readOperations++
go asyncRead(rateLimit, tracer, request.Block, routerRPCClient, readResponseChannel)
Expand Down Expand Up @@ -133,5 +132,6 @@ func main() {
}
}
elapsed := time.Since(startTime)
// TODO: seperate read and write throughput
fmt.Printf("Throughput: %f", float64(readOperations+writeOperations)/elapsed.Seconds())
}
12 changes: 6 additions & 6 deletions cmd/oramnode/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"flag"
"os"
"path"

"github.com/dsg-uwaterloo/oblishard/pkg/config"
Expand All @@ -23,7 +24,11 @@ func main() {
configsPath := flag.String("conf", "../../configs/default", "configs directory path")
logPath := flag.String("logpath", "", "path to write logs")
flag.Parse()
utils.InitLogging(true, *logPath)
parameters, err := config.ReadParameters(path.Join(*configsPath, "parameters.yaml"))
if err != nil {
os.Exit(1)
}
utils.InitLogging(parameters.Log, *logPath)
if *rpcPort == 0 {
log.Fatal().Msgf("The rpc port should be provided with the -rpcport flag")
}
Expand All @@ -43,11 +48,6 @@ func main() {
log.Fatal().Msgf("Cannot read redis endpoints from yaml file; %v", err)
}

parameters, err := config.ReadParameters(path.Join(*configsPath, "parameters.yaml"))
if err != nil {
log.Fatal().Msgf("Failed to read parameters from yaml file; %v", err)
}

tracingProvider, err := tracing.NewProvider(context.Background(), "oramnode", "localhost:4317", !parameters.Trace)
if err != nil {
log.Fatal().Msgf("Failed to create tracing provider; %v", err)
Expand Down
12 changes: 6 additions & 6 deletions cmd/router/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"flag"
"os"
"path"

"github.com/dsg-uwaterloo/oblishard/pkg/config"
Expand All @@ -21,7 +22,11 @@ func main() {
configsPath := flag.String("conf", "../../configs/default", "configs directory path")
logPath := flag.String("logpath", "", "path to write the logs")
flag.Parse()
utils.InitLogging(true, *logPath)
parameters, err := config.ReadParameters(path.Join(*configsPath, "parameters.yaml"))
if err != nil {
os.Exit(1)
}
utils.InitLogging(parameters.Log, *logPath)
if *port == 0 {
log.Fatal().Msgf("The port should be provided with the -port flag")
}
Expand All @@ -35,11 +40,6 @@ func main() {
log.Fatal().Msgf("Failed to create client connections with shard node servers; %v", err)
}

parameters, err := config.ReadParameters(path.Join(*configsPath, "parameters.yaml"))
if err != nil {
log.Fatal().Msgf("Failed to read parameters from yaml file; %v", err)
}

tracingProvider, err := tracing.NewProvider(context.Background(), "router", "localhost:4317", !parameters.Trace)
if err != nil {
log.Fatal().Msgf("Failed to create tracing provider; %v", err)
Expand Down
15 changes: 7 additions & 8 deletions cmd/shardnode/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"flag"
"os"
"path"

"github.com/dsg-uwaterloo/oblishard/pkg/config"
Expand All @@ -23,8 +24,11 @@ func main() {
configsPath := flag.String("conf", "../../configs/default", "configs directory path")
logPath := flag.String("logpath", "", "path to write logs")
flag.Parse()

utils.InitLogging(true, *logPath)
parameters, err := config.ReadParameters(path.Join(*configsPath, "parameters.yaml"))
if err != nil {
os.Exit(1)
}
utils.InitLogging(parameters.Log, *logPath)
if *rpcPort == 0 {
log.Fatal().Msgf("The rpc port should be provided with the -rpcport flag")
}
Expand All @@ -46,11 +50,6 @@ func main() {
log.Fatal().Msgf("Failed to create client connections with oarm node servers; %v", err)
}

parameters, err := config.ReadParameters(path.Join(*configsPath, "parameters.yaml"))
if err != nil {
log.Fatal().Msgf("Failed to read parameters from yaml file; %v", err)
}

tracingProvider, err := tracing.NewProvider(context.Background(), "shardnode", "localhost:4317", !parameters.Trace)
if err != nil {
log.Fatal().Msgf("Failed to create tracing provider; %v", err)
Expand All @@ -61,5 +60,5 @@ func main() {
}
defer stopTracingProvider(context.Background())

shardnode.StartServer(*shardNodeID, *ip, *rpcPort, *replicaID, *raftPort, *joinAddr, rpcClients, parameters, len(redisEndpoints), *configsPath)
shardnode.StartServer(*shardNodeID, *ip, *rpcPort, *replicaID, *raftPort, *joinAddr, rpcClients, parameters, redisEndpoints, *configsPath)
}
8 changes: 5 additions & 3 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ type OramNodeEndpoint struct {
}

type RedisEndpoint struct {
IP string `yaml:"exposed_ip"`
Port int
ID int
IP string `yaml:"exposed_ip"`
Port int
ID int
ORAMNodeID int `yaml:"oramnode_id"`
}

type RouterConfig struct {
Expand Down Expand Up @@ -61,6 +62,7 @@ type Parameters struct {
Shift int `yaml:"shift"`
TreeHeight int `yaml:"tree-height"`
MaxRequests int `yaml:"max-requests"`
Log bool `yaml:"log"`
}

func ReadRouterEndpoints(path string) ([]RouterEndpoint, error) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ func startShardNode(replicaID int, rpcPort int, raftPort int, joinAddr string) {
if err != nil {
log.Fatal().Msgf("Failed to read parameters from yaml file; %v", err)
}
shardnode.StartServer(0, "localhost", rpcPort, replicaID, raftPort, joinAddr, rpcClients, parameters, 1, "../../configs")
redisEndpoints := []config.RedisEndpoint{{ID: 0, IP: "localhost", Port: 6379}}
shardnode.StartServer(0, "localhost", rpcPort, replicaID, raftPort, joinAddr, rpcClients, parameters, redisEndpoints, "../../configs")
}

func startOramNode(replicaID int, rpcPort int, raftPort int, joinAddr string) {
Expand Down
10 changes: 8 additions & 2 deletions pkg/oramnode/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,8 +455,14 @@ func StartServer(oramNodeServerID int, ip string, rpcPort int, replicaID int, ra
if err != nil {
log.Fatal().Msgf("failed to listen: %v", err)
}
storageHandler := strg.NewStorageHandler(parameters.TreeHeight, parameters.Z, parameters.S, parameters.Shift, redisEndpoints)
if oramNodeServerID == 0 && isFirst {
var storages []config.RedisEndpoint
for _, redisEndpoint := range redisEndpoints {
if redisEndpoint.ORAMNodeID == oramNodeServerID {
storages = append(storages, redisEndpoint)
}
}
storageHandler := strg.NewStorageHandler(parameters.TreeHeight, parameters.Z, parameters.S, parameters.Shift, storages)
if isFirst {
err = storageHandler.InitDatabase()
if err != nil {
log.Fatal().Msgf("failed to initialize the database: %v", err)
Expand Down
12 changes: 12 additions & 0 deletions pkg/shardnode/batching.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

"github.com/dsg-uwaterloo/oblishard/api/oramnode"
"github.com/dsg-uwaterloo/oblishard/pkg/utils"
"github.com/rs/zerolog/log"
)
Expand Down Expand Up @@ -48,3 +49,14 @@ func (b *batchManager) addRequestToStorageQueueAndWait(req blockRequest, storage

return b.responseChannel[req.block]
}

type batchResponse struct {
*oramnode.ReadPathReply
err error
}

func (b *batchManager) asyncBatchRequests(ctx context.Context, storageID int, requests []blockRequest, oramNodeReplicaMap ReplicaRPCClientMap, responseChan chan batchResponse) {
log.Debug().Msgf("Sending batch of requests to storageID %d with size %d", storageID, len(requests))
reply, err := oramNodeReplicaMap.readPathFromAllOramNodeReplicas(context.Background(), requests, storageID)
responseChan <- batchResponse{reply, err}
}
9 changes: 0 additions & 9 deletions pkg/shardnode/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package shardnode
import (
"context"
"fmt"
"math/rand"

oramnodepb "github.com/dsg-uwaterloo/oblishard/api/oramnode"
"github.com/dsg-uwaterloo/oblishard/pkg/config"
Expand All @@ -22,14 +21,6 @@ type ReplicaRPCClientMap map[int]oramNodeRPCClient

type RPCClientMap map[int]ReplicaRPCClientMap

func (r RPCClientMap) getRandomOramNodeReplicaMap() ReplicaRPCClientMap {
log.Debug().Msgf("Getting random oram node replica map from RPC client map %v", r)
oramNodesLen := len(r)
randomOramNodeIndex := rand.Intn(oramNodesLen)
randomOramNode := r[randomOramNodeIndex]
return randomOramNode
}

func (r *ReplicaRPCClientMap) readPathFromAllOramNodeReplicas(ctx context.Context, requests []blockRequest, storageID int) (*oramnodepb.ReadPathReply, error) {
var replicaFuncs []rpc.CallFunc
var clients []interface{}
Expand Down
20 changes: 0 additions & 20 deletions pkg/shardnode/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,13 @@ package shardnode
import (
"context"
"fmt"
"reflect"
"testing"

"github.com/dsg-uwaterloo/oblishard/api/oramnode"
oramnodepb "github.com/dsg-uwaterloo/oblishard/api/oramnode"
"google.golang.org/grpc"
)

func TestGetRandomOramNodeReplicaMapReturnsRandomClientExistingInOramNodeMap(t *testing.T) {
oramNodes := RPCClientMap{
0: map[int]oramNodeRPCClient{
0: {},
1: {},
},
1: map[int]oramNodeRPCClient{
0: {},
},
}
random := oramNodes.getRandomOramNodeReplicaMap()
for _, replicaMap := range oramNodes {
if reflect.DeepEqual(random, replicaMap) {
return
}
}
t.Errorf("the random map does not exist")
}

type mockOramNodeClient struct {
replyFunc func([]*oramnode.BlockRequest) (*oramnodepb.ReadPathReply, error)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/shardnode/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type shardNodeFSM struct {
nacks map[string][]string // map of requestID to array of blocks
nacksMu sync.Mutex
positionMap map[string]positionState // map of block to positionState
positionMapMu sync.Mutex
positionMapMu sync.RWMutex
raftNode RaftNodeWIthState
raftNodeMu sync.Mutex
}
Expand Down
Loading

0 comments on commit e81d547

Please sign in to comment.