Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize max VersionVector in ApplyChanges to improve memory efficiency #1164

Merged
merged 4 commits into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions api/converter/from_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,12 @@ func FromChanges(pbChanges []*api.Change) ([]*change.Change, error) {
func fromChangeID(id *api.ChangeID) (change.ID, error) {
actorID, err := time.ActorIDFromBytes(id.ActorId)
if err != nil {
return change.InitialID, err
return change.InitialID(), err
}

vector, err := FromVersionVector(id.VersionVector)
if err != nil {
return change.InitialID, err
return change.InitialID(), err
}

return change.NewID(
Expand Down
16 changes: 8 additions & 8 deletions pkg/document/change/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,6 @@ const (
InitialLamport = 0
)

var (
// InitialID represents the initial state ID. Usually this is used to
// represent a state where nothing has been edited.
InitialID = NewID(InitialClientSeq, InitialServerSeq, InitialLamport, time.InitialActorID, time.InitialVersionVector)
)

// ID represents the identifier of the change. It is used to identify the
// change and to order the changes. It is also used to detect the relationship
// between changes whether they are causally related or concurrent.
Expand Down Expand Up @@ -75,6 +69,12 @@ func NewID(
}
}

// InitialID creates an initial state ID. Usually this is used to
// represent a state where nothing has been edited.
func InitialID() ID {
return NewID(InitialClientSeq, InitialServerSeq, InitialLamport, time.InitialActorID, time.NewVersionVector())
}

// Next creates a next ID of this ID.
func (id ID) Next() ID {
versionVector := id.versionVector.DeepCopy()
Expand Down Expand Up @@ -113,7 +113,7 @@ func (id ID) SyncClocks(other ID) ID {
otherVV.Set(other.actorID, other.lamport)
}

newID := NewID(id.clientSeq, InitialServerSeq, lamport, id.actorID, id.versionVector.Max(otherVV))
newID := NewID(id.clientSeq, InitialServerSeq, lamport, id.actorID, id.versionVector.Max(&otherVV))
newID.versionVector.Set(id.actorID, lamport)
return newID
}
Expand All @@ -135,7 +135,7 @@ func (id ID) SetClocks(otherLamport int64, vector time.VersionVector) ID {
// problematic. To address this, we remove the InitialActorID from snapshots.
vector.Unset(time.InitialActorID)

newID := NewID(id.clientSeq, id.serverSeq, lamport, id.actorID, id.versionVector.Max(vector))
newID := NewID(id.clientSeq, id.serverSeq, lamport, id.actorID, id.versionVector.Max(&vector))
newID.versionVector.Set(id.actorID, lamport)

return newID
Expand Down
7 changes: 5 additions & 2 deletions pkg/document/internal_document.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func NewInternalDocument(k key.Key) *InternalDocument {
status: StatusDetached,
root: crdt.NewRoot(root),
checkpoint: change.InitialCheckpoint,
changeID: change.InitialID,
changeID: change.InitialID(),
presences: innerpresence.NewMap(),
onlineClients: &gosync.Map{},
}
Expand All @@ -116,14 +116,17 @@ func NewInternalDocumentFromSnapshot(
return nil, err
}

changeID := change.InitialID()
changeID.SetClocks(lamport, vector)

return &InternalDocument{
key: k,
status: StatusDetached,
root: crdt.NewRoot(obj),
presences: presences,
onlineClients: &gosync.Map{},
checkpoint: change.InitialCheckpoint.NextServerSeq(serverSeq),
changeID: change.InitialID.SetClocks(lamport, vector),
changeID: changeID,
}, nil
}

Expand Down
48 changes: 28 additions & 20 deletions pkg/document/time/version_vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,54 +130,62 @@ func (v VersionVector) EqualToOrAfter(other *Ticket) bool {
return clientLamport >= other.lamport
}

// Min returns new vv consists of every min value in each column.
func (v VersionVector) Min(other VersionVector) VersionVector {
minVV := NewVersionVector()
// Min modifies the receiver in-place to contain the minimum values between itself
// and the given version vector, and returns the modified receiver.
// Note: This method modifies the receiver for memory efficiency.
func (v VersionVector) Min(other *VersionVector) VersionVector {
if other == nil {
return v
}

for key, value := range v {
if otherValue, exists := other[key]; exists {
if otherValue, exists := (*other)[key]; exists {
if value < otherValue {
minVV[key] = value
v[key] = value
} else {
minVV[key] = otherValue
v[key] = otherValue
}
} else {
minVV[key] = 0
v[key] = 0
}
}

for key := range other {
for key := range *other {
if _, exists := v[key]; !exists {
minVV[key] = 0
v[key] = 0
}
}

return minVV
return v
}

// Max returns new vv consists of every max value in each column.
func (v VersionVector) Max(other VersionVector) VersionVector {
maxVV := NewVersionVector()
// Max modifies the receiver in-place to contain the maximum values between itself
// and the given version vector, and returns the modified receiver.
// Note: This method modifies the receiver for memory efficiency.
func (v VersionVector) Max(other *VersionVector) VersionVector {
if other == nil {
return v
}

for key, value := range v {
if otherValue, exists := other[key]; exists {
if otherValue, exists := (*other)[key]; exists {
if value > otherValue {
maxVV[key] = value
v[key] = value
} else {
maxVV[key] = otherValue
v[key] = otherValue
}
} else {
maxVV[key] = value
v[key] = value
}
}

for key, value := range other {
for key, value := range *other {
if _, exists := v[key]; !exists {
maxVV[key] = value
v[key] = value
}
}

return maxVV
return v
}

// MaxLamport returns max lamport value in version vector.
Expand Down
4 changes: 2 additions & 2 deletions pkg/document/time/version_vector_test.go
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the loop to use direct indexing instead of range to avoid potential pointer aliasing issues. Using range in this context could lead to unintended shared references to memory, while direct indexing ensures each element is safely referenced. This change improves the safety and stability of the code.

For more context, refer to this StackOverflow discussion on implicit memory aliasing in for loops: https://stackoverflow.com/questions/62446118/implicit-memory-aliasing-in-for-loop

Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ func TestVersionVector(t *testing.T) {
},
}

for _, tc := range tests {
for i, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
result := tc.v1.Min(tc.v2)
result := tc.v1.Min(&tests[i].v2)
assert.Equal(t, tc.expect, result)
})
}
Expand Down
2 changes: 1 addition & 1 deletion server/backend/database/memory/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1372,7 +1372,7 @@ func (d *DB) UpdateAndFindMinSyncedVersionVector(
if minVersionVector == nil {
minVersionVector = versionVector
} else {
minVersionVector = minVersionVector.Min(versionVector)
minVersionVector = minVersionVector.Min(&versionVector)
}

// 03. Update current client's version vector. If the client is detached, remove it.
Expand Down
2 changes: 1 addition & 1 deletion server/backend/database/mongo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1264,7 +1264,7 @@ func (c *Client) UpdateAndFindMinSyncedVersionVector(
if minVersionVector == nil {
minVersionVector = versionVector
} else {
minVersionVector = minVersionVector.Min(versionVector)
minVersionVector = minVersionVector.Min(&versionVector)
}

// 03. Update current client's version vector. If the client is detached, remove it.
Expand Down
2 changes: 1 addition & 1 deletion test/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestRoot() *crdt.Root {
// TextChangeContext returns the context of test change.
func TextChangeContext(root *crdt.Root) *change.Context {
return change.NewContext(
change.InitialID,
change.InitialID(),
"",
root,
)
Expand Down
Loading