Skip to content

Commit

Permalink
add test to check pddl output
Browse files Browse the repository at this point in the history
- test/2013_fridge_demo.test test/structuretest
  • Loading branch information
k-okada committed Jun 20, 2020
1 parent b99d2a2 commit 54d7e74
Show file tree
Hide file tree
Showing 2 changed files with 224 additions and 0 deletions.
26 changes: 26 additions & 0 deletions pddl/pddl_planner/test/2013_fridge_demo.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<launch>
<include file="$(find pddl_planner)/demos/2013_fridge_demo/demo_bring_can.launch" />

<test name="publish_test"
test-name="smach_container_publish_test"
pkg="rostest" type="publishtest">
<rosparam>
topics:
- name: /server_name/smach/container_structure
timeout: 30
negative: False
</rosparam>
</test>

<test name="structure_test"
test-name="smach_container_structure_test"
pkg="pddl_planner" type="structuretest">
<rosparam>
topics:
- name: /server_name/smach/container_structure
timeout: 10
children: ['(move-to start frontfridge)', '(open-door)', '(move-to frontfridge pregrasp)', '(grasp-object can)', '(move-to pregrasp start)', '(move-to start somewhere)', '(try-close) ', '(move-recoverly) ', '(move-to frontfridge start)', '(move-recoverly)', '(close-door)', '(move-to preclose start)', '(try-close)']
container_outcomes: ['goal0']
</rosparam>
</test>
</launch>
198 changes: 198 additions & 0 deletions pddl/pddl_planner/test/structuretest
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
#!/usr/bin/env python
###############################################################################
# Software License Agreement (BSD License)
#
# Copyright (c) 2016, Kentaro Wada.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
###############################################################################

"""
Integration test node that subscribes to smach_msgs/SmachContainerStructure
topic (/server_name/smach/container_structure) and verifies it's contents
below parameters must be set:
<test name="structuretest"
test-name="structuretest"
pkg="pddl_planner" type="structuretest">
<rosparam>
topics:
- name: a topic name
timeout: timeout for the topic
children: contens of children
internal_outcomes: contens of internal_outcomes
outcomes_from: contens of coutcomes_from
outcomes_to: contens of outcomes_to
container_outcomes: contens of container_outcomes
- name: another topic name
timeout: timeout for the topic
children: contens of children
internal_outcomes: contens of internal_outcomes
outcomes_from: contens of coutcomes_from
outcomes_to: contens of outcomes_to
container_outcomes: contens of container_outcomes
</rosparam>
</test>
Author: Kentaro Wada <[email protected]>
Modified for SmachContainerStructure by <[email protected]>
"""
from __future__ import print_function

import sys
import time
import unittest

import rospy
import rostopic

from smach_msgs.msg import SmachContainerStructure

PKG = 'pddl_planner'
NAME = 'structtest'


class StructureChecker(object):
def __init__(self, topic_name, timeout, struct):
self.topic_name = topic_name
self.deadline = rospy.Time.now() + rospy.Duration(timeout)
self.struct = struct
self.msg = None
rospy.loginfo("subscribing {}".format(topic_name))
rospy.loginfo(" - children: {}".format(struct.children))
rospy.loginfo(" - internal_outcomes: {}".format(struct.internal_outcomes))
rospy.loginfo(" - outcomes_from: {}".format(struct.outcomes_from))
rospy.loginfo(" - outcomes_to: {}".format(struct.outcomes_to))
rospy.loginfo(" - container_outcomes: {}".format(struct.container_outcomes))
self.sub = rospy.Subscriber(topic_name, SmachContainerStructure, self._callback)

def _callback(self, msg):
message_matched = True

rospy.loginfo("received msg")
if not self.struct.children == None:
rospy.loginfo(" - children: {}".format(msg.children))
if not set(msg.children) == set(self.struct.children):
rospy.loginfo(" expecting: {}".format(self.struct.children))
message_matched = False
if not self.struct.internal_outcomes == None:
rospy.loginfo(" - internal_outcomes: {}".format(msg.internal_outcomes))
if not set(msg.internal_outcomes) == set(self.struct.internal_outcomes):
rospy.loginfo(" expecting : {}".format(self.struct.internal_outcomes))
message_matched = False
if not self.struct.outcomes_from == None:
rospy.loginfo(" - outcomes_from: {}".format(msg.outcomes_from))
if not set(msg.outcomes_from) == set(self.struct.outcomes_from):
rospy.loginfo(" expecting: {}".format(self.struct.outcomes_from))
message_matched = False
if not self.struct.outcomes_to == None:
rospy.loginfo(" - outcomes_to: {}".format(msg.outcomes_to))
if not set(msg.outcomes_to) == set(self.struct.outcomes_to):
rospy.loginfo(" expecting: {}".format(self.struct.outcomes_to))
message_matched = False
if not self.struct.container_outcomes == None:
rospy.loginfo(" - container_outcomes: {}".format(msg.container_outcomes))
if not set(msg.container_outcomes) == set(self.struct.container_outcomes):
rospy.loginfo(" expecting: {}".format(self.struct.container_outcomes))
message_matched = False

if message_matched:
self.msg = msg

def assert_published(self):
if self.msg:
return True
if rospy.Time.now() > self.deadline:
return False
return None


class StructureTest(unittest.TestCase):
def __init__(self, *args):
super(self.__class__, self).__init__(*args)
rospy.init_node(NAME)
# scrape rosparam
self.topics = []
params = rospy.get_param('~topics', [])
for param in params:
if 'name' not in param:
self.fail("'name' field in rosparam is required but not specified.")
topic = {'timeout': 10, 'children': None, 'internal_outcomes': None, 'outcomes_from': None, 'outcomes_to': None, 'container_outcomes': None}
topic.update(param)
self.topics.append(topic)
# check if there is at least one topic
if not self.topics:
self.fail('No topic is specified in rosparam.')

def test_publish(self):
"""Test topics are published and messages come"""
use_sim_time = rospy.get_param('/use_sim_time', False)
t_start = time.time()
while not rospy.is_shutdown() and \
use_sim_time and (rospy.Time.now() == rospy.Time(0)):
rospy.logwarn_throttle(
1, '/use_sim_time is specified and rostime is 0, /clock is published?')
if time.time() - t_start > 10:
self.fail('Timed out (10s) of /clock publication.')
# must use time.sleep because /clock isn't yet published, so rospy.sleep hangs.
time.sleep(0.1)
# subscribe topics
checkers = []
for topic in self.topics:
topic_name = topic['name']
timeout = topic['timeout']
struct = SmachContainerStructure()
struct.children = topic['children']
struct.internal_outcomes = topic['internal_outcomes']
struct.outcomes_from = topic['outcomes_from']
struct.outcomes_to = topic['outcomes_to']
struct.container_outcomes = topic['container_outcomes']

checkers.append(StructureChecker(topic_name, timeout, struct))
deadline = max(checker.deadline for checker in checkers)
# assert
finished_topics = []
while not rospy.is_shutdown():
if len(self.topics) == len(finished_topics):
break
for checker in checkers:
if checker.topic_name in finished_topics:
continue # skip topic testing has finished
ret = checker.assert_published()
if ret is None:
continue # skip if there is no test result
finished_topics.append(checker.topic_name)
assert ret, 'Topic [%s] is not published' % (checker.topic_name)
rospy.sleep(0.01)


if __name__ == '__main__':
import rostest
rostest.run(PKG, NAME, StructureTest, sys.argv)

0 comments on commit 54d7e74

Please sign in to comment.