From 56fb3c46888f5e5402b968426fb674749f21fc4e Mon Sep 17 00:00:00 2001 From: leiju Date: Tue, 2 Apr 2024 23:23:08 +0800 Subject: [PATCH 1/3] add a new event hook, name is OnSendToSubscriber --- hooks.go | 21 +++++++++++++++++++++ server.go | 6 ++++++ 2 files changed, 27 insertions(+) diff --git a/hooks.go b/hooks.go index 4da709f7..68af4f39 100644 --- a/hooks.go +++ b/hooks.go @@ -38,6 +38,7 @@ const ( OnUnsubscribe OnUnsubscribed OnPublish + OnSendToSubscriber OnPublished OnPublishDropped OnRetainMessage @@ -97,6 +98,7 @@ type Hook interface { OnUnsubscribe(cl *Client, pk packets.Packet) packets.Packet OnUnsubscribed(cl *Client, pk packets.Packet) OnPublish(cl *Client, pk packets.Packet) (packets.Packet, error) + OnSendToSubscriber(cl *Client, pk packets.Packet) bool OnPublished(cl *Client, pk packets.Packet) OnPublishDropped(cl *Client, pk packets.Packet) OnRetainMessage(cl *Client, pk packets.Packet, r int64) @@ -428,6 +430,20 @@ func (h *Hooks) OnPublished(cl *Client, pk packets.Packet) { } } +// OnPublishToClient is called before the message is sent to the subscriber. +// Default return true. +func (h *Hooks) OnSendToSubscriber(cl *Client, pk packets.Packet) bool { + for _, hook := range h.GetAll() { + if hook.Provides(OnSendToSubscriber) { + if ok := hook.OnSendToSubscriber(cl, pk); !ok { + return false + } + } + } + + return true +} + // OnPublishDropped is called when a message to a client was dropped instead of delivered // such as when a client is too slow to respond. func (h *Hooks) OnPublishDropped(cl *Client, pk packets.Packet) { @@ -800,6 +816,11 @@ func (h *HookBase) OnPublish(cl *Client, pk packets.Packet) (packets.Packet, err // OnPublished is called when a client has published a message to subscribers. func (h *HookBase) OnPublished(cl *Client, pk packets.Packet) {} +// OnSendToSubscriber is called before the message is sent to the subscriber. +func (h *HookBase) OnSendToSubscriber(cl *Client, pk packets.Packet) bool { + return true +} + // OnPublishDropped is called when a message to a client is dropped instead of being delivered. func (h *HookBase) OnPublishDropped(cl *Client, pk packets.Packet) {} diff --git a/server.go b/server.go index d097e85c..2c66569e 100644 --- a/server.go +++ b/server.go @@ -1011,6 +1011,12 @@ func (s *Server) publishToClient(cl *Client, sub packets.Subscription, pk packet if !s.hooks.OnACLCheck(cl, pk.TopicName, false) { return out, packets.ErrNotAuthorized } + + if !s.hooks.OnSendToSubscriber(cl, pk) { + // write to the log? + return out, nil + } + if !sub.FwdRetainedFlag && ((cl.Properties.ProtocolVersion == 5 && !sub.RetainAsPublished) || cl.Properties.ProtocolVersion < 5) { // ![MQTT-3.3.1-13] [v3 MQTT-3.3.1-9] out.FixedHeader.Retain = false // [MQTT-3.3.1-12] } From bba0e49f60e59346357a08bc32ef31aee7b16651 Mon Sep 17 00:00:00 2001 From: leiju Date: Fri, 5 Apr 2024 17:09:12 +0800 Subject: [PATCH 2/3] add OnSendToSubscriber hook unit test --- hooks_test.go | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/hooks_test.go b/hooks_test.go index bb87accd..e356d2e9 100644 --- a/hooks_test.go +++ b/hooks_test.go @@ -78,6 +78,10 @@ func (h *modifiedHookBase) OnPublish(cl *Client, pk packets.Packet) (packets.Pac return pk, nil } +func (h *modifiedHookBase) OnSendToSubscriber(cl *Client, pk packets.Packet) bool { + return false +} + func (h *modifiedHookBase) OnPacketRead(cl *Client, pk packets.Packet) (packets.Packet, error) { if h.fail { if h.err != nil { @@ -244,6 +248,7 @@ func TestHooksNonReturns(t *testing.T) { h.OnSubscribed(cl, packets.Packet{}, []byte{1}) h.OnUnsubscribed(cl, packets.Packet{}) h.OnPublished(cl, packets.Packet{}) + h.OnSendToSubscriber(cl, packets.Packet{}) h.OnPublishDropped(cl, packets.Packet{}) h.OnRetainMessage(cl, packets.Packet{}, 0) h.OnRetainPublished(cl, packets.Packet{}) @@ -358,6 +363,21 @@ func TestHooksOnPublish(t *testing.T) { require.Equal(t, uint16(10), pk.PacketID) } +func TestHooksOnSendToSubscriber(t *testing.T) { + h := new(Hooks) + h.Log = logger + + ok := h.OnSendToSubscriber(new(Client), packets.Packet{PacketID: 10}) + require.True(t, ok) + + hook := new(modifiedHookBase) + err := h.Add(hook, nil) + require.NoError(t, err) + + ok = h.OnSendToSubscriber(new(Client), packets.Packet{PacketID: 10}) + require.False(t, ok) +} + func TestHooksOnPacketRead(t *testing.T) { h := new(Hooks) h.Log = logger @@ -610,6 +630,11 @@ func TestHookBaseOnPublish(t *testing.T) { require.Equal(t, uint16(10), pk.PacketID) } +func TestHookBaseOnSendToSubscriber(t *testing.T) { + h := new(HookBase) + ok := h.OnSendToSubscriber(new(Client), packets.Packet{PacketID: 10}) + require.True(t, ok) +} func TestHookBaseOnPacketRead(t *testing.T) { h := new(HookBase) pk, err := h.OnPacketRead(new(Client), packets.Packet{PacketID: 10}) From 3fed5bc60f847f0e2fb52c6baa3f56cff8dbc9c5 Mon Sep 17 00:00:00 2001 From: leiju <40145943+leijux@users.noreply.github.com> Date: Sun, 7 Apr 2024 11:33:03 +0800 Subject: [PATCH 3/3] Update README.md Add OnPublishToClient description --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 47601d60..41a01d84 100644 --- a/README.md +++ b/README.md @@ -378,6 +378,7 @@ The function signatures for all the hooks and `mqtt.Hook` interface can be found | OnUnsubscribe | Called when a client unsubscribes from one or more filters. Allows packet modification. | | OnUnsubscribed | Called when a client successfully unsubscribes from one or more filters. | | OnPublish | Called when a client publishes a message. Allows packet modification. | +| OnSendToSubscriber | Called when a message is about to be sent to the subscriber. | | OnPublished | Called when a client has published a message to subscribers. | | OnPublishDropped | Called when a message to a client is dropped before delivery, such as if the client is taking too long to respond. | | OnRetainMessage | Called then a published message is retained. |