Python ROS2 writer slower than aniticpated #1083
-
When running the code (attached in the zip below) on my machine, it takes about 1 second to write each image (each image is ~3 megabytes) as a ROS2 message by calling
Is this expected behavior? Thank you for your help! The code reads a flatbuffer message and converts it to a ROS2 encoded message, then writes it to disk. Full code: # import flatbuffers
import foxglove_schemas_flatbuffer.RawImage as RawImage
import foxglove_schemas_flatbuffer.PointCloud as PointCloud
from mcap_ros2.writer import Writer as McapWriter
import os
import time
from enum import Enum
from collections import namedtuple
import multiprocessing
# import cProfile
Ros2SensorMsg = namedtuple('Ros2SensorMsg', ['type', 'stream_name', 'message'])
# Converts foxglove to ROS2 numeric type (e.g. uint8) for point cloud msgs. Usage:
#
# ROS2_numeric_type = cloud_numeric_type_foxglove_to_ros2[foxglove_numeric_type]
#
# https://docs.foxglove.dev/docs/visualization/message-schemas/numeric-type
# Compared to ROS2 Pointcloud2 message schema, Foxglove swaps the order of unsigned
# and signed integers, but does not swap the order of FLOAT32 and FLOAT64:
#
# Numeric type | Foxglove | ROS2
# ---------------------------------
# UINT8 | 1 | 2
# INT8 | 2 | 1
# UINT16 | 3 | 4
# INT16 | 4 | 3
# UINT32 | 5 | 6
# INT32 | 6 | 5
# FLOAT32 | 7 | 7
# FLOAT64 | 8 | 8
cloud_numeric_type_foxglove_to_ros2 = [0, 2, 1, 4, 3, 6, 5, 7, 8]
class StreamType(Enum):
UNKNOWN = 0
IMAGE = 1
POINTCLOUD = 2
def flatbuffer_image_to_ros2(buffer: bytes) -> dict:
start_ts = time.time()
image = RawImage.RawImage.GetRootAs(buffer, 0)
ros2_image={
"header": {
"stamp": {
"sec": image.Timestamp().Sec(),
"nanosec": image.Timestamp().Nsec()},
"frame_id": image.FrameId().decode('UTF-8'),
},
"height" : image.Height(),
"width" : image.Width(),
"encoding" : image.Encoding().decode('UTF-8'),
"is_bigendian": 0, # little endian
"step" : image.Step(),
# TODO: why does it take ~ 1 sec to write each image msg? Removing compression does not help, and tolist() seems to run quickly by itself
"data" : image.DataAsNumpy().tolist(), # buffer that contains actual image
}
end_ts = time.time()
print('Time taken to convert to ROS2 IMAGE:', end_ts - start_ts)
return ros2_image
def flatbuffer_pointcloud_to_ros2(buffer: bytes):
start_ts = time.time()
cloud = PointCloud.PointCloud.GetRootAs(buffer, 0)
point_fields = []
# Convert Foxglove PackedElementField to ROS2 PointField
for i in range(0, cloud.FieldsLength()):
field = cloud.Fields(i)
# print(dir(field))
datatype = 0
if field.Type() < 1 or field.Type() > 8:
print('Invalid cloud numeric type. Must be in range [1, 8]:', field.Type()) # TODO: warn user?
return None
else:
datatype = cloud_numeric_type_foxglove_to_ros2[field.Type()]
# print('DATA TYPE:', field.Type())
point_fields.append(
{
"name" : field.Name().decode('UTF-8'),
"offset" : field.Offset(),
"datatype" : datatype,
"count" : 1
})
# print(point_fields)
ros2_cloud={
"header": {
"stamp": {
"sec": cloud.Timestamp().Sec(),
"nanosec": cloud.Timestamp().Nsec()},
"frame_id": cloud.FrameId().decode('UTF-8'),
# "frame_id": 'downward-chest-camera',
},
# We use unorganized pointclouds: height is 1, width is length of data
"height" : 1,
"width" : int(cloud.DataLength() / cloud.PointStride()),
"fields" : point_fields,
"is_bigendian": 0, # data is little endian
# size of point in bytes. E.g. if each element is 4 bytes, XYZRGB is 6 * 4 = 24 bytes long
"point_step" : cloud.PointStride(),
"row_step" : cloud.DataLength(), # Length of a row in bytes
"is_dense" : 0, # there can be invalid pts in our cloud
"data" :cloud.DataAsNumpy().tolist(), # buffer that contains actual cloud
}
# print(ros2_cloud)
end_ts = time.time()
print('Time taken to convert to ROS2 CLOUD:', end_ts - start_ts)
return ros2_cloud
class Ros2McapWriter:
def __init__(self, filename: str, streams: set[str]):
self.streams = streams
self.filename = filename
self.queue = multiprocessing.Queue()
self.p = multiprocessing.Process(target=self._write_messages)
self.p.start()
def stop(self):
self.event.set() # stops disk writing process
self.p.join()
"""
message_name : the name of the ROS2 message
"""
def _register_schema(self, writer, stream_type: StreamType, filename: str, message_name: str):
# TODO: use bazel runfiles instead?
if not self.stream_type_to_schema.get(stream_type):
filepath = os.path.join(os.path.dirname(__file__), filename)
with open(filepath, "r") as schema:
schema_definition = schema.read()
self.stream_type_to_schema[stream_type] = writer.register_msgdef(message_name, schema_definition)
print(schema_definition)
def add_sensor_frame(self, frame, stream_name: str) -> dict:
# print(stream_name)
# if stream_name == 'stop':
# self.stop()
# return
if not stream_name in self.streams:
print('Invalid stream name:', stream_name) # TODO: warn user?
return
if 'image' in stream_name:
self.queue.put(Ros2SensorMsg(StreamType.IMAGE, stream_name, flatbuffer_image_to_ros2(frame)))
elif 'points' in stream_name:
self.queue.put(Ros2SensorMsg(StreamType.POINTCLOUD, stream_name, flatbuffer_pointcloud_to_ros2(frame)))
else:
print('Unsupported sensor stream type') # TODO: warn user?
def _write_messages(self):
counter : int = 0
mcap_file = open(self.filename, mode="wb")
writer = McapWriter(mcap_file)
# Add ROS2 message schemas for specified streams
for stream in self.streams:
if 'image' in stream:
self._register_schema(writer, StreamType.IMAGE, "ros2_msgs/Image.msg", "sensor_msgs/msg/Image")
elif 'points' in stream:
self._register_schema(writer, StreamType.POINTCLOUD, "ros2_msgs/PointCloud2.msg", "sensor_msgs/msg/PointCloud2")
while True:
if self.event.is_set():
# External process has requested stop
break
if self.queue.empty():
# Sleep a bit, so we do not waste CPU cycles
time.sleep(0.01)
else:
# Write next msg in queue to disk
start_ts = time.time()
ros2_msg = self.queue.get()
writer.write_message(
topic=ros2_msg.stream_name,
schema=self.stream_type_to_schema[ros2_msg.type],
message= ros2_msg.message,
# log_time=ros2_msg['header']['stamp']['sec'], use msg time instead?
log_time = time.time_ns(),
publish_time=time.time_ns(),
sequence=counter
)
counter += 1
end_ts = time.time()
print('\n>>>>>>>>>>>> Message write duration:', end_ts - start_ts, 'of type', ros2_msg.type)
# Finish writing to mcap file
writer.finish()
print('writer closed')
mcap_file.close()
self.queue.close()
self.queue.join_thread()
print ('ROS2 MCAP WRITER STOPPED')
def num_unwritten_msgs(self):
return self.queue.qsize()
streams = None
stream_type_to_schema = dict()
queue = None
p = None # process that writes files to disk
event = multiprocessing.Event()
if __name__ == "__main__":
streams = set(['downward-chest-camera.color.image-raw','upper-lidar.depth.points'])
writer = Ros2McapWriter(os.path.join(os.path.dirname(__file__), 'slow_writes.mcap'), streams)
img_fb = open(os.path.join(os.path.dirname(__file__), 'raw_image'), mode="rb").read()
cloud_fb = open(os.path.join(os.path.dirname(__file__), 'pointcloud'), mode = "rb").read()
for i in range (0, 10):
writer.add_sensor_frame(cloud_fb,'upper-lidar.depth.points')
writer.add_sensor_frame(img_fb,'downward-chest-camera.color.image-raw')
print ('Finished adding all sensor frames')
while writer.num_unwritten_msgs() > 0:
time.sleep(0.2)
print('Wait for msgs to write to disk...')
writer.stop() |
Beta Was this translation helpful? Give feedback.
Replies: 0 comments 3 replies
-
@andrewwongagility A good place to start would be a performance/cpu profile of the above code to see where the slowdowns are and whether they are CPU or disk. |
Beta Was this translation helpful? Give feedback.
Thanks for running the profile. If I recall correctly, we had to implement our own CDR serialization since there was not a standalone package for ROS2 done in native python or without bringing in an entire ROS workspace. Quite possibly there are some easy performance wins in that code. It's not an area of focus for us right now but we'd definitely be open to PRs improving it if you see anything that stands out.
One thing to try if you want to produce ROS2 serialized data is using the ROS2 python libraries to write the mcap file (or maybe serialize the messages). Those might use native bindings and run faster. Its been a while since I've tried but I think these apis will write MCAP files o…