[ISSUE #8589] Support file format CQ and json format offset in-place upgrade to rocksdb management (#8600)
LetLetMe committed Sep 23, 2024
1 parent e8d1472 commit 525f877
Showing 25 changed files with 755 additions and 66 deletions.
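For context: both behaviors added by this commit sit behind message-store switches that the diff reads through isRocksdbCQDoubleWriteEnable() and isTransferOffsetJsonToRocksdb(). A minimal broker.conf sketch is shown below; the property key names are an assumption inferred from those getter names, since the keys themselves never appear in this diff.

# Hypothetical broker.conf sketch; key names inferred from the getters used in this commit.
# Write consume queues to both the file-based store and RocksDB (double write).
rocksdbCQDoubleWriteEnable=true
# At startup, merge the JSON consumerOffset file into RocksDB when its DataVersion is newer.
transferOffsetJsonToRocksdb=true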
BrokerController.java
@@ -18,7 +18,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.AbstractMap;
import java.util.ArrayList;
@@ -789,6 +788,9 @@ public boolean initializeMessageStore() {
defaultMessageStore = new RocksDBMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable());
} else {
defaultMessageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable());
if (messageStoreConfig.isRocksdbCQDoubleWriteEnable()) {
defaultMessageStore.enableRocksdbCQWrite();
}
}

if (messageStoreConfig.isEnableDLegerCommitLog()) {
@@ -812,7 +814,7 @@ public boolean initializeMessageStore() {
this.timerMessageStore.registerEscapeBridgeHook(msg -> escapeBridge.putMessage(msg));
this.messageStore.setTimerMessageStore(this.timerMessageStore);
}
- } catch (IOException e) {
+ } catch (Exception e) {
result = false;
LOG.error("BrokerController#initialize: unexpected error occurs", e);
}
ConsumerOffsetManager.java
@@ -31,6 +31,7 @@
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -373,6 +374,25 @@ public void setDataVersion(DataVersion dataVersion) {
this.dataVersion = dataVersion;
}

public boolean loadDataVersion() {
String fileName = null;
try {
fileName = this.configFilePath();
String jsonString = MixAll.file2String(fileName);
if (jsonString != null) {
ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);
if (obj != null) {
this.dataVersion = obj.dataVersion;
}
LOG.info("load consumer offset dataVersion success,{},{} ", fileName, jsonString);
}
return true;
} catch (Exception e) {
LOG.error("load consumer offset dataVersion failed " + fileName, e);
return false;
}
}

public void removeOffset(final String group) {
Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
while (it.hasNext()) {
RocksDBConsumerOffsetManager.java
@@ -16,36 +16,79 @@
*/
package org.apache.rocketmq.broker.offset;

+ import com.alibaba.fastjson.JSON;
+ import com.alibaba.fastjson.serializer.SerializerFeature;
import java.io.File;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.RocksDBConfigManager;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.rocksdb.WriteBatch;

- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.serializer.SerializerFeature;

public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager {

protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

protected RocksDBConfigManager rocksDBConfigManager;

public RocksDBConsumerOffsetManager(BrokerController brokerController) {
super(brokerController);
- this.rocksDBConfigManager = new RocksDBConfigManager(configFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
+ this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
}

@Override
public boolean load() {
if (!rocksDBConfigManager.init()) {
return false;
}
- return this.rocksDBConfigManager.loadData(this::decodeOffset);
+ if (!loadDataVersion() || !loadConsumerOffset()) {
+ return false;
+ }
+
+ return true;
}

public boolean loadConsumerOffset() {
return this.rocksDBConfigManager.loadData(this::decodeOffset) && merge();
}

private boolean merge() {
if (!brokerController.getMessageStoreConfig().isTransferOffsetJsonToRocksdb()) {
log.info("the switch transferOffsetJsonToRocksdb is off, no merge offset operation is needed.");
return true;
}
if (!UtilAll.isPathExists(this.configFilePath()) && !UtilAll.isPathExists(this.configFilePath() + ".bak")) {
log.info("consumerOffset json file does not exist, so skip merge");
return true;
}
if (!super.loadDataVersion()) {
log.error("load json consumerOffset dataVersion error, startup will exit");
return false;
}

final DataVersion dataVersion = super.getDataVersion();
final DataVersion kvDataVersion = this.getDataVersion();
if (dataVersion.getCounter().get() > kvDataVersion.getCounter().get()) {
if (!super.load()) {
log.error("load json consumerOffset info failed, startup will exit");
return false;
}
this.persist();
this.getDataVersion().assignNewOne(dataVersion);
updateDataVersion();
log.info("update offset from json, dataVersion:{}, offsetTable: {} ", this.getDataVersion(), JSON.toJSONString(this.getOffsetTable()));
}
return true;
}


@Override
public boolean stop() {
return this.rocksDBConfigManager.stop();
@@ -69,8 +112,7 @@ protected void decodeOffset(final byte[] key, final byte[] body) {
LOG.info("load exist local offset, {}, {}", topicAtGroup, wrapper.getOffsetTable());
}

- @Override
- public String configFilePath() {
+ public String rocksdbConfigFilePath() {
return this.brokerController.getMessageStoreConfig().getStorePathRootDir() + File.separator + "config" + File.separator + "consumerOffsets" + File.separator;
}

@@ -103,4 +145,23 @@ private void putWriteBatch(final WriteBatch writeBatch, final String topicGroupN
byte[] valueBytes = JSON.toJSONBytes(wrapper, SerializerFeature.BrowserCompatible);
writeBatch.put(keyBytes, valueBytes);
}

@Override
public boolean loadDataVersion() {
return this.rocksDBConfigManager.loadDataVersion();
}

@Override
public DataVersion getDataVersion() {
return rocksDBConfigManager.getKvDataVersion();
}

public void updateDataVersion() {
try {
rocksDBConfigManager.updateKvDataVersion();
} catch (Exception e) {
log.error("update consumer offset dataVersion error", e);
throw new RuntimeException(e);
}
}
}
AdminBrokerProcessor.java
@@ -18,9 +18,11 @@

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
+ import io.opentelemetry.api.common.Attributes;
import java.io.UnsupportedEncodingException;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
@@ -38,7 +40,6 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
- import io.opentelemetry.api.common.Attributes;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.AccessValidator;
@@ -69,6 +70,7 @@
import org.apache.rocketmq.common.LockCallback;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UnlockCallback;
@@ -137,6 +139,7 @@
import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
import org.apache.rocketmq.remoting.protocol.header.CheckRocksdbCqWriteProgressRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CloneGroupOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CreateAccessConfigRequestHeader;
@@ -209,6 +212,7 @@
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.RocksDBMessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
@@ -217,8 +221,9 @@
import org.apache.rocketmq.store.timer.TimerCheckpoint;
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.apache.rocketmq.store.util.LibC;
- import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;

+ import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_INVOCATION_STATUS;
+ import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;

public class AdminBrokerProcessor implements NettyRequestProcessor {
@@ -339,6 +344,8 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx,
return fetchAllConsumeStatsInBroker(ctx, request);
case RequestCode.QUERY_CONSUME_QUEUE:
return queryConsumeQueue(ctx, request);
case RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS:
return this.checkRocksdbCqWriteProgress(ctx, request);
case RequestCode.UPDATE_AND_GET_GROUP_FORBIDDEN:
return this.updateAndGetGroupForbidden(ctx, request);
case RequestCode.GET_SUBSCRIPTIONGROUP_CONFIG:
@@ -458,6 +465,71 @@ private RemotingCommand updateAndGetGroupForbidden(ChannelHandlerContext ctx, Re
return response;
}

private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
CheckRocksdbCqWriteProgressRequestHeader requestHeader = request.decodeCommandCustomHeader(CheckRocksdbCqWriteProgressRequestHeader.class);
String requestTopic = requestHeader.getTopic();
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
response.setCode(ResponseCode.SUCCESS);

DefaultMessageStore messageStore = (DefaultMessageStore) brokerController.getMessageStore();
RocksDBMessageStore rocksDBMessageStore = messageStore.getRocksDBMessageStore();
if (!messageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) {
response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", "rocksdbCQWriteEnable is false, checkRocksdbCqWriteProgressCommand is invalid")));
return response;
}

ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> cqTable = messageStore.getConsumeQueueTable();
StringBuilder diffResult = new StringBuilder("check success, all is ok!\n");
try {
if (StringUtils.isNotBlank(requestTopic)) {
processConsumeQueuesForTopic(cqTable.get(requestTopic), requestTopic, rocksDBMessageStore, diffResult, false);
response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", diffResult.toString())));
return response;
}
for (Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> topicEntry : cqTable.entrySet()) {
String topic = topicEntry.getKey();
processConsumeQueuesForTopic(topicEntry.getValue(), topic, rocksDBMessageStore, diffResult, true);
}
diffResult.append("check all topic successful, size:").append(cqTable.size());
response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", diffResult.toString())));

} catch (Exception e) {
LOGGER.error("CheckRocksdbCqWriteProgressCommand error", e);
response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", e.getMessage())));
}
return response;
}

private void processConsumeQueuesForTopic(ConcurrentMap<Integer, ConsumeQueueInterface> queueMap, String topic, RocksDBMessageStore rocksDBMessageStore, StringBuilder diffResult, boolean checkAll) {
for (Map.Entry<Integer, ConsumeQueueInterface> queueEntry : queueMap.entrySet()) {
Integer queueId = queueEntry.getKey();
ConsumeQueueInterface jsonCq = queueEntry.getValue();
ConsumeQueueInterface kvCq = rocksDBMessageStore.getConsumeQueue(topic, queueId);
if (!checkAll) {
String format = String.format("\n[topic: %s, queue: %s] \n kvEarliest : %s | kvLatest : %s \n fileEarliest: %s | fileLatest: %s ",
topic, queueId, kvCq.getEarliestUnit(), kvCq.getLatestUnit(), jsonCq.getEarliestUnit(), jsonCq.getLatestUnit());
diffResult.append(format).append("\n");
}
long maxFileOffsetInQueue = jsonCq.getMaxOffsetInQueue();
long minOffsetInQueue = kvCq.getMinOffsetInQueue();
for (long i = minOffsetInQueue; i < maxFileOffsetInQueue; i++) {
Pair<CqUnit, Long> fileCqUnit = jsonCq.getCqUnitAndStoreTime(i);
Pair<CqUnit, Long> kvCqUnit = kvCq.getCqUnitAndStoreTime(i);
if (fileCqUnit == null || kvCqUnit == null) {
diffResult.append(String.format("[topic: %s, queue: %s, offset: %s] \n kv : %s \n file: %s \n",
topic, queueId, i, kvCqUnit != null ? kvCqUnit.getObject1() : "null", fileCqUnit != null ? fileCqUnit.getObject1() : "null"));
return;
}
if (!checkCqUnitEqual(kvCqUnit.getObject1(), fileCqUnit.getObject1())) {
String diffInfo = String.format("[topic:%s, queue: %s offset: %s] \n file: %s \n kv: %s",
topic, queueId, i, kvCqUnit.getObject1(), fileCqUnit.getObject1());
LOGGER.error(diffInfo);
diffResult.append(diffInfo).append("\n");
return;
}
}
}
}
@Override
public boolean rejectRequest() {
return false;
@@ -3305,4 +3377,20 @@ private boolean validateBlackListConfigExist(Properties properties) {
}
return false;
}

private boolean checkCqUnitEqual(CqUnit cqUnit1, CqUnit cqUnit2) {
if (cqUnit1.getQueueOffset() != cqUnit2.getQueueOffset()) {
return false;
}
if (cqUnit1.getSize() != cqUnit2.getSize()) {
return false;
}
if (cqUnit1.getPos() != cqUnit2.getPos()) {
return false;
}
if (cqUnit1.getBatchNum() != cqUnit2.getBatchNum()) {
return false;
}
return cqUnit1.getTagsCode() == cqUnit2.getTagsCode();
}
}
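A broker with double write enabled can be asked to compare its file-based and RocksDB consume queues through the new RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS handler above. The client-side sketch below is not part of the commit: it assumes the standard RocketMQ remoting API (NettyRemotingClient, RemotingCommand.createRequestCommand, invokeSync) and that CheckRocksdbCqWriteProgressRequestHeader exposes a setTopic(..) matching the getTopic() used by the handler.

import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.header.CheckRocksdbCqWriteProgressRequestHeader;

// Hypothetical usage sketch: query a broker's RocksDB CQ write progress and print the diff result.
static void checkRocksdbCqProgress(String brokerAddr, String topic) throws Exception {
    RemotingClient client = new NettyRemotingClient(new NettyClientConfig());
    client.start();
    try {
        CheckRocksdbCqWriteProgressRequestHeader header = new CheckRocksdbCqWriteProgressRequestHeader();
        header.setTopic(topic); // a blank topic makes the handler walk every topic in the consume queue table
        RemotingCommand request = RemotingCommand.createRequestCommand(
            RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS, header);
        RemotingCommand response = client.invokeSync(brokerAddr, request, 3000);
        if (response.getBody() != null) {
            // The handler serializes {"diffResult": "..."} into the response body.
            System.out.println(new String(response.getBody(), StandardCharsets.UTF_8));
        }
    } finally {
        client.shutdown();
    }
}

For example, checkRocksdbCqProgress("127.0.0.1:10911", "") would report the earliest/latest units and any per-offset mismatches for every topic on the broker, exactly as assembled by processConsumeQueuesForTopic above.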