diff --git a/mqtt/client.lua b/mqtt/client.lua index 3ff03b3..c52908d 100644 --- a/mqtt/client.lua +++ b/mqtt/client.lua @@ -207,7 +207,7 @@ function client_mt:__init(args) -- state self.first_connect = true -- contains true to perform one network connection attempt after client creation - self.send_time = 0 -- time of the last network send from client side + self._last_in_time = 0 -- time of the last network received from client side -- packet creation/parse functions according version if not a.version then @@ -566,8 +566,11 @@ function client_mt:disconnect(rc, properties, user_properties) assert(properties == nil or type(properties) == "table", "expecting properties to be a table") assert(user_properties == nil or type(user_properties) == "table", "expecting user_properties to be a table") + self.is_disconnecting = true + -- check connection is alive if not self.connection then + self.is_disconnecting = false return false, "network connection is not opened" end @@ -585,11 +588,13 @@ function client_mt:disconnect(rc, properties, user_properties) err = "failed to send DISCONNECT: "..err self:handle("error", err, self) self:close_connection("error") + self.is_disconnecting = false return false, err end -- now close connection self:close_connection("connection closed by client") + self.is_disconnecting = false return true end @@ -650,7 +655,7 @@ function client_mt:close_connection(reason) -- check connection is still closed (self.connection may be re-created in "close" handler) if not self.connection then -- remove from ioloop - if self.ioloop and not args.reconnect then + if self.ioloop and (self.is_disconnecting or not args.reconnect) then self.ioloop:remove(self) end end @@ -780,6 +785,7 @@ function client_mt:send_connect() -- reset last packet id self._last_packet_id = nil + self._last_in_time = os_time() return true end @@ -906,8 +912,16 @@ function client_mt:_ioloop_iteration() if ok then -- send PINGREQ if keep_alive interval is reached - if os_time() - self.send_time >= args.keep_alive then - self:send_pingreq() + if os_time() - self._last_in_time >= args.keep_alive then + if not self._ping_t then + self._ping_t = os_time() + self._last_in_time = os_time() + ok, err = self:send_pingreq() + else + err = "client has exceeded timeout, disconnecting." + self:handle("error", err, self) + self:close_connection("error") + end end end @@ -917,7 +931,7 @@ function client_mt:_ioloop_iteration() if self.first_connect then self.first_connect = false self:start_connecting() - elseif args.reconnect then + elseif args.reconnect and not self.is_disconnecting then if args.reconnect == true then self:start_connecting() else @@ -962,6 +976,9 @@ function client_mt:_io_iteration(recv) -- check for communication error if packet == false then + if self.is_disconnecting then + return true + end if err == "closed" then self:close_connection("connection closed by broker") return false, err @@ -1004,8 +1021,7 @@ function client_mt:_io_iteration(recv) -- handle packet according its type local ptype = packet.type if ptype == packet_type.PINGRESP then -- luacheck: ignore - -- PINGREQ answer, nothing to do - -- TODO: break the connectin in absence of this packet in some timeout + self._ping_t = nil -- mark ping transfer flag to nil elseif ptype == packet_type.SUBACK then self:handle("subscribe", packet, self) elseif ptype == packet_type.UNSUBACK then @@ -1156,7 +1172,6 @@ function client_mt:_send_packet(packet) return false, "connector.send failed: "..err end end - self.send_time = os_time() return true end @@ -1171,6 +1186,7 @@ function client_mt:_receive_packet() if not packet then return false, err end + self._last_in_time = os_time() return packet end