Skip to content

Commit

Permalink
fix(0021): change replayq to msg_q in the diagrams
Browse files Browse the repository at this point in the history
  • Loading branch information
terry-xiaoyu committed Jul 15, 2022
1 parent 1dec42e commit 58fc11d
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 8 deletions.
Binary file modified active/0021-assets/new_batch_process.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file not shown.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
22 changes: 14 additions & 8 deletions active/0021-msg-batch-for-resources.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ In emqx 4.x and previous versions, only 2 drivers implemented the message queue,
Before this change, the batching mechanism is implemented by each driver.

Taking the MySQL as an example, we added a pool of `batcher` processes in front of the MySQL driver.
And if we want to queue the message locally before sending SQL queries, we have to add `replayq` to each of the `mysql worker`.
This is shown by the following figure:
And if we want to queue the message locally before sending SQL queries, we have to add message queue to each of the `mysql worker`. This is shown by the following figure:

![Old Batching](0021-assets/old_batch_process.png)

Expand All @@ -55,15 +54,15 @@ The messages are first saved by the worker to the queue (which can be memory or

Here is the sequences for querying a resource after the resource workers are added:

![Resource Worker Sequences](0021-assets/resource-worker-sequences.drawio.png)
![Resource Worker Sequences](0021-assets/resource_worker_sequences_solution1.drawio.png)

- When creating the resource worker pool, we can specify the `max_batch_num`, `batch_interval` parameters to control the batching process.
- Every time a caller calls the resource worker, it can specify `query_mode = sync | async` for control whether wait the result or not.

An improved solution is to let the MQTT connection process append to the message queue directly, and then the resource worker (the consumer process) fetch messages from the queue:
![Resource Worker Sequences S2](0021-assets/resource_worker_sequences_solution2.drawio.png)

This way we separate the producers (the MQTT connection processes) from the consumers (the resource workers). But for now the `replayq` doesn't support accessing by multiple modules concurrently, so we may need more works on it.
This way we separate the producers (the MQTT connection processes) from the consumers (the resource workers). But for now the `replayq` doesn't support accessing by multiple modules concurrently, so we may need to refactor it for concurrent accessing, or use another underlying DB such as RocksDB.

### The Layers Before the Change

Expand Down Expand Up @@ -95,7 +94,7 @@ We suggest introduce the "resource workers" into the resource layer.
So that the old resource layer is divided into two parts: "resource workers" and "resource management".

The "resource workers" is a pool maintains several worker processes.
It is the component for querying external resources, maintaining the message queue (`replayq`), and sending the messages to the drivers.
It is the component for querying external resources, maintaining the message queue, and sending the messages to the drivers.

The "resource management" part is the old resource layer, and remains unchanged as before, which is responsible for resource management operations.

Expand All @@ -109,11 +108,18 @@ The "resource management" part is the old resource layer, and remains unchanged

### RocksDB vs. ReplayQ

There are two choices to implement the message queue. One is the mnesia database using RocksDB as the backend, and the other is the [replayq](https://github.com/emqx/replayq).
There are two choices to implement the message queue. One is the mnesia database using [RocksDB](https://github.com/emqx/erlang-rocksdb) as the backend, and the other is the [ReplayQ](https://github.com/emqx/replayq).

We prefer the `replayq`, mainly because in this feature messages are always added and accessed in a queue, we never access data by primary keys like a KV database. This is exactly the applicable scenario of `replayq`. The data files will be stored in the specified directory of the local file system, which is very simple.
The `ReplayQ` is a simple disk/mem queue based on segment files. It was first added to the EMQX's Kafka driver in 2018, it has experienced several emqx versions and has been proved to be very stable.

The `replayq` was added to the Erlang Kafka driver in 2018, it has experienced several emqx versions and has been proved to be very stable.
The `RocksDB` was added into emqx 5.0 as the backend of mneisa, it is a NIF of the upstream RocksDB written in C (https://github.com/facebook/rocksdb).

The persistent message queue feature is exactly the applicable scenario of `ReplayQ`, as messages are always added and accessed in a FIFO manner, we never access data by primary keys like a KV database.
The data files will be stored in the specified directory of the local file system, which is very simple.

But there's some limitations in `ReplayQ`, mainly because it is a simple queue maintained in a single erlang process, it doesn't support concurrently accessing by multiple processes. So we may need some work to improve that.

Another way is implement a persistent queue based on RocksDB. It also need some more works.

## Configuration Changes

Expand Down

0 comments on commit 58fc11d

Please sign in to comment.