File tree Expand file tree Collapse file tree 3 files changed +22
-2
lines changed Expand file tree Collapse file tree 3 files changed +22
-2
lines changed Original file line number Diff line number Diff line change
1
+ github : [kelindar]
Original file line number Diff line number Diff line change @@ -137,9 +137,9 @@ func (c *Client) OnError(handler ErrorHandler) {
137
137
138
138
// onMessage occurs when MQTT client receives a message
139
139
func (c * Client ) onMessage (_ mqtt.Client , m mqtt.Message ) {
140
- if c . message != nil && ! strings .HasPrefix (m .Topic (), "emitter/" ) {
140
+ if ! strings .HasPrefix (m .Topic (), "emitter/" ) {
141
141
handlers := c .handlers .Lookup (m .Topic ())
142
- if len (handlers ) == 0 { // Invoke the default message handler
142
+ if len (handlers ) == 0 && c . message != nil { // Invoke the default message handler
143
143
c .message (c , m )
144
144
}
145
145
Original file line number Diff line number Diff line change @@ -149,3 +149,22 @@ func WithAtMostOnce() Option {
149
149
func WithAtLeastOnce () Option {
150
150
return withQos1
151
151
}
152
+
153
+ func getUTCTimestamp (input time.Time ) int64 {
154
+ t := input
155
+ if zone , _ := t .Zone (); zone != "UTC" {
156
+ loc , _ := time .LoadLocation ("UTC" )
157
+ t = t .In (loc )
158
+ }
159
+ return t .Unix ()
160
+ }
161
+
162
+ // WithFrom request messages from a point in time.
163
+ func WithFrom (from time.Time ) Option {
164
+ return option ("from=" + strconv .FormatInt (getUTCTimestamp (from ), 10 ))
165
+ }
166
+
167
+ // WithUntil request messages until a point in time.
168
+ func WithUntil (until time.Time ) Option {
169
+ return option ("until=" + strconv .FormatInt (getUTCTimestamp (until ), 10 ))
170
+ }
You can’t perform that action at this time.
0 commit comments