Skip to content

Commit 29f2234

Browse files
author
guyinyou
committed
fix 4M in writeWithoutMmap
Change-Id: I6ea680fd0db6657af8442dfc78a5f40a3e588803
1 parent 37441c2 commit 29f2234

File tree

1 file changed

+42
-38
lines changed

1 file changed

+42
-38
lines changed

store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java

Lines changed: 42 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ public ByteBuffer acquire() {
159159

160160
SHARED_BYTE_BUFFER = new SharedByteBuffer[maxSharedNum];
161161
for (int i = 0; i < maxSharedNum; i++) {
162-
SHARED_BYTE_BUFFER[i] = new SharedByteBuffer(4 * 1024 * 1024);
162+
SHARED_BYTE_BUFFER[i] = new SharedByteBuffer(5 * 1024 * 1024);
163163
}
164164
}
165165

@@ -336,24 +336,26 @@ public AppendMessageResult appendMessage(final ByteBuffer byteBufferMsg, final C
336336
byteBuffer.position(currentPos);
337337
}
338338

339-
AppendMessageResult result = cb.doAppend(byteBuffer, fileFromOffset, this.fileSize - currentPos, byteBufferMsg);
340-
341-
if (sharedByteBuffer != null) {
342-
try {
343-
this.fileChannel.position(currentPos);
344-
byteBuffer.position(0).limit(result.getWroteBytes());
345-
this.fileChannel.write(byteBuffer);
346-
} catch (Throwable t) {
347-
log.error("Failed to write to mappedFile {}", this.fileName, t);
348-
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
349-
} finally {
339+
try {
340+
AppendMessageResult result = cb.doAppend(byteBuffer, fileFromOffset, this.fileSize - currentPos, byteBufferMsg);
341+
if (sharedByteBuffer != null) {
342+
try {
343+
this.fileChannel.position(currentPos);
344+
byteBuffer.position(0).limit(result.getWroteBytes());
345+
this.fileChannel.write(byteBuffer);
346+
} catch (Throwable t) {
347+
log.error("Failed to write to mappedFile {}", this.fileName, t);
348+
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
349+
}
350+
}
351+
WROTE_POSITION_UPDATER.addAndGet(this, result.getWroteBytes());
352+
this.storeTimestamp = result.getStoreTimestamp();
353+
return result;
354+
} finally {
355+
if (sharedByteBuffer != null) {
350356
sharedByteBuffer.release();
351357
}
352358
}
353-
354-
WROTE_POSITION_UPDATER.addAndGet(this, result.getWroteBytes());
355-
this.storeTimestamp = result.getStoreTimestamp();
356-
return result;
357359
}
358360
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
359361
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
@@ -393,30 +395,31 @@ public AppendMessageResult appendMessagesInner(final MessageExt messageExt, fina
393395
}
394396

395397
AppendMessageResult result;
396-
if (messageExt instanceof MessageExtBatch && !((MessageExtBatch) messageExt).isInnerBatch()) {
397-
// traditional batch message
398-
result = cb.doAppend(fileFromOffset, byteBuffer, this.fileSize - currentPos,
399-
(MessageExtBatch) messageExt, putMessageContext);
400-
} else if (messageExt instanceof MessageExtBrokerInner) {
401-
// traditional single message or newly introduced inner-batch message
402-
result = cb.doAppend(fileFromOffset, byteBuffer, this.fileSize - currentPos,
403-
(MessageExtBrokerInner) messageExt, putMessageContext);
404-
} else {
405-
if (sharedByteBuffer != null) {
406-
sharedByteBuffer.release();
398+
try {
399+
if (messageExt instanceof MessageExtBatch && !((MessageExtBatch) messageExt).isInnerBatch()) {
400+
// traditional batch message
401+
result = cb.doAppend(fileFromOffset, byteBuffer, this.fileSize - currentPos,
402+
(MessageExtBatch) messageExt, putMessageContext);
403+
} else if (messageExt instanceof MessageExtBrokerInner) {
404+
// traditional single message or newly introduced inner-batch message
405+
result = cb.doAppend(fileFromOffset, byteBuffer, this.fileSize - currentPos,
406+
(MessageExtBrokerInner) messageExt, putMessageContext);
407+
} else {
408+
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
407409
}
408-
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
409-
}
410410

411-
if (sharedByteBuffer != null) {
412-
try {
413-
this.fileChannel.position(currentPos);
414-
byteBuffer.position(0).limit(result.getWroteBytes());
415-
this.fileChannel.write(byteBuffer);
416-
} catch (Throwable t) {
417-
log.error("Failed to write to mappedFile {}", this.fileName, t);
418-
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
419-
} finally {
411+
if (sharedByteBuffer != null) {
412+
try {
413+
this.fileChannel.position(currentPos);
414+
byteBuffer.position(0).limit(result.getWroteBytes());
415+
this.fileChannel.write(byteBuffer);
416+
} catch (Throwable t) {
417+
log.error("Failed to write to mappedFile {}", this.fileName, t);
418+
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
419+
}
420+
}
421+
} finally {
422+
if (sharedByteBuffer != null) {
420423
sharedByteBuffer.release();
421424
}
422425
}
@@ -428,6 +431,7 @@ public AppendMessageResult appendMessagesInner(final MessageExt messageExt, fina
428431
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
429432
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
430433
}
434+
431435
protected ByteBuffer appendMessageBuffer() {
432436
this.mappedByteBufferAccessCountSinceLastSwap++;
433437
return writeBuffer != null ? writeBuffer : this.mappedByteBuffer;

0 commit comments

Comments
 (0)