Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle msg not found #2384

Open
wants to merge 22 commits into
base: noetic-devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
89ec986
roslaunch: Add ability to read from param server
Oct 4, 2019
9ace68e
rospy: default subscriber nodelay hint to True
c-andy-martin Apr 23, 2019
e1ddb16
Add re-subscribe feature to rosbag
Jul 20, 2021
8e6202b
Remove dual-tracking of the wall-clock horizon
bobhenz-jabil Jun 4, 2019
8e7a2fa
Simplify rosbag time management by using ROS time for horizons
bobhenz-jabil Jun 5, 2019
f4b306c
Delete the no-longer-needed TimeTranslator
bobhenz-jabil Jun 5, 2019
1c8ae80
Move tracking of "paused" into TimePublisher, correct publish rate
bobhenz-jabil Jun 5, 2019
c9ea68d
Add feature to increase and decrease the playback rate on-the-fly
bobhenz-jabil Jul 9, 2019
1ea5725
rospy: add recv timeout on header
dandedrick Mar 19, 2020
6036364
rospy: fix tcp reconnect case
dandedrick Mar 19, 2020
0ab7820
rospy: attempt reconnect on ECONNRESET
dandedrick Mar 19, 2020
e1616c7
fix delays from rospy long-running callbacks
Jul 9, 2021
84a3ce8
rospy: Fix severe bug in get_param_cached()
Nov 22, 2021
c622e6d
rospy: Fix performance get_param_cached('non_exist')
Nov 23, 2021
51f319b
Rosbag Recorder: Fix latched topic bug
Mar 18, 2022
3c8e7d3
Change the default tcp_nodelay value from "false" to "true"
bobhenz-jabil May 31, 2022
d07cc62
Set queue size for rospy subscriber to 100 when None
Ibrahim-Apata Sep 2, 2022
46f9ef6
Fix python3.10 collections
duaneenorris Sep 27, 2022
328e1de
pmon: change sleep to 1.0 seconds from 0.1
c-andy-martin Nov 9, 2022
46529e2
roslaunch: Return non-zero exit code for required nodes
travisariggs Jul 13, 2023
9fed7d6
rospy: unsub params on shutdown
dandedrick Sep 13, 2024
8e9616c
Handle genmsg.msg_loader MsgNotFound exception
12tbuchman Jan 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion clients/roscpp/include/ros/transport_hints.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class ROSCPP_DECL TransportHints
M_string::iterator it = options_.find("tcp_nodelay");
if (it == options_.end())
{
return false;
return true;
}

const std::string& val = it->second;
Expand Down
44 changes: 28 additions & 16 deletions clients/rospy/src/rospy/impl/paramserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ def __init__(self):
self.lock = threading.Lock()
self.d = None
self.notifier = None

self.subscribed_params = set()

## Delete parameter from cache
def delete(self, key):
with self.lock:
Expand Down Expand Up @@ -94,13 +95,13 @@ def update(self, key, value):
"""
with self.lock:
# partially borrowed from rosmaster/paramserver.py
namespaces = [x for x in key.split(SEP) if x]
path_elems = tuple(x for x in key.split(SEP) if x)
# - last namespace is the actual key we're storing in
d = self.d
if d is None:
raise KeyError(key)
value_key = namespaces[-1]
namespaces = namespaces[:-1]
value_key = path_elems[-1]
namespaces = path_elems[:-1]
# - descend tree to the node we're setting
for ns in namespaces:
if ns not in d:
Expand Down Expand Up @@ -131,11 +132,12 @@ def set(self, key, value):
raise TypeError("cannot set root of parameter tree to "
"non-dictionary")
self.d = value
self.subscribed_params.add(GLOBALNS)
else:
namespaces = [x for x in key.split(SEP) if x]
path_elems = tuple(x for x in key.split(SEP) if x)
# - last namespace is the actual key we're storing in
value_key = namespaces[-1]
namespaces = namespaces[:-1]
value_key = path_elems[-1]
namespaces = path_elems[:-1]
if self.d is None:
self.d = {}
d = self.d
Expand All @@ -153,6 +155,7 @@ def set(self, key, value):
d = val

d[value_key] = value
self.subscribed_params.add(path_elems)

def get(self, key):
"""
Expand All @@ -166,15 +169,24 @@ def get(self, key):
if self.d is None:
raise KeyError(key)
val = self.d
if key != GLOBALNS:
# split by the namespace separator, ignoring empty splits
namespaces = [x for x in key.split(SEP) if x]
for ns in namespaces:
if not type(val) == dict:
raise KeyError(val)
val = val[ns]
if isinstance(val, dict) and not val:
raise KeyError(key)
# split by the namespace separator, ignoring empty splits
namespaces = tuple(x for x in key.split(SEP) if x)

if GLOBALNS not in self.subscribed_params:
# Make sure this key or some parent of it was actually set
# so that updates are subscribed.
is_subscribed = False
for i in range(len(namespaces), 0, -1):
if namespaces[:i] in self.subscribed_params:
is_subscribed = True
break
if not is_subscribed:
raise KeyError(key)

for ns in namespaces:
if not type(val) == dict:
raise KeyError(val)
val = val[ns]
return val

_param_server_cache = None
Expand Down
63 changes: 59 additions & 4 deletions clients/rospy/src/rospy/impl/tcpros_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,11 +320,14 @@ def _tcp_server_callback(self, sock, client_addr):
#and then use that to do the writing
try:
buff_size = 4096 # size of read buffer
timeout = sock.gettimeout()
sock.settimeout(30)
if python3 == 0:
#initialize read_ros_handshake_header with BytesIO for Python 3 (instead of bytesarray())
header = read_ros_handshake_header(sock, StringIO(), buff_size)
else:
header = read_ros_handshake_header(sock, BytesIO(), buff_size)
sock.settimeout(timeout)

if 'topic' in header:
err_msg = self.topic_connection_handler(sock, client_addr, header)
Expand Down Expand Up @@ -575,17 +578,28 @@ def connect(self, dest_addr, dest_port, endpoint_id, timeout=None):
self.close()
elif not isinstance(e, socket.timeout) and e.errno not in [
errno.ENETDOWN, errno.ENETUNREACH, errno.ENETRESET,
errno.ECONNABORTED, errno.ETIMEDOUT, errno.EHOSTDOWN, errno.EHOSTUNREACH]:
errno.ECONNABORTED, errno.ECONNRESET, errno.ETIMEDOUT,
errno.EHOSTDOWN, errno.EHOSTUNREACH]:
# reconnect in follow cases, otherwise close the socket:
# 1. socket.timeout: on timeouts caused by delays on wireless links
# 2. ENETDOWN (100), ENETUNREACH (101), ENETRESET (102), ECONNABORTED (103):
# while using ROS_HOSTNAME ros binds to a specific interface. Theses errors
# are thrown on interface shutdown e.g. on reconnection in LTE networks
# 3. ETIMEDOUT (110): same like 1. (for completeness)
# 4. EHOSTDOWN (112), EHOSTUNREACH (113): while network and/or DNS-server is not reachable
# 5. ECONNRESET (104): The connection was reset by the peer. Connections may be backed up we
# just need to try again.
#
# no reconnection as error is not 1.-4.
# no reconnection as error is not 1.-5.
self.close()
else:
try:
self.socket.shutdown(socket.SHUT_RDWR)
except:
pass
finally:
self.socket.close()
self.socket = None
raise TransportInitError(str(e)) #re-raise i/o error

def _validate_header(self, header):
Expand Down Expand Up @@ -782,6 +796,31 @@ def _reconnect(self):

time.sleep(interval)

def callback_loop(self, msgs_callback):
while not self.done and not is_shutdown():
try:
with self.msg_queue_lock:
# Data that was leftover from reading header may have made
# messages immediately available (such as from a latched
# topic). Go ahead and process anything we already have before
# waiting.
while self.msg_queue:
msg = self.msg_queue.pop(0)
# Be sure to not hold the message queue lock while calling
# the callback, it may take a while.
self.msg_queue_lock.release()
msgs_callback([msg], self)
self.msg_queue_lock.acquire()
self.msg_queue_condition.wait()
except:
# in many cases this will be a normal hangup, but log internally
try:
#1467 sometimes we get exceptions due to
#interpreter shutdown, so blanket ignore those if
#the reporting fails
rospydebug("exception in callback loop for [%s], may be normal. Exception is %s",self.name, traceback.format_exc())
except: pass

def receive_loop(self, msgs_callback):
"""
Receive messages until shutdown
Expand All @@ -790,13 +829,27 @@ def receive_loop(self, msgs_callback):
"""
# - use assert here as this would be an internal error, aka bug
logger.debug("receive_loop for [%s]", self.name)
# Start a callback thread to process the callbacks. This way the
# receive loop can continue calling recv() while a long-running
# callback is running.
try:
self.msg_queue = []
self.msg_queue_lock = threading.Lock()
self.msg_queue_condition = threading.Condition(self.msg_queue_lock)
callback_thread = threading.Thread(
target=self.callback_loop,
args=(msgs_callback,))
callback_thread.start()
while not self.done and not is_shutdown():
try:
if self.socket is not None:
msgs = self.receive_once()
if not self.done and not is_shutdown():
msgs_callback(msgs, self)
with self.msg_queue_lock:
self.msg_queue += msgs
if self.protocol.queue_size is not None:
self.msg_queue = self.msg_queue[-self.protocol.queue_size:]
self.msg_queue_condition.notify()
else:
self._reconnect()

Expand Down Expand Up @@ -827,7 +880,9 @@ def receive_loop(self, msgs_callback):
#the reporting fails
rospydebug("exception in receive loop for [%s], may be normal. Exception is %s",self.name, traceback.format_exc())
except: pass

with self.msg_queue_lock:
self.msg_queue_condition.notify()
callback_thread.join()
rospydebug("receive_loop[%s]: done condition met, exited loop"%self.name)
finally:
if not self.done:
Expand Down
23 changes: 18 additions & 5 deletions clients/rospy/src/rospy/msproxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,10 @@ def __getitem__(self, key):
#NOTE: remapping occurs here!
resolved_key = rospy.names.resolve_name(key)
try:
return rospy.impl.paramserver.get_param_server_cache().get(resolved_key)
value = rospy.impl.paramserver.get_param_server_cache().get(resolved_key)
if isinstance(value, dict) and not value:
raise KeyError(key)
return value
except KeyError:
pass
code, msg, value = self.target.getParam(rospy.names.get_caller_id(), resolved_key)
Expand Down Expand Up @@ -162,17 +165,18 @@ def get_param_cached(self, key):
resolved_key = rospy.names.resolve_name(key)
try:
# check for value in the parameter server cache
return rospy.impl.paramserver.get_param_server_cache().get(resolved_key)
value = rospy.impl.paramserver.get_param_server_cache().get(resolved_key)
except KeyError:
# first access, make call to parameter server
rospy.core.add_shutdown_hook(self.preshutdown_callback)
code, msg, value = self.target.subscribeParam(rospy.names.get_caller_id(), rospy.core.get_node_uri(), resolved_key)
if code != 1: #unwrap value with Python semantics
raise KeyError(key)
# set the value in the cache so that it's marked as subscribed
rospy.impl.paramserver.get_param_server_cache().set(resolved_key, value)
if isinstance(value, dict) and not value:
raise KeyError(key)
return value
if isinstance(value, dict) and not value:
raise KeyError(key)
return value

def __delitem__(self, key):
"""
Expand Down Expand Up @@ -208,3 +212,12 @@ def __iter__(self):
return value.__iter__()
else:
raise rospy.exceptions.ROSException("cannot retrieve parameter names: %s"%msg)

def unsubscribe_params(self):
for key in rospy.impl.paramserver.get_param_server_cache().subscribed_params:
self.target.unsubscribeParam(rospy.names.get_caller_id(), rospy.core.get_node_uri(), key[0])

def preshutdown_callback(self, reason):
# If we don't do this roscore will keep this node as active since it
# still has an active param subscription and won't free the node.
self.unsubscribe_params()
17 changes: 10 additions & 7 deletions clients/rospy/src/rospy/topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ class Subscriber(Topic):
the messages are of a given type.
"""
def __init__(self, name, data_class, callback=None, callback_args=None,
queue_size=None, buff_size=DEFAULT_BUFF_SIZE, tcp_nodelay=False):
queue_size=None, buff_size=DEFAULT_BUFF_SIZE, tcp_nodelay=True):
"""
Constructor.

Expand Down Expand Up @@ -552,11 +552,8 @@ def __init__(self, name, data_class, callback=None, callback_args=None,
buff_size to a non-default value affects all subscribers to
this topic in this process.
@type buff_size: int
@param tcp_nodelay: if True, request TCP_NODELAY from
publisher. Use of this option is not generally recommended
in most cases as it is better to rely on timestamps in
message data. Setting tcp_nodelay to True enables TCP_NODELAY
for all subscribers in the same python process.
@param tcp_nodelay: set the nodelay transport hint.
The default is False (same as roscpp).
@type tcp_nodelay: bool
@raise ROSException: if parameters are invalid
"""
Expand All @@ -565,6 +562,8 @@ def __init__(self, name, data_class, callback=None, callback_args=None,

# last person to set these to non-defaults wins, not much way
# around this
if queue_size is None:
queue_size = 100
if queue_size is not None:
self.impl.set_queue_size(queue_size)
if buff_size != DEFAULT_BUFF_SIZE:
Expand Down Expand Up @@ -648,7 +647,11 @@ def set_queue_size(self, queue_size):
@type queue_size: int
"""
if queue_size == -1:
self.queue_size = None
queue_size = 100
self.queue_size = queue_size
elif queue_size is None:
queue_size = 100
self.queue_size = queue_size
elif queue_size == 0:
raise ROSException("queue size may not be set to zero")
elif queue_size is not None and type(queue_size) != int:
Expand Down
3 changes: 1 addition & 2 deletions tools/rosbag/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ catkin_package(

add_library(rosbag
src/player.cpp
src/recorder.cpp
src/time_translator.cpp)
src/recorder.cpp)

target_link_libraries(rosbag ${catkin_LIBRARIES} ${Boost_LIBRARIES}
${BZIP2_LIBRARIES}
Expand Down
Loading