-
Notifications
You must be signed in to change notification settings - Fork 234
Subscriber
AresDB subscriber is the core ingestion component that connects external data stream with AresDB. It consumes messages from Kafka, transforms data based on user-defined rules and inserts data into AresDB data nodes. supports both single node mode and distributed mode. The differences are highlighted below.
In AresDB’s single node mode, the subscriber first registers itself in etcd as a lease node; then periodically fetches job assignments from the controller, consumes messages from Kafka, transforms and batches messages; finally directly sends a batch of rows to one AresDB server.
In AresDB’s distributed mode, the subscriber first registers itself in etcd as a lease node; then periodically fetches job assignments and enum dictionary from the controller, consumes messages from Kafka, transforms messages to AresDB upsert batch format, partitions upsert batches based on table primary keys; finally publishes batches to its specified partitionID in Kafka. On AresDB cluster side, each server consumes upsert batches for the assigned shards.
AresDB subscriber first fetches all job assignments from AresDB controller and then starts all consumers. Once initialization is complete, AresDB subscriber is running in four scenarios: Normal
- Normal
In this case, AresDB subscriber continuously consumes messages from Kafka, transforms data, upserts batch into the sink(AresDB server or msg_out_topic in Kafka), and then commit the message offset.
- Add a job config
AresDB subscriber periodically checks AresDB controller. If there is a new job (ie, “add a job config”), it first creates a consumer for the topic, and then continuously consumes messages from Kafka, transforms data, upserts batch into AresDB server, and then commit the message offset.
- Update a job config
AresDB subscriber periodically checks AresDB controller. If there is a job config update (ie, “update job config”), it first restarts consumer for the topic, and then continuously consumes messages from Kafka, transforms data, upserts batch into AresDB server, and then commit the message offset.
- Delete a job config
AresDB subscriber periodically checks AresDB controller. If there is a job config delete (ie, “delete a table config”), It triggers the consumer is closed.
An AresDB subscriber uses the namespace concept to achieve scalability. It can support local static configuration or dynamic configuration from AresDB Controller.
In a static configuration file, a namespace is applied on an AresDB cluster level and job level. An AresDB cluster namespace defines a set of AresDB hosts; a job namespace defines a set of job names. One AresDB subscriber is an instance which loads a pair of AresDB namespace and job namespace configuration. For example:
ares:
# A list of namespaces. Each namespace includes a list of AresDB clusters
namespaces:
dev01:
- dev-ares01
- dev-ares02
# A list of AresDB hosts. Each cluster includes master node address
hosts:
dev-ares01:
address: "localhost:9998"
timeout: 5
dev-ares02:
address: "localhost:9999"
timeout: 5
# A list of jobs which are defined in config/jobs
jobs:
# A list of namespaces. Each namespace includes a list of jobs.
namespaces:
job-test1:
- test1
job-test2:
- test2
Internally AresDB subscriber uses a map to maintain the relationship between sink(AresDB server or Kafka) and jobs. For example.
AresDB subscriber is not only able to maximize parallelism, but also to guarantee message lossless. Internally, one job has one or more drivers; one driver has one or more processors; one processor has one or more threads to handle batch upserts. The job commits Kafka offset once a batch upsert succeeds.
A processor maintains one taskQueue and one batch queue. It has one consumer to consume&decode messages from Kafka and put messages into taskQueue; one batch worker fetches messages from taskQueue, creates batches and save to batch queue; one DB worker fetches batches from batch queue, parses/transforms a batch, creates upsert batches, starts one or more threads to save messages into AresDB, finally commits Kafka offset for this batch.