feat(planning_debug_tools): add perception_replayer.py #4574

Merged
3 changes: 2 additions & 1 deletion planning/planning_debug_tools/CMakeLists.txt
@@ -50,6 +50,7 @@ ament_auto_package(
 install(PROGRAMS
   scripts/trajectory_visualizer.py
   scripts/closest_velocity_checker.py
-  scripts/perception_reproducer.py
+  scripts/perception_replayer/perception_reproducer.py
+  scripts/perception_replayer/perception_replayer.py
   DESTINATION lib/${PROJECT_NAME}
 )
32 changes: 31 additions & 1 deletion planning/planning_debug_tools/Readme.md
@@ -188,7 +188,7 @@ PlotCurrentVelocity('localization_kinematic_state', '/localization/kinematic_sta

## Perception reproducer

-This script can overlay the perception results from the rosbag on the planning simulator.
+This script can overlay the perception results from the rosbag on the planning simulator, synchronized with the simulator's ego pose.

In detail, the ego pose in the rosbag that is closest to the current ego pose in the simulator is found.
The perception results recorded at that ego pose's timestamp are then extracted and published (see the sketch at the end of this section).
@@ -211,3 +211,33 @@ ros2 run planning_debug_tools perception_reproducer.py -b <dir-to-bag-files>
```

Instead of publishing predicted objects, you can publish detected or tracked objects by specifying `-d` or `-t`, respectively.
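
The matching step above can be illustrated with a minimal sketch. This is not the actual `perception_reproducer.py` code (which is not shown in this diff): the helper name `find_closest_bag_timestamp` and the 2-D distance metric are illustrative assumptions, while `rosbag_ego_odom_data` mirrors the `(stamp, Odometry)` pairs loaded by `PerceptionReplayerCommon` below.

```python
import math


def find_closest_bag_timestamp(sim_ego_pose, rosbag_ego_odom_data):
    """Hypothetical helper: sim_ego_pose is the simulator's current ego pose,
    rosbag_ego_odom_data is a list of (stamp, Odometry) pairs loaded from the bag."""

    def distance_to_sim_ego(odom):
        position = odom.pose.pose.position
        return math.hypot(
            position.x - sim_ego_pose.position.x, position.y - sim_ego_pose.position.y
        )

    # Pick the bag ego pose closest to the simulator's ego pose; its timestamp is then
    # used to look up the perception messages to publish.
    closest_stamp, _ = min(rosbag_ego_odom_data, key=lambda data: distance_to_sim_ego(data[1]))
    return closest_stamp
```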

## Perception replayer

Part of this feature is still under development.

This script can overlay the perception results from the rosbag on the planning simulator.

In detail, this script publishes the rosbag data at a given timestamp.
Without any user operation, the timestamp advances in real time.
Through the GUI, you can pause playback, change the playback rate, or jump back to an earlier time, as sketched below.
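
As a rough sketch of this stepping logic (distilled from the `on_timer` callback in `perception_replayer.py` shown later in this diff; the standalone variables and the helper name `step_bag_timestamp` are illustrative only):

```python
# Illustrative stand-alone version; in the script these values are members of the
# PerceptionReplayer node and are updated by the GUI callbacks.
delta_time = 0.1   # timer period [s]
rate = 1.0         # playback rate selected in the GUI
is_pause = False   # toggled by the GUI pause button


def step_bag_timestamp(bag_timestamp: int) -> int:
    # Each timer tick advances the bag time by rate * delta_time seconds,
    # converted to nanoseconds, unless playback is paused.
    if not is_pause:
        bag_timestamp += int(rate * delta_time * 1e9)
    return bag_timestamp
```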

### How to use

First, launch the planning simulator and set the ego pose.
Then, run the script with the following command.

Specifying a rosbag launches the perception replayer together with a GUI for controlling the rosbag timestamp.

```bash
ros2 run planning_debug_tools perception_replayer.py -b <bag-file>
```

You can also specify a directory containing multiple rosbags.

```bash
ros2 run planning_debug_tools perception_replayer.py -b <dir-to-bag-files>
```

Instead of publishing predicted objects, you can publish detected or tracked objects by specifying `-d` or `-t`, respectively.
125 changes: 125 additions & 0 deletions planning/planning_debug_tools/scripts/perception_replayer/perception_replayer.py
@@ -0,0 +1,125 @@
#!/usr/bin/env python3

# Copyright 2023 TIER IV, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import copy
import functools
import sys

from PyQt5.QtWidgets import QApplication
from perception_replayer_common import PerceptionReplayerCommon
import rclpy
from time_manager_widget import TimeManagerWidget
from utils import create_empty_pointcloud
from utils import translate_objects_coordinate


class PerceptionReplayer(PerceptionReplayerCommon):
def __init__(self, args):
super().__init__(args, "perception_replayer")

self.bag_timestamp = self.rosbag_objects_data[0][0]
self.is_pause = False
self.rate = 1.0

# initialize widget
self.widget = TimeManagerWidget()
self.widget.show()
self.widget.button.clicked.connect(self.onPushed)
for button in self.widget.rate_button:
button.clicked.connect(functools.partial(self.onSetRate, button))

# start timer callback
self.delta_time = 0.1
self.timer = self.create_timer(self.delta_time, self.on_timer)
print("Start timer callback")

def on_timer(self):
timestamp = self.get_clock().now().to_msg()

self.kill_online_perception_node()

if self.args.detected_object:
pointcloud_msg = create_empty_pointcloud(timestamp)
self.pointcloud_pub.publish(pointcloud_msg)

# step timestamp
if not self.is_pause:
            self.bag_timestamp += self.rate * self.delta_time * 1e9  # advance bag time by delta_time [s], converted to nanoseconds

# extract message by the timestamp
msgs = copy.deepcopy(self.find_topics_by_timestamp(self.bag_timestamp))
objects_msg = msgs[0]
traffic_signals_msg = msgs[1]

# objects
if objects_msg:
objects_msg.header.stamp = timestamp
if self.args.detected_object:
if not self.ego_pose:
print("No ego pose found.")
return

ego_odom = self.find_ego_odom_by_timestamp(self.bag_timestamp)
log_ego_pose = ego_odom[1].pose.pose

translate_objects_coordinate(self.ego_pose, log_ego_pose, objects_msg)
self.objects_pub.publish(objects_msg)

# traffic signals
if traffic_signals_msg:
traffic_signals_msg.header.stamp = timestamp
self.traffic_signals_pub.publish(traffic_signals_msg)
self.prev_traffic_signals_msg = traffic_signals_msg
elif self.prev_traffic_signals_msg:
self.prev_traffic_signals_msg.header.stamp = timestamp
self.traffic_signals_pub.publish(self.prev_traffic_signals_msg)

def onPushed(self, event):
if self.widget.button.isChecked():
self.is_pause = True
else:
self.is_pause = False

def onSetRate(self, button):
self.rate = float(button.text())


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-b", "--bag", help="rosbag", default=None)
parser.add_argument(
"-d", "--detected-object", help="publish detected object", action="store_true"
)
parser.add_argument(
"-t", "--tracked-object", help="publish tracked object", action="store_true"
)
args = parser.parse_args()

app = QApplication(sys.argv)

rclpy.init()
node = PerceptionReplayer(args)

try:
while True:
app.processEvents()
rclpy.spin_once(node, timeout_sec=0.01)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
156 changes: 156 additions & 0 deletions planning/planning_debug_tools/scripts/perception_replayer/perception_replayer_common.py
@@ -0,0 +1,156 @@
#!/usr/bin/env python3

# Copyright 2023 TIER IV, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from subprocess import CalledProcessError
from subprocess import check_output
import time

from autoware_auto_perception_msgs.msg import DetectedObjects
from autoware_auto_perception_msgs.msg import PredictedObjects
from autoware_auto_perception_msgs.msg import TrackedObjects
from autoware_auto_perception_msgs.msg import TrafficSignalArray
from nav_msgs.msg import Odometry
import psutil
from rclpy.node import Node
from rclpy.serialization import deserialize_message
from rosbag2_py import StorageFilter
from rosidl_runtime_py.utilities import get_message
from sensor_msgs.msg import PointCloud2
from utils import open_reader


class PerceptionReplayerCommon(Node):
def __init__(self, args, name):
super().__init__(name)
self.args = args

self.ego_pose = None
self.rosbag_objects_data = []
self.rosbag_ego_odom_data = []
self.rosbag_traffic_signals_data = []

# subscriber
self.sub_odom = self.create_subscription(
Odometry, "/localization/kinematic_state", self.on_odom, 1
)

# publisher
if self.args.detected_object:
self.objects_pub = self.create_publisher(
DetectedObjects, "/perception/object_recognition/detection/objects", 1
)
elif self.args.tracked_object:
self.objects_pub = self.create_publisher(
TrackedObjects, "/perception/object_recognition/tracking/objects", 1
)
else:
self.objects_pub = self.create_publisher(
PredictedObjects, "/perception/object_recognition/objects", 1
)

self.pointcloud_pub = self.create_publisher(
PointCloud2, "/perception/obstacle_segmentation/pointcloud", 1
)
self.traffic_signals_pub = self.create_publisher(
TrafficSignalArray, "/perception/traffic_light_recognition/traffic_signals", 1
)

# load rosbag
        print("Started loading rosbag")
if os.path.isdir(args.bag):
for bag_file in sorted(os.listdir(args.bag)):
self.load_rosbag(args.bag + "/" + bag_file)
else:
self.load_rosbag(args.bag)
print("Ended loading rosbag")

# wait for ready to publish/subscribe
time.sleep(1.0)

def on_odom(self, odom):
self.ego_pose = odom.pose.pose

def load_rosbag(self, rosbag2_path: str):
reader = open_reader(str(rosbag2_path))

topic_types = reader.get_all_topics_and_types()
# Create a map for quicker lookup
type_map = {topic_types[i].name: topic_types[i].type for i in range(len(topic_types))}

objects_topic = (
"/perception/object_recognition/detection/objects"
if self.args.detected_object
else "/perception/object_recognition/tracking/objects"
if self.args.tracked_object
else "/perception/object_recognition/objects"
)
ego_odom_topic = "/localization/kinematic_state"
traffic_signals_topic = "/perception/traffic_light_recognition/traffic_signals"
topic_filter = StorageFilter(topics=[objects_topic, ego_odom_topic, traffic_signals_topic])
reader.set_filter(topic_filter)

while reader.has_next():
(topic, data, stamp) = reader.read_next()
msg_type = get_message(type_map[topic])
msg = deserialize_message(data, msg_type)
if topic == objects_topic:
self.rosbag_objects_data.append((stamp, msg))
if topic == ego_odom_topic:
self.rosbag_ego_odom_data.append((stamp, msg))
if topic == traffic_signals_topic:
self.rosbag_traffic_signals_data.append((stamp, msg))

    def kill_online_perception_node(self):
        # Kill the simulator-side perception node that would otherwise publish on the same topic as the replayed data.
        kill_process_name = None
kill_process_name = None
if self.args.detected_object:
kill_process_name = "dummy_perception_publisher_node"
elif self.args.tracked_object:
kill_process_name = "multi_object_tracker"
else:
kill_process_name = "map_based_prediction"
if kill_process_name:
try:
                pid = check_output(["pidof", kill_process_name])
                process = psutil.Process(int(pid[:-1]))
process.terminate()
except CalledProcessError:
pass

    def find_topics_by_timestamp(self, timestamp):
        # Return the first objects message and traffic signals message recorded after the given timestamp.
        objects_data = None
for data in self.rosbag_objects_data:
if timestamp < data[0]:
objects_data = data[1]
break

traffic_signals_data = None
for data in self.rosbag_traffic_signals_data:
if timestamp < data[0]:
traffic_signals_data = data[1]
break

return objects_data, traffic_signals_data

    def find_ego_odom_by_timestamp(self, timestamp):
        # Return the first ego odometry message recorded after the given timestamp.
        ego_odom_data = None
for data in self.rosbag_ego_odom_data:
if timestamp < data[0]:
ego_odom_data = data[1]
break

return ego_odom_data