Skip to content

Commit 9ff2712

Browse files
committed
simplify IsSGWrite
1 parent 9c1dc4c commit 9ff2712

File tree

8 files changed

+90
-81
lines changed

8 files changed

+90
-81
lines changed

db/change_cache.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,15 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent, docType DocumentType)
395395
if syncData == nil {
396396
return
397397
}
398-
isSGWrite, _, _ := syncData.IsSGWrite(event.Cas, doc.Body, rawUserXattr)
398+
var cv *Version
399+
vv := doc.Xattrs[base.VvXattrName]
400+
if len(vv) > 0 {
401+
cv, err = getCurrentVersionFromVVXattr(vv)
402+
if err != nil {
403+
base.WarnfCtx(ctx, "Error unmarshalling %s xattr (%q) for doc %q: %v", base.VvXattrName, string(vv), base.UD(docID), err)
404+
}
405+
}
406+
isSGWrite, _, _ := syncData.IsSGWrite(event.Cas, doc.Body, rawUserXattr, cv)
399407
if !isSGWrite {
400408
return
401409
}

db/crud.go

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,6 @@ func (c *DatabaseCollection) GetDocument(ctx context.Context, docid string, unma
6666
return doc, err
6767
}
6868

69-
// IsSGWrite is used to determine if the document was written by Sync Gateway or needs to be imported.
70-
func (c *DatabaseCollection) IsSGWrite(ctx context.Context, doc *Document, rawBody []byte) (isSGWrite bool, crc32Match bool, bodyChanged bool) {
71-
cv := Version{SourceID: c.dbCtx.EncodedSourceID, Value: doc.Cas}
72-
if doc.HLV != nil {
73-
cv = *doc.HLV.ExtractCurrentVersionFromHLV()
74-
}
75-
return doc.IsSGWrite(ctx, cv, rawBody)
76-
}
77-
7869
// GetDocumentWithRaw returns the document from the bucket. This may perform an on-demand import.
7970
func (c *DatabaseCollection) GetDocumentWithRaw(ctx context.Context, docid string, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, rawBucketDoc *sgbucket.BucketDocument, err error) {
8071
key := realDocID(docid)
@@ -86,7 +77,7 @@ func (c *DatabaseCollection) GetDocumentWithRaw(ctx context.Context, docid strin
8677
if err != nil {
8778
return nil, nil, err
8879
}
89-
isSgWrite, crc32Match, _ := c.IsSGWrite(ctx, doc, rawBucketDoc.Body)
80+
isSgWrite, crc32Match, _ := doc.IsSGWrite(ctx, rawBucketDoc.Body)
9081
if crc32Match {
9182
c.dbStats().Database().Crc32MatchCount.Add(1)
9283
}
@@ -98,7 +89,7 @@ func (c *DatabaseCollection) GetDocumentWithRaw(ctx context.Context, docid strin
9889
if err != nil {
9990
return nil, nil, err
10091
}
101-
isSgWrite, _, _ := c.IsSGWrite(ctx, doc, rawBucketDoc.Body)
92+
isSgWrite, _, _ := doc.IsSGWrite(ctx, rawBucketDoc.Body)
10293
if !isSgWrite {
10394
var importErr error
10495
doc, importErr = c.OnDemandImportForGet(ctx, docid, doc, rawBucketDoc.Body, rawBucketDoc.Xattrs, rawBucketDoc.Cas)
@@ -189,7 +180,7 @@ func (c *DatabaseCollection) GetDocSyncData(ctx context.Context, docid string) (
189180
return emptySyncData, unmarshalErr
190181
}
191182

192-
isSgWrite, crc32Match, _ := c.IsSGWrite(ctx, doc, rawDoc)
183+
isSgWrite, crc32Match, _ := doc.IsSGWrite(ctx, rawDoc)
193184
if crc32Match {
194185
c.dbStats().Database().Crc32MatchCount.Add(1)
195186
}
@@ -1137,7 +1128,7 @@ func (db *DatabaseCollectionWithUser) Put(ctx context.Context, docid string, bod
11371128

11381129
// Is this doc an sgWrite?
11391130
if doc != nil {
1140-
isSgWrite, crc32Match, _ = db.IsSGWrite(ctx, doc, nil)
1131+
isSgWrite, crc32Match, _ = doc.IsSGWrite(ctx, nil)
11411132
if crc32Match {
11421133
db.dbStats().Database().Crc32MatchCount.Add(1)
11431134
}
@@ -1293,7 +1284,7 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont
12931284

12941285
// Is this doc an sgWrite?
12951286
if doc != nil {
1296-
isSgWrite, crc32Match, _ = db.IsSGWrite(ctx, doc, nil)
1287+
isSgWrite, crc32Match, _ = doc.IsSGWrite(ctx, nil)
12971288
if crc32Match {
12981289
db.dbStats().Database().Crc32MatchCount.Add(1)
12991290
}
@@ -1530,7 +1521,7 @@ func (db *DatabaseCollectionWithUser) PutExistingRevWithConflictResolution(ctx c
15301521

15311522
// Is this doc an sgWrite?
15321523
if doc != nil {
1533-
isSgWrite, crc32Match, _ = db.IsSGWrite(ctx, doc, nil)
1524+
isSgWrite, crc32Match, _ = doc.IsSGWrite(ctx, nil)
15341525
if crc32Match {
15351526
db.dbStats().Database().Crc32MatchCount.Add(1)
15361527
}

db/crud_test.go

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2154,8 +2154,7 @@ func TestIsSGWrite(t *testing.T) {
21542154
db, ctx := setupTestDB(t)
21552155
defer db.Close(ctx)
21562156

2157-
body := []byte(`{"some": "data"}`)
2158-
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
2157+
body := []byte(`{"some":"data"}`)
21592158
testCases := []struct {
21602159
name string
21612160
docBody []byte
@@ -2174,19 +2173,20 @@ func TestIsSGWrite(t *testing.T) {
21742173
doc := createNewTestDocument(t, db, body)
21752174
for _, testCase := range testCases {
21762175
t.Run(testCase.name, func(t *testing.T) {
2177-
isSGWrite, _, _ := collection.IsSGWrite(ctx, doc, testCase.docBody)
2176+
isSGWrite, _, _ := doc.IsSGWrite(ctx, testCase.docBody)
21782177
require.True(t, isSGWrite, "Expected doc to be identified as SG write for body %q", string(testCase.docBody))
21792178
})
21802179
}
21812180
})
21822181
t.Run("no HLV", func(t *testing.T) {
2182+
// falls back to body crc32 comparison
21832183
doc := createNewTestDocument(t, db, body)
21842184
doc.HLV = nil
21852185
doc.Cas = 1 // force mismatch cas
21862186
for _, testCase := range testCases {
21872187
t.Run(testCase.name, func(t *testing.T) {
2188-
isSGWrite, _, _ := collection.IsSGWrite(ctx, doc, testCase.docBody)
2189-
require.False(t, isSGWrite, "Expected doc to be identified an SDK write for body %q", string(testCase.docBody))
2188+
isSGWrite, _, _ := doc.IsSGWrite(ctx, testCase.docBody)
2189+
require.True(t, isSGWrite, "Expected doc to be identified an SDK write for body %q", string(testCase.docBody))
21902190
})
21912191
}
21922192
})
@@ -2196,8 +2196,8 @@ func TestIsSGWrite(t *testing.T) {
21962196
doc.Cas = 1 // force mismatch cas
21972197
for _, testCase := range testCases {
21982198
t.Run(testCase.name, func(t *testing.T) {
2199-
isSGWrite, _, _ := collection.IsSGWrite(ctx, doc, testCase.docBody)
2200-
require.False(t, isSGWrite, "Expected doc to be identified as SG write for body %q", string(testCase.docBody))
2199+
isSGWrite, _, _ := doc.IsSGWrite(ctx, testCase.docBody)
2200+
require.True(t, isSGWrite, "Expected doc to be identified as SG write for body %q", string(testCase.docBody))
22012201
})
22022202
}
22032203
})
@@ -2207,8 +2207,8 @@ func TestIsSGWrite(t *testing.T) {
22072207
doc.Cas = 1 // force mismatch cas
22082208
for _, testCase := range testCases {
22092209
t.Run(testCase.name, func(t *testing.T) {
2210-
isSGWrite, _, _ := collection.IsSGWrite(ctx, doc, testCase.docBody)
2211-
require.False(t, isSGWrite, "Expected doc to be identified as SG write for body %q", string(testCase.docBody))
2210+
isSGWrite, _, _ := doc.IsSGWrite(ctx, testCase.docBody)
2211+
require.True(t, isSGWrite, "Expected doc to be identified as SG write for body %q since _sync.rev.ver is empty", string(testCase.docBody))
22122212
})
22132213
}
22142214
})
@@ -2218,21 +2218,21 @@ func TestIsSGWrite(t *testing.T) {
22182218
doc.Cas = 1 // force mismatch cas
22192219
for _, testCase := range testCases {
22202220
t.Run(testCase.name, func(t *testing.T) {
2221-
isSGWrite, _, _ := collection.IsSGWrite(ctx, doc, testCase.docBody)
2222-
require.False(t, isSGWrite, "Expected doc to be identified as SG write for body %q", string(testCase.docBody))
2221+
isSGWrite, _, _ := doc.IsSGWrite(ctx, testCase.docBody)
2222+
require.False(t, isSGWrite, "Expected doc to not be identified as SG write for body %q due to mismatched _sync.rev.ver", string(testCase.docBody))
22232223
})
22242224
}
22252225
})
22262226

22272227
}
22282228

2229-
func TestSyncDataCVOutdated(t *testing.T) {
2229+
func TestSyncDataCVEqual(t *testing.T) {
22302230
testCases := []struct {
22312231
name string
22322232
syncData SyncData
22332233
cvSourceID string
22342234
cvValue uint64
2235-
outdated bool
2235+
cvEqual bool
22362236
}{
22372237
{
22382238
name: "syncData CV matches",
@@ -2244,7 +2244,7 @@ func TestSyncDataCVOutdated(t *testing.T) {
22442244
},
22452245
cvSourceID: "testSourceID",
22462246
cvValue: 1,
2247-
outdated: false,
2247+
cvEqual: false,
22482248
},
22492249
{
22502250
name: "syncData sourceID mismatch",
@@ -2256,7 +2256,7 @@ func TestSyncDataCVOutdated(t *testing.T) {
22562256
},
22572257
cvSourceID: "testSourceID2",
22582258
cvValue: 1,
2259-
outdated: true,
2259+
cvEqual: true,
22602260
},
22612261
{
22622262
name: "syncData sourceID mismatch",
@@ -2268,13 +2268,24 @@ func TestSyncDataCVOutdated(t *testing.T) {
22682268
},
22692269
cvSourceID: "testSourceID",
22702270
cvValue: 1,
2271-
outdated: true,
2271+
cvEqual: true,
2272+
},
2273+
{
2274+
name: "syncData equal",
2275+
syncData: SyncData{
2276+
RevAndVersion: channels.RevAndVersion{
2277+
CurrentSource: "sourceID",
2278+
CurrentVersion: "0x1",
2279+
},
2280+
},
2281+
cvSourceID: "sourceID",
2282+
cvValue: 1,
2283+
cvEqual: true,
22722284
},
22732285
}
22742286
for _, testCase := range testCases {
22752287
t.Run(testCase.name, func(t *testing.T) {
2276-
outdated := testCase.syncData.CVOutdated(testCase.cvSourceID, testCase.cvValue)
2277-
assert.True(t, outdated)
2288+
assert.True(t, testCase.syncData.CVEqual(testCase.cvSourceID, testCase.cvValue))
22782289
})
22792290
}
22802291
}

db/document.go

Lines changed: 20 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -661,39 +661,16 @@ func HasUserXattrChanged(userXattr []byte, prevUserXattrHash string) bool {
661661
return userXattrCrc32cHash(userXattr) != prevUserXattrHash
662662
}
663663

664-
// CVOutdated returns true if the provided CV does not match _sync.rev.ver and _sync.rev.src. The caller is responsible for testing if the values are non-empty.
665-
// This compares _vv.ver and _vv.src to _sync.rev.ver and _sync.rev.src
666-
func (s *SyncData) CVOutdated(cvSourceID string, cvVersion uint64) bool {
664+
// CVEqual returns true if the provided CV does not match _sync.rev.ver and _sync.rev.src. The caller is responsible for testing if the values are non-empty.
665+
func (s *SyncData) CVEqual(cvSourceID string, cvVersion uint64) bool {
667666
if cvSourceID != s.RevAndVersion.CurrentSource {
668667
return true
669668
}
670669
return cvVersion != base.HexCasToUint64(s.RevAndVersion.CurrentVersion)
671670
}
672671

673-
// needsImport checks if a document needs to be imported. Used when Document.IsSGWrite can not be used.
674-
func (s *SyncData) needsImport(ctx context.Context, cas uint64, rawBody []byte, rawUserXattr []byte, rawHLV []byte) (isSGWrite bool, crc32Match bool) {
675-
isSGWrite, crc32Match, _ = s.IsSGWrite(cas, rawBody, rawUserXattr)
676-
if !isSGWrite || rawHLV == nil {
677-
return isSGWrite, crc32Match
678-
}
679-
limitedHLV := struct {
680-
Version string `json:"ver"`
681-
SourceID string `json:"src"`
682-
}{}
683-
if err := base.JSONUnmarshal(rawHLV, &limitedHLV); err != nil {
684-
base.WarnfCtx(ctx, "Unable to unmarshal HLV xattr during SG write check. HLV: %s Error: %v", string(rawHLV), err)
685-
// If we can't unmarshal the HLV, assume it has changed and import
686-
return false, crc32Match
687-
}
688-
hasOutdatedCV := s.CVOutdated(limitedHLV.SourceID, base.HexCasToUint64(limitedHLV.Version))
689-
if hasOutdatedCV {
690-
return false, crc32Match
691-
}
692-
return true, crc32Match
693-
}
694-
695-
// IsSGWrite determines if a document was written by Sync Gateway or via an SDK.
696-
func (s *SyncData) IsSGWrite(cas uint64, rawBody []byte, rawUserXattr []byte) (isSGWrite bool, crc32Match bool, bodyChanged bool) {
672+
// IsSGWrite determines if a document was written by Sync Gateway or via an SDK. CV is an optional parameter to check. This would represent _vv.ver and _vv.src
673+
func (s *SyncData) IsSGWrite(cas uint64, rawBody []byte, rawUserXattr []byte, cv *Version) (isSGWrite bool, crc32Match bool, bodyChanged bool) {
697674

698675
// If cas matches, it was a SG write
699676
if cas == s.GetSyncCas() {
@@ -713,23 +690,16 @@ func (s *SyncData) IsSGWrite(cas uint64, rawBody []byte, rawUserXattr []byte) (i
713690
}
714691

715692
// IsSGWrite - used during on-demand import. Check SyncData and HLV to determine if the document was written by Sync Gateway or by a Couchbase Server SDK write.
716-
func (doc *Document) IsSGWrite(ctx context.Context, cv Version, rawBody []byte) (isSGWrite bool, crc32Match bool, bodyChanged bool) {
693+
func (doc *Document) IsSGWrite(ctx context.Context, rawBody []byte) (isSGWrite bool, crc32Match bool, bodyChanged bool) {
694+
var cv *Version
695+
if doc.RevAndVersion.CurrentSource != "" && doc.RevAndVersion.CurrentVersion != "" && doc.HLV != nil {
696+
sourceID, version := doc.HLV.GetCurrentVersion()
697+
cv = &Version{SourceID: sourceID, Value: version}
698+
}
717699

718700
// If the raw body is available, use SyncData.IsSGWrite
719701
if rawBody != nil && len(rawBody) > 0 {
720-
isSgWriteFeed, crc32MatchFeed, bodyChangedFeed := doc.SyncData.IsSGWrite(doc.Cas, rawBody, doc.rawUserXattr)
721-
if !isSgWriteFeed {
722-
base.DebugfCtx(ctx, base.KeyCRUD, "Doc %s is not an SG write, based on cas and body hash. cas:%d syncCas:%d", base.UD(doc.ID), doc.Cas, doc.SyncData.Cas)
723-
return false, crc32MatchFeed, bodyChangedFeed
724-
}
725-
if doc.RevAndVersion.CurrentSource == "" || doc.RevAndVersion.CurrentVersion == "" {
726-
// Doc wasn't a version vector before calling this function, so we can't use _sync.ver / _sync.src to determine if it was an SG write
727-
return true, crc32MatchFeed, bodyChangedFeed
728-
}
729-
if doc.CVOutdated(cv.SourceID, cv.Value) {
730-
base.DebugfCtx(ctx, base.KeyCRUD, "Doc %s is not an SG write, based on mismatch between version vector %#v and sync metadata %#v", base.UD(doc.ID), cv, doc.RevAndVersion)
731-
return false, crc32MatchFeed, bodyChangedFeed
732-
}
702+
isSgWriteFeed, crc32MatchFeed, bodyChangedFeed := doc.SyncData.IsSGWrite(doc.Cas, rawBody, doc.rawUserXattr, cv)
733703
return isSgWriteFeed, crc32MatchFeed, bodyChangedFeed
734704
}
735705

@@ -762,9 +732,15 @@ func (doc *Document) IsSGWrite(ctx context.Context, cv Version, rawBody []byte)
762732
base.DebugfCtx(ctx, base.KeyCRUD, "Doc %s is not an SG write, based on user xattr hash", base.UD(doc.ID))
763733
return false, true, false
764734
}
765-
if doc.CVOutdated(cv.SourceID, cv.Value) {
766-
base.DebugfCtx(ctx, base.KeyCRUD, "Doc %s is not an SG write, based on version vector %#v not matching sync metadata cv %s", base.UD(doc.ID), cv, doc.RevAndVersion.CV())
767-
return false, true, false
735+
if doc.RevAndVersion.CurrentSource == "" || doc.RevAndVersion.CurrentVersion == "" {
736+
return true, true, false
737+
}
738+
739+
if cv != nil {
740+
if !doc.CVEqual(cv.SourceID, cv.Value) {
741+
base.DebugfCtx(ctx, base.KeyCRUD, "Doc %s is not an SG write, based on mismatch between version vector cv %s and sync metadata cv %s", base.UD(doc.ID), doc.HLV.GetCurrentVersionString(), doc.RevAndVersion.CV())
742+
return false, true, false
743+
}
768744
}
769745
return true, true, false
770746
}

db/hybrid_logical_vector.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -979,3 +979,17 @@ func (hlv *HybridLogicalVector) HasRevEncodedCV() bool {
979979
func GetGenerationFromEncodedVersionValue(value uint64) int {
980980
return int((value >> 40) & 0xFFFFFF)
981981
}
982+
983+
// getCurrentVersionFromVVXattr will extract only the current version from a full version vector xattr, used to
984+
// optimize unmarshalling when only the current version is needed.
985+
func getCurrentVersionFromVVXattr(hlvData []byte) (*Version, error) {
986+
limitedHLV := struct {
987+
Version string `json:"ver"`
988+
SourceID string `json:"src"`
989+
}{}
990+
err := base.JSONUnmarshal(hlvData, &limitedHLV)
991+
if err != nil {
992+
return nil, err
993+
}
994+
return &Version{SourceID: limitedHLV.SourceID, Value: base.HexCasToUint64(limitedHLV.Version)}, nil
995+
}

db/import.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ func (db *DatabaseCollectionWithUser) importDoc(ctx context.Context, docid strin
231231
}
232232

233233
// Is this doc an SG Write?
234-
isSgWrite, crc32Match, bodyChanged := db.IsSGWrite(ctx, doc, existingDoc.Body)
234+
isSgWrite, crc32Match, bodyChanged := doc.IsSGWrite(ctx, existingDoc.Body)
235235
if crc32Match {
236236
db.dbStats().Database().Crc32MatchCount.Add(1)
237237
}

db/import_listener.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,16 @@ func (il *importListener) ImportFeedEvent(ctx context.Context, collection *Datab
202202
il.importStats.ImportErrorCount.Add(1)
203203
return
204204
}
205-
isSGWrite, crc32Match = syncData.needsImport(ctx, event.Cas, rawDoc.Body, rawDoc.Xattrs[collection.userXattrKey()], rawDoc.Xattrs[base.VvXattrName])
205+
var cv *Version
206+
vv := rawDoc.Xattrs[base.VvXattrName]
207+
if len(vv) > 0 {
208+
cv, err = getCurrentVersionFromVVXattr(vv)
209+
if err != nil {
210+
base.WarnfCtx(ctx, "Error parsing version xattr for doc %s - not importing. Continuing without doing version vector consistency check between _vv and _sync: %s", base.UD(event.Key), err)
211+
return
212+
}
213+
}
214+
isSGWrite, crc32Match, _ = syncData.IsSGWrite(event.Cas, rawDoc.Body, rawDoc.Xattrs[collection.userXattrKey()], cv)
206215
if crc32Match {
207216
il.dbStats.Crc32MatchCount.Add(1)
208217
}

db/utilities_hlv_testing.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ func (db *DatabaseCollectionWithUser) CreateDocNoHLV(t testing.TB, ctx context.C
138138

139139
// Is this doc an sgWrite?
140140
if doc != nil {
141-
isSgWrite, crc32Match, _ = db.IsSGWrite(ctx, doc, nil)
141+
isSgWrite, crc32Match, _ = doc.IsSGWrite(ctx, nil)
142142
if crc32Match {
143143
db.dbStats().Database().Crc32MatchCount.Add(1)
144144
}

0 commit comments

Comments
 (0)