Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
mochi-co authored Oct 23, 2024
2 parents f7cf1f3 + 47536b7 commit 94c3d45
Show file tree
Hide file tree
Showing 10 changed files with 22 additions and 9 deletions.
3 changes: 2 additions & 1 deletion README-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ func main() {
server := mqtt.New(&mqtt.Options{
Capabilities: mqtt.Capabilities{
MaximumSessionExpiryInterval: 3600,
MaximumClientWritesPending: 3,
Compatibilities: mqtt.Compatibilities{
ObscureNotAuthorized: true,
},
Expand All @@ -207,7 +208,7 @@ server := mqtt.New(&mqtt.Options{
InlineClient: false,
})
```
请参考 mqtt.Options、mqtt.Capabilities 和 mqtt.Compatibilities 结构体,以查看完整的所有服务端选项。ClientNetWriteBufferSize 和 ClientNetReadBufferSize 可以根据你的需求配置调整每个客户端的内存使用状况。
请参考 mqtt.Options、mqtt.Capabilities 和 mqtt.Compatibilities 结构体,以查看完整的所有服务端选项。 ClientNetWriteBufferSize 和 ClientNetReadBufferSize 可以根据你的需求配置调整每个客户端的内存使用状况。其中 Capabilities.MaximumClientWritesPending 的大小会影响服务器运行内存占用,如果 IoT 设备同时在线的数量比较多,设置的值很大,尽管没有收发数据,服务器运行内存占用也会增加很多,默认该数值为 1024*8 ,可以根据实际情况调整该参数

### 默认配置说明(Default Configuration Notes)

Expand Down
2 changes: 2 additions & 0 deletions README-JP.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ TLSを設定するには`*listeners.Config`を渡すことができます。
server := mqtt.New(&mqtt.Options{
Capabilities: mqtt.Capabilities{
MaximumSessionExpiryInterval: 3600,
MaximumClientWritesPending: 3,
Compatibilities: mqtt.Compatibilities{
ObscureNotAuthorized: true,
},
Expand All @@ -181,6 +182,7 @@ server := mqtt.New(&mqtt.Options{

mqtt.Options、mqtt.Capabilities、mqtt.Compatibilitiesの構造体はオプションの理解に役立ちます。
必要に応じて`ClientNetWriteBufferSize``ClientNetReadBufferSize`はクライアントの使用するメモリに合わせて設定できます。
`Capabilities.MaximumClientWritesPending`のサイズは、サーバーのメモリ使用量に影響を与えます。IoTデバイスが同時にオンラインで多数存在する場合、また設定値が非常に大きい場合、データの送受信がなくても、サーバーのメモリ使用量は大幅に増加します。デフォルト値は1024*8で、実際の状況に応じてこのパラメータを調整することができます。

### デフォルト設定に関する注意事項

Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ A number of configurable options are available which can be used to alter the be
server := mqtt.New(&mqtt.Options{
Capabilities: mqtt.Capabilities{
MaximumSessionExpiryInterval: 3600,
MaximumClientWritesPending: 3,
Compatibilities: mqtt.Compatibilities{
ObscureNotAuthorized: true,
},
Expand All @@ -209,7 +210,7 @@ server := mqtt.New(&mqtt.Options{
})
```

Review the mqtt.Options, mqtt.Capabilities, and mqtt.Compatibilities structs for a comprehensive list of options. `ClientNetWriteBufferSize` and `ClientNetReadBufferSize` can be configured to adjust memory usage per client, based on your needs.
Review the mqtt.Options, mqtt.Capabilities, and mqtt.Compatibilities structs for a comprehensive list of options. `ClientNetWriteBufferSize` and `ClientNetReadBufferSize` can be configured to adjust memory usage per client, based on your needs. The size of `Capabilities.MaximumClientWritesPending` will affect the memory usage of the server. If the number of IoT devices online at the same time is large, and the set value is very large, even if there is no data transmission, the memory usage of the server will increase a lot. The default value is 1024*8, and this parameter can be adjusted according to the actual situation.

### Default Configuration Notes

Expand Down
2 changes: 2 additions & 0 deletions hooks/storage/badger/badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
TopicName: pk.TopicName,
Payload: pk.Payload,
Created: pk.Created,
Client: cl.ID,
Origin: pk.Origin,
Properties: storage.MessageProperties{
PayloadFormat: props.PayloadFormat,
Expand Down Expand Up @@ -319,6 +320,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese
in := &storage.Message{
ID: inflightKey(cl, pk),
T: storage.InflightKey,
Client: cl.ID,
Origin: pk.Origin,
PacketID: pk.PacketID,
FixedHeader: pk.FixedHeader,
Expand Down
2 changes: 2 additions & 0 deletions hooks/storage/bolt/bolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
TopicName: pk.TopicName,
Payload: pk.Payload,
Created: pk.Created,
Client: cl.ID,
Origin: pk.Origin,
Properties: storage.MessageProperties{
PayloadFormat: props.PayloadFormat,
Expand Down Expand Up @@ -287,6 +288,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese
in := &storage.Message{
ID: inflightKey(cl, pk),
T: storage.InflightKey,
Client: cl.ID,
Origin: pk.Origin,
FixedHeader: pk.FixedHeader,
TopicName: pk.TopicName,
Expand Down
2 changes: 2 additions & 0 deletions hooks/storage/pebble/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
TopicName: pk.TopicName,
Payload: pk.Payload,
Created: pk.Created,
Client: cl.ID,
Origin: pk.Origin,
Properties: storage.MessageProperties{
PayloadFormat: props.PayloadFormat,
Expand Down Expand Up @@ -295,6 +296,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese
in := &storage.Message{
ID: inflightKey(cl, pk),
T: storage.InflightKey,
Client: cl.ID,
Origin: pk.Origin,
PacketID: pk.PacketID,
FixedHeader: pk.FixedHeader,
Expand Down
2 changes: 2 additions & 0 deletions hooks/storage/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
TopicName: pk.TopicName,
Payload: pk.Payload,
Created: pk.Created,
Client: cl.ID,
Origin: pk.Origin,
Properties: storage.MessageProperties{
PayloadFormat: props.PayloadFormat,
Expand Down Expand Up @@ -317,6 +318,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese
in := &storage.Message{
ID: inflightKey(cl, pk),
T: storage.InflightKey,
Client: cl.ID,
Origin: pk.Origin,
FixedHeader: pk.FixedHeader,
TopicName: pk.TopicName,
Expand Down
1 change: 1 addition & 0 deletions hooks/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type Message struct {
Payload []byte `json:"payload"` // the message payload (if retained)
T string `json:"t,omitempty"` // the data type
ID string `json:"id,omitempty" storm:"id"` // the storage key
Client string `json:"client,omitempty"` // the client id the message is for
Origin string `json:"origin,omitempty"` // the id of the client who sent the message
TopicName string `json:"topic_name,omitempty"` // the topic the message was sent to (if retained)
FixedHeader packets.FixedHeader `json:"fixedheader"` // the header properties of the message
Expand Down
6 changes: 3 additions & 3 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

const (
Version = "2.6.4" // the current server version.
Version = "2.6.5" // the current server version.
defaultSysTopicInterval int64 = 1 // the interval between $SYS topic publishes
LocalListener = "local"
InlineClientId = "inline"
Expand Down Expand Up @@ -560,7 +560,7 @@ func (s *Server) validateConnect(cl *Client, pk packets.Packet) packets.Code {
// connection ID. If clean is true, the state of any previously existing client
// session is abandoned.
func (s *Server) inheritClientSession(pk packets.Packet, cl *Client) bool {
if existing, ok := s.Clients.Get(pk.Connect.ClientIdentifier); ok {
if existing, ok := s.Clients.Get(cl.ID); ok {
_ = s.DisconnectClient(existing, packets.ErrSessionTakenOver) // [MQTT-3.1.4-3]
if pk.Connect.Clean || (existing.Properties.Clean && existing.Properties.ProtocolVersion < 5) { // [MQTT-3.1.2-4] [MQTT-3.1.4-4]
s.UnsubscribeClient(existing)
Expand Down Expand Up @@ -1672,7 +1672,7 @@ func (s *Server) loadClients(v []storage.Client) {
// loadInflight restores inflight messages from the datastore.
func (s *Server) loadInflight(v []storage.Message) {
for _, msg := range v {
if client, ok := s.Clients.Get(msg.Origin); ok {
if client, ok := s.Clients.Get(msg.Client); ok {
client.State.Inflight.Set(msg.ToPacket())
}
}
Expand Down
8 changes: 4 additions & 4 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3416,10 +3416,10 @@ func TestServerLoadInflightMessages(t *testing.T) {
require.Equal(t, 3, s.Clients.Len())

v := []storage.Message{
{Origin: "mochi", PacketID: 1, Payload: []byte("hello world"), TopicName: "a/b/c"},
{Origin: "mochi", PacketID: 2, Payload: []byte("yes"), TopicName: "a/b/c"},
{Origin: "zen", PacketID: 3, Payload: []byte("hello world"), TopicName: "a/b/c"},
{Origin: "mochi-co", PacketID: 4, Payload: []byte("hello world"), TopicName: "a/b/c"},
{Client: "mochi", Origin: "mochi", PacketID: 1, Payload: []byte("hello world"), TopicName: "a/b/c"},
{Client: "mochi", Origin: "mochi", PacketID: 2, Payload: []byte("yes"), TopicName: "a/b/c"},
{Client: "zen", Origin: "zen", PacketID: 3, Payload: []byte("hello world"), TopicName: "a/b/c"},
{Client: "mochi-co", Origin: "mochi-co", PacketID: 4, Payload: []byte("hello world"), TopicName: "a/b/c"},
}
s.loadInflight(v)

Expand Down

0 comments on commit 94c3d45

Please sign in to comment.