Skip to content

Commit

Permalink
Improves/Fixes TableBackedProgressManager, adds
Browse files Browse the repository at this point in the history
- adds method Empty(..) to ChangeConsumer
  (allows for marking CDC log stream time as processed, fixes 'bug' where second start of a reader begins reading CDC log on generated_created timestamp instead of ChangeAgeLimit / actual latest processed state)
- Simplify AdvancedReaderConfig options [PostEmptyQueryDelay, PostFailedQueryDelay] -> single 'PostQueryDelay' which is also considered when calculating next PollWindow
- adds new example 'simple-printer-stateful' which correctly maintains the CDC reader's state using TableBackedProgressManager + NewPeriodicProgressReporter
- adds some commented out 'logging' lines for debugging purposes
  • Loading branch information
hartmut-co-uk committed Dec 14, 2021
1 parent c318151 commit e0b7b88
Show file tree
Hide file tree
Showing 10 changed files with 199 additions and 44 deletions.
12 changes: 12 additions & 0 deletions change.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,14 @@ type ChangeConsumer interface {
// If this method returns an error, the library will stop with an error.
Consume(ctx context.Context, change Change) error

// Invoked upon empty results from the CDC log associated with the stream of
// the ChangeConsumer. This method is called to acknowledge a query window
// has been executed against the stream and the CDC log is to be considered
// completed as of 'ackTime' param passed.
//
// If this method returns an error, the library will stop with an error.
Empty(ctx context.Context, ackTime gocql.UUID) error

// Called after all rows from the stream were consumed, and the reader
// is about to switch to a new generation, or stop execution altogether.
//
Expand Down Expand Up @@ -495,6 +503,10 @@ type changeConsumerFuncInstance struct {
f ChangeConsumerFunc
}

func (ccfi *changeConsumerFuncInstance) Empty(ctx context.Context, newTime gocql.UUID) error {
return nil
}

func (ccfi *changeConsumerFuncInstance) End() error {
return nil
}
Expand Down
14 changes: 6 additions & 8 deletions examples/replicator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,11 @@ func main() {
clWrite := parseConsistency(writeConsistency)

adv := scyllacdc.AdvancedReaderConfig{
ConfidenceWindowSize: 30 * time.Second,
ChangeAgeLimit: 10 * time.Minute,
QueryTimeWindowSize: 60 * time.Second,
PostEmptyQueryDelay: 30 * time.Second,
PostNonEmptyQueryDelay: 10 * time.Second,
PostFailedQueryDelay: 1 * time.Second,
ConfidenceWindowSize: 30 * time.Second,
ChangeAgeLimit: 10 * time.Minute,
QueryTimeWindowSize: 60 * time.Second,
PostQueryDelay: 10 * time.Second,
PostFailedQueryDelay: 1 * time.Second,
}

fmt.Println("Parameters:")
Expand All @@ -71,8 +70,7 @@ func main() {
fmt.Printf(" Confidence window size: %s\n", adv.ConfidenceWindowSize)
fmt.Printf(" Change age limit: %s\n", adv.ChangeAgeLimit)
fmt.Printf(" Query window size: %s\n", adv.QueryTimeWindowSize)
fmt.Printf(" Delay after poll with empty results: %s\n", adv.PostEmptyQueryDelay)
fmt.Printf(" Delay after poll with non-empty results: %s\n", adv.PostNonEmptyQueryDelay)
fmt.Printf(" Delay after poll with non-empty results: %s\n", adv.PostQueryDelay)
fmt.Printf(" Delay after failed poll: %s\n", adv.PostFailedQueryDelay)

var fullyQualifiedTables []string
Expand Down
11 changes: 5 additions & 6 deletions examples/replicator/replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,12 +375,11 @@ func TestReplicator(t *testing.T) {
t.Log("running replicators")

adv := scyllacdc.AdvancedReaderConfig{
ChangeAgeLimit: time.Minute,
PostNonEmptyQueryDelay: 3 * time.Second,
PostEmptyQueryDelay: 3 * time.Second,
PostFailedQueryDelay: 3 * time.Second,
QueryTimeWindowSize: 5 * time.Minute,
ConfidenceWindowSize: time.Millisecond,
ChangeAgeLimit: time.Minute,
PostQueryDelay: 3 * time.Second,
PostFailedQueryDelay: 3 * time.Second,
QueryTimeWindowSize: 5 * time.Minute,
ConfidenceWindowSize: time.Millisecond,
}

schemaNames := make([]string, 0)
Expand Down
139 changes: 139 additions & 0 deletions examples/simple-printer-stateful/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package main

import (
"context"
"fmt"
"log"
"os"
"time"

"github.com/gocql/gocql"
scyllacdc "github.com/scylladb/scylla-cdc-go"
)

// Make sure you create the following table before you run this example:
// CREATE TABLE ks.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH cdc = {'enabled': 'true'};

func main() {
cluster := gocql.NewCluster("127.0.0.1")
cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy())
session, err := cluster.CreateSession()
if err != nil {
log.Fatal(err)
return
}
defer session.Close()

var progressManager scyllacdc.ProgressManager
progressManager, err = scyllacdc.NewTableBackedProgressManager(session, "ks.cdc_progress", "cdc-replicator")
if err != nil {
log.Fatal(err)
return
}

logger := log.New(os.Stderr, "", log.Ldate|log.Lmicroseconds|log.Lshortfile)

cfg := &scyllacdc.ReaderConfig{
Session: session,
TableNames: []string{"ks.tbl"},
ChangeConsumerFactory: &myFactory{
logger: logger,
progressReporterInterval: 30 * time.Second,
},
Logger: logger,
Advanced: scyllacdc.AdvancedReaderConfig{
ChangeAgeLimit: 15 * time.Minute,
ConfidenceWindowSize: 10 * time.Second,
PostQueryDelay: 5 * time.Second,
PostFailedQueryDelay: 5 * time.Second,
QueryTimeWindowSize: 5 * 60 * time.Second,
},
ProgressManager: progressManager,
}

reader, err := scyllacdc.NewReader(context.Background(), cfg)
if err != nil {
log.Fatal(err)
}

if err := reader.Run(context.Background()); err != nil {
log.Fatal(err)
}
}

func consumeChange(ctx context.Context, tableName string, c scyllacdc.Change) error {
for _, changeRow := range c.Delta {
pkRaw, _ := changeRow.GetValue("pk")
ckRaw, _ := changeRow.GetValue("ck")
v := changeRow.GetAtomicChange("v")

pk := pkRaw.(*int)
ck := ckRaw.(*int)

fmt.Printf("[%s] Operation: %s, pk: %s, ck: %s\n", tableName, changeRow.GetOperation(),
nullableIntToStr(pk), nullableIntToStr(ck))

if v.IsDeleted {
fmt.Printf(" Column v was set to null/deleted\n")
} else {
vInt := v.Value.(*int)
if vInt != nil {
fmt.Printf(" Column v was set to %d\n", *vInt)
} else {
fmt.Print(" Column v was not changed\n")
}
}
}

return nil
}

func nullableIntToStr(i *int) string {
if i == nil {
return "null"
}
return fmt.Sprintf("%d", *i)
}

type myConsumer struct {
// PeriodicProgressReporter is a wrapper around ProgressReporter
// which rate-limits saving the progress
reporter *scyllacdc.PeriodicProgressReporter
logger scyllacdc.Logger
f scyllacdc.ChangeConsumerFunc
tableName string
}

func (mc *myConsumer) Consume(ctx context.Context, change scyllacdc.Change) error {
// ... do work ...
mc.logger.Printf("myConsumer.Consume...\n")
err := mc.f(ctx, mc.tableName, change)
if err != nil {
return err
}

mc.reporter.Update(change.Time)
return nil
}

func (mc *myConsumer) Empty(ctx context.Context, newTime gocql.UUID) error {
mc.reporter.Update(newTime)
return nil
}

func (mc *myConsumer) End() error {
_ = mc.reporter.SaveAndStop(context.Background())
return nil
}

type myFactory struct {
logger scyllacdc.Logger
progressReporterInterval time.Duration
}

func (f *myFactory) CreateChangeConsumer(ctx context.Context, input scyllacdc.CreateChangeConsumerInput) (scyllacdc.ChangeConsumer, error) {
f.logger.Printf("myFactory.CreateChangeConsumer %s, %s\n", input.TableName, input.StreamID)
reporter := scyllacdc.NewPeriodicProgressReporter(f.logger, f.progressReporterInterval, input.ProgressReporter)
reporter.Start(ctx)
return &myConsumer{reporter, f.logger, consumeChange, input.TableName}, nil
}
17 changes: 14 additions & 3 deletions examples/simple-printer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"log"
"os"
"time"

"github.com/gocql/gocql"
scyllacdc "github.com/scylladb/scylla-cdc-go"
Expand All @@ -15,18 +16,28 @@ import (

func main() {
cluster := gocql.NewCluster("127.0.0.1")
cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.DCAwareRoundRobinPolicy("local-dc"))
cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy())
session, err := cluster.CreateSession()
if err != nil {
log.Fatal(err)
return
}
defer session.Close()

logger := log.New(os.Stderr, "", log.Ldate|log.Lmicroseconds|log.Lshortfile)

cfg := &scyllacdc.ReaderConfig{
Session: session,
TableNames: []string{"ks.tbl"},
ChangeConsumerFactory: changeConsumerFactory,
Logger: log.New(os.Stderr, "", log.Ldate|log.Lshortfile),
Logger: logger,
Advanced: scyllacdc.AdvancedReaderConfig{
ChangeAgeLimit: 15 * time.Minute,
ConfidenceWindowSize: 10 * time.Second,
PostQueryDelay: 5 * time.Second,
PostFailedQueryDelay: 5 * time.Second,
QueryTimeWindowSize: 5 * 60 * time.Second,
},
}

reader, err := scyllacdc.NewReader(context.Background(), cfg)
Expand All @@ -48,7 +59,7 @@ func consumeChange(ctx context.Context, tableName string, c scyllacdc.Change) er
pk := pkRaw.(*int)
ck := ckRaw.(*int)

fmt.Printf("Operation: %s, pk: %s, ck: %s\n", changeRow.GetOperation(),
fmt.Printf("[%s] Operation: %s, pk: %s, ck: %s\n", tableName, changeRow.GetOperation(),
nullableIntToStr(pk), nullableIntToStr(ck))

if v.IsDeleted {
Expand Down
1 change: 1 addition & 0 deletions progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func (tbpm *TableBackedProgressManager) SaveProgress(ctx context.Context, gen ti
tbpm.concurrentQueryLimiter.Acquire(ctx, 1)
defer tbpm.concurrentQueryLimiter.Release(1)

//log.Printf("SaveProgress for %s = %s\n", streamID, progress.LastProcessedRecordTime)
return tbpm.session.Query(
fmt.Sprintf("INSERT INTO %s (generation, application_name, table_name, stream_id, last_timestamp) VALUES (?, ?, ?, ?, ?) USING TTL ?", tbpm.progressTableName),
gen, tbpm.applicationName, tableName, streamID, progress.LastProcessedRecordTime, tbpm.ttl,
Expand Down
21 changes: 5 additions & 16 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,23 +85,13 @@ type AdvancedReaderConfig struct {

// The library uses select statements to fetch changes from CDC Log tables.
// Each select fetches changes from a single table and fetches only changes
// from a limited set of CDC streams. If such select returns one or more
// changes then next select to this table and set of CDC streams will be
// issued after a delay. This parameter specifies the length of the delay.
//
// If the parameter is left as 0, the library will automatically adjust
// the length of the delay.
PostNonEmptyQueryDelay time.Duration

// The library uses select statements to fetch changes from CDC Log tables.
// Each select fetches changes from a single table and fetches only changes
// from a limited set of CDC streams. If such select returns no changes then
// next select to this table and set of CDC streams will be issued after
// from a limited set of CDC streams. The subsequent select after query
// execution to this table and set of CDC streams will be issued after
// a delay. This parameter specifies the length of the delay.
//
// If the parameter is left as 0, the library will automatically adjust
// the length of the delay.
PostEmptyQueryDelay time.Duration
PostQueryDelay time.Duration

// If the library tries to read from the CDC log and the read operation
// fails, it will wait some time before attempting to read again. This
Expand Down Expand Up @@ -143,8 +133,7 @@ func (arc *AdvancedReaderConfig) setDefaults() {
}
setIfZero(&arc.ConfidenceWindowSize, 30*time.Second)

setIfZero(&arc.PostNonEmptyQueryDelay, 10*time.Second)
setIfZero(&arc.PostEmptyQueryDelay, 30*time.Second)
setIfZero(&arc.PostQueryDelay, 10*time.Second)
setIfZero(&arc.PostFailedQueryDelay, 1*time.Second)

setIfZero(&arc.QueryTimeWindowSize, 30*time.Second)
Expand Down Expand Up @@ -268,7 +257,7 @@ func (r *Reader) Run(ctx context.Context) error {
}
}

sleepAmount := r.config.Advanced.PostNonEmptyQueryDelay / time.Duration(len(readers))
sleepAmount := r.config.Advanced.PostQueryDelay / time.Duration(len(readers))
for i := range readers {
reader := readers[i]
select {
Expand Down
16 changes: 11 additions & 5 deletions stream_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ outer:

if compareTimeuuid(wnd.begin, wnd.end) < 0 {
var iter *changeRowIterator
//sbr.config.Logger.Printf("queryRange: %s.%s :: %s (%s) [%s ... %s]",
// crq.keyspaceName, crq.tableName, crq.pkCondition, crq.bindArgs[0], wnd.begin.Time(), wnd.end.Time())
iter, err = crq.queryRange(wnd.begin, wnd.end)
if err != nil {
sbr.config.Logger.Printf("error while sending a query (will retry): %s", err)
Expand All @@ -112,6 +114,12 @@ outer:
return consumerErr
}
hadRows = rowCount > 0

if !hadRows {
for _, c := range sbr.consumers {
err = c.Empty(ctx, wnd.end)
}
}
}

if err == nil {
Expand All @@ -126,10 +134,8 @@ outer:
var delay time.Duration
if err != nil {
delay = sbr.config.Advanced.PostFailedQueryDelay
} else if hadRows {
delay = sbr.config.Advanced.PostNonEmptyQueryDelay
} else {
delay = sbr.config.Advanced.PostEmptyQueryDelay
delay = sbr.config.Advanced.PostQueryDelay
}

delayUntil := windowProcessingStartTime.Add(delay)
Expand Down Expand Up @@ -203,14 +209,14 @@ func (sbr *streamBatchReader) getPollWindow() pollWindow {
if queryWindowRightEnd.Before(confidenceWindowStart) {
return pollWindow{
begin: windowStart,
end: gocql.MinTimeUUID(queryWindowRightEnd),
end: gocql.MinTimeUUID(queryWindowRightEnd.Add(sbr.config.Advanced.PostQueryDelay)),

touchesConfidenceWindow: false,
}
}
return pollWindow{
begin: windowStart,
end: gocql.MinTimeUUID(confidenceWindowStart),
end: gocql.MinTimeUUID(confidenceWindowStart.Add(sbr.config.Advanced.PostQueryDelay)),

touchesConfidenceWindow: true,
}
Expand Down
11 changes: 5 additions & 6 deletions types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,12 +315,11 @@ func TestTypes(t *testing.T) {
})

adv := AdvancedReaderConfig{
ChangeAgeLimit: time.Minute,
PostNonEmptyQueryDelay: 3 * time.Second,
PostEmptyQueryDelay: 3 * time.Second,
PostFailedQueryDelay: 3 * time.Second,
QueryTimeWindowSize: 5 * time.Minute,
ConfidenceWindowSize: time.Millisecond,
ChangeAgeLimit: time.Minute,
PostQueryDelay: 3 * time.Second,
PostFailedQueryDelay: 3 * time.Second,
QueryTimeWindowSize: 5 * time.Minute,
ConfidenceWindowSize: time.Millisecond,
}

// Configure a session
Expand Down
1 change: 1 addition & 0 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (ppr *PeriodicProgressReporter) Start(ctx context.Context) {
ppr.mu.Unlock()

// TODO: Log errors?
//ppr.logger.Printf("MarkProgress for %s: %s", ppr.reporter.streamID, timeToReport.Time())
err := ppr.reporter.MarkProgress(ctx, Progress{timeToReport})
if err != nil {
ppr.logger.Printf("failed to save progress for %s: %s", ppr.reporter.streamID, err)
Expand Down

0 comments on commit e0b7b88

Please sign in to comment.