Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(obstacle_stop_planner): add test #598

9 changes: 9 additions & 0 deletions planning/obstacle_stop_planner/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,19 @@ rclcpp_components_register_node(obstacle_stop_planner
if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
ament_lint_auto_find_test_dependencies()
find_package(launch_testing_ament_cmake REQUIRED)
add_launch_test(test/test_obstacle_stop_planner_node.test.py)

install(DIRECTORY
#test/config
DESTINATION share/${PROJECT_NAME}/
)
endif()

ament_auto_package(
INSTALL_TO_SHARE
config
launch
)


Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
ros__parameters:
stop_planner:
stop_margin: 5.0 # stop margin distance from obstacle on the path [m]
min_behavior_stop_margin: 2.0 # stop margin distance when any other stop point is inserted in stop margin [m]
min_behavior_stop_margin: 5.0 # stop margin distance when any other stop point is inserted in stop margin [m]
step_length: 1.0 # step length for pointcloud search range [m]
extend_distance: 0.0 # extend trajectory to consider after goal obstacle in the extend_distance
expand_stop_range: 0.0 # margin of vehicle footprint [m]

slow_down_planner:
# slow down planner parameters
forward_margin: 5.0 # margin distance from slow down point to vehicle front [m]
backward_margin: 5.0 # margin distance from slow down point to vehicle rear [m]
backward_margin: 0.0 # margin distance from slow down point to vehicle rear [m]
expand_slow_down_range: 1.0 # offset from vehicle side edge for expanding the search area of the surrounding point cloud [m]
max_slow_down_vel: 1.38 # max slow down velocity [m/s]
min_slow_down_vel: 0.28 # min slow down velocity [m/s]
Expand All @@ -19,7 +19,7 @@
consider_constraints: False # set "True", if no decel plan found under jerk/dec constrains, relax target slow down vel
forward_margin_min: 1.0 # min margin for relaxing slow down margin [m/s]
forward_margin_span: -0.1 # fineness param for relaxing slow down margin [m/s]
jerk_min_slow_down: -0.3 # min slow down jerk constraint [m/sss]
jerk_min_slow_down: -0.6 # min slow down jerk constraint [m/sss]
jerk_span: -0.01 # fineness param for planning deceleration jerk [m/sss]
jerk_start: -0.1 # init jerk used for deceleration planning [m/sss]
slow_down_vel: 1.38 # target slow down velocity [m/s]
3 changes: 3 additions & 0 deletions planning/obstacle_stop_planner/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
<depend>vehicle_info_util</depend>
<depend>visualization_msgs</depend>

<test_depend>launch</test_depend>
<test_depend>launch_ros</test_depend>
<test_depend>launch_testing</test_depend>
<test_depend>ament_lint_auto</test_depend>
<test_depend>autoware_lint_common</test_depend>

Expand Down
14 changes: 14 additions & 0 deletions planning/obstacle_stop_planner/src/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,20 @@ void ObstacleStopPlannerNode::pathCallback(const Trajectory::ConstSharedPtr inpu
return;
}

if(!object_ptr_){
RCLCPP_WARN_THROTTLE(
get_logger(), *get_clock(), std::chrono::milliseconds(1000).count(),
"waiting for object...");
return;
}

if(!current_velocity_ptr_){
RCLCPP_WARN_THROTTLE(
get_logger(), *get_clock(), std::chrono::milliseconds(1000).count(),
"waiting for odometry...");
return;
}

PlannerData planner_data{};

getSelfPose(input_msg->header, tf_buffer_, planner_data.current_pose);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
# Copyright 2022 Tier IV, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from dataclasses import field
import imp
from operator import truediv
import time
import math
import unittest
import std_msgs.msg
import sensor_msgs.msg
import autoware_auto_planning_msgs.msg
import autoware_auto_perception_msgs.msg
import nav_msgs.msg
import launch
import launch_ros
from launch_ros.substitutions import FindPackageShare
import launch_testing
import pytest
import rclpy
from rclpy.node import Node
import copy
import struct
from ros2param.api import call_get_parameters

@pytest.mark.launch_test

def generate_test_description():

obstacle_stop_planner_node = launch_ros.actions.Node(
package='obstacle_stop_planner',
executable='obstacle_stop_planner_node',
name='obstacle_stop_planner_node',
remappings=[
('~/input/pointcloud', 'input/pointcloud'),
('~/input/trajectory', 'input/trajectory'),
('~/input/odometry', 'input/odometry'),
('~/input/objects', 'input/objects'),
('~/output/trajectory', 'output/trajectory')
],
parameters=[
[FindPackageShare('vehicle_info_util'), '/config/vehicle_info.param.yaml'],
[FindPackageShare('obstacle_stop_planner'), '/config/obstacle_stop_planner.param.yaml'],
[FindPackageShare('obstacle_stop_planner'), '/config/common.param.yaml'],
[FindPackageShare('obstacle_stop_planner'), '/config/adaptive_cruise_control.param.yaml']
],
)
static_transform_publisher_node = launch_ros.actions.Node(package = "tf2_ros",
executable = "static_transform_publisher",
name='static_transform_publisher_node',
arguments = ["0", "0", "0", "0", "0", "0", "map", "base_link"])

return (
launch.LaunchDescription([
obstacle_stop_planner_node,
static_transform_publisher_node,
# Start tests right away - no need to wait for anything
launch_testing.actions.ReadyToTest(),
]),
{
'obstacle_stop_planner': obstacle_stop_planner_node,
'static_transform_publisher': static_transform_publisher_node
}
)


class TestObstacleStopPlannerLink(unittest.TestCase):

@classmethod
def setUpClass(cls):
# Initialize the ROS context for the test node
rclpy.init()

@classmethod
def tearDownClass(cls):
# Shutdown the ROS context
rclpy.shutdown()

def setUp(self):
# Create a ROS node for tests
self.node = rclpy.create_node('test_object_stop_planner_link')
self.event_name = 'test_object_stop_planner'
params=call_get_parameters( node=self.node,
node_name='/obstacle_stop_planner_node',
parameter_names=[
'stop_planner.stop_margin',
'wheel_base',
'front_overhang',
'stop_planner.expand_stop_range',
'wheel_tread',
'left_overhang',
'right_overhang'
])
self.stop_margin=params.values[0].double_value;
wheel_base=params.values[1].double_value;
front_overhang=params.values[2].double_value;
self.base_link_to_front=wheel_base+front_overhang
expand_stop_range=params.values[3].double_value;
wheel_tread=params.values[4].double_value;
left_overhang=params.values[5].double_value;
right_overhang=params.values[6].double_value;
self.left_limit_line=wheel_tread/2.0+left_overhang+expand_stop_range
self.right_limit_line=wheel_tread/2.0+right_overhang+expand_stop_range


def tearDown(self):
self.node.destroy_node()

def init_messages(self):
#message init
#frame id
map_frame="map"
base_link_frame="base_link"
x=0
y=0
#pointcloud2
self.pointcloud_msg=sensor_msgs.msg.PointCloud2()
self.pointcloud_msg.header.frame_id=base_link_frame
self.pointcloud_msg.header.stamp=self.node.get_clock().now().to_msg()
self.pointcloud_msg.height=1
self.pointcloud_msg.width=0
field=sensor_msgs.msg.PointField()
field.name="x"
field.offset=0
field.datatype=7
field.count=1
self.pointcloud_msg.fields.append(copy.deepcopy(field))
field.name="y"
field.offset=4
self.pointcloud_msg.fields.append(copy.deepcopy(field))
field.name="z"
field.offset=8
self.pointcloud_msg.fields.append(copy.deepcopy(field))
self.pointcloud_msg.is_bigendian=False
self.pointcloud_msg.point_step=16
self.pointcloud_msg.row_step=self.pointcloud_msg.width*self.pointcloud_msg.point_step
self.pointcloud_msg.is_dense=True

#trajectory
self.trajectory_resolution=0.1
self.trajectory_length=100.0
trajectory_element=int(self.trajectory_length/self.trajectory_resolution)
self.trajectory_msg=autoware_auto_planning_msgs.msg.Trajectory()
self.trajectory_msg.header.frame_id=map_frame
self.trajectory_msg.header.stamp=self.node.get_clock().now().to_msg()
for x in range(trajectory_element):
trajectory_point=autoware_auto_planning_msgs.msg.TrajectoryPoint()
trajectory_point.pose.position.x=x*self.trajectory_resolution
trajectory_point.pose.orientation.w=1.0
trajectory_point.longitudinal_velocity_mps=25/3
self.trajectory_msg.points.append(copy.deepcopy(trajectory_point))

#odometry
self.odom_msg=nav_msgs.msg.Odometry()
self.odom_msg.header.frame_id=map_frame
self.odom_msg.header.stamp=self.node.get_clock().now().to_msg()
self.odom_msg.child_frame_id=base_link_frame
self.odom_msg.pose.pose.orientation.w=1.0
self.odom_msg.twist.twist.linear.x=0.0

#object
self.object_msg=autoware_auto_perception_msgs.msg.PredictedObjects()
self.object_msg.header.frame_id=map_frame
self.object_msg.header.stamp=self.node.get_clock().now().to_msg()

def set_pointcloud_messages(self,x,y):
self.pointcloud_msg.width=1
self.pointcloud_msg.row_step=self.pointcloud_msg.width*self.pointcloud_msg.point_step
self.pointcloud_msg.data=[]
for i in range(0,self.pointcloud_msg.width):
self.pointcloud_msg.data=self.point_to_list(x,y,i/2,0.0)+self.pointcloud_msg.data

def point_to_list(self,x,y,z,i):
point_list=struct.pack('<f',float(x))+struct.pack('<f',float(y))+struct.pack('<f',float(z))+struct.pack('<f',float(i))
return point_list

def clip(self,val,min_val,max_val):
return max(min(val,max_val),min_val)

def trajectory_evaluation(self,x,y,trajectory):
if self.left_limit_line > y and y > -self.right_limit_line:
# Inside the trajectory
stop_line=self.clip(x-(self.stop_margin+self.base_link_to_front),0,self.trajectory_length)
stop_element=int(stop_line/self.trajectory_resolution)+1
check_vel=(trajectory.points[stop_element].longitudinal_velocity_mps == 0.0)
return check_vel
else:
# Outside the trajectory
check_vel=math.isclose(trajectory.points[-1].longitudinal_velocity_mps,self.trajectory_msg.points[-1].longitudinal_velocity_mps,abs_tol=0.01)
return check_vel

def trajectory_callback(self,msg):
result_val=self.trajectory_evaluation(self.current_x,self.current_y,msg)
#print("x:",self.current_x," , y:",self.current_y," , result:",result_val)
self.callback_flag=True
if result_val:
self.true_count+=1
else:
print("Test failed at (x:",self.current_x,", y:",self.current_y,")")

def test_linear_trajectory(self):
#publisher
pointcloud_pub = self.node.create_publisher(sensor_msgs.msg.PointCloud2, 'input/pointcloud', 10)
trajectory_pub = self.node.create_publisher(autoware_auto_planning_msgs.msg.Trajectory, 'input/trajectory', 10)
odom_pub = self.node.create_publisher(nav_msgs.msg.Odometry, 'input/odometry', 10)
object_pub = self.node.create_publisher(autoware_auto_perception_msgs.msg.PredictedObjects, 'input/objects', 10)
#subscliber
trajectory_sub = self.node.create_subscription(autoware_auto_planning_msgs.msg.Trajectory, 'output/trajectory', self.trajectory_callback, 10)

self.init_messages()

# param [m]
y_point_resolution=0.4
y_point_test_margin=0.5
x_point_resolution=10.0

self.current_x=0.0
self.current_y=0.0
self.callback_flag=False
self.true_count=0

#test points list
test_point_x=list(range(0,int(self.trajectory_length/x_point_resolution),1))
test_point_x=[n*x_point_resolution for n in test_point_x]
test_point_y=list(range(-int((self.right_limit_line+y_point_test_margin)/y_point_resolution), int((self.left_limit_line+y_point_test_margin)/y_point_resolution), 1))
test_point_y=[n*y_point_resolution for n in test_point_y]
test_point_y.append(-self.right_limit_line)
test_point_y.append(self.left_limit_line)
#test_point_x.sort()
test_point_y.sort()

#while rclpy.ok():
for ix in test_point_x:
for iy in test_point_y:
self.current_x=float(ix)
self.current_y=float(iy)
self.set_pointcloud_messages(self.current_x,self.current_y)

#update time stamp
self.object_msg.header.stamp=self.node.get_clock().now().to_msg()
self.odom_msg.header.stamp=self.node.get_clock().now().to_msg()
self.pointcloud_msg.header.stamp=self.node.get_clock().now().to_msg()
self.trajectory_msg.header.stamp=self.node.get_clock().now().to_msg()

object_pub.publish(self.object_msg)
odom_pub.publish(self.odom_msg)
pointcloud_pub.publish(self.pointcloud_msg)
trajectory_pub.publish(self.trajectory_msg)

self.callback_flag=False
while not self.callback_flag:
rclpy.spin_once(self.node, timeout_sec=0.1)
self.assertEqual(self.true_count,len(test_point_x)*len(test_point_y))


@launch_testing.post_shutdown_test()
class TestProcessOutput(unittest.TestCase):

def test_exit_code(self, proc_info):
# Check that all processes in the launch (in this case, there's just one) exit
# with code 0
launch_testing.asserts.assertExitCodes(proc_info)