From 7aa26b5f204f5bcd3aca34683a9b097f40056e82 Mon Sep 17 00:00:00 2001 From: Duru Can Celasun Date: Sat, 23 Feb 2019 09:02:42 +0100 Subject: [PATCH] Fetch shard IDs from Kinesis if DynamoDB cache is empty (#22) * Skip shard ID cache on first run On startup, if the metadata table is being created for the first time, the loadShardIDsFromDynamo() call in refreshShards() will always return an empty shard list. This causes the consumer to wait until the next leader election to start receiving events, because that's the only time loadShardIDsFromKinesis() is called. By default, this happens once a minute, which means the consumer basically sleeps for 1 minute on first run. This commit fixes that by adding a first run flag and skipping the cache if it's true. Fixes #17. * Remove first run check --- kinsumer.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/kinsumer.go b/kinsumer.go index e79f727..ea130b5 100644 --- a/kinsumer.go +++ b/kinsumer.go @@ -157,6 +157,18 @@ func (k *Kinsumer) refreshShards() (bool, error) { } shardIDs, err = loadShardIDsFromDynamo(k.dynamodb, k.metadataTableName) + + if err != nil { + return false, err + } + + if len(shardIDs) == 0 { + shardIDs, err = loadShardIDsFromKinesis(k.kinesis, k.streamName) + if err == nil { + err = k.setCachedShardIDs(shardIDs) + } + } + if err != nil { return false, err } @@ -250,6 +262,7 @@ func (k *Kinsumer) dynamoCreateTableIfNotExists(name, distKey string) error { if k.dynamoTableExists(name) { return nil } + _, err := k.dynamodb.CreateTable(&dynamodb.CreateTableInput{ AttributeDefinitions: []*dynamodb.AttributeDefinition{{ AttributeName: aws.String(distKey),