diff --git a/README.md b/README.md index 15abbe93..d15572a3 100644 --- a/README.md +++ b/README.md @@ -167,6 +167,7 @@ To publish a given u-blox message to a ROS topic, set the parameter shown below * `publish/mon/hw`: Topic `~monhw` ### NAV messages +* NAV messages are time stamped with `i_tow`, and multiple messages can be synchronized with [ublox_msg_filters](ublox_msg_filters) * `publish/nav/all`: This is the default value for the `publish/mon/` parameters below. It defaults to `publish/all`. Individual messages can be enabled or disabled by setting the parameters below. * `publish/nav/att`: Topic `~navatt`. **ADR/UDR devices only** * `publish/nav/clock`: Topic `~navclock` diff --git a/ublox/package.xml b/ublox/package.xml index b34a1235..4253cb40 100644 --- a/ublox/package.xml +++ b/ublox/package.xml @@ -12,6 +12,7 @@ ament_cmake ublox_serialization ublox_msgs + ublox_msg_filters ublox_gps diff --git a/ublox_msg_filters/CMakeLists.txt b/ublox_msg_filters/CMakeLists.txt new file mode 100644 index 00000000..5ea9e167 --- /dev/null +++ b/ublox_msg_filters/CMakeLists.txt @@ -0,0 +1,54 @@ +cmake_minimum_required(VERSION 3.5) +project(ublox_msg_filters) + +# Default to C++14 +if(NOT CMAKE_CXX_STANDARD) + set(CMAKE_CXX_STANDARD 14) +endif() + +if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") + add_compile_options(-Wall -Wextra -Wpedantic) +endif() + +find_package(ament_cmake_ros REQUIRED) +find_package(rclcpp REQUIRED) +find_package(message_filters REQUIRED) +find_package(ublox_msgs REQUIRED) + +set(dependencies "rclcpp" "message_filters" "ublox_msgs") + +add_executable(talker src/talker.cpp) +ament_target_dependencies(talker ${dependencies}) +target_include_directories(talker PUBLIC + $ + $ +) + +add_executable(listener src/listener.cpp) +ament_target_dependencies(listener ${dependencies}) +target_include_directories(listener PUBLIC + $ + $ +) + +install(DIRECTORY include/${PROJECT_NAME}/ + DESTINATION include/${PROJECT_NAME} +) +install(DIRECTORY launch + DESTINATION share/${PROJECT_NAME} +) +install(TARGETS talker listener + DESTINATION lib/${PROJECT_NAME} +) + +if(BUILD_TESTING) + ament_add_gtest(${PROJECT_NAME}-test_exact_time_policy test/test_exact_time_policy.cpp) + ament_target_dependencies(${PROJECT_NAME}-test_exact_time_policy ${dependencies}) + target_include_directories(${PROJECT_NAME}-test_exact_time_policy PUBLIC + $ + $ + ) +endif() + +ament_export_include_directories(include) +ament_package() diff --git a/ublox_msg_filters/README.md b/ublox_msg_filters/README.md new file mode 100644 index 00000000..89d79b52 --- /dev/null +++ b/ublox_msg_filters/README.md @@ -0,0 +1,11 @@ +# ublox_msg_filters + +Time synchronize multiple uBlox messages to get a single callback + +Port of [message_filters::ExactTime](http://wiki.ros.org/message_filters#ExactTime_Policy) message synchronization policy to use integer time of week in milliseconds `i_tow` instead of `ros::Time` in a header. + +The [message_filters::ApproximateTime](http://wiki.ros.org/message_filters#ApproximateTime_Policy) message synchronization policy is not relevent because messages generated by a ublox sensor for a single update contain identical `i_tow` time stamps. + +See [src/listener.cpp](src/listener.cpp) for example usage. + +Launch the example with `ros2 launch ublox_msg_filters example.launch.xml` diff --git a/ublox_msg_filters/include/ublox_msg_filters/exact_time.h b/ublox_msg_filters/include/ublox_msg_filters/exact_time.h new file mode 100644 index 00000000..ba87cccd --- /dev/null +++ b/ublox_msg_filters/include/ublox_msg_filters/exact_time.h @@ -0,0 +1,251 @@ +/********************************************************************* +* Software License Agreement (BSD License) +* +* Copyright (c) 2009, Willow Garage, Inc. +* All rights reserved. +* +* Redistribution and use in source and binary forms, with or without +* modification, are permitted provided that the following conditions +* are met: +* +* * Redistributions of source code must retain the above copyright +* notice, this list of conditions and the following disclaimer. +* * Redistributions in binary form must reproduce the above +* copyright notice, this list of conditions and the following +* disclaimer in the documentation and/or other materials provided +* with the distribution. +* * Neither the name of the Willow Garage nor the names of its +* contributors may be used to endorse or promote products derived +* from this software without specific prior written permission. +* +* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +* POSSIBILITY OF SUCH DAMAGE. +*********************************************************************/ + +#ifndef UBLOX_MSG_FILTERS__EXACT_TIME_HPP_ +#define UBLOX_MSG_FILTERS__EXACT_TIME_HPP_ + +#include +#include +#include + +#include + +#include "message_filters/connection.h" +#include "message_filters/message_traits.h" +#include "message_filters/null_types.h" +#include "message_filters/signal9.h" +#include "message_filters/synchronizer.h" + +namespace ublox_msg_filters +{ +namespace sync_policies +{ + +using NullType = message_filters::NullType; +using Connection = message_filters::Connection; + +template +using PolicyBase = message_filters::PolicyBase; + +template +using Synchronizer = message_filters::Synchronizer; + +template +struct iTOW +{ + static u_int32_t value(const M& m) { return m.i_tow; } +}; + +template<> +struct iTOW +{ + static u_int32_t value(const NullType&) { return 0; } +}; + +template +struct ExactTime : public PolicyBase +{ + typedef Synchronizer Sync; + typedef PolicyBase Super; + typedef typename Super::Messages Messages; + typedef typename Super::Signal Signal; + typedef typename Super::Events Events; + typedef typename Super::RealTypeCount RealTypeCount; + typedef typename Super::M0Event M0Event; + typedef typename Super::M1Event M1Event; + typedef typename Super::M2Event M2Event; + typedef typename Super::M3Event M3Event; + typedef typename Super::M4Event M4Event; + typedef typename Super::M5Event M5Event; + typedef typename Super::M6Event M6Event; + typedef typename Super::M7Event M7Event; + typedef typename Super::M8Event M8Event; + typedef Events Tuple; + + ExactTime(uint32_t queue_size) + : parent_(0) + , queue_size_(queue_size) + , last_signal_time_(0) + { + } + + ExactTime(const ExactTime& e) + { + *this = e; + } + + ExactTime& operator=(const ExactTime& rhs) + { + parent_ = rhs.parent_; + queue_size_ = rhs.queue_size_; + last_signal_time_ = rhs.last_signal_time_; + tuples_ = rhs.tuples_; + + return *this; + } + + void initParent(Sync* parent) + { + parent_ = parent; + } + + template + void add(const typename std::tuple_element::type& evt) + { + RCUTILS_ASSERT(parent_); + + namespace mt = message_filters::message_traits; + + std::lock_guard lock(mutex_); + + Tuple& t = tuples_[iTOW::type>::value(*evt.getMessage())]; + std::get(t) = evt; + + checkTuple(t); + } + + template + Connection registerDropCallback(const C& callback) + { + return drop_signal_.addCallback(callback); + } + + template + Connection registerDropCallback(C& callback) + { + return drop_signal_.addCallback(callback); + } + + template + Connection registerDropCallback(const C& callback, T* t) + { + return drop_signal_.addCallback(callback, t); + } + + template + Connection registerDropCallback(C& callback, T* t) + { + return drop_signal_.addCallback(callback, t); + } + +private: + + // assumes mutex_ is already locked + void checkTuple(Tuple& t) + { + namespace mt = message_filters::message_traits; + + bool full = true; + full = full && (bool)std::get<0>(t).getMessage(); + full = full && (bool)std::get<1>(t).getMessage(); + full = full && (RealTypeCount::value > 2 ? (bool)std::get<2>(t).getMessage() : true); + full = full && (RealTypeCount::value > 3 ? (bool)std::get<3>(t).getMessage() : true); + full = full && (RealTypeCount::value > 4 ? (bool)std::get<4>(t).getMessage() : true); + full = full && (RealTypeCount::value > 5 ? (bool)std::get<5>(t).getMessage() : true); + full = full && (RealTypeCount::value > 6 ? (bool)std::get<6>(t).getMessage() : true); + full = full && (RealTypeCount::value > 7 ? (bool)std::get<7>(t).getMessage() : true); + full = full && (RealTypeCount::value > 8 ? (bool)std::get<8>(t).getMessage() : true); + + if (full) + { + parent_->signal(std::get<0>(t), std::get<1>(t), std::get<2>(t), + std::get<3>(t), std::get<4>(t), std::get<5>(t), + std::get<6>(t), std::get<7>(t), std::get<8>(t)); + + last_signal_time_ = iTOW::value(*std::get<0>(t).getMessage()); + + tuples_.erase(last_signal_time_); + + clearOldTuples(); + } + + if (queue_size_ > 0) + { + while (tuples_.size() > queue_size_) + { + Tuple& t2 = tuples_.begin()->second; + drop_signal_.call(std::get<0>(t2), std::get<1>(t2), std::get<2>(t2), + std::get<3>(t2), std::get<4>(t2), std::get<5>(t2), + std::get<6>(t2), std::get<7>(t2), std::get<8>(t2)); + tuples_.erase(tuples_.begin()); + } + } + } + + // assumes mutex_ is already locked + void clearOldTuples() + { + typename M_TimeToTuple::iterator it = tuples_.begin(); + typename M_TimeToTuple::iterator end = tuples_.end(); + for (; it != end;) + { + if (it->first <= last_signal_time_) + { + typename M_TimeToTuple::iterator old = it; + ++it; + + Tuple& t = old->second; + drop_signal_.call(std::get<0>(t), std::get<1>(t), std::get<2>(t), + std::get<3>(t), std::get<4>(t), std::get<5>(t), + std::get<6>(t), std::get<7>(t), std::get<8>(t)); + tuples_.erase(old); + } + else + { + // the map is sorted by time, so we can ignore anything after this if this one's time is ok + break; + } + } + } + +private: + Sync* parent_; + + uint32_t queue_size_; + typedef std::map M_TimeToTuple; + M_TimeToTuple tuples_; + uint32_t last_signal_time_; + + Signal drop_signal_; + + std::mutex mutex_; +}; + +} // namespace sync_policies +} // namespace ublox_msg_filters + +#endif // UBLOX_MSG_FILTERS__EXACT_TIME_HPP_ + diff --git a/ublox_msg_filters/launch/example.launch.xml b/ublox_msg_filters/launch/example.launch.xml new file mode 100644 index 00000000..054669f6 --- /dev/null +++ b/ublox_msg_filters/launch/example.launch.xml @@ -0,0 +1,9 @@ + + + + + + + + + diff --git a/ublox_msg_filters/package.xml b/ublox_msg_filters/package.xml new file mode 100644 index 00000000..7081f793 --- /dev/null +++ b/ublox_msg_filters/package.xml @@ -0,0 +1,21 @@ + + + + ublox_msg_filters + 2.3.0 + Time synchronize multiple uBlox messages to get a single callback + Kevin Hallenbeck + Kevin Hallenbeck + BSD + http://ros.org/wiki/ublox + + ament_cmake_ros + + rclcpp + message_filters + ublox_msgs + + + ament_cmake + + diff --git a/ublox_msg_filters/src/listener.cpp b/ublox_msg_filters/src/listener.cpp new file mode 100644 index 00000000..179cf203 --- /dev/null +++ b/ublox_msg_filters/src/listener.cpp @@ -0,0 +1,46 @@ +#include +#include +#include +#include +#include +#include + +class Listener : public rclcpp::Node { +public: + Listener() : Node("listener"), + sub1_(this, "msg1"), + sub2_(this, "msg2"), + sub3_(this, "msg3"), + sync_(MySyncPolicy(10), sub1_, sub2_, sub3_) + { + using std::placeholders::_1; + using std::placeholders::_2; + using std::placeholders::_3; + sync_.registerCallback(std::bind(&Listener::callback, this, _1, _2, _3)); + RCLCPP_INFO(this->get_logger(), "Ready to receive"); + } + +private: + void callback(const ublox_msgs::msg::NavPOSLLH::ConstSharedPtr msg1, + const ublox_msgs::msg::NavRELPOSNED9::ConstSharedPtr msg2, + const ublox_msgs::msg::NavVELNED::ConstSharedPtr msg3) { + RCLCPP_INFO(this->get_logger(), "RX %u %u %u", msg1->i_tow, msg2->i_tow, msg3->i_tow); + } + +private: + message_filters::Subscriber sub1_; + message_filters::Subscriber sub2_; + message_filters::Subscriber sub3_; + typedef ublox_msg_filters::sync_policies::ExactTime MySyncPolicy; + message_filters::Synchronizer sync_; +}; + +int main(int argc, char * argv[]) +{ + rclcpp::init(argc,argv); + rclcpp::spin(std::make_shared()); + rclcpp::shutdown(); + return 0; +} diff --git a/ublox_msg_filters/src/talker.cpp b/ublox_msg_filters/src/talker.cpp new file mode 100644 index 00000000..76d11148 --- /dev/null +++ b/ublox_msg_filters/src/talker.cpp @@ -0,0 +1,50 @@ +#include +#include +#include +#include + +class Talker : public rclcpp::Node { +public: + Talker() : Node("talker") { + using namespace std::chrono_literals; + pub1_ = this->create_publisher("msg1", 2); + pub2_ = this->create_publisher("msg2", 2); + pub3_ = this->create_publisher("msg3", 2); + timer_ = this->create_wall_timer(0.2s, std::bind(&Talker::publish, this)); + RCLCPP_INFO(this->get_logger(), "Ready to transmit"); + } + +private: + void publish() { + RCLCPP_INFO(this->get_logger(), "TX %u %u %u", i_tow_, i_tow_ + 1, i_tow_ + 3); + + auto msg1 = ublox_msgs::msg::NavPOSLLH(); + msg1.i_tow = i_tow_; + pub1_->publish(msg1); + + auto msg2 = ublox_msgs::msg::NavRELPOSNED9(); + msg2.i_tow = i_tow_ + 1; + pub2_->publish(msg2); + + auto msg3 = ublox_msgs::msg::NavVELNED(); + msg3.i_tow = i_tow_ + 3; + pub3_->publish(msg3); + + i_tow_++; + } + +private: + rclcpp::TimerBase::SharedPtr timer_; + rclcpp::Publisher::SharedPtr pub1_; + rclcpp::Publisher::SharedPtr pub2_; + rclcpp::Publisher::SharedPtr pub3_; + uint32_t i_tow_ = 0; +}; + +int main(int argc, char * argv[]) +{ + rclcpp::init(argc, argv); + rclcpp::spin(std::make_shared()); + rclcpp::shutdown(); + return 0; +} diff --git a/ublox_msg_filters/test/test_exact_time_policy.cpp b/ublox_msg_filters/test/test_exact_time_policy.cpp new file mode 100644 index 00000000..468cff27 --- /dev/null +++ b/ublox_msg_filters/test/test_exact_time_policy.cpp @@ -0,0 +1,152 @@ +/********************************************************************* +* Software License Agreement (BSD License) +* +* Copyright (c) 2008, Willow Garage, Inc. +* All rights reserved. +* +* Redistribution and use in source and binary forms, with or without +* modification, are permitted provided that the following conditions +* are met: +* +* * Redistributions of source code must retain the above copyright +* notice, this list of conditions and the following disclaimer. +* * Redistributions in binary form must reproduce the above +* copyright notice, this list of conditions and the following +* disclaimer in the documentation and/or other materials provided +* with the distribution. +* * Neither the name of the Willow Garage nor the names of its +* contributors may be used to endorse or promote products derived +* from this software without specific prior written permission. +* +* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +* POSSIBILITY OF SUCH DAMAGE. +*********************************************************************/ + +#include + +#include +#include "message_filters/synchronizer.h" +#include "ublox_msg_filters/exact_time.h" + +using namespace ublox_msg_filters::sync_policies; + +struct Msg +{ + uint32_t i_tow; + int data; +}; +typedef std::shared_ptr MsgPtr; +typedef std::shared_ptr MsgConstPtr; + +class Helper +{ +public: + Helper() + : count_(0) + , drop_count_(0) + {} + + void cb() + { + ++count_; + } + + void dropcb() + { + ++drop_count_; + } + + int32_t count_; + int32_t drop_count_; +}; + +typedef ExactTime Policy2; +typedef ExactTime Policy3; +typedef Synchronizer Sync2; +typedef Synchronizer Sync3; + +////////////////////////////////////////////////////////////////////////////////////////////////// +// From here on we assume that testing the 3-message version is sufficient, so as not to duplicate +// tests for everywhere from 2-9 +////////////////////////////////////////////////////////////////////////////////////////////////// +TEST(ExactTime, multipleTimes) +{ + Sync3 sync(2); + Helper h; + sync.registerCallback(std::bind(&Helper::cb, &h)); + MsgPtr m(std::make_shared()); + m->i_tow = 0; + + sync.add<0>(m); + ASSERT_EQ(h.count_, 0); + + m = std::make_shared(); + m->i_tow = 100000000; + sync.add<1>(m); + ASSERT_EQ(h.count_, 0); + sync.add<0>(m); + ASSERT_EQ(h.count_, 0); + sync.add<2>(m); + ASSERT_EQ(h.count_, 1); +} + +TEST(ExactTime, queueSize) +{ + Sync3 sync(1); + Helper h; + sync.registerCallback(std::bind(&Helper::cb, &h)); + MsgPtr m(std::make_shared()); + m->i_tow = 0; + + sync.add<0>(m); + ASSERT_EQ(h.count_, 0); + sync.add<1>(m); + ASSERT_EQ(h.count_, 0); + + m = std::make_shared(); + m->i_tow = 100000000; + sync.add<1>(m); + ASSERT_EQ(h.count_, 0); + + m = std::make_shared(); + m->i_tow = 0; + sync.add<1>(m); + ASSERT_EQ(h.count_, 0); + sync.add<2>(m); + ASSERT_EQ(h.count_, 0); +} + +TEST(ExactTime, dropCallback) +{ + Sync2 sync(1); + Helper h; + sync.registerCallback(std::bind(&Helper::cb, &h)); + sync.getPolicy()->registerDropCallback(std::bind(&Helper::dropcb, &h)); + MsgPtr m(std::make_shared()); + m->i_tow = 0; + + sync.add<0>(m); + ASSERT_EQ(h.drop_count_, 0); + m->i_tow = 100000000; + sync.add<0>(m); + + ASSERT_EQ(h.drop_count_, 1); +} + +int main(int argc, char **argv){ + testing::InitGoogleTest(&argc, argv); + rclcpp::init(argc, argv); + return RUN_ALL_TESTS(); +} + +