Skip to content

Commit cd998b1

Browse files
committed
CBG-4864 get maxcas for each vbucket
1 parent e67f6dc commit cd998b1

File tree

3 files changed

+52
-0
lines changed

3 files changed

+52
-0
lines changed

base/bucket.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ type CouchbaseBucketStore interface {
6060
HttpClient(context.Context) *http.Client
6161
GetSpec() BucketSpec
6262
GetMaxVbno() (uint16, error)
63+
GetCCVStartingCas(context.Context) (map[uint16]uint64, error)
6364

6465
// GetStatsVbSeqno retrieves the high sequence number for all vbuckets and returns
6566
// a map of UUIDS and a map of high sequence numbers (map from vbno -> seq)
@@ -575,3 +576,5 @@ func GetNewDatabaseSleeperFunc() RetrySleeper {
575576
5, // InitialRetrySleepTimeMS
576577
)
577578
}
579+
580+
var _ CouchbaseBucketStore = &GocbV2Bucket{}

base/bucket_gocb_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2632,3 +2632,15 @@ func TestWriteUpdateWithXattrsDocumentTombstone(t *testing.T) {
26322632
require.JSONEq(t, string(xattrModifiedBody), string(xattrs[xattr1Key]))
26332633
require.NotContains(t, xattrs, xattr2Key)
26342634
}
2635+
2636+
func TestGetCCVStartingCAS(t *testing.T) {
2637+
ctx := TestCtx(t)
2638+
bucket := GetTestBucket(t)
2639+
defer bucket.Close(ctx)
2640+
2641+
cbStore, ok := AsCouchbaseBucketStore(bucket)
2642+
require.True(t, ok)
2643+
cas, err := cbStore.GetCCVStartingCas(ctx)
2644+
require.NoError(t, err)
2645+
require.NotZero(t, cas)
2646+
}

base/collection.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"fmt"
2020
"io"
2121
"net/http"
22+
"strconv"
2223
"strings"
2324
"sync"
2425
"time"
@@ -352,6 +353,42 @@ func (b *GocbV2Bucket) GetMaxVbno() (uint16, error) {
352353
return uint16(vbNo), nil
353354
}
354355

356+
func (b *GocbV2Bucket) GetCCVStartingCas(ctx context.Context) (map[uint16]uint64, error) {
357+
uri := "/pools/default/buckets/" + b.GetName()
358+
output, status, err := b.MgmtRequest(ctx, http.MethodGet, uri, "application/json", nil)
359+
if err != nil {
360+
return nil, fmt.Errorf("error executing query for vbucket CAS, status code: %d error: %v output: %s", status, err, string(output))
361+
}
362+
if status != http.StatusOK {
363+
return nil, fmt.Errorf("error executing query for vbucket CAS, status code: %d output: %s", status, string(output))
364+
}
365+
var bucketInfo struct {
366+
EnableCrossClusterVersioning bool `json:"enableCrossClusterVersioning"`
367+
MaxCas []string `json:"vBucketsMaxCas"`
368+
}
369+
err = JSONUnmarshal(output, &bucketInfo)
370+
if err != nil {
371+
return nil, fmt.Errorf("error parsing output from bucket %q error:%v", string(output), err)
372+
}
373+
374+
numVBuckets, err := b.GetMaxVbno()
375+
if err != nil {
376+
return nil, fmt.Errorf("error getting vbucket count: %v", err)
377+
}
378+
if len(bucketInfo.MaxCas) != int(numVBuckets) {
379+
return nil, fmt.Errorf("error getting vbucket CAS, expected %d vbucket CAS values, got %q", numVBuckets, bucketInfo.MaxCas)
380+
}
381+
highCas := make(map[uint16]uint64, len(bucketInfo.MaxCas))
382+
for i, casStr := range bucketInfo.MaxCas {
383+
cas, err := strconv.ParseUint(casStr, 10, 64)
384+
if err != nil {
385+
return nil, fmt.Errorf("error parsing vbucket CAS value %q for vbucket %d: %v", casStr, i, err)
386+
}
387+
highCas[uint16(i)] = cas
388+
}
389+
return highCas, nil
390+
}
391+
355392
func (b *GocbV2Bucket) getConfigSnapshot() (*gocbcore.ConfigSnapshot, error) {
356393
agent, err := b.GetGoCBAgent()
357394
if err != nil {

0 commit comments

Comments
 (0)