Improves/Fixes TableBackedProgressManager #10

Open
wants to merge 1 commit into
base: master

12 changes: 12 additions & 0 deletions change.go
@@ -456,6 +456,14 @@ type ChangeConsumer interface {
// If this method returns an error, the library will stop with an error.
Consume(ctx context.Context, change Change) error

+// Invoked upon empty results from the CDC log associated with the stream of
+// the ChangeConsumer. This method is called to acknowledge a query window
+// has been executed against the stream and the CDC log is to be considered
+// completed as of 'ackTime' param passed.
+//
+// If this method returns an error, the library will stop with an error.
+Empty(ctx context.Context, ackTime gocql.UUID) error

// Called after all rows from the stream were consumed, and the reader
// is about to switch to a new generation, or stop execution altogether.
//
@@ -495,6 +503,10 @@ type changeConsumerFuncInstance struct {
f ChangeConsumerFunc
}

+func (ccfi *changeConsumerFuncInstance) Empty(ctx context.Context, newTime gocql.UUID) error {
+return nil
+}

func (ccfi *changeConsumerFuncInstance) End() error {
return nil
}
14 changes: 6 additions & 8 deletions examples/replicator/main.go
@@ -51,12 +51,11 @@ func main() {
clWrite := parseConsistency(writeConsistency)

adv := scyllacdc.AdvancedReaderConfig{
-ConfidenceWindowSize: 30 * time.Second,
-ChangeAgeLimit: 10 * time.Minute,
-QueryTimeWindowSize: 60 * time.Second,
-PostEmptyQueryDelay: 30 * time.Second,
-PostNonEmptyQueryDelay: 10 * time.Second,
-PostFailedQueryDelay: 1 * time.Second,
+ConfidenceWindowSize: 30 * time.Second,
+ChangeAgeLimit: 10 * time.Minute,
+QueryTimeWindowSize: 60 * time.Second,
+PostQueryDelay: 10 * time.Second,
+PostFailedQueryDelay: 1 * time.Second,
}

fmt.Println("Parameters:")
@@ -71,8 +70,7 @@ func main() {
fmt.Printf(" Confidence window size: %s\n", adv.ConfidenceWindowSize)
fmt.Printf(" Change age limit: %s\n", adv.ChangeAgeLimit)
fmt.Printf(" Query window size: %s\n", adv.QueryTimeWindowSize)
-fmt.Printf(" Delay after poll with empty results: %s\n", adv.PostEmptyQueryDelay)
-fmt.Printf(" Delay after poll with non-empty results: %s\n", adv.PostNonEmptyQueryDelay)
+fmt.Printf(" Delay after poll with non-empty results: %s\n", adv.PostQueryDelay)
fmt.Printf(" Delay after failed poll: %s\n", adv.PostFailedQueryDelay)

var fullyQualifiedTables []string
11 changes: 5 additions & 6 deletions examples/replicator/replicator_test.go
@@ -375,12 +375,11 @@ func TestReplicator(t *testing.T) {
t.Log("running replicators")

adv := scyllacdc.AdvancedReaderConfig{
-ChangeAgeLimit: time.Minute,
-PostNonEmptyQueryDelay: 3 * time.Second,
-PostEmptyQueryDelay: 3 * time.Second,
-PostFailedQueryDelay: 3 * time.Second,
-QueryTimeWindowSize: 5 * time.Minute,
-ConfidenceWindowSize: time.Millisecond,
+ChangeAgeLimit: time.Minute,
+PostQueryDelay: 3 * time.Second,
+PostFailedQueryDelay: 3 * time.Second,
+QueryTimeWindowSize: 5 * time.Minute,
+ConfidenceWindowSize: time.Millisecond,
}

schemaNames := make([]string, 0)
139 changes: 139 additions & 0 deletions examples/simple-printer-stateful/main.go
@@ -0,0 +1,139 @@
package main

import (
"context"
"fmt"
"log"
"os"
"time"

"github.com/gocql/gocql"
scyllacdc "github.com/scylladb/scylla-cdc-go"
)

// Make sure you create the following table before you run this example:
// CREATE TABLE ks.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH cdc = {'enabled': 'true'};

func main() {
Collaborator

I'm not sure what the differences are between simple-printer-stateful and complicated-consumer. Is there a good reason why you kept them separate? Perhaps complicated-consumer could be improved in some way?

Contributor Author @hartmut-co-uk, Oct 5, 2022

> complicated-consumer

What do you mean by 'complicated-consumer'?

I think this was simply the example I put together and worked with while debugging and attempting to fix the behaviour, so I added it for reference.

Please feel free to keep, improve, or drop it. :-)

Collaborator

Never mind, "complicated-consumer" (which is actually called "complicated-printer") was an example that I apparently forgot to push. It's fine then, we can take your example instead.

cluster := gocql.NewCluster("127.0.0.1")
cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy())
session, err := cluster.CreateSession()
if err != nil {
log.Fatal(err)
return
}
defer session.Close()

var progressManager scyllacdc.ProgressManager
progressManager, err = scyllacdc.NewTableBackedProgressManager(session, "ks.cdc_progress", "cdc-replicator")
if err != nil {
log.Fatal(err)
return
}

logger := log.New(os.Stderr, "", log.Ldate|log.Lmicroseconds|log.Lshortfile)

cfg := &scyllacdc.ReaderConfig{
Session: session,
TableNames: []string{"ks.tbl"},
ChangeConsumerFactory: &myFactory{
logger: logger,
progressReporterInterval: 30 * time.Second,
},
Logger: logger,
Advanced: scyllacdc.AdvancedReaderConfig{
ChangeAgeLimit: 15 * time.Minute,
ConfidenceWindowSize: 10 * time.Second,
PostQueryDelay: 5 * time.Second,
PostFailedQueryDelay: 5 * time.Second,
QueryTimeWindowSize: 5 * 60 * time.Second,
},
ProgressManager: progressManager,
}

reader, err := scyllacdc.NewReader(context.Background(), cfg)
if err != nil {
log.Fatal(err)
}

if err := reader.Run(context.Background()); err != nil {
log.Fatal(err)
}
}

func consumeChange(ctx context.Context, tableName string, c scyllacdc.Change) error {
for _, changeRow := range c.Delta {
pkRaw, _ := changeRow.GetValue("pk")
ckRaw, _ := changeRow.GetValue("ck")
v := changeRow.GetAtomicChange("v")

pk := pkRaw.(*int)
ck := ckRaw.(*int)

fmt.Printf("[%s] Operation: %s, pk: %s, ck: %s\n", tableName, changeRow.GetOperation(),
nullableIntToStr(pk), nullableIntToStr(ck))

if v.IsDeleted {
fmt.Printf(" Column v was set to null/deleted\n")
} else {
vInt := v.Value.(*int)
if vInt != nil {
fmt.Printf(" Column v was set to %d\n", *vInt)
} else {
fmt.Print(" Column v was not changed\n")
}
}
}

return nil
}

func nullableIntToStr(i *int) string {
if i == nil {
return "null"
}
return fmt.Sprintf("%d", *i)
}

type myConsumer struct {
// PeriodicProgressReporter is a wrapper around ProgressReporter
// which rate-limits saving the progress
reporter *scyllacdc.PeriodicProgressReporter
logger scyllacdc.Logger
f scyllacdc.ChangeConsumerFunc
tableName string
}

func (mc *myConsumer) Consume(ctx context.Context, change scyllacdc.Change) error {
// ... do work ...
mc.logger.Printf("myConsumer.Consume...\n")
err := mc.f(ctx, mc.tableName, change)
if err != nil {
return err
}

mc.reporter.Update(change.Time)
return nil
}

func (mc *myConsumer) Empty(ctx context.Context, newTime gocql.UUID) error {
mc.reporter.Update(newTime)
return nil
}

func (mc *myConsumer) End() error {
_ = mc.reporter.SaveAndStop(context.Background())
return nil
}

type myFactory struct {
logger scyllacdc.Logger
progressReporterInterval time.Duration
}

func (f *myFactory) CreateChangeConsumer(ctx context.Context, input scyllacdc.CreateChangeConsumerInput) (scyllacdc.ChangeConsumer, error) {
f.logger.Printf("myFactory.CreateChangeConsumer %s, %s\n", input.TableName, input.StreamID)
reporter := scyllacdc.NewPeriodicProgressReporter(f.logger, f.progressReporterInterval, input.ProgressReporter)
reporter.Start(ctx)
return &myConsumer{reporter, f.logger, consumeChange, input.TableName}, nil
}
17 changes: 14 additions & 3 deletions examples/simple-printer/main.go
@@ -5,6 +5,7 @@ import (
"fmt"
"log"
"os"
+"time"

"github.com/gocql/gocql"
scyllacdc "github.com/scylladb/scylla-cdc-go"
@@ -15,18 +16,28 @@ import (

func main() {
cluster := gocql.NewCluster("127.0.0.1")
-cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.DCAwareRoundRobinPolicy("local-dc"))
+cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy())
session, err := cluster.CreateSession()
if err != nil {
log.Fatal(err)
+return
}
defer session.Close()

+logger := log.New(os.Stderr, "", log.Ldate|log.Lmicroseconds|log.Lshortfile)

cfg := &scyllacdc.ReaderConfig{
Session: session,
TableNames: []string{"ks.tbl"},
ChangeConsumerFactory: changeConsumerFactory,
-Logger: log.New(os.Stderr, "", log.Ldate|log.Lshortfile),
+Logger: logger,
+Advanced: scyllacdc.AdvancedReaderConfig{
+ChangeAgeLimit: 15 * time.Minute,
+ConfidenceWindowSize: 10 * time.Second,
+PostQueryDelay: 5 * time.Second,
+PostFailedQueryDelay: 5 * time.Second,
+QueryTimeWindowSize: 5 * 60 * time.Second,
+},
}

reader, err := scyllacdc.NewReader(context.Background(), cfg)
@@ -48,7 +59,7 @@ func consumeChange(ctx context.Context, tableName string, c scyllacdc.Change) er
pk := pkRaw.(*int)
ck := ckRaw.(*int)

-fmt.Printf("Operation: %s, pk: %s, ck: %s\n", changeRow.GetOperation(),
+fmt.Printf("[%s] Operation: %s, pk: %s, ck: %s\n", tableName, changeRow.GetOperation(),
nullableIntToStr(pk), nullableIntToStr(ck))

if v.IsDeleted {
1 change: 1 addition & 0 deletions progress.go
@@ -236,6 +236,7 @@ func (tbpm *TableBackedProgressManager) SaveProgress(ctx context.Context, gen ti
tbpm.concurrentQueryLimiter.Acquire(ctx, 1)
defer tbpm.concurrentQueryLimiter.Release(1)

+//log.Printf("SaveProgress for %s = %s\n", streamID, progress.LastProcessedRecordTime)
Contributor Author

FYI: here's one more unused log

return tbpm.session.Query(
fmt.Sprintf("INSERT INTO %s (generation, application_name, table_name, stream_id, last_timestamp) VALUES (?, ?, ?, ?, ?) USING TTL ?", tbpm.progressTableName),
gen, tbpm.applicationName, tableName, streamID, progress.LastProcessedRecordTime, tbpm.ttl,
21 changes: 5 additions & 16 deletions reader.go
@@ -85,23 +85,13 @@ type AdvancedReaderConfig struct {

// The library uses select statements to fetch changes from CDC Log tables.
// Each select fetches changes from a single table and fetches only changes
-// from a limited set of CDC streams. If such select returns one or more
-// changes then next select to this table and set of CDC streams will be
-// issued after a delay. This parameter specifies the length of the delay.
-//
-// If the parameter is left as 0, the library will automatically adjust
-// the length of the delay.
-PostNonEmptyQueryDelay time.Duration
-
-// The library uses select statements to fetch changes from CDC Log tables.
-// Each select fetches changes from a single table and fetches only changes
-// from a limited set of CDC streams. If such select returns no changes then
-// next select to this table and set of CDC streams will be issued after
+// from a limited set of CDC streams. The subsequent select after query
+// execution to this table and set of CDC streams will be issued after
// a delay. This parameter specifies the length of the delay.
//
// If the parameter is left as 0, the library will automatically adjust
// the length of the delay.
-PostEmptyQueryDelay time.Duration
+PostQueryDelay time.Duration
Collaborator

Unfortunately, this is a backwards-incompatible change and would require releasing a v2. I'd rather postpone it until more ideas for breaking changes accumulate and releasing a v2 makes more sense.

Could you explain your motivation for unifying the two parameters? The idea behind separate delays was that an empty query result signals that the library is polling too fast and wasting time on empty queries. Waiting longer before the next query should increase the chance that it returns a non-empty result.
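
A minimal sketch of the delay selection described in this comment, modeled on the pre-PR branch in stream_batch.go. The names `pollDelays`, `defaultDelays`, `errQueryFailed`, and `nextDelay` are hypothetical and exist only for illustration; the durations mirror the old defaults from setDefaults().

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// pollDelays is a hypothetical stand-in for the three delay fields that
// AdvancedReaderConfig had before this PR.
type pollDelays struct {
	PostNonEmptyQueryDelay time.Duration
	PostEmptyQueryDelay    time.Duration
	PostFailedQueryDelay   time.Duration
}

// defaultDelays mirrors the pre-PR defaults: 10s, 30s and 1s.
var defaultDelays = pollDelays{
	PostNonEmptyQueryDelay: 10 * time.Second,
	PostEmptyQueryDelay:    30 * time.Second,
	PostFailedQueryDelay:   1 * time.Second,
}

// errQueryFailed is an illustrative error value, not a library error.
var errQueryFailed = errors.New("query failed")

// nextDelay picks the pause before the next poll: a short retry delay after
// a failure, the regular delay after rows were returned, and a longer
// back-off after an empty result.
func nextDelay(d pollDelays, queryErr error, hadRows bool) time.Duration {
	switch {
	case queryErr != nil:
		return d.PostFailedQueryDelay
	case hadRows:
		return d.PostNonEmptyQueryDelay
	default:
		return d.PostEmptyQueryDelay
	}
}

func main() {
	fmt.Println("after rows: ", nextDelay(defaultDelays, nil, true))
	fmt.Println("after empty:", nextDelay(defaultDelays, nil, false))
	fmt.Println("after error:", nextDelay(defaultDelays, errQueryFailed, false))
}
```

Unifying the two success delays, as the PR does, amounts to collapsing the last two cases into one.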

Contributor Author

TBH I can't remember. I think I couldn't see any valid use case for separate delays.
For my own CDC scenarios I was keen to keep the latency from record write time to CDC event as low as possible.

I agree it's not a good idea to introduce a breaking change for this.

But I think it also had something to do with the problem of getPollWindow() being executed before the delay happens (see further down).


// If the library tries to read from the CDC log and the read operation
// fails, it will wait some time before attempting to read again. This
@@ -143,8 +133,7 @@ func (arc *AdvancedReaderConfig) setDefaults() {
}
setIfZero(&arc.ConfidenceWindowSize, 30*time.Second)

-setIfZero(&arc.PostNonEmptyQueryDelay, 10*time.Second)
-setIfZero(&arc.PostEmptyQueryDelay, 30*time.Second)
+setIfZero(&arc.PostQueryDelay, 10*time.Second)
setIfZero(&arc.PostFailedQueryDelay, 1*time.Second)

setIfZero(&arc.QueryTimeWindowSize, 30*time.Second)
@@ -268,7 +257,7 @@ func (r *Reader) Run(ctx context.Context) error {
}
}

-sleepAmount := r.config.Advanced.PostNonEmptyQueryDelay / time.Duration(len(readers))
+sleepAmount := r.config.Advanced.PostQueryDelay / time.Duration(len(readers))
for i := range readers {
reader := readers[i]
select {
16 changes: 11 additions & 5 deletions stream_batch.go
@@ -100,6 +100,8 @@ outer:

if compareTimeuuid(wnd.begin, wnd.end) < 0 {
var iter *changeRowIterator
+//sbr.config.Logger.Printf("queryRange: %s.%s :: %s (%s) [%s ... %s]",
+// crq.keyspaceName, crq.tableName, crq.pkCondition, crq.bindArgs[0], wnd.begin.Time(), wnd.end.Time())
Collaborator

I'd rather not leave commented-out code in the repository. Perhaps the Logger interface could be extended so that it understands various verbosity levels, and this message could be logged at a low verbosity (debug/trace)?
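
One possible shape for such a verbosity-aware logger, as a sketch only. It assumes the library's Logger is a Printf-style interface; `printfLogger`, `level`, `leveledLogger`, and `recorder` are hypothetical names and not part of scylla-cdc-go.

```go
package main

import "fmt"

// printfLogger is an assumed minimal Printf-style logging interface.
type printfLogger interface {
	Printf(format string, v ...interface{})
}

// level is a hypothetical verbosity level; higher values are more verbose.
type level int

const (
	levelInfo level = iota
	levelDebug
	levelTrace
)

// leveledLogger wraps a printfLogger and drops messages that are more
// verbose than the configured maximum.
type leveledLogger struct {
	out printfLogger
	max level
}

// Logf forwards the message only when lvl is enabled.
func (l *leveledLogger) Logf(lvl level, format string, v ...interface{}) {
	if lvl > l.max {
		return
	}
	l.out.Printf(format, v...)
}

// recorder stores formatted lines so the example can show what was emitted.
type recorder struct{ lines []string }

func (r *recorder) Printf(format string, v ...interface{}) {
	r.lines = append(r.lines, fmt.Sprintf(format, v...))
}

func main() {
	rec := &recorder{}
	logger := &leveledLogger{out: rec, max: levelInfo}

	logger.Logf(levelInfo, "error while sending a query (will retry): %s", "timeout")
	logger.Logf(levelTrace, "queryRange: %s.%s", "ks", "tbl") // dropped at info verbosity

	fmt.Println(len(rec.lines), rec.lines)
}
```

With a wrapper along these lines, the queryRange message could stay in the code at trace level instead of being commented out.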

Contributor Author

Fine with me either way. I mostly wanted to help you understand how I had been debugging this, so I committed it along with the rest for review purposes.

If you would ultimately like to merge this PR, let me know if I should drop it.

iter, err = crq.queryRange(wnd.begin, wnd.end)
if err != nil {
sbr.config.Logger.Printf("error while sending a query (will retry): %s", err)
@@ -112,6 +114,12 @@ outer:
return consumerErr
}
hadRows = rowCount > 0

+if !hadRows {
+for _, c := range sbr.consumers {
+err = c.Empty(ctx, wnd.end)
+}
+}
}

if err == nil {
@@ -126,10 +134,8 @@ outer:
var delay time.Duration
if err != nil {
delay = sbr.config.Advanced.PostFailedQueryDelay
-} else if hadRows {
-delay = sbr.config.Advanced.PostNonEmptyQueryDelay
} else {
-delay = sbr.config.Advanced.PostEmptyQueryDelay
+delay = sbr.config.Advanced.PostQueryDelay
}

delayUntil := windowProcessingStartTime.Add(delay)
@@ -203,14 +209,14 @@ func (sbr *streamBatchReader) getPollWindow() pollWindow {
if queryWindowRightEnd.Before(confidenceWindowStart) {
return pollWindow{
begin: windowStart,
-end: gocql.MinTimeUUID(queryWindowRightEnd),
+end: gocql.MinTimeUUID(queryWindowRightEnd.Add(sbr.config.Advanced.PostQueryDelay)),
Collaborator

This doesn't seem correct. Due to the distributed nature of Scylla and CDC, rows with a recent cdc$time may appear after some delay. ConfidenceWindowSize is supposed to protect against that: we don't read rows with cdc$time newer than Now() - ConfidenceWindowSize. Here, it looks like you may break that assumption, as queryWindowRightEnd + PostQueryDelay may be later than confidenceWindowStart.

Do you remember why you introduced that change?
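
To make the invariant concrete, here is a simplified sketch of the pre-PR window computation, using plain time.Time values instead of timeuuids. `pollWindowEnd` and the `demo*` variables are hypothetical; only the capping logic follows the original lines of getPollWindow() shown in the diff.

```go
package main

import (
	"fmt"
	"time"
)

// Illustrative inputs; none of these names exist in the library.
var (
	demoNow        = time.Date(2022, 10, 5, 12, 0, 0, 0, time.UTC)
	demoWindow     = 60 * time.Second // stands in for QueryTimeWindowSize
	demoConfidence = 30 * time.Second // stands in for ConfidenceWindowSize
)

// pollWindowEnd returns the right edge of the next query window and whether
// the window touches the confidence window. The edge is never later than
// now - confidence, which is the invariant the review comment refers to.
func pollWindowEnd(begin, now time.Time, windowSize, confidence time.Duration) (time.Time, bool) {
	confidenceWindowStart := now.Add(-confidence)
	queryWindowRightEnd := begin.Add(windowSize)
	if queryWindowRightEnd.Before(confidenceWindowStart) {
		return queryWindowRightEnd, false
	}
	return confidenceWindowStart, true
}

func main() {
	// A reader that is far behind gets a full-size window.
	end, touches := pollWindowEnd(demoNow.Add(-10*time.Minute), demoNow, demoWindow, demoConfidence)
	fmt.Println("distance from now:", demoNow.Sub(end), "touches:", touches)

	// A reader that has caught up is capped at the confidence window.
	end, touches = pollWindowEnd(demoNow.Add(-45*time.Second), demoNow, demoWindow, demoConfidence)
	fmt.Println("distance from now:", demoNow.Sub(end), "touches:", touches)
}
```

Adding PostQueryDelay to either returned edge, as the PR does, can push the end past now - confidence, so rows that may still be arriving could be read too early.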

Contributor Author

.. after reading my code multiple times now - I think it was because the window is calculated first, then the delay is resolved and executed. I think this was very weird to understand and maybe some wider refactoring should be done?

see ref:

wnd = sbr.getPollWindow()
var delay time.Duration
if err != nil {
delay = sbr.config.Advanced.PostFailedQueryDelay
} else {
delay = sbr.config.Advanced.PostQueryDelay
}
delayUntil := windowProcessingStartTime.Add(delay)

Contributor Author

I think this was one of those details I was only able to discover while debugging with the added detailed logs. I remember this was confusing and time-consuming. ^^

With this in place I was able to achieve and actually see the exact 15s expected latency, as I mentioned in my 4th comment.

Collaborator

> .. after reading my code multiple times now - I think it was because the window is calculated first, then the delay is resolved and executed. I think this was very weird to understand and maybe some wider refactoring should be done?

Right, now I see that the delay is wrong. The window is calculated first, then there is a sleep, then the loop goes to the next iteration and uses the query window calculated before the sleep. This is clearly wrong: the window should be used in a query immediately after being computed. The proper fix would be to move wnd = sbr.getPollWindow() to the end of the loop.

Collaborator

Or - even better - wnd := sbr.getPollWindow() should be put at the beginning of the for loop and the other calls to getPollWindow() removed. I really don't know why it wasn't written like that in the beginning...
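
The effect of the ordering can be illustrated with a toy model in which time is a plain counter in seconds. Both functions are hypothetical and only mimic the control flow discussed in this thread; they return, for each query, how far the clock had moved since that query's window was computed.

```go
package main

import "fmt"

// windowLagBeforeFix models the current ordering: the window for the next
// query is computed, then the loop sleeps, and only then is the query run.
func windowLagBeforeFix(iterations, delay int) []int {
	now := 0
	windowComputedAt := now // initial window, computed before the loop
	lags := make([]int, 0, iterations)
	for i := 0; i < iterations; i++ {
		lags = append(lags, now-windowComputedAt) // query uses the stored window
		windowComputedAt = now                    // next window is computed...
		now += delay                              // ...before the sleep
	}
	return lags
}

// windowLagAfterFix models the suggested ordering: the window is computed at
// the beginning of each iteration and used immediately.
func windowLagAfterFix(iterations, delay int) []int {
	now := 0
	lags := make([]int, 0, iterations)
	for i := 0; i < iterations; i++ {
		windowComputedAt := now
		lags = append(lags, now-windowComputedAt) // query runs right away
		now += delay                              // sleep afterwards
	}
	return lags
}

func main() {
	fmt.Println("before:", windowLagBeforeFix(3, 5))
	fmt.Println("after: ", windowLagAfterFix(3, 5))
}
```

In this model every query after the first runs with a window that is one full delay old under the current ordering, which is consistent with the extra latency reported above.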

Collaborator

I also agree that this code could use some refactoring :) Perhaps the logic responsible for tracking per-stream progress and calculating windows could be moved to an abstraction separate from the streamBatchReader.

Contributor Author @hartmut-co-uk, Oct 5, 2022

Yes, the current state of the PR could be called a workaround at best 😅. Glad I at least remembered why I added this. Thanks for pointing it out 🧐


touchesConfidenceWindow: false,
}
}
return pollWindow{
begin: windowStart,
-end: gocql.MinTimeUUID(confidenceWindowStart),
+end: gocql.MinTimeUUID(confidenceWindowStart.Add(sbr.config.Advanced.PostQueryDelay)),
Collaborator

The same issue as above - here, the end will be later than confidenceWindowStart.

Contributor Author

(see above)


touchesConfidenceWindow: true,
}
11 changes: 5 additions & 6 deletions types_test.go
@@ -315,12 +315,11 @@ func TestTypes(t *testing.T) {
})

adv := AdvancedReaderConfig{
-ChangeAgeLimit: time.Minute,
-PostNonEmptyQueryDelay: 3 * time.Second,
-PostEmptyQueryDelay: 3 * time.Second,
-PostFailedQueryDelay: 3 * time.Second,
-QueryTimeWindowSize: 5 * time.Minute,
-ConfidenceWindowSize: time.Millisecond,
+ChangeAgeLimit: time.Minute,
+PostQueryDelay: 3 * time.Second,
+PostFailedQueryDelay: 3 * time.Second,
+QueryTimeWindowSize: 5 * time.Minute,
+ConfidenceWindowSize: time.Millisecond,
}

// Configure a session
1 change: 1 addition & 0 deletions utils.go
@@ -71,6 +71,7 @@ func (ppr *PeriodicProgressReporter) Start(ctx context.Context) {
ppr.mu.Unlock()

// TODO: Log errors?
+//ppr.logger.Printf("MarkProgress for %s: %s", ppr.reporter.streamID, timeToReport.Time())
Collaborator

Same issue with logging here.

Contributor Author

also, same as above

err := ppr.reporter.MarkProgress(ctx, Progress{timeToReport})
if err != nil {
ppr.logger.Printf("failed to save progress for %s: %s", ppr.reporter.streamID, err)