Skip to content

Commit

Permalink
Merge pull request #3 from scylladb/scylla-4.4-tables-support
Browse files Browse the repository at this point in the history
Support for Scylla 4.4 new generation tables format
  • Loading branch information
piodul authored Mar 23, 2021
2 parents 57ca843 + 2d0dbfb commit 0bde107
Showing 1 changed file with 190 additions and 21 deletions.
211 changes: 190 additions & 21 deletions topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,24 @@ import (
"context"
"encoding/hex"
"errors"
"fmt"
"sort"
"strings"
"time"

"github.com/gocql/gocql"
)

var (
ErrNoGenerationsPresent = errors.New("there are no generations present")
ErrNoGenerationsPresent = errors.New("there are no generations present")
ErrNoSupportedGenerationTablesPresent = errors.New("no supported generation tables are present")
)

const (
generationsTableName = "system_distributed.cdc_streams_descriptions"
generationsTableNamePre4_4 = "system_distributed.cdc_streams_descriptions"

timestampsTableSince4_4 = "system_distributed.cdc_generation_timestamps"
streamsTableSince4_4 = "system_distributed.cdc_streams_descriptions_v2"

// TODO: Switch to a model which reacts to cluster state changes
// and forces a refresh when all worker goroutines did not report any
Expand Down Expand Up @@ -60,13 +66,20 @@ type generationFetcher struct {
generationCh chan *generation
refreshCh chan struct{}
stopCh chan struct{}

source generationSource
}

func newGenerationFetcher(
session *gocql.Session,
startFrom time.Time,
logger Logger,
) (*generationFetcher, error) {
source, err := chooseGenerationSource(session, logger)
if err != nil {
return nil, fmt.Errorf("failed to detect version of the generation tables used by the cluster: %v", err)
}

gf := &generationFetcher{
session: session,
lastTime: startFrom,
Expand All @@ -75,10 +88,48 @@ func newGenerationFetcher(
generationCh: make(chan *generation, 1),
stopCh: make(chan struct{}),
refreshCh: make(chan struct{}, 1),

source: source,
}
return gf, nil
}

// chooseGenerationSource inspects the cluster schema and picks the
// generationSource implementation the fetcher should start with.
//
// Both the pre-4.4 table and the 4.4+ streams table are looked up via
// isTableInSchema; any lookup error is returned unchanged.
//
// Returns:
//   - *generationSourcePre4_4 whenever the pre-4.4 table exists,
//   - *generationSourceSince4_4 when only the 4.4+ table exists,
//   - ErrNoSupportedGenerationTablesPresent when neither table exists.
func chooseGenerationSource(session *gocql.Session, logger Logger) (generationSource, error) {
	legacyPresent, err := isTableInSchema(session, generationsTableNamePre4_4)
	if err != nil {
		return nil, err
	}
	currentPresent, err := isTableInSchema(session, streamsTableSince4_4)
	if err != nil {
		return nil, err
	}

	switch {
	case legacyPresent:
		// The pre-4.4 table exists, so start from it.
		// If the 4.4+ table is absent, the old table is simply used as is.
		// If the 4.4+ table is present as well, maybeUpgrade takes care of
		// switching to it, but only after generation rewriting completes.
		return &generationSourcePre4_4{
			session: session,
			logger:  logger,
		}, nil

	case currentPresent:
		// Only the 4.4+ table exists; the new format can be used immediately.
		return &generationSourceSince4_4{
			session: session,
			logger:  logger,
		}, nil

	default:
		// Neither table is known to us - there is nothing we can read from.
		return nil, ErrNoSupportedGenerationTablesPresent
	}
}

func (gf *generationFetcher) Run(ctx context.Context) error {
l := gf.logger

Expand Down Expand Up @@ -124,22 +175,28 @@ func (gf *generationFetcher) tryFetchGenerations() {
}

consistency := gocql.One
if size == 2 {
if size >= 2 {
consistency = gocql.Quorum
} else if size >= 3 {
consistency = gocql.All
}

// Try switching to a new format before fetching any generations
newSource, err := gf.source.maybeUpgrade()
if err != nil {
gf.logger.Printf("an error occurred while trying to switch to new generations format: %s", err)
} else {
gf.source = newSource
}

// Fetch some generation times
times, err := gf.getGenerationTimes(consistency)
times, err := gf.source.getGenerationTimes(consistency)
if err != nil {
gf.logger.Printf("an error occured while fetching generation times: %s", err)
return
}
sort.Sort(timeList(times))

fetchAndPush := func(t time.Time) (shouldBreak bool) {
streams, err := gf.getGeneration(t, consistency)
streams, err := gf.source.getGeneration(t, consistency)
if err != nil {
gf.logger.Printf("an error occured while fetching generation streams for %s: %s", t, err)
return true
Expand Down Expand Up @@ -222,9 +279,32 @@ func (gf *generationFetcher) pushGeneration(gen *generation) (shouldStop bool) {
}
}

func (gf *generationFetcher) getGeneration(genTime time.Time, consistency gocql.Consistency) ([]StreamID, error) {
// getClusterSize returns the row count of system.peers plus one.
//
// gocql does not expose information about the cluster, so the size has to
// be obtained by querying system.peers by hand. On query failure it returns
// 0 together with the error.
func (gf *generationFetcher) getClusterSize() (int, error) {
	var peerCount int
	if err := gf.session.Query("SELECT COUNT(*) FROM system.peers").Scan(&peerCount); err != nil {
		return 0, err
	}
	// NOTE(review): the +1 presumably accounts for the node answering the
	// query, which is not listed among its own peers - confirm.
	return peerCount + 1, nil
}

// generationSource abstracts the set of tables that CDC generation
// information is read from, so that the fetcher can work with more than
// one table format.
type generationSource interface {
// getGeneration returns the stream IDs of the generation identified by
// the timestamp genTime, queried with the given consistency level.
getGeneration(genTime time.Time, consistency gocql.Consistency) ([]StreamID, error)
// getGenerationTimes returns the timestamps of the generations stored in
// the source, queried with the given consistency level.
getGenerationTimes(consistency gocql.Consistency) ([]time.Time, error)

// maybeUpgrade returns the source that should be used from now on: either
// the receiver itself or a source for a newer table format.
maybeUpgrade() (generationSource, error)
}

// generationSourcePre4_4 is the generationSource backed by the pre-4.4
// table named by generationsTableNamePre4_4.
type generationSourcePre4_4 struct {
// session is the gocql session used to run the queries.
session *gocql.Session
// logger is handed over to the newer source when upgrading.
// NOTE(review): no logging call is visible in this type's methods - confirm
// whether the field is otherwise needed.
logger Logger
}

func (gs *generationSourcePre4_4) getGeneration(genTime time.Time, consistency gocql.Consistency) ([]StreamID, error) {
var streams []StreamID
err := gf.session.Query("SELECT streams FROM "+generationsTableName+" WHERE time = ?", genTime).
err := gs.session.Query("SELECT streams FROM "+generationsTableNamePre4_4+" WHERE time = ?", genTime).
Consistency(consistency).
Scan(&streams)
if err != nil {
Expand All @@ -233,8 +313,8 @@ func (gf *generationFetcher) getGeneration(genTime time.Time, consistency gocql.
return streams, err
}

func (gf *generationFetcher) getGenerationTimes(consistency gocql.Consistency) ([]time.Time, error) {
iter := gf.session.Query("SELECT time FROM " + generationsTableName).
func (gs *generationSourcePre4_4) getGenerationTimes(consistency gocql.Consistency) ([]time.Time, error) {
iter := gs.session.Query("SELECT time FROM " + generationsTableNamePre4_4).
Consistency(consistency).
Iter()
var (
Expand All @@ -250,18 +330,107 @@ func (gf *generationFetcher) getGenerationTimes(consistency gocql.Consistency) (
return times, nil
}

// Unfortunately, gocql does not expose information about the cluster,
// therefore we need to poll system.peers manually
func (gf *generationFetcher) getClusterSize() (int, error) {
var size int
err := gf.session.Query("SELECT COUNT(*) FROM system.peers").Scan(&size)
// Follows the migration procedure from Scylla's documentation
// https://docs.scylladb.com/using-scylla/cdc/cdc-querying-streams/
func (gs *generationSourcePre4_4) maybeUpgrade() (generationSource, error) {
// Check if table is present
hasNewStreamsTable, err := isTableInSchema(gs.session, streamsTableSince4_4)
if err != nil {
return 0, err
return gs, err
}
return size + 1, nil
if !hasNewStreamsTable {
// Don't upgrade, the new table is not there yet
return gs, nil
}

// Was the migration completed?
data := make(map[string]interface{})
err = gs.session.Query("SELECT streams_timestamp FROM system.cdc_local WHERE key = 'rewritten'").
MapScan(data)

if err == gocql.ErrNotFound {
// The "rewritten" row is not present yet, this means that the generations
// weren't rewritten yet
// Try again later
return gs, nil
}

if err != nil {
// Some other error
return gs, err
}

newGs := &generationSourceSince4_4{
session: gs.session,
logger: gs.logger,
}

return newGs.maybeUpgrade()
}

// generationSourceSince4_4 is the generationSource backed by the 4.4+
// tables: generation timestamps come from timestampsTableSince4_4 and
// stream IDs from streamsTableSince4_4.
type generationSourceSince4_4 struct {
// session is the gocql session used to run the queries.
session *gocql.Session
// logger is stored alongside the session.
// NOTE(review): no logging call is visible in this type's methods - confirm
// whether the field is otherwise needed.
logger Logger
}

// getGeneration collects the stream IDs stored in streamsTableSince4_4 for
// the generation with timestamp genTime, using the given consistency level.
//
// The query may yield several rows, each carrying a list of stream IDs;
// the lists are concatenated in the order the rows arrive.
// NOTE(review): each row presumably holds the streams of one vnode - confirm
// against the table schema.
//
// If closing the iterator reports an error, nil and that error are returned.
func (gs *generationSourceSince4_4) getGeneration(genTime time.Time, consistency gocql.Consistency) ([]StreamID, error) {
	query := gs.session.Query("SELECT streams FROM "+streamsTableSince4_4+" WHERE time = ?", genTime)
	rows := query.Consistency(consistency).Iter()

	var (
		all   []StreamID
		chunk []StreamID
	)
	for rows.Scan(&chunk) {
		all = append(all, chunk...)
	}

	err := rows.Close()
	if err != nil {
		return nil, err
	}
	return all, nil
}

// Finds a name of a supported table for fetching cdc streams
func getGenerationsTableName(session *gocql.Session) (string, error) {
return "system_distributed.cdc_streams_descriptions", nil
// getGenerationTimes reads every value of the time column stored under the
// 'timestamps' key of timestampsTableSince4_4, using the given consistency
// level, and returns them in the order the rows arrive.
//
// If closing the iterator reports an error, nil and that error are returned.
func (gs *generationSourceSince4_4) getGenerationTimes(consistency gocql.Consistency) ([]time.Time, error) {
	query := gs.session.Query("SELECT time FROM " + timestampsTableSince4_4 + " WHERE key = 'timestamps'")
	rows := query.Consistency(consistency).Iter()

	var (
		result []time.Time
		ts     time.Time
	)
	for rows.Scan(&ts) {
		result = append(result, ts)
	}

	err := rows.Close()
	if err != nil {
		return nil, err
	}
	return result, nil
}

// maybeUpgrade always returns the receiver and a nil error: this is the
// newest table format the code knows about, so there is nothing to switch to.
func (gs *generationSourceSince4_4) maybeUpgrade() (generationSource, error) {
// No newer format is known
return gs, nil
}

// isTableInSchema reports whether the table with the given fully-qualified
// name ("keyspace.table") is present in the schema.
//
// The name is split at the first dot; the keyspace's metadata is fetched
// through the session and the table is looked up in it.
//
// Returns:
//   - (false, nil) when the keyspace does not exist,
//   - (false, err) when fetching keyspace metadata fails for another reason,
//   - (true/false, nil) depending on whether the keyspace contains the table.
//
// Panics if the table name is not qualified, i.e. it does not contain a dot;
// that indicates a programming error in the caller.
func isTableInSchema(session *gocql.Session, tableName string) (bool, error) {
	decomposed := strings.SplitN(tableName, ".", 2)
	if len(decomposed) < 2 {
		panic("unqualified table name passed to isTableInSchema")
	}
	keyspace, table := decomposed[0], decomposed[1]

	meta, err := session.KeyspaceMetadata(keyspace)
	// errors.Is also matches the sentinel if it ever arrives wrapped.
	if errors.Is(err, gocql.ErrKeyspaceDoesNotExist) {
		return false, nil
	}
	if err != nil {
		return false, err
	}

	_, ok := meta.Tables[table]
	return ok, nil
}

0 comments on commit 0bde107

Please sign in to comment.