diff --git a/Changelog.md b/Changelog.md index 43379377..3b5630f9 100644 --- a/Changelog.md +++ b/Changelog.md @@ -2,6 +2,13 @@ ### Ignition Launch 1.X.X +1. Changed the websocket messages to be more efficient when transmitting + protobuf messages. + * [Pull Request 22](https://github.com/ignitionrobotics/ign-launch/pull/22) + +1. Added ability to get protobuf definitions from the websocket server + * [Pull Request 21](https://github.com/ignitionrobotics/ign-launch/pull/21) + 1. Added ability to set the publication rate of the websocket server. * [Pull Request 17](https://github.com/ignitionrobotics/ign-launch/pull/17) diff --git a/plugins/websocket_server/CMakeLists.txt b/plugins/websocket_server/CMakeLists.txt index 35e85081..47a183a1 100644 --- a/plugins/websocket_server/CMakeLists.txt +++ b/plugins/websocket_server/CMakeLists.txt @@ -3,6 +3,17 @@ if (websockets_FOUND) set (sources WebsocketServer.cc) add_library(${plugin} SHARED ${sources}) + + file (READ combined.proto WEBSOCKETSERVER_MESSAGE_DEFINITIONS) + configure_file("MessageDefinitions.hh.in" + "${CMAKE_CURRENT_BINARY_DIR}/MessageDefinitions.hh" @ONLY) + # Add a dependency on binary source dir so that you can change the + # combinded.proto file and `make` will rebuild the plugin. + set_property(DIRECTORY APPEND PROPERTY CMAKE_CONFIGURE_DEPENDS combined.proto) + target_include_directories(${plugin} PRIVATE + $ + ) + target_link_libraries(${plugin} PRIVATE ${websockets_LIBRARIES} diff --git a/plugins/websocket_server/MessageDefinitions.hh.in b/plugins/websocket_server/MessageDefinitions.hh.in new file mode 100644 index 00000000..c6038d00 --- /dev/null +++ b/plugins/websocket_server/MessageDefinitions.hh.in @@ -0,0 +1,33 @@ +/* + * Copyright (C) 2020 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef IGNITION_LAUNCH_WEBSOCKETSERVER_MESSAGEDEFINITIONS_HH_ +#define IGNITION_LAUNCH_WEBSOCKETSERVER_MESSAGEDEFINITIONS_HH_ + +namespace ignition +{ + namespace launch + { + /// \brief All of the protobuf messages provided by the websocket + /// server. + /// + /// \todo Get this information from a running simulation. The problem + /// right now is that Ignition Transport does not show subscribers. + static const std::string kWebSocketServerMessageDefinitions = + R"protos(@WEBSOCKETSERVER_MESSAGE_DEFINITIONS@)protos"; + } +} +#endif diff --git a/plugins/websocket_server/WebsocketServer.cc b/plugins/websocket_server/WebsocketServer.cc index 8216cde9..db818fee 100644 --- a/plugins/websocket_server/WebsocketServer.cc +++ b/plugins/websocket_server/WebsocketServer.cc @@ -15,14 +15,31 @@ * */ +#include #include #include #include +#include "MessageDefinitions.hh" #include "WebsocketServer.hh" using namespace ignition::launch; +/// \brief Construct a websocket frame header. +/// \param[in] _op The operation string. +/// \param[in] _topic The topic name string. +/// \param[in] _type The message type string. +/// \return A string that is the frame header. +#define BUILD_HEADER(_op, _topic, _type) ((_op)+","+(_topic)+","+(_type)+",") + +/// \brief Construction a complete websocket frame. +/// \param[in] _op The operation string. +/// \param[in] _topic The topic name string. +/// \param[in] _type The message type string. +/// \param[in] _payload The complete payload string. +/// \return A string that is the frame header. +#define BUILD_MSG(_op, _topic, _type, _payload) (BUILD_HEADER(_op, _topic, _type) + _payload) + int rootCallback(struct lws *_wsi, enum lws_callback_reasons _reason, void * /*user*/, @@ -268,13 +285,31 @@ void WebsocketServer::OnDisconnect(int _socketId) ////////////////////////////////////////////////// void WebsocketServer::OnMessage(int _socketId, const std::string &_msg) { - ignition::msgs::WebRequest requestMsg; - requestMsg.ParseFromString(_msg); + // Frame: operation,topic,type,payload + std::vector frameParts = common::split(_msg, ","); + + // Check for a valid frame. + if (frameParts.size() != 4 && + // Count the number of commas to handle a frame like "sub,,," + std::count(_msg.begin(), _msg.end(), ',') != 3) + { + ignerr << "Received an invalid frame with " << frameParts.size() + << "components when 4 is expected.\n"; + return; + } - if (requestMsg.operation() == "list") + // Handle the case where the client requests the message definitions. + if (frameParts[0] == "protos") + { + igndbg << "Protos request received\n"; + this->QueueMessage(this->connections[_socketId].get(), + kWebSocketServerMessageDefinitions.c_str(), + kWebSocketServerMessageDefinitions.length()); + } + else if (frameParts[0] == "topics") { igndbg << "Topic list request recieved\n"; - ignition::msgs::Packet msg; + ignition::msgs::StringMsg_V msg; std::vector topics; @@ -283,23 +318,24 @@ void WebsocketServer::OnMessage(int _socketId, const std::string &_msg) // Store the topics in a message and serialize the message. for (const std::string &topic : topics) - msg.mutable_string_msg_v()->add_data(topic); + msg.add_data(topic); - std::string data = msg.SerializeAsString(); + std::string data = BUILD_MSG(this->operations[PUBLISH], frameParts[0], + std::string("ignition.msgs.StringMsg_V"), msg.SerializeAsString()); // Queue the message for delivery. this->QueueMessage(this->connections[_socketId].get(), data.c_str(), data.length()); } - else if (requestMsg.operation() == "subscribe") + else if (frameParts[0] == "sub") { // Store the relation of socketId to subscribed topic. - this->topicConnections[requestMsg.topic()].insert(_socketId); - this->topicTimestamps[requestMsg.topic()] = + this->topicConnections[frameParts[1]].insert(_socketId); + this->topicTimestamps[frameParts[1]] = std::chrono::steady_clock::now() - this->publishPeriod; - igndbg << "Subscribe request to topic[" << requestMsg.topic() << "]\n"; - this->node.SubscribeRaw(requestMsg.topic(), + igndbg << "Subscribe request to topic[" << frameParts[1] << "]\n"; + this->node.SubscribeRaw(frameParts[1], std::bind(&WebsocketServer::OnWebsocketSubscribedMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); @@ -316,44 +352,35 @@ void WebsocketServer::OnWebsocketSubscribedMessage( if (iter != this->topicConnections.end()) { + std::lock_guard mainLock(this->subscriptionMutex); std::chrono::time_point systemTime = std::chrono::steady_clock::now(); - ignition::msgs::Packet msg; - msg.set_topic(_info.Topic()); - msg.set_type(_info.Type()); - std::chrono::nanoseconds timeDelta = systemTime - this->topicTimestamps[_info.Topic()]; if (timeDelta > this->publishPeriod) { - if (_info.Type() == "ignition.msgs.CmdVel2D") - msg.mutable_cmd_vel2d()->ParseFromArray(_data, _size); - else if (_info.Type() == "ignition.msgs.Image") - msg.mutable_image()->ParseFromArray(_data, _size); - else if (_info.Type() == "ignition.msgs.StringMsg_V") - msg.mutable_string_msg_v()->ParseFromArray(_data, _size); - else if (_info.Type() == "ignition.msgs.WebRequest") - msg.mutable_web_request()->ParseFromArray(_data, _size); - else if (_info.Type() == "ignition.msgs.Pose") - msg.mutable_pose()->ParseFromArray(_data, _size); - else if (_info.Type() == "ignition.msgs.Pose_V") - msg.mutable_pose_v()->ParseFromArray(_data, _size); - else if (_info.Type() == "ignition.msgs.Time") - msg.mutable_time()->ParseFromArray(_data, _size); - else if (_info.Type() == "ignition.msgs.Clock") - msg.mutable_clock()->ParseFromArray(_data, _size); - else if (_info.Type() == "ignition.msgs.WorldStatistics") - msg.mutable_world_stats()->ParseFromArray(_data, _size); + // Get the header, or build a new header if it doesn't exist. + auto header = this->publishHeaders.find(_info.Topic()); + if (header == this->publishHeaders.end()) + { + this->publishHeaders[_info.Topic()] = BUILD_HEADER( + this->operations[PUBLISH], _info.Topic(), _info.Type()); + header = this->publishHeaders.find(_info.Topic()); + } + // Store the last time this topic was published. this->topicTimestamps[_info.Topic()] = systemTime; - std::string data = msg.SerializeAsString(); + // Construct the final message. + std::string msg = header->second + std::string(_data, _size); + + // Send the message for (const int &socketId : iter->second) { this->QueueMessage(this->connections[socketId].get(), - data.c_str(), data.length()); + msg.c_str(), msg.length()); } } } diff --git a/plugins/websocket_server/WebsocketServer.hh b/plugins/websocket_server/WebsocketServer.hh index f7249325..1126b8a2 100644 --- a/plugins/websocket_server/WebsocketServer.hh +++ b/plugins/websocket_server/WebsocketServer.hh @@ -35,8 +35,55 @@ namespace ignition /// \brief Reads from a USB joystick device and outputs /// ignition::msgs::WebsocketServer messages. /// + /// # Websocket Server Interface + /// + /// The websocket server listens for incoming requests and sends + /// messages based on the requests. + /// + /// All messages on the websocket, incoming and outgoing, are structured + /// in a frame that consists of four comma separated components: + /// 1. `operation`: string, + /// 2. `topic_name`: string, + /// 3. `message_type`: string, and + /// 4. `payload`: serialized data. + /// + /// The `operation` component is mandatory and must be one of: + /// 1. "sub": Subscribe to the topic in the `topic_name` component, + /// 2. "pub": Publish a message from the Ingition Transport topic in + /// the `topic_name` component, + /// 3. "topics": Get the list of available topics, and + /// 4. "protos": Get a string containing all the protobuf + /// definitions. + /// + /// The `topic_name` component is mandatory for the "sub" and "pub" + /// operations. If present, it must be the name of an Ignition Transport + /// topic. + /// + /// The `message_type` component is mandatory for the "pub" operation. If + /// present it names the Ignition Message type, such as + /// "ignition.msgs.Clock". + /// + /// The `payload` component is mandatory for the "pub" operation. If + /// present, it contains a serialized string of an Igntion Message. + /// + /// ## Example frames + /// + /// 1. Get the list of topics: `topics,,,` + /// + /// 2. Get the protobuf definitions: `protos,,,` + /// + /// 3. Subscribe to the "/clock" topic: `sub,/clock,,` + /// + /// 4. Websocker server publishing data on the "/clock" topic: + /// `pub,/clock,ignition.msgs.Clock,` + /// /// # Example usage /// + /// ## Websocket Server + /// + /// 1. Define a launch file by copying the following contents to a file + /// called `websocket.ign`. + /// /// /// @@ -44,6 +91,13 @@ namespace ignition /// /// 30 /// + /// + /// 2. Run the launch file + /// + /// `ign launch -v 4 websocket.ign` + /// + /// 3. Open the [index.html](https://github.com/ignitionrobotics/ign-launch/blob/master/plugins/websocket_server/index.html) webpage. + /// class WebsocketServer : public ignition::launch::Plugin { /// \brief Constructor @@ -64,11 +118,11 @@ namespace ignition public: void OnConnect(int _socketId); public: void OnDisconnect(int _socketId); + public: void OnMessage(int _socketId, const std::string &_msg); public: void OnRequestMessage(int _socketId, const std::string &_msg); - private: ignition::transport::Node node; private: bool run = true; @@ -88,7 +142,17 @@ namespace ignition const char *_data, const size_t _size); public: std::mutex mutex; + + /// \brief A mutex used in the OnWebsocketSubscribedMessage + /// function. + public: std::mutex subscriptionMutex; + + /// \brief All of the websocket connections. public: std::map> connections; + + /// \brief All of the subscribed Ignition topics. + /// The key is the topic name, and the value is the set of websocket + /// connections that have subscribed to the topic. public: std::map> topicConnections; /// \brief Time of last publication for each subscribed topic. The key @@ -98,6 +162,33 @@ namespace ignition std::chrono::time_point> topicTimestamps; + /// \brief The set of valid operations. This enum must align with the + /// `operations` member variable. + private: enum Operation + { + /// \brief Subscribe to a topic. + SUBSCRIBE = 0, + + /// \brief Publish a message from a topic. + PUBLISH = 1, + + /// \brief Get the list of topics. + TOPICS = 2, + + /// \brief Get the protobuf definitions. + PROTOS = 3, + }; + + /// \brief The set of valid operations, in string form. These values + /// can be sent in websocket message frames. + /// These valus must align with the `Operation` enum. + private: std::vector operations{ + "sub", "pub", "topics", "protos"}; + + /// \brief Store publish headers for topics. This is here to improve + /// performance. Keys are topic names and values are frame headers. + private: std::map publishHeaders; + /// \brief Period at which messages will be published on the websocket /// for each subscribed topic. /// \sa topicTimestamps. diff --git a/plugins/websocket_server/combined.proto b/plugins/websocket_server/combined.proto new file mode 100644 index 00000000..1070d002 --- /dev/null +++ b/plugins/websocket_server/combined.proto @@ -0,0 +1,125 @@ +syntax = "proto3"; + +package ignition.msgs; + +message Time { + int64 sec = 1; + int32 nsec = 2; +} + +message Clock +{ + Header header = 1; + Time system = 2; + Time real = 3; + Time sim = 4; +} +message Header { + message Map { + string key = 1; + repeated string value = 2; + } + Time stamp = 1; + repeated Map data = 2; +} +message WebRequest +{ + Header header = 1; + string operation = 2; + string topic = 3; + string msg_type = 4; + string compression = 5; + double hz = 6; +} +message StringMsg_V +{ + repeated string data = 2; +} +message CmdVel2D +{ + Header header = 1; + double velocity = 2; + double theta = 3; +} +enum PixelFormatType +{ + UNKNOWN_PIXEL_FORMAT = 0; + L_INT8 = 1; + L_INT16 = 2; + RGB_INT8 = 3; + RGBA_INT8 = 4; + BGRA_INT8 = 5; + RGB_INT16 = 6; + RGB_INT32 = 7; + BGR_INT8 = 8; + BGR_INT16 = 9; + BGR_INT32 = 10; + R_FLOAT16 = 11; + RGB_FLOAT16 = 12; + R_FLOAT32 = 13; + RGB_FLOAT32 = 14; + BAYER_RGGB8 = 15; + BAYER_RGGR8 = 16; + BAYER_GBRG8 = 17; + BAYER_GRBG8 = 18; +} + +message Image +{ + Header header = 1; + uint32 width = 2; + uint32 height = 3; + uint32 pixel_format = 4; + uint32 step = 5; + bytes data = 6; +} +message Vector3d +{ + Header header = 1; + double x = 2; + double y = 3; + double z = 4; +} +message Pose +{ + Header header = 1; + string name = 2; + uint32 id = 3; + Vector3d position = 4; + Quaternion orientation = 5; +} +message Quaternion +{ + Header header = 1; + double x = 2; + double y = 3; + double z = 4; + double w = 5; +} +message Double_V +{ + repeated double data = 1; +} +message Pose_V +{ + Header header = 1; + repeated Pose pose = 2; +} +message Packet +{ + string topic = 1; + string type = 2; + + oneof content + { + CmdVel2D cmd_vel2d = 3; + Image image = 4; + StringMsg_V string_msg_v = 5; + WebRequest web_request = 6; + Pose pose = 7; + Double_V doublev = 8; + Pose_V pose_v = 9; + Time time = 10; + Clock clock = 11; + } +} diff --git a/plugins/websocket_server/eventemitter2.min.js b/plugins/websocket_server/eventemitter2.min.js new file mode 100644 index 00000000..a5af1eda --- /dev/null +++ b/plugins/websocket_server/eventemitter2.min.js @@ -0,0 +1 @@ +!function(e){function t(){this._events={},this._conf&&i.call(this,this._conf)}function i(e){e&&(this._conf=e,e.delimiter&&(this.delimiter=e.delimiter),e.maxListeners&&(this._events.maxListeners=e.maxListeners),e.wildcard&&(this.wildcard=e.wildcard),e.newListener&&(this.newListener=e.newListener),this.wildcard&&(this.listenerTree={}))}function s(e){this._events={},this.newListener=!1,i.call(this,e)}function n(e,t,i,s){if(!i)return[];var r,l,o,h,a,f,c,_=[],p=t.length,u=t[s],v=t[s+1];if(s===p&&i._listeners){if("function"==typeof i._listeners)return e&&e.push(i._listeners),[i];for(r=0,l=i._listeners.length;r0&&n._listeners.length>h&&(n._listeners.warned=!0,console.error("(node) warning: possible EventEmitter memory leak detected. %d listeners added. Use emitter.setMaxListeners() to increase limit.",n._listeners.length),console.trace())}}else n._listeners=t;return!0}r=e.shift()}return!0}var l=Array.isArray?Array.isArray:function(e){return"[object Array]"===Object.prototype.toString.call(e)},o=10;s.prototype.delimiter=".",s.prototype.setMaxListeners=function(e){this._events||t.call(this),this._events.maxListeners=e,this._conf||(this._conf={}),this._conf.maxListeners=e},s.prototype.event="",s.prototype.once=function(e,t){return this.many(e,1,t),this},s.prototype.many=function(e,t,i){function s(){0==--t&&n.off(e,s),i.apply(this,arguments)}var n=this;if("function"!=typeof i)throw new Error("many only accepts instances of Function");return s._origin=i,this.on(e,s),n},s.prototype.emit=function(){this._events||t.call(this);var e=arguments[0];if("newListener"===e&&!this.newListener&&!this._events.newListener)return!1;if(this._all){for(var i=arguments.length,s=new Array(i-1),r=1;r1)switch(arguments.length){case 2:l.call(this,arguments[1]);break;case 3:l.call(this,arguments[1],arguments[2]);break;default:for(var i=arguments.length,s=new Array(i-1),r=1;r0||!!this._all}return!!this._all},s.prototype.on=function(e,i){if("function"==typeof e)return this.onAny(e),this;if("function"!=typeof i)throw new Error("on only accepts instances of Function");if(this._events||t.call(this),this.emit("newListener",e,i),this.wildcard)return r.call(this,e,i),this;if(this._events[e]){if("function"==typeof this._events[e])this._events[e]=[this._events[e],i];else if(l(this._events[e])&&(this._events[e].push(i),!this._events[e].warned)){var s=o;void 0!==this._events.maxListeners&&(s=this._events.maxListeners),s>0&&this._events[e].length>s&&(this._events[e].warned=!0,console.error("(node) warning: possible EventEmitter memory leak detected. %d listeners added. Use emitter.setMaxListeners() to increase limit.",this._events[e].length),console.trace())}}else this._events[e]=i;return this},s.prototype.onAny=function(e){if("function"!=typeof e)throw new Error("onAny only accepts instances of Function");return this._all||(this._all=[]),this._all.push(e),this},s.prototype.addListener=s.prototype.on,s.prototype.off=function(e,t){if("function"!=typeof t)throw new Error("removeListener only takes instances of Function");var i,s=[];if(this.wildcard){var r="string"==typeof e?e.split(this.delimiter):e.slice();s=n.call(this,null,r,this.listenerTree,0)}else{if(!this._events[e])return this;i=this._events[e],s.push({_listeners:i})}for(var o=0;o0){for(i=0,s=(t=this._all).length;i + +WebSocket Test + + + + +

WebSocket Test

+ +
+ + + +
+ +
    +
  • By default, the Client will always have just one websocket connection. The Connect button closes the previous websocket.
  • +
  • The Server keeps track of the connections. If we open a new websocket without closing the old one, they will pile up on the Server.
  • +
+ + + + +

+ When you click the Subscribe button, you'll receive messages from the Server. +

+ +
+
+ +
diff --git a/plugins/websocket_server/index.html b/plugins/websocket_server/index.html index cf238618..aae1425a 100644 --- a/plugins/websocket_server/index.html +++ b/plugins/websocket_server/index.html @@ -1,268 +1,44 @@ - -WebSocket Test - - - + + + + - -

WebSocket Test

- -
- - - -
- -
    -
  • By default, the Client will always have just one websocket connection. The Connect button closes the previous websocket.
  • -
  • The Server keeps track of the connections. If we open a new websocket without closing the old one, they will pile up on the Server.
  • -
- - - - -

- When you click the Subscribe button, you'll receive messages from the Server. -

- -
-
- -
+ }); + + // Todo: + // 1. load the gz3d module. + // 2. Show simple shapes. + // 3. Get the meshes and other resource from the server. + + + + +
Status: disconnected
+
+ +