-
Notifications
You must be signed in to change notification settings - Fork 39
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow task and fleet updates over ROS 2 #1003
Conversation
Signed-off-by: Aaron Chong <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's still not clear where the connection issues come from, there is a chance that upgrading dependencies would have fixed it. There is also a chance that this doesn't fix it as we have seen ros2 messages being significantly delayed before as well.
That said, I'm not opposed to using ros2, but I think we should only receive updates from a single source, there should be an option to choose which source to use.
I fully agree that focusing on a single transport mode would be the cleanest way to handle it. For updating dependencies, we could be limited to half of the equation only, we could update rmf-web as we go, but as the cpp stack uses https://github.com/zaphoyd/websocketpp, which is generally stable but won't be having any updates any time soon.
It does not resolve the root cause of the issue, but that could actually be infrastructure for all we know and therefore there is nothing we can do to properly diagnose and provide a solution for. However allowing the option to use ROS 2 will at least circumvent the symptoms of stalling all the components that use websockets (API server, fleet adapter, task dispatcher), and require manual restarting of these components in a production environment.
I agree with this too, however we might have situations where there are fleet adapters running on the same system as the API server (also encountering websocket connection issues) and could benefit from rmw shared memory transport by using ROS 2, whereas there are fleet adapters running in the cloud somewhere that would like to use websockets instead. The fleet adapters will only have 1 mode of transport to choose from either websockets or ROS 2, but not both, and we can plaster as many warnings or deprecation notices as we want on the ROS 2 endpoints |
I see that there is multiple way this can play out.
I think there is value in supporting both transport simultaneously so I prefer going with 1. |
|
||
async def save_alert(alert_request: AlertRequest): | ||
try: | ||
created_alert = await self._alert_repo.create_new_alert( | ||
alert_request | ||
) | ||
except AlreadyExistsError as e: | ||
self._logger.error(e) | ||
return | ||
self._alert_events.alert_requests.on_next(created_alert) | ||
|
||
if task_state.status == TaskStatus.completed: | ||
alert_request = AlertRequest( | ||
id=str(uuid4()), | ||
unix_millis_alert_time=round(datetime.now().timestamp() * 1000), | ||
title="Task completed", | ||
subtitle=f"ID: {task_state.booking.id}", | ||
message="", | ||
display=True, | ||
tier=AlertRequest.Tier.Info, | ||
responses_available=["Acknowledge"], | ||
alert_parameters=[], | ||
task_id=task_state.booking.id, | ||
) | ||
self._loop.create_task(save_alert(alert_request)) | ||
elif task_state.status == TaskStatus.failed: | ||
errorMessage = "" | ||
if ( | ||
task_state.dispatch is not None | ||
and task_state.dispatch.status == DispatchStatus.failed_to_assign | ||
): | ||
errorMessage += "Failed to assign\n" | ||
if task_state.dispatch.errors is not None: | ||
for error in task_state.dispatch.errors: | ||
errorMessage += error.json() + "\n" | ||
|
||
alert_request = AlertRequest( | ||
id=str(uuid4()), | ||
unix_millis_alert_time=round(datetime.now().timestamp() * 1000), | ||
title="Task failed", | ||
subtitle=f"ID: {task_state.booking.id}", | ||
message=errorMessage, | ||
display=True, | ||
tier=AlertRequest.Tier.Error, | ||
responses_available=["Acknowledge"], | ||
alert_parameters=[], | ||
task_id=task_state.booking.id, | ||
) | ||
self._loop.create_task(save_alert(alert_request)) | ||
|
||
task_state_update_sub = self._ros_node.create_subscription( | ||
StringMsg, "task_state_update", handle_task_state_update, 10 | ||
) | ||
self._subscriptions.append(task_state_update_sub) | ||
|
||
def handle_task_log_update(msg): | ||
msg = cast(StringMsg, msg) | ||
json_msg = json.loads(msg.data) | ||
|
||
async def save(task_event_log: TaskEventLog): | ||
await self._task_repo.save_task_log(task_event_log) | ||
self._task_events.task_event_logs.on_next(task_event_log) | ||
|
||
self._loop.create_task(save(TaskEventLog.model_validate(json_msg["data"]))) | ||
|
||
task_log_update_sub = self._ros_node.create_subscription( | ||
StringMsg, "task_log_update", handle_task_log_update, 10 | ||
) | ||
self._subscriptions.append(task_log_update_sub) | ||
|
||
def handle_fleet_state_update(msg): | ||
msg = cast(StringMsg, msg) | ||
json_msg = json.loads(msg.data) | ||
|
||
async def save(fleet_state: FleetState): | ||
await self._fleet_repo.save_fleet_state(fleet_state) | ||
self._fleet_events.fleet_states.on_next(fleet_state) | ||
|
||
self._loop.create_task(save(FleetState.model_validate(json_msg["data"]))) | ||
|
||
fleet_state_update_sub = self._ros_node.create_subscription( | ||
StringMsg, "fleet_state_update", handle_fleet_state_update, 10 | ||
) | ||
self._subscriptions.append(fleet_state_update_sub) | ||
|
||
def handle_fleet_log_update(msg): | ||
msg = cast(StringMsg, msg) | ||
json_msg = json.loads(msg.data) | ||
|
||
async def save(fleet_log: FleetLog): | ||
await self._fleet_repo.save_fleet_log(fleet_log) | ||
self._fleet_events.fleet_logs.on_next(fleet_log) | ||
|
||
self._loop.create_task(save(FleetLog.model_validate(json_msg["data"]))) | ||
|
||
fleet_log_update_sub = self._ros_node.create_subscription( | ||
StringMsg, "fleet_log_update", handle_fleet_log_update, 10 | ||
) | ||
self._subscriptions.append(fleet_log_update_sub) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please refactor so both the internal route and this share the same code.
With the discussion to allow publishing over ROS 2 regardless of the availability of websockets, open-rmf/rmf_ros2#383, we will inevitably run into issues where the API server will be receiving duplicated information if a locally running fleet adapter is provided websockets access. Apologies for the back-and-forth, we can retain the current behavior of using websockets only on rmf-web for now, until there is a more tangible requirement for supporting both websockets and ROS 2. For now, with the fleet adapters and task dispatcher publishing over ROS 2 by default, since the original motivation is to provide a workaround for some obscure issue, users/collaborators can reference this PR to implement the changes they require for their deployments. Closing this PR |
What's new
Sets up ROS 2 subscriptions for fleet and task related updates, that were solely on websockets
_internal
route in the past.The motivation is related to #899, where non-reproducible issues with
websockets
occur unpredictably in production environments, causing various systems to fail.This allows the option to mitigate issues with the upstream websocketpp library, and just rely on ROS 2 to publish updates instead.
Works with open-rmf/rmf_ros2#383
Self-checks