diff --git a/.github/workflows/linter_check.yml b/.github/workflows/linter_check.yml new file mode 100644 index 00000000..78f4d37f --- /dev/null +++ b/.github/workflows/linter_check.yml @@ -0,0 +1,36 @@ +name: Lint Check + +on: + # Trigger the workflow on push to main or any pull request. + push: + branches: + - main + pull_request: + +permissions: { } + +jobs: + run-linters: + name: Lint + runs-on: ubuntu-latest + + permissions: + contents: read + packages: read + # To report GitHub Actions status checks + statuses: write + + steps: + - name: Checkout code + uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Super-linter + uses: super-linter/super-linter@v6.6.0 # x-release-please-version + env: + VALIDATE_ALL_CODEBASE: true + VALIDATE_PYTHON_FLAKE8: true + VALIDATE_PYTHON_RUFF: true + # To report GitHub Actions status checks + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/trufflehog_scan.yml b/.github/workflows/trufflehog_scan.yml new file mode 100644 index 00000000..4e1a4336 --- /dev/null +++ b/.github/workflows/trufflehog_scan.yml @@ -0,0 +1,24 @@ +name: Trufflehog Scan + +on: + # Trigger the workflow on push to main or any pull request. + push: + branches: + - main + pull_request: + +jobs: + run-scan: + name: Run scan + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Secret Scanning + uses: trufflesecurity/trufflehog@main + with: + extra_args: --only-verified diff --git a/README.md b/README.md index 4976e6db..77d76479 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,8 @@ The official NASA Lunabotics 2024 repository for University of Minnesota Robotics. +[![Lint Check](https://github.com/GOFIRST-Robotics/Lunabotics-2024/actions/workflows/linter_check.yml/badge.svg)](https://github.com/GOFIRST-Robotics/Lunabotics-2024/actions/workflows/linter_check.yml) [![Trufflehog Scan](https://github.com/GOFIRST-Robotics/Lunabotics-2024/actions/workflows/trufflehog_scan.yml/badge.svg)](https://github.com/GOFIRST-Robotics/Lunabotics-2024/actions/workflows/trufflehog_scan.yml) + ```mermaid --- title: Connectivity Chart diff --git a/src/apriltag/launch/apriltag_launch.py b/src/apriltag/launch/apriltag_launch.py index 10f08bb4..8ba84eaa 100644 --- a/src/apriltag/launch/apriltag_launch.py +++ b/src/apriltag/launch/apriltag_launch.py @@ -17,7 +17,10 @@ def generate_launch_description(): plugin="nvidia::isaac_ros::apriltag::AprilTagNode", name="isaac_ros_apriltag", namespace="", - remappings=[("image", "zed2i/zed_node/left/image_rect_color_rgb"), ("camera_info", "zed2i/zed_node/left/camera_info")], + remappings=[ + ("image", "zed2i/zed_node/left/image_rect_color_rgb"), + ("camera_info", "zed2i/zed_node/left/camera_info"), + ], ) image_format_converter_node_left = ComposableNode( @@ -29,7 +32,10 @@ def generate_launch_description(): "encoding_desired": "rgb8", } ], - remappings=[("image_raw", "zed2i/zed_node/left/image_rect_color"), ("image", "zed2i/zed_node/left/image_rect_color_rgb")], + remappings=[ + ("image_raw", "zed2i/zed_node/left/image_rect_color"), + ("image", "zed2i/zed_node/left/image_rect_color_rgb"), + ], ) apriltag_container = ComposableNodeContainer( diff --git a/src/apriltag/setup.py b/src/apriltag/setup.py index a2ac9023..41537c3e 100644 --- a/src/apriltag/setup.py +++ b/src/apriltag/setup.py @@ -1,29 +1,29 @@ from setuptools import find_packages, setup import os from glob import glob -package_name = 'apriltag' + +package_name = "apriltag" setup( name=package_name, - version='0.0.0', - packages=find_packages(exclude=['test']), + version="0.0.0", + packages=find_packages(exclude=["test"]), data_files=[ - ('share/ament_index/resource_index/packages', - ['resource/' + package_name]), - ('share/' + package_name, ['package.xml']), + ("share/ament_index/resource_index/packages", ["resource/" + package_name]), + ("share/" + package_name, ["package.xml"]), ( os.path.join("share", package_name), glob("launch/*launch.[pxy][yma]*", recursive=True), ), ], - install_requires=['setuptools'], + install_requires=["setuptools"], zip_safe=True, - maintainer='umn-robotics', - maintainer_email='robotics@umn.edu', - description='Package for processing apriltag detections received from ISAAC ROS Apriltag.', - license='MIT License', - tests_require=['pytest'], + maintainer="umn-robotics", + maintainer_email="robotics@umn.edu", + description="Package for processing apriltag detections received from ISAAC ROS Apriltag.", + license="MIT License", + tests_require=["pytest"], entry_points={ - 'console_scripts': ["apriltag_node = apriltag.apriltag_node:main"], + "console_scripts": ["apriltag_node = apriltag.apriltag_node:main"], }, ) diff --git a/src/apriltag/test/test_copyright.py b/src/apriltag/test/test_copyright.py index 97a39196..95f03810 100644 --- a/src/apriltag/test/test_copyright.py +++ b/src/apriltag/test/test_copyright.py @@ -17,9 +17,9 @@ # Remove the `skip` decorator once the source file(s) have a copyright header -@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.') +@pytest.mark.skip(reason="No copyright header has been placed in the generated source file.") @pytest.mark.copyright @pytest.mark.linter def test_copyright(): - rc = main(argv=['.', 'test']) - assert rc == 0, 'Found errors' + rc = main(argv=[".", "test"]) + assert rc == 0, "Found errors" diff --git a/src/apriltag/test/test_flake8.py b/src/apriltag/test/test_flake8.py index 27ee1078..49c1644f 100644 --- a/src/apriltag/test/test_flake8.py +++ b/src/apriltag/test/test_flake8.py @@ -20,6 +20,4 @@ @pytest.mark.linter def test_flake8(): rc, errors = main_with_errors(argv=[]) - assert rc == 0, \ - 'Found %d code style errors / warnings:\n' % len(errors) + \ - '\n'.join(errors) + assert rc == 0, "Found %d code style errors / warnings:\n" % len(errors) + "\n".join(errors) diff --git a/src/apriltag/test/test_pep257.py b/src/apriltag/test/test_pep257.py index b234a384..a2c3deb8 100644 --- a/src/apriltag/test/test_pep257.py +++ b/src/apriltag/test/test_pep257.py @@ -19,5 +19,5 @@ @pytest.mark.linter @pytest.mark.pep257 def test_pep257(): - rc = main(argv=['.', 'test']) - assert rc == 0, 'Found code style errors / warnings' + rc = main(argv=[".", "test"]) + assert rc == 0, "Found code style errors / warnings" diff --git a/src/drivetrain/drivetrain/drivetrain_node.py b/src/drivetrain/drivetrain/drivetrain_node.py index f636f093..670fc4ba 100644 --- a/src/drivetrain/drivetrain/drivetrain_node.py +++ b/src/drivetrain/drivetrain/drivetrain_node.py @@ -31,20 +31,26 @@ def __init__(self, drive_motor, turning_motor, drivetrain): self.drivetrain = drivetrain def set_power(self, power: float) -> None: - self.drivetrain.cli_motor_set.call_async(MotorCommandSet.Request(can_id=self.drive_motor_can_id, type="duty_cycle", value=power)) + self.drivetrain.cli_motor_set.call_async( + MotorCommandSet.Request(can_id=self.drive_motor_can_id, type="duty_cycle", value=power) + ) def set_angle(self, angle: float) -> None: angle = (360 - angle) % 360 self.drivetrain.cli_motor_set.call_async( MotorCommandSet.Request( - can_id=self.turning_motor_can_id, type="position", value=(angle - self.encoder_offset) * self.drivetrain.STEERING_MOTOR_GEAR_RATIO + can_id=self.turning_motor_can_id, + type="position", + value=(angle - self.encoder_offset) * self.drivetrain.STEERING_MOTOR_GEAR_RATIO, ) ) def reset(self, current_relative_angle) -> None: self.encoder_offset = self.current_absolute_angle - current_relative_angle - self.drivetrain.get_logger().info(f"CAN ID {self.turning_motor_can_id} Absolute Encoder angle offset set to: {self.encoder_offset}") + self.drivetrain.get_logger().info( + f"CAN ID {self.turning_motor_can_id} Absolute Encoder angle offset set to: {self.encoder_offset}" + ) self.set_angle(0) # Rotate the module to the 0 degree position def set_state(self, power: float, angle: float) -> None: @@ -176,22 +182,42 @@ def absolute_angle_reset(self): if self.front_left.current_absolute_angle is not None: print("Absolute Encoder angles reset") - # future.result().data will contain the position of the MOTOR (not the wheel) in degrees. Divide this by the gear ratio to get the wheel position. - front_left_future = self.cli_motor_get.call_async(MotorCommandGet.Request(type="position", can_id=self.FRONT_LEFT_TURN)) - front_left_future.add_done_callback(lambda future: self.front_left.reset(future.result().data / self.STEERING_MOTOR_GEAR_RATIO)) + # future.result().data will contain the position of the MOTOR (not the wheel) in degrees. + # Divide this by the gear ratio to get the wheel position. + front_left_future = self.cli_motor_get.call_async( + MotorCommandGet.Request(type="position", can_id=self.FRONT_LEFT_TURN) + ) + front_left_future.add_done_callback( + lambda future: self.front_left.reset(future.result().data / self.STEERING_MOTOR_GEAR_RATIO) + ) + + # future.result().data will contain the position of the MOTOR (not the wheel) in degrees. + # Divide this by the gear ratio to get the wheel position. + front_right_future = self.cli_motor_get.call_async( + MotorCommandGet.Request(type="position", can_id=self.FRONT_RIGHT_TURN) + ) + front_right_future.add_done_callback( + lambda future: self.front_right.reset(future.result().data / self.STEERING_MOTOR_GEAR_RATIO) + ) - # future.result().data will contain the position of the MOTOR (not the wheel) in degrees. Divide this by the gear ratio to get the wheel position. - front_right_future = self.cli_motor_get.call_async(MotorCommandGet.Request(type="position", can_id=self.FRONT_RIGHT_TURN)) - front_right_future.add_done_callback(lambda future: self.front_right.reset(future.result().data / self.STEERING_MOTOR_GEAR_RATIO)) + # future.result().data will contain the position of the MOTOR (not the wheel) in degrees. + # Divide this by the gear ratio to get the wheel position. + back_left_future = self.cli_motor_get.call_async( + MotorCommandGet.Request(type="position", can_id=self.BACK_LEFT_TURN) + ) + back_left_future.add_done_callback( + lambda future: self.back_left.reset(future.result().data / self.STEERING_MOTOR_GEAR_RATIO) + ) - # future.result().data will contain the position of the MOTOR (not the wheel) in degrees. Divide this by the gear ratio to get the wheel position. - back_left_future = self.cli_motor_get.call_async(MotorCommandGet.Request(type="position", can_id=self.BACK_LEFT_TURN)) - back_left_future.add_done_callback(lambda future: self.back_left.reset(future.result().data / self.STEERING_MOTOR_GEAR_RATIO)) + # future.result().data will contain the position of the MOTOR (not the wheel) in degrees. + # Divide this by the gear ratio to get the wheel position. + back_right_future = self.cli_motor_get.call_async( + MotorCommandGet.Request(type="position", can_id=self.BACK_RIGHT_TURN) + ) + back_right_future.add_done_callback( + lambda future: self.back_right.reset(future.result().data / self.STEERING_MOTOR_GEAR_RATIO) + ) - # future.result().data will contain the position of the MOTOR (not the wheel) in degrees. Divide this by the gear ratio to get the wheel position. - back_right_future = self.cli_motor_get.call_async(MotorCommandGet.Request(type="position", can_id=self.BACK_RIGHT_TURN)) - back_right_future.add_done_callback(lambda future: self.back_right.reset(future.result().data / self.STEERING_MOTOR_GEAR_RATIO)) - self.absolute_angle_timer.cancel() # Define subsystem methods here @@ -295,21 +321,41 @@ def drive_callback(self, request, response): def calibrate_callback(self, request, response): """This service request calibrates the drivetrain.""" if self.front_left.current_absolute_angle is not None: - # future.result().data will contain the position of the MOTOR (not the wheel) in degrees. Divide this by the gear ratio to get the wheel position. - front_left_future = self.cli_motor_get.call_async(MotorCommandGet.Request(type="position", can_id=self.FRONT_LEFT_TURN)) - front_left_future.add_done_callback(lambda future: self.front_left.reset(future.result().data / self.STEERING_MOTOR_GEAR_RATIO)) + # future.result().data will contain the position of the MOTOR (not the wheel) in degrees. + # Divide this by the gear ratio to get the wheel position. + front_left_future = self.cli_motor_get.call_async( + MotorCommandGet.Request(type="position", can_id=self.FRONT_LEFT_TURN) + ) + front_left_future.add_done_callback( + lambda future: self.front_left.reset(future.result().data / self.STEERING_MOTOR_GEAR_RATIO) + ) - # future.result().data will contain the position of the MOTOR (not the wheel) in degrees. Divide this by the gear ratio to get the wheel position. - front_right_future = self.cli_motor_get.call_async(MotorCommandGet.Request(type="position", can_id=self.FRONT_RIGHT_TURN)) - front_right_future.add_done_callback(lambda future: self.front_right.reset(future.result().data / self.STEERING_MOTOR_GEAR_RATIO)) + # future.result().data will contain the position of the MOTOR (not the wheel) in degrees. + # Divide this by the gear ratio to get the wheel position. + front_right_future = self.cli_motor_get.call_async( + MotorCommandGet.Request(type="position", can_id=self.FRONT_RIGHT_TURN) + ) + front_right_future.add_done_callback( + lambda future: self.front_right.reset(future.result().data / self.STEERING_MOTOR_GEAR_RATIO) + ) - # future.result().data will contain the position of the MOTOR (not the wheel) in degrees. Divide this by the gear ratio to get the wheel position. - back_left_future = self.cli_motor_get.call_async(MotorCommandGet.Request(type="position", can_id=self.BACK_LEFT_TURN)) - back_left_future.add_done_callback(lambda future: self.back_left.reset(future.result().data / self.STEERING_MOTOR_GEAR_RATIO)) + # future.result().data will contain the position of the MOTOR (not the wheel) in degrees. + # Divide this by the gear ratio to get the wheel position. + back_left_future = self.cli_motor_get.call_async( + MotorCommandGet.Request(type="position", can_id=self.BACK_LEFT_TURN) + ) + back_left_future.add_done_callback( + lambda future: self.back_left.reset(future.result().data / self.STEERING_MOTOR_GEAR_RATIO) + ) - # future.result().data will contain the position of the MOTOR (not the wheel) in degrees. Divide this by the gear ratio to get the wheel position. - back_right_future = self.cli_motor_get.call_async(MotorCommandGet.Request(type="position", can_id=self.BACK_RIGHT_TURN)) - back_right_future.add_done_callback(lambda future: self.back_right.reset(future.result().data / self.STEERING_MOTOR_GEAR_RATIO)) + # future.result().data will contain the position of the MOTOR (not the wheel) in degrees. + # Divide this by the gear ratio to get the wheel position. + back_right_future = self.cli_motor_get.call_async( + MotorCommandGet.Request(type="position", can_id=self.BACK_RIGHT_TURN) + ) + back_right_future.add_done_callback( + lambda future: self.back_right.reset(future.result().data / self.STEERING_MOTOR_GEAR_RATIO) + ) response.success = 0 # indicates success else: diff --git a/src/drivetrain/test/test_copyright.py b/src/drivetrain/test/test_copyright.py index cc8ff03f..f46f861d 100644 --- a/src/drivetrain/test/test_copyright.py +++ b/src/drivetrain/test/test_copyright.py @@ -19,5 +19,5 @@ @pytest.mark.copyright @pytest.mark.linter def test_copyright(): - rc = main(argv=['.', 'test']) - assert rc == 0, 'Found errors' + rc = main(argv=[".", "test"]) + assert rc == 0, "Found errors" diff --git a/src/drivetrain/test/test_flake8.py b/src/drivetrain/test/test_flake8.py index 27ee1078..49c1644f 100644 --- a/src/drivetrain/test/test_flake8.py +++ b/src/drivetrain/test/test_flake8.py @@ -20,6 +20,4 @@ @pytest.mark.linter def test_flake8(): rc, errors = main_with_errors(argv=[]) - assert rc == 0, \ - 'Found %d code style errors / warnings:\n' % len(errors) + \ - '\n'.join(errors) + assert rc == 0, "Found %d code style errors / warnings:\n" % len(errors) + "\n".join(errors) diff --git a/src/drivetrain/test/test_pep257.py b/src/drivetrain/test/test_pep257.py index b234a384..a2c3deb8 100644 --- a/src/drivetrain/test/test_pep257.py +++ b/src/drivetrain/test/test_pep257.py @@ -19,5 +19,5 @@ @pytest.mark.linter @pytest.mark.pep257 def test_pep257(): - rc = main(argv=['.', 'test']) - assert rc == 0, 'Found code style errors / warnings' + rc = main(argv=[".", "test"]) + assert rc == 0, "Found code style errors / warnings" diff --git a/src/gstreamer/gstreamer/client.py b/src/gstreamer/gstreamer/client.py index 7a3e3f8f..1f48ae0a 100644 --- a/src/gstreamer/gstreamer/client.py +++ b/src/gstreamer/gstreamer/client.py @@ -6,10 +6,8 @@ class ClientPlugin(Plugin): def __init__(self, context): super(ClientPlugin, self).__init__(context) self.setObjectName("ClientPlugin") - assert hasattr(context, 'node'), 'Context does not have a node.' + assert hasattr(context, "node"), "Context does not have a node." self._widget = ClientWidget(context.node) if context.serial_number() > 1: - self._widget.setWindowTitle( - self._widget.windowTitle() + (' (%d)' % context.serial_number())) + self._widget.setWindowTitle(self._widget.windowTitle() + (" (%d)" % context.serial_number())) context.add_widget(self._widget) - \ No newline at end of file diff --git a/src/gstreamer/gstreamer/client_gstreamer.py b/src/gstreamer/gstreamer/client_gstreamer.py index ac177d9d..8d2803c1 100644 --- a/src/gstreamer/gstreamer/client_gstreamer.py +++ b/src/gstreamer/gstreamer/client_gstreamer.py @@ -1,8 +1,10 @@ from threading import Thread, Event import gi + gi.require_version("Gst", "1.0") from gi.repository import Gst # noqa: E402 + class GstreamerClient: def __init__(self): # Initialize GStreamer @@ -37,11 +39,10 @@ def __init__(self): vid_conv.link(sink) def init_h265(self, source, queue): - # gst-launch-1.0 udpsrc port=5000 ! "application/x-rtp,payload=96" ! rtph265depay ! h265parse ! queue ! nvv4l2decoder ! nveglglessink + # gst-launch-1.0 udpsrc port=5000 ! "application/x-rtp,payload=96" ! rtph265depay + # ! h265parse ! queue ! nvv4l2decoder ! nveglglessink caps_udp = Gst.ElementFactory.make("capsfilter", "caps_udp") - caps_udp.set_property( - "caps", Gst.Caps.from_string("application/x-rtp,payload=96") - ) + caps_udp.set_property("caps", Gst.Caps.from_string("application/x-rtp,payload=96")) self.pipeline.add(caps_udp) source.link(caps_udp) @@ -56,7 +57,8 @@ def init_h265(self, source, queue): h265parse.link(queue) def init_av1(self, source, queue): - # gst-launch-1.0 udpsrc port=5000 ! "video/x-av1,width=640,height=480,framerate=30/1" ! queue ! nvv4l2decoder ! nvvideoconvert ! ximagesink + # gst-launch-1.0 udpsrc port=5000 ! "video/x-av1,width=640,height=480,framerate=30/1" ! queue + # ! nvv4l2decoder ! nvvideoconvert ! ximagesink caps_v4l2src = Gst.ElementFactory.make("capsfilter", "caps_v4l2src") caps_v4l2src.set_property( "caps", @@ -75,11 +77,12 @@ def run(self): self.stop_event = Event() change_source_thread = Thread() change_source_thread.start() - + def stop(self): self.stop_event.set() self.pipeline.set_state(Gst.State.NULL) + if __name__ == "__main__": client = GstreamerClient() client.run() @@ -88,7 +91,7 @@ def stop(self): change_source_thread.start() while True: try: - message:Gst.Message = client.pipeline.get_bus().timed_pop(Gst.SECOND) + message: Gst.Message = client.pipeline.get_bus().timed_pop(Gst.SECOND) if message is None: pass elif message.type == Gst.MessageType.EOS: @@ -98,4 +101,4 @@ def stop(self): break except KeyboardInterrupt: break - client.stop() \ No newline at end of file + client.stop() diff --git a/src/gstreamer/gstreamer/client_main.py b/src/gstreamer/gstreamer/client_main.py index 0b7b4f2d..046a71d3 100644 --- a/src/gstreamer/gstreamer/client_main.py +++ b/src/gstreamer/gstreamer/client_main.py @@ -5,8 +5,8 @@ def main(): main = Main() - sys.exit(main.main(sys.argv, standalone='gstreamer.client.ClientPlugin')) + sys.exit(main.main(sys.argv, standalone="gstreamer.client.ClientPlugin")) -if __name__ == '__main__': - main() \ No newline at end of file +if __name__ == "__main__": + main() diff --git a/src/gstreamer/gstreamer/client_widget.py b/src/gstreamer/gstreamer/client_widget.py index a15b79cb..455d759e 100644 --- a/src/gstreamer/gstreamer/client_widget.py +++ b/src/gstreamer/gstreamer/client_widget.py @@ -3,7 +3,7 @@ from ament_index_python import get_package_share_directory from python_qt_binding import loadUi from python_qt_binding.QtCore import Slot -from python_qt_binding.QtWidgets import QWidget, QComboBox, QPushButton +from python_qt_binding.QtWidgets import QWidget, QComboBox from rovr_interfaces.srv import SetClientIp, SetActiveCamera, SetEncoding from .client_gstreamer import GstreamerClient import rclpy @@ -12,18 +12,18 @@ import struct from rqt_py_common.extended_combo_box import ExtendedComboBox + class ClientWidget(QWidget): - timeout = 2e9 # 2 seconds with nano seconds as unit + timeout = 2e9 # 2 seconds with nano seconds as unit encodings = ["av1", "h265"] + def __init__(self, node: Node): super(ClientWidget, self).__init__() self.setObjectName("ClientWidget") self.node = node self.display_window = GstreamerClient() self.display_window.run() - ui_file = os.path.join( - get_package_share_directory("gstreamer"), "resource", "gstreamer-select.ui" - ) + ui_file = os.path.join(get_package_share_directory("gstreamer"), "resource", "gstreamer-select.ui") loadUi(ui_file, self, {"ExtendedComboBox": ExtendedComboBox}) network_dropdown: QComboBox = self.findChild(QComboBox, "network_dropdown") self.add_network_interfaces(network_dropdown) @@ -35,7 +35,7 @@ def __init__(self, node: Node): self.on_encoding_push_button_clicked() def add_network_interfaces(self, comboBox: QComboBox): - for _,interface in socket.if_nameindex(): + for _, interface in socket.if_nameindex(): if interface != "lo": comboBox.addItem(interface) comboBox.setCurrentIndex(0) @@ -44,7 +44,7 @@ def get_encodings(self, comboBox: QComboBox): for encoding in self.encodings: comboBox.addItem(encoding) comboBox.setCurrentIndex(0) - + def get_ip_address(self): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) interface = str(self.network_dropdown.currentText()) @@ -52,13 +52,11 @@ def get_ip_address(self): fcntl.ioctl( s.fileno(), 0x8915, - struct.pack( - "256s", interface[:15].encode("utf-8") - ), # SIOCGIFADDR + struct.pack("256s", interface[:15].encode("utf-8")), # SIOCGIFADDR )[20:24] - ) - - def wait_cli(self,cli:Client,req): + ) + + def wait_cli(self, cli: Client, req): future = cli.call_async(req) start_time = self.node.get_clock().now().nanoseconds @@ -68,7 +66,7 @@ def wait_cli(self,cli:Client,req): if not future.done(): print("Service Call Failed") return - + print("Service Call Returned") result = future.result().success if result == -1: @@ -77,10 +75,10 @@ def wait_cli(self,cli:Client,req): print("No encoding set") elif result == -3: print("No camera selected") - - # self.restart_window() + + # self.restart_window() self.node.destroy_client(cli) - + @Slot() def on_camera1_push_button_clicked(self): print("Requesting Camera 1") @@ -92,7 +90,7 @@ def on_camera1_push_button_clicked(self): req.framerate = 30 req.format = "NV12" cli = self.node.create_client(SetActiveCamera, "/set_active_camera") - self.wait_cli(cli,req) + self.wait_cli(cli, req) @Slot() def on_camera2_push_button_clicked(self): @@ -105,7 +103,7 @@ def on_camera2_push_button_clicked(self): req.framerate = 30 req.format = "NV12" cli = self.node.create_client(SetActiveCamera, "/set_active_camera") - self.wait_cli(cli,req) + self.wait_cli(cli, req) @Slot() def on_camera3_push_button_clicked(self): @@ -118,7 +116,7 @@ def on_camera3_push_button_clicked(self): req.framerate = 30 req.format = "NV12" cli = self.node.create_client(SetActiveCamera, "/set_active_camera") - self.wait_cli(cli,req) + self.wait_cli(cli, req) @Slot() def on_camera4_push_button_clicked(self): @@ -131,8 +129,8 @@ def on_camera4_push_button_clicked(self): req.framerate = 30 req.format = "NV12" cli = self.node.create_client(SetActiveCamera, "/set_active_camera") - self.wait_cli(cli,req) - + self.wait_cli(cli, req) + @Slot() def on_camera5_push_button_clicked(self): print("Requesting Camera 5") @@ -144,7 +142,7 @@ def on_camera5_push_button_clicked(self): req.framerate = 30 req.format = "NV12" cli = self.node.create_client(SetActiveCamera, "/set_active_camera") - self.wait_cli(cli,req) + self.wait_cli(cli, req) def restart_window(self): self.display_window.stop() @@ -160,11 +158,11 @@ def on_ip_push_button_clicked(self): print(e) return cli = self.node.create_client(SetClientIp, "/set_client_ip") - self.wait_cli(cli,req) - + self.wait_cli(cli, req) + @Slot() def on_encoding_push_button_clicked(self): req = SetEncoding.Request() req.encoding = str(self.encoding_dropdown.currentText()) cli = self.node.create_client(SetEncoding, "/set_encoding") - self.wait_cli(cli,req) \ No newline at end of file + self.wait_cli(cli, req) diff --git a/src/gstreamer/gstreamer/server_gstreamer.py b/src/gstreamer/gstreamer/server_gstreamer.py index 70e8418e..3390b4b6 100644 --- a/src/gstreamer/gstreamer/server_gstreamer.py +++ b/src/gstreamer/gstreamer/server_gstreamer.py @@ -7,6 +7,7 @@ gi.require_version("Gst", "1.0") from gi.repository import Gst # noqa: E402 + class GstreamerServer: def __init__(self, ip_srv: SetClientIp, camera_srv: SetActiveCamera, encod_srv: SetEncoding): @@ -20,7 +21,7 @@ def __init__(self, ip_srv: SetClientIp, camera_srv: SetActiveCamera, encod_srv: else: camera_src.set_property("device", camera_srv.device) self.pipeline.add(camera_src) - + nonNVvideoconvert = Gst.ElementFactory.make("videoconvert", "videoconvert") self.pipeline.add(nonNVvideoconvert) camera_src.link(nonNVvideoconvert) @@ -36,7 +37,6 @@ def __init__(self, ip_srv: SetClientIp, camera_srv: SetActiveCamera, encod_srv: src_caps.set_property("caps", caps) self.pipeline.add(src_caps) nonNVvideoconvert.link(src_caps) - if platform.machine() == "aarch64": videoconvert = Gst.ElementFactory.make("nvvidconv", "nvvidconv") @@ -58,8 +58,12 @@ def __init__(self, ip_srv: SetClientIp, camera_srv: SetActiveCamera, encod_srv: self.init_av1(videoconvert, udp_sink) def init_h265(self, input, sink): - # gst-launch-1.0 videotestsrc ! 'video/x-raw, width=(int)1920, height=(int)1080, format=(string)NV12, framerate=(fraction)30/1' ! nvvideoconvert ! nvv4l2h265enc ! rtph265pay ! udpsink host=127.0.0.1 port=5000 - # gst-launch-1.0 videotestsrc ! 'video/x-raw, width=(int)640, height=(int)480, format=(string)NV12, framerate=(fraction)30/1' ! nvvideoconvert ! nvv4l2h265enc ! rtph265pay ! udpsink host=127.0.0.1 port=5000 + # gst-launch-1.0 videotestsrc ! 'video/x-raw, width=(int)1920, height=(int)1080, + # format=(string)NV12, framerate=(fraction)30/1' ! nvvideoconvert ! nvv4l2h265enc + # ! rtph265pay! udpsink host=127.0.0.1 port=5000 + # gst-launch-1.0 videotestsrc ! 'video/x-raw, width=(int)640, height=(int)480, + # format=(string)NV12, framerate=(fraction)30/1' ! nvvideoconvert ! nvv4l2h265enc + # ! rtph265pay ! udpsink host=127.0.0.1 port=5000 nvv4l2h265enc = Gst.ElementFactory.make("nvv4l2h265enc", "nvv4l2h265enc") self.pipeline.add(nvv4l2h265enc) input.link(nvv4l2h265enc) @@ -71,7 +75,8 @@ def init_h265(self, input, sink): rtph265pay.link(sink) def init_av1(self, input, sink): - # gst-launch-1.0 videotestsrc ! 'video/x-raw, width=(int)640, height=(int)480, format=(string)NV12, framerate=(fraction)30/1' ! nvvideoconvert ! nvv4l2av1enc ! udpsink host=127.0.0.1 port=5000 + # gst-launch-1.0 videotestsrc ! 'video/x-raw, width=(int)640, height=(int)480, format=(string)NV12, + # framerate=(fraction)30/1' ! nvvideoconvert ! nvv4l2av1enc ! udpsink host=127.0.0.1 port=5000 nvv4l2av1enc = Gst.ElementFactory.make("nvv4l2av1enc", "nvv4l2av1enc") nvv4l2av1enc.set_property("bitrate", 1000000) self.pipeline.add(nvv4l2av1enc) @@ -86,6 +91,7 @@ def run(self): def stop(self): self.pipeline.set_state(Gst.State.NULL) + if __name__ == "__main__": ip = SetClientIp.Request() ip.client_ip = "127.0.0.1" @@ -98,14 +104,14 @@ def stop(self): camera.format = "NV12" encoding = SetEncoding.Request() encoding.encoding = "h265" - server = GstreamerServer(ip,camera,encoding) + server = GstreamerServer(ip, camera, encoding) server.run() stop_event = Event() change_source_thread = Thread() change_source_thread.start() while True: try: - message:Gst.Message = server.pipeline.get_bus().timed_pop(Gst.SECOND) + message: Gst.Message = server.pipeline.get_bus().timed_pop(Gst.SECOND) if message is None: pass elif message.type == Gst.MessageType.EOS: diff --git a/src/gstreamer/gstreamer/server_node.py b/src/gstreamer/gstreamer/server_node.py index 2e34fd9b..b872f111 100644 --- a/src/gstreamer/gstreamer/server_node.py +++ b/src/gstreamer/gstreamer/server_node.py @@ -2,24 +2,19 @@ from rclpy.node import Node from rovr_interfaces.srv import SetClientIp, SetActiveCamera, SetEncoding from .server_gstreamer import GstreamerServer -import time + class ServerNode(Node): g_server: GstreamerServer = None ip_srv: SetClientIp = None camera_srv: SetActiveCamera = None encod_srv: SetEncoding = None + def __init__(self): super().__init__("server") - self.serv_set_client_ip = self.create_service( - SetClientIp, "/set_client_ip", self.set_client_ip_callback - ) - self.serv_set_active_camera = self.create_service( - SetActiveCamera, "/set_active_camera", self.set_active_camera - ) - self.serv_set_encoding = self.create_service( - SetEncoding, "/set_encoding", self.set_encoding - ) + self.serv_set_client_ip = self.create_service(SetClientIp, "/set_client_ip", self.set_client_ip_callback) + self.serv_set_active_camera = self.create_service(SetActiveCamera, "/set_active_camera", self.set_active_camera) + self.serv_set_encoding = self.create_service(SetEncoding, "/set_encoding", self.set_encoding) def set_client_ip_callback(self, request: SetClientIp, response): print("recieved ip request") @@ -32,7 +27,7 @@ def set_active_camera(self, request: SetActiveCamera, response): self.camera_srv = request response.success = self.restart_server() return response - + def set_encoding(self, request: SetEncoding, response): print("recieved encoding request") self.encod_srv = request @@ -58,6 +53,7 @@ def restart_server(self): print("Server restarted") return 0 + def main(args=None): rclpy.init(args=args) server = ServerNode() diff --git a/src/gstreamer/test/test_copyright.py b/src/gstreamer/test/test_copyright.py index 97a39196..95f03810 100644 --- a/src/gstreamer/test/test_copyright.py +++ b/src/gstreamer/test/test_copyright.py @@ -17,9 +17,9 @@ # Remove the `skip` decorator once the source file(s) have a copyright header -@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.') +@pytest.mark.skip(reason="No copyright header has been placed in the generated source file.") @pytest.mark.copyright @pytest.mark.linter def test_copyright(): - rc = main(argv=['.', 'test']) - assert rc == 0, 'Found errors' + rc = main(argv=[".", "test"]) + assert rc == 0, "Found errors" diff --git a/src/gstreamer/test/test_flake8.py b/src/gstreamer/test/test_flake8.py index 27ee1078..49c1644f 100644 --- a/src/gstreamer/test/test_flake8.py +++ b/src/gstreamer/test/test_flake8.py @@ -20,6 +20,4 @@ @pytest.mark.linter def test_flake8(): rc, errors = main_with_errors(argv=[]) - assert rc == 0, \ - 'Found %d code style errors / warnings:\n' % len(errors) + \ - '\n'.join(errors) + assert rc == 0, "Found %d code style errors / warnings:\n" % len(errors) + "\n".join(errors) diff --git a/src/gstreamer/test/test_pep257.py b/src/gstreamer/test/test_pep257.py index b234a384..a2c3deb8 100644 --- a/src/gstreamer/test/test_pep257.py +++ b/src/gstreamer/test/test_pep257.py @@ -19,5 +19,5 @@ @pytest.mark.linter @pytest.mark.pep257 def test_pep257(): - rc = main(argv=['.', 'test']) - assert rc == 0, 'Found code style errors / warnings' + rc = main(argv=[".", "test"]) + assert rc == 0, "Found code style errors / warnings" diff --git a/src/isaac_ros/isaac_ros_launch/launch/gazebo_launch.py b/src/isaac_ros/isaac_ros_launch/launch/gazebo_launch.py index 8cbaaf01..167fc78d 100644 --- a/src/isaac_ros/isaac_ros_launch/launch/gazebo_launch.py +++ b/src/isaac_ros/isaac_ros_launch/launch/gazebo_launch.py @@ -15,9 +15,7 @@ def generate_launch_description(): ) ld = IncludeLaunchDescription( - PythonLaunchDescriptionSource( - PathJoinSubstitution([FindPackageShare("isaac_ros_launch"), "isaac_launch.py"]) - ), + PythonLaunchDescriptionSource(PathJoinSubstitution([FindPackageShare("isaac_ros_launch"), "isaac_launch.py"])), launch_arguments={ "setup_for_zed": "False", "setup_for_gazebo": "True", diff --git a/src/isaac_ros/isaac_ros_launch/launch/zed2i.launch.py b/src/isaac_ros/isaac_ros_launch/launch/zed2i.launch.py index 76941817..2029b9f7 100644 --- a/src/isaac_ros/isaac_ros_launch/launch/zed2i.launch.py +++ b/src/isaac_ros/isaac_ros_launch/launch/zed2i.launch.py @@ -103,7 +103,8 @@ def generate_launch_description(): " service call ", "/zed2i/zed_node/start_svo_rec ", "zed_interfaces/srv/StartSvoRec ", - f"\"{{compression_mode: 2, bitrate: 10000, svo_filename: '{svo_filename}'}}\"", # Tune this bitrate to adjust file size + # Tune this bitrate to adjust file size + f"\"{{compression_mode: 2, bitrate: 10000, svo_filename: '{svo_filename}'}}\"", ] ], shell=True, diff --git a/src/isaac_ros/isaac_ros_launch/setup.py b/src/isaac_ros/isaac_ros_launch/setup.py index 0b77abcc..9bb678ac 100644 --- a/src/isaac_ros/isaac_ros_launch/setup.py +++ b/src/isaac_ros/isaac_ros_launch/setup.py @@ -23,7 +23,6 @@ description="Package for ISAAC_ROS launch files", tests_require=["pytest"], entry_points={ - "console_scripts": [ - ], + "console_scripts": [], }, ) diff --git a/src/motor_control/launch/motor_control_launch.py b/src/motor_control/launch/motor_control_launch.py index 57dad3ec..637417e6 100644 --- a/src/motor_control/launch/motor_control_launch.py +++ b/src/motor_control/launch/motor_control_launch.py @@ -12,7 +12,7 @@ def generate_launch_description(): output="screen", emulate_tty=True, ) - + ld.add_action(motor_control) return ld diff --git a/src/rovr_control/launch/main_launch.py b/src/rovr_control/launch/main_launch.py index d0a2e9e0..7598bcb9 100644 --- a/src/rovr_control/launch/main_launch.py +++ b/src/rovr_control/launch/main_launch.py @@ -37,21 +37,21 @@ def generate_launch_description(): output="screen", emulate_tty=True, ) - + skimmer = Node( package="skimmer", executable="skimmer_node", name="skimmer_node", parameters=["config/motor_control.yaml"], output="screen", - ) - + ) + read_serial = Node( package="rovr_control", executable="read_serial", name="read_serial", ) - + can_bus = Node( package="ros2socketcan_bridge", executable="ros2socketcan", diff --git a/src/rovr_control/launch/main_no_joysticks_launch.py b/src/rovr_control/launch/main_no_joysticks_launch.py index eecc8952..f7e4e842 100644 --- a/src/rovr_control/launch/main_no_joysticks_launch.py +++ b/src/rovr_control/launch/main_no_joysticks_launch.py @@ -31,7 +31,7 @@ def generate_launch_description(): output="screen", emulate_tty=True, ) - + skimmer = Node( package="skimmer", executable="skimmer_node", @@ -39,13 +39,13 @@ def generate_launch_description(): parameters=["config/motor_control.yaml"], output="screen", ) - + read_serial = Node( package="rovr_control", executable="read_serial", name="read_serial", ) - + can_bus = Node( package="ros2socketcan_bridge", executable="ros2socketcan", diff --git a/src/rovr_control/rovr_control/main_control_node.py b/src/rovr_control/rovr_control/main_control_node.py index 9d5066be..96f45eb7 100644 --- a/src/rovr_control/rovr_control/main_control_node.py +++ b/src/rovr_control/rovr_control/main_control_node.py @@ -29,10 +29,10 @@ from scipy.spatial.transform import Rotation as R # Import our logitech gamepad button mappings -from .gamepad_constants import * +from . import gamepad_constants as bindings # Uncomment the line below to use the Xbox controller mappings instead -# from .xbox_controller_constants import * +# from . import xbox_controller_constants as bindings # GLOBAL VARIABLES # @@ -78,9 +78,15 @@ def __init__(self) -> None: self.skimmer_belt_power = self.get_parameter("skimmer_belt_power").value self.skimmer_lift_manual_power = self.get_parameter("skimmer_lift_manual_power").value self.autonomous_field_type = self.get_parameter("autonomous_field_type").value - self.lift_dumping_position = self.get_parameter("lift_dumping_position").value * 360 / 42 # Convert encoder counts to degrees - self.lift_digging_start_position = self.get_parameter("lift_digging_start_position").value * 360 / 42 # Convert encoder counts to degrees - self.lift_digging_end_position = self.get_parameter("lift_digging_end_position").value * 360 / 42 # Convert encoder counts to degrees + self.lift_dumping_position = ( + self.get_parameter("lift_dumping_position").value * 360 / 42 + ) # Convert encoder counts to degrees + self.lift_digging_start_position = ( + self.get_parameter("lift_digging_start_position").value * 360 / 42 + ) # Convert encoder counts to degrees + self.lift_digging_end_position = ( + self.get_parameter("lift_digging_end_position").value * 360 / 42 + ) # Convert encoder counts to degrees # Print the ROS Parameters to the terminal below # self.get_logger().info("autonomous_driving_power has been set to: " + str(self.autonomous_driving_power)) @@ -108,7 +114,9 @@ def __init__(self) -> None: # Define important map locations if self.autonomous_field_type == "top": - self.autonomous_berm_location = create_pose_stamped(7.25, -3.2, 90) # TODO: Test this location in simulation + self.autonomous_berm_location = create_pose_stamped( + 7.25, -3.2, 90 + ) # TODO: Test this location in simulation self.dig_location = create_pose_stamped(6.2, -1.2, 0) elif self.autonomous_field_type == "bottom": self.autonomous_berm_location = create_pose_stamped(7.25, -1.4, 270) @@ -138,7 +146,9 @@ def __init__(self) -> None: # Define publishers and subscribers here self.drive_power_publisher = self.create_publisher(Twist, "cmd_vel", 10) self.joy_subscription = self.create_subscription(Joy, "joy", self.joystick_callback, 10) - self.skimmer_goal_subscription = self.create_subscription(Bool, "/skimmer/goal_reached", self.skimmer_goal_callback, 10) + self.skimmer_goal_subscription = self.create_subscription( + Bool, "/skimmer/goal_reached", self.skimmer_goal_callback, 10 + ) self.started_calibration = False self.field_calibrated = False @@ -168,7 +178,10 @@ def __init__(self) -> None: # return None # i = dig_zone_start + robot_width # while i <= dig_zone_end - robot_width: - # if costmap.getDigCost(i, dig_zone_border_y, robot_width_pixels, dig_zone_depth) <= self.DANGER_THRESHOLD: + # if ( + # costmap.getDigCost(i, dig_zone_border_y, robot_width_pixels, dig_zone_depth) + # <= self.DANGER_THRESHOLD + # ): # available_dig_spots.append(create_pose_stamped(i, -dig_zone_border_y, 270)) # i += robot_width # else: @@ -210,7 +223,9 @@ async def calibrate_field_coordinates(self) -> None: while not self.cli_drivetrain_drive.wait_for_service(): # Wait for the drivetrain services to be available self.get_logger().warn("Waiting for drivetrain services to become available...") await asyncio.sleep(0.1) - await self.cli_drivetrain_drive.call_async(Drive.Request(forward_power=0.0, horizontal_power=0.0, turning_power=0.3)) + await self.cli_drivetrain_drive.call_async( + Drive.Request(forward_power=0.0, horizontal_power=0.0, turning_power=0.3) + ) while not self.field_calibrated: future = self.cli_set_apriltag_odometry.call_async(ResetOdom.Request()) future.add_done_callback(self.future_odom_callback) @@ -223,14 +238,15 @@ async def calibrate_field_coordinates(self) -> None: self.apriltag_timer.cancel() self.end_autonomous() # Return to Teleop mode - # TODO: This autonomous routine has not been tested yet! async def auto_dig_procedure(self) -> None: """This method lays out the procedure for autonomously digging!""" self.get_logger().info("\nStarting Autonomous Digging Procedure!") try: # Wrap the autonomous procedure in a try-except await self.cli_lift_zero.call_async(Stop.Request()) - await self.cli_lift_setPosition.call_async(SetPosition.Request(position=self.lift_digging_start_position)) # Lower the skimmer onto the ground + await self.cli_lift_setPosition.call_async( + SetPosition.Request(position=self.lift_digging_start_position) + ) # Lower the skimmer onto the ground self.skimmer_goal_reached = False # Wait for the goal height to be reached while not self.skimmer_goal_reached: @@ -243,7 +259,9 @@ async def auto_dig_procedure(self) -> None: # accelerate for 2 seconds while elapsed < 2e9: await self.cli_lift_set_power.call_async(SetPower.Request(power=-0.05e-9 * (elapsed))) - await self.cli_drivetrain_drive.call_async(Drive.Request(forward_power=0.0, horizontal_power=0.25e-9 * (elapsed), turning_power=0.0)) + await self.cli_drivetrain_drive.call_async( + Drive.Request(forward_power=0.0, horizontal_power=0.25e-9 * (elapsed), turning_power=0.0) + ) self.get_logger().info("Accelerating lift and drive train") elapsed = self.get_clock().now().nanoseconds - start_time await asyncio.sleep(0.1) # Allows other async tasks to continue running (this is non-blocking) @@ -253,7 +271,9 @@ async def auto_dig_procedure(self) -> None: await asyncio.sleep(0.1) # Allows other async tasks to continue running (this is non-blocking) await self.cli_drivetrain_stop.call_async(Stop.Request()) await self.cli_skimmer_stop.call_async(Stop.Request()) - await self.cli_lift_setPosition.call_async(SetPosition.Request(position=self.lift_dumping_position)) # Raise the skimmer back up + await self.cli_lift_setPosition.call_async( + SetPosition.Request(position=self.lift_dumping_position) + ) # Raise the skimmer back up self.skimmer_goal_reached = False # Wait for the lift goal to be reached while not self.skimmer_goal_reached: @@ -271,7 +291,9 @@ async def auto_offload_procedure(self) -> None: self.get_logger().info("\nStarting Autonomous Offload Procedure!") try: # Wrap the autonomous procedure in a try-except # Drive backward into the berm zone - await self.cli_drivetrain_drive.call_async(Drive.Request(forward_power=0.0, horizontal_power=-0.25, turning_power=0.0)) + await self.cli_drivetrain_drive.call_async( + Drive.Request(forward_power=0.0, horizontal_power=-0.25, turning_power=0.0) + ) start_time = self.get_clock().now().nanoseconds while self.get_clock().now().nanoseconds - start_time < 10e9: self.get_logger().info("Auto Driving") @@ -300,7 +322,8 @@ async def auto_cycle_procedure(self) -> None: """This method lays out the procedure for doing a complete autonomous cycle!""" self.get_logger().info("\nStarting an Autonomous Cycle!") try: # Wrap the autonomous procedure in a try-except - ## Navigate to the dig_location, run the dig procedure, then navigate to the berm zone and run the offload procedure ## + # Navigate to the dig_location, run the dig procedure, + # then navigate to the berm zone and run the offload procedure if not self.field_calibrated: self.get_logger().error("Field coordinates must be calibrated first!") self.end_autonomous() # Return to Teleop mode @@ -346,47 +369,49 @@ def joystick_callback(self, msg: Joy) -> None: if self.state == states["Teleop"]: # Drive the robot using joystick input during Teleop - forward_power = msg.axes[RIGHT_JOYSTICK_VERTICAL_AXIS] * self.max_drive_power # Forward power - horizontal_power = msg.axes[RIGHT_JOYSTICK_HORIZONTAL_AXIS] * self.max_drive_power # Horizontal power - turn_power = msg.axes[LEFT_JOYSTICK_HORIZONTAL_AXIS] * self.max_turn_power # Turning power + forward_power = msg.axes[bindings.RIGHT_JOYSTICK_VERTICAL_AXIS] * self.max_drive_power # Forward power + horizontal_power = ( + msg.axes[bindings.RIGHT_JOYSTICK_HORIZONTAL_AXIS] * self.max_drive_power + ) # Horizontal power + turn_power = msg.axes[bindings.LEFT_JOYSTICK_HORIZONTAL_AXIS] * self.max_turn_power # Turning power self.drive_power_publisher.publish( Twist(linear=Vector3(x=forward_power, y=horizontal_power), angular=Vector3(z=turn_power)) ) # Check if the skimmer button is pressed # - if msg.buttons[X_BUTTON] == 1 and buttons[X_BUTTON] == 0: + if msg.buttons[bindings.X_BUTTON] == 1 and buttons[bindings.X_BUTTON] == 0: self.cli_skimmer_toggle.call_async(SetPower.Request(power=self.skimmer_belt_power)) # Check if the reverse skimmer button is pressed # - if msg.buttons[Y_BUTTON] == 1 and buttons[Y_BUTTON] == 0: + if msg.buttons[bindings.Y_BUTTON] == 1 and buttons[bindings.Y_BUTTON] == 0: self.cli_skimmer_setPower.call_async(SetPower.Request(power=-self.skimmer_belt_power)) # Check if the lift dumping position button is pressed # - if msg.buttons[B_BUTTON] == 1 and buttons[B_BUTTON] == 0: + if msg.buttons[bindings.B_BUTTON] == 1 and buttons[bindings.B_BUTTON] == 0: self.cli_lift_setPosition.call_async(SetPosition.Request(position=self.lift_dumping_position)) # Check if the lift digging position button is pressed # - if msg.buttons[A_BUTTON] == 1 and buttons[A_BUTTON] == 0: + if msg.buttons[bindings.A_BUTTON] == 1 and buttons[bindings.A_BUTTON] == 0: self.cli_lift_setPosition.call_async(SetPosition.Request(position=self.lift_digging_start_position)) # Manually adjust the height of the skimmer with the left and right triggers - if msg.buttons[RIGHT_TRIGGER] == 1 and buttons[RIGHT_TRIGGER] == 0: + if msg.buttons[bindings.RIGHT_TRIGGER] == 1 and buttons[bindings.RIGHT_TRIGGER] == 0: self.cli_lift_set_power.call_async(SetPower.Request(power=self.skimmer_lift_manual_power)) - elif msg.buttons[RIGHT_TRIGGER] == 0 and buttons[RIGHT_TRIGGER] == 1: + elif msg.buttons[bindings.RIGHT_TRIGGER] == 0 and buttons[bindings.RIGHT_TRIGGER] == 1: self.cli_lift_stop.call_async(Stop.Request()) - elif msg.buttons[LEFT_TRIGGER] == 1 and buttons[LEFT_TRIGGER] == 0: + elif msg.buttons[bindings.LEFT_TRIGGER] == 1 and buttons[bindings.LEFT_TRIGGER] == 0: self.cli_lift_set_power.call_async(SetPower.Request(power=-self.skimmer_lift_manual_power)) - elif msg.buttons[LEFT_TRIGGER] == 0 and buttons[LEFT_TRIGGER] == 1: + elif msg.buttons[bindings.LEFT_TRIGGER] == 0 and buttons[bindings.LEFT_TRIGGER] == 1: self.cli_lift_stop.call_async(Stop.Request()) # Check if the calibration button is pressed - if msg.buttons[RIGHT_BUMPER] == 1 and buttons[RIGHT_BUMPER] == 0: + if msg.buttons[bindings.RIGHT_BUMPER] == 1 and buttons[bindings.RIGHT_BUMPER] == 0: self.cli_drivetrain_calibrate.call_async(Stop.Request()) # THE CONTROLS BELOW ALWAYS WORK # # Check if the Apriltag calibration button is pressed - if msg.buttons[START_BUTTON] == 1 and buttons[START_BUTTON] == 0: + if msg.buttons[bindings.START_BUTTON] == 1 and buttons[bindings.START_BUTTON] == 0: # Start the field calibration process if self.apriltag_timer.is_canceled(): self.started_calibration = False @@ -400,7 +425,7 @@ def joystick_callback(self, msg: Joy) -> None: self.end_autonomous() # Return to Teleop mode # Check if the autonomous digging button is pressed - if msg.buttons[BACK_BUTTON] == 1 and buttons[BACK_BUTTON] == 0: + if msg.buttons[bindings.BACK_BUTTON] == 1 and buttons[bindings.BACK_BUTTON] == 0: if self.state == states["Teleop"]: self.stop_all_subsystems() # Stop all subsystems self.state = states["Autonomous"] @@ -412,7 +437,7 @@ def joystick_callback(self, msg: Joy) -> None: self.autonomous_digging_process = None # Check if the autonomous offload button is pressed - if msg.buttons[LEFT_BUMPER] == 1 and buttons[LEFT_BUMPER] == 0: + if msg.buttons[bindings.LEFT_BUMPER] == 1 and buttons[bindings.LEFT_BUMPER] == 0: if self.state == states["Teleop"]: self.stop_all_subsystems() # Stop all subsystems self.state = states["Autonomous"] diff --git a/src/rovr_control/setup.py b/src/rovr_control/setup.py index 4d61c16a..c0da311e 100644 --- a/src/rovr_control/setup.py +++ b/src/rovr_control/setup.py @@ -24,9 +24,11 @@ license="MIT License", tests_require=["pytest"], entry_points={ - "console_scripts": [{ - "main_control_node = rovr_control.main_control_node:main", - "read_serial = rovr_control.read_serial:main", - }], + "console_scripts": [ + { + "main_control_node = rovr_control.main_control_node:main", + "read_serial = rovr_control.read_serial:main", + } + ], }, ) diff --git a/src/rovr_control/test/test_copyright.py b/src/rovr_control/test/test_copyright.py index cc8ff03f..f46f861d 100644 --- a/src/rovr_control/test/test_copyright.py +++ b/src/rovr_control/test/test_copyright.py @@ -19,5 +19,5 @@ @pytest.mark.copyright @pytest.mark.linter def test_copyright(): - rc = main(argv=['.', 'test']) - assert rc == 0, 'Found errors' + rc = main(argv=[".", "test"]) + assert rc == 0, "Found errors" diff --git a/src/rovr_control/test/test_flake8.py b/src/rovr_control/test/test_flake8.py index 27ee1078..49c1644f 100644 --- a/src/rovr_control/test/test_flake8.py +++ b/src/rovr_control/test/test_flake8.py @@ -20,6 +20,4 @@ @pytest.mark.linter def test_flake8(): rc, errors = main_with_errors(argv=[]) - assert rc == 0, \ - 'Found %d code style errors / warnings:\n' % len(errors) + \ - '\n'.join(errors) + assert rc == 0, "Found %d code style errors / warnings:\n" % len(errors) + "\n".join(errors) diff --git a/src/rovr_control/test/test_pep257.py b/src/rovr_control/test/test_pep257.py index b234a384..a2c3deb8 100644 --- a/src/rovr_control/test/test_pep257.py +++ b/src/rovr_control/test/test_pep257.py @@ -19,5 +19,5 @@ @pytest.mark.linter @pytest.mark.pep257 def test_pep257(): - rc = main(argv=['.', 'test']) - assert rc == 0, 'Found code style errors / warnings' + rc = main(argv=[".", "test"]) + assert rc == 0, "Found code style errors / warnings" diff --git a/src/skimmer/skimmer/ros_check_load.py b/src/skimmer/skimmer/ros_check_load.py index 1c741440..1bb89bbf 100644 --- a/src/skimmer/skimmer/ros_check_load.py +++ b/src/skimmer/skimmer/ros_check_load.py @@ -31,7 +31,8 @@ def __init__(self): self.bridge = CvBridge() self.pub = self.create_publisher(Bool, "readyDump", 10) self.prior_checks = [] - depth_image_topic = "/skimmer/camera/depth/image_rect_raw" # The launch file should remap so the realsense publishes to this topic + # The launch file should remap so the realsense publishes to this topic + depth_image_topic = "/skimmer/camera/depth/image_rect_raw" self.getSkimmerHeight = self.create_subscription(Float32, skimmer_height_topic, self.setHeight, 10) self.oneTimeSub = self.create_subscription(Image, depth_image_topic, self.setParamCallback, 10) self.subscriber = self.create_subscription(Image, depth_image_topic, self.depth_image_callback, 10) @@ -54,8 +55,8 @@ def depth_image_callback(self, msg): self.depth_image = self.bridge.imgmsg_to_cv2(msg, msg.encoding) def publish_distance(self): - """does the actual math to determine if the skimmer is ready to offload. Publishes a bool to the readyDump topic. - Called every POLLRATE seconds. + """does the actual math to determine if the skimmer is ready to offload. + Publishes a bool to the readyDump topic. Called every POLLRATE seconds. Will kill the node after 5 seconds of not receiving a depth image / throwing consecutive errors.""" if self.depth_image is None: self.get_logger().fatal("Camera not working") diff --git a/src/skimmer/skimmer/skimmer_node.py b/src/skimmer/skimmer/skimmer_node.py index f33cde82..b570992b 100644 --- a/src/skimmer/skimmer/skimmer_node.py +++ b/src/skimmer/skimmer/skimmer_node.py @@ -63,7 +63,9 @@ def __init__(self): self.current_goal_position = 0 # Current position of the lift motor in degrees self.current_position_degrees = 0 # Relative encoders always initialize to 0 - # Goal Threshold (if abs(self.current_goal_position - ACTUAL VALUE) <= self.goal_threshold then we should publish True to /skimmer/goal_reached) + # Goal Threshold + # if abs(self.current_goal_position - ACTUAL VALUE) <= self.goal_threshold, + # then we should publish True to /skimmer/goal_reached self.goal_threshold = 320 # in degrees of the motor # TODO: Tune this threshold if needed # Current state of the lift system self.lift_running = False @@ -182,14 +184,15 @@ def zero_lift_callback(self, request, response): # Define timer callback methods here def timer_callback(self): """Publishes whether or not the current goal position has been reached.""" - # This MotorCommandGet service call will return a future object, that will eventually contain the position in degrees + # This service call will return a future object, that will eventually contain the position in degrees future = self.cli_motor_get.call_async(MotorCommandGet.Request(type="position", can_id=self.SKIMMER_LIFT_MOTOR)) future.add_done_callback(self.done_callback) def done_callback(self, future): self.current_position_degrees = future.result().data goal_reached_msg = Bool( - data=abs(self.current_goal_position + self.lift_encoder_offset - self.current_position_degrees) <= self.goal_threshold + data=abs(self.current_goal_position + self.lift_encoder_offset - self.current_position_degrees) + <= self.goal_threshold ) self.publisher_goal_reached.publish(goal_reached_msg) diff --git a/src/skimmer/test/test_copyright.py b/src/skimmer/test/test_copyright.py index cc8ff03f..f46f861d 100644 --- a/src/skimmer/test/test_copyright.py +++ b/src/skimmer/test/test_copyright.py @@ -19,5 +19,5 @@ @pytest.mark.copyright @pytest.mark.linter def test_copyright(): - rc = main(argv=['.', 'test']) - assert rc == 0, 'Found errors' + rc = main(argv=[".", "test"]) + assert rc == 0, "Found errors" diff --git a/src/skimmer/test/test_flake8.py b/src/skimmer/test/test_flake8.py index 27ee1078..49c1644f 100644 --- a/src/skimmer/test/test_flake8.py +++ b/src/skimmer/test/test_flake8.py @@ -20,6 +20,4 @@ @pytest.mark.linter def test_flake8(): rc, errors = main_with_errors(argv=[]) - assert rc == 0, \ - 'Found %d code style errors / warnings:\n' % len(errors) + \ - '\n'.join(errors) + assert rc == 0, "Found %d code style errors / warnings:\n" % len(errors) + "\n".join(errors) diff --git a/src/skimmer/test/test_pep257.py b/src/skimmer/test/test_pep257.py index b234a384..a2c3deb8 100644 --- a/src/skimmer/test/test_pep257.py +++ b/src/skimmer/test/test_pep257.py @@ -19,5 +19,5 @@ @pytest.mark.linter @pytest.mark.pep257 def test_pep257(): - rc = main(argv=['.', 'test']) - assert rc == 0, 'Found code style errors / warnings' + rc = main(argv=[".", "test"]) + assert rc == 0, "Found code style errors / warnings"