Skip to content

Commit

Permalink
Fetch shard IDs from Kinesis if DynamoDB cache is empty (#22)
Browse files Browse the repository at this point in the history
* Skip shard ID cache on first run

On startup, if the metadata table is being created for the
first time, the loadShardIDsFromDynamo() call in refreshShards()
will always return an empty shard list.

This causes the consumer to wait until the next leader
election to start receiving events, because that's the only
time loadShardIDsFromKinesis() is called. By default, this happens
once a minute, which means the consumer basically sleeps for 1
minute on first run.

This commit fixes that by adding a first run flag and skipping the
cache if it's true.

Fixes #17.

* Remove first run check
  • Loading branch information
dcelasun authored and garethlewin committed Feb 23, 2019
1 parent 6d05cb3 commit 7aa26b5
Showing 1 changed file with 13 additions and 0 deletions.
13 changes: 13 additions & 0 deletions kinsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,18 @@ func (k *Kinsumer) refreshShards() (bool, error) {
}

shardIDs, err = loadShardIDsFromDynamo(k.dynamodb, k.metadataTableName)

if err != nil {
return false, err
}

if len(shardIDs) == 0 {
shardIDs, err = loadShardIDsFromKinesis(k.kinesis, k.streamName)
if err == nil {
err = k.setCachedShardIDs(shardIDs)
}
}

if err != nil {
return false, err
}
Expand Down Expand Up @@ -250,6 +262,7 @@ func (k *Kinsumer) dynamoCreateTableIfNotExists(name, distKey string) error {
if k.dynamoTableExists(name) {
return nil
}

_, err := k.dynamodb.CreateTable(&dynamodb.CreateTableInput{
AttributeDefinitions: []*dynamodb.AttributeDefinition{{
AttributeName: aws.String(distKey),
Expand Down

0 comments on commit 7aa26b5

Please sign in to comment.