Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OnSendToSubscriber Event Hooks #383

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ The function signatures for all the hooks and `mqtt.Hook` interface can be found
| OnUnsubscribe | Called when a client unsubscribes from one or more filters. Allows packet modification. |
| OnUnsubscribed | Called when a client successfully unsubscribes from one or more filters. |
| OnPublish | Called when a client publishes a message. Allows packet modification. |
| OnSendToSubscriber | Called when a message is about to be sent to the subscriber. |
| OnPublished | Called when a client has published a message to subscribers. |
| OnPublishDropped | Called when a message to a client is dropped before delivery, such as if the client is taking too long to respond. |
| OnRetainMessage | Called then a published message is retained. |
Expand Down
21 changes: 21 additions & 0 deletions hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const (
OnUnsubscribe
OnUnsubscribed
OnPublish
OnSendToSubscriber
OnPublished
OnPublishDropped
OnRetainMessage
Expand Down Expand Up @@ -97,6 +98,7 @@ type Hook interface {
OnUnsubscribe(cl *Client, pk packets.Packet) packets.Packet
OnUnsubscribed(cl *Client, pk packets.Packet)
OnPublish(cl *Client, pk packets.Packet) (packets.Packet, error)
OnSendToSubscriber(cl *Client, pk packets.Packet) bool
OnPublished(cl *Client, pk packets.Packet)
OnPublishDropped(cl *Client, pk packets.Packet)
OnRetainMessage(cl *Client, pk packets.Packet, r int64)
Expand Down Expand Up @@ -428,6 +430,20 @@ func (h *Hooks) OnPublished(cl *Client, pk packets.Packet) {
}
}

// OnPublishToClient is called before the message is sent to the subscriber.
// Default return true.
func (h *Hooks) OnSendToSubscriber(cl *Client, pk packets.Packet) bool {
for _, hook := range h.GetAll() {
if hook.Provides(OnSendToSubscriber) {
if ok := hook.OnSendToSubscriber(cl, pk); !ok {
return false
}
}
}

return true
}

// OnPublishDropped is called when a message to a client was dropped instead of delivered
// such as when a client is too slow to respond.
func (h *Hooks) OnPublishDropped(cl *Client, pk packets.Packet) {
Expand Down Expand Up @@ -800,6 +816,11 @@ func (h *HookBase) OnPublish(cl *Client, pk packets.Packet) (packets.Packet, err
// OnPublished is called when a client has published a message to subscribers.
func (h *HookBase) OnPublished(cl *Client, pk packets.Packet) {}

// OnSendToSubscriber is called before the message is sent to the subscriber.
func (h *HookBase) OnSendToSubscriber(cl *Client, pk packets.Packet) bool {
return true
}

// OnPublishDropped is called when a message to a client is dropped instead of being delivered.
func (h *HookBase) OnPublishDropped(cl *Client, pk packets.Packet) {}

Expand Down
25 changes: 25 additions & 0 deletions hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ func (h *modifiedHookBase) OnPublish(cl *Client, pk packets.Packet) (packets.Pac
return pk, nil
}

func (h *modifiedHookBase) OnSendToSubscriber(cl *Client, pk packets.Packet) bool {
return false
}

func (h *modifiedHookBase) OnPacketRead(cl *Client, pk packets.Packet) (packets.Packet, error) {
if h.fail {
if h.err != nil {
Expand Down Expand Up @@ -244,6 +248,7 @@ func TestHooksNonReturns(t *testing.T) {
h.OnSubscribed(cl, packets.Packet{}, []byte{1})
h.OnUnsubscribed(cl, packets.Packet{})
h.OnPublished(cl, packets.Packet{})
h.OnSendToSubscriber(cl, packets.Packet{})
h.OnPublishDropped(cl, packets.Packet{})
h.OnRetainMessage(cl, packets.Packet{}, 0)
h.OnRetainPublished(cl, packets.Packet{})
Expand Down Expand Up @@ -358,6 +363,21 @@ func TestHooksOnPublish(t *testing.T) {
require.Equal(t, uint16(10), pk.PacketID)
}

func TestHooksOnSendToSubscriber(t *testing.T) {
h := new(Hooks)
h.Log = logger

ok := h.OnSendToSubscriber(new(Client), packets.Packet{PacketID: 10})
require.True(t, ok)

hook := new(modifiedHookBase)
err := h.Add(hook, nil)
require.NoError(t, err)

ok = h.OnSendToSubscriber(new(Client), packets.Packet{PacketID: 10})
require.False(t, ok)
}

func TestHooksOnPacketRead(t *testing.T) {
h := new(Hooks)
h.Log = logger
Expand Down Expand Up @@ -610,6 +630,11 @@ func TestHookBaseOnPublish(t *testing.T) {
require.Equal(t, uint16(10), pk.PacketID)
}

func TestHookBaseOnSendToSubscriber(t *testing.T) {
h := new(HookBase)
ok := h.OnSendToSubscriber(new(Client), packets.Packet{PacketID: 10})
require.True(t, ok)
}
func TestHookBaseOnPacketRead(t *testing.T) {
h := new(HookBase)
pk, err := h.OnPacketRead(new(Client), packets.Packet{PacketID: 10})
Expand Down
6 changes: 6 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,12 @@ func (s *Server) publishToClient(cl *Client, sub packets.Subscription, pk packet
if !s.hooks.OnACLCheck(cl, pk.TopicName, false) {
return out, packets.ErrNotAuthorized
}

if !s.hooks.OnSendToSubscriber(cl, pk) {
// write to the log?
return out, nil
}

if !sub.FwdRetainedFlag && ((cl.Properties.ProtocolVersion == 5 && !sub.RetainAsPublished) || cl.Properties.ProtocolVersion < 5) { // ![MQTT-3.3.1-13] [v3 MQTT-3.3.1-9]
out.FixedHeader.Retain = false // [MQTT-3.3.1-12]
}
Expand Down