diff --git a/change.go b/change.go index f754dfb..b9dc694 100644 --- a/change.go +++ b/change.go @@ -456,6 +456,14 @@ type ChangeConsumer interface { // If this method returns an error, the library will stop with an error. Consume(ctx context.Context, change Change) error + // Invoked upon empty results from the CDC log associated with the stream of + // the ChangeConsumer. This method is called to acknowledge a query window + // has been executed against the stream and the CDC log is to be considered + // completed as of 'ackTime' param passed. + // + // If this method returns an error, the library will stop with an error. + Empty(ctx context.Context, ackTime gocql.UUID) error + // Called after all rows from the stream were consumed, and the reader // is about to switch to a new generation, or stop execution altogether. // @@ -495,6 +503,10 @@ type changeConsumerFuncInstance struct { f ChangeConsumerFunc } +func (ccfi *changeConsumerFuncInstance) Empty(ctx context.Context, newTime gocql.UUID) error { + return nil +} + func (ccfi *changeConsumerFuncInstance) End() error { return nil } diff --git a/examples/replicator/main.go b/examples/replicator/main.go index 426e9d4..2683c9f 100644 --- a/examples/replicator/main.go +++ b/examples/replicator/main.go @@ -51,12 +51,11 @@ func main() { clWrite := parseConsistency(writeConsistency) adv := scyllacdc.AdvancedReaderConfig{ - ConfidenceWindowSize: 30 * time.Second, - ChangeAgeLimit: 10 * time.Minute, - QueryTimeWindowSize: 60 * time.Second, - PostEmptyQueryDelay: 30 * time.Second, - PostNonEmptyQueryDelay: 10 * time.Second, - PostFailedQueryDelay: 1 * time.Second, + ConfidenceWindowSize: 30 * time.Second, + ChangeAgeLimit: 10 * time.Minute, + QueryTimeWindowSize: 60 * time.Second, + PostQueryDelay: 10 * time.Second, + PostFailedQueryDelay: 1 * time.Second, } fmt.Println("Parameters:") @@ -71,8 +70,7 @@ func main() { fmt.Printf(" Confidence window size: %s\n", adv.ConfidenceWindowSize) fmt.Printf(" Change age limit: %s\n", adv.ChangeAgeLimit) fmt.Printf(" Query window size: %s\n", adv.QueryTimeWindowSize) - fmt.Printf(" Delay after poll with empty results: %s\n", adv.PostEmptyQueryDelay) - fmt.Printf(" Delay after poll with non-empty results: %s\n", adv.PostNonEmptyQueryDelay) + fmt.Printf(" Delay after poll with non-empty results: %s\n", adv.PostQueryDelay) fmt.Printf(" Delay after failed poll: %s\n", adv.PostFailedQueryDelay) var fullyQualifiedTables []string diff --git a/examples/replicator/replicator_test.go b/examples/replicator/replicator_test.go index 82a14b1..0306fb7 100644 --- a/examples/replicator/replicator_test.go +++ b/examples/replicator/replicator_test.go @@ -375,12 +375,11 @@ func TestReplicator(t *testing.T) { t.Log("running replicators") adv := scyllacdc.AdvancedReaderConfig{ - ChangeAgeLimit: time.Minute, - PostNonEmptyQueryDelay: 3 * time.Second, - PostEmptyQueryDelay: 3 * time.Second, - PostFailedQueryDelay: 3 * time.Second, - QueryTimeWindowSize: 5 * time.Minute, - ConfidenceWindowSize: time.Millisecond, + ChangeAgeLimit: time.Minute, + PostQueryDelay: 3 * time.Second, + PostFailedQueryDelay: 3 * time.Second, + QueryTimeWindowSize: 5 * time.Minute, + ConfidenceWindowSize: time.Millisecond, } schemaNames := make([]string, 0) diff --git a/examples/simple-printer-stateful/main.go b/examples/simple-printer-stateful/main.go new file mode 100644 index 0000000..b08695d --- /dev/null +++ b/examples/simple-printer-stateful/main.go @@ -0,0 +1,139 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "time" + + "github.com/gocql/gocql" + scyllacdc "github.com/scylladb/scylla-cdc-go" +) + +// Make sure you create the following table before you run this example: +// CREATE TABLE ks.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH cdc = {'enabled': 'true'}; + +func main() { + cluster := gocql.NewCluster("127.0.0.1") + cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy()) + session, err := cluster.CreateSession() + if err != nil { + log.Fatal(err) + return + } + defer session.Close() + + var progressManager scyllacdc.ProgressManager + progressManager, err = scyllacdc.NewTableBackedProgressManager(session, "ks.cdc_progress", "cdc-replicator") + if err != nil { + log.Fatal(err) + return + } + + logger := log.New(os.Stderr, "", log.Ldate|log.Lmicroseconds|log.Lshortfile) + + cfg := &scyllacdc.ReaderConfig{ + Session: session, + TableNames: []string{"ks.tbl"}, + ChangeConsumerFactory: &myFactory{ + logger: logger, + progressReporterInterval: 30 * time.Second, + }, + Logger: logger, + Advanced: scyllacdc.AdvancedReaderConfig{ + ChangeAgeLimit: 15 * time.Minute, + ConfidenceWindowSize: 10 * time.Second, + PostQueryDelay: 5 * time.Second, + PostFailedQueryDelay: 5 * time.Second, + QueryTimeWindowSize: 5 * 60 * time.Second, + }, + ProgressManager: progressManager, + } + + reader, err := scyllacdc.NewReader(context.Background(), cfg) + if err != nil { + log.Fatal(err) + } + + if err := reader.Run(context.Background()); err != nil { + log.Fatal(err) + } +} + +func consumeChange(ctx context.Context, tableName string, c scyllacdc.Change) error { + for _, changeRow := range c.Delta { + pkRaw, _ := changeRow.GetValue("pk") + ckRaw, _ := changeRow.GetValue("ck") + v := changeRow.GetAtomicChange("v") + + pk := pkRaw.(*int) + ck := ckRaw.(*int) + + fmt.Printf("[%s] Operation: %s, pk: %s, ck: %s\n", tableName, changeRow.GetOperation(), + nullableIntToStr(pk), nullableIntToStr(ck)) + + if v.IsDeleted { + fmt.Printf(" Column v was set to null/deleted\n") + } else { + vInt := v.Value.(*int) + if vInt != nil { + fmt.Printf(" Column v was set to %d\n", *vInt) + } else { + fmt.Print(" Column v was not changed\n") + } + } + } + + return nil +} + +func nullableIntToStr(i *int) string { + if i == nil { + return "null" + } + return fmt.Sprintf("%d", *i) +} + +type myConsumer struct { + // PeriodicProgressReporter is a wrapper around ProgressReporter + // which rate-limits saving the progress + reporter *scyllacdc.PeriodicProgressReporter + logger scyllacdc.Logger + f scyllacdc.ChangeConsumerFunc + tableName string +} + +func (mc *myConsumer) Consume(ctx context.Context, change scyllacdc.Change) error { + // ... do work ... + mc.logger.Printf("myConsumer.Consume...\n") + err := mc.f(ctx, mc.tableName, change) + if err != nil { + return err + } + + mc.reporter.Update(change.Time) + return nil +} + +func (mc *myConsumer) Empty(ctx context.Context, newTime gocql.UUID) error { + mc.reporter.Update(newTime) + return nil +} + +func (mc *myConsumer) End() error { + _ = mc.reporter.SaveAndStop(context.Background()) + return nil +} + +type myFactory struct { + logger scyllacdc.Logger + progressReporterInterval time.Duration +} + +func (f *myFactory) CreateChangeConsumer(ctx context.Context, input scyllacdc.CreateChangeConsumerInput) (scyllacdc.ChangeConsumer, error) { + f.logger.Printf("myFactory.CreateChangeConsumer %s, %s\n", input.TableName, input.StreamID) + reporter := scyllacdc.NewPeriodicProgressReporter(f.logger, f.progressReporterInterval, input.ProgressReporter) + reporter.Start(ctx) + return &myConsumer{reporter, f.logger, consumeChange, input.TableName}, nil +} diff --git a/examples/simple-printer/main.go b/examples/simple-printer/main.go index 8ee1230..e8b14e1 100644 --- a/examples/simple-printer/main.go +++ b/examples/simple-printer/main.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "os" + "time" "github.com/gocql/gocql" scyllacdc "github.com/scylladb/scylla-cdc-go" @@ -15,18 +16,28 @@ import ( func main() { cluster := gocql.NewCluster("127.0.0.1") - cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.DCAwareRoundRobinPolicy("local-dc")) + cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy()) session, err := cluster.CreateSession() if err != nil { log.Fatal(err) + return } defer session.Close() + logger := log.New(os.Stderr, "", log.Ldate|log.Lmicroseconds|log.Lshortfile) + cfg := &scyllacdc.ReaderConfig{ Session: session, TableNames: []string{"ks.tbl"}, ChangeConsumerFactory: changeConsumerFactory, - Logger: log.New(os.Stderr, "", log.Ldate|log.Lshortfile), + Logger: logger, + Advanced: scyllacdc.AdvancedReaderConfig{ + ChangeAgeLimit: 15 * time.Minute, + ConfidenceWindowSize: 10 * time.Second, + PostQueryDelay: 5 * time.Second, + PostFailedQueryDelay: 5 * time.Second, + QueryTimeWindowSize: 5 * 60 * time.Second, + }, } reader, err := scyllacdc.NewReader(context.Background(), cfg) @@ -48,7 +59,7 @@ func consumeChange(ctx context.Context, tableName string, c scyllacdc.Change) er pk := pkRaw.(*int) ck := ckRaw.(*int) - fmt.Printf("Operation: %s, pk: %s, ck: %s\n", changeRow.GetOperation(), + fmt.Printf("[%s] Operation: %s, pk: %s, ck: %s\n", tableName, changeRow.GetOperation(), nullableIntToStr(pk), nullableIntToStr(ck)) if v.IsDeleted { diff --git a/progress.go b/progress.go index 7ea142c..2e93f2e 100644 --- a/progress.go +++ b/progress.go @@ -236,6 +236,7 @@ func (tbpm *TableBackedProgressManager) SaveProgress(ctx context.Context, gen ti tbpm.concurrentQueryLimiter.Acquire(ctx, 1) defer tbpm.concurrentQueryLimiter.Release(1) + //log.Printf("SaveProgress for %s = %s\n", streamID, progress.LastProcessedRecordTime) return tbpm.session.Query( fmt.Sprintf("INSERT INTO %s (generation, application_name, table_name, stream_id, last_timestamp) VALUES (?, ?, ?, ?, ?) USING TTL ?", tbpm.progressTableName), gen, tbpm.applicationName, tableName, streamID, progress.LastProcessedRecordTime, tbpm.ttl, diff --git a/reader.go b/reader.go index 3b08157..6a47122 100644 --- a/reader.go +++ b/reader.go @@ -85,23 +85,13 @@ type AdvancedReaderConfig struct { // The library uses select statements to fetch changes from CDC Log tables. // Each select fetches changes from a single table and fetches only changes - // from a limited set of CDC streams. If such select returns one or more - // changes then next select to this table and set of CDC streams will be - // issued after a delay. This parameter specifies the length of the delay. - // - // If the parameter is left as 0, the library will automatically adjust - // the length of the delay. - PostNonEmptyQueryDelay time.Duration - - // The library uses select statements to fetch changes from CDC Log tables. - // Each select fetches changes from a single table and fetches only changes - // from a limited set of CDC streams. If such select returns no changes then - // next select to this table and set of CDC streams will be issued after + // from a limited set of CDC streams. The subsequent select after query + // execution to this table and set of CDC streams will be issued after // a delay. This parameter specifies the length of the delay. // // If the parameter is left as 0, the library will automatically adjust // the length of the delay. - PostEmptyQueryDelay time.Duration + PostQueryDelay time.Duration // If the library tries to read from the CDC log and the read operation // fails, it will wait some time before attempting to read again. This @@ -143,8 +133,7 @@ func (arc *AdvancedReaderConfig) setDefaults() { } setIfZero(&arc.ConfidenceWindowSize, 30*time.Second) - setIfZero(&arc.PostNonEmptyQueryDelay, 10*time.Second) - setIfZero(&arc.PostEmptyQueryDelay, 30*time.Second) + setIfZero(&arc.PostQueryDelay, 10*time.Second) setIfZero(&arc.PostFailedQueryDelay, 1*time.Second) setIfZero(&arc.QueryTimeWindowSize, 30*time.Second) @@ -268,7 +257,7 @@ func (r *Reader) Run(ctx context.Context) error { } } - sleepAmount := r.config.Advanced.PostNonEmptyQueryDelay / time.Duration(len(readers)) + sleepAmount := r.config.Advanced.PostQueryDelay / time.Duration(len(readers)) for i := range readers { reader := readers[i] select { diff --git a/stream_batch.go b/stream_batch.go index 90d5ea4..f531a93 100644 --- a/stream_batch.go +++ b/stream_batch.go @@ -100,6 +100,8 @@ outer: if compareTimeuuid(wnd.begin, wnd.end) < 0 { var iter *changeRowIterator + //sbr.config.Logger.Printf("queryRange: %s.%s :: %s (%s) [%s ... %s]", + // crq.keyspaceName, crq.tableName, crq.pkCondition, crq.bindArgs[0], wnd.begin.Time(), wnd.end.Time()) iter, err = crq.queryRange(wnd.begin, wnd.end) if err != nil { sbr.config.Logger.Printf("error while sending a query (will retry): %s", err) @@ -112,6 +114,12 @@ outer: return consumerErr } hadRows = rowCount > 0 + + if !hadRows { + for _, c := range sbr.consumers { + err = c.Empty(ctx, wnd.end) + } + } } if err == nil { @@ -126,10 +134,8 @@ outer: var delay time.Duration if err != nil { delay = sbr.config.Advanced.PostFailedQueryDelay - } else if hadRows { - delay = sbr.config.Advanced.PostNonEmptyQueryDelay } else { - delay = sbr.config.Advanced.PostEmptyQueryDelay + delay = sbr.config.Advanced.PostQueryDelay } delayUntil := windowProcessingStartTime.Add(delay) @@ -203,14 +209,14 @@ func (sbr *streamBatchReader) getPollWindow() pollWindow { if queryWindowRightEnd.Before(confidenceWindowStart) { return pollWindow{ begin: windowStart, - end: gocql.MinTimeUUID(queryWindowRightEnd), + end: gocql.MinTimeUUID(queryWindowRightEnd.Add(sbr.config.Advanced.PostQueryDelay)), touchesConfidenceWindow: false, } } return pollWindow{ begin: windowStart, - end: gocql.MinTimeUUID(confidenceWindowStart), + end: gocql.MinTimeUUID(confidenceWindowStart.Add(sbr.config.Advanced.PostQueryDelay)), touchesConfidenceWindow: true, } diff --git a/types_test.go b/types_test.go index fc45018..1bacb25 100644 --- a/types_test.go +++ b/types_test.go @@ -315,12 +315,11 @@ func TestTypes(t *testing.T) { }) adv := AdvancedReaderConfig{ - ChangeAgeLimit: time.Minute, - PostNonEmptyQueryDelay: 3 * time.Second, - PostEmptyQueryDelay: 3 * time.Second, - PostFailedQueryDelay: 3 * time.Second, - QueryTimeWindowSize: 5 * time.Minute, - ConfidenceWindowSize: time.Millisecond, + ChangeAgeLimit: time.Minute, + PostQueryDelay: 3 * time.Second, + PostFailedQueryDelay: 3 * time.Second, + QueryTimeWindowSize: 5 * time.Minute, + ConfidenceWindowSize: time.Millisecond, } // Configure a session diff --git a/utils.go b/utils.go index e8572d1..73087af 100644 --- a/utils.go +++ b/utils.go @@ -71,6 +71,7 @@ func (ppr *PeriodicProgressReporter) Start(ctx context.Context) { ppr.mu.Unlock() // TODO: Log errors? + //ppr.logger.Printf("MarkProgress for %s: %s", ppr.reporter.streamID, timeToReport.Time()) err := ppr.reporter.MarkProgress(ctx, Progress{timeToReport}) if err != nil { ppr.logger.Printf("failed to save progress for %s: %s", ppr.reporter.streamID, err)