From fcee79aae906e9ba77826d435b6c99ce0c9d6d3f Mon Sep 17 00:00:00 2001 From: Ted Sender Date: Tue, 9 Aug 2022 04:08:56 +0000 Subject: [PATCH 01/31] Added TCP socket --- rosbridge_server/CMakeLists.txt | 12 +- .../launch/rosbridge_tcp_launch.py | 69 ++++ .../launch/rosbridge_tcp_launch.xml | 42 +++ rosbridge_server/scripts/rosbridge_tcp | 300 ++++++++++++++++++ .../src/rosbridge_server/__init__.py | 1 + .../src/rosbridge_server/tcp_handler.py | 138 ++++++++ 6 files changed, 559 insertions(+), 3 deletions(-) create mode 100644 rosbridge_server/launch/rosbridge_tcp_launch.py create mode 100644 rosbridge_server/launch/rosbridge_tcp_launch.xml create mode 100755 rosbridge_server/scripts/rosbridge_tcp create mode 100644 rosbridge_server/src/rosbridge_server/tcp_handler.py diff --git a/rosbridge_server/CMakeLists.txt b/rosbridge_server/CMakeLists.txt index 74194436f..69e89d83e 100644 --- a/rosbridge_server/CMakeLists.txt +++ b/rosbridge_server/CMakeLists.txt @@ -13,12 +13,18 @@ ament_package() install(PROGRAMS scripts/rosbridge_websocket.py scripts/rosbridge_websocket + scripts/rosbridge_tcp DESTINATION lib/${PROJECT_NAME} ) -install(FILES - launch/rosbridge_websocket_launch.xml - DESTINATION share/${PROJECT_NAME}/launch +# install(FILES +# launch/rosbridge_websocket_launch.xml +# launch/rosbridge_tcp_launch.xml +# launch/rosbridge_tcp_launch.py +# DESTINATION share/${PROJECT_NAME}/launch +# ) +install(DIRECTORY launch + DESTINATION share/${PROJECT_NAME} ) if(BUILD_TESTING) diff --git a/rosbridge_server/launch/rosbridge_tcp_launch.py b/rosbridge_server/launch/rosbridge_tcp_launch.py new file mode 100644 index 000000000..c540a7b55 --- /dev/null +++ b/rosbridge_server/launch/rosbridge_tcp_launch.py @@ -0,0 +1,69 @@ +import launch +from launch.substitutions import EnvironmentVariable +from launch.actions import DeclareLaunchArgument, IncludeLaunchDescription +from launch.substitutions import PathJoinSubstitution, TextSubstitution, LaunchConfiguration +import os +import launch_ros.actions +import pathlib + +def generate_launch_description(): + launch_description = [] + + launch_description.append(DeclareLaunchArgument('port', default_value='9090')) + launch_description.append(DeclareLaunchArgument('host', default_value='127.0.0.1')) + + launch_description.append(DeclareLaunchArgument('incoming_buffer', default_value='65536')) + launch_description.append(DeclareLaunchArgument('socket_timeout', default_value='10')) + launch_description.append(DeclareLaunchArgument('retry_startup_delay', default_value='5')) + + launch_description.append(DeclareLaunchArgument('fragment_timeout', default_value='600')) + launch_description.append(DeclareLaunchArgument('delay_between_messages', default_value='0')) + launch_description.append(DeclareLaunchArgument('max_message_size', default_value='None')) + launch_description.append(DeclareLaunchArgument('unregister_timeout', default_value='10')) + + launch_description.append(DeclareLaunchArgument('topics_glob', default_value='')) + launch_description.append(DeclareLaunchArgument('services_glob', default_value='')) + launch_description.append(DeclareLaunchArgument('params_glob', default_value='')) + launch_description.append(DeclareLaunchArgument('bson_only_mode', default_value='True')) + + launch_description.append( + launch_ros.actions.Node( + package='rosbridge_server', + node_executable='rosbridge_tcp', + node_name='rosbridge_tcp', + output='screen', + parameters=[ + { + "port": LaunchConfiguration('port'), + "host": LaunchConfiguration('host'), + "incoming_buffer": LaunchConfiguration('incoming_buffer'), + "socket_timeout": LaunchConfiguration('socket_timeout'), + "retry_startup_delay": LaunchConfiguration('retry_startup_delay'), + "fragment_timeout": LaunchConfiguration('fragment_timeout'), + "delay_between_messages": LaunchConfiguration('delay_between_messages'), + "max_message_size": LaunchConfiguration('max_message_size'), + "unregister_timeout": LaunchConfiguration('unregister_timeout'), + "topics_glob": LaunchConfiguration('topics_glob'), + "services_glob": LaunchConfiguration('services_glob'), + "params_glob": LaunchConfiguration('params_glob'), + "bson_only_mode": LaunchConfiguration('bson_only_mode'), + } + ], + ) + ) + launch_description.append( + launch_ros.actions.Node( + package='rosapi', + node_executable='rosapi_node', + node_name='rosapi', + output='screen', + parameters=[ + { + "topics_glob": LaunchConfiguration('topics_glob'), + "services_glob": LaunchConfiguration('services_glob'), + "params_glob": LaunchConfiguration('params_glob'), + } + ], + ) + ) + return launch.LaunchDescription(launch_description) \ No newline at end of file diff --git a/rosbridge_server/launch/rosbridge_tcp_launch.xml b/rosbridge_server/launch/rosbridge_tcp_launch.xml new file mode 100644 index 000000000..5a74a885c --- /dev/null +++ b/rosbridge_server/launch/rosbridge_tcp_launch.xml @@ -0,0 +1,42 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/rosbridge_server/scripts/rosbridge_tcp b/rosbridge_server/scripts/rosbridge_tcp new file mode 100755 index 000000000..7a43f7dc6 --- /dev/null +++ b/rosbridge_server/scripts/rosbridge_tcp @@ -0,0 +1,300 @@ +#!/usr/bin/env python + +# from rospy import init_node, get_param, loginfo, on_shutdown, Publisher +import rclpy +from rclpy.node import Node +from rclpy.qos import QoSProfile, QoSDurabilityPolicy +from rosbridge_server import RosbridgeTcpSocket + +from rosbridge_library.capabilities.advertise import Advertise +from rosbridge_library.capabilities.publish import Publish +from rosbridge_library.capabilities.subscribe import Subscribe +from rosbridge_library.capabilities.advertise_service import AdvertiseService +from rosbridge_library.capabilities.unadvertise_service import UnadvertiseService +from rosbridge_library.capabilities.call_service import CallService + +from functools import partial +from signal import signal, SIGINT, SIG_DFL +from std_msgs.msg import Int32 + +try: + import SocketServer +except ImportError: + import socketserver as SocketServer + +import sys +import time +import traceback +import threading +#TODO: take care of socket timeouts and make sure to close sockets after killing program to release network ports + +#TODO: add new parameters to websocket version! those of rosbridge_tcp.py might not be needed, but the others should work well when adding them to .._websocket.py + +# update reference https://docs.ros.org/en/foxy/Contributing/Migration-Guide-Python.html +# update to a class and could be compile to a excutable file +def shutdown_hook(server): + server.shutdown() + +class RosbridgeTcpsocketNode(Node): + def __init__(self): + super().__init__('rosbridge_tcp') + + """ + Parameter handling: + - try to get parameter from parameter server (..define those via launch-file) + - overwrite value if given as commandline-parameter + + BEGIN... + """ + + #TODO: ensure types get cast correctly after getting from parameter server + #TODO: check if ROS parameter server uses None string for 'None-value' or Null or something else, then change code accordingly + + # update parameters from parameter server or use default value ( second parameter of get_param ) + # port = get_param('~port', 9090) + port = self.declare_parameter('port', 9090).value + self._port = port + # host = get_param('~host', '') + host = self.declare_parameter('host', '127.0.0.1').value + self._host = host + + RosbridgeTcpSocket.ros_node = self + # incoming_buffer = get_param('~incoming_buffer', RosbridgeTcpSocket.incoming_buffer) + incoming_buffer = self.declare_parameter('incoming_buffer', RosbridgeTcpSocket.incoming_buffer).value + # socket_timeout = get_param('~socket_timeout', RosbridgeTcpSocket.socket_timeout) + socket_timeout = self.declare_parameter('socket_timeout', RosbridgeTcpSocket.socket_timeout).value + # retry_startup_delay = get_param('~retry_startup_delay', 5.0) # seconds + retry_startup_delay = self.declare_parameter('retry_startup_delay', 5.0).value # seconds + # fragment_timeout = get_param('~fragment_timeout', RosbridgeTcpSocket.fragment_timeout) + fragment_timeout = self.declare_parameter('fragment_timeout', RosbridgeTcpSocket.fragment_timeout).value + # delay_between_messages = get_param('~delay_between_messages', RosbridgeTcpSocket.delay_between_messages) + delay_between_messages = self.declare_parameter('delay_between_messages', RosbridgeTcpSocket.delay_between_messages).value + self._delay_between_messages = delay_between_messages + # max_message_size = get_param('~max_message_size', RosbridgeTcpSocket.max_message_size) + max_message_size = self.declare_parameter('max_message_size', RosbridgeTcpSocket.max_message_size).value + # unregister_timeout = get_param('~unregister_timeout', RosbridgeTcpSocket.unregister_timeout) + unregister_timeout = self.declare_parameter('unregister_timeout', RosbridgeTcpSocket.unregister_timeout).value + # bson_only_mode = get_param('~bson_only_mode', False) + bson_only_mode = self.declare_parameter('bson_only_mode', RosbridgeTcpSocket.bson_only_mode).value + + if max_message_size == "None": + max_message_size = None + + # Get the glob strings and parse them as arrays. + RosbridgeTcpSocket.topics_glob = [ + element.strip().strip("'") + # for element in get_param('~topics_glob', '')[1:-1].split(',') + for element in self.declare_parameter('topics_glob', '').value[1:-1].split(',') + if len(element.strip().strip("'")) > 0] + RosbridgeTcpSocket.services_glob = [ + element.strip().strip("'") + # for element in get_param('~services_glob', '')[1:-1].split(',') + for element in self.declare_parameter('services_glob', '').value[1:-1].split(',') + if len(element.strip().strip("'")) > 0] + RosbridgeTcpSocket.params_glob = [ + element.strip().strip("'") + # for element in get_param('~params_glob', '')[1:-1].split(',') + for element in self.declare_parameter('params_glob', '').value[1:-1].split(',') + if len(element.strip().strip("'")) > 0] + + # Publisher for number of connected clients + # RosbridgeTcpSocket.client_count_pub = Publisher('client_count', Int32, queue_size=10, latch=True) + # Publisher for number of connected clients + # QoS profile with transient local durability (latched topic in ROS 1). + client_count_qos_profile = QoSProfile( + depth=10, + durability=QoSDurabilityPolicy.RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL + ) + RosbridgeTcpSocket.client_count_pub = self.create_publisher(Int32, 'client_count', qos_profile=client_count_qos_profile) + # RosbridgeTcpSocket.client_count_pub.publish(0) + RosbridgeTcpSocket.client_count_pub.publish(Int32(data=0)) + + # update parameters if provided via commandline + # .. could implemented 'better' (value/type checking, etc.. ) + if "--port" in sys.argv: + idx = sys.argv.index("--port") + 1 + if idx < len(sys.argv): + port = int(sys.argv[idx]) + else: + print("--port argument provided without a value.") + sys.exit(-1) + + if "--host" in sys.argv: + idx = sys.argv.index("--host") + 1 + if idx < len(sys.argv): + host = str(sys.argv[idx]) + else: + print("--host argument provided without a value.") + sys.exit(-1) + + if "--incoming_buffer" in sys.argv: + idx = sys.argv.index("--incoming_buffer") + 1 + if idx < len(sys.argv): + incoming_buffer = int(sys.argv[idx]) + else: + print("--incoming_buffer argument provided without a value.") + sys.exit(-1) + + if "--socket_timeout" in sys.argv: + idx = sys.argv.index("--socket_timeout") + 1 + if idx < len(sys.argv): + socket_timeout = int(sys.argv[idx]) + else: + print("--socket_timeout argument provided without a value.") + sys.exit(-1) + + if "--retry_startup_delay" in sys.argv: + idx = sys.argv.index("--retry_startup_delay") + 1 + if idx < len(sys.argv): + retry_startup_delay = int(sys.argv[idx]) + else: + print("--retry_startup_delay argument provided without a value.") + sys.exit(-1) + + if "--fragment_timeout" in sys.argv: + idx = sys.argv.index("--fragment_timeout") + 1 + if idx < len(sys.argv): + fragment_timeout = int(sys.argv[idx]) + else: + print("--fragment_timeout argument provided without a value.") + sys.exit(-1) + + if "--delay_between_messages" in sys.argv: + idx = sys.argv.index("--delay_between_messages") + 1 + if idx < len(sys.argv): + delay_between_messages = float(sys.argv[idx]) + else: + print("--delay_between_messages argument provided without a value.") + sys.exit(-1) + + if "--max_message_size" in sys.argv: + idx = sys.argv.index("--max_message_size") + 1 + if idx < len(sys.argv): + value = sys.argv[idx] + if value == "None": + max_message_size = None + else: + max_message_size = int(value) + else: + print("--max_message_size argument provided without a value. (can be None or )") + sys.exit(-1) + + if "--unregister_timeout" in sys.argv: + idx = sys.argv.index("--unregister_timeout") + 1 + if idx < len(sys.argv): + unregister_timeout = float(sys.argv[idx]) + else: + print("--unregister_timeout argument provided without a value.") + sys.exit(-1) + + # export parameters to handler class + RosbridgeTcpSocket.incoming_buffer = incoming_buffer + RosbridgeTcpSocket.socket_timeout = socket_timeout + RosbridgeTcpSocket.fragment_timeout = fragment_timeout + RosbridgeTcpSocket.delay_between_messages = delay_between_messages + RosbridgeTcpSocket.max_message_size = max_message_size + RosbridgeTcpSocket.unregister_timeout = unregister_timeout + RosbridgeTcpSocket.bson_only_mode = bson_only_mode + + if "--topics_glob" in sys.argv: + idx = sys.argv.index("--topics_glob") + 1 + if idx < len(sys.argv): + value = sys.argv[idx] + if value == "None": + RosbridgeTcpSocket.topics_glob = [] + else: + RosbridgeTcpSocket.topics_glob = [element.strip().strip("'") for element in value[1:-1].split(',')] + else: + print("--topics_glob argument provided without a value. (can be None or a list)") + sys.exit(-1) + + if "--services_glob" in sys.argv: + idx = sys.argv.index("--services_glob") + 1 + if idx < len(sys.argv): + value = sys.argv[idx] + if value == "None": + RosbridgeTcpSocket.services_glob = [] + else: + RosbridgeTcpSocket.services_glob = [element.strip().strip("'") for element in value[1:-1].split(',')] + else: + print("--services_glob argument provided without a value. (can be None or a list)") + sys.exit(-1) + + if "--params_glob" in sys.argv: + idx = sys.argv.index("--params_glob") + 1 + if idx < len(sys.argv): + value = sys.argv[idx] + if value == "None": + RosbridgeTcpSocket.params_glob = [] + else: + RosbridgeTcpSocket.params_glob = [element.strip().strip("'") for element in value[1:-1].split(',')] + else: + print("--params_glob argument provided without a value. (can be None or a list)") + sys.exit(-1) + + if "--bson_only_mode" in sys.argv: + bson_only_mode = True + + # To be able to access the list of topics and services, you must be able to access the rosapi services. + if RosbridgeTcpSocket.services_glob: + RosbridgeTcpSocket.services_glob.append("/rosapi/*") + + Subscribe.topics_glob = RosbridgeTcpSocket.topics_glob + Advertise.topics_glob = RosbridgeTcpSocket.topics_glob + Publish.topics_glob = RosbridgeTcpSocket.topics_glob + AdvertiseService.services_glob = RosbridgeTcpSocket.services_glob + UnadvertiseService.services_glob = RosbridgeTcpSocket.services_glob + CallService.services_glob = RosbridgeTcpSocket.services_glob + + """ + ...END (parameter handling) + """ + +# if __name__ == "__main__": +def main(args=None): + if args is None: + args = sys.argv + + loaded = False + retry_count = 0 + rclpy.init(args=args) + rosbridge_tcpsocket_node = RosbridgeTcpsocketNode() + # while not loaded: + # retry_count += 1 + # print("trying to start rosbridge TCP server..") + try: + print("") + # update to ros2 + # rclpy.init(args=sys.argv) + # node = rclpy.create_node('rosbridge_tcp') + + # init_node("rosbridge_tcp") + signal(SIGINT, SIG_DFL) + + # Server host is a tuple ('host', port) + # empty string for host makes server listen on all available interfaces + SocketServer.ThreadingTCPServer.allow_reuse_address = True + host = rosbridge_tcpsocket_node._host + port = rosbridge_tcpsocket_node._port + with SocketServer.TCPServer((host, port), RosbridgeTcpSocket) as server: + # server = SocketServer.ThreadingTCPServer((host, port), RosbridgeTcpSocket) + # server_thread = threading.Thread(target=server.serve_forever, daemon=True) + # server_thread.start() + # on_shutdown(partial(shutdown_hook, server)) + # 也许再也不需要使用hook_shutdown 参见:https://github.com/ros2/rclpy/issues/244 + # rclpy.context.Context.on_shutdown(partial(shutdown_hook, server)) + + # loginfo("Rosbridge TCP server started on port %d", port) + sys.stdout.flush() + rosbridge_tcpsocket_node.get_logger().info("Rosbridge TCP server started on port " + str(port)) + + loaded = True + server.serve_forever() + except Exception as e: + time.sleep(rosbridge_tcpsocket_node._delay_between_messages) + print("server not loaded"+e.args) + print(traceback.format_exc()) + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/rosbridge_server/src/rosbridge_server/__init__.py b/rosbridge_server/src/rosbridge_server/__init__.py index fe3ed3131..e91910f92 100644 --- a/rosbridge_server/src/rosbridge_server/__init__.py +++ b/rosbridge_server/src/rosbridge_server/__init__.py @@ -1,2 +1,3 @@ from .client_mananger import ClientManager # noqa: F401 from .websocket_handler import RosbridgeWebSocket # noqa: F401 +from .tcp_handler import RosbridgeTcpSocket \ No newline at end of file diff --git a/rosbridge_server/src/rosbridge_server/tcp_handler.py b/rosbridge_server/src/rosbridge_server/tcp_handler.py new file mode 100644 index 000000000..70db250ee --- /dev/null +++ b/rosbridge_server/src/rosbridge_server/tcp_handler.py @@ -0,0 +1,138 @@ +# import rospy +import rclpy +import struct +from rosbridge_library.rosbridge_protocol import RosbridgeProtocol +####### +import functools +print = functools.partial(print, flush=True) +####### +try: + import SocketServer +except ImportError: + import socketserver as SocketServer + +class RosbridgeTcpSocket(SocketServer.BaseRequestHandler): + """ + TCP Socket server for rosbridge + """ + + busy = False + queue = [] + client_id_seed = 0 + clients_connected = 0 + client_count_pub = None + + # list of parameters + incoming_buffer = 65536 # bytes + socket_timeout = 10 # seconds + # The following are passed on to RosbridgeProtocol + # defragmentation.py: + fragment_timeout = 600 # seconds + # protocol.py: + delay_between_messages = 0 # seconds + max_message_size = None # bytes + unregister_timeout = 10.0 # seconds + bson_only_mode = False + ros_node = None + + def setup(self): + cls = self.__class__ + parameters = { + "fragment_timeout": cls.fragment_timeout, + "delay_between_messages": cls.delay_between_messages, + "max_message_size": cls.max_message_size, + "unregister_timeout": cls.unregister_timeout, + "bson_only_mode": cls.bson_only_mode + } + try: + self.protocol = RosbridgeProtocol(cls.client_id_seed, cls.ros_node, parameters=parameters) + print("send function:", cls.send_message) + self.protocol.outgoing = self.send_message + self.protocol.parameters = self.parameters + cls.client_id_seed += 1 + cls.clients_connected += 1 + if cls.client_count_pub: + cls.client_count_pub.publish(cls.clients_connected) + self.protocol.log("info", "connected. " + str(cls.clients_connected) + " client total.") + except Exception as exc: + # rospy.logerr("Unable to accept incoming connection. Reason: %s", str(exc)) + cls.ros_node.get_logger().info("Unable to accept incoming connection. Reason: " + str(exc)) + + + def recvall(self,n): + # http://stackoverflow.com/questions/17667903/python-socket-receive-large-amount-of-data + # Helper function to recv n bytes or return None if EOF is hit + data = '' + while len(data) < n: + packet = self.request.recv(n - len(data)) + if not packet: + return None + data += str(packet) + return eval(bytes(data,'utf-8')) + + def recv_bson(self): + # Read 4 bytes to get the length of the BSON packet + BSON_LENGTH_IN_BYTES = 4 + raw_msglen = self.recvall(BSON_LENGTH_IN_BYTES) + if not raw_msglen: + return None + msglen = struct.unpack('i', raw_msglen)[0] + # print("msglen:",msglen) + # Retrieve the rest of the message + data = self.recvall(msglen - BSON_LENGTH_IN_BYTES) + if data is None: + return None + data = raw_msglen + data # Prefix the data with the message length that has already been received. + # The message length is part of BSONs message format + + # Exit on empty message + if len(data) == 0: + return None + self.protocol.incoming(message=data) + return True + + def handle(self): + print("listening for test, Start TCP-handle") + """ + Listen for TCP messages + """ + cls = self.__class__ + self.request.settimeout(cls.socket_timeout) + while True: + try: + if self.bson_only_mode: + if self.recv_bson() is None: + break + continue + + # non-BSON handling + data = self.request.recv(cls.incoming_buffer) + # Exit on empty string + # add spin to disposal of callback + rclpy.spin_once(self.ros_node, timeout_sec=0.) + if data.strip() == '': + break + elif len(data.strip()) > 0: + self.protocol.incoming(data.strip('')) + else: + pass + except Exception: + self.protocol.log("debug", "socket connection timed out! (ignore warning if client is only listening..)") + + def finish(self): + """ + Called when TCP connection finishes + """ + cls = self.__class__ + cls.clients_connected -= 1 + self.protocol.finish() + if cls.client_count_pub: + cls.client_count_pub.publish(cls.clients_connected) + self.protocol.log("info", "disconnected. " + str(cls.clients_connected) + " client total." ) + + def send_message(self, message=None): + """ + Callback from rosbridge + """ + print("send msg:", message) + self.request.sendall(message) \ No newline at end of file From c8710fe38ef4227c6abc52fd484be78dc5c1c086 Mon Sep 17 00:00:00 2001 From: Ted Sender Date: Tue, 9 Aug 2022 04:18:01 +0000 Subject: [PATCH 02/31] Updates to launch file and logger --- rosbridge_server/launch/rosbridge_tcp_launch.py | 2 ++ rosbridge_server/scripts/rosbridge_tcp | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/rosbridge_server/launch/rosbridge_tcp_launch.py b/rosbridge_server/launch/rosbridge_tcp_launch.py index c540a7b55..b253fe4f9 100644 --- a/rosbridge_server/launch/rosbridge_tcp_launch.py +++ b/rosbridge_server/launch/rosbridge_tcp_launch.py @@ -28,6 +28,7 @@ def generate_launch_description(): launch_description.append( launch_ros.actions.Node( + prefix=['stdbuf -o L'], # Need this with distro Dashing/Eloquent so that get_logger().info appears on the screen package='rosbridge_server', node_executable='rosbridge_tcp', node_name='rosbridge_tcp', @@ -53,6 +54,7 @@ def generate_launch_description(): ) launch_description.append( launch_ros.actions.Node( + prefix=['stdbuf -o L'], # Need this with distro Dashing/Eloquent so that get_logger().info appears on the screen package='rosapi', node_executable='rosapi_node', node_name='rosapi', diff --git a/rosbridge_server/scripts/rosbridge_tcp b/rosbridge_server/scripts/rosbridge_tcp index 7a43f7dc6..8129f18d1 100755 --- a/rosbridge_server/scripts/rosbridge_tcp +++ b/rosbridge_server/scripts/rosbridge_tcp @@ -286,7 +286,7 @@ def main(args=None): # loginfo("Rosbridge TCP server started on port %d", port) sys.stdout.flush() - rosbridge_tcpsocket_node.get_logger().info("Rosbridge TCP server started on port " + str(port)) + rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP server started on {host}:{port}") loaded = True server.serve_forever() From 8c4074acfe6b54271df30b98f611448a297cc60d Mon Sep 17 00:00:00 2001 From: Ted Sender Date: Tue, 18 Oct 2022 15:43:54 +0000 Subject: [PATCH 03/31] Added TCP, fixed bugs, optimized large array processing --- .../capabilities/unadvertise_service.py | 6 +- .../internal/message_conversion.py | 4 ++ .../rosbridge_library/internal/publishers.py | 1 + .../rosbridge_library/internal/subscribers.py | 11 +-- .../src/rosbridge_library/protocol.py | 10 ++- rosbridge_server/CMakeLists.txt | 6 -- .../launch/rosbridge_tcp_launch.py | 71 ------------------- .../launch/rosbridge_tcp_launch.xml | 2 +- rosbridge_server/scripts/rosbridge_tcp | 4 +- .../src/rosbridge_server/tcp_handler.py | 24 +++---- .../src/rosbridge_server/websocket_handler.py | 2 +- 11 files changed, 37 insertions(+), 104 deletions(-) delete mode 100644 rosbridge_server/launch/rosbridge_tcp_launch.py diff --git a/rosbridge_library/src/rosbridge_library/capabilities/unadvertise_service.py b/rosbridge_library/src/rosbridge_library/capabilities/unadvertise_service.py index ba3d738e1..9337510fa 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/unadvertise_service.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/unadvertise_service.py @@ -50,9 +50,9 @@ def unadvertise_service(self, message): # unregister service in ROS if service_name in self.protocol.external_service_list.keys(): self.protocol.external_service_list[service_name].graceful_shutdown() - self.protocol.external_service_list[service_name].service_handle.shutdown( - "Unadvertise request." - ) + # self.protocol.external_service_list[service_name].service_handle.shutdown( + # "Unadvertise request." + # ) del self.protocol.external_service_list[service_name] self.protocol.log("info", "Unadvertised service %s." % service_name) else: diff --git a/rosbridge_library/src/rosbridge_library/internal/message_conversion.py b/rosbridge_library/src/rosbridge_library/internal/message_conversion.py index 72ce62299..7ca22ec6d 100644 --- a/rosbridge_library/src/rosbridge_library/internal/message_conversion.py +++ b/rosbridge_library/src/rosbridge_library/internal/message_conversion.py @@ -299,6 +299,10 @@ def _to_binary_inst(msg): return list(standard_b64decode(msg)) if isinstance(msg, list): return msg + if isinstance(msg, bytes): + data = array.array('B') + data.frombytes(memoryview(msg)) # Using the frombytes() method with a memoryview of the data allows for zero copying of data (HUGE time-saver for large arrays) + return data return bytes(bytearray(msg)) diff --git a/rosbridge_library/src/rosbridge_library/internal/publishers.py b/rosbridge_library/src/rosbridge_library/internal/publishers.py index e4f674b70..31de7dc1a 100644 --- a/rosbridge_library/src/rosbridge_library/internal/publishers.py +++ b/rosbridge_library/src/rosbridge_library/internal/publishers.py @@ -31,6 +31,7 @@ # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. +from multiprocessing.spawn import is_forking from threading import Timer from rclpy.duration import Duration diff --git a/rosbridge_library/src/rosbridge_library/internal/subscribers.py b/rosbridge_library/src/rosbridge_library/internal/subscribers.py index f2187bada..02f5f1f83 100644 --- a/rosbridge_library/src/rosbridge_library/internal/subscribers.py +++ b/rosbridge_library/src/rosbridge_library/internal/subscribers.py @@ -114,11 +114,12 @@ def __init__(self, topic, client_id, callback, node_handle, msg_type=None, raw=F reliability=ReliabilityPolicy.RELIABLE, ) - infos = node_handle.get_publishers_info_by_topic(topic) - if any(pub.qos_profile.durability == DurabilityPolicy.TRANSIENT_LOCAL for pub in infos): - qos.durability = DurabilityPolicy.TRANSIENT_LOCAL - if any(pub.qos_profile.reliability == ReliabilityPolicy.BEST_EFFORT for pub in infos): - qos.reliability = ReliabilityPolicy.BEST_EFFORT + # TODO: How to fix this for eloquent? + # infos = node_handle.get_publishers_info_by_topic(topic) + # if any(pub.qos_profile.durability == DurabilityPolicy.TRANSIENT_LOCAL for pub in infos): + # qos.durability = DurabilityPolicy.TRANSIENT_LOCAL + # if any(pub.qos_profile.reliability == ReliabilityPolicy.BEST_EFFORT for pub in infos): + # qos.reliability = ReliabilityPolicy.BEST_EFFORT # Create the subscriber and associated member variables # Subscriptions is initialized with the current client to start with. diff --git a/rosbridge_library/src/rosbridge_library/protocol.py b/rosbridge_library/src/rosbridge_library/protocol.py index e31a1bee8..aaa51e5f9 100644 --- a/rosbridge_library/src/rosbridge_library/protocol.py +++ b/rosbridge_library/src/rosbridge_library/protocol.py @@ -31,6 +31,7 @@ # POSSIBILITY OF SUCH DAMAGE. import time +import traceback from rosbridge_library.capabilities.fragmentation import Fragmentation from rosbridge_library.util import bson, json @@ -114,16 +115,20 @@ def incoming(self, message_string=""): message_string -- the wire-level message sent by the client """ + # start_proc = time.time() if len(self.buffer) > 0: self.buffer = self.buffer + message_string else: self.buffer = message_string + # num_bytes = len(self.buffer) msg = None # take care of having multiple JSON-objects in receiving buffer # ..first, try to load the whole buffer as a JSON-object try: + # dstart = time.time() msg = self.deserialize(self.buffer) + # des_time = time.time() - dstart self.buffer = "" # if loading whole object fails try to load part of it (from first opening bracket "{" to next closing bracket "}" @@ -134,7 +139,7 @@ def incoming(self, message_string=""): # that receives exactly one full BSON message. # This will then be passed to self.deserialize and shouldn't cause any # exceptions because of fragmented messages (broken or invalid messages might still be sent tough) - self.log("error", "Exception in deserialization of BSON") + self.log("error", f"Exception in deserialization of BSON: {traceback.format_exc()}") else: # TODO: handling of partial/multiple/broken json data in incoming buffer @@ -211,10 +216,13 @@ def incoming(self, message_string=""): # now try to pass message to according operation try: + # opstart = time.time() self.operations[op](msg) + # optime = time.time() - opstart except Exception as exc: self.log("error", f"{op}: {str(exc)}", mid) + # self.log("warn", f"OP with {num_bytes} bytes: Deserial in {des_time:.4E} / Op func in {optime:.4E} / Total in {time.time()-start_proc:.4E} sec") # if anything left in buffer .. re-call self.incoming # TODO: check what happens if we have "garbage" on tcp-stack --> infinite loop might be triggered! .. might get out of it when next valid JSON arrives since only data after last 'valid' closing bracket is kept if len(self.buffer) > 0: diff --git a/rosbridge_server/CMakeLists.txt b/rosbridge_server/CMakeLists.txt index 69e89d83e..a0c54d94b 100644 --- a/rosbridge_server/CMakeLists.txt +++ b/rosbridge_server/CMakeLists.txt @@ -17,12 +17,6 @@ install(PROGRAMS DESTINATION lib/${PROJECT_NAME} ) -# install(FILES -# launch/rosbridge_websocket_launch.xml -# launch/rosbridge_tcp_launch.xml -# launch/rosbridge_tcp_launch.py -# DESTINATION share/${PROJECT_NAME}/launch -# ) install(DIRECTORY launch DESTINATION share/${PROJECT_NAME} ) diff --git a/rosbridge_server/launch/rosbridge_tcp_launch.py b/rosbridge_server/launch/rosbridge_tcp_launch.py deleted file mode 100644 index b253fe4f9..000000000 --- a/rosbridge_server/launch/rosbridge_tcp_launch.py +++ /dev/null @@ -1,71 +0,0 @@ -import launch -from launch.substitutions import EnvironmentVariable -from launch.actions import DeclareLaunchArgument, IncludeLaunchDescription -from launch.substitutions import PathJoinSubstitution, TextSubstitution, LaunchConfiguration -import os -import launch_ros.actions -import pathlib - -def generate_launch_description(): - launch_description = [] - - launch_description.append(DeclareLaunchArgument('port', default_value='9090')) - launch_description.append(DeclareLaunchArgument('host', default_value='127.0.0.1')) - - launch_description.append(DeclareLaunchArgument('incoming_buffer', default_value='65536')) - launch_description.append(DeclareLaunchArgument('socket_timeout', default_value='10')) - launch_description.append(DeclareLaunchArgument('retry_startup_delay', default_value='5')) - - launch_description.append(DeclareLaunchArgument('fragment_timeout', default_value='600')) - launch_description.append(DeclareLaunchArgument('delay_between_messages', default_value='0')) - launch_description.append(DeclareLaunchArgument('max_message_size', default_value='None')) - launch_description.append(DeclareLaunchArgument('unregister_timeout', default_value='10')) - - launch_description.append(DeclareLaunchArgument('topics_glob', default_value='')) - launch_description.append(DeclareLaunchArgument('services_glob', default_value='')) - launch_description.append(DeclareLaunchArgument('params_glob', default_value='')) - launch_description.append(DeclareLaunchArgument('bson_only_mode', default_value='True')) - - launch_description.append( - launch_ros.actions.Node( - prefix=['stdbuf -o L'], # Need this with distro Dashing/Eloquent so that get_logger().info appears on the screen - package='rosbridge_server', - node_executable='rosbridge_tcp', - node_name='rosbridge_tcp', - output='screen', - parameters=[ - { - "port": LaunchConfiguration('port'), - "host": LaunchConfiguration('host'), - "incoming_buffer": LaunchConfiguration('incoming_buffer'), - "socket_timeout": LaunchConfiguration('socket_timeout'), - "retry_startup_delay": LaunchConfiguration('retry_startup_delay'), - "fragment_timeout": LaunchConfiguration('fragment_timeout'), - "delay_between_messages": LaunchConfiguration('delay_between_messages'), - "max_message_size": LaunchConfiguration('max_message_size'), - "unregister_timeout": LaunchConfiguration('unregister_timeout'), - "topics_glob": LaunchConfiguration('topics_glob'), - "services_glob": LaunchConfiguration('services_glob'), - "params_glob": LaunchConfiguration('params_glob'), - "bson_only_mode": LaunchConfiguration('bson_only_mode'), - } - ], - ) - ) - launch_description.append( - launch_ros.actions.Node( - prefix=['stdbuf -o L'], # Need this with distro Dashing/Eloquent so that get_logger().info appears on the screen - package='rosapi', - node_executable='rosapi_node', - node_name='rosapi', - output='screen', - parameters=[ - { - "topics_glob": LaunchConfiguration('topics_glob'), - "services_glob": LaunchConfiguration('services_glob'), - "params_glob": LaunchConfiguration('params_glob'), - } - ], - ) - ) - return launch.LaunchDescription(launch_description) \ No newline at end of file diff --git a/rosbridge_server/launch/rosbridge_tcp_launch.xml b/rosbridge_server/launch/rosbridge_tcp_launch.xml index 5a74a885c..a269ec7a0 100644 --- a/rosbridge_server/launch/rosbridge_tcp_launch.xml +++ b/rosbridge_server/launch/rosbridge_tcp_launch.xml @@ -1,6 +1,6 @@ - + diff --git a/rosbridge_server/scripts/rosbridge_tcp b/rosbridge_server/scripts/rosbridge_tcp index 8129f18d1..303955652 100755 --- a/rosbridge_server/scripts/rosbridge_tcp +++ b/rosbridge_server/scripts/rosbridge_tcp @@ -3,7 +3,7 @@ # from rospy import init_node, get_param, loginfo, on_shutdown, Publisher import rclpy from rclpy.node import Node -from rclpy.qos import QoSProfile, QoSDurabilityPolicy +from rclpy.qos import QoSProfile, DurabilityPolicy from rosbridge_server import RosbridgeTcpSocket from rosbridge_library.capabilities.advertise import Advertise @@ -103,7 +103,7 @@ class RosbridgeTcpsocketNode(Node): # QoS profile with transient local durability (latched topic in ROS 1). client_count_qos_profile = QoSProfile( depth=10, - durability=QoSDurabilityPolicy.RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL + durability=DurabilityPolicy.TRANSIENT_LOCAL ) RosbridgeTcpSocket.client_count_pub = self.create_publisher(Int32, 'client_count', qos_profile=client_count_qos_profile) # RosbridgeTcpSocket.client_count_pub.publish(0) diff --git a/rosbridge_server/src/rosbridge_server/tcp_handler.py b/rosbridge_server/src/rosbridge_server/tcp_handler.py index 70db250ee..10bd635b7 100644 --- a/rosbridge_server/src/rosbridge_server/tcp_handler.py +++ b/rosbridge_server/src/rosbridge_server/tcp_handler.py @@ -1,5 +1,6 @@ # import rospy import rclpy +from std_msgs.msg import Int32 import struct from rosbridge_library.rosbridge_protocol import RosbridgeProtocol ####### @@ -46,29 +47,27 @@ def setup(self): } try: self.protocol = RosbridgeProtocol(cls.client_id_seed, cls.ros_node, parameters=parameters) - print("send function:", cls.send_message) self.protocol.outgoing = self.send_message - self.protocol.parameters = self.parameters cls.client_id_seed += 1 cls.clients_connected += 1 if cls.client_count_pub: - cls.client_count_pub.publish(cls.clients_connected) + cls.client_count_pub.publish(Int32(data=cls.clients_connected)) self.protocol.log("info", "connected. " + str(cls.clients_connected) + " client total.") except Exception as exc: # rospy.logerr("Unable to accept incoming connection. Reason: %s", str(exc)) cls.ros_node.get_logger().info("Unable to accept incoming connection. Reason: " + str(exc)) - def recvall(self,n): + def recvall(self, msgsize): # http://stackoverflow.com/questions/17667903/python-socket-receive-large-amount-of-data # Helper function to recv n bytes or return None if EOF is hit - data = '' - while len(data) < n: - packet = self.request.recv(n - len(data)) + data = bytearray() + while len(data) < msgsize: + packet = self.request.recv(msgsize - len(data)) if not packet: return None - data += str(packet) - return eval(bytes(data,'utf-8')) + data.extend(packet) + return data def recv_bson(self): # Read 4 bytes to get the length of the BSON packet @@ -77,7 +76,6 @@ def recv_bson(self): if not raw_msglen: return None msglen = struct.unpack('i', raw_msglen)[0] - # print("msglen:",msglen) # Retrieve the rest of the message data = self.recvall(msglen - BSON_LENGTH_IN_BYTES) if data is None: @@ -88,11 +86,10 @@ def recv_bson(self): # Exit on empty message if len(data) == 0: return None - self.protocol.incoming(message=data) + self.protocol.incoming(data) return True def handle(self): - print("listening for test, Start TCP-handle") """ Listen for TCP messages """ @@ -127,12 +124,11 @@ def finish(self): cls.clients_connected -= 1 self.protocol.finish() if cls.client_count_pub: - cls.client_count_pub.publish(cls.clients_connected) + cls.client_count_pub.publish(Int32(data=cls.clients_connected)) self.protocol.log("info", "disconnected. " + str(cls.clients_connected) + " client total." ) def send_message(self, message=None): """ Callback from rosbridge """ - print("send msg:", message) self.request.sendall(message) \ No newline at end of file diff --git a/rosbridge_server/src/rosbridge_server/websocket_handler.py b/rosbridge_server/src/rosbridge_server/websocket_handler.py index 72580c163..2e83601ae 100755 --- a/rosbridge_server/src/rosbridge_server/websocket_handler.py +++ b/rosbridge_server/src/rosbridge_server/websocket_handler.py @@ -161,7 +161,7 @@ def open(self): @log_exceptions def on_message(self, message): - if isinstance(message, bytes): + if not self.bson_only_mode and isinstance(message, bytes): message = message.decode("utf-8") self.incoming_queue.push(message) From 441cea9bc654752a1f1439b2c8135d84a45b179d Mon Sep 17 00:00:00 2001 From: Ted Sender Date: Fri, 21 Oct 2022 02:13:50 +0000 Subject: [PATCH 04/31] Optimized large binary array publishing --- .../src/rosbridge_library/internal/message_conversion.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/rosbridge_library/src/rosbridge_library/internal/message_conversion.py b/rosbridge_library/src/rosbridge_library/internal/message_conversion.py index df46b5574..de541b0ef 100644 --- a/rosbridge_library/src/rosbridge_library/internal/message_conversion.py +++ b/rosbridge_library/src/rosbridge_library/internal/message_conversion.py @@ -299,6 +299,11 @@ def _to_binary_inst(msg): return list(standard_b64decode(msg)) if isinstance(msg, list): return msg + if isinstance(msg, bytes): + # Using the frombytes() method with a memoryview of the data allows for zero copying of data thanks to Python's buffer protocol (HUGE time-saver for large arrays) + data = array.array('B') + data.frombytes(memoryview(msg)) + return data return bytes(bytearray(msg)) From 17252e579446ea737321e527577640ad0243ef27 Mon Sep 17 00:00:00 2001 From: Ted Sender Date: Fri, 21 Oct 2022 17:57:09 +0000 Subject: [PATCH 05/31] Fixed linting issue --- .../src/rosbridge_library/internal/message_conversion.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rosbridge_library/src/rosbridge_library/internal/message_conversion.py b/rosbridge_library/src/rosbridge_library/internal/message_conversion.py index de541b0ef..3355a11f6 100644 --- a/rosbridge_library/src/rosbridge_library/internal/message_conversion.py +++ b/rosbridge_library/src/rosbridge_library/internal/message_conversion.py @@ -301,7 +301,7 @@ def _to_binary_inst(msg): return msg if isinstance(msg, bytes): # Using the frombytes() method with a memoryview of the data allows for zero copying of data thanks to Python's buffer protocol (HUGE time-saver for large arrays) - data = array.array('B') + data = array.array("B") data.frombytes(memoryview(msg)) return data return bytes(bytearray(msg)) From 59958f398f54f9e9e6619b33b647440d0b91e2e6 Mon Sep 17 00:00:00 2001 From: Ted Sender Date: Fri, 21 Oct 2022 18:28:14 +0000 Subject: [PATCH 06/31] Removed unnecessary debug comments --- .../src/rosbridge_library/internal/message_conversion.py | 5 +++-- rosbridge_library/src/rosbridge_library/protocol.py | 7 ------- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/rosbridge_library/src/rosbridge_library/internal/message_conversion.py b/rosbridge_library/src/rosbridge_library/internal/message_conversion.py index 7ca22ec6d..385cb9ae1 100644 --- a/rosbridge_library/src/rosbridge_library/internal/message_conversion.py +++ b/rosbridge_library/src/rosbridge_library/internal/message_conversion.py @@ -300,8 +300,9 @@ def _to_binary_inst(msg): if isinstance(msg, list): return msg if isinstance(msg, bytes): - data = array.array('B') - data.frombytes(memoryview(msg)) # Using the frombytes() method with a memoryview of the data allows for zero copying of data (HUGE time-saver for large arrays) + # Using the frombytes() method with a memoryview of the data allows for zero copying of data thanks to Python's buffer protocol (HUGE time-saver for large arrays) + data = array.array("B") + data.frombytes(memoryview(msg)) return data return bytes(bytearray(msg)) diff --git a/rosbridge_library/src/rosbridge_library/protocol.py b/rosbridge_library/src/rosbridge_library/protocol.py index aaa51e5f9..9fa527a81 100644 --- a/rosbridge_library/src/rosbridge_library/protocol.py +++ b/rosbridge_library/src/rosbridge_library/protocol.py @@ -115,20 +115,16 @@ def incoming(self, message_string=""): message_string -- the wire-level message sent by the client """ - # start_proc = time.time() if len(self.buffer) > 0: self.buffer = self.buffer + message_string else: self.buffer = message_string - # num_bytes = len(self.buffer) msg = None # take care of having multiple JSON-objects in receiving buffer # ..first, try to load the whole buffer as a JSON-object try: - # dstart = time.time() msg = self.deserialize(self.buffer) - # des_time = time.time() - dstart self.buffer = "" # if loading whole object fails try to load part of it (from first opening bracket "{" to next closing bracket "}" @@ -216,13 +212,10 @@ def incoming(self, message_string=""): # now try to pass message to according operation try: - # opstart = time.time() self.operations[op](msg) - # optime = time.time() - opstart except Exception as exc: self.log("error", f"{op}: {str(exc)}", mid) - # self.log("warn", f"OP with {num_bytes} bytes: Deserial in {des_time:.4E} / Op func in {optime:.4E} / Total in {time.time()-start_proc:.4E} sec") # if anything left in buffer .. re-call self.incoming # TODO: check what happens if we have "garbage" on tcp-stack --> infinite loop might be triggered! .. might get out of it when next valid JSON arrives since only data after last 'valid' closing bracket is kept if len(self.buffer) > 0: From f0d0b6ef431453833fd0cff58680ded35a68cd1e Mon Sep 17 00:00:00 2001 From: Ted Sender Date: Sat, 12 Nov 2022 22:07:37 +0000 Subject: [PATCH 07/31] Fixed rosbridge_tcp and updated tcp_handler to match ros1 version --- rosbridge_server/scripts/rosbridge_tcp | 51 ++++++------------ .../src/rosbridge_server/tcp_handler.py | 54 ++++++++++--------- 2 files changed, 45 insertions(+), 60 deletions(-) diff --git a/rosbridge_server/scripts/rosbridge_tcp b/rosbridge_server/scripts/rosbridge_tcp index 303955652..1572fd95d 100755 --- a/rosbridge_server/scripts/rosbridge_tcp +++ b/rosbridge_server/scripts/rosbridge_tcp @@ -1,6 +1,5 @@ #!/usr/bin/env python -# from rospy import init_node, get_param, loginfo, on_shutdown, Publisher import rclpy from rclpy.node import Node from rclpy.qos import QoSProfile, DurabilityPolicy @@ -97,8 +96,6 @@ class RosbridgeTcpsocketNode(Node): for element in self.declare_parameter('params_glob', '').value[1:-1].split(',') if len(element.strip().strip("'")) > 0] - # Publisher for number of connected clients - # RosbridgeTcpSocket.client_count_pub = Publisher('client_count', Int32, queue_size=10, latch=True) # Publisher for number of connected clients # QoS profile with transient local durability (latched topic in ROS 1). client_count_qos_profile = QoSProfile( @@ -106,7 +103,6 @@ class RosbridgeTcpsocketNode(Node): durability=DurabilityPolicy.TRANSIENT_LOCAL ) RosbridgeTcpSocket.client_count_pub = self.create_publisher(Int32, 'client_count', qos_profile=client_count_qos_profile) - # RosbridgeTcpSocket.client_count_pub.publish(0) RosbridgeTcpSocket.client_count_pub.publish(Int32(data=0)) # update parameters if provided via commandline @@ -250,51 +246,38 @@ class RosbridgeTcpsocketNode(Node): ...END (parameter handling) """ -# if __name__ == "__main__": def main(args=None): if args is None: args = sys.argv - loaded = False - retry_count = 0 rclpy.init(args=args) rosbridge_tcpsocket_node = RosbridgeTcpsocketNode() - # while not loaded: - # retry_count += 1 - # print("trying to start rosbridge TCP server..") - try: - print("") - # update to ros2 - # rclpy.init(args=sys.argv) - # node = rclpy.create_node('rosbridge_tcp') - - # init_node("rosbridge_tcp") - signal(SIGINT, SIG_DFL) + try: # Server host is a tuple ('host', port) # empty string for host makes server listen on all available interfaces - SocketServer.ThreadingTCPServer.allow_reuse_address = True + SocketServer.TCPServer.allow_reuse_address = True host = rosbridge_tcpsocket_node._host port = rosbridge_tcpsocket_node._port - with SocketServer.TCPServer((host, port), RosbridgeTcpSocket) as server: - # server = SocketServer.ThreadingTCPServer((host, port), RosbridgeTcpSocket) - # server_thread = threading.Thread(target=server.serve_forever, daemon=True) - # server_thread.start() - # on_shutdown(partial(shutdown_hook, server)) - # 也许再也不需要使用hook_shutdown 参见:https://github.com/ros2/rclpy/issues/244 - # rclpy.context.Context.on_shutdown(partial(shutdown_hook, server)) - - # loginfo("Rosbridge TCP server started on port %d", port) + server = SocketServer.TCPServer((host, port), RosbridgeTcpSocket) + with server: + # Need to place server.server_forever() in a thread so we can call rclpy.spin() below + server_thread = threading.Thread(target=server.serve_forever, daemon=True) + server_thread.start() + sys.stdout.flush() rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP server started on {host}:{port}") - - loaded = True - server.serve_forever() + rclpy.spin(rosbridge_tcpsocket_node) + + except KeyboardInterrupt: + rosbridge_tcpsocket_node.get_logger().info(f"Exiting due to SIGINT") except Exception as e: - time.sleep(rosbridge_tcpsocket_node._delay_between_messages) - print("server not loaded"+e.args) print(traceback.format_exc()) - + finally: + server.shutdown() + server_thread.join() + rosbridge_tcpsocket_node.destroy_node() + rclpy.shutdown() if __name__ == '__main__': main() \ No newline at end of file diff --git a/rosbridge_server/src/rosbridge_server/tcp_handler.py b/rosbridge_server/src/rosbridge_server/tcp_handler.py index 10bd635b7..d70b8ffc1 100644 --- a/rosbridge_server/src/rosbridge_server/tcp_handler.py +++ b/rosbridge_server/src/rosbridge_server/tcp_handler.py @@ -1,12 +1,7 @@ -# import rospy -import rclpy from std_msgs.msg import Int32 import struct from rosbridge_library.rosbridge_protocol import RosbridgeProtocol -####### -import functools -print = functools.partial(print, flush=True) -####### + try: import SocketServer except ImportError: @@ -54,16 +49,15 @@ def setup(self): cls.client_count_pub.publish(Int32(data=cls.clients_connected)) self.protocol.log("info", "connected. " + str(cls.clients_connected) + " client total.") except Exception as exc: - # rospy.logerr("Unable to accept incoming connection. Reason: %s", str(exc)) cls.ros_node.get_logger().info("Unable to accept incoming connection. Reason: " + str(exc)) - def recvall(self, msgsize): + def recvall(self, n): # http://stackoverflow.com/questions/17667903/python-socket-receive-large-amount-of-data # Helper function to recv n bytes or return None if EOF is hit data = bytearray() - while len(data) < msgsize: - packet = self.request.recv(msgsize - len(data)) + while len(data) < n: + packet = self.request.recv(n - len(data)) if not packet: return None data.extend(packet) @@ -76,6 +70,7 @@ def recv_bson(self): if not raw_msglen: return None msglen = struct.unpack('i', raw_msglen)[0] + # Retrieve the rest of the message data = self.recvall(msglen - BSON_LENGTH_IN_BYTES) if data is None: @@ -86,6 +81,7 @@ def recv_bson(self): # Exit on empty message if len(data) == 0: return None + self.protocol.incoming(data) return True @@ -97,28 +93,29 @@ def handle(self): self.request.settimeout(cls.socket_timeout) while True: try: - if self.bson_only_mode: - if self.recv_bson() is None: - break - continue + if self.bson_only_mode: + if self.recv_bson() is None: + break + continue + + # non-BSON handling + bytes = self.request.recv(cls.incoming_buffer) - # non-BSON handling - data = self.request.recv(cls.incoming_buffer) - # Exit on empty string - # add spin to disposal of callback - rclpy.spin_once(self.ros_node, timeout_sec=0.) - if data.strip() == '': - break - elif len(data.strip()) > 0: - self.protocol.incoming(data.strip('')) - else: - pass + # Exit on empty message because socket was closed + if len(bytes) == 0: + break + else: + string_encoded = bytes.decode().strip() + if len(string_encoded) > 0: + self.protocol.incoming(string_encoded) except Exception: self.protocol.log("debug", "socket connection timed out! (ignore warning if client is only listening..)") def finish(self): """ Called when TCP connection finishes + Called after the handle() method to perform any clean-up actions required. + If setup() raises an exception, this function will not be called. """ cls = self.__class__ cls.clients_connected -= 1 @@ -131,4 +128,9 @@ def send_message(self, message=None): """ Callback from rosbridge """ - self.request.sendall(message) \ No newline at end of file + if self.bson_only_mode: + self.request.sendall(message) + elif message is not None: + self.request.sendall(message.encode()) + else: + self.protocol.log("error", "send_message called with no message or message is None, not sending") \ No newline at end of file From 2bcbee30d01999e70aef66389a1d2a762ad7811e Mon Sep 17 00:00:00 2001 From: Ted Sender Date: Sat, 12 Nov 2022 22:13:12 +0000 Subject: [PATCH 08/31] Cleaned up some comments --- rosbridge_server/scripts/rosbridge_tcp | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/rosbridge_server/scripts/rosbridge_tcp b/rosbridge_server/scripts/rosbridge_tcp index 1572fd95d..c62ef32f6 100755 --- a/rosbridge_server/scripts/rosbridge_tcp +++ b/rosbridge_server/scripts/rosbridge_tcp @@ -25,14 +25,11 @@ import sys import time import traceback import threading + #TODO: take care of socket timeouts and make sure to close sockets after killing program to release network ports #TODO: add new parameters to websocket version! those of rosbridge_tcp.py might not be needed, but the others should work well when adding them to .._websocket.py -# update reference https://docs.ros.org/en/foxy/Contributing/Migration-Guide-Python.html -# update to a class and could be compile to a excutable file -def shutdown_hook(server): - server.shutdown() class RosbridgeTcpsocketNode(Node): def __init__(self): @@ -264,9 +261,9 @@ def main(args=None): # Need to place server.server_forever() in a thread so we can call rclpy.spin() below server_thread = threading.Thread(target=server.serve_forever, daemon=True) server_thread.start() - sys.stdout.flush() rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP server started on {host}:{port}") + rclpy.spin(rosbridge_tcpsocket_node) except KeyboardInterrupt: From 5278d23fdee08e4fa4ee4638caffb3755cb4151a Mon Sep 17 00:00:00 2001 From: Ted Sender Date: Sun, 13 Nov 2022 16:56:10 +0000 Subject: [PATCH 09/31] Removed a few unnecesary lines, updated shebang --- rosbridge_server/scripts/rosbridge_tcp | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/rosbridge_server/scripts/rosbridge_tcp b/rosbridge_server/scripts/rosbridge_tcp index c62ef32f6..61ad3cd16 100755 --- a/rosbridge_server/scripts/rosbridge_tcp +++ b/rosbridge_server/scripts/rosbridge_tcp @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 import rclpy from rclpy.node import Node @@ -12,8 +12,6 @@ from rosbridge_library.capabilities.advertise_service import AdvertiseService from rosbridge_library.capabilities.unadvertise_service import UnadvertiseService from rosbridge_library.capabilities.call_service import CallService -from functools import partial -from signal import signal, SIGINT, SIG_DFL from std_msgs.msg import Int32 try: From a034f921544e4f7594517a5b1b51d0bcf50cf832 Mon Sep 17 00:00:00 2001 From: Ted Sender Date: Sun, 13 Nov 2022 17:03:19 +0000 Subject: [PATCH 10/31] Added back TCP handler --- rosbridge_server/CMakeLists.txt | 6 +- .../launch/rosbridge_tcp_launch.xml | 42 +++ rosbridge_server/scripts/rosbridge_tcp | 278 ++++++++++++++++++ .../src/rosbridge_server/tcp_handler.py | 136 +++++++++ 4 files changed, 459 insertions(+), 3 deletions(-) create mode 100644 rosbridge_server/launch/rosbridge_tcp_launch.xml create mode 100755 rosbridge_server/scripts/rosbridge_tcp create mode 100644 rosbridge_server/src/rosbridge_server/tcp_handler.py diff --git a/rosbridge_server/CMakeLists.txt b/rosbridge_server/CMakeLists.txt index 2e493ab53..13be432c6 100644 --- a/rosbridge_server/CMakeLists.txt +++ b/rosbridge_server/CMakeLists.txt @@ -13,12 +13,12 @@ ament_package() install(PROGRAMS scripts/rosbridge_websocket.py scripts/rosbridge_websocket + scripts/rosbridge_tcp DESTINATION lib/${PROJECT_NAME} ) -install(FILES - launch/rosbridge_websocket_launch.xml - DESTINATION share/${PROJECT_NAME}/launch +install(DIRECTORY launch + DESTINATION share/${PROJECT_NAME} ) if(BUILD_TESTING) diff --git a/rosbridge_server/launch/rosbridge_tcp_launch.xml b/rosbridge_server/launch/rosbridge_tcp_launch.xml new file mode 100644 index 000000000..a269ec7a0 --- /dev/null +++ b/rosbridge_server/launch/rosbridge_tcp_launch.xml @@ -0,0 +1,42 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/rosbridge_server/scripts/rosbridge_tcp b/rosbridge_server/scripts/rosbridge_tcp new file mode 100755 index 000000000..61ad3cd16 --- /dev/null +++ b/rosbridge_server/scripts/rosbridge_tcp @@ -0,0 +1,278 @@ +#!/usr/bin/env python3 + +import rclpy +from rclpy.node import Node +from rclpy.qos import QoSProfile, DurabilityPolicy +from rosbridge_server import RosbridgeTcpSocket + +from rosbridge_library.capabilities.advertise import Advertise +from rosbridge_library.capabilities.publish import Publish +from rosbridge_library.capabilities.subscribe import Subscribe +from rosbridge_library.capabilities.advertise_service import AdvertiseService +from rosbridge_library.capabilities.unadvertise_service import UnadvertiseService +from rosbridge_library.capabilities.call_service import CallService + +from std_msgs.msg import Int32 + +try: + import SocketServer +except ImportError: + import socketserver as SocketServer + +import sys +import time +import traceback +import threading + +#TODO: take care of socket timeouts and make sure to close sockets after killing program to release network ports + +#TODO: add new parameters to websocket version! those of rosbridge_tcp.py might not be needed, but the others should work well when adding them to .._websocket.py + + +class RosbridgeTcpsocketNode(Node): + def __init__(self): + super().__init__('rosbridge_tcp') + + """ + Parameter handling: + - try to get parameter from parameter server (..define those via launch-file) + - overwrite value if given as commandline-parameter + + BEGIN... + """ + + #TODO: ensure types get cast correctly after getting from parameter server + #TODO: check if ROS parameter server uses None string for 'None-value' or Null or something else, then change code accordingly + + # update parameters from parameter server or use default value ( second parameter of get_param ) + # port = get_param('~port', 9090) + port = self.declare_parameter('port', 9090).value + self._port = port + # host = get_param('~host', '') + host = self.declare_parameter('host', '127.0.0.1').value + self._host = host + + RosbridgeTcpSocket.ros_node = self + # incoming_buffer = get_param('~incoming_buffer', RosbridgeTcpSocket.incoming_buffer) + incoming_buffer = self.declare_parameter('incoming_buffer', RosbridgeTcpSocket.incoming_buffer).value + # socket_timeout = get_param('~socket_timeout', RosbridgeTcpSocket.socket_timeout) + socket_timeout = self.declare_parameter('socket_timeout', RosbridgeTcpSocket.socket_timeout).value + # retry_startup_delay = get_param('~retry_startup_delay', 5.0) # seconds + retry_startup_delay = self.declare_parameter('retry_startup_delay', 5.0).value # seconds + # fragment_timeout = get_param('~fragment_timeout', RosbridgeTcpSocket.fragment_timeout) + fragment_timeout = self.declare_parameter('fragment_timeout', RosbridgeTcpSocket.fragment_timeout).value + # delay_between_messages = get_param('~delay_between_messages', RosbridgeTcpSocket.delay_between_messages) + delay_between_messages = self.declare_parameter('delay_between_messages', RosbridgeTcpSocket.delay_between_messages).value + self._delay_between_messages = delay_between_messages + # max_message_size = get_param('~max_message_size', RosbridgeTcpSocket.max_message_size) + max_message_size = self.declare_parameter('max_message_size', RosbridgeTcpSocket.max_message_size).value + # unregister_timeout = get_param('~unregister_timeout', RosbridgeTcpSocket.unregister_timeout) + unregister_timeout = self.declare_parameter('unregister_timeout', RosbridgeTcpSocket.unregister_timeout).value + # bson_only_mode = get_param('~bson_only_mode', False) + bson_only_mode = self.declare_parameter('bson_only_mode', RosbridgeTcpSocket.bson_only_mode).value + + if max_message_size == "None": + max_message_size = None + + # Get the glob strings and parse them as arrays. + RosbridgeTcpSocket.topics_glob = [ + element.strip().strip("'") + # for element in get_param('~topics_glob', '')[1:-1].split(',') + for element in self.declare_parameter('topics_glob', '').value[1:-1].split(',') + if len(element.strip().strip("'")) > 0] + RosbridgeTcpSocket.services_glob = [ + element.strip().strip("'") + # for element in get_param('~services_glob', '')[1:-1].split(',') + for element in self.declare_parameter('services_glob', '').value[1:-1].split(',') + if len(element.strip().strip("'")) > 0] + RosbridgeTcpSocket.params_glob = [ + element.strip().strip("'") + # for element in get_param('~params_glob', '')[1:-1].split(',') + for element in self.declare_parameter('params_glob', '').value[1:-1].split(',') + if len(element.strip().strip("'")) > 0] + + # Publisher for number of connected clients + # QoS profile with transient local durability (latched topic in ROS 1). + client_count_qos_profile = QoSProfile( + depth=10, + durability=DurabilityPolicy.TRANSIENT_LOCAL + ) + RosbridgeTcpSocket.client_count_pub = self.create_publisher(Int32, 'client_count', qos_profile=client_count_qos_profile) + RosbridgeTcpSocket.client_count_pub.publish(Int32(data=0)) + + # update parameters if provided via commandline + # .. could implemented 'better' (value/type checking, etc.. ) + if "--port" in sys.argv: + idx = sys.argv.index("--port") + 1 + if idx < len(sys.argv): + port = int(sys.argv[idx]) + else: + print("--port argument provided without a value.") + sys.exit(-1) + + if "--host" in sys.argv: + idx = sys.argv.index("--host") + 1 + if idx < len(sys.argv): + host = str(sys.argv[idx]) + else: + print("--host argument provided without a value.") + sys.exit(-1) + + if "--incoming_buffer" in sys.argv: + idx = sys.argv.index("--incoming_buffer") + 1 + if idx < len(sys.argv): + incoming_buffer = int(sys.argv[idx]) + else: + print("--incoming_buffer argument provided without a value.") + sys.exit(-1) + + if "--socket_timeout" in sys.argv: + idx = sys.argv.index("--socket_timeout") + 1 + if idx < len(sys.argv): + socket_timeout = int(sys.argv[idx]) + else: + print("--socket_timeout argument provided without a value.") + sys.exit(-1) + + if "--retry_startup_delay" in sys.argv: + idx = sys.argv.index("--retry_startup_delay") + 1 + if idx < len(sys.argv): + retry_startup_delay = int(sys.argv[idx]) + else: + print("--retry_startup_delay argument provided without a value.") + sys.exit(-1) + + if "--fragment_timeout" in sys.argv: + idx = sys.argv.index("--fragment_timeout") + 1 + if idx < len(sys.argv): + fragment_timeout = int(sys.argv[idx]) + else: + print("--fragment_timeout argument provided without a value.") + sys.exit(-1) + + if "--delay_between_messages" in sys.argv: + idx = sys.argv.index("--delay_between_messages") + 1 + if idx < len(sys.argv): + delay_between_messages = float(sys.argv[idx]) + else: + print("--delay_between_messages argument provided without a value.") + sys.exit(-1) + + if "--max_message_size" in sys.argv: + idx = sys.argv.index("--max_message_size") + 1 + if idx < len(sys.argv): + value = sys.argv[idx] + if value == "None": + max_message_size = None + else: + max_message_size = int(value) + else: + print("--max_message_size argument provided without a value. (can be None or )") + sys.exit(-1) + + if "--unregister_timeout" in sys.argv: + idx = sys.argv.index("--unregister_timeout") + 1 + if idx < len(sys.argv): + unregister_timeout = float(sys.argv[idx]) + else: + print("--unregister_timeout argument provided without a value.") + sys.exit(-1) + + # export parameters to handler class + RosbridgeTcpSocket.incoming_buffer = incoming_buffer + RosbridgeTcpSocket.socket_timeout = socket_timeout + RosbridgeTcpSocket.fragment_timeout = fragment_timeout + RosbridgeTcpSocket.delay_between_messages = delay_between_messages + RosbridgeTcpSocket.max_message_size = max_message_size + RosbridgeTcpSocket.unregister_timeout = unregister_timeout + RosbridgeTcpSocket.bson_only_mode = bson_only_mode + + if "--topics_glob" in sys.argv: + idx = sys.argv.index("--topics_glob") + 1 + if idx < len(sys.argv): + value = sys.argv[idx] + if value == "None": + RosbridgeTcpSocket.topics_glob = [] + else: + RosbridgeTcpSocket.topics_glob = [element.strip().strip("'") for element in value[1:-1].split(',')] + else: + print("--topics_glob argument provided without a value. (can be None or a list)") + sys.exit(-1) + + if "--services_glob" in sys.argv: + idx = sys.argv.index("--services_glob") + 1 + if idx < len(sys.argv): + value = sys.argv[idx] + if value == "None": + RosbridgeTcpSocket.services_glob = [] + else: + RosbridgeTcpSocket.services_glob = [element.strip().strip("'") for element in value[1:-1].split(',')] + else: + print("--services_glob argument provided without a value. (can be None or a list)") + sys.exit(-1) + + if "--params_glob" in sys.argv: + idx = sys.argv.index("--params_glob") + 1 + if idx < len(sys.argv): + value = sys.argv[idx] + if value == "None": + RosbridgeTcpSocket.params_glob = [] + else: + RosbridgeTcpSocket.params_glob = [element.strip().strip("'") for element in value[1:-1].split(',')] + else: + print("--params_glob argument provided without a value. (can be None or a list)") + sys.exit(-1) + + if "--bson_only_mode" in sys.argv: + bson_only_mode = True + + # To be able to access the list of topics and services, you must be able to access the rosapi services. + if RosbridgeTcpSocket.services_glob: + RosbridgeTcpSocket.services_glob.append("/rosapi/*") + + Subscribe.topics_glob = RosbridgeTcpSocket.topics_glob + Advertise.topics_glob = RosbridgeTcpSocket.topics_glob + Publish.topics_glob = RosbridgeTcpSocket.topics_glob + AdvertiseService.services_glob = RosbridgeTcpSocket.services_glob + UnadvertiseService.services_glob = RosbridgeTcpSocket.services_glob + CallService.services_glob = RosbridgeTcpSocket.services_glob + + """ + ...END (parameter handling) + """ + +def main(args=None): + if args is None: + args = sys.argv + + rclpy.init(args=args) + rosbridge_tcpsocket_node = RosbridgeTcpsocketNode() + + try: + # Server host is a tuple ('host', port) + # empty string for host makes server listen on all available interfaces + SocketServer.TCPServer.allow_reuse_address = True + host = rosbridge_tcpsocket_node._host + port = rosbridge_tcpsocket_node._port + server = SocketServer.TCPServer((host, port), RosbridgeTcpSocket) + with server: + # Need to place server.server_forever() in a thread so we can call rclpy.spin() below + server_thread = threading.Thread(target=server.serve_forever, daemon=True) + server_thread.start() + sys.stdout.flush() + rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP server started on {host}:{port}") + + rclpy.spin(rosbridge_tcpsocket_node) + + except KeyboardInterrupt: + rosbridge_tcpsocket_node.get_logger().info(f"Exiting due to SIGINT") + except Exception as e: + print(traceback.format_exc()) + finally: + server.shutdown() + server_thread.join() + rosbridge_tcpsocket_node.destroy_node() + rclpy.shutdown() + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/rosbridge_server/src/rosbridge_server/tcp_handler.py b/rosbridge_server/src/rosbridge_server/tcp_handler.py new file mode 100644 index 000000000..d70b8ffc1 --- /dev/null +++ b/rosbridge_server/src/rosbridge_server/tcp_handler.py @@ -0,0 +1,136 @@ +from std_msgs.msg import Int32 +import struct +from rosbridge_library.rosbridge_protocol import RosbridgeProtocol + +try: + import SocketServer +except ImportError: + import socketserver as SocketServer + +class RosbridgeTcpSocket(SocketServer.BaseRequestHandler): + """ + TCP Socket server for rosbridge + """ + + busy = False + queue = [] + client_id_seed = 0 + clients_connected = 0 + client_count_pub = None + + # list of parameters + incoming_buffer = 65536 # bytes + socket_timeout = 10 # seconds + # The following are passed on to RosbridgeProtocol + # defragmentation.py: + fragment_timeout = 600 # seconds + # protocol.py: + delay_between_messages = 0 # seconds + max_message_size = None # bytes + unregister_timeout = 10.0 # seconds + bson_only_mode = False + ros_node = None + + def setup(self): + cls = self.__class__ + parameters = { + "fragment_timeout": cls.fragment_timeout, + "delay_between_messages": cls.delay_between_messages, + "max_message_size": cls.max_message_size, + "unregister_timeout": cls.unregister_timeout, + "bson_only_mode": cls.bson_only_mode + } + try: + self.protocol = RosbridgeProtocol(cls.client_id_seed, cls.ros_node, parameters=parameters) + self.protocol.outgoing = self.send_message + cls.client_id_seed += 1 + cls.clients_connected += 1 + if cls.client_count_pub: + cls.client_count_pub.publish(Int32(data=cls.clients_connected)) + self.protocol.log("info", "connected. " + str(cls.clients_connected) + " client total.") + except Exception as exc: + cls.ros_node.get_logger().info("Unable to accept incoming connection. Reason: " + str(exc)) + + + def recvall(self, n): + # http://stackoverflow.com/questions/17667903/python-socket-receive-large-amount-of-data + # Helper function to recv n bytes or return None if EOF is hit + data = bytearray() + while len(data) < n: + packet = self.request.recv(n - len(data)) + if not packet: + return None + data.extend(packet) + return data + + def recv_bson(self): + # Read 4 bytes to get the length of the BSON packet + BSON_LENGTH_IN_BYTES = 4 + raw_msglen = self.recvall(BSON_LENGTH_IN_BYTES) + if not raw_msglen: + return None + msglen = struct.unpack('i', raw_msglen)[0] + + # Retrieve the rest of the message + data = self.recvall(msglen - BSON_LENGTH_IN_BYTES) + if data is None: + return None + data = raw_msglen + data # Prefix the data with the message length that has already been received. + # The message length is part of BSONs message format + + # Exit on empty message + if len(data) == 0: + return None + + self.protocol.incoming(data) + return True + + def handle(self): + """ + Listen for TCP messages + """ + cls = self.__class__ + self.request.settimeout(cls.socket_timeout) + while True: + try: + if self.bson_only_mode: + if self.recv_bson() is None: + break + continue + + # non-BSON handling + bytes = self.request.recv(cls.incoming_buffer) + + # Exit on empty message because socket was closed + if len(bytes) == 0: + break + else: + string_encoded = bytes.decode().strip() + if len(string_encoded) > 0: + self.protocol.incoming(string_encoded) + except Exception: + self.protocol.log("debug", "socket connection timed out! (ignore warning if client is only listening..)") + + def finish(self): + """ + Called when TCP connection finishes + Called after the handle() method to perform any clean-up actions required. + If setup() raises an exception, this function will not be called. + """ + cls = self.__class__ + cls.clients_connected -= 1 + self.protocol.finish() + if cls.client_count_pub: + cls.client_count_pub.publish(Int32(data=cls.clients_connected)) + self.protocol.log("info", "disconnected. " + str(cls.clients_connected) + " client total." ) + + def send_message(self, message=None): + """ + Callback from rosbridge + """ + if self.bson_only_mode: + self.request.sendall(message) + elif message is not None: + self.request.sendall(message.encode()) + else: + self.protocol.log("error", "send_message called with no message or message is None, not sending") \ No newline at end of file From c37630650407c44719731c65b2e7dc0d7ed6dafa Mon Sep 17 00:00:00 2001 From: Ted Sender Date: Sun, 13 Nov 2022 17:25:26 +0000 Subject: [PATCH 11/31] Added new line to pass linter requirements --- rosbridge_server/src/rosbridge_server/tcp_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rosbridge_server/src/rosbridge_server/tcp_handler.py b/rosbridge_server/src/rosbridge_server/tcp_handler.py index d70b8ffc1..0ee474d63 100644 --- a/rosbridge_server/src/rosbridge_server/tcp_handler.py +++ b/rosbridge_server/src/rosbridge_server/tcp_handler.py @@ -133,4 +133,4 @@ def send_message(self, message=None): elif message is not None: self.request.sendall(message.encode()) else: - self.protocol.log("error", "send_message called with no message or message is None, not sending") \ No newline at end of file + self.protocol.log("error", "send_message called with no message or message is None, not sending") From e6905d6918cb9da20041253861ecf7be8d1bb240 Mon Sep 17 00:00:00 2001 From: Ted Sender Date: Sun, 13 Nov 2022 17:28:22 +0000 Subject: [PATCH 12/31] Added blank lines to end of other files --- rosbridge_server/launch/rosbridge_tcp_launch.xml | 2 +- rosbridge_server/scripts/rosbridge_tcp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/rosbridge_server/launch/rosbridge_tcp_launch.xml b/rosbridge_server/launch/rosbridge_tcp_launch.xml index a269ec7a0..496415f4f 100644 --- a/rosbridge_server/launch/rosbridge_tcp_launch.xml +++ b/rosbridge_server/launch/rosbridge_tcp_launch.xml @@ -39,4 +39,4 @@ - \ No newline at end of file + diff --git a/rosbridge_server/scripts/rosbridge_tcp b/rosbridge_server/scripts/rosbridge_tcp index 61ad3cd16..f37a4ccf5 100755 --- a/rosbridge_server/scripts/rosbridge_tcp +++ b/rosbridge_server/scripts/rosbridge_tcp @@ -275,4 +275,4 @@ def main(args=None): rclpy.shutdown() if __name__ == '__main__': - main() \ No newline at end of file + main() From 07423aaceb0532791b2029f898929e4907b7073a Mon Sep 17 00:00:00 2001 From: Ted Sender Date: Sat, 19 Nov 2022 17:50:36 +0000 Subject: [PATCH 13/31] Switching to threading tcp server --- rosbridge_server/scripts/rosbridge_tcp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rosbridge_server/scripts/rosbridge_tcp b/rosbridge_server/scripts/rosbridge_tcp index 61ad3cd16..c08ffde17 100755 --- a/rosbridge_server/scripts/rosbridge_tcp +++ b/rosbridge_server/scripts/rosbridge_tcp @@ -251,10 +251,10 @@ def main(args=None): try: # Server host is a tuple ('host', port) # empty string for host makes server listen on all available interfaces - SocketServer.TCPServer.allow_reuse_address = True + SocketServer.ThreadingTCPServer.allow_reuse_address = True host = rosbridge_tcpsocket_node._host port = rosbridge_tcpsocket_node._port - server = SocketServer.TCPServer((host, port), RosbridgeTcpSocket) + server = SocketServer.ThreadingTCPServer((host, port), RosbridgeTcpSocket) with server: # Need to place server.server_forever() in a thread so we can call rclpy.spin() below server_thread = threading.Thread(target=server.serve_forever, daemon=True) From 8e77ec9bb76af945d7ba9b77a4906fd410752db1 Mon Sep 17 00:00:00 2001 From: Ted Sender Date: Tue, 22 Nov 2022 16:36:25 +0000 Subject: [PATCH 14/31] Trying to fix connection reset errors --- rosbridge_server/scripts/rosbridge_tcp | 27 +++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/rosbridge_server/scripts/rosbridge_tcp b/rosbridge_server/scripts/rosbridge_tcp index c08ffde17..e37161f9c 100755 --- a/rosbridge_server/scripts/rosbridge_tcp +++ b/rosbridge_server/scripts/rosbridge_tcp @@ -251,22 +251,31 @@ def main(args=None): try: # Server host is a tuple ('host', port) # empty string for host makes server listen on all available interfaces - SocketServer.ThreadingTCPServer.allow_reuse_address = True + SocketServer.TCPServer.allow_reuse_address = True host = rosbridge_tcpsocket_node._host port = rosbridge_tcpsocket_node._port - server = SocketServer.ThreadingTCPServer((host, port), RosbridgeTcpSocket) - with server: + server = SocketServer.TCPServer((host, port), RosbridgeTcpSocket) + # with server: # Need to place server.server_forever() in a thread so we can call rclpy.spin() below - server_thread = threading.Thread(target=server.serve_forever, daemon=True) - server_thread.start() - sys.stdout.flush() - rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP server started on {host}:{port}") - - rclpy.spin(rosbridge_tcpsocket_node) + server_thread = threading.Thread(target=server.serve_forever, daemon=True) + server_thread.start() + sys.stdout.flush() + rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP server started on {host}:{port}") + + rclpy.spin(rosbridge_tcpsocket_node) + + # Put rclpy.spin in separate thread? + # sys.stdout.flush() + # spin_thread = threading.Thread(target=lambda: rclpy.spin(rosbridge_tcpsocket_node), daemon=True) + # spin_thread.start() + # rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP node spinning in separate thread") + # rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP server started on {host}:{port}") + # server.serve_forever() except KeyboardInterrupt: rosbridge_tcpsocket_node.get_logger().info(f"Exiting due to SIGINT") except Exception as e: + rosbridge_tcpsocket_node.get_logger().warn(f"Encountered an exception: {traceback.format_exc()}") print(traceback.format_exc()) finally: server.shutdown() From c9c82487bcae1a859c8c112cc4ee6b840d0b6093 Mon Sep 17 00:00:00 2001 From: Ted Sender Date: Thu, 24 Nov 2022 21:54:08 -0500 Subject: [PATCH 15/31] Added auto-reload rosbridge tcp on failure --- rosbridge_server/scripts/rosbridge_tcp | 102 ++++++++++++++++--------- 1 file changed, 68 insertions(+), 34 deletions(-) diff --git a/rosbridge_server/scripts/rosbridge_tcp b/rosbridge_server/scripts/rosbridge_tcp index e37161f9c..67415b0f3 100755 --- a/rosbridge_server/scripts/rosbridge_tcp +++ b/rosbridge_server/scripts/rosbridge_tcp @@ -187,6 +187,8 @@ class RosbridgeTcpsocketNode(Node): RosbridgeTcpSocket.unregister_timeout = unregister_timeout RosbridgeTcpSocket.bson_only_mode = bson_only_mode + self.retry_startup_delay = retry_startup_delay + if "--topics_glob" in sys.argv: idx = sys.argv.index("--topics_glob") + 1 if idx < len(sys.argv): @@ -246,42 +248,74 @@ def main(args=None): args = sys.argv rclpy.init(args=args) - rosbridge_tcpsocket_node = RosbridgeTcpsocketNode() - - try: - # Server host is a tuple ('host', port) - # empty string for host makes server listen on all available interfaces - SocketServer.TCPServer.allow_reuse_address = True - host = rosbridge_tcpsocket_node._host - port = rosbridge_tcpsocket_node._port - server = SocketServer.TCPServer((host, port), RosbridgeTcpSocket) - # with server: - # Need to place server.server_forever() in a thread so we can call rclpy.spin() below - server_thread = threading.Thread(target=server.serve_forever, daemon=True) - server_thread.start() - sys.stdout.flush() - rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP server started on {host}:{port}") + + loaded = False + while not loaded: + try: + rosbridge_tcpsocket_node = RosbridgeTcpsocketNode() + # Server host is a tuple ('host', port) + # empty string for host makes server listen on all available interfaces + SocketServer.TCPServer.allow_reuse_address = True + host = rosbridge_tcpsocket_node._host + port = rosbridge_tcpsocket_node._port + server = SocketServer.TCPServer((host, port), RosbridgeTcpSocket) + server_thread = threading.Thread(target=server.serve_forever, daemon=True) + server_thread.start() + sys.stdout.flush() + rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP server started on {host}:{port}") + + loaded = True + rclpy.spin(rosbridge_tcpsocket_node) + except KeyboardInterrupt: + rosbridge_tcpsocket_node.get_logger().info(f"Exiting due to SIGINT") + except Exception as e: + rosbridge_tcpsocket_node.get_logger().warn(f"Encountered an exception: {traceback.format_exc()}") + rosbridge_tcpsocket_node.get_logger().warn(f"Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") + print(traceback.format_exc()) + loaded = False + time.sleep(rosbridge_tcpsocket_node.retry_startup_delay) + finally: + server.shutdown() + server_thread.join() + rosbridge_tcpsocket_node.destroy_node() + rclpy.shutdown() + + # No while loop + # rosbridge_tcpsocket_node = RosbridgeTcpsocketNode() + # try: + # # Server host is a tuple ('host', port) + # # empty string for host makes server listen on all available interfaces + # SocketServer.TCPServer.allow_reuse_address = True + # host = rosbridge_tcpsocket_node._host + # port = rosbridge_tcpsocket_node._port + # server = SocketServer.TCPServer((host, port), RosbridgeTcpSocket) + # # with server: + # # Need to place server.server_forever() in a thread so we can call rclpy.spin() below + # server_thread = threading.Thread(target=server.serve_forever, daemon=True) + # server_thread.start() + # sys.stdout.flush() + # rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP server started on {host}:{port}") - rclpy.spin(rosbridge_tcpsocket_node) - - # Put rclpy.spin in separate thread? - # sys.stdout.flush() - # spin_thread = threading.Thread(target=lambda: rclpy.spin(rosbridge_tcpsocket_node), daemon=True) - # spin_thread.start() - # rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP node spinning in separate thread") - # rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP server started on {host}:{port}") - # server.serve_forever() + # rclpy.spin(rosbridge_tcpsocket_node) + + # # Put rclpy.spin in separate thread? + # # sys.stdout.flush() + # # spin_thread = threading.Thread(target=lambda: rclpy.spin(rosbridge_tcpsocket_node), daemon=True) + # # spin_thread.start() + # # rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP node spinning in separate thread") + # # rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP server started on {host}:{port}") + # # server.serve_forever() - except KeyboardInterrupt: - rosbridge_tcpsocket_node.get_logger().info(f"Exiting due to SIGINT") - except Exception as e: - rosbridge_tcpsocket_node.get_logger().warn(f"Encountered an exception: {traceback.format_exc()}") - print(traceback.format_exc()) - finally: - server.shutdown() - server_thread.join() - rosbridge_tcpsocket_node.destroy_node() - rclpy.shutdown() + # except KeyboardInterrupt: + # rosbridge_tcpsocket_node.get_logger().info(f"Exiting due to SIGINT") + # except Exception as e: + # rosbridge_tcpsocket_node.get_logger().warn(f"Encountered an exception: {traceback.format_exc()}") + # print(traceback.format_exc()) + # finally: + # server.shutdown() + # server_thread.join() + # rosbridge_tcpsocket_node.destroy_node() + # rclpy.shutdown() if __name__ == '__main__': main() \ No newline at end of file From 7f39f0acb524a5529b0e079feef19019d73b58da Mon Sep 17 00:00:00 2001 From: Ted Sender Date: Wed, 7 Dec 2022 18:05:17 +0000 Subject: [PATCH 16/31] Adding more exception catching to rosbridge_tcp --- rosbridge_server/scripts/rosbridge_tcp | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/rosbridge_server/scripts/rosbridge_tcp b/rosbridge_server/scripts/rosbridge_tcp index 67415b0f3..54c657ad6 100755 --- a/rosbridge_server/scripts/rosbridge_tcp +++ b/rosbridge_server/scripts/rosbridge_tcp @@ -23,6 +23,7 @@ import sys import time import traceback import threading +import errno #TODO: take care of socket timeouts and make sure to close sockets after killing program to release network ports @@ -247,11 +248,13 @@ def main(args=None): if args is None: args = sys.argv - rclpy.init(args=args) + # rclpy.init(args=args) loaded = False + sleep_dur = 0. while not loaded: try: + rclpy.init(args=args) rosbridge_tcpsocket_node = RosbridgeTcpsocketNode() # Server host is a tuple ('host', port) # empty string for host makes server listen on all available interfaces @@ -268,17 +271,31 @@ def main(args=None): rclpy.spin(rosbridge_tcpsocket_node) except KeyboardInterrupt: rosbridge_tcpsocket_node.get_logger().info(f"Exiting due to SIGINT") + sleep_dur = 0. + + except OSError as error: + rosbridge_tcpsocket_node.get_logger().info(f"Caught OS error: {traceback.format_exc()}") + if error.errno == errno.EADDRINUSE: + rosbridge_tcpsocket_node.get_logger().info(f"OS thinks address is in use. Will try reloading in 5 minutes.") + sleep_dur = 360. + else: + rosbridge_tcpsocket_node.get_logger().warn(f"Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") + sleep_dur = rosbridge_tcpsocket_node.retry_startup_delay + loaded = False + except Exception as e: rosbridge_tcpsocket_node.get_logger().warn(f"Encountered an exception: {traceback.format_exc()}") rosbridge_tcpsocket_node.get_logger().warn(f"Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") print(traceback.format_exc()) loaded = False - time.sleep(rosbridge_tcpsocket_node.retry_startup_delay) + sleep_dur = rosbridge_tcpsocket_node.retry_startup_delay + finally: server.shutdown() server_thread.join() rosbridge_tcpsocket_node.destroy_node() - rclpy.shutdown() + rclpy.shutdown() + time.sleep(sleep_dur) # No while loop # rosbridge_tcpsocket_node = RosbridgeTcpsocketNode() From cb7766181a5782474e7958f8a9b57bd49885a14e Mon Sep 17 00:00:00 2001 From: Ted Sender Date: Wed, 7 Dec 2022 22:53:18 -0500 Subject: [PATCH 17/31] Added close socket call before retrying to reload TCP server --- rosbridge_server/scripts/rosbridge_tcp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/rosbridge_server/scripts/rosbridge_tcp b/rosbridge_server/scripts/rosbridge_tcp index 54c657ad6..e25b1e35b 100755 --- a/rosbridge_server/scripts/rosbridge_tcp +++ b/rosbridge_server/scripts/rosbridge_tcp @@ -269,6 +269,7 @@ def main(args=None): loaded = True rclpy.spin(rosbridge_tcpsocket_node) + except KeyboardInterrupt: rosbridge_tcpsocket_node.get_logger().info(f"Exiting due to SIGINT") sleep_dur = 0. @@ -276,8 +277,8 @@ def main(args=None): except OSError as error: rosbridge_tcpsocket_node.get_logger().info(f"Caught OS error: {traceback.format_exc()}") if error.errno == errno.EADDRINUSE: - rosbridge_tcpsocket_node.get_logger().info(f"OS thinks address is in use. Will try reloading in 5 minutes.") - sleep_dur = 360. + rosbridge_tcpsocket_node.get_logger().info(f"OS thinks address is in use. Will try reloading in 30 seconds.") + sleep_dur = 30. else: rosbridge_tcpsocket_node.get_logger().warn(f"Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") sleep_dur = rosbridge_tcpsocket_node.retry_startup_delay @@ -293,6 +294,7 @@ def main(args=None): finally: server.shutdown() server_thread.join() + server.server_close() # Must close socket to do proper cleanup rosbridge_tcpsocket_node.destroy_node() rclpy.shutdown() time.sleep(sleep_dur) From 5a1358a32a94b61b5f4569c79ea5aa7b67ed1a07 Mon Sep 17 00:00:00 2001 From: Ted Sender Date: Fri, 9 Dec 2022 12:17:04 -0500 Subject: [PATCH 18/31] Putting rclpy.spin in separate thread --- rosbridge_server/scripts/rosbridge_tcp | 132 +++++++++++++++---------- 1 file changed, 78 insertions(+), 54 deletions(-) diff --git a/rosbridge_server/scripts/rosbridge_tcp b/rosbridge_server/scripts/rosbridge_tcp index e25b1e35b..826747a9a 100755 --- a/rosbridge_server/scripts/rosbridge_tcp +++ b/rosbridge_server/scripts/rosbridge_tcp @@ -244,97 +244,121 @@ class RosbridgeTcpsocketNode(Node): ...END (parameter handling) """ +def spin_function(event, node): + while rclpy.ok() and not event.is_set(): + rclpy.spin_once(node) + node.get_logger().info(f"Ending spin thread.") + def main(args=None): if args is None: args = sys.argv # rclpy.init(args=args) + # Version 1: TCP server in separate thread + # loaded = False + # sleep_dur = 0. + # while not loaded: + # try: + # rclpy.init(args=args) + # rosbridge_tcpsocket_node = RosbridgeTcpsocketNode() + # # Server host is a tuple ('host', port) + # # empty string for host makes server listen on all available interfaces + # SocketServer.TCPServer.allow_reuse_address = True + # host = rosbridge_tcpsocket_node._host + # port = rosbridge_tcpsocket_node._port + # server = SocketServer.TCPServer((host, port), RosbridgeTcpSocket) + # server_thread = threading.Thread(target=server.serve_forever, daemon=True) + # server_thread.start() + # sys.stdout.flush() + # rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP server started on {host}:{port}") + + # loaded = True + # rclpy.spin(rosbridge_tcpsocket_node) + + # except KeyboardInterrupt: + # rosbridge_tcpsocket_node.get_logger().info(f"Exiting due to SIGINT") + # sleep_dur = 0. + + # except OSError as error: + # rosbridge_tcpsocket_node.get_logger().info(f"Caught OS error: {traceback.format_exc()}") + # if error.errno == errno.EADDRINUSE: + # rosbridge_tcpsocket_node.get_logger().info(f"OS thinks address is in use. Will try reloading in 30 seconds.") + # sleep_dur = 30. + # else: + # rosbridge_tcpsocket_node.get_logger().warn(f"Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") + # sleep_dur = rosbridge_tcpsocket_node.retry_startup_delay + # loaded = False + + # except Exception as e: + # rosbridge_tcpsocket_node.get_logger().warn(f"Encountered an exception: {traceback.format_exc()}") + # rosbridge_tcpsocket_node.get_logger().warn(f"Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") + # print(traceback.format_exc()) + # loaded = False + # sleep_dur = rosbridge_tcpsocket_node.retry_startup_delay + + # finally: + # server.shutdown() + # server_thread.join() + # server.server_close() # Must close socket to do proper cleanup + # rosbridge_tcpsocket_node.destroy_node() + # rclpy.shutdown() + # time.sleep(sleep_dur) + + # Version 2: rclpy.spin in different thread loaded = False sleep_dur = 0. + server_is_running = False while not loaded: try: rclpy.init(args=args) rosbridge_tcpsocket_node = RosbridgeTcpsocketNode() + event = threading.Event() + spin_thread = threading.Thread(target=spin_function, daemon=True, args=(event, rosbridge_tcpsocket_node)) + # executor = rclpy.executors.MultiThreadedExecutor() + # executor.add_node(rosbridge_tcpsocket_node) + # spin_thread = threading.Thread(target=executor.spin, daemon=True) + spin_thread.start() + rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP node spinning in separate thread") # Server host is a tuple ('host', port) # empty string for host makes server listen on all available interfaces SocketServer.TCPServer.allow_reuse_address = True host = rosbridge_tcpsocket_node._host port = rosbridge_tcpsocket_node._port - server = SocketServer.TCPServer((host, port), RosbridgeTcpSocket) - server_thread = threading.Thread(target=server.serve_forever, daemon=True) - server_thread.start() + server = SocketServer.TCPServer((host, port), RosbridgeTcpSocket) + loaded = True + sys.stdout.flush() rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP server started on {host}:{port}") - - loaded = True - rclpy.spin(rosbridge_tcpsocket_node) + server_is_running = True + server.serve_forever() except KeyboardInterrupt: rosbridge_tcpsocket_node.get_logger().info(f"Exiting due to SIGINT") sleep_dur = 0. - except OSError as error: + except OSError: rosbridge_tcpsocket_node.get_logger().info(f"Caught OS error: {traceback.format_exc()}") - if error.errno == errno.EADDRINUSE: - rosbridge_tcpsocket_node.get_logger().info(f"OS thinks address is in use. Will try reloading in 30 seconds.") - sleep_dur = 30. - else: - rosbridge_tcpsocket_node.get_logger().warn(f"Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") - sleep_dur = rosbridge_tcpsocket_node.retry_startup_delay + rosbridge_tcpsocket_node.get_logger().warn(f"Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") + sleep_dur = rosbridge_tcpsocket_node.retry_startup_delay loaded = False - except Exception as e: + except Exception: rosbridge_tcpsocket_node.get_logger().warn(f"Encountered an exception: {traceback.format_exc()}") rosbridge_tcpsocket_node.get_logger().warn(f"Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") - print(traceback.format_exc()) loaded = False sleep_dur = rosbridge_tcpsocket_node.retry_startup_delay finally: - server.shutdown() - server_thread.join() + if server_is_running: + server.shutdown() + server_is_running = False server.server_close() # Must close socket to do proper cleanup + event.set() + # spin_thread.join() rosbridge_tcpsocket_node.destroy_node() rclpy.shutdown() time.sleep(sleep_dur) - # No while loop - # rosbridge_tcpsocket_node = RosbridgeTcpsocketNode() - # try: - # # Server host is a tuple ('host', port) - # # empty string for host makes server listen on all available interfaces - # SocketServer.TCPServer.allow_reuse_address = True - # host = rosbridge_tcpsocket_node._host - # port = rosbridge_tcpsocket_node._port - # server = SocketServer.TCPServer((host, port), RosbridgeTcpSocket) - # # with server: - # # Need to place server.server_forever() in a thread so we can call rclpy.spin() below - # server_thread = threading.Thread(target=server.serve_forever, daemon=True) - # server_thread.start() - # sys.stdout.flush() - # rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP server started on {host}:{port}") - - # rclpy.spin(rosbridge_tcpsocket_node) - - # # Put rclpy.spin in separate thread? - # # sys.stdout.flush() - # # spin_thread = threading.Thread(target=lambda: rclpy.spin(rosbridge_tcpsocket_node), daemon=True) - # # spin_thread.start() - # # rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP node spinning in separate thread") - # # rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP server started on {host}:{port}") - # # server.serve_forever() - - # except KeyboardInterrupt: - # rosbridge_tcpsocket_node.get_logger().info(f"Exiting due to SIGINT") - # except Exception as e: - # rosbridge_tcpsocket_node.get_logger().warn(f"Encountered an exception: {traceback.format_exc()}") - # print(traceback.format_exc()) - # finally: - # server.shutdown() - # server_thread.join() - # rosbridge_tcpsocket_node.destroy_node() - # rclpy.shutdown() - if __name__ == '__main__': main() \ No newline at end of file From 5e705795b2dbf245ebcfd6399f26c0f27fe7b718 Mon Sep 17 00:00:00 2001 From: Ted Sender Date: Sat, 10 Dec 2022 11:21:30 -0500 Subject: [PATCH 19/31] Trying to catch broken pipe and connection reset error --- rosbridge_server/scripts/rosbridge_tcp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rosbridge_server/scripts/rosbridge_tcp b/rosbridge_server/scripts/rosbridge_tcp index 826747a9a..2d44cb469 100755 --- a/rosbridge_server/scripts/rosbridge_tcp +++ b/rosbridge_server/scripts/rosbridge_tcp @@ -337,10 +337,12 @@ def main(args=None): rosbridge_tcpsocket_node.get_logger().info(f"Exiting due to SIGINT") sleep_dur = 0. - except OSError: + except OSError as error: rosbridge_tcpsocket_node.get_logger().info(f"Caught OS error: {traceback.format_exc()}") rosbridge_tcpsocket_node.get_logger().warn(f"Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") sleep_dur = rosbridge_tcpsocket_node.retry_startup_delay + if error.errno == errno.EPIPE or error.errno == errno.ECONNRESET: + server_is_running = False loaded = False except Exception: From f70ebb41c4d0a276f8d053151fa653db4ae770a7 Mon Sep 17 00:00:00 2001 From: Ted Sender Date: Sun, 11 Dec 2022 23:15:44 -0500 Subject: [PATCH 20/31] Still trying to debug connection reset problem --- rosbridge_server/scripts/rosbridge_tcp | 139 +++++++++--------- .../src/rosbridge_server/tcp_handler.py | 16 +- 2 files changed, 78 insertions(+), 77 deletions(-) diff --git a/rosbridge_server/scripts/rosbridge_tcp b/rosbridge_server/scripts/rosbridge_tcp index 2d44cb469..a15cb0ce8 100755 --- a/rosbridge_server/scripts/rosbridge_tcp +++ b/rosbridge_server/scripts/rosbridge_tcp @@ -256,82 +256,25 @@ def main(args=None): # rclpy.init(args=args) # Version 1: TCP server in separate thread - # loaded = False - # sleep_dur = 0. - # while not loaded: - # try: - # rclpy.init(args=args) - # rosbridge_tcpsocket_node = RosbridgeTcpsocketNode() - # # Server host is a tuple ('host', port) - # # empty string for host makes server listen on all available interfaces - # SocketServer.TCPServer.allow_reuse_address = True - # host = rosbridge_tcpsocket_node._host - # port = rosbridge_tcpsocket_node._port - # server = SocketServer.TCPServer((host, port), RosbridgeTcpSocket) - # server_thread = threading.Thread(target=server.serve_forever, daemon=True) - # server_thread.start() - # sys.stdout.flush() - # rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP server started on {host}:{port}") - - # loaded = True - # rclpy.spin(rosbridge_tcpsocket_node) - - # except KeyboardInterrupt: - # rosbridge_tcpsocket_node.get_logger().info(f"Exiting due to SIGINT") - # sleep_dur = 0. - - # except OSError as error: - # rosbridge_tcpsocket_node.get_logger().info(f"Caught OS error: {traceback.format_exc()}") - # if error.errno == errno.EADDRINUSE: - # rosbridge_tcpsocket_node.get_logger().info(f"OS thinks address is in use. Will try reloading in 30 seconds.") - # sleep_dur = 30. - # else: - # rosbridge_tcpsocket_node.get_logger().warn(f"Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") - # sleep_dur = rosbridge_tcpsocket_node.retry_startup_delay - # loaded = False - - # except Exception as e: - # rosbridge_tcpsocket_node.get_logger().warn(f"Encountered an exception: {traceback.format_exc()}") - # rosbridge_tcpsocket_node.get_logger().warn(f"Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") - # print(traceback.format_exc()) - # loaded = False - # sleep_dur = rosbridge_tcpsocket_node.retry_startup_delay - - # finally: - # server.shutdown() - # server_thread.join() - # server.server_close() # Must close socket to do proper cleanup - # rosbridge_tcpsocket_node.destroy_node() - # rclpy.shutdown() - # time.sleep(sleep_dur) - - # Version 2: rclpy.spin in different thread loaded = False sleep_dur = 0. - server_is_running = False while not loaded: try: rclpy.init(args=args) rosbridge_tcpsocket_node = RosbridgeTcpsocketNode() - event = threading.Event() - spin_thread = threading.Thread(target=spin_function, daemon=True, args=(event, rosbridge_tcpsocket_node)) - # executor = rclpy.executors.MultiThreadedExecutor() - # executor.add_node(rosbridge_tcpsocket_node) - # spin_thread = threading.Thread(target=executor.spin, daemon=True) - spin_thread.start() - rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP node spinning in separate thread") # Server host is a tuple ('host', port) # empty string for host makes server listen on all available interfaces SocketServer.TCPServer.allow_reuse_address = True host = rosbridge_tcpsocket_node._host port = rosbridge_tcpsocket_node._port - server = SocketServer.TCPServer((host, port), RosbridgeTcpSocket) - loaded = True - + server = SocketServer.TCPServer((host, port), RosbridgeTcpSocket) + server_thread = threading.Thread(target=server.serve_forever, daemon=True) + server_thread.start() sys.stdout.flush() rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP server started on {host}:{port}") - server_is_running = True - server.serve_forever() + + loaded = True + rclpy.spin(rosbridge_tcpsocket_node) except KeyboardInterrupt: rosbridge_tcpsocket_node.get_logger().info(f"Exiting due to SIGINT") @@ -341,26 +284,80 @@ def main(args=None): rosbridge_tcpsocket_node.get_logger().info(f"Caught OS error: {traceback.format_exc()}") rosbridge_tcpsocket_node.get_logger().warn(f"Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") sleep_dur = rosbridge_tcpsocket_node.retry_startup_delay - if error.errno == errno.EPIPE or error.errno == errno.ECONNRESET: - server_is_running = False loaded = False - except Exception: + except Exception as e: rosbridge_tcpsocket_node.get_logger().warn(f"Encountered an exception: {traceback.format_exc()}") rosbridge_tcpsocket_node.get_logger().warn(f"Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") loaded = False sleep_dur = rosbridge_tcpsocket_node.retry_startup_delay finally: - if server_is_running: - server.shutdown() - server_is_running = False + rosbridge_tcpsocket_node.get_logger().info(f"Shutting down server...") + server.shutdown() + server_thread.join() server.server_close() # Must close socket to do proper cleanup - event.set() - # spin_thread.join() + rosbridge_tcpsocket_node.get_logger().info(f"Destroying node...") rosbridge_tcpsocket_node.destroy_node() rclpy.shutdown() time.sleep(sleep_dur) + # Version 2: rclpy.spin in different thread + # loaded = False + # sleep_dur = 0. + # server_is_running = False + # while not loaded: + # try: + # rclpy.init(args=args) + # rosbridge_tcpsocket_node = RosbridgeTcpsocketNode() + # event = threading.Event() + # spin_thread = threading.Thread(target=spin_function, daemon=True, args=(event, rosbridge_tcpsocket_node)) + # # executor = rclpy.executors.MultiThreadedExecutor() + # # executor.add_node(rosbridge_tcpsocket_node) + # # spin_thread = threading.Thread(target=executor.spin, daemon=True) + # spin_thread.start() + # rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP node spinning in separate thread") + # # Server host is a tuple ('host', port) + # # empty string for host makes server listen on all available interfaces + # SocketServer.TCPServer.allow_reuse_address = True + # host = rosbridge_tcpsocket_node._host + # port = rosbridge_tcpsocket_node._port + # server = SocketServer.TCPServer((host, port), RosbridgeTcpSocket) + # loaded = True + + # sys.stdout.flush() + # rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP server started on {host}:{port}") + # server_is_running = True + # server.serve_forever() + + # except KeyboardInterrupt: + # rosbridge_tcpsocket_node.get_logger().info(f"Exiting due to SIGINT") + # sleep_dur = 0. + + # except OSError as error: + # rosbridge_tcpsocket_node.get_logger().info(f"Caught OS error: {traceback.format_exc()}") + # rosbridge_tcpsocket_node.get_logger().warn(f"Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") + # sleep_dur = rosbridge_tcpsocket_node.retry_startup_delay + # if error.errno == errno.EPIPE or error.errno == errno.ECONNRESET: + # server_is_running = False + # loaded = False + + # except Exception: + # rosbridge_tcpsocket_node.get_logger().warn(f"Encountered an exception: {traceback.format_exc()}") + # rosbridge_tcpsocket_node.get_logger().warn(f"Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") + # loaded = False + # sleep_dur = rosbridge_tcpsocket_node.retry_startup_delay + + # finally: + # if server_is_running: + # server.shutdown() + # server_is_running = False + # server.server_close() # Must close socket to do proper cleanup + # event.set() + # # spin_thread.join() + # rosbridge_tcpsocket_node.destroy_node() + # rclpy.shutdown() + # time.sleep(sleep_dur) + if __name__ == '__main__': main() \ No newline at end of file diff --git a/rosbridge_server/src/rosbridge_server/tcp_handler.py b/rosbridge_server/src/rosbridge_server/tcp_handler.py index d70b8ffc1..5d7ee90c6 100644 --- a/rosbridge_server/src/rosbridge_server/tcp_handler.py +++ b/rosbridge_server/src/rosbridge_server/tcp_handler.py @@ -1,6 +1,7 @@ from std_msgs.msg import Int32 import struct from rosbridge_library.rosbridge_protocol import RosbridgeProtocol +import traceback try: import SocketServer @@ -128,9 +129,12 @@ def send_message(self, message=None): """ Callback from rosbridge """ - if self.bson_only_mode: - self.request.sendall(message) - elif message is not None: - self.request.sendall(message.encode()) - else: - self.protocol.log("error", "send_message called with no message or message is None, not sending") \ No newline at end of file + try: + if self.bson_only_mode: + self.request.sendall(message) + elif message is not None: + self.request.sendall(message.encode()) + else: + self.protocol.log("error", "send_message called with no message or message is None, not sending") + except Exception: + self.protocol.log("error", f"Exception on send_message(): {traceback.format_exc()}") \ No newline at end of file From c86088564ffbf127b16ee05076307999710d0863 Mon Sep 17 00:00:00 2001 From: Ted Sender Date: Mon, 12 Dec 2022 18:01:26 +0000 Subject: [PATCH 21/31] Added debug logging to srv handle_request() --- .../capabilities/advertise_service.py | 3 +- rosbridge_server/scripts/rosbridge_tcp | 136 +++++++++--------- .../src/rosbridge_server/tcp_handler.py | 17 +-- 3 files changed, 78 insertions(+), 78 deletions(-) diff --git a/rosbridge_library/src/rosbridge_library/capabilities/advertise_service.py b/rosbridge_library/src/rosbridge_library/capabilities/advertise_service.py index a1db57ed5..7022def2c 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/advertise_service.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/advertise_service.py @@ -5,7 +5,7 @@ from rosbridge_library.capability import Capability from rosbridge_library.internal import message_conversion from rosbridge_library.internal.ros_loader import get_service_class - +import datetime class AdvertisedServiceHandler: @@ -30,6 +30,7 @@ def next_id(self): return id async def handle_request(self, req, res): + self.protocol.log("info", f"[{datetime.datetime.now()}] handle_request() for service {self.service_name}") # DEBUG # generate a unique ID request_id = "service_request:" + self.service_name + ":" + str(self.next_id()) diff --git a/rosbridge_server/scripts/rosbridge_tcp b/rosbridge_server/scripts/rosbridge_tcp index a15cb0ce8..80b4ac73a 100755 --- a/rosbridge_server/scripts/rosbridge_tcp +++ b/rosbridge_server/scripts/rosbridge_tcp @@ -256,25 +256,79 @@ def main(args=None): # rclpy.init(args=args) # Version 1: TCP server in separate thread + # loaded = False + # sleep_dur = 0. + # while not loaded: + # try: + # rclpy.init(args=args) + # rosbridge_tcpsocket_node = RosbridgeTcpsocketNode() + # # Server host is a tuple ('host', port) + # # empty string for host makes server listen on all available interfaces + # SocketServer.TCPServer.allow_reuse_address = True + # host = rosbridge_tcpsocket_node._host + # port = rosbridge_tcpsocket_node._port + # server = SocketServer.TCPServer((host, port), RosbridgeTcpSocket) + # server_thread = threading.Thread(target=server.serve_forever, daemon=True) + # server_thread.start() + # sys.stdout.flush() + # rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP server started on {host}:{port}") + + # loaded = True + # rclpy.spin(rosbridge_tcpsocket_node) + + # except KeyboardInterrupt: + # rosbridge_tcpsocket_node.get_logger().info(f"Exiting due to SIGINT") + # sleep_dur = 0. + + # except OSError as error: + # rosbridge_tcpsocket_node.get_logger().info(f"Caught OS error: {traceback.format_exc()}") + # rosbridge_tcpsocket_node.get_logger().warn(f"Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") + # sleep_dur = rosbridge_tcpsocket_node.retry_startup_delay + # loaded = False + + # except Exception as e: + # rosbridge_tcpsocket_node.get_logger().warn(f"Encountered an exception: {traceback.format_exc()}") + # rosbridge_tcpsocket_node.get_logger().warn(f"Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") + # loaded = False + # sleep_dur = rosbridge_tcpsocket_node.retry_startup_delay + + # finally: + # rosbridge_tcpsocket_node.get_logger().info(f"Shutting down server...") + # server.shutdown() + # server_thread.join() + # server.server_close() # Must close socket to do proper cleanup + # rosbridge_tcpsocket_node.get_logger().info(f"Destroying node...") + # rosbridge_tcpsocket_node.destroy_node() + # rclpy.shutdown() + # time.sleep(sleep_dur) + + # Version 2: rclpy.spin in different thread loaded = False sleep_dur = 0. + server_is_running = False while not loaded: try: rclpy.init(args=args) rosbridge_tcpsocket_node = RosbridgeTcpsocketNode() + event = threading.Event() + spin_thread = threading.Thread(target=spin_function, daemon=True, args=(event, rosbridge_tcpsocket_node)) + # executor = rclpy.executors.MultiThreadedExecutor() + # executor.add_node(rosbridge_tcpsocket_node) + # spin_thread = threading.Thread(target=executor.spin, daemon=True) + spin_thread.start() + rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP node spinning in separate thread") # Server host is a tuple ('host', port) # empty string for host makes server listen on all available interfaces SocketServer.TCPServer.allow_reuse_address = True host = rosbridge_tcpsocket_node._host port = rosbridge_tcpsocket_node._port - server = SocketServer.TCPServer((host, port), RosbridgeTcpSocket) - server_thread = threading.Thread(target=server.serve_forever, daemon=True) - server_thread.start() + server = SocketServer.TCPServer((host, port), RosbridgeTcpSocket) + loaded = True + sys.stdout.flush() rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP server started on {host}:{port}") - - loaded = True - rclpy.spin(rosbridge_tcpsocket_node) + server_is_running = True + server.serve_forever() except KeyboardInterrupt: rosbridge_tcpsocket_node.get_logger().info(f"Exiting due to SIGINT") @@ -284,80 +338,28 @@ def main(args=None): rosbridge_tcpsocket_node.get_logger().info(f"Caught OS error: {traceback.format_exc()}") rosbridge_tcpsocket_node.get_logger().warn(f"Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") sleep_dur = rosbridge_tcpsocket_node.retry_startup_delay + if error.errno == errno.EPIPE or error.errno == errno.ECONNRESET: + server_is_running = False loaded = False - except Exception as e: + except Exception: rosbridge_tcpsocket_node.get_logger().warn(f"Encountered an exception: {traceback.format_exc()}") rosbridge_tcpsocket_node.get_logger().warn(f"Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") loaded = False sleep_dur = rosbridge_tcpsocket_node.retry_startup_delay finally: - rosbridge_tcpsocket_node.get_logger().info(f"Shutting down server...") - server.shutdown() - server_thread.join() + if server_is_running: + rosbridge_tcpsocket_node.get_logger().info(f"Shutting down server...") + server.shutdown() + server_is_running = False server.server_close() # Must close socket to do proper cleanup + event.set() + # spin_thread.join() rosbridge_tcpsocket_node.get_logger().info(f"Destroying node...") rosbridge_tcpsocket_node.destroy_node() rclpy.shutdown() time.sleep(sleep_dur) - # Version 2: rclpy.spin in different thread - # loaded = False - # sleep_dur = 0. - # server_is_running = False - # while not loaded: - # try: - # rclpy.init(args=args) - # rosbridge_tcpsocket_node = RosbridgeTcpsocketNode() - # event = threading.Event() - # spin_thread = threading.Thread(target=spin_function, daemon=True, args=(event, rosbridge_tcpsocket_node)) - # # executor = rclpy.executors.MultiThreadedExecutor() - # # executor.add_node(rosbridge_tcpsocket_node) - # # spin_thread = threading.Thread(target=executor.spin, daemon=True) - # spin_thread.start() - # rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP node spinning in separate thread") - # # Server host is a tuple ('host', port) - # # empty string for host makes server listen on all available interfaces - # SocketServer.TCPServer.allow_reuse_address = True - # host = rosbridge_tcpsocket_node._host - # port = rosbridge_tcpsocket_node._port - # server = SocketServer.TCPServer((host, port), RosbridgeTcpSocket) - # loaded = True - - # sys.stdout.flush() - # rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP server started on {host}:{port}") - # server_is_running = True - # server.serve_forever() - - # except KeyboardInterrupt: - # rosbridge_tcpsocket_node.get_logger().info(f"Exiting due to SIGINT") - # sleep_dur = 0. - - # except OSError as error: - # rosbridge_tcpsocket_node.get_logger().info(f"Caught OS error: {traceback.format_exc()}") - # rosbridge_tcpsocket_node.get_logger().warn(f"Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") - # sleep_dur = rosbridge_tcpsocket_node.retry_startup_delay - # if error.errno == errno.EPIPE or error.errno == errno.ECONNRESET: - # server_is_running = False - # loaded = False - - # except Exception: - # rosbridge_tcpsocket_node.get_logger().warn(f"Encountered an exception: {traceback.format_exc()}") - # rosbridge_tcpsocket_node.get_logger().warn(f"Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") - # loaded = False - # sleep_dur = rosbridge_tcpsocket_node.retry_startup_delay - - # finally: - # if server_is_running: - # server.shutdown() - # server_is_running = False - # server.server_close() # Must close socket to do proper cleanup - # event.set() - # # spin_thread.join() - # rosbridge_tcpsocket_node.destroy_node() - # rclpy.shutdown() - # time.sleep(sleep_dur) - if __name__ == '__main__': main() \ No newline at end of file diff --git a/rosbridge_server/src/rosbridge_server/tcp_handler.py b/rosbridge_server/src/rosbridge_server/tcp_handler.py index 5d7ee90c6..be7c0c4dc 100644 --- a/rosbridge_server/src/rosbridge_server/tcp_handler.py +++ b/rosbridge_server/src/rosbridge_server/tcp_handler.py @@ -1,7 +1,6 @@ from std_msgs.msg import Int32 import struct from rosbridge_library.rosbridge_protocol import RosbridgeProtocol -import traceback try: import SocketServer @@ -129,12 +128,10 @@ def send_message(self, message=None): """ Callback from rosbridge """ - try: - if self.bson_only_mode: - self.request.sendall(message) - elif message is not None: - self.request.sendall(message.encode()) - else: - self.protocol.log("error", "send_message called with no message or message is None, not sending") - except Exception: - self.protocol.log("error", f"Exception on send_message(): {traceback.format_exc()}") \ No newline at end of file + if self.bson_only_mode: + self.request.sendall(message) + elif message is not None: + self.request.sendall(message.encode()) + else: + self.protocol.log("error", "send_message called with no message or message is None, not sending") + \ No newline at end of file From 6a802307c1862b492441bf00f0f3871dc5faee31 Mon Sep 17 00:00:00 2001 From: Ted Sender Date: Tue, 13 Dec 2022 03:00:40 +0000 Subject: [PATCH 22/31] Added more debug stuff --- .../capabilities/advertise_service.py | 3 +- .../src/rosbridge_library/protocol.py | 5 ++- rosbridge_server/scripts/rosbridge_tcp | 39 ++++++++++--------- .../src/rosbridge_server/tcp_handler.py | 17 +++++--- 4 files changed, 35 insertions(+), 29 deletions(-) diff --git a/rosbridge_library/src/rosbridge_library/capabilities/advertise_service.py b/rosbridge_library/src/rosbridge_library/capabilities/advertise_service.py index 7022def2c..dd1f2e891 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/advertise_service.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/advertise_service.py @@ -5,7 +5,6 @@ from rosbridge_library.capability import Capability from rosbridge_library.internal import message_conversion from rosbridge_library.internal.ros_loader import get_service_class -import datetime class AdvertisedServiceHandler: @@ -30,7 +29,7 @@ def next_id(self): return id async def handle_request(self, req, res): - self.protocol.log("info", f"[{datetime.datetime.now()}] handle_request() for service {self.service_name}") # DEBUG + self.protocol.log("info", f"handle_request() for service {self.service_name}") # DEBUG # generate a unique ID request_id = "service_request:" + self.service_name + ":" + str(self.next_id()) diff --git a/rosbridge_library/src/rosbridge_library/protocol.py b/rosbridge_library/src/rosbridge_library/protocol.py index 9fa527a81..75c334eee 100644 --- a/rosbridge_library/src/rosbridge_library/protocol.py +++ b/rosbridge_library/src/rosbridge_library/protocol.py @@ -32,6 +32,7 @@ import time import traceback +import datetime from rosbridge_library.capabilities.fragmentation import Fragmentation from rosbridge_library.util import bson, json @@ -386,9 +387,9 @@ def log(self, level, message, lid=None): """ stdout_formatted_msg = None if lid is not None: - stdout_formatted_msg = f"[Client {self.client_id}] [id: {lid}] {message}" + stdout_formatted_msg = f"[{datetime.datetime.now()}] [Client {self.client_id}] [id: {lid}] {message}" else: - stdout_formatted_msg = f"[Client {self.client_id}] {message}" + stdout_formatted_msg = f"[{datetime.datetime.now()}] [Client {self.client_id}] {message}" if level == "error" or level == "err": self.node_handle.get_logger().error(stdout_formatted_msg) diff --git a/rosbridge_server/scripts/rosbridge_tcp b/rosbridge_server/scripts/rosbridge_tcp index 80b4ac73a..9a151912a 100755 --- a/rosbridge_server/scripts/rosbridge_tcp +++ b/rosbridge_server/scripts/rosbridge_tcp @@ -24,6 +24,7 @@ import time import traceback import threading import errno +import datetime #TODO: take care of socket timeouts and make sure to close sockets after killing program to release network ports @@ -271,33 +272,33 @@ def main(args=None): # server_thread = threading.Thread(target=server.serve_forever, daemon=True) # server_thread.start() # sys.stdout.flush() - # rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP server started on {host}:{port}") + # rosbridge_tcpsocket_node.get_logger().info(f"[{datetime.datetime.now()}] Rosbridge TCP server started on {host}:{port}") # loaded = True # rclpy.spin(rosbridge_tcpsocket_node) # except KeyboardInterrupt: - # rosbridge_tcpsocket_node.get_logger().info(f"Exiting due to SIGINT") + # rosbridge_tcpsocket_node.get_logger().info(f"[{datetime.datetime.now()}] Exiting due to SIGINT") # sleep_dur = 0. # except OSError as error: - # rosbridge_tcpsocket_node.get_logger().info(f"Caught OS error: {traceback.format_exc()}") - # rosbridge_tcpsocket_node.get_logger().warn(f"Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") + # rosbridge_tcpsocket_node.get_logger().info(f"[{datetime.datetime.now()}] Caught OS error: {traceback.format_exc()}") + # rosbridge_tcpsocket_node.get_logger().warn(f"[{datetime.datetime.now()}] Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") # sleep_dur = rosbridge_tcpsocket_node.retry_startup_delay # loaded = False # except Exception as e: - # rosbridge_tcpsocket_node.get_logger().warn(f"Encountered an exception: {traceback.format_exc()}") - # rosbridge_tcpsocket_node.get_logger().warn(f"Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") + # rosbridge_tcpsocket_node.get_logger().warn(f"[{datetime.datetime.now()}] Encountered an exception: {traceback.format_exc()}") + # rosbridge_tcpsocket_node.get_logger().warn(f"[{datetime.datetime.now()}] Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") # loaded = False # sleep_dur = rosbridge_tcpsocket_node.retry_startup_delay # finally: - # rosbridge_tcpsocket_node.get_logger().info(f"Shutting down server...") + # rosbridge_tcpsocket_node.get_logger().info(f"[{datetime.datetime.now()}] Shutting down server...") # server.shutdown() # server_thread.join() # server.server_close() # Must close socket to do proper cleanup - # rosbridge_tcpsocket_node.get_logger().info(f"Destroying node...") + # rosbridge_tcpsocket_node.get_logger().info(f"[{datetime.datetime.now()}] Destroying node...") # rosbridge_tcpsocket_node.destroy_node() # rclpy.shutdown() # time.sleep(sleep_dur) @@ -316,7 +317,7 @@ def main(args=None): # executor.add_node(rosbridge_tcpsocket_node) # spin_thread = threading.Thread(target=executor.spin, daemon=True) spin_thread.start() - rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP node spinning in separate thread") + rosbridge_tcpsocket_node.get_logger().info(f"[{datetime.datetime.now()}] Rosbridge TCP node spinning in separate thread") # Server host is a tuple ('host', port) # empty string for host makes server listen on all available interfaces SocketServer.TCPServer.allow_reuse_address = True @@ -326,37 +327,37 @@ def main(args=None): loaded = True sys.stdout.flush() - rosbridge_tcpsocket_node.get_logger().info(f"Rosbridge TCP server started on {host}:{port}") + rosbridge_tcpsocket_node.get_logger().info(f"[{datetime.datetime.now()}] Rosbridge TCP server started on {host}:{port}") server_is_running = True server.serve_forever() except KeyboardInterrupt: - rosbridge_tcpsocket_node.get_logger().info(f"Exiting due to SIGINT") + rosbridge_tcpsocket_node.get_logger().info(f"[{datetime.datetime.now()}] Exiting due to SIGINT") sleep_dur = 0. except OSError as error: - rosbridge_tcpsocket_node.get_logger().info(f"Caught OS error: {traceback.format_exc()}") - rosbridge_tcpsocket_node.get_logger().warn(f"Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") + rosbridge_tcpsocket_node.get_logger().info(f"[{datetime.datetime.now()}] Caught OS error: {traceback.format_exc()}") + rosbridge_tcpsocket_node.get_logger().warn(f"[{datetime.datetime.now()}] Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") sleep_dur = rosbridge_tcpsocket_node.retry_startup_delay - if error.errno == errno.EPIPE or error.errno == errno.ECONNRESET: - server_is_running = False + # if error.errno == errno.EPIPE or error.errno == errno.ECONNRESET: + # server_is_running = False loaded = False except Exception: - rosbridge_tcpsocket_node.get_logger().warn(f"Encountered an exception: {traceback.format_exc()}") - rosbridge_tcpsocket_node.get_logger().warn(f"Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") + rosbridge_tcpsocket_node.get_logger().warn(f"[{datetime.datetime.now()}] Encountered an exception: {traceback.format_exc()}") + rosbridge_tcpsocket_node.get_logger().warn(f"[{datetime.datetime.now()}] Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") loaded = False sleep_dur = rosbridge_tcpsocket_node.retry_startup_delay finally: if server_is_running: - rosbridge_tcpsocket_node.get_logger().info(f"Shutting down server...") + rosbridge_tcpsocket_node.get_logger().info(f"[{datetime.datetime.now()}] Shutting down server...") server.shutdown() server_is_running = False server.server_close() # Must close socket to do proper cleanup event.set() # spin_thread.join() - rosbridge_tcpsocket_node.get_logger().info(f"Destroying node...") + rosbridge_tcpsocket_node.get_logger().info(f"[{datetime.datetime.now()}] Destroying node...") rosbridge_tcpsocket_node.destroy_node() rclpy.shutdown() time.sleep(sleep_dur) diff --git a/rosbridge_server/src/rosbridge_server/tcp_handler.py b/rosbridge_server/src/rosbridge_server/tcp_handler.py index be7c0c4dc..ea5d61eb6 100644 --- a/rosbridge_server/src/rosbridge_server/tcp_handler.py +++ b/rosbridge_server/src/rosbridge_server/tcp_handler.py @@ -1,6 +1,7 @@ from std_msgs.msg import Int32 import struct from rosbridge_library.rosbridge_protocol import RosbridgeProtocol +import traceback try: import SocketServer @@ -128,10 +129,14 @@ def send_message(self, message=None): """ Callback from rosbridge """ - if self.bson_only_mode: - self.request.sendall(message) - elif message is not None: - self.request.sendall(message.encode()) - else: - self.protocol.log("error", "send_message called with no message or message is None, not sending") + try: + if self.bson_only_mode: + self.request.sendall(message) + elif message is not None: + self.request.sendall(message.encode()) + else: + self.protocol.log("error", "send_message called with no message or message is None, not sending") + except Exception: + self.protocol.log("error", f"Error in send_message(): {traceback.format_exc()}") + raise Exception(f"Error on TCP request.sendall()") \ No newline at end of file From e7e708c02c3fe01d3b1322d944c25071abd14eb2 Mon Sep 17 00:00:00 2001 From: Ted Sender Date: Fri, 16 Dec 2022 21:53:38 -0500 Subject: [PATCH 23/31] Minor changes --- .../src/rosbridge_server/tcp_handler.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/rosbridge_server/src/rosbridge_server/tcp_handler.py b/rosbridge_server/src/rosbridge_server/tcp_handler.py index ea5d61eb6..ca1ee42aa 100644 --- a/rosbridge_server/src/rosbridge_server/tcp_handler.py +++ b/rosbridge_server/src/rosbridge_server/tcp_handler.py @@ -1,7 +1,6 @@ from std_msgs.msg import Int32 import struct from rosbridge_library.rosbridge_protocol import RosbridgeProtocol -import traceback try: import SocketServer @@ -129,14 +128,11 @@ def send_message(self, message=None): """ Callback from rosbridge """ - try: - if self.bson_only_mode: - self.request.sendall(message) - elif message is not None: - self.request.sendall(message.encode()) - else: - self.protocol.log("error", "send_message called with no message or message is None, not sending") - except Exception: - self.protocol.log("error", f"Error in send_message(): {traceback.format_exc()}") - raise Exception(f"Error on TCP request.sendall()") + if self.bson_only_mode: + self.request.sendall(message) + elif message is not None: + self.request.sendall(message.encode()) + else: + self.protocol.log("error", "send_message called with no message or message is None, not sending") + \ No newline at end of file From 6ad6893270eddb0bc0a75b5a6aa942adeaca7a01 Mon Sep 17 00:00:00 2001 From: Ted Sender Date: Sun, 1 Jan 2023 20:18:11 +0000 Subject: [PATCH 24/31] Removed temporary advertise service debug log --- .../src/rosbridge_library/capabilities/advertise_service.py | 1 - 1 file changed, 1 deletion(-) diff --git a/rosbridge_library/src/rosbridge_library/capabilities/advertise_service.py b/rosbridge_library/src/rosbridge_library/capabilities/advertise_service.py index dd1f2e891..6d97b77b3 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/advertise_service.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/advertise_service.py @@ -29,7 +29,6 @@ def next_id(self): return id async def handle_request(self, req, res): - self.protocol.log("info", f"handle_request() for service {self.service_name}") # DEBUG # generate a unique ID request_id = "service_request:" + self.service_name + ":" + str(self.next_id()) From 21f47bfa63c2aec3bdca8d9b240bfb9a061df584 Mon Sep 17 00:00:00 2001 From: Ted Sender Date: Sun, 1 Jan 2023 20:21:21 +0000 Subject: [PATCH 25/31] Adding back internal.subscribers qos code for foxy --- .../src/rosbridge_library/internal/subscribers.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/rosbridge_library/src/rosbridge_library/internal/subscribers.py b/rosbridge_library/src/rosbridge_library/internal/subscribers.py index 02f5f1f83..f2187bada 100644 --- a/rosbridge_library/src/rosbridge_library/internal/subscribers.py +++ b/rosbridge_library/src/rosbridge_library/internal/subscribers.py @@ -114,12 +114,11 @@ def __init__(self, topic, client_id, callback, node_handle, msg_type=None, raw=F reliability=ReliabilityPolicy.RELIABLE, ) - # TODO: How to fix this for eloquent? - # infos = node_handle.get_publishers_info_by_topic(topic) - # if any(pub.qos_profile.durability == DurabilityPolicy.TRANSIENT_LOCAL for pub in infos): - # qos.durability = DurabilityPolicy.TRANSIENT_LOCAL - # if any(pub.qos_profile.reliability == ReliabilityPolicy.BEST_EFFORT for pub in infos): - # qos.reliability = ReliabilityPolicy.BEST_EFFORT + infos = node_handle.get_publishers_info_by_topic(topic) + if any(pub.qos_profile.durability == DurabilityPolicy.TRANSIENT_LOCAL for pub in infos): + qos.durability = DurabilityPolicy.TRANSIENT_LOCAL + if any(pub.qos_profile.reliability == ReliabilityPolicy.BEST_EFFORT for pub in infos): + qos.reliability = ReliabilityPolicy.BEST_EFFORT # Create the subscriber and associated member variables # Subscriptions is initialized with the current client to start with. From fa654ed98e08a73a2f6792d5e1d4532715a93ef5 Mon Sep 17 00:00:00 2001 From: Ted Sender Date: Fri, 3 Feb 2023 10:44:13 -0500 Subject: [PATCH 26/31] Trying to auto terminate client request after broken pipe --- .../src/rosbridge_server/tcp_handler.py | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/rosbridge_server/src/rosbridge_server/tcp_handler.py b/rosbridge_server/src/rosbridge_server/tcp_handler.py index ca1ee42aa..065f688a5 100644 --- a/rosbridge_server/src/rosbridge_server/tcp_handler.py +++ b/rosbridge_server/src/rosbridge_server/tcp_handler.py @@ -48,6 +48,7 @@ def setup(self): if cls.client_count_pub: cls.client_count_pub.publish(Int32(data=cls.clients_connected)) self.protocol.log("info", "connected. " + str(cls.clients_connected) + " client total.") + self.force_close_request = False except Exception as exc: cls.ros_node.get_logger().info("Unable to accept incoming connection. Reason: " + str(exc)) @@ -93,6 +94,10 @@ def handle(self): self.request.settimeout(cls.socket_timeout) while True: try: + if self.force_close_request: + self.protocol.log("error", "send_message failed, likely by ConnectionResetError or BrokenPipe. Terminating client connection.") + break + if self.bson_only_mode: if self.recv_bson() is None: break @@ -128,11 +133,14 @@ def send_message(self, message=None): """ Callback from rosbridge """ - if self.bson_only_mode: - self.request.sendall(message) - elif message is not None: - self.request.sendall(message.encode()) - else: - self.protocol.log("error", "send_message called with no message or message is None, not sending") + try: + if self.bson_only_mode: + self.request.sendall(message) + elif message is not None: + self.request.sendall(message.encode()) + else: + self.protocol.log("error", "send_message called with no message or message is None, not sending") + except Exception: + self.force_close_request = True \ No newline at end of file From 94fd8fc3a83e6591e1c32359c6227b69bc7db699 Mon Sep 17 00:00:00 2001 From: John Kaniarz Date: Tue, 18 Apr 2023 11:45:46 -0700 Subject: [PATCH 27/31] Updated interface to match latest (but didn't implement anything) --- rosbridge_server/launch/rosbridge_tcp_launch.xml | 3 +++ rosbridge_server/scripts/rosbridge_tcp | 1 + .../src/rosbridge_server/__init__.py | 1 + .../src/rosbridge_server/tcp_handler.py | 16 +++++++++++----- 4 files changed, 16 insertions(+), 5 deletions(-) diff --git a/rosbridge_server/launch/rosbridge_tcp_launch.xml b/rosbridge_server/launch/rosbridge_tcp_launch.xml index 496415f4f..12417233b 100644 --- a/rosbridge_server/launch/rosbridge_tcp_launch.xml +++ b/rosbridge_server/launch/rosbridge_tcp_launch.xml @@ -11,6 +11,8 @@ + + @@ -26,6 +28,7 @@ + diff --git a/rosbridge_server/scripts/rosbridge_tcp b/rosbridge_server/scripts/rosbridge_tcp index f37a4ccf5..5c0198a13 100755 --- a/rosbridge_server/scripts/rosbridge_tcp +++ b/rosbridge_server/scripts/rosbridge_tcp @@ -68,6 +68,7 @@ class RosbridgeTcpsocketNode(Node): max_message_size = self.declare_parameter('max_message_size', RosbridgeTcpSocket.max_message_size).value # unregister_timeout = get_param('~unregister_timeout', RosbridgeTcpSocket.unregister_timeout) unregister_timeout = self.declare_parameter('unregister_timeout', RosbridgeTcpSocket.unregister_timeout).value + call_services_in_new_thread = self.declare_parameter("call_services_in_new_thread", False).value # bson_only_mode = get_param('~bson_only_mode', False) bson_only_mode = self.declare_parameter('bson_only_mode', RosbridgeTcpSocket.bson_only_mode).value diff --git a/rosbridge_server/src/rosbridge_server/__init__.py b/rosbridge_server/src/rosbridge_server/__init__.py index fe3ed3131..061210bab 100644 --- a/rosbridge_server/src/rosbridge_server/__init__.py +++ b/rosbridge_server/src/rosbridge_server/__init__.py @@ -1,2 +1,3 @@ from .client_mananger import ClientManager # noqa: F401 from .websocket_handler import RosbridgeWebSocket # noqa: F401 +from .tcp_handler import RosbridgeTcpSocket # noqa: F401 diff --git a/rosbridge_server/src/rosbridge_server/tcp_handler.py b/rosbridge_server/src/rosbridge_server/tcp_handler.py index 0ee474d63..c846fd5fc 100644 --- a/rosbridge_server/src/rosbridge_server/tcp_handler.py +++ b/rosbridge_server/src/rosbridge_server/tcp_handler.py @@ -1,4 +1,5 @@ from std_msgs.msg import Int32 +from rosbridge_library.util import bson import struct from rosbridge_library.rosbridge_protocol import RosbridgeProtocol @@ -124,13 +125,18 @@ def finish(self): cls.client_count_pub.publish(Int32(data=cls.clients_connected)) self.protocol.log("info", "disconnected. " + str(cls.clients_connected) + " client total." ) - def send_message(self, message=None): + def send_message(self, message=None, compression="none"): """ Callback from rosbridge """ - if self.bson_only_mode: + if isinstance(message, bson.BSON) or bson_only_mode: + binary = True + elif compression in ["cbor", "cbor-raw"]: + binary = True + else: + binary = False + + if binary: self.request.sendall(message) - elif message is not None: - self.request.sendall(message.encode()) else: - self.protocol.log("error", "send_message called with no message or message is None, not sending") + self.request.sendall(message.encode()) From 4a6b60e5722c2b795b46a8f24a51d05d8b7ee69d Mon Sep 17 00:00:00 2001 From: Ted Sender Date: Fri, 21 Apr 2023 18:44:33 +0000 Subject: [PATCH 28/31] Requiring bson_only_mode to be true for TCp handler to operate, changed TCPServer to ThreadedTCPServer --- rosbridge_server/scripts/rosbridge_tcp | 71 +++---------------- .../src/rosbridge_server/tcp_handler.py | 6 +- 2 files changed, 16 insertions(+), 61 deletions(-) diff --git a/rosbridge_server/scripts/rosbridge_tcp b/rosbridge_server/scripts/rosbridge_tcp index 9a151912a..42a6d77c9 100755 --- a/rosbridge_server/scripts/rosbridge_tcp +++ b/rosbridge_server/scripts/rosbridge_tcp @@ -72,6 +72,7 @@ class RosbridgeTcpsocketNode(Node): unregister_timeout = self.declare_parameter('unregister_timeout', RosbridgeTcpSocket.unregister_timeout).value # bson_only_mode = get_param('~bson_only_mode', False) bson_only_mode = self.declare_parameter('bson_only_mode', RosbridgeTcpSocket.bson_only_mode).value + self._bson_only_mode = bson_only_mode if max_message_size == "None": max_message_size = None @@ -254,56 +255,7 @@ def main(args=None): if args is None: args = sys.argv - # rclpy.init(args=args) - - # Version 1: TCP server in separate thread - # loaded = False - # sleep_dur = 0. - # while not loaded: - # try: - # rclpy.init(args=args) - # rosbridge_tcpsocket_node = RosbridgeTcpsocketNode() - # # Server host is a tuple ('host', port) - # # empty string for host makes server listen on all available interfaces - # SocketServer.TCPServer.allow_reuse_address = True - # host = rosbridge_tcpsocket_node._host - # port = rosbridge_tcpsocket_node._port - # server = SocketServer.TCPServer((host, port), RosbridgeTcpSocket) - # server_thread = threading.Thread(target=server.serve_forever, daemon=True) - # server_thread.start() - # sys.stdout.flush() - # rosbridge_tcpsocket_node.get_logger().info(f"[{datetime.datetime.now()}] Rosbridge TCP server started on {host}:{port}") - - # loaded = True - # rclpy.spin(rosbridge_tcpsocket_node) - - # except KeyboardInterrupt: - # rosbridge_tcpsocket_node.get_logger().info(f"[{datetime.datetime.now()}] Exiting due to SIGINT") - # sleep_dur = 0. - - # except OSError as error: - # rosbridge_tcpsocket_node.get_logger().info(f"[{datetime.datetime.now()}] Caught OS error: {traceback.format_exc()}") - # rosbridge_tcpsocket_node.get_logger().warn(f"[{datetime.datetime.now()}] Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") - # sleep_dur = rosbridge_tcpsocket_node.retry_startup_delay - # loaded = False - - # except Exception as e: - # rosbridge_tcpsocket_node.get_logger().warn(f"[{datetime.datetime.now()}] Encountered an exception: {traceback.format_exc()}") - # rosbridge_tcpsocket_node.get_logger().warn(f"[{datetime.datetime.now()}] Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") - # loaded = False - # sleep_dur = rosbridge_tcpsocket_node.retry_startup_delay - - # finally: - # rosbridge_tcpsocket_node.get_logger().info(f"[{datetime.datetime.now()}] Shutting down server...") - # server.shutdown() - # server_thread.join() - # server.server_close() # Must close socket to do proper cleanup - # rosbridge_tcpsocket_node.get_logger().info(f"[{datetime.datetime.now()}] Destroying node...") - # rosbridge_tcpsocket_node.destroy_node() - # rclpy.shutdown() - # time.sleep(sleep_dur) - - # Version 2: rclpy.spin in different thread + # Place rclpy.spin in different thread loaded = False sleep_dur = 0. server_is_running = False @@ -311,6 +263,13 @@ def main(args=None): try: rclpy.init(args=args) rosbridge_tcpsocket_node = RosbridgeTcpsocketNode() + + if not rosbridge_tcpsocket_node._bson_only_mode: + rosbridge_tcpsocket_node.get_logger().warn(f"[{datetime.datetime.now()}] Rosbridge TCP node can only operate properly if arg 'bson_only_mode' is True. Shutting down...") + rosbridge_tcpsocket_node.destroy_node() + rclpy.shutdown() + return + event = threading.Event() spin_thread = threading.Thread(target=spin_function, daemon=True, args=(event, rosbridge_tcpsocket_node)) # executor = rclpy.executors.MultiThreadedExecutor() @@ -320,10 +279,10 @@ def main(args=None): rosbridge_tcpsocket_node.get_logger().info(f"[{datetime.datetime.now()}] Rosbridge TCP node spinning in separate thread") # Server host is a tuple ('host', port) # empty string for host makes server listen on all available interfaces - SocketServer.TCPServer.allow_reuse_address = True + SocketServer.ThreadingTCPServer.allow_reuse_address = True host = rosbridge_tcpsocket_node._host port = rosbridge_tcpsocket_node._port - server = SocketServer.TCPServer((host, port), RosbridgeTcpSocket) + server = SocketServer.ThreadingTCPServer((host, port), RosbridgeTcpSocket) loaded = True sys.stdout.flush() @@ -334,14 +293,6 @@ def main(args=None): except KeyboardInterrupt: rosbridge_tcpsocket_node.get_logger().info(f"[{datetime.datetime.now()}] Exiting due to SIGINT") sleep_dur = 0. - - except OSError as error: - rosbridge_tcpsocket_node.get_logger().info(f"[{datetime.datetime.now()}] Caught OS error: {traceback.format_exc()}") - rosbridge_tcpsocket_node.get_logger().warn(f"[{datetime.datetime.now()}] Will try reloading in {rosbridge_tcpsocket_node.retry_startup_delay} seconds") - sleep_dur = rosbridge_tcpsocket_node.retry_startup_delay - # if error.errno == errno.EPIPE or error.errno == errno.ECONNRESET: - # server_is_running = False - loaded = False except Exception: rosbridge_tcpsocket_node.get_logger().warn(f"[{datetime.datetime.now()}] Encountered an exception: {traceback.format_exc()}") diff --git a/rosbridge_server/src/rosbridge_server/tcp_handler.py b/rosbridge_server/src/rosbridge_server/tcp_handler.py index 065f688a5..78d44eb22 100644 --- a/rosbridge_server/src/rosbridge_server/tcp_handler.py +++ b/rosbridge_server/src/rosbridge_server/tcp_handler.py @@ -1,12 +1,14 @@ from std_msgs.msg import Int32 import struct from rosbridge_library.rosbridge_protocol import RosbridgeProtocol +import traceback try: import SocketServer except ImportError: import socketserver as SocketServer + class RosbridgeTcpSocket(SocketServer.BaseRequestHandler): """ TCP Socket server for rosbridge @@ -49,6 +51,7 @@ def setup(self): cls.client_count_pub.publish(Int32(data=cls.clients_connected)) self.protocol.log("info", "connected. " + str(cls.clients_connected) + " client total.") self.force_close_request = False + self.send_message_exception = None except Exception as exc: cls.ros_node.get_logger().info("Unable to accept incoming connection. Reason: " + str(exc)) @@ -95,7 +98,7 @@ def handle(self): while True: try: if self.force_close_request: - self.protocol.log("error", "send_message failed, likely by ConnectionResetError or BrokenPipe. Terminating client connection.") + self.protocol.log("error", "Force close request was triggered due to exception on send_message(). Likely by ConnectionResetError or BrokenPipe. Terminating client connection.") break if self.bson_only_mode: @@ -142,5 +145,6 @@ def send_message(self, message=None): self.protocol.log("error", "send_message called with no message or message is None, not sending") except Exception: self.force_close_request = True + self.protocol.log("error", f"Exception during send_message(): {traceback.format_exc()}") \ No newline at end of file From ab047a3ef59c97ddfadb3929247ec15e82f64dc1 Mon Sep 17 00:00:00 2001 From: Ted Sender Date: Fri, 21 Apr 2023 19:01:52 +0000 Subject: [PATCH 29/31] Cleanup --- rosbridge_server/scripts/rosbridge_tcp | 1 - rosbridge_server/src/rosbridge_server/tcp_handler.py | 7 +------ 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/rosbridge_server/scripts/rosbridge_tcp b/rosbridge_server/scripts/rosbridge_tcp index 42a6d77c9..51ed83119 100755 --- a/rosbridge_server/scripts/rosbridge_tcp +++ b/rosbridge_server/scripts/rosbridge_tcp @@ -23,7 +23,6 @@ import sys import time import traceback import threading -import errno import datetime #TODO: take care of socket timeouts and make sure to close sockets after killing program to release network ports diff --git a/rosbridge_server/src/rosbridge_server/tcp_handler.py b/rosbridge_server/src/rosbridge_server/tcp_handler.py index 78d44eb22..8d25096ce 100644 --- a/rosbridge_server/src/rosbridge_server/tcp_handler.py +++ b/rosbridge_server/src/rosbridge_server/tcp_handler.py @@ -8,7 +8,6 @@ except ImportError: import socketserver as SocketServer - class RosbridgeTcpSocket(SocketServer.BaseRequestHandler): """ TCP Socket server for rosbridge @@ -51,11 +50,9 @@ def setup(self): cls.client_count_pub.publish(Int32(data=cls.clients_connected)) self.protocol.log("info", "connected. " + str(cls.clients_connected) + " client total.") self.force_close_request = False - self.send_message_exception = None except Exception as exc: cls.ros_node.get_logger().info("Unable to accept incoming connection. Reason: " + str(exc)) - def recvall(self, n): # http://stackoverflow.com/questions/17667903/python-socket-receive-large-amount-of-data # Helper function to recv n bytes or return None if EOF is hit @@ -145,6 +142,4 @@ def send_message(self, message=None): self.protocol.log("error", "send_message called with no message or message is None, not sending") except Exception: self.force_close_request = True - self.protocol.log("error", f"Exception during send_message(): {traceback.format_exc()}") - - \ No newline at end of file + self.protocol.log("error", f"Exception during send_message(): {traceback.format_exc()}") \ No newline at end of file From ddd5fdf37c3c2f2e46cf58439394f643e1699537 Mon Sep 17 00:00:00 2001 From: Ted Sender Date: Fri, 21 Apr 2023 19:13:23 +0000 Subject: [PATCH 30/31] Updated repo, minor changes --- rosbridge_server/src/rosbridge_server/tcp_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rosbridge_server/src/rosbridge_server/tcp_handler.py b/rosbridge_server/src/rosbridge_server/tcp_handler.py index c846fd5fc..be9d68aec 100644 --- a/rosbridge_server/src/rosbridge_server/tcp_handler.py +++ b/rosbridge_server/src/rosbridge_server/tcp_handler.py @@ -129,7 +129,7 @@ def send_message(self, message=None, compression="none"): """ Callback from rosbridge """ - if isinstance(message, bson.BSON) or bson_only_mode: + if isinstance(message, bson.BSON) or self.bson_only_mode: binary = True elif compression in ["cbor", "cbor-raw"]: binary = True From 0886f32ac34fa9c40a62831ccd5748da4d9dc52e Mon Sep 17 00:00:00 2001 From: Ted Sender Date: Wed, 4 Oct 2023 01:01:46 -0400 Subject: [PATCH 31/31] Fixing default arg values in TCP launch file --- rosbridge_server/launch/rosbridge_tcp_launch.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rosbridge_server/launch/rosbridge_tcp_launch.xml b/rosbridge_server/launch/rosbridge_tcp_launch.xml index 49b8d1596..9ba1d4846 100644 --- a/rosbridge_server/launch/rosbridge_tcp_launch.xml +++ b/rosbridge_server/launch/rosbridge_tcp_launch.xml @@ -4,12 +4,12 @@ - + - +