From 945b16025a9498f99bc7fb5d32262faec5766b38 Mon Sep 17 00:00:00 2001 From: Dirk Chang Date: Wed, 13 Jan 2021 17:34:37 +0800 Subject: [PATCH 1/2] disconnect from server when exceeded timeout --- mqtt/client.lua | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/mqtt/client.lua b/mqtt/client.lua index d3ccf4e..bd013ee 100644 --- a/mqtt/client.lua +++ b/mqtt/client.lua @@ -218,7 +218,7 @@ function client_mt:__init(args) -- state self.first_connect = true -- contains true to perform one network connection attemt after client creation - self.send_time = 0 -- time of the last network send from client side + self._last_in_time = 0 -- time of the last network received from client side -- packet creation/parse functions according version if not a.version then @@ -791,6 +791,7 @@ function client_mt:send_connect() -- reset last packet id self._last_packet_id = nil + self._last_in_time = os_time() return true end @@ -917,8 +918,16 @@ function client_mt:_ioloop_iteration() if ok then -- send PINGREQ if keep_alive interval is reached - if os_time() - self.send_time >= args.keep_alive then - self:send_pingreq() + if os_time() - self._last_in_time >= args.keep_alive then + if not self._ping_t then + self._ping_t = os_time() + self._last_in_time = os_time() + ok, err = self:send_pingreq() + else + err = "client has exceeded timeout, disconnecting." + self:handle("error", err, self) + self:close_connection("error") + end end end @@ -1015,8 +1024,7 @@ function client_mt:_io_iteration(recv) -- handle packet according its type local ptype = packet.type if ptype == packet_type.PINGRESP then -- luacheck: ignore - -- PINGREQ answer, nothing to do - -- TODO: break the connectin in absence of this packet in some timeout + self._ping_t = nil -- mark ping transfer flag to nil elseif ptype == packet_type.SUBACK then self:handle("subscribe", packet, self) elseif ptype == packet_type.UNSUBACK then @@ -1167,7 +1175,6 @@ function client_mt:_send_packet(packet) return false, "connector.send failed: "..err end end - self.send_time = os_time() return true end @@ -1182,6 +1189,7 @@ function client_mt:_receive_packet() if not packet then return false, err end + self._last_in_time = os_time() return packet end From e65699924c6100fad62d97aa864a83d064df4421 Mon Sep 17 00:00:00 2001 From: Dirk Chang Date: Wed, 13 Jan 2021 17:42:40 +0800 Subject: [PATCH 2/2] remove client form ioloop and do not start reconect proc when disconnect is called --- mqtt/client.lua | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/mqtt/client.lua b/mqtt/client.lua index bd013ee..3313a7c 100644 --- a/mqtt/client.lua +++ b/mqtt/client.lua @@ -577,8 +577,11 @@ function client_mt:disconnect(rc, properties, user_properties) assert(properties == nil or type(properties) == "table", "expecting properties to be a table") assert(user_properties == nil or type(user_properties) == "table", "expecting user_properties to be a table") + self.is_disconnecting = true + -- check connection is alive if not self.connection then + self.is_disconnecting = false return false, "network connection is not opened" end @@ -596,11 +599,13 @@ function client_mt:disconnect(rc, properties, user_properties) err = "failed to send DISCONNECT: "..err self:handle("error", err, self) self:close_connection("error") + self.is_disconnecting = false return false, err end -- now close connection self:close_connection("connection closed by client") + self.is_disconnecting = false return true end @@ -661,7 +666,7 @@ function client_mt:close_connection(reason) -- check connection is still closed (self.connection may be re-created in "close" handler) if not self.connection then -- remove from ioloop - if self.ioloop and not args.reconnect then + if self.ioloop and (self.is_disconnecting or not args.reconnect) then self.ioloop:remove(self) end end @@ -937,7 +942,7 @@ function client_mt:_ioloop_iteration() if self.first_connect then self.first_connect = false self:start_connecting() - elseif args.reconnect then + elseif args.reconnect and not self.is_disconnecting then if args.reconnect == true then self:start_connecting() else @@ -982,6 +987,9 @@ function client_mt:_io_iteration(recv) -- check for communication error if packet == false then + if self.is_disconnecting then + return true + end if err == "closed" then self:close_connection("connection closed by broker") return false, err