Skip to content

Commit 6cab66f

Browse files
committed
restore common signatures
1 parent 9ff2712 commit 6cab66f

File tree

6 files changed

+69
-57
lines changed

6 files changed

+69
-57
lines changed

db/change_cache.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -395,18 +395,13 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent, docType DocumentType)
395395
if syncData == nil {
396396
return
397397
}
398-
var cv *Version
399398
vv := doc.Xattrs[base.VvXattrName]
400399
if len(vv) > 0 {
401-
cv, err = getCurrentVersionFromVVXattr(vv)
402-
if err != nil {
403-
base.WarnfCtx(ctx, "Error unmarshalling %s xattr (%q) for doc %q: %v", base.VvXattrName, string(vv), base.UD(docID), err)
400+
isSGWrite, _, _ := syncData.IsSGWrite(ctx, event.Cas, doc.Body, rawUserXattr, base.Ptr(rawHLV(vv)))
401+
if !isSGWrite {
402+
return
404403
}
405404
}
406-
isSGWrite, _, _ := syncData.IsSGWrite(event.Cas, doc.Body, rawUserXattr, cv)
407-
if !isSGWrite {
408-
return
409-
}
410405
}
411406

412407
// If not using xattrs and no sync metadata found, check whether we're mid-upgrade and attempting to read a doc w/ metadata stored in xattr

db/crud_test.go

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2191,13 +2191,14 @@ func TestIsSGWrite(t *testing.T) {
21912191
}
21922192
})
21932193
t.Run("no _sync.rev.src", func(t *testing.T) {
2194+
// this is a corrupt _sync.rev, so assume that it was a _vv at some point and import just in case
21942195
doc := createNewTestDocument(t, db, body)
21952196
doc.RevAndVersion.CurrentSource = ""
21962197
doc.Cas = 1 // force mismatch cas
21972198
for _, testCase := range testCases {
21982199
t.Run(testCase.name, func(t *testing.T) {
21992200
isSGWrite, _, _ := doc.IsSGWrite(ctx, testCase.docBody)
2200-
require.True(t, isSGWrite, "Expected doc to be identified as SG write for body %q", string(testCase.docBody))
2201+
require.False(t, isSGWrite, "Expected doc not to be identified as SG write for body %q since _sync.rev.src is empty", string(testCase.docBody))
22012202
})
22022203
}
22032204
})
@@ -2208,7 +2209,7 @@ func TestIsSGWrite(t *testing.T) {
22082209
for _, testCase := range testCases {
22092210
t.Run(testCase.name, func(t *testing.T) {
22102211
isSGWrite, _, _ := doc.IsSGWrite(ctx, testCase.docBody)
2211-
require.True(t, isSGWrite, "Expected doc to be identified as SG write for body %q since _sync.rev.ver is empty", string(testCase.docBody))
2212+
require.False(t, isSGWrite, "Expected doc not to be identified as SG write for body %q since _sync.rev.ver is empty", string(testCase.docBody))
22122213
})
22132214
}
22142215
})
@@ -2239,12 +2240,12 @@ func TestSyncDataCVEqual(t *testing.T) {
22392240
syncData: SyncData{
22402241
RevAndVersion: channels.RevAndVersion{
22412242
CurrentSource: "testSourceID",
2242-
CurrentVersion: "0x1",
2243+
CurrentVersion: "0x0100000000000000",
22432244
},
22442245
},
22452246
cvSourceID: "testSourceID",
22462247
cvValue: 1,
2247-
cvEqual: false,
2248+
cvEqual: true,
22482249
},
22492250
{
22502251
name: "syncData sourceID mismatch",
@@ -2256,7 +2257,7 @@ func TestSyncDataCVEqual(t *testing.T) {
22562257
},
22572258
cvSourceID: "testSourceID2",
22582259
cvValue: 1,
2259-
cvEqual: true,
2260+
cvEqual: false,
22602261
},
22612262
{
22622263
name: "syncData sourceID mismatch",
@@ -2268,24 +2269,16 @@ func TestSyncDataCVEqual(t *testing.T) {
22682269
},
22692270
cvSourceID: "testSourceID",
22702271
cvValue: 1,
2271-
cvEqual: true,
2272-
},
2273-
{
2274-
name: "syncData equal",
2275-
syncData: SyncData{
2276-
RevAndVersion: channels.RevAndVersion{
2277-
CurrentSource: "sourceID",
2278-
CurrentVersion: "0x1",
2279-
},
2280-
},
2281-
cvSourceID: "sourceID",
2282-
cvValue: 1,
2283-
cvEqual: true,
2272+
cvEqual: false,
22842273
},
22852274
}
22862275
for _, testCase := range testCases {
22872276
t.Run(testCase.name, func(t *testing.T) {
2288-
assert.True(t, testCase.syncData.CVEqual(testCase.cvSourceID, testCase.cvValue))
2277+
cv := Version{
2278+
SourceID: testCase.cvSourceID,
2279+
Value: testCase.cvValue,
2280+
}
2281+
require.Equal(t, testCase.cvEqual, testCase.syncData.CVEqual(cv))
22892282
})
22902283
}
22912284
}

db/document.go

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -662,15 +662,15 @@ func HasUserXattrChanged(userXattr []byte, prevUserXattrHash string) bool {
662662
}
663663

664664
// CVEqual returns true if the provided CV does not match _sync.rev.ver and _sync.rev.src. The caller is responsible for testing if the values are non-empty.
665-
func (s *SyncData) CVEqual(cvSourceID string, cvVersion uint64) bool {
666-
if cvSourceID != s.RevAndVersion.CurrentSource {
667-
return true
665+
func (s *SyncData) CVEqual(cv Version) bool {
666+
if cv.SourceID != s.RevAndVersion.CurrentSource {
667+
return false
668668
}
669-
return cvVersion != base.HexCasToUint64(s.RevAndVersion.CurrentVersion)
669+
return cv.Value == base.HexCasToUint64(s.RevAndVersion.CurrentVersion)
670670
}
671671

672672
// IsSGWrite determines if a document was written by Sync Gateway or via an SDK. CV is an optional parameter to check. This would represent _vv.ver and _vv.src
673-
func (s *SyncData) IsSGWrite(cas uint64, rawBody []byte, rawUserXattr []byte, cv *Version) (isSGWrite bool, crc32Match bool, bodyChanged bool) {
673+
func (s *SyncData) IsSGWrite(ctx context.Context, cas uint64, rawBody []byte, rawUserXattr []byte, cv cvExtractor) (isSGWrite bool, crc32Match bool, bodyChanged bool) {
674674

675675
// If cas matches, it was a SG write
676676
if cas == s.GetSyncCas() {
@@ -686,20 +686,26 @@ func (s *SyncData) IsSGWrite(cas uint64, rawBody []byte, rawUserXattr []byte, cv
686686
return false, false, false
687687
}
688688

689+
if s.RevAndVersion.CurrentVersion != "" || s.RevAndVersion.CurrentSource != "" {
690+
extractedCV, err := cv.ExtractCV()
691+
if !errors.Is(err, base.ErrNotFound) {
692+
if err != nil {
693+
base.WarnfCtx(ctx, "Error extracting cv during IsSGWrite write check: %v", err)
694+
return true, true, false
695+
}
696+
if !s.CVEqual(*extractedCV) {
697+
return false, true, false
698+
}
699+
}
700+
}
689701
return true, true, false
690702
}
691703

692704
// IsSGWrite - used during on-demand import. Check SyncData and HLV to determine if the document was written by Sync Gateway or by a Couchbase Server SDK write.
693705
func (doc *Document) IsSGWrite(ctx context.Context, rawBody []byte) (isSGWrite bool, crc32Match bool, bodyChanged bool) {
694-
var cv *Version
695-
if doc.RevAndVersion.CurrentSource != "" && doc.RevAndVersion.CurrentVersion != "" && doc.HLV != nil {
696-
sourceID, version := doc.HLV.GetCurrentVersion()
697-
cv = &Version{SourceID: sourceID, Value: version}
698-
}
699-
700706
// If the raw body is available, use SyncData.IsSGWrite
701707
if rawBody != nil && len(rawBody) > 0 {
702-
isSgWriteFeed, crc32MatchFeed, bodyChangedFeed := doc.SyncData.IsSGWrite(doc.Cas, rawBody, doc.rawUserXattr, cv)
708+
isSgWriteFeed, crc32MatchFeed, bodyChangedFeed := doc.SyncData.IsSGWrite(ctx, doc.Cas, rawBody, doc.rawUserXattr, doc.HLV)
703709
return isSgWriteFeed, crc32MatchFeed, bodyChangedFeed
704710
}
705711

@@ -730,14 +736,14 @@ func (doc *Document) IsSGWrite(ctx context.Context, rawBody []byte) (isSGWrite b
730736

731737
if HasUserXattrChanged(doc.rawUserXattr, doc.Crc32cUserXattr) {
732738
base.DebugfCtx(ctx, base.KeyCRUD, "Doc %s is not an SG write, based on user xattr hash", base.UD(doc.ID))
733-
return false, true, false
739+
return false, false, false
734740
}
735-
if doc.RevAndVersion.CurrentSource == "" || doc.RevAndVersion.CurrentVersion == "" {
741+
if doc.RevAndVersion.CurrentSource == "" && doc.RevAndVersion.CurrentVersion == "" {
736742
return true, true, false
737743
}
738744

739-
if cv != nil {
740-
if !doc.CVEqual(cv.SourceID, cv.Value) {
745+
if doc.HLV != nil {
746+
if !doc.CVEqual(*doc.HLV.ExtractCurrentVersionFromHLV()) {
741747
base.DebugfCtx(ctx, base.KeyCRUD, "Doc %s is not an SG write, based on mismatch between version vector cv %s and sync metadata cv %s", base.UD(doc.ID), doc.HLV.GetCurrentVersionString(), doc.RevAndVersion.CV())
742748
return false, true, false
743749
}

db/hybrid_logical_vector.go

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,14 @@ func (hlv *HybridLogicalVector) ExtractCurrentVersionFromHLV() *Version {
248248
return &currVersion
249249
}
250250

251+
// ExtractCV is used to sastify CV only interface. Since it can never return an error, consider ExtractCurrentVersionFromHLV or GetCurrentVersion instead.
252+
func (hlv *HybridLogicalVector) ExtractCV() (*Version, error) {
253+
if hlv == nil {
254+
return nil, base.ErrNotFound
255+
}
256+
return hlv.ExtractCurrentVersionFromHLV(), nil
257+
}
258+
251259
// HybridLogicalVector is the in memory format for the hLv.
252260
type HybridLogicalVector struct {
253261
CurrentVersionCAS uint64 // current version cas (or cvCAS) stores the current CAS in little endian hex format at the time of replication
@@ -980,16 +988,29 @@ func GetGenerationFromEncodedVersionValue(value uint64) int {
980988
return int((value >> 40) & 0xFFFFFF)
981989
}
982990

983-
// getCurrentVersionFromVVXattr will extract only the current version from a full version vector xattr, used to
984-
// optimize unmarshalling when only the current version is needed.
985-
func getCurrentVersionFromVVXattr(hlvData []byte) (*Version, error) {
991+
type rawHLV []byte
992+
993+
// GetCurrentVersion returns the current version from the HLV by unmarshalling a raw _vv xattr.
994+
func (r *rawHLV) ExtractCV() (*Version, error) {
995+
if r == nil {
996+
return nil, base.ErrNotFound
997+
}
986998
limitedHLV := struct {
987999
Version string `json:"ver"`
9881000
SourceID string `json:"src"`
9891001
}{}
990-
err := base.JSONUnmarshal(hlvData, &limitedHLV)
1002+
err := base.JSONUnmarshal(*r, &limitedHLV)
9911003
if err != nil {
992-
return nil, err
1004+
return nil, fmt.Errorf("could not extract ver and src from _vv xattr (%q): %w", string(*r), err)
9931005
}
9941006
return &Version{SourceID: limitedHLV.SourceID, Value: base.HexCasToUint64(limitedHLV.Version)}, nil
9951007
}
1008+
1009+
// cvExtractor can extract a current version different HLV representations.
1010+
type cvExtractor interface {
1011+
// ExtractVersion returns the current version from the HLV.
1012+
ExtractCV() (*Version, error)
1013+
}
1014+
1015+
var _ cvExtractor = &rawHLV{}
1016+
var _ cvExtractor = &HybridLogicalVector{}

db/import_listener.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -202,16 +202,13 @@ func (il *importListener) ImportFeedEvent(ctx context.Context, collection *Datab
202202
il.importStats.ImportErrorCount.Add(1)
203203
return
204204
}
205-
var cv *Version
205+
var cv cvExtractor
206206
vv := rawDoc.Xattrs[base.VvXattrName]
207207
if len(vv) > 0 {
208-
cv, err = getCurrentVersionFromVVXattr(vv)
209-
if err != nil {
210-
base.WarnfCtx(ctx, "Error parsing version xattr for doc %s - not importing. Continuing without doing version vector consistency check between _vv and _sync: %s", base.UD(event.Key), err)
211-
return
212-
}
208+
cv = base.Ptr(rawHLV(vv))
213209
}
214-
isSGWrite, crc32Match, _ = syncData.IsSGWrite(event.Cas, rawDoc.Body, rawDoc.Xattrs[collection.userXattrKey()], cv)
210+
211+
isSGWrite, crc32Match, _ = syncData.IsSGWrite(ctx, event.Cas, rawDoc.Body, rawDoc.Xattrs[collection.userXattrKey()], cv)
215212
if crc32Match {
216213
il.dbStats.Crc32MatchCount.Add(1)
217214
}

xdcr/xdcr_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -690,7 +690,7 @@ func TestXDCRBeforeAttachmentMigration(t *testing.T) {
690690
requireWaitForXDCRDocsWritten(t, xdcr, 3)
691691

692692
// check that we can fetch the attachment now
693-
// Due to CBG-4864, this writes a new revision since _vv.ver > _sync.rev.ver even though the body didn't change.
693+
// Due to CBG-4878, this writes a new revision since _vv.ver > _sync.rev.ver even though the body didn't change.
694694
// This isn't ideal and it would be better to not have this issue a new revision.
695695
//
696696
// Note: The versions remaining the same means that clients that observe the missing attachment never get it until a subsequent doc update
@@ -705,7 +705,7 @@ func TestXDCRBeforeAttachmentMigration(t *testing.T) {
705705
attachmentsAfter := db.GetRawGlobalSyncAttachments(t, dstDs, docID)
706706
require.Contains(t, attachmentsAfter, attName)
707707

708-
// CBG-4864 causes these to not be equal but equality would be better.
708+
// CBG-4878 causes these to not be equal but equality would be better.
709709
assert.NotEqual(t, dstPreMigrateSeq, dstPostMigrateSeq)
710710
assert.NotEqual(t, dstPreMigrateRev, dstPostMigrateRev)
711711
}

0 commit comments

Comments
 (0)