Skip to content

Commit 377b93a

Browse files
fix: comsumer segment without producer (#89)
* fix: comsumer segment without producer * docs: update release notes * fix: check queue reader and comsumer empty queue add some logs --------- Co-authored-by: hardy <[email protected]>
1 parent 7b8cc2d commit 377b93a

File tree

5 files changed

+21
-3
lines changed

5 files changed

+21
-3
lines changed

docs/content.en/docs/release-notes/_index.md

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ Information about release notes of INFINI Framework is provided here.
1717
- Add utility to securely marshal JSON (#85)
1818

1919
### Bug fix
20+
- Fixed comsumer segment without producer (#89)
2021

2122
### Improvements
2223
- Structure http error response (#86)

modules/queue/disk_queue/consumer.go

+10-1
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,10 @@ READ_MSG:
142142
}
143143
}
144144

145+
// check reader
146+
if d.reader == nil {
147+
return messages, false, errors.New("reader is nil")
148+
}
145149
//read message size
146150
err = binary.Read(d.reader, binary.BigEndian, &msgSize)
147151
if err != nil {
@@ -344,7 +348,7 @@ READ_MSG:
344348
}
345349
newData, err := zstd.ZSTDDecompress(nil, readBuf)
346350
if err != nil {
347-
log.Error(err)
351+
log.Errorf("decompress message error: %v %v,%v %v", d.fileName, d.segment, d.readPos, err)
348352
ctx.UpdateNextOffset(d.segment, nextReadPos)
349353
return messages, false, err
350354
}
@@ -467,6 +471,11 @@ func (d *Consumer) ResetOffset(segment, readPos int64) error {
467471
log.Debugf("reset offset: %v,%v, file: %v, queue:%v", segment, readPos, d.fileName, d.queue)
468472
}
469473

474+
// no producer write data to this queue
475+
if d.diskQueue.writeSegmentNum == 0 && d.diskQueue.writePos == 0 {
476+
return nil
477+
}
478+
470479
if segment > d.diskQueue.writeSegmentNum {
471480
log.Errorf("reading segment [%v] is greater than writing segment [%v]", segment, d.diskQueue.writeSegmentNum)
472481
return io.EOF

modules/queue/disk_queue/diskqueue.go

+2
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,7 @@ func (d *DiskBasedQueue) readOne() ([]byte, error) {
504504
}
505505
newData, err := zstd.ZSTDDecompress(nil, readBuf)
506506
if err != nil {
507+
log.Errorf("diskqueue(%s) failed to decompress %v,%v - %s", d.name, d.readSegmentFileNum, d.readPos)
507508
return nil, err
508509
}
509510
return newData, nil
@@ -552,6 +553,7 @@ func (d *DiskBasedQueue) writeOne(data []byte) WriteResponse {
552553
}
553554
newData, err := zstd.ZSTDCompress(nil, data, d.cfg.Compress.Message.Level)
554555
if err != nil {
556+
log.Errorf("diskqueue(%s) failed to compress %v,%v - %s", d.name, d.readSegmentFileNum, d.readPos)
555557
res.Error = err
556558
return res
557559
}

modules/queue/disk_queue/module.go

+4
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,10 @@ func (module *DiskQueue) AcquireConsumer(qconfig *queue.QueueConfig, consumer *q
374374
}
375375
if ok {
376376
q1 := q.(*DiskBasedQueue)
377+
if q1.writeSegmentNum == 0 && q1.writePos == 0 {
378+
//empty queue, no need to create consumer
379+
return nil, errors.New("empty queue")
380+
}
377381
return q1.AcquireConsumer(qconfig, consumer, offset)
378382
}
379383
panic(errors.Errorf("queue [%v] not found", qconfig.Name))

plugins/queue/consumer/consumer.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -485,8 +485,10 @@ func (processor *QueueConsumerProcessor) NewSlicedWorker(ctx *pipeline.Context,
485485
case string:
486486
v = r.(string)
487487
}
488-
log.Errorf("worker[%v], queue:[%v], slice:[%v], offset:[%v]->[%v],%v", workerID, qConfig.ID, sliceID, initOffset, offset, v)
489-
ctx.Failed(fmt.Errorf("panic in slice worker: %+v", r))
488+
if v != "empty queue" {
489+
log.Errorf("worker[%v], queue:[%v], slice:[%v], offset:[%v]->[%v],%v", workerID, qConfig.ID, sliceID, initOffset, offset, v)
490+
ctx.Failed(fmt.Errorf("panic in slice worker: %+v", r))
491+
}
490492
if parentContext != nil {
491493
parentContext.RecordError(fmt.Errorf("panic in slice worker: %+v", r))
492494
}

0 commit comments

Comments
 (0)