Skip to content

Commit

Permalink
fix(nebula_launch.py): update the generic launch file to forward all …
Browse files Browse the repository at this point in the history
…arguments to the correct vendor's launch file
  • Loading branch information
mojomex committed Sep 4, 2024
1 parent 456cb1e commit 007e9a3
Showing 1 changed file with 75 additions and 196 deletions.
271 changes: 75 additions & 196 deletions nebula_ros/launch/nebula_launch.py
Original file line number Diff line number Diff line change
@@ -1,211 +1,90 @@
# Copyright 2024 TIER IV, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import warnings

from ament_index_python.packages import get_package_share_directory
import launch
from launch.actions import DeclareLaunchArgument
from launch.actions import GroupAction
from launch.actions import IncludeLaunchDescription
from launch.actions import OpaqueFunction
from launch.conditions import LaunchConfigurationEquals
from launch.conditions import LaunchConfigurationNotEquals
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import ComposableNodeContainer
from launch_ros.actions import LoadComposableNodes
from launch_ros.descriptions import ComposableNode
import yaml


def get_sensor_make(sensor_name):
if sensor_name[:6].lower() == "pandar":
return "Hesai", ".csv"
elif sensor_name[:3].lower() in ["hdl", "vlp", "vls"]:
return "Velodyne", ".yaml"
elif sensor_name.lower() in ["helios", "bpearl"]:
return "Robosense", None
elif sensor_name.lower() == "ars548":
return "Continental", None
return "unrecognized_sensor_model", None


def get_plugin_name(sensor_make, sensor_model):
if sensor_make.lower() != "continental":
return sensor_make
elif sensor_model.lower() == "ars548":
return "ContinentalARS548"
else:
return "invalid_plugin"


def is_hw_monitor_available(sensor_make):
return sensor_make.lower() != "continental"


def launch_setup(context, *args, **kwargs):
# Model and make
from launch.substitutions import ThisLaunchFileDir
from launch_xml.launch_description_sources import XMLLaunchDescriptionSource

SENSOR_MODELS_VELODYNE = ["VLP16", "VLP32", "VLS128"]
SENSOR_MODELS_HESAI = [
"Pandar40P",
"Pandar64",
"Pandar128E4X",
"PandarAT128",
"PandarQT64",
"PandarQT128",
"PandarXT32",
"PandarXT32M",
]
SENSOR_MODELS_ROBOSENSE = ["Bpearl", "Helios"]
SENSOR_MODELS_CONTINENTAL = ["ARS548", "SRR520"]

SENSOR_MODELS = (
SENSOR_MODELS_VELODYNE
+ SENSOR_MODELS_HESAI
+ SENSOR_MODELS_ROBOSENSE
+ SENSOR_MODELS_CONTINENTAL
)


def launch_setup(context: launch.LaunchContext, *args, **kwargs):
sensor_model = LaunchConfiguration("sensor_model").perform(context)
calibration_file = LaunchConfiguration("calibration_file").perform(context)
sensor_make, sensor_extension = get_sensor_make(sensor_model)

if sensor_make.lower() == "hesai":
raise ValueError(
"\n `nebula_launch.py` is deprecated. For Hesai sensors, use `hesai_launch_all_hw.xml` instead."
)

nebula_decoders_share_dir = get_package_share_directory("nebula_decoders")
nebula_ros_share_dir = get_package_share_directory("nebula_ros")

# Config
sensor_params_fp = LaunchConfiguration("config_file").perform(context)
if sensor_params_fp == "":
warnings.warn("No config file provided, using sensor model default", RuntimeWarning)
sensor_params_fp = os.path.join(
nebula_ros_share_dir, "config", sensor_make.lower(), sensor_model + ".yaml"
)

if not os.path.exists(sensor_params_fp):
sensor_params_fp = os.path.join(nebula_ros_share_dir, "config", "BaseParams.yaml")
assert os.path.exists(
sensor_params_fp
), "Sensor params yaml file under config/ was not found: {}".format(sensor_params_fp)

sensor_calib_fp = sensor_corr_fp = ""
if sensor_extension is not None: # Velodyne and Hesai
sensor_calib_fp = os.path.join(
nebula_decoders_share_dir,
"calibration",
sensor_make.lower(),
sensor_model + sensor_extension,
)
assert os.path.exists(
sensor_calib_fp
), "Sensor calib file under calibration/ was not found: {}".format(sensor_calib_fp)

if sensor_model.lower() == "pandarat128":
sensor_corr_fp = os.path.splitext(sensor_calib_fp)[0] + ".dat"
assert os.path.exists(
sensor_corr_fp
), "Sensor corr file under calibration/ was not found: {}".format(sensor_corr_fp)

with open(sensor_params_fp, "r") as f:
sensor_params = yaml.safe_load(f)["/**"]["ros__parameters"]
nodes = []
launch_hw = LaunchConfiguration("launch_hw").perform(context) == "true"

if launch_hw:
nodes.append(
# HwInterface
ComposableNode(
package="nebula_ros",
plugin=get_plugin_name(sensor_make, sensor_model) + "HwInterfaceRosWrapper",
name=sensor_make.lower() + "_hw_interface_ros_wrapper_node",
parameters=[
sensor_params,
{
"sensor_model": LaunchConfiguration("sensor_model"),
"sensor_ip": LaunchConfiguration("sensor_ip"),
"return_mode": LaunchConfiguration("return_mode"),
"calibration_file": calibration_file or sensor_calib_fp,
"setup_sensor": LaunchConfiguration("setup_sensor"),
"ptp_profile": LaunchConfiguration("ptp_profile"),
"ptp_domain": LaunchConfiguration("ptp_domain"),
"ptp_transport_type": LaunchConfiguration("ptp_transport_type"),
"ptp_switch_type": LaunchConfiguration("ptp_switch_type"),
},
],
),
)

if launch_hw and is_hw_monitor_available(sensor_make):
nodes.append(
# HwMonitor
ComposableNode(
package="nebula_ros",
plugin=sensor_make + "HwMonitorRosWrapper",
name=sensor_make.lower() + "_hw_monitor_ros_wrapper_node",
parameters=[
sensor_params,
{
"sensor_model": sensor_model,
"sensor_ip": LaunchConfiguration("sensor_ip"),
"return_mode": LaunchConfiguration("return_mode"),
"calibration_file": calibration_file or sensor_calib_fp,
},
],
)
launch_args = {
"sensor_model": sensor_model,
}

if sensor_model in SENSOR_MODELS_VELODYNE:
vendor_launch_file = "velodyne_launch_all_hw.xml"
elif sensor_model in SENSOR_MODELS_HESAI:
vendor_launch_file = "hesai_launch_all_hw.xml"
elif sensor_model in SENSOR_MODELS_ROBOSENSE:
vendor_launch_file = "robosense_launch_all_hw.xml"
elif sensor_model in SENSOR_MODELS_CONTINENTAL:
vendor_launch_file = "continental_launch_all_hw.xml"
else:
raise KeyError(f"Sensor model {sensor_model} does not have an associated launch file")

vendor_launch_file = os.path.join(ThisLaunchFileDir().perform(context), vendor_launch_file)
if not os.path.isfile(vendor_launch_file):
raise FileNotFoundError(f"Could not find launch file {vendor_launch_file}")

# Any launch arguments not specified here are implicitly forwarded to the included launch description.
# Thus, passing arguments like `config_file` still works, it is just not appearing here.
# If it would be here, even if empty and not passed in `launch_arguments`,
# it would cause the default substitution in the included launch file to fail.
return [
IncludeLaunchDescription(
XMLLaunchDescriptionSource(vendor_launch_file),
launch_arguments=launch_args.items(),
)
nodes.append(
ComposableNode(
package="nebula_ros",
plugin=get_plugin_name(sensor_make, sensor_model) + "DriverRosWrapper",
name=sensor_make.lower() + "_driver_ros_wrapper_node",
parameters=[
sensor_params,
{
"sensor_model": sensor_model,
"sensor_ip": LaunchConfiguration("sensor_ip"),
"return_mode": LaunchConfiguration("return_mode"),
"calibration_file": calibration_file or sensor_calib_fp,
"launch_hw": LaunchConfiguration("launch_hw"),
"ptp_profile": LaunchConfiguration("ptp_profile"),
"ptp_domain": LaunchConfiguration("ptp_domain"),
"ptp_transport_type": LaunchConfiguration("ptp_transport_type"),
"ptp_switch_type": LaunchConfiguration("ptp_switch_type"),
},
],
),
)

loader = LoadComposableNodes(
condition=LaunchConfigurationNotEquals("container", ""),
composable_node_descriptions=nodes,
target_container=LaunchConfiguration("container"),
)

container_kwargs = {}
if LaunchConfiguration("debug_logging").perform(context) == "true":
container_kwargs["ros_arguments"] = ["--log-level", "debug"]

container = ComposableNodeContainer(
name="nebula_ros_node",
namespace="",
package="rclcpp_components",
executable="component_container",
composable_node_descriptions=nodes,
output="screen",
condition=LaunchConfigurationEquals("container", ""),
**container_kwargs,
)

group = GroupAction(
[
container,
loader,
]
)

return [group]
]


def generate_launch_description():
def add_launch_arg(name: str, default_value=None):
return DeclareLaunchArgument(name, default_value=default_value)

return launch.LaunchDescription(
[
add_launch_arg("container", ""),
add_launch_arg("config_file", ""),
add_launch_arg("sensor_model", ""),
add_launch_arg("sensor_ip", "192.168.1.201"),
add_launch_arg("calibration_file", ""),
add_launch_arg("correction_file", ""),
add_launch_arg("return_mode", "Dual"),
add_launch_arg("launch_hw", "true"),
add_launch_arg("setup_sensor", "true"),
add_launch_arg("debug_logging", "false"),
add_launch_arg("ptp_profile", "1588v2"),
add_launch_arg("ptp_domain", "0"),
add_launch_arg("ptp_transport_type", "UDP"),
add_launch_arg("ptp_switch_type", "TSN"),
DeclareLaunchArgument(
"sensor_model",
choices=SENSOR_MODELS,
description="The sensor model for which to launch Nebula",
),
OpaqueFunction(function=launch_setup),
]
+ [OpaqueFunction(function=launch_setup)]
)

0 comments on commit 007e9a3

Please sign in to comment.