Skip to content

Commit 53dc3ac

Browse files
authored
feature(persistence): adjust storage (#24)
* feature(goroutine): add goroutine * feature(persistence): adjust storage * feature(persistence): adjust storage * feature(persistence): update storage
1 parent a5720f8 commit 53dc3ac

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+4116
-352
lines changed

cmd/lighthouse/config.yaml

+7-1
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,10 @@ mqtt:
33
log:
44
level: debug
55
format: json
6-
filename: log.log
6+
filename: log.log
7+
persistence:
8+
session:
9+
type: memory
10+
queue:
11+
type: memory
12+

cmd/lighthouse/main.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@ import (
44
_ "embed"
55
"github.com/go-playground/validator/v10"
66
"github.com/yunqi/lighthouse/config"
7+
_ "github.com/yunqi/lighthouse/internal/persistence/session/memory"
8+
_ "github.com/yunqi/lighthouse/internal/persistence/session/redis"
79
"github.com/yunqi/lighthouse/internal/server"
810
"github.com/yunqi/lighthouse/internal/xlog"
911
"gopkg.in/yaml.v3"
1012
"net/http"
13+
_ "net/http/pprof"
1114
)
1215

1316
//go:embed config.yaml
@@ -30,9 +33,9 @@ func main() {
3033
panic(err)
3134
}
3235
go func() {
33-
http.ListenAndServe("localhost:6060", nil)
36+
_ = http.ListenAndServe("localhost:6060", nil)
3437
}()
3538

36-
newServer := server.NewServer(server.WithTcpListen(":1883"))
39+
newServer := server.NewServer(server.WithTcpListen(":1883"), server.WithPersistence(&c.Persistence))
3740
newServer.ServeTCP()
3841
}

config/config.go

+21-20
Original file line numberDiff line numberDiff line change
@@ -21,61 +21,62 @@ import (
2121
)
2222

2323
type Config struct {
24-
Mqtt Mqtt `yaml:"mqtt"`
25-
Log Log `yaml:"log"`
24+
Mqtt Mqtt `yaml:"mqtt"`
25+
Log Log `yaml:"log"`
26+
Persistence Persistence `yaml:"persistence"`
2627
}
2728

2829
type Mqtt struct {
2930
// SessionExpiry is the maximum session expiry interval in seconds.
30-
SessionExpiry time.Duration `yaml:"session_expiry"`
31+
SessionExpiry time.Duration `yaml:"sessionExpiry"`
3132
// SessionExpiryCheckInterval is the interval time for session expiry checker to check whether there
3233
// are expired sessions.
33-
SessionExpiryCheckInterval time.Duration `yaml:"session_expiry_check_interval"`
34+
SessionExpiryCheckInterval time.Duration `yaml:"sessionExpiryCheckInterval"`
3435
// MessageExpiry is the maximum lifetime of the message in seconds.
3536
// If a message in the queue is not sent in MessageExpiry time, it will be removed, which means it will not be sent to the subscriber.
36-
MessageExpiry time.Duration `yaml:"message_expiry"`
37+
MessageExpiry time.Duration `yaml:"messageExpiry"`
3738
// InflightExpiry is the lifetime of the "inflight" message in seconds.
3839
// If a "inflight" message is not acknowledged by a client in InflightExpiry time, it will be removed when the message queue is full.
39-
InflightExpiry time.Duration `yaml:"inflight_expiry"`
40+
InflightExpiry time.Duration `yaml:"inflightExpiry"`
4041
// MaxPacketSize is the maximum packet size that the server is willing to accept from the client
41-
MaxPacketSize uint32 `yaml:"max_packet_size"`
42+
MaxPacketSize uint32 `yaml:"maxPacketSize"`
4243
// ReceiveMax limits the number of QoS 1 and QoS 2 publications that the server is willing to process concurrently for the client.
43-
ReceiveMax uint16 `yaml:"server_receive_maximum"`
44+
ReceiveMax uint16 `yaml:"serverReceiveMaximum"`
4445
// MaxKeepAlive is the maximum keep alive time in seconds allows by the server.
4546
// If the client requests a keepalive time bigger than MaxKeepalive,
4647
// the server will use MaxKeepAlive as the keepalive time.
4748
// In this case, if the client version is v5, the server will set MaxKeepalive into CONNACK to inform the client.
4849
// But if the client version is 3.x, the server has no way to inform the client that the keepalive time has been changed.
49-
MaxKeepAlive uint16 `yaml:"max_keepalive"`
50+
MaxKeepAlive uint16 `yaml:"maxKeepalive"`
5051
// TopicAliasMax indicates the highest value that the server will accept as a Topic Alias sent by the client.
5152
// No-op if the client version is MQTTv3.x
52-
TopicAliasMax uint16 `yaml:"topic_alias_maximum"`
53+
TopicAliasMax uint16 `yaml:"topicAliasMaximum"`
5354
// SubscriptionIDAvailable indicates whether the server supports Subscription Identifiers.
5455
// No-op if the client version is MQTTv3.x .
55-
SubscriptionIDAvailable bool `yaml:"subscription_identifier_available"`
56+
SubscriptionIDAvailable bool `yaml:"subscriptionIdentifierAvailable"`
5657
// SharedSubAvailable indicates whether the server supports Shared Subscriptions.
57-
SharedSubAvailable bool `yaml:"shared_subscription_available"`
58+
SharedSubAvailable bool `yaml:"sharedSubscriptionAvailable"`
5859
// WildcardSubAvailable indicates whether the server supports Wildcard Subscriptions.
59-
WildcardAvailable bool `yaml:"wildcard_subscription_available"`
60+
WildcardAvailable bool `yaml:"wildcardSubscriptionAvailable"`
6061
// RetainAvailable indicates whether the server supports retained messages.
61-
RetainAvailable bool `yaml:"retain_available"`
62+
RetainAvailable bool `yaml:"retainAvailable"`
6263
// MaxQueuedMsg is the maximum queue length of the outgoing messages.
6364
// If the queue is full, some message will be dropped.
6465
// The message dropping strategy is described in the document of the persistence/queue.Store interface.
65-
MaxQueueMessages int `yaml:"max_queue_messages"`
66+
MaxQueueMessages int `yaml:"maxQueueMessages"`
6667
// MaxInflight limits inflight message length of the outgoing messages.
6768
// Inflight message is also stored in the message queue, so it must be less than or equal to MaxQueuedMsg.
6869
// Inflight message is the QoS 1 or QoS 2 message that has been sent out to a client but not been acknowledged yet.
69-
MaxInflight uint16 `yaml:"max_inflight"`
70+
MaxInflight uint16 `yaml:"maxInflight"`
7071
// MaximumQoS is the highest QOS level permitted for a Publish.
71-
MaximumQoS uint8 `yaml:"maximum_qos"`
72+
MaximumQoS uint8 `yaml:"maximumQos"`
7273
// QueueQos0Msg indicates whether to store QoS 0 message for a offline session.
73-
QueueQos0Msg bool `yaml:"queue_qos0_messages"`
74+
QueueQos0Msg bool `yaml:"queueQos0Messages"`
7475
// DeliveryMode is the delivery mode. The possible value can be "overlap" or "onlyonce".
7576
// It is possible for a client’s subscriptions to overlap so that a published message might match multiple filters.
7677
// When set to "overlap" , the server will deliver one message for each matching subscription and respecting the subscription’s QoS in each case.
7778
// When set to "onlyOnce",the server will deliver the message to the client respecting the maximum QoS of all the matching subscriptions.
78-
DeliveryMode string `yaml:"delivery_mode"`
79+
DeliveryMode string `yaml:"deliveryMode"`
7980
// AllowZeroLenClientId indicates whether to allow a client to connect with empty client id.
80-
AllowZeroLenClientId bool `yaml:"allow_zero_len_client_id"`
81+
AllowZeroLenClientId bool `yaml:"allowZeroLenClientId"`
8182
}

config/config.yaml

-2
This file was deleted.

config/config_test.go

-36
This file was deleted.

config/persistence.go

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package config
2+
3+
import "time"
4+
5+
type (
6+
Persistence struct {
7+
Session StoreType `yaml:"session"`
8+
Queue StoreType `yaml:"queue"`
9+
}
10+
11+
StoreType struct {
12+
Type string `yaml:"type"` // memory|redis
13+
Redis RedisStoreType `yaml:"redis"`
14+
}
15+
16+
RedisStoreType struct {
17+
Type string `yaml:"nodeType"`
18+
// Addr is the redis server address.
19+
// If empty, use "127.0.0.1:6379" as default.
20+
Addr string `yaml:"addr"`
21+
// Password is the redis password.
22+
Password string `yaml:"password"`
23+
// Database is the number of the redis database to be connected.
24+
Database uint `yaml:"database"`
25+
// MaxIdle is the maximum number of idle connections in the pool.
26+
// If nil, use 1000 as default.
27+
// This value will pass to redis.Pool.MaxIde.
28+
MaxIdle *uint `yaml:"maxIdle"`
29+
// MaxActive is the maximum number of connections allocated by the pool at a given time.
30+
// If nil, use 0 as default.
31+
// If zero, there is no limit on the number of connections in the pool.
32+
// This value will pass to redis.Pool.MaxActive.
33+
MaxActive *uint `yaml:"maxActive"`
34+
// Close connections after remaining idle for this duration. If the value
35+
// is zero, then idle connections are not closed. Applications should set
36+
// the timeout to a value less than the server's timeout.
37+
// Ff zero, use 240 * time.Second as default.
38+
// This value will pass to redis.Pool.IdleTimeout.
39+
IdleTimeout time.Duration `yaml:"idleTimeout"`
40+
Timeout time.Duration `yaml:"timeout"`
41+
}
42+
)

go.mod

+7-5
Original file line numberDiff line numberDiff line change
@@ -3,30 +3,32 @@ module github.com/yunqi/lighthouse
33
go 1.17
44

55
require (
6-
github.com/chenquan/go-pkg v0.1.9
6+
github.com/bytedance/gopkg v0.0.0-20220118075514-1372042b2bbc
7+
github.com/chenquan/go-pkg v0.1.18
8+
github.com/go-playground/validator/v10 v10.10.0
79
github.com/go-redis/redis/v8 v8.11.4
810
github.com/golang/mock v1.6.0
911
github.com/gorilla/websocket v1.4.2
1012
github.com/panjf2000/ants/v2 v2.4.7
1113
github.com/stretchr/testify v1.7.0
1214
go.uber.org/zap v1.19.0
13-
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
15+
gopkg.in/natefinch/lumberjack.v2 v2.0.0
1416
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
1517
)
1618

1719
require (
20+
github.com/BurntSushi/toml v1.0.0 // indirect
1821
github.com/cespare/xxhash/v2 v2.1.2 // indirect
1922
github.com/davecgh/go-spew v1.1.1 // indirect
2023
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
2124
github.com/go-playground/locales v0.14.0 // indirect
2225
github.com/go-playground/universal-translator v0.18.0 // indirect
23-
github.com/go-playground/validator/v10 v10.10.0 // indirect
2426
github.com/leodido/go-urn v1.2.1 // indirect
2527
github.com/pmezard/go-difflib v1.0.0 // indirect
2628
go.uber.org/atomic v1.7.0 // indirect
2729
go.uber.org/multierr v1.6.0 // indirect
2830
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 // indirect
29-
golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069 // indirect
31+
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
32+
golang.org/x/sys v0.0.0-20220110181412-a018aaa089fe // indirect
3033
golang.org/x/text v0.3.6 // indirect
31-
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
3234
)

0 commit comments

Comments
 (0)