Skip to content

Commit

Permalink
Fix gRPC stream pausing
Browse files Browse the repository at this point in the history
Try to pause as soon as possible to prevent buffer overflow
  • Loading branch information
kalabukdima committed May 1, 2023
1 parent 96f4b90 commit b1ecef4
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 1 deletion.
3 changes: 3 additions & 0 deletions src/streamdb/consumerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ export class StreamDBConsumerClient {

return PausableStream.create<StreamEvent>((observer, pauseState) => {
grpcStreamResponse.on("data", d => {
if (pauseState.isPaused && !grpcStreamResponse.isPaused()) {
grpcStreamResponse.pause();
}
for (const obj of d.stateTransition) {
observer.next(stateTransitionProtoToStreamEvent(obj));
}
Expand Down
4 changes: 3 additions & 1 deletion src/streamdb/converters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ export function protoToOffset(offsetProto: proto_model.Offset): Offset {
return new Offset(
offsetProto.id,
BigInt(offsetProto.height),
offsetProto.timestamp ? protoToTimestamp(offsetProto.timestamp) : Timestamp.zero
offsetProto.timestamp
? protoToTimestamp(offsetProto.timestamp)
: Timestamp.zero
);
}
export function offsetToProto(offset: Offset): proto_model.Offset {
Expand Down

0 comments on commit b1ecef4

Please sign in to comment.