Skip to content

Storing consumer offsets with super streams #278

@tunniclm

Description

@tunniclm

In order to store consumer offsets every 1000 messages processed in my SuperStreamConsumer message handler, I needed to know which StreamConsumer the message was received on so I can periodically call consumer.storeOffset(msg.offset) on the correct underlying stream. I have 8 partitions in my super stream each backed by its own separate stream and corresponding offset I need to keep track of and update.

In order to get this working in my application I have made local changes to the stream client to add the StreamConsumer as a second argument to the message handler. I'd be interested to understand if there is a way to implement this behaviour without my modifications (or with better modifications) to the library.

Here is some example code similar to my application making use of the modified client library with the extra consumer: StreamConsumer parameter on the message handler function:

const OFFSET_STORE_THRESHOLD = 1000;
const superStreamConsumerLastStoredOffset: Record<string, bigint> = {};
const shouldStoreOffset = (consumer: StreamConsumer, offset: bigint) => {
    if (consumer.streamName in superStreamConsumerLastStoredOffset) {
        const lastStoredOffset = superStreamConsumerLastStoredOffset[consumer.streamName];
        return offset - lastStoredOffset >= OFFSET_STORE_THRESHOLD;
    }
    return true;
};
await this.streamClient.declareSuperStreamConsumer(
    {
        superStream: 'mySuperStream',
        consumerRef: 'my-app',
        offset: Offset.first(),
        creditPolicy: creditsOnChunkCompleted(1, 1)
    },
    async (msg: Message, consumer: StreamConsumer) => {
        try {

            // <MESSAGE PROCESSING HERE>>

            if (msg.offset && shouldStoreOffset(consumer, msg.offset)) {
                await consumer.storeOffset(msg.offset);
                superStreamConsumerLastStoredOffset[consumer.streamName] = msg.offset;
            }
        } catch (err) {

            // <HANDLE ERRORS HERE>

        }
    }
);

My changes are in the following PR: #279

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions