Skip to content

Commit 2052ad6

Browse files
authored
CBG-4864 Import document if cv changes, even if body is unchanged (#7775)
1 parent e67f6dc commit 2052ad6

File tree

8 files changed

+283
-32
lines changed

8 files changed

+283
-32
lines changed

db/change_cache.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,12 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent, docType DocumentType)
395395
if syncData == nil {
396396
return
397397
}
398-
isSGWrite, _, _ := syncData.IsSGWrite(event.Cas, doc.Body, rawUserXattr)
398+
var rawVV *rawHLV
399+
vv := doc.Xattrs[base.VvXattrName]
400+
if len(vv) > 0 {
401+
rawVV = base.Ptr(rawHLV(vv))
402+
}
403+
isSGWrite, _, _ := syncData.IsSGWrite(ctx, event.Cas, doc.Body, rawUserXattr, rawVV)
399404
if !isSGWrite {
400405
return
401406
}

db/crud_test.go

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2130,3 +2130,155 @@ func TestGetCVActivePathway(t *testing.T) {
21302130
})
21312131
}
21322132
}
2133+
2134+
// createNewTestDocument creates a valid document for testing.
2135+
func createNewTestDocument(t *testing.T, db *Database, body []byte) *Document {
2136+
collection, ctx := GetSingleDatabaseCollectionWithUser(base.TestCtx(t), t, db)
2137+
ctx = base.UserLogCtx(ctx, "gotest", base.UserDomainBuiltin, nil)
2138+
ctx = base.DatabaseLogCtx(ctx, db.Name, nil)
2139+
name := SafeDocumentName(t, t.Name())
2140+
var b Body
2141+
require.NoError(t, base.JSONUnmarshal(body, &b))
2142+
_, _, err := collection.Put(ctx, name, b)
2143+
require.NoError(t, err)
2144+
doc, err := collection.GetDocument(ctx, name, DocUnmarshalAll)
2145+
require.NoError(t, err)
2146+
require.NotNil(t, doc.HLV)
2147+
require.NotEmpty(t, doc.RevAndVersion.CurrentSource)
2148+
require.NotEmpty(t, doc.RevAndVersion.CurrentVersion)
2149+
return doc
2150+
}
2151+
2152+
func TestIsSGWrite(t *testing.T) {
2153+
base.SetUpTestLogging(t, base.LevelDebug, base.KeyCRUD, base.KeyImport)
2154+
db, ctx := setupTestDB(t)
2155+
defer db.Close(ctx)
2156+
2157+
body := []byte(`{"some":"data"}`)
2158+
testCases := []struct {
2159+
name string
2160+
docBody []byte
2161+
}{
2162+
{
2163+
name: "normal body",
2164+
docBody: body,
2165+
},
2166+
{
2167+
name: "nil body",
2168+
docBody: nil,
2169+
},
2170+
}
2171+
2172+
t.Run("standard Put", func(t *testing.T) {
2173+
doc := createNewTestDocument(t, db, body)
2174+
for _, testCase := range testCases {
2175+
t.Run(testCase.name, func(t *testing.T) {
2176+
isSGWrite, _, _ := doc.IsSGWrite(ctx, testCase.docBody)
2177+
require.True(t, isSGWrite, "Expected doc to be identified as SG write for body %q", string(testCase.docBody))
2178+
})
2179+
}
2180+
})
2181+
t.Run("no HLV", func(t *testing.T) {
2182+
// falls back to body crc32 comparison
2183+
doc := createNewTestDocument(t, db, body)
2184+
doc.HLV = nil
2185+
doc.Cas = 1 // force mismatch cas
2186+
for _, testCase := range testCases {
2187+
t.Run(testCase.name, func(t *testing.T) {
2188+
isSGWrite, _, _ := doc.IsSGWrite(ctx, testCase.docBody)
2189+
require.True(t, isSGWrite, "Expected doc to be identified an SDK write for body %q", string(testCase.docBody))
2190+
})
2191+
}
2192+
})
2193+
t.Run("no _sync.rev.src", func(t *testing.T) {
2194+
// this is a corrupt _sync.rev, so assume that it was a _vv at some point and import just in case
2195+
doc := createNewTestDocument(t, db, body)
2196+
doc.RevAndVersion.CurrentSource = ""
2197+
doc.Cas = 1 // force mismatch cas
2198+
for _, testCase := range testCases {
2199+
t.Run(testCase.name, func(t *testing.T) {
2200+
isSGWrite, _, _ := doc.IsSGWrite(ctx, testCase.docBody)
2201+
require.False(t, isSGWrite, "Expected doc not to be identified as SG write for body %q since _sync.rev.src is empty", string(testCase.docBody))
2202+
})
2203+
}
2204+
})
2205+
t.Run("no _sync.rev.ver", func(t *testing.T) {
2206+
doc := createNewTestDocument(t, db, body)
2207+
doc.RevAndVersion.CurrentVersion = ""
2208+
doc.Cas = 1 // force mismatch cas
2209+
for _, testCase := range testCases {
2210+
t.Run(testCase.name, func(t *testing.T) {
2211+
isSGWrite, _, _ := doc.IsSGWrite(ctx, testCase.docBody)
2212+
require.False(t, isSGWrite, "Expected doc not to be identified as SG write for body %q since _sync.rev.ver is empty", string(testCase.docBody))
2213+
})
2214+
}
2215+
})
2216+
t.Run("mismatch sync.rev.ver", func(t *testing.T) {
2217+
doc := createNewTestDocument(t, db, body)
2218+
doc.RevAndVersion.CurrentVersion = "0x1234"
2219+
doc.Cas = 1 // force mismatch cas
2220+
for _, testCase := range testCases {
2221+
t.Run(testCase.name, func(t *testing.T) {
2222+
isSGWrite, _, _ := doc.IsSGWrite(ctx, testCase.docBody)
2223+
require.False(t, isSGWrite, "Expected doc to not be identified as SG write for body %q due to mismatched _sync.rev.ver", string(testCase.docBody))
2224+
})
2225+
}
2226+
})
2227+
2228+
}
2229+
2230+
func TestSyncDataCVEqual(t *testing.T) {
2231+
testCases := []struct {
2232+
name string
2233+
syncData SyncData
2234+
cvSourceID string
2235+
cvValue uint64
2236+
cvEqual bool
2237+
}{
2238+
{
2239+
name: "syncData CV matches",
2240+
syncData: SyncData{
2241+
RevAndVersion: channels.RevAndVersion{
2242+
CurrentSource: "testSourceID",
2243+
CurrentVersion: "0x0100000000000000",
2244+
},
2245+
},
2246+
cvSourceID: "testSourceID",
2247+
cvValue: 1,
2248+
cvEqual: true,
2249+
},
2250+
{
2251+
name: "syncData sourceID mismatch",
2252+
syncData: SyncData{
2253+
RevAndVersion: channels.RevAndVersion{
2254+
CurrentSource: "testSourceID",
2255+
CurrentVersion: "0x1",
2256+
},
2257+
},
2258+
cvSourceID: "testSourceID2",
2259+
cvValue: 1,
2260+
cvEqual: false,
2261+
},
2262+
{
2263+
name: "syncData sourceID mismatch",
2264+
syncData: SyncData{
2265+
RevAndVersion: channels.RevAndVersion{
2266+
CurrentSource: "testSourceID",
2267+
CurrentVersion: "0x2",
2268+
},
2269+
},
2270+
cvSourceID: "testSourceID",
2271+
cvValue: 1,
2272+
cvEqual: false,
2273+
},
2274+
}
2275+
for _, testCase := range testCases {
2276+
t.Run(testCase.name, func(t *testing.T) {
2277+
cv := Version{
2278+
SourceID: testCase.cvSourceID,
2279+
Value: testCase.cvValue,
2280+
}
2281+
require.Equal(t, testCase.cvEqual, testCase.syncData.CVEqual(cv))
2282+
})
2283+
}
2284+
}

db/document.go

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -661,8 +661,16 @@ func HasUserXattrChanged(userXattr []byte, prevUserXattrHash string) bool {
661661
return userXattrCrc32cHash(userXattr) != prevUserXattrHash
662662
}
663663

664-
// SyncData.IsSGWrite - used during feed-based import
665-
func (s *SyncData) IsSGWrite(cas uint64, rawBody []byte, rawUserXattr []byte) (isSGWrite bool, crc32Match bool, bodyChanged bool) {
664+
// CVEqual returns true if the provided CV matches _sync.rev.ver and _sync.rev.src. The caller is responsible for testing if the values are non-empty.
665+
func (s *SyncData) CVEqual(cv Version) bool {
666+
if cv.SourceID != s.RevAndVersion.CurrentSource {
667+
return false
668+
}
669+
return cv.Value == base.HexCasToUint64(s.RevAndVersion.CurrentVersion)
670+
}
671+
672+
// IsSGWrite determines if a document was written by Sync Gateway or via an SDK. CV is an optional parameter to check. This would represent _vv.ver and _vv.src
673+
func (s *SyncData) IsSGWrite(ctx context.Context, cas uint64, rawBody []byte, rawUserXattr []byte, cv cvExtractor) (isSGWrite bool, crc32Match bool, bodyChanged bool) {
666674

667675
// If cas matches, it was a SG write
668676
if cas == s.GetSyncCas() {
@@ -675,24 +683,31 @@ func (s *SyncData) IsSGWrite(cas uint64, rawBody []byte, rawUserXattr []byte) (i
675683
}
676684

677685
if HasUserXattrChanged(rawUserXattr, s.Crc32cUserXattr) {
686+
// technically the crc32 matches but return false on crc32Match so Crc32MatchCount is not incremented, to mark that it would be imported
678687
return false, false, false
679688
}
680689

690+
if s.RevAndVersion.CurrentVersion != "" || s.RevAndVersion.CurrentSource != "" {
691+
extractedCV, err := cv.ExtractCV()
692+
if !errors.Is(err, base.ErrNotFound) {
693+
if err != nil {
694+
base.InfofCtx(ctx, base.KeyImport, "Unable to extract cv during IsSGWrite write check - skipping cv match check: %v", err)
695+
return true, true, false
696+
}
697+
if !s.CVEqual(*extractedCV) {
698+
// technically the crc32 matches but return false so Crc32MatchCount is not incremented, to mark that it would be imported
699+
return false, false, false
700+
}
701+
}
702+
}
681703
return true, true, false
682704
}
683705

684-
// doc.IsSGWrite - used during on-demand import. Doesn't invoke SyncData.IsSGWrite so that we
685-
// can complete the inexpensive cas check before the (potential) doc marshalling.
706+
// IsSGWrite - used during on-demand import. Check SyncData and HLV to determine if the document was written by Sync Gateway or by a Couchbase Server SDK write.
686707
func (doc *Document) IsSGWrite(ctx context.Context, rawBody []byte) (isSGWrite bool, crc32Match bool, bodyChanged bool) {
687-
688708
// If the raw body is available, use SyncData.IsSGWrite
689709
if rawBody != nil && len(rawBody) > 0 {
690-
691-
isSgWriteFeed, crc32MatchFeed, bodyChangedFeed := doc.SyncData.IsSGWrite(doc.Cas, rawBody, doc.rawUserXattr)
692-
if !isSgWriteFeed {
693-
base.DebugfCtx(ctx, base.KeyCRUD, "Doc %s is not an SG write, based on cas and body hash. cas:%x syncCas:%q", base.UD(doc.ID), doc.Cas, doc.SyncData.Cas)
694-
}
695-
710+
isSgWriteFeed, crc32MatchFeed, bodyChangedFeed := doc.SyncData.IsSGWrite(ctx, doc.Cas, rawBody, doc.rawUserXattr, doc.HLV)
696711
return isSgWriteFeed, crc32MatchFeed, bodyChangedFeed
697712
}
698713

@@ -717,15 +732,25 @@ func (doc *Document) IsSGWrite(ctx context.Context, rawBody []byte) (isSGWrite b
717732

718733
// If the current body crc32c matches the one in doc.SyncData, this was an SG write (i.e. has already been imported)
719734
if currentBodyCrc32c != doc.SyncData.Crc32c {
720-
base.DebugfCtx(ctx, base.KeyCRUD, "Doc %s is not an SG write, based on cas and body hash. cas:%x syncCas:%q", base.UD(doc.ID), doc.Cas, doc.SyncData.Cas)
735+
base.DebugfCtx(ctx, base.KeyCRUD, "Doc %s is not an SG write, based on crc32 hash", base.UD(doc.ID))
721736
return false, false, true
722737
}
723738

724739
if HasUserXattrChanged(doc.rawUserXattr, doc.Crc32cUserXattr) {
740+
// technically the crc32 matches but return false for crc32Match so Crc32MatchCount is not incremented. The document is not a match and will get imported.
725741
base.DebugfCtx(ctx, base.KeyCRUD, "Doc %s is not an SG write, based on user xattr hash", base.UD(doc.ID))
726742
return false, false, false
727743
}
744+
if doc.RevAndVersion.CurrentSource == "" && doc.RevAndVersion.CurrentVersion == "" {
745+
return true, true, false
746+
}
728747

748+
if doc.HLV != nil {
749+
if !doc.CVEqual(*doc.HLV.ExtractCurrentVersionFromHLV()) {
750+
base.DebugfCtx(ctx, base.KeyCRUD, "Doc %s is not an SG write, based on mismatch between version vector cv %s and sync metadata cv %s", base.UD(doc.ID), doc.HLV.GetCurrentVersionString(), doc.RevAndVersion.CV())
751+
return false, true, false
752+
}
753+
}
729754
return true, true, false
730755
}
731756

db/hybrid_logical_vector.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,14 @@ func (hlv *HybridLogicalVector) ExtractCurrentVersionFromHLV() *Version {
248248
return &currVersion
249249
}
250250

251+
// ExtractCV is used to satisfy the CV-only interface (cvExtractor). Consider using ExtractCurrentVersionFromHLV or GetCurrentVersion instead if you know hlv is not nil.
252+
func (hlv *HybridLogicalVector) ExtractCV() (*Version, error) {
253+
if hlv == nil {
254+
return nil, base.ErrNotFound
255+
}
256+
return hlv.ExtractCurrentVersionFromHLV(), nil
257+
}
258+
251259
// HybridLogicalVector is the in memory format for the hLv.
252260
type HybridLogicalVector struct {
253261
CurrentVersionCAS uint64 // current version cas (or cvCAS) stores the current CAS in little endian hex format at the time of replication
@@ -979,3 +987,30 @@ func (hlv *HybridLogicalVector) HasRevEncodedCV() bool {
979987
func GetGenerationFromEncodedVersionValue(value uint64) int {
980988
return int((value >> 40) & 0xFFFFFF)
981989
}
990+
991+
type rawHLV []byte
992+
993+
// ExtractCV returns the current version from the HLV by unmarshalling a raw _vv xattr. If the rawHLV is nil, returns ErrNotFound.
994+
func (r *rawHLV) ExtractCV() (*Version, error) {
995+
if r == nil {
996+
return nil, base.ErrNotFound
997+
}
998+
limitedHLV := struct {
999+
Version string `json:"ver"`
1000+
SourceID string `json:"src"`
1001+
}{}
1002+
err := base.JSONUnmarshal(*r, &limitedHLV)
1003+
if err != nil {
1004+
return nil, fmt.Errorf("could not extract ver and src from _vv xattr (%q): %w", string(*r), err)
1005+
}
1006+
return &Version{SourceID: limitedHLV.SourceID, Value: base.HexCasToUint64(limitedHLV.Version)}, nil
1007+
}
1008+
1009+
// cvExtractor can extract a current version from different HLV representations.
1010+
type cvExtractor interface {
1011+
// ExtractCV returns the current version from the HLV.
1012+
ExtractCV() (*Version, error)
1013+
}
1014+
1015+
var _ cvExtractor = &rawHLV{}
1016+
var _ cvExtractor = &HybridLogicalVector{}

db/import_listener.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,13 @@ func (il *importListener) ImportFeedEvent(ctx context.Context, collection *Datab
202202
il.importStats.ImportErrorCount.Add(1)
203203
return
204204
}
205-
isSGWrite, crc32Match, _ = syncData.IsSGWrite(event.Cas, rawDoc.Body, rawDoc.Xattrs[collection.userXattrKey()])
205+
var cv cvExtractor
206+
vv := rawDoc.Xattrs[base.VvXattrName]
207+
if len(vv) > 0 {
208+
cv = base.Ptr(rawHLV(vv))
209+
}
210+
211+
isSGWrite, crc32Match, _ = syncData.IsSGWrite(ctx, event.Cas, rawDoc.Body, rawDoc.Xattrs[collection.userXattrKey()], cv)
206212
if crc32Match {
207213
il.dbStats.Crc32MatchCount.Add(1)
208214
}

db/util_testing.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"net/http"
1919
"slices"
2020
"strconv"
21+
"strings"
2122
"sync/atomic"
2223
"testing"
2324
"time"
@@ -1008,3 +1009,13 @@ func GetChangeEntryCV(t *testing.T, entry *ChangeEntry) Version {
10081009
}
10091010
return Version{}
10101011
}
1012+
1013+
// SafeDocumentName returns a document name free of any special characters for use in tests.
1014+
func SafeDocumentName(t *testing.T, name string) string {
1015+
docName := strings.ToLower(name)
1016+
for _, c := range []string{" ", "<", ">", "/", "="} {
1017+
docName = strings.ReplaceAll(docName, c, "_")
1018+
}
1019+
require.Less(t, len(docName), 251, "Document name %s is too long, must be less than 251 characters", name)
1020+
return docName
1021+
}

0 commit comments

Comments
 (0)