Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion db/change_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,12 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent, docType DocumentType)
if syncData == nil {
return
}
isSGWrite, _, _ := syncData.IsSGWrite(event.Cas, doc.Body, rawUserXattr)
var rawVV *rawHLV
vv := doc.Xattrs[base.VvXattrName]
if len(vv) > 0 {
rawVV = base.Ptr(rawHLV(vv))
}
isSGWrite, _, _ := syncData.IsSGWrite(ctx, event.Cas, doc.Body, rawUserXattr, rawVV)
if !isSGWrite {
return
}
Expand Down
152 changes: 152 additions & 0 deletions db/crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2130,3 +2130,155 @@ func TestGetCVActivePathway(t *testing.T) {
})
}
}

// createNewTestDocument creates a valid document for testing.
func createNewTestDocument(t *testing.T, db *Database, body []byte) *Document {
collection, ctx := GetSingleDatabaseCollectionWithUser(base.TestCtx(t), t, db)
ctx = base.UserLogCtx(ctx, "gotest", base.UserDomainBuiltin, nil)
ctx = base.DatabaseLogCtx(ctx, db.Name, nil)
name := SafeDocumentName(t, t.Name())
var b Body
require.NoError(t, base.JSONUnmarshal(body, &b))
_, _, err := collection.Put(ctx, name, b)
require.NoError(t, err)
doc, err := collection.GetDocument(ctx, name, DocUnmarshalAll)
require.NoError(t, err)
require.NotNil(t, doc.HLV)
require.NotEmpty(t, doc.RevAndVersion.CurrentSource)
require.NotEmpty(t, doc.RevAndVersion.CurrentVersion)
return doc
}

func TestIsSGWrite(t *testing.T) {
base.SetUpTestLogging(t, base.LevelDebug, base.KeyCRUD, base.KeyImport)
db, ctx := setupTestDB(t)
defer db.Close(ctx)

body := []byte(`{"some":"data"}`)
testCases := []struct {
name string
docBody []byte
}{
{
name: "normal body",
docBody: body,
},
{
name: "nil body",
docBody: nil,
},
}

t.Run("standard Put", func(t *testing.T) {
doc := createNewTestDocument(t, db, body)
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
isSGWrite, _, _ := doc.IsSGWrite(ctx, testCase.docBody)
require.True(t, isSGWrite, "Expected doc to be identified as SG write for body %q", string(testCase.docBody))
})
}
})
t.Run("no HLV", func(t *testing.T) {
// falls back to body crc32 comparison
doc := createNewTestDocument(t, db, body)
doc.HLV = nil
doc.Cas = 1 // force mismatch cas
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
isSGWrite, _, _ := doc.IsSGWrite(ctx, testCase.docBody)
require.True(t, isSGWrite, "Expected doc to be identified an SDK write for body %q", string(testCase.docBody))
})
}
})
t.Run("no _sync.rev.src", func(t *testing.T) {
// this is a corrupt _sync.rev, so assume that it was a _vv at some point and import just in case
doc := createNewTestDocument(t, db, body)
doc.RevAndVersion.CurrentSource = ""
doc.Cas = 1 // force mismatch cas
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
isSGWrite, _, _ := doc.IsSGWrite(ctx, testCase.docBody)
require.False(t, isSGWrite, "Expected doc not to be identified as SG write for body %q since _sync.rev.src is empty", string(testCase.docBody))
})
}
})
t.Run("no _sync.rev.ver", func(t *testing.T) {
doc := createNewTestDocument(t, db, body)
doc.RevAndVersion.CurrentVersion = ""
doc.Cas = 1 // force mismatch cas
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
isSGWrite, _, _ := doc.IsSGWrite(ctx, testCase.docBody)
require.False(t, isSGWrite, "Expected doc not to be identified as SG write for body %q since _sync.rev.ver is empty", string(testCase.docBody))
})
}
})
t.Run("mismatch sync.rev.ver", func(t *testing.T) {
doc := createNewTestDocument(t, db, body)
doc.RevAndVersion.CurrentVersion = "0x1234"
doc.Cas = 1 // force mismatch cas
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
isSGWrite, _, _ := doc.IsSGWrite(ctx, testCase.docBody)
require.False(t, isSGWrite, "Expected doc to not be identified as SG write for body %q due to mismatched _sync.rev.ver", string(testCase.docBody))
})
}
})

}

func TestSyncDataCVEqual(t *testing.T) {
testCases := []struct {
name string
syncData SyncData
cvSourceID string
cvValue uint64
cvEqual bool
}{
{
name: "syncData CV matches",
syncData: SyncData{
RevAndVersion: channels.RevAndVersion{
CurrentSource: "testSourceID",
CurrentVersion: "0x0100000000000000",
},
},
cvSourceID: "testSourceID",
cvValue: 1,
cvEqual: true,
},
{
name: "syncData sourceID mismatch",
syncData: SyncData{
RevAndVersion: channels.RevAndVersion{
CurrentSource: "testSourceID",
CurrentVersion: "0x1",
},
},
cvSourceID: "testSourceID2",
cvValue: 1,
cvEqual: false,
},
{
name: "syncData sourceID mismatch",
syncData: SyncData{
RevAndVersion: channels.RevAndVersion{
CurrentSource: "testSourceID",
CurrentVersion: "0x2",
},
},
cvSourceID: "testSourceID",
cvValue: 1,
cvEqual: false,
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
cv := Version{
SourceID: testCase.cvSourceID,
Value: testCase.cvValue,
}
require.Equal(t, testCase.cvEqual, testCase.syncData.CVEqual(cv))
})
}
}
49 changes: 37 additions & 12 deletions db/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,8 +661,16 @@ func HasUserXattrChanged(userXattr []byte, prevUserXattrHash string) bool {
return userXattrCrc32cHash(userXattr) != prevUserXattrHash
}

// SyncData.IsSGWrite - used during feed-based import
func (s *SyncData) IsSGWrite(cas uint64, rawBody []byte, rawUserXattr []byte) (isSGWrite bool, crc32Match bool, bodyChanged bool) {
// CVEqual returns true if the provided CV does not match _sync.rev.ver and _sync.rev.src. The caller is responsible for testing if the values are non-empty.
func (s *SyncData) CVEqual(cv Version) bool {
if cv.SourceID != s.RevAndVersion.CurrentSource {
return false
}
return cv.Value == base.HexCasToUint64(s.RevAndVersion.CurrentVersion)
}

// IsSGWrite determines if a document was written by Sync Gateway or via an SDK. CV is an optional parameter to check. This would represent _vv.ver and _vv.src
func (s *SyncData) IsSGWrite(ctx context.Context, cas uint64, rawBody []byte, rawUserXattr []byte, cv cvExtractor) (isSGWrite bool, crc32Match bool, bodyChanged bool) {

// If cas matches, it was a SG write
if cas == s.GetSyncCas() {
Expand All @@ -675,24 +683,31 @@ func (s *SyncData) IsSGWrite(cas uint64, rawBody []byte, rawUserXattr []byte) (i
}

if HasUserXattrChanged(rawUserXattr, s.Crc32cUserXattr) {
// technically the crc32 matches but return false on crc32Match so Crc32MatchCount is not incremented, to mark that it would be imported
return false, false, false
}

if s.RevAndVersion.CurrentVersion != "" || s.RevAndVersion.CurrentSource != "" {
extractedCV, err := cv.ExtractCV()
if !errors.Is(err, base.ErrNotFound) {
if err != nil {
base.InfofCtx(ctx, base.KeyImport, "Unable to extract cv during IsSGWrite write check - skipping cv match check: %v", err)
return true, true, false
}
if !s.CVEqual(*extractedCV) {
// technically the crc32 matches but return false so Crc32MatchCount is not incremented, to mark that it would be imported
return false, false, false
}
}
}
return true, true, false
}

// doc.IsSGWrite - used during on-demand import. Doesn't invoke SyncData.IsSGWrite so that we
// can complete the inexpensive cas check before the (potential) doc marshalling.
// IsSGWrite - used during on-demand import. Check SyncData and HLV to determine if the document was written by Sync Gateway or by a Couchbase Server SDK write.
func (doc *Document) IsSGWrite(ctx context.Context, rawBody []byte) (isSGWrite bool, crc32Match bool, bodyChanged bool) {

// If the raw body is available, use SyncData.IsSGWrite
if rawBody != nil && len(rawBody) > 0 {

isSgWriteFeed, crc32MatchFeed, bodyChangedFeed := doc.SyncData.IsSGWrite(doc.Cas, rawBody, doc.rawUserXattr)
if !isSgWriteFeed {
base.DebugfCtx(ctx, base.KeyCRUD, "Doc %s is not an SG write, based on cas and body hash. cas:%x syncCas:%q", base.UD(doc.ID), doc.Cas, doc.SyncData.Cas)
}

isSgWriteFeed, crc32MatchFeed, bodyChangedFeed := doc.SyncData.IsSGWrite(ctx, doc.Cas, rawBody, doc.rawUserXattr, doc.HLV)
return isSgWriteFeed, crc32MatchFeed, bodyChangedFeed
}

Expand All @@ -717,15 +732,25 @@ func (doc *Document) IsSGWrite(ctx context.Context, rawBody []byte) (isSGWrite b

// If the current body crc32c matches the one in doc.SyncData, this was an SG write (i.e. has already been imported)
if currentBodyCrc32c != doc.SyncData.Crc32c {
base.DebugfCtx(ctx, base.KeyCRUD, "Doc %s is not an SG write, based on cas and body hash. cas:%x syncCas:%q", base.UD(doc.ID), doc.Cas, doc.SyncData.Cas)
base.DebugfCtx(ctx, base.KeyCRUD, "Doc %s is not an SG write, based on crc32 hash", base.UD(doc.ID))
return false, false, true
}

if HasUserXattrChanged(doc.rawUserXattr, doc.Crc32cUserXattr) {
// technically the crc32 matches but return false for crc32Match so Crc32MatchCount is not incremented. The document is not a match and wil get imported.
base.DebugfCtx(ctx, base.KeyCRUD, "Doc %s is not an SG write, based on user xattr hash", base.UD(doc.ID))
return false, false, false
}
if doc.RevAndVersion.CurrentSource == "" && doc.RevAndVersion.CurrentVersion == "" {
return true, true, false
}

if doc.HLV != nil {
if !doc.CVEqual(*doc.HLV.ExtractCurrentVersionFromHLV()) {
base.DebugfCtx(ctx, base.KeyCRUD, "Doc %s is not an SG write, based on mismatch between version vector cv %s and sync metadata cv %s", base.UD(doc.ID), doc.HLV.GetCurrentVersionString(), doc.RevAndVersion.CV())
return false, true, false
}
}
return true, true, false
}

Expand Down
35 changes: 35 additions & 0 deletions db/hybrid_logical_vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,14 @@ func (hlv *HybridLogicalVector) ExtractCurrentVersionFromHLV() *Version {
return &currVersion
}

// ExtractCV is used to sastify CV only interface. Consider using ExtractCurrentVersionFromHLV or GetCurrentVersion instead if you know hlv is not nil.
func (hlv *HybridLogicalVector) ExtractCV() (*Version, error) {
if hlv == nil {
return nil, base.ErrNotFound
}
return hlv.ExtractCurrentVersionFromHLV(), nil
}

// HybridLogicalVector is the in memory format for the hLv.
type HybridLogicalVector struct {
CurrentVersionCAS uint64 // current version cas (or cvCAS) stores the current CAS in little endian hex format at the time of replication
Expand Down Expand Up @@ -979,3 +987,30 @@ func (hlv *HybridLogicalVector) HasRevEncodedCV() bool {
func GetGenerationFromEncodedVersionValue(value uint64) int {
return int((value >> 40) & 0xFFFFFF)
}

type rawHLV []byte

// GetCurrentVersion returns the current version from the HLV by unmarshalling a raw _vv xattr. If the rawHLV is nil, returns ErrNotFound.
func (r *rawHLV) ExtractCV() (*Version, error) {
if r == nil {
return nil, base.ErrNotFound
}
limitedHLV := struct {
Version string `json:"ver"`
SourceID string `json:"src"`
}{}
err := base.JSONUnmarshal(*r, &limitedHLV)
if err != nil {
return nil, fmt.Errorf("could not extract ver and src from _vv xattr (%q): %w", string(*r), err)
}
return &Version{SourceID: limitedHLV.SourceID, Value: base.HexCasToUint64(limitedHLV.Version)}, nil
}

// cvExtractor can extract a current version different HLV representations.
type cvExtractor interface {
// ExtractVersion returns the current version from the HLV.
ExtractCV() (*Version, error)
}

var _ cvExtractor = &rawHLV{}
var _ cvExtractor = &HybridLogicalVector{}
8 changes: 7 additions & 1 deletion db/import_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,13 @@ func (il *importListener) ImportFeedEvent(ctx context.Context, collection *Datab
il.importStats.ImportErrorCount.Add(1)
return
}
isSGWrite, crc32Match, _ = syncData.IsSGWrite(event.Cas, rawDoc.Body, rawDoc.Xattrs[collection.userXattrKey()])
var cv cvExtractor
vv := rawDoc.Xattrs[base.VvXattrName]
if len(vv) > 0 {
cv = base.Ptr(rawHLV(vv))
}

isSGWrite, crc32Match, _ = syncData.IsSGWrite(ctx, event.Cas, rawDoc.Body, rawDoc.Xattrs[collection.userXattrKey()], cv)
if crc32Match {
il.dbStats.Crc32MatchCount.Add(1)
}
Expand Down
11 changes: 11 additions & 0 deletions db/util_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"net/http"
"slices"
"strconv"
"strings"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -1008,3 +1009,13 @@ func GetChangeEntryCV(t *testing.T, entry *ChangeEntry) Version {
}
return Version{}
}

// SafeDocumentName returns a document name free of any special characters for use in tests.
func SafeDocumentName(t *testing.T, name string) string {
docName := strings.ToLower(name)
for _, c := range []string{" ", "<", ">", "/", "="} {
docName = strings.ReplaceAll(docName, c, "_")
}
require.Less(t, len(docName), 251, "Document name %s is too long, must be less than 251 characters", name)
return docName
}
Loading
Loading