RIP 31 Support RocketMQ BrokerContainer

rongtong edited this page Feb 6, 2022 · 7 revisions

Status

Background & Motivation

What do we need to do

  • Will we add a new module?

    Yes, a new container module like "store" and "broker" will be added.

  • Will we add new APIs?

    Yes, some APIs will be added to operate the BrokerContainer, such as adding and removing brokers.

  • Will we add a new feature?

    Yes, BrokerContainer is a new feature.

Why should we do that

  • Are there any problems of our current project?

Currently, a RocketMQ process hosts only one broker, which is usually deployed in master-slave mode or raft mode. However, slaves require fewer resources than masters. These unequal roles between nodes lead to insufficient resource utilization.

Master-slave deployment

DLedger deployment

Therefore, RIP-31 proposes a new BrokerContainer mode, in which multiple brokers (including master brokers, slave brokers and dledger brokers) can be added to one BrokerContainer process to improve the resource utilization of a single node. Peer-to-peer deployment between nodes can then be achieved through various forms of cross deployment.

  • What can we benefit from proposed changes?
  1. Multiple brokers can be added to a BrokerContainer process, improving the resource utilization of a single node through in-process mixing.
  2. Through various forms of cross deployment, nodes can be deployed as peers, which enhances the high availability of a single node.
  3. With BrokerContainer, a single process can write multiple commitlogs or write to multiple disks.
  4. The commitlogs in the BrokerContainer are naturally isolated, so different commitlogs (brokers) can serve different purposes. For example, separate brokers can be created for commitlogs with different TTLs.

Goals

  • What problem is this proposal designed to solve?

    The goal is to implement a BrokerContainer module. Adding multiple brokers to one BrokerContainer process solves the low resource utilization of nodes that host only a slave. At the same time, cross deployment makes nodes peers of each other and enhances their high availability. BrokerContainer also allows a single process to write multiple commitlogs or write to multiple disks.

Non-Goals

  • What problem is this proposal NOT designed to solve?

The goal of this RIP is not to change the original architecture of RocketMQ, nor to replace the master-slave architecture or dledger architecture. Users can still start a broker in a single process without affecting its functions.

Changes

Architecture

BrokerContainer single process view

Compared with the original one-broker-per-process model, RIP-31 introduces the concept of the BrokerContainer. A BrokerContainer can host multiple brokers. Each broker has its own port, but all brokers share the same transport layer, and each broker is completely independent in its APIs and functions. The BrokerContainer also has its own port. You can add or remove brokers at runtime through the admin tool.

Peer-to-peer deployment

In the BrokerContainer mode, peer-to-peer deployment can be achieved through various forms of cross deployment:

  • Two replicas peer-to-peer deployment

In a two-replica peer-to-peer deployment, each node hosts one master and one slave, so resource utilization is equal across nodes. In addition, suppose node1 in the figure goes down: broker_2 on node2 remains readable and writable, and broker_1 can still be read from its slave on node2. The sending and receiving of ordinary messages is therefore not affected, and the high availability of a single node is enhanced.

  • Three replicas peer-to-peer deployment

In a three-replica peer-to-peer deployment, each node hosts one master and two slaves, so resource utilization is equal across nodes. In addition, as in the two-replica peer-to-peer deployment, the downtime of any single node will not affect the sending and receiving of ordinary messages.

Note: with three replicas, we can deploy in either the master-slave1-slave2 mode or the dledger mode.
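
The availability argument for the two-replica layout can be checked with a small model. The sketch below is illustrative only: the Replica class, the node and broker names, and the writable/readable helpers are hypothetical and are not part of RocketMQ.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of a cross deployment; none of these types exist in RocketMQ.
class PeerToPeerSketch {
    static final class Replica {
        final String node;
        final String brokerName;
        final boolean master;

        Replica(String node, String brokerName, boolean master) {
            this.node = node;
            this.brokerName = brokerName;
            this.master = master;
        }
    }

    // Two replicas: each node hosts one master and the slave of the other broker group.
    static List<Replica> twoReplicaLayout() {
        return Arrays.asList(
            new Replica("node1", "broker_1", true),
            new Replica("node1", "broker_2", false),
            new Replica("node2", "broker_2", true),
            new Replica("node2", "broker_1", false));
    }

    // A broker group stays writable while its master lives on a surviving node.
    static boolean writable(List<Replica> layout, String brokerName, String downNode) {
        for (Replica r : layout) {
            if (r.brokerName.equals(brokerName) && r.master && !r.node.equals(downNode)) {
                return true;
            }
        }
        return false;
    }

    // A broker group stays readable while any of its replicas lives on a surviving node.
    static boolean readable(List<Replica> layout, String brokerName, String downNode) {
        for (Replica r : layout) {
            if (r.brokerName.equals(brokerName) && !r.node.equals(downNode)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<Replica> layout = twoReplicaLayout();
        // node1 goes down: broker_2 keeps its master, broker_1 is served by its slave.
        System.out.println("broker_2 writable: " + writable(layout, "broker_2", "node1"));
        System.out.println("broker_1 readable: " + readable(layout, "broker_1", "node1"));
        System.out.println("broker_1 writable: " + writable(layout, "broker_1", "node1"));
    }
}
```

In this model, losing node1 leaves broker_2 writable and broker_1 readable but not writable, which matches the claim that ordinary sending and receiving continue.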

Transport layer sharing

All brokers in the BrokerContainer share the same transport layer, just as the consumers and producers of the same process in the RocketMQ client share the same transport layer.

SubRemotingServer support is provided for NettyRemotingServer. A SubRemotingServer is created by binding another port to an existing RemotingServer. It shares the netty instance, computing resources, protocol stack, etc. with that server, but each SubRemotingServer has its own port and its own processorTable. In addition, all brokers in the same BrokerContainer share the same BrokerOuterAPI.
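
The split between shared and per-port state can be pictured with a minimal sketch. It is a simplified stand-in, not the NettyRemotingServer implementation: the class names, the use of plain functions as processors, and the request codes are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, simplified model of one shared server with per-port sub-servers.
class SharedTransportSketch {
    // Per-port state: only the port and the processor table are NOT shared.
    static final class SubServer {
        final int port;
        final Map<Integer, Function<String, String>> processorTable = new HashMap<>();

        SubServer(int port) {
            this.port = port;
        }

        void registerProcessor(int requestCode, Function<String, String> processor) {
            processorTable.put(requestCode, processor);
        }
    }

    // Stands in for the shared netty instance: one object routes traffic for every port.
    private final Map<Integer, SubServer> subServersByPort = new HashMap<>();

    SubServer newSubServer(int port) {
        if (subServersByPort.containsKey(port)) {
            throw new IllegalArgumentException("port already bound: " + port);
        }
        SubServer sub = new SubServer(port);
        subServersByPort.put(port, sub);
        return sub;
    }

    // Route by port first, then by request code within that port's processor table.
    String dispatch(int port, int requestCode, String body) {
        SubServer sub = subServersByPort.get(port);
        if (sub == null) {
            return null;
        }
        Function<String, String> processor = sub.processorTable.get(requestCode);
        return processor == null ? null : processor.apply(body);
    }

    public static void main(String[] args) {
        SharedTransportSketch transport = new SharedTransportSketch();
        SubServer brokerA = transport.newSubServer(10911); // example ports only
        SubServer brokerB = transport.newSubServer(10921);
        brokerA.registerProcessor(10, body -> "broker-a handled " + body);
        brokerB.registerProcessor(10, body -> "broker-b handled " + body);
        System.out.println(transport.dispatch(10911, 10, "msg"));
        System.out.println(transport.dispatch(10921, 10, "msg"));
    }
}
```

The same request code reaches a different processor depending on the port, while the routing object itself exists only once per process.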

Startup mode and configuration of BrokerContainer

Just like using BrokerStartup to start a broker, RIP-31 uses BrokerContainerStartup to start the BrokerContainer. Brokers can be added to the BrokerContainer in two ways: by specifying them in the configuration file at startup, or by adding them through the admin tool at runtime.

The BrokerContainer configuration file mainly contains the netty network layer parameters (because the transport layer is shared), the listening port, the namesrv configuration and the brokerConfigPaths parameter. brokerConfigPaths lists the paths of the broker configuration files to be added to the BrokerContainer; multiple paths are separated by ':'. If it is not specified, only the BrokerContainer itself is started, and brokers can be added later through the RocketMQ admin tool.

broker-container.conf:

listenPort=10811
namesrvAddr=127.0.0.1:9876
fetchNamesrvAddrByAddressServer=true
brokerConfigPaths=/home/admin/broker-a.conf:/home/admin/broker-b.conf
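
As a rough illustration of how the ':'-separated brokerConfigPaths value could be read, the sketch below loads the file content with java.util.Properties and splits the value. The class and method names are hypothetical, and the actual BrokerContainerStartup may parse the file differently.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper; not the parsing code used by BrokerContainerStartup.
class ContainerConfigSketch {
    // Returns the broker config file paths, or an empty list if none are configured.
    static List<String> brokerConfigPaths(String confText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(confText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> paths = new ArrayList<>();
        String raw = props.getProperty("brokerConfigPaths");
        if (raw == null) {
            return paths; // only the container starts; brokers are added later
        }
        for (String path : raw.split(":")) {
            String trimmed = path.trim();
            if (!trimmed.isEmpty()) {
                paths.add(trimmed);
            }
        }
        return paths;
    }

    public static void main(String[] args) {
        String conf = "listenPort=10811\n"
            + "namesrvAddr=127.0.0.1:9876\n"
            + "brokerConfigPaths=/home/admin/broker-a.conf:/home/admin/broker-b.conf\n";
        for (String path : brokerConfigPaths(conf)) {
            System.out.println(path);
        }
    }
}
```

One consequence of ':' as the separator is that individual paths cannot themselves contain a colon.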

The broker configuration is the same as before. However, in the BrokerContainer mode, the netty network layer parameters and nameserver parameters in the broker configuration file do not take effect; the corresponding BrokerContainer configuration parameters are used instead.

After completing the configuration file, you can start the BrokerContainer with the following command:

sh mqbrokercontainer -c broker-container.conf

Add or remove broker at runtime

After the BrokerContainer process is started, you can add or remove brokers through the admin command.

On the server side, a processor is added to the BrokerContainer to handle ADD_BROKER, REMOVE_BROKER, GET_BROKER_CONFIG and UPDATE_BROKER_CONFIG requests.

public class BrokerContainerProcessor implements NettyRequestProcessor {
    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private final BrokerContainer brokerContainer;
    private List<BrokerBootHook> brokerBootHookList;


    public BrokerContainerProcessor(BrokerContainer brokerContainer) {
        this.brokerContainer = brokerContainer;
    }


    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
        switch (request.getCode()) {
            case RequestCode.ADD_BROKER:
                return this.addBroker(ctx, request);
            case RequestCode.REMOVE_BROKER:
                return this.removeBroker(ctx, request);
            case RequestCode.GET_BROKER_CONFIG:
                return this.getBrokerConfig(ctx, request);
            case RequestCode.UPDATE_BROKER_CONFIG:
                return this.updateBrokerConfig(ctx, request);
            default:
                break;
        }
        return null;
    }
    ……
}

Data Storage

quotaPercentForDiskPartition: the fraction of the disk partition holding the broker's storage directory that the broker may occupy (0 < quotaPercentForDiskPartition <= 1). It is 1 by default. This configuration is used to calculate the disk quota of each broker when multiple brokers in the same BrokerContainer share the same disk partition. For example, if quotaPercentForDiskPartition = 0.5 (suitable for two brokers in one BrokerContainer sharing the same disk, each taking 50%) and the disk space is 1 TB, the disk quota of the broker is 512 GB, and the logical disk space utilization of the broker is calculated against 512 GB.

logicalDiskSpaceCleanForciblyThreshold: the forced cleanup threshold for logical disk space. This value only takes effect when quotaPercentForDiskPartition is less than 1. The default is 0.80 (80%). The logical disk space utilization is the space utilization of the broker within its own disk quota, and the physical disk space utilization is the total space utilization of the disk partition.

In the implementation of BrokerContainer, for computational efficiency, only commitLog + consumeQueue (+ BCQ) + indexFile is counted as the storage space occupied by a broker; other files such as metadata, consumption progress and dirty disk data are not counted. Therefore, when the storage space of multiple brokers reaches dynamic balance, the space occupied by each broker may differ. Taking two brokers in a BrokerContainer as an example, the storage space difference between the two brokers can be expressed as

[formula omitted]

where R_logical is logicalDiskSpaceCleanForciblyThreshold, R_phy is diskSpaceCleanForciblyRatio, t is the total disk partition space, and X is the proportion of the total disk space taken by files other than the broker storage space counted above. It can be seen that when

[condition omitted]

the storage space of each broker in the BrokerContainer is almost the same once dynamic balance is reached.

For example, assuming the quota obtained by the broker is 500 GB (calculated from quotaPercentForDiskPartition) and logicalDiskSpaceCleanForciblyThreshold is the default value of 0.8, forced cleanup is triggered once the total size of commitLog + consumeQueue (+ BCQ) + indexFile exceeds 400 GB.
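
The quota arithmetic above can be written out as a short sketch. The class and method names are hypothetical, sizes are plain GB values, and the strict "greater than" comparison is an assumption; the real store code may round or compare differently.

```java
// Hypothetical helper that mirrors the quota arithmetic described in the text.
class DiskQuotaSketch {
    // Disk quota of one broker: its share of the partition it stores data on.
    static double quotaGb(double partitionGb, double quotaPercentForDiskPartition) {
        if (quotaPercentForDiskPartition <= 0 || quotaPercentForDiskPartition > 1) {
            throw new IllegalArgumentException("quotaPercentForDiskPartition must be in (0, 1]");
        }
        return partitionGb * quotaPercentForDiskPartition;
    }

    // Logical utilization: counted broker files relative to the broker's own quota.
    static double logicalUtilization(double usedGb, double quotaGb) {
        return usedGb / quotaGb;
    }

    // usedGb is assumed to be commitLog + consumeQueue (+ BCQ) + indexFile only.
    static boolean shouldCleanForcibly(double usedGb, double partitionGb,
                                       double quotaPercentForDiskPartition,
                                       double logicalDiskSpaceCleanForciblyThreshold) {
        if (quotaPercentForDiskPartition >= 1.0) {
            return false; // the logical threshold only takes effect below 1
        }
        double quota = quotaGb(partitionGb, quotaPercentForDiskPartition);
        return logicalUtilization(usedGb, quota) > logicalDiskSpaceCleanForciblyThreshold;
    }

    public static void main(String[] args) {
        // 1 TB (1024 GB) partition shared by two brokers at 50% each.
        System.out.println("quota: " + quotaGb(1024, 0.5) + " GB");
        // 500 GB quota with the default 0.8 threshold: cleanup starts above 400 GB.
        System.out.println("399 GB used: " + shouldCleanForcibly(399, 1000, 0.5, 0.8));
        System.out.println("401 GB used: " + shouldCleanForcibly(401, 1000, 0.5, 0.8));
    }
}
```

The physical thresholds such as diskSpaceCleanForciblyRatio are deliberately left out of this sketch; they continue to apply to the partition as a whole.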

The other cleaning thresholds (diskSpaceCleanForciblyRatio, diskSpaceWarningLevelRatio) and the fileReservedTime logic are the same as before. Note: when a broker is started in the normal broker mode instead of the BrokerContainer mode and quotaPercentForDiskPartition = 1 (the default), the cleaning logic is exactly the same as before. When quotaPercentForDiskPartition < 1, the logical forced cleanup threshold logicalDiskSpaceCleanForciblyThreshold takes effect.

Log Separation

In the BrokerContainer mode, multiple brokers run in the same BrokerContainer process, so by default the logs of all brokers are output to the same log file. RIP-31 provides broker log separation, which can be enabled through isolateLogEnable = true. When isolateLogEnable is true, the logs of different brokers are output to different files.

RIP-31 distinguishes brokers either by thread name or by a ThreadLocal variable, and the log appender is modified to redirect logs to different files accordingly.

Different broker threads are distinguished by setting the thread name. The thread name prefix must be #BrokerClusterName_BrokerName_BrokerId#.

Different broker threads are distinguished by setting ThreadLocal variables. The variable is set to BrokerClusterName_BrokerName_BrokerId.

// set threadlocal broker identity to forward logging to corresponding broker
InnerLoggerFactory.brokerIdentity.set(brokerIdentity.getCanonicalName());

If the thread does not have the above distinction, the log will still be output in the original directory. When you start the broker in the normal way (not BrokerContainer mode), the log will still be output in the original directory.
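
The two ways of tagging a thread can be combined into one lookup, sketched below. The class, the field name and the lookup order (thread name prefix first, then ThreadLocal) are assumptions for illustration and do not describe the actual appender code.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical lookup of the broker identity used to pick a log file.
class LogRoutingSketch {
    // Stand-in for the ThreadLocal broker identity mentioned in the text.
    static final ThreadLocal<String> BROKER_IDENTITY = new ThreadLocal<>();

    // Matches a leading #BrokerClusterName_BrokerName_BrokerId# prefix.
    private static final Pattern NAME_PREFIX = Pattern.compile("^#([^#]+)#");

    // Returns the canonical broker name, or null if the thread carries no distinction.
    static String resolveBrokerIdentity(String threadName) {
        Matcher matcher = NAME_PREFIX.matcher(threadName);
        if (matcher.find()) {
            return matcher.group(1);
        }
        return BROKER_IDENTITY.get(); // null means: log to the original directory
    }

    public static void main(String[] args) {
        System.out.println(resolveBrokerIdentity("#DefaultCluster_broker-a_0#SendMessageThread_1"));
        BROKER_IDENTITY.set("DefaultCluster_broker-b_1");
        System.out.println(resolveBrokerIdentity(Thread.currentThread().getName()));
        BROKER_IDENTITY.remove();
    }
}
```

A null result corresponds to the fallback case in which the log stays in the original directory.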

Interface Design/Change

  • Method signature changes

Add two RequestCodes:

public static final int ADD_BROKER = 902;
public static final int REMOVE_BROKER = 903;

BrokerContainer Interface Design

/**
 * An interface for broker container to hold multiple master and slave brokers.
 */
public interface IBrokerContainer {

    /**
     * Start broker container
     */
    void start() throws Exception;

    /**
     * Shutdown broker container and all the brokers inside.
     */
    void shutdown();

    /**
     * Add a broker to this container with specific broker config.
     *
     * @param brokerConfig the specified broker config
     * @param storeConfig the specified store config
     * @return the added BrokerController or null if the broker already exists
     * @throws Exception when broker initialization fails
     */
    BrokerController addBroker(BrokerConfig brokerConfig, MessageStoreConfig storeConfig) throws Exception;

    /**
     * Remove the broker from this container associated with the specific broker identity
     *
     * @param brokerIdentity the specific broker identity
     * @return the removed BrokerController or null if the broker doesn't exist
     */
    BrokerController removeBroker(BrokerIdentity brokerIdentity) throws Exception;

    /**
     * Return the broker controller associated with the specific broker identity
     *
     * @param brokerIdentity the specific broker identity
     * @return the associated messaging broker or null
     */
    BrokerController getBroker(BrokerIdentity brokerIdentity);

    /**
     * Return all the master brokers belonging to this container
     *
     * @return the master broker list
     */
    Collection<BrokerController> getMasterBrokers();

    /**
     * Return all the slave brokers belonging to this container
     *
     * @return the slave broker list
     */
    Collection<InnerSalveBrokerController> getSlaveBrokers();

    /**
     * Return all broker controllers in this container
     *
     * @return all broker controllers
     */
    List<BrokerController> getBrokerControllers();

    /**
     * Return the address of broker container.
     *
     * @return broker container address.
     */
    String getBrokerContainerAddr();

    /**
     * Peek the first master broker in container.
     *
     * @return the first master broker in container
     */
    BrokerController peekMasterBroker();

    /**
     * Return the config of the broker container
     *
     * @return the broker container config
     */
    BrokerContainerConfig getBrokerContainerConfig();

    /**
     * Get netty server config.
     *
     * @return netty server config
     */
    NettyServerConfig getNettyServerConfig();

    /**
     * Get netty client config.
     *
     * @return netty client config
     */
    NettyClientConfig getNettyClientConfig();

    /**
     * Return the shared BrokerOuterAPI
     *
     * @return the shared BrokerOuterAPI
     */
    BrokerOuterAPI getBrokerOuterAPI();

    /**
     * Return the shared RemotingServer
     *
     * @return the shared RemotingServer
     */
    RemotingServer getRemotingServer();
}

Data structure in BrokerContainer

public class BrokerContainer implements IBrokerContainer {
    private static final InternalLogger LOG = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
        new BasicThreadFactory.Builder()
            .namingPattern("BrokerContainerScheduledThread")
            .daemon(true)
            .build());
    private final NettyServerConfig nettyServerConfig;
    private final NettyClientConfig nettyClientConfig;
    private final BrokerOuterAPI brokerOuterAPI;
    private final ContainerClientHouseKeepingService containerClientHouseKeepingService;


    private final ConcurrentMap<BrokerIdentity, InnerSalveBrokerController> slaveBrokerControllers = new ConcurrentHashMap<>();
    private final ConcurrentMap<BrokerIdentity, InnerBrokerController> masterBrokerControllers = new ConcurrentHashMap<>();
    private final List<BrokerBootHook> brokerBootHookList = new ArrayList<>();
    private final BrokerContainerProcessor brokerContainerProcessor;
    private final Configuration configuration;
    private final BrokerContainerConfig brokerContainerConfig;


    private RemotingServer remotingServer;
    private RemotingServer fastRemotingServer;
    private ExecutorService brokerContainerExecutor;
    ……
}

The BrokerContainer manages multiple InnerBrokerController and InnerSalveBrokerController instances. InnerBrokerController inherits from BrokerController and reuses its capabilities. InnerSalveBrokerController inherits from InnerBrokerController and reuses its capabilities. All InnerBrokerController and InnerSalveBrokerController instances in the same BrokerContainer share the transport layer.

The BrokerContainer is configured as follows:
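
The add and remove contract documented in the interface (null when the broker already exists or does not exist) can be illustrated with a small in-memory registry. This is a simplified sketch with hypothetical types: it stores identities instead of real controllers, and it assumes that brokerId 0 denotes a master.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical registry that mimics the master/slave maps of the container.
class ContainerRegistrySketch {
    static final class Identity {
        final String clusterName;
        final String brokerName;
        final long brokerId;

        Identity(String clusterName, String brokerName, long brokerId) {
            this.clusterName = clusterName;
            this.brokerName = brokerName;
            this.brokerId = brokerId;
        }

        String canonicalName() {
            return clusterName + "_" + brokerName + "_" + brokerId;
        }
    }

    private final ConcurrentMap<String, Identity> masters = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, Identity> slaves = new ConcurrentHashMap<>();

    // Assumption: brokerId 0 is a master, any other id is a slave.
    private ConcurrentMap<String, Identity> tableFor(Identity identity) {
        return identity.brokerId == 0 ? masters : slaves;
    }

    // Returns the added entry, or null if the broker already exists.
    Identity addBroker(Identity identity) {
        Identity previous = tableFor(identity).putIfAbsent(identity.canonicalName(), identity);
        return previous == null ? identity : null;
    }

    // Returns the removed entry, or null if the broker does not exist.
    Identity removeBroker(Identity identity) {
        return tableFor(identity).remove(identity.canonicalName());
    }

    int masterCount() {
        return masters.size();
    }

    int slaveCount() {
        return slaves.size();
    }

    public static void main(String[] args) {
        ContainerRegistrySketch container = new ContainerRegistrySketch();
        Identity master = new Identity("DefaultCluster", "broker-a", 0);
        Identity slave = new Identity("DefaultCluster", "broker-b", 1);
        System.out.println("first add succeeded: " + (container.addBroker(master) != null));
        System.out.println("second add succeeded: " + (container.addBroker(master) != null));
        container.addBroker(slave);
        System.out.println(container.masterCount() + " master, " + container.slaveCount() + " slave");
    }
}
```

Using putIfAbsent keeps concurrent add requests for the same identity from both succeeding, which is one plausible reason for the ConcurrentMap fields in the real class.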

public class BrokerContainerConfig {


    private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));


    @ImportantField
    private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));


    @ImportantField
    private boolean fetchNamesrvAddrByAddressServer = false;


    @ImportantField
    private String brokerContainerIP = RemotingUtil.getLocalAddress();


    private String brokerConfigPaths = null;
    
    ……
}

  • CLI command changes

The producer and consumer are not modified. The admin tool adds new commands to perform the add broker and remove broker operations on the BrokerContainer.

AddBroker Command

usage: mqadmin addBroker -b <arg> -c <arg> [-h] [-n <arg>]
 -b,--brokerConfigPath <arg>      Broker config path
 -c,--brokerContainerAddr <arg>   Broker container address
 -h,--help                        Print help
 -n,--namesrvAddr <arg>           Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876

RemoveBroker Command

usage: mqadmin removeBroker -b <arg> -c <arg> [-h] [-n <arg>]
 -b,--brokerIdentity <arg>        Information to identify a broker: clusterName:brokerName:brokerId
 -c,--brokerContainerAddr <arg>   Broker container address
 -h,--help                        Print help
 -n,--namesrvAddr <arg>           Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
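
The -b argument of removeBroker uses the clusterName:brokerName:brokerId format. A minimal parsing sketch is shown below; the class names are hypothetical, and the admin tool's own validation may be stricter or looser.

```java
// Hypothetical parser for the "clusterName:brokerName:brokerId" argument.
class BrokerIdentityArgSketch {
    static final class Parsed {
        final String clusterName;
        final String brokerName;
        final long brokerId;

        Parsed(String clusterName, String brokerName, long brokerId) {
            this.clusterName = clusterName;
            this.brokerName = brokerName;
            this.brokerId = brokerId;
        }
    }

    static Parsed parse(String arg) {
        // A limit of -1 keeps trailing empty fields so "a:b:" is rejected below.
        String[] parts = arg.split(":", -1);
        if (parts.length != 3 || parts[0].isEmpty() || parts[1].isEmpty()) {
            throw new IllegalArgumentException(
                "expected clusterName:brokerName:brokerId but got: " + arg);
        }
        // NumberFormatException is a subclass of IllegalArgumentException.
        long brokerId = Long.parseLong(parts[2]);
        return new Parsed(parts[0], parts[1], brokerId);
    }

    public static void main(String[] args) {
        Parsed parsed = parse("DefaultCluster:broker-a:0");
        System.out.println(parsed.clusterName + " / " + parsed.brokerName + " / " + parsed.brokerId);
    }
}
```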

  • Log format or content changes

In the BrokerContainer mode, once log separation is enabled, the default log output path changes. The log path of each broker becomes:

{user.home}/logs/{$brokerCanonicalName}_rocketmqlogs/

brokerCanonicalName is {BrokerClusterName_BrokerName_BrokerId}.
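
For orientation, the path pattern above can be assembled as in the following sketch. The helper is hypothetical, and the way the real logger resolves user.home and the directory name may differ.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper that builds the per-broker log directory from the pattern above.
class BrokerLogPathSketch {
    static Path logDir(String userHome, String clusterName, String brokerName, long brokerId) {
        String canonicalName = clusterName + "_" + brokerName + "_" + brokerId;
        return Paths.get(userHome, "logs", canonicalName + "_rocketmqlogs");
    }

    public static void main(String[] args) {
        String userHome = System.getProperty("user.home");
        System.out.println(logDir(userHome, "DefaultCluster", "broker-a", 0));
    }
}
```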

Compatibility, Deprecation, and Migration Plan

  • Are backward and forward compatibility taken into consideration?

    Yes. The functions and APIs of a broker inside the BrokerContainer do not change. The functions and APIs of a broker started in the normal way (not in the BrokerContainer) do not change either, and the interaction between the broker, the client and the nameserver stays the same.

  • Are there deprecated APIs?

    Nothing specific.

  • How do we do migration?

Scenario 1: Normal upgrade

The goal is to change the original master-slave to the mutual master-slave in the figure above.

Take this broker group offline and upgrade to the new version of the RocketMQ package. Modify the BrokerContainer configuration file, set brokerConfigPaths according to the expected master-slave relationship, and complete the configuration files of the two brokers.

Each broker of the same BrokerContainer needs to have a different port number. If the configuration of the original broker's storePathRootDir and storePathCommitlog changes, data migration is also required. Copy the files in the store directory to the new directory and start the BrokerContainer.
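
Before starting the BrokerContainer, the distinct-port requirement can be checked with a few lines of code. The helper below is a hypothetical pre-flight check and is not part of RocketMQ; the port values are examples only.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical pre-flight check for the listen ports of brokers in one container.
class PortCheckSketch {
    // Returns every port that appears more than once, in order of first repetition.
    static Set<Integer> duplicatedPorts(int... listenPorts) {
        Set<Integer> seen = new HashSet<>();
        Set<Integer> duplicates = new LinkedHashSet<>();
        for (int port : listenPorts) {
            if (!seen.add(port)) {
                duplicates.add(port);
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        System.out.println("conflicts: " + duplicatedPorts(10911, 10921, 10911));
    }
}
```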

Note: if the original broker holds a large amount of data, then once quotaPercentForDiskPartition and logicalDiskSpaceCleanForciblyThreshold are configured, it will quickly clean up data down to the logical threshold, so these values need to be adjusted dynamically.

Scenario 2: Take the node offline, clean it up, and replace it with the new version

  1. Set the write permission of the broker to be upgraded to false, and wait for its data to be completely consumed.
  2. Take the broker in step 1 offline and clean up the relevant data.
  3. Start and join the cluster in BrokerContainer mode.

Implementation Outline

We will implement the proposed changes in three phases.

Phase 1

Support master-slave mode.

Phase 1 can be divided into the following steps:

  1. Implement transport layer sharing and SubRemotingServer
  2. Implement the BrokerContainer module and the admin command
  3. Implement the log separation mechanism
  4. Implement the cleanup logic for BrokerContainer

Phase 1 will be supported in the 5.0 branch first.

Phase 2

Support dledger mode.

Phase 3

Share thread pools among brokers, grouped according to the isolation classification of the thread pools.

Rejected Alternatives

How do alternatives solve the issue you proposed?

Deploy multiple broker processes directly on a single node

Pros and Cons of alternatives

Pros:

Better isolation between brokers

Cons:

  1. When deploying multiple broker processes on a single node, the resources are managed by the operating system, while the brokers in BrokerContainer are managed by the RocketMQ process. RocketMQ can better control each broker, and some resources can also be fully reused such as transport layer, thread pool, etc.
  2. When deploying multiple broker processes on a single node, it cannot provide a unified view for RocketMQ, but the BrokerContainer can provide a unified view to complete the centralized control of brokers. For example, the BrokerContainer can monitor the capacity occupation of every broker and handle them according to the situation.
  3. To isolate resources, deploying multiple broker processes on a single node may require Docker, whereas the BrokerContainer does not. RocketMQ provides the container capability natively, so the deployment and installation cost is lower.
  4. From the perspective of a single RocketMQ process, resource utilization is improved.

Why should we reject above alternatives

Based on the comparison of Pros and Cons above, RIP-31 provides BrokerContainer capability.
