Skip to content

Commit 74824c9

Browse files
committed
update
1 parent df5f5d0 commit 74824c9

File tree

8 files changed

+48
-57
lines changed

8 files changed

+48
-57
lines changed

Diff for: acceptor/main.go

+7-3
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,10 @@ func readPortsFile() {
113113
}
114114
}
115115

116+
func commandId2String(cid int64) string {
117+
return strconv.FormatInt(cid>>54, 10) + strconv.FormatInt(cid%int64(1)<<54, 10)
118+
}
119+
116120
// gRPC handlers
117121
func (s *acceptorServer) Scouting(ctx context.Context, in *pb.Message) (*pb.Message, error) {
118122

@@ -178,13 +182,13 @@ func (s *acceptorServer) Commanding(ctx context.Context, in *pb.Message) (*pb.Me
178182

179183
<-mutexChannel
180184
ballotNumber := ballotNumber // concurrency concern, avoid ballot number update during execution
181-
fmt.Printf("Commanding received: ballot number %d, ballot leader %d, comID: %s, slot: %d\n", ballotNumber, ballotLeader, in.Bsc.Command.CommandId, in.Bsc.SlotNumber)
182-
acceptCid := ""
185+
fmt.Printf("Commanding received: ballot number %d, ballot leader %d, comID: %s, slot: %d\n", ballotNumber, ballotLeader, commandId2String(in.Bsc.Command.CommandId), in.Bsc.SlotNumber)
186+
acceptCid := int64(-1)
183187
if in.Bsc.BallotNumber >= ballotNumber && in.LeaderId == ballotLeader {
184188
ballotNumber = in.Bsc.BallotNumber
185189
acceptCid = in.Bsc.Command.CommandId
186190
accepted[ballotNumber] = append(accepted[ballotNumber], in.Bsc)
187-
fmt.Printf("Commanding accepted: ballot number %d, ballot leader %d, comID: %s, slot: %d\n", ballotNumber, ballotLeader, in.Bsc.Command.CommandId, in.Bsc.SlotNumber)
191+
fmt.Printf("Commanding accepted: ballot number %d, ballot leader %d, comID: %s, slot: %d\n", ballotNumber, ballotLeader, commandId2String(in.Bsc.Command.CommandId), in.Bsc.SlotNumber)
188192
}
189193
currentBallotNumber := ballotNumber
190194
currentBallotLeader := ballotLeader

Diff for: client/main.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func main() {
7979
fmt.Scanf("%s", &value)
8080
// generate a new commandID
8181
commandCount += 1
82-
cid := "client" + strconv.Itoa(int(clientId)) + "-W" + strconv.Itoa(commandCount)
82+
cid := int64(clientId)<<54 + int64(commandCount)
8383
// push client commands to command buffers
8484
for i := 0; i < replicaNum; i++ {
8585
messageBuffers[i] <- &pb.Message{Type: WRITE, Command: &pb.Command{Type: WRITE, CommandId: cid, ClientId: clientId, Key: key, Value: value}, CommandId: cid, ClientId: clientId, Key: key, Value: value}
@@ -93,7 +93,7 @@ func main() {
9393
fmt.Scanf("%s", &key)
9494
// generate a new commandID
9595
commandCount += 1
96-
cid := "client" + strconv.Itoa(int(clientId)) + "-R" + strconv.Itoa(commandCount)
96+
cid := int64(clientId)<<54 + int64(commandCount)
9797
for i := 0; i < replicaNum; i++ {
9898
messageBuffers[i] <- &pb.Message{Type: READ, Command: &pb.Command{Type: READ, CommandId: cid, ClientId: clientId, Key: key}}
9999
}

Diff for: gopaxos.pb.go

+8-8
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Diff for: gopaxos.proto

+2-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ message Message {
2727
int32 type = 1;
2828

2929
int32 clientId = 2;
30-
string commandId = 3;
30+
int64 commandId = 3;
3131
string key = 4;
3232
string value = 5;
3333
Command command = 6;
@@ -50,7 +50,7 @@ message Message {
5050
message Command {
5151
int32 type = 1;
5252
int32 clientId = 2;
53-
string commandId = 3;
53+
int64 commandId = 3;
5454
string key = 4;
5555
string value = 5;
5656
}

Diff for: leader/main.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ func ScoutMessenger(serial int, scoutCollectChannel chan *pb.P1B, scoutBallotNum
356356
}
357357

358358
func CommanderRoutine(bsc *pb.BSC) {
359-
fmt.Printf("Commander spawned for ballot number %d, slot number %d, command id %s\n", bsc.BallotNumber, bsc.SlotNumber, bsc.Command.CommandId)
359+
fmt.Printf("Commander spawned for ballot number %d, slot number %d, command id %s\n", bsc.BallotNumber, bsc.SlotNumber, commandId2String(bsc.Command.CommandId))
360360
commanderCollectChannel := make(chan *pb.P2B)
361361
// send messages
362362
for i := 0; i < acceptorNum; i++ {
@@ -494,6 +494,10 @@ func readPortsFile() {
494494
}
495495
}
496496

497+
func commandId2String(cid int64) string {
498+
return strconv.FormatInt(cid>>54, 10) + strconv.FormatInt(cid%int64(1)<<54, 10)
499+
}
500+
497501
// gRPC HANDLERS
498502
func (s *leaderServer) Propose(ctx context.Context, in *pb.Message) (*pb.Message, error) {
499503

@@ -510,7 +514,7 @@ func (s *leaderServer) Propose(ctx context.Context, in *pb.Message) (*pb.Message
510514
}
511515

512516
leaderStateUpdateChannel <- &leaderStateUpdateRequest{updateType: 1, newProposal: &pb.Proposal{SlotNumber: in.SlotNumber, Command: in.Command}} // Weird? Yes! Things can only be down after chekcing proposals
513-
fmt.Printf("Received proposal with commandId %s and slot number %d\n", in.Command.CommandId, in.SlotNumber)
517+
fmt.Printf("Received proposal with commandId %s and slot number %d\n", commandId2String(in.Command.CommandId), in.SlotNumber)
514518
return &pb.Message{Type: EMPTY, Content: "success"}, nil
515519
}
516520

Diff for: paxosClient/client.go

+4-20
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package client
33
import (
44
"context"
55
"errors"
6-
"strconv"
6+
"sync/atomic"
77
"time"
88

99
pb "github.com/yizhuoliang/gopaxos"
@@ -32,7 +32,7 @@ const (
3232

3333
type Client struct {
3434
clientId int32
35-
commandCount int
35+
commandCount uint32
3636
replicaPorts []string
3737
// commandChannels map[int][]chan *pb.Message
3838
// replyChannels map[int][]chan *reply
@@ -76,17 +76,12 @@ func NewPaxosClient(clientId int, simon int, replicaPorts []string) *Client {
7676
client.incrementCommandNumberChannel = make(chan int, 1)
7777
client.commandNumberReplyChannel = make(chan int, 1)
7878

79-
go client.OperationPreperationAndCleanupRoutine()
80-
8179
return client
8280
}
8381

8482
func (client *Client) Store(key string, value string) error {
8583

86-
client.incrementCommandNumberChannel <- 0
87-
cNum := <-client.commandNumberReplyChannel
88-
89-
cid := "client" + strconv.Itoa(int(client.clientId)) + "-W" + strconv.Itoa(cNum)
84+
cid := int64(atomic.AddUint32(&client.commandCount, 1))
9085

9186
replyChannels := make([]chan *reply, replicaNum)
9287
for i := 0; i < replicaNum; i++ {
@@ -114,10 +109,7 @@ func (client *Client) Store(key string, value string) error {
114109

115110
func (client *Client) Read(key string) (string, error) {
116111

117-
client.incrementCommandNumberChannel <- 0
118-
cNum := <-client.commandNumberReplyChannel
119-
120-
cid := "client" + strconv.Itoa(int(client.clientId)) + "-R" + strconv.Itoa(cNum)
112+
cid := int64(atomic.AddUint32(&client.commandCount, 1))
121113

122114
replyChannels := make([]chan *reply, replicaNum)
123115
for i := 0; i < replicaNum; i++ {
@@ -146,14 +138,6 @@ func (client *Client) Read(key string) (string, error) {
146138
return value, nil
147139
}
148140

149-
func (client *Client) OperationPreperationAndCleanupRoutine() {
150-
for {
151-
<-client.incrementCommandNumberChannel
152-
client.commandCount += 1
153-
client.commandNumberReplyChannel <- client.commandCount
154-
}
155-
}
156-
157141
func (client *Client) TempMessengerRoutine(msg *pb.Message, replyChannel chan *reply, replicaSerial int) {
158142
// reset connection for each message
159143
// conn, err := grpc.Dial(client.replicaPorts[replicaSerial], grpc.WithTransportCredentials(insecure.NewCredentials()))

Diff for: replica/main.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ var (
6767
keyValueLog map[string]string
6868

6969
// this is for replying
70-
clientReplyMap map[string]chan string
70+
clientReplyMap map[int64]chan string
7171
replyMapLock sync.RWMutex
7272

7373
// simc *comm.RPCConnection
@@ -106,7 +106,7 @@ func main() {
106106
proposals = make(map[int32]*pb.Proposal)
107107
decisions = make(map[int32]*pb.Decision)
108108
keyValueLog = make(map[string]string)
109-
clientReplyMap = make(map[string]chan string)
109+
clientReplyMap = make(map[int64]chan string)
110110
replicaStateUpdateChannel = make(chan *replicaStateUpdateRequest, 1)
111111
newCommandsChannel = make(chan *pb.Command, 1)
112112
for i := 0; i < leaderNum; i++ {
@@ -259,9 +259,13 @@ func readPortsFile() {
259259
}
260260
}
261261

262+
func commandId2String(cid int64) string {
263+
return strconv.FormatInt(cid>>54, 10) + strconv.FormatInt(cid%int64(1)<<54, 10)
264+
}
265+
262266
// gRPC handlers for Clients
263267
func (s *replicaServer) Write(ctx context.Context, in *pb.Message) (*pb.Message, error) {
264-
fmt.Printf("Request with command id %s received", in.CommandId)
268+
fmt.Printf("Request with command id %s received", commandId2String(in.CommandId))
265269
myChan := make(chan string, 1)
266270
replyMapLock.Lock()
267271
clientReplyMap[in.Command.CommandId] = myChan
@@ -292,7 +296,7 @@ func (s *replicaServer) Read(ctx context.Context, in *pb.Message) (*pb.Message,
292296

293297
// gRPC handlers for Leader
294298
func (s *replicaServer) Decide(ctx context.Context, in *pb.Message) (*pb.Message, error) {
295-
fmt.Printf("Decision with command id %s received", in.Decision.Command.CommandId)
299+
fmt.Printf("Decision with command id %s received", commandId2String(in.Decision.Command.CommandId))
296300
replicaStateUpdateChannel <- &replicaStateUpdateRequest{updateType: 1, newDecision: in.Decision}
297301
return &pb.Message{Type: EMPTY}, nil
298302
}

Diff for: test/test.go

+11-16
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,5 @@
11
package main
22

3-
import (
4-
"fmt"
5-
6-
pb "github.com/yizhuoliang/gopaxos"
7-
)
8-
93
type struc struct {
104
str string
115
}
@@ -30,14 +24,15 @@ func main() {
3024
// }
3125
// }
3226

33-
p1 := &pb.Command{CommandId: "client0-1", Key: "ding", Value: "ding"}
34-
p2 := &pb.Command{CommandId: "client0-2", Key: "zhang", Value: "han"}
35-
// fmt.Printf("%t\n", reflect.DeepEqual(p1, p2))
36-
37-
sli := make([]*pb.Command, 1)
38-
sli[0] = p1
39-
sli2 := make([]*pb.Command, 2)
40-
sli = append(sli, sli2...)
41-
sli[2] = p2
42-
fmt.Printf("%v\n", sli)
27+
// p1 := &pb.Command{CommandId: "client0-1", Key: "ding", Value: "ding"}
28+
// p2 := &pb.Command{CommandId: "client0-2", Key: "zhang", Value: "han"}
29+
// // fmt.Printf("%t\n", reflect.DeepEqual(p1, p2))
30+
31+
// sli := make([]*pb.Command, 1)
32+
// sli[0] = p1
33+
// sli2 := make([]*pb.Command, 2)
34+
// fmt.Printf("%d\n", len(sli2))
35+
// sli = append(sli, sli2...)
36+
// sli[2] = p2
37+
// fmt.Printf("%v\n", sli)
4338
}

0 commit comments

Comments
 (0)