Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: AutoPilot Manager: Split autopilot in platforms (Serial PX4 support part 5) #2943

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/services/ardupilot_manager/api/v1/routers/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

from autopilot_manager import AutoPilotManager
from exceptions import InvalidFirmwareFile
from typedefs import Firmware, FlightController, Parameters, Serial, SITLFrame, Vehicle
from flight_controller import FlightController
from typedefs import Firmware, Parameters, Serial, SITLFrame, Vehicle

index_router_v1 = APIRouter(
tags=["index_v1"],
Expand Down
117 changes: 117 additions & 0 deletions core/services/ardupilot_manager/autopilot/environment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@

import abc
from typing import List, Optional, Type

from loguru import logger

from autopilot.exceptions import InvalidEnvironmentImplementation
from autopilot.firmware import AutopilotFirmware
from flight_controller import FlightController
from mavlink_proxy.Endpoint import Endpoint, EndpointType
from mavlink_proxy.exceptions import EndpointAlreadyExists
from mavlink_proxy.Manager import Manager as MavlinkManager


class AutopilotEnvironment(abc.ABC):
def __init__(self, owner: str) -> None:
self.owner = owner
self.mavlink_manager: Optional[MavlinkManager] = None

@classmethod
def all(cls) -> List[str]:
return [subclass.__name__ for subclass in cls.__subclasses__()]

@classmethod
def get(cls, name: str) -> Type["AutopilotEnvironment"]:
for subclass in cls.__subclasses__():
if subclass.__name__ == name:
return subclass()
raise InvalidEnvironmentImplementation(f"{name} is not a valid environment implementation class")

async def start_mavlink_manager(self, device: Endpoint) -> None:
default_endpoints = [
Endpoint(
name="GCS Server Link",
owner=self.owner,
connection_type=EndpointType.UDPServer,
place="0.0.0.0",
argument=14550,
persistent=True,
enabled=False,
),
Endpoint(
name="GCS Client Link",
owner=self.owner,
connection_type=EndpointType.UDPClient,
place="192.168.2.1",
argument=14550,
persistent=True,
enabled=True,
),
Endpoint(
name="MAVLink2RestServer",
owner=self.owner,
connection_type=EndpointType.UDPServer,
place="127.0.0.1",
argument=14001,
persistent=True,
protected=True,
),
Endpoint(
name="MAVLink2Rest",
owner=self.owner,
connection_type=EndpointType.UDPClient,
place="127.0.0.1",
argument=14000,
persistent=True,
protected=True,
overwrite_settings=True,
),
Endpoint(
name="Internal Link",
owner=self.owner,
connection_type=EndpointType.TCPServer,
place="127.0.0.1",
argument=5777,
persistent=True,
protected=True,
overwrite_settings=True,
),
Endpoint(
name="Ping360 Heading",
owner=self.owner,
connection_type=EndpointType.UDPServer,
place="0.0.0.0",
argument=14660,
persistent=True,
protected=True,
),
]
for endpoint in default_endpoints:
try:
self.mavlink_manager.add_endpoint(endpoint)
except EndpointAlreadyExists:
pass
except Exception as error:
logger.warning(str(error))
await self.mavlink_manager.start(device)

@abc.abstractmethod
async def start(self) -> None:
raise NotImplementedError

@abc.abstractmethod
async def stop(self) -> None:
raise NotImplementedError

@abc.abstractmethod
async def restart(self) -> None:
raise NotImplementedError

@abc.abstractmethod
async def setup(self) -> None:
raise NotImplementedError

@abc.abstractmethod
def boards(self, firmware: AutopilotFirmware) -> Optional[FlightController]:
raise NotImplementedError
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from autopilot.environment import AutopilotEnvironment

class LinuxAutopilotEnvironment(AutopilotEnvironment):
pass
44 changes: 44 additions & 0 deletions core/services/ardupilot_manager/autopilot/environments/serial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from typing import Optional
from autopilot.environment import AutopilotEnvironment
from autopilot.firmware import AutopilotFirmware
from flight_controller import FlightController
from mavlink_proxy.Endpoint import Endpoint, EndpointType

class SerialEnvironment(AutopilotEnvironment):
def __init__(self, owner: str, board: FlightController) -> None:
self.board: FlightController = board
super().__init__(owner)

async def start(self) -> None:
if not self.board.path:
raise ValueError(f"Could not find device path for board {self.board.name}.")

baudrate = 115200
is_px4 = "px4" in self.board.name.lower()
if is_px4:
baudrate = 57600
await self.start_mavlink_manager(
Endpoint(
name="Master",
owner=self.owner,
connection_type=EndpointType.Serial,
place=self.board.path,
argument=baudrate,
protected=True,
)
)
if is_px4:
# PX4 needs at least one initial heartbeat to start sending data
await self.vehicle_manager.burst_heartbeat()

def stop(self) -> None:
raise NotImplementedError

def restart(self) -> None:
raise NotImplementedError

def setup(self) -> None:
raise NotImplementedError

def boards(self, firmware: AutopilotFirmware) -> Optional[FlightController]:
raise NotImplementedError
18 changes: 18 additions & 0 deletions core/services/ardupilot_manager/autopilot/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@

class InvalidEnvironmentImplementation(ValueError):
"""Required platform implementation class does not exists"""

class InvalidFirmwareImplementation(ValueError):
"""Required firmware implementation class does not exists"""

class InvalidAutopilotManifestData(ValueError):
"""Invalid manifest data"""

class InvalidAutopilotManifestURL(ValueError):
"""Invalid manifest URL"""

class ManifestDataFetchFailed(RuntimeError):
"""Failed to fetch manifest data"""

class BackendIsOffline(RuntimeError):
"""Backend is offline"""
20 changes: 20 additions & 0 deletions core/services/ardupilot_manager/autopilot/firmware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from typing import List, Optional, Type

from autopilot.exceptions import InvalidFirmwareImplementation
from flight_controller import FlightController


class AutopilotFirmware:
@classmethod
def all(cls) -> List[str]:
return [subclass.__name__ for subclass in cls.__subclasses__()]

@classmethod
def get(cls, name: str) -> Optional[Type["AutopilotFirmware"]]:
for subclass in cls.__subclasses__():
if subclass.__name__ == name:
return subclass()
raise InvalidFirmwareImplementation(f"{name} is not a valid firmware implementation class")

def supported_boards(self) -> List[FlightController]:
raise NotImplementedError
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from autopilot.firmware import AutopilotFirmware


class ArdupilotFirmware(AutopilotFirmware):
pass
Loading
Loading