Skip to content

Commit

Permalink
add monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
HDT3213 committed Feb 16, 2024
1 parent f3ca206 commit 2e259ed
Show file tree
Hide file tree
Showing 12 changed files with 832 additions and 24 deletions.
9 changes: 8 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,19 @@
// 欲了解更多信息,请访问: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Launch Package",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${fileDirname}"
},
{
"name": "Run Example",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceFolder}/example"
"program": "${workspaceFolder}/example/getstarted"
}
]
}
118 changes: 116 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ Core Advantages:
- Works out of the box, Config Nothing and Deploy Nothing, A Redis is all you need.
- Natively adapted to the distributed environment, messages processed concurrently on multiple machines
. Workers can be added, removed or migrated at any time
- Support Redis Cluster for high availability
- Support Redis Cluster or clusters of most cloud service providers. see chapter [Cluster](./README.md#Cluster)
- Easy to use monitoring data exporter, see [Monitoring](./README.md#Monitoring)

## Install

Expand Down Expand Up @@ -73,7 +74,9 @@ func main() {
## Producer consumer distributed deployment

By default, delayqueue instances can be both producers and consumers. If your program only need producers and consumers are placed elsewhere, `delayqueue.NewProducer` is a good option for you.
By default, delayqueue instances can be both producers and consumers.

If your program only need producers and consumers are placed elsewhere, `delayqueue.NewProducer` is a good option for you.

```go
func consumer() {
Expand Down Expand Up @@ -143,6 +146,117 @@ WithDefaultRetryCount customizes the max number of retry, it effects of messages

use WithRetryCount during DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg to specific retry count of particular message

## Monitoring

We provides Monitor to monitor the running status.

```go
monitor := delayqueue.NewMonitor("example", redisCli)
```

Monitor.ListenEvent can register a listener that can receive all internal events, so you can use it to implement customized data reporting and metrics.

The monitor can receive events from all workers, even if they are running on another server.

```go
type EventListener interface {
OnEvent(*Event)
}

// returns: close function, error
func (m *Monitor) ListenEvent(listener EventListener) (func(), error)
```

The definition of event could be found in [events.go](./events.go).

Besides, We provide a demo that uses EventListener to monitor the production and consumption amount per minute.

The complete demo code can be found in [example/monitor](./example/monitor/main.go).

```go
type MyProfiler struct {
List []*Metrics
Start int64
}

func (p *MyProfiler) OnEvent(event *delayqueue.Event) {
sinceUptime := event.Timestamp - p.Start
upMinutes := sinceUptime / 60
if len(p.List) <= int(upMinutes) {
p.List = append(p.List, &Metrics{})
}
current := p.List[upMinutes]
switch event.Code {
case delayqueue.NewMessageEvent:
current.ProduceCount += event.MsgCount
case delayqueue.DeliveredEvent:
current.DeliverCount += event.MsgCount
case delayqueue.AckEvent:
current.ConsumeCount += event.MsgCount
case delayqueue.RetryEvent:
current.RetryCount += event.MsgCount
case delayqueue.FinalFailedEvent:
current.FailCount += event.MsgCount
}
}

func main() {
queue := delayqueue.NewQueue("example", redisCli, func(payload string) bool {
return true
})
start := time.Now()
// IMPORTANT: EnableReport must be called so monitor can do its work
queue.EnableReport()

// setup monitor
monitor := delayqueue.NewMonitor("example", redisCli)
listener := &MyProfiler{
Start: start.Unix(),
}
monitor.ListenEvent(listener)

// print metrics every minute
tick := time.Tick(time.Minute)
go func() {
for range tick {
minutes := len(listener.List)-1
fmt.Printf("%d: %#v", minutes, listener.List[minutes])
}
}()
}
```

Monitor use redis pub/sub to collect data, so it is important to call `DelayQueue.EnableReport` of all workers, to enable events reporting for monitor.

If you do not want to use redis pub/sub, you can use `DelayQueue.ListenEvent` to collect data yourself.

Please be advised, `DelayQueue.ListenEvent` can only receive events from the current instance, while monitor can receive events from all instances in the queue.

Once `DelayQueue.ListenEvent` is called, the monitor's listener will be overwritten unless EnableReport is called again to re-enable the monitor.

### Get Status

You could get Pending Count, Ready Count and Processing Count from the monitor:

```go
func (m *Monitor) GetPendingCount() (int64, error)
```

GetPendingCount returns the number of which delivery time has not arrived.

```go
func (m *Monitor) GetReadyCount() (int64, error)
```

GetReadyCount returns the number of messages which have arrived delivery time but but have not been delivered yet

```go
func (m *Monitor) GetProcessingCount() (int64, error)
```

GetProcessingCount returns the number of messages which are being processed


## Cluster

If you are using Redis Cluster, please use `NewQueueOnCluster`
Expand Down
111 changes: 110 additions & 1 deletion README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ DelayQueue 的主要优势:
- 自动重试处理失败的消息
- 开箱即用, 无需部署或安装中间件, 只需要一个 Redis 即可工作
- 原生适配分布式环境, 可在多台机器上并发的处理消息. 可以随时增加、减少或迁移 Worker
- 支持各类 Redis 集群
- 支持各类 Redis 集群, 详见[集群](./README_CN.md#集群)
- 简单易用的监控数据导出,详见[监控](./README_CN.md#监控)

## 安装

Expand Down Expand Up @@ -137,6 +138,114 @@ WithDefaultRetryCount(count uint)

在调用 DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg 发送消息时,可以调用 WithRetryCount 为这条消息单独指定重试次数。

## 监控

我们提供了 `Monitor` 来监控运行数据:

```go
monitor := delayqueue.NewMonitor("example", redisCli)
```

我们可以使用 `Monitor.ListenEvent` 注册一个可以收到队列中所有事件的监听器, 从而实现自定义的事件上报和指标监控。

Monitor 可以受到所有 Worker 的事件, 包括运行在其它服务器上的 Worker.

```go
type EventListener interface {
OnEvent(*Event)
}

// returns: close function, error
func (m *Monitor) ListenEvent(listener EventListener) (func(), error)
```

Event 的定义在 [events.go](./events.go).

此外,我们提供了一个 Demo,它会每分钟显示一次队列中产生和处理的消息数量。

Demo 完整代码在 [example/monitor](./example/monitor/main.go).

```go
type MyProfiler struct {
List []*Metrics
Start int64
}

func (p *MyProfiler) OnEvent(event *delayqueue.Event) {
sinceUptime := event.Timestamp - p.Start
upMinutes := sinceUptime / 60
if len(p.List) <= int(upMinutes) {
p.List = append(p.List, &Metrics{})
}
current := p.List[upMinutes]
switch event.Code {
case delayqueue.NewMessageEvent:
current.ProduceCount += event.MsgCount
case delayqueue.DeliveredEvent:
current.DeliverCount += event.MsgCount
case delayqueue.AckEvent:
current.ConsumeCount += event.MsgCount
case delayqueue.RetryEvent:
current.RetryCount += event.MsgCount
case delayqueue.FinalFailedEvent:
current.FailCount += event.MsgCount
}
}

func main() {
queue := delayqueue.NewQueue("example", redisCli, func(payload string) bool {
return true
})
start := time.Now()
// 注意: 使用 Monitor 前必须调用 EnableReport
queue.EnableReport()

// setup monitor
monitor := delayqueue.NewMonitor("example", redisCli)
listener := &MyProfiler{
Start: start.Unix(),
}
monitor.ListenEvent(listener)

// 每分钟打印一次报告
tick := time.Tick(time.Minute)
go func() {
for range tick {
minutes := len(listener.List)-1
fmt.Printf("%d: %#v", minutes, listener.List[minutes])
}
}()
}
```

Monitor 使用 redis 的发布订阅功能来收集数据,使用 Monitor 前必须在所有 Worker 处调用 `EnableReport` 来启用上报。

如果你不想使用 redis pub/sub, 可以调用 `DelayQueue.ListenEvent` 来直接收集数据。请注意,`DelayQueue.ListenEvent` 只能收到当前 Worker 的事件, 而 Monitor 可以收到所有 Worker 的事件。

另外,`DelayQueue.ListenEvent` 会覆盖掉 Monitor 的监听器,再次调用 `EnableReport` 后 Monitor 才能恢复工作。

### 获得状态信息

Monitor 也可以直接获得一些队列的状态信息。

```go
func (m *Monitor) GetPendingCount() (int64, error)
```

返回未到投递时间的消息数。

```go
func (m *Monitor) GetReadyCount() (int64, error)
```

返回已到投递时间但尚未发给 Worker 的消息数。

```go
func (m *Monitor) GetProcessingCount() (int64, error)
```

返回 Worker 正在处理中的消息数。

## 集群

如果需要在 Redis Cluster 上工作, 请使用 `NewQueueOnCluster`:
Expand Down
Loading

0 comments on commit 2e259ed

Please sign in to comment.