From e13614028727819d9c83403b82612921974ae8c5 Mon Sep 17 00:00:00 2001 From: Nate Koenig Date: Sat, 18 Apr 2020 12:42:52 -0700 Subject: [PATCH 01/13] Websocket updates --- plugins/websocket_server/WebsocketServer.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/plugins/websocket_server/WebsocketServer.cc b/plugins/websocket_server/WebsocketServer.cc index 6db837bd..ab57d01e 100644 --- a/plugins/websocket_server/WebsocketServer.cc +++ b/plugins/websocket_server/WebsocketServer.cc @@ -245,7 +245,7 @@ void WebsocketServer::OnMessage(int _socketId, const std::string &_msg) ignition::msgs::WebRequest requestMsg; requestMsg.ParseFromString(_msg); - if (requestMsg.operation() == "list") + if (requestMsg.operation() == "topic_list") { igndbg << "Topic list request recieved\n"; ignition::msgs::Packet msg; @@ -259,6 +259,8 @@ void WebsocketServer::OnMessage(int _socketId, const std::string &_msg) for (const std::string &topic : topics) msg.mutable_string_msg_v()->add_data(topic); + msg.set_topic("/topic_list"); + msg.set_type("ignition.msgs.StringMsg_V"); std::string data = msg.SerializeAsString(); // Queue the message for delivery. From 65e335dedc5547abd1654846e0239dcb45978284 Mon Sep 17 00:00:00 2001 From: Nate Koenig Date: Sat, 18 Apr 2020 12:43:38 -0700 Subject: [PATCH 02/13] Adde files --- plugins/websocket_server/combined.proto | 125 +++++++ plugins/websocket_server/eventemitter2.min.js | 1 + plugins/websocket_server/ign.js | 314 ++++++++++++++++++ plugins/websocket_server/index2.html | 35 ++ 4 files changed, 475 insertions(+) create mode 100644 plugins/websocket_server/combined.proto create mode 100644 plugins/websocket_server/eventemitter2.min.js create mode 100644 plugins/websocket_server/ign.js create mode 100644 plugins/websocket_server/index2.html diff --git a/plugins/websocket_server/combined.proto b/plugins/websocket_server/combined.proto new file mode 100644 index 00000000..1070d002 --- /dev/null +++ b/plugins/websocket_server/combined.proto @@ -0,0 +1,125 @@ +syntax = "proto3"; + +package ignition.msgs; + +message Time { + int64 sec = 1; + int32 nsec = 2; +} + +message Clock +{ + Header header = 1; + Time system = 2; + Time real = 3; + Time sim = 4; +} +message Header { + message Map { + string key = 1; + repeated string value = 2; + } + Time stamp = 1; + repeated Map data = 2; +} +message WebRequest +{ + Header header = 1; + string operation = 2; + string topic = 3; + string msg_type = 4; + string compression = 5; + double hz = 6; +} +message StringMsg_V +{ + repeated string data = 2; +} +message CmdVel2D +{ + Header header = 1; + double velocity = 2; + double theta = 3; +} +enum PixelFormatType +{ + UNKNOWN_PIXEL_FORMAT = 0; + L_INT8 = 1; + L_INT16 = 2; + RGB_INT8 = 3; + RGBA_INT8 = 4; + BGRA_INT8 = 5; + RGB_INT16 = 6; + RGB_INT32 = 7; + BGR_INT8 = 8; + BGR_INT16 = 9; + BGR_INT32 = 10; + R_FLOAT16 = 11; + RGB_FLOAT16 = 12; + R_FLOAT32 = 13; + RGB_FLOAT32 = 14; + BAYER_RGGB8 = 15; + BAYER_RGGR8 = 16; + BAYER_GBRG8 = 17; + BAYER_GRBG8 = 18; +} + +message Image +{ + Header header = 1; + uint32 width = 2; + uint32 height = 3; + uint32 pixel_format = 4; + uint32 step = 5; + bytes data = 6; +} +message Vector3d +{ + Header header = 1; + double x = 2; + double y = 3; + double z = 4; +} +message Pose +{ + Header header = 1; + string name = 2; + uint32 id = 3; + Vector3d position = 4; + Quaternion orientation = 5; +} +message Quaternion +{ + Header header = 1; + double x = 2; + double y = 3; + double z = 4; + double w = 5; +} +message Double_V +{ + repeated double data = 1; +} +message Pose_V +{ + Header header = 1; + repeated Pose pose = 2; +} +message Packet +{ + string topic = 1; + string type = 2; + + oneof content + { + CmdVel2D cmd_vel2d = 3; + Image image = 4; + StringMsg_V string_msg_v = 5; + WebRequest web_request = 6; + Pose pose = 7; + Double_V doublev = 8; + Pose_V pose_v = 9; + Time time = 10; + Clock clock = 11; + } +} diff --git a/plugins/websocket_server/eventemitter2.min.js b/plugins/websocket_server/eventemitter2.min.js new file mode 100644 index 00000000..a5af1eda --- /dev/null +++ b/plugins/websocket_server/eventemitter2.min.js @@ -0,0 +1 @@ +!function(e){function t(){this._events={},this._conf&&i.call(this,this._conf)}function i(e){e&&(this._conf=e,e.delimiter&&(this.delimiter=e.delimiter),e.maxListeners&&(this._events.maxListeners=e.maxListeners),e.wildcard&&(this.wildcard=e.wildcard),e.newListener&&(this.newListener=e.newListener),this.wildcard&&(this.listenerTree={}))}function s(e){this._events={},this.newListener=!1,i.call(this,e)}function n(e,t,i,s){if(!i)return[];var r,l,o,h,a,f,c,_=[],p=t.length,u=t[s],v=t[s+1];if(s===p&&i._listeners){if("function"==typeof i._listeners)return e&&e.push(i._listeners),[i];for(r=0,l=i._listeners.length;r0&&n._listeners.length>h&&(n._listeners.warned=!0,console.error("(node) warning: possible EventEmitter memory leak detected. %d listeners added. Use emitter.setMaxListeners() to increase limit.",n._listeners.length),console.trace())}}else n._listeners=t;return!0}r=e.shift()}return!0}var l=Array.isArray?Array.isArray:function(e){return"[object Array]"===Object.prototype.toString.call(e)},o=10;s.prototype.delimiter=".",s.prototype.setMaxListeners=function(e){this._events||t.call(this),this._events.maxListeners=e,this._conf||(this._conf={}),this._conf.maxListeners=e},s.prototype.event="",s.prototype.once=function(e,t){return this.many(e,1,t),this},s.prototype.many=function(e,t,i){function s(){0==--t&&n.off(e,s),i.apply(this,arguments)}var n=this;if("function"!=typeof i)throw new Error("many only accepts instances of Function");return s._origin=i,this.on(e,s),n},s.prototype.emit=function(){this._events||t.call(this);var e=arguments[0];if("newListener"===e&&!this.newListener&&!this._events.newListener)return!1;if(this._all){for(var i=arguments.length,s=new Array(i-1),r=1;r1)switch(arguments.length){case 2:l.call(this,arguments[1]);break;case 3:l.call(this,arguments[1],arguments[2]);break;default:for(var i=arguments.length,s=new Array(i-1),r=1;r0||!!this._all}return!!this._all},s.prototype.on=function(e,i){if("function"==typeof e)return this.onAny(e),this;if("function"!=typeof i)throw new Error("on only accepts instances of Function");if(this._events||t.call(this),this.emit("newListener",e,i),this.wildcard)return r.call(this,e,i),this;if(this._events[e]){if("function"==typeof this._events[e])this._events[e]=[this._events[e],i];else if(l(this._events[e])&&(this._events[e].push(i),!this._events[e].warned)){var s=o;void 0!==this._events.maxListeners&&(s=this._events.maxListeners),s>0&&this._events[e].length>s&&(this._events[e].warned=!0,console.error("(node) warning: possible EventEmitter memory leak detected. %d listeners added. Use emitter.setMaxListeners() to increase limit.",this._events[e].length),console.trace())}}else this._events[e]=i;return this},s.prototype.onAny=function(e){if("function"!=typeof e)throw new Error("onAny only accepts instances of Function");return this._all||(this._all=[]),this._all.push(e),this},s.prototype.addListener=s.prototype.on,s.prototype.off=function(e,t){if("function"!=typeof t)throw new Error("removeListener only takes instances of Function");var i,s=[];if(this.wildcard){var r="string"==typeof e?e.split(this.delimiter):e.slice();s=n.call(this,null,r,this.listenerTree,0)}else{if(!this._events[e])return this;i=this._events[e],s.push({_listeners:i})}for(var o=0;o0){for(i=0,s=(t=this._all).length;i + + + + WebSocket Test + + + + + + + + + Hello + + From 323e3567f1dc0db846b6f64f35c790bc8173accd Mon Sep 17 00:00:00 2001 From: Nate Koenig Date: Sat, 18 Apr 2020 17:56:28 -0700 Subject: [PATCH 03/13] Updates --- plugins/websocket_server/ign.js | 13 ++++--------- plugins/websocket_server/index2.html | 20 +++++++++++--------- 2 files changed, 15 insertions(+), 18 deletions(-) diff --git a/plugins/websocket_server/ign.js b/plugins/websocket_server/ign.js index 653cfeb1..9f919048 100644 --- a/plugins/websocket_server/ign.js +++ b/plugins/websocket_server/ign.js @@ -275,11 +275,10 @@ function Topic(options) { this.throttle_rate = 0; } - var that = this; - // this.sendMessage = this.ign.sendMsg; - /*this.messageCallback = function(_msg) { - that.emit('message', _msg); - }*/ + // Subscribe immediately if the callback is specified. + if (options.callback) { + this.subscribe(options.callback); + } } Topic.prototype.__proto__ = EventEmitter2.prototype @@ -290,10 +289,6 @@ Topic.prototype.__proto__ = EventEmitter2.prototype Topic.prototype.subscribe = function(callback) { var that = this; - /*if (typeof callback == 'function') { - this.on('message', callback); - }*/ - //this.ign.on(this.name, this.messageCallback); this.ign.on(this.name, callback); diff --git a/plugins/websocket_server/index2.html b/plugins/websocket_server/index2.html index 221bf054..4b632267 100644 --- a/plugins/websocket_server/index2.html +++ b/plugins/websocket_server/index2.html @@ -8,23 +8,25 @@ From b08ebba253415e1ddf5b25a2c8d04447273eebd7 Mon Sep 17 00:00:00 2001 From: Nate Koenig Date: Tue, 21 Apr 2020 06:22:00 -0700 Subject: [PATCH 04/13] Updates to the websocket server and associated javascript client library --- examples/websocket.ign | 2 +- plugins/websocket_server/CMakeLists.txt | 11 + .../websocket_server/MessageDefinitions.hh.in | 32 +++ plugins/websocket_server/WebsocketServer.cc | 10 + plugins/websocket_server/WebsocketServer.hh | 18 +- plugins/websocket_server/ign.js | 267 ++++++------------ plugins/websocket_server/index2.html | 37 ++- 7 files changed, 174 insertions(+), 203 deletions(-) create mode 100644 plugins/websocket_server/MessageDefinitions.hh.in diff --git a/examples/websocket.ign b/examples/websocket.ign index d1d489e4..ebdc8e9a 100644 --- a/examples/websocket.ign +++ b/examples/websocket.ign @@ -5,7 +5,7 @@ topic --> - 60 + 30 diff --git a/plugins/websocket_server/CMakeLists.txt b/plugins/websocket_server/CMakeLists.txt index 35e85081..3f781d66 100644 --- a/plugins/websocket_server/CMakeLists.txt +++ b/plugins/websocket_server/CMakeLists.txt @@ -3,6 +3,17 @@ if (websockets_FOUND) set (sources WebsocketServer.cc) add_library(${plugin} SHARED ${sources}) + + file (READ combined.proto MESSAGE_DEFINITIONS) + configure_file("MessageDefinitions.hh.in" + "${CMAKE_CURRENT_BINARY_DIR}/MessageDefinitions.hh" @ONLY) + # Add a dependency on binary source dir so that you can change the + # combinded.proto file and `make` will rebuild the plugin. + set_property(DIRECTORY APPEND PROPERTY CMAKE_CONFIGURE_DEPENDS combined.proto) + target_include_directories(${plugin} PRIVATE + $ + ) + target_link_libraries(${plugin} PRIVATE ${websockets_LIBRARIES} diff --git a/plugins/websocket_server/MessageDefinitions.hh.in b/plugins/websocket_server/MessageDefinitions.hh.in new file mode 100644 index 00000000..edb7cb5f --- /dev/null +++ b/plugins/websocket_server/MessageDefinitions.hh.in @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2019 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef IGNITION_LAUNCH_WEBSOCKETSERVER_MESSAGE_DEFINITIONS_HH_ +#define IGNITION_LAUNCH_WEBSOCKETSERVER_MESSAGE_DEFINITIONS_HH_ + +namespace ignition +{ + namespace launch + { + /// \brief All of the protobuf messages provided by the websocket + /// server. + /// \todo Get this information from a running simulation. The problem + /// right now is that Ignition Transport does not show subscribers. + static const std::string kMessageDefinitions = + R"protos(@MESSAGE_DEFINITIONS@)protos"; + } +} +#endif diff --git a/plugins/websocket_server/WebsocketServer.cc b/plugins/websocket_server/WebsocketServer.cc index 556f8b74..9419c518 100644 --- a/plugins/websocket_server/WebsocketServer.cc +++ b/plugins/websocket_server/WebsocketServer.cc @@ -19,6 +19,7 @@ #include #include +#include "MessageDefinitions.hh" #include "WebsocketServer.hh" using namespace ignition::launch; @@ -268,6 +269,15 @@ void WebsocketServer::OnDisconnect(int _socketId) ////////////////////////////////////////////////// void WebsocketServer::OnMessage(int _socketId, const std::string &_msg) { + // Handle the case where the client requests the message definitions. + if (_msg == "message_definitions") + { + igndbg << "Message definitions request recieved\n"; + this->QueueMessage(this->connections[_socketId].get(), + kMessageDefinitions.c_str(), kMessageDefinitions.length()); + return; + } + ignition::msgs::WebRequest requestMsg; requestMsg.ParseFromString(_msg); diff --git a/plugins/websocket_server/WebsocketServer.hh b/plugins/websocket_server/WebsocketServer.hh index c9a6b5fa..bca14a20 100644 --- a/plugins/websocket_server/WebsocketServer.hh +++ b/plugins/websocket_server/WebsocketServer.hh @@ -35,15 +35,31 @@ namespace ignition /// \brief Reads from a USB joystick device and outputs /// ignition::msgs::WebsocketServer messages. /// + /// # Websocket Server Interface + /// + /// \todo: Describe the websocket API. + /// /// # Example usage /// + /// ## Websocket Server + /// + /// 1. Define a launch file by copying the following contents to a file + /// called `websocket.ign`. + /// /// /// /// /// - /// 60 + /// 30 /// + /// + /// 2. Run the launch file + /// + /// `ign launch -v 4 websocket.ign` + /// + /// 3. Open the [index.html](https://github.com/ignitionrobotics/ign-launch/blob/master/plugins/websocket_server/index.html) webpage. + /// class WebsocketServer : public ignition::launch::Plugin { /// \brief Constructor diff --git a/plugins/websocket_server/ign.js b/plugins/websocket_server/ign.js index 9f919048..810d511d 100644 --- a/plugins/websocket_server/ign.js +++ b/plugins/websocket_server/ign.js @@ -1,173 +1,49 @@ -var proto = "syntax = \"proto3\";\ - package ignition.msgs;\ - message Time {\ - int64 sec = 1;\ - int32 nsec = 2;\ - }\ - message Clock\ - {\ - Header header = 1;\ - Time system = 2;\ - Time real = 3;\ - Time sim = 4;\ - }\ - message Header {\ - message Map {\ - string key = 1;\ - repeated string value = 2;\ - }\ - Time stamp = 1;\ - repeated Map data = 2;\ - }\ - message WebRequest\ - {\ - Header header = 1;\ - string operation = 2;\ - string topic = 3;\ - string msg_type = 4;\ - string compression = 5;\ - double hz = 6;\ - }\ - message StringMsg_V\ - {\ - repeated string data = 2;\ - }\ - message CmdVel2D\ - {\ - Header header = 1;\ - double velocity = 2;\ - double theta = 3;\ - }\ - enum PixelFormatType\ - {\ - UNKNOWN_PIXEL_FORMAT = 0;\ - L_INT8 = 1;\ - L_INT16 = 2;\ - RGB_INT8 = 3;\ - RGBA_INT8 = 4;\ - BGRA_INT8 = 5;\ - RGB_INT16 = 6;\ - RGB_INT32 = 7;\ - BGR_INT8 = 8;\ - BGR_INT16 = 9;\ - BGR_INT32 = 10;\ - R_FLOAT16 = 11;\ - RGB_FLOAT16 = 12;\ - R_FLOAT32 = 13;\ - RGB_FLOAT32 = 14;\ - BAYER_RGGB8 = 15;\ - BAYER_RGGR8 = 16;\ - BAYER_GBRG8 = 17;\ - BAYER_GRBG8 = 18;\ - }\ - \ - message Image\ - {\ - Header header = 1;\ - uint32 width = 2;\ - uint32 height = 3;\ - uint32 pixel_format = 4;\ - uint32 step = 5;\ - bytes data = 6;\ - }\ - message Vector3d\ - {\ - Header header = 1;\ - double x = 2;\ - double y = 3;\ - double z = 4;\ - }\ - message Pose\ - {\ - Header header = 1;\ - string name = 2;\ - uint32 id = 3;\ - Vector3d position = 4;\ - Quaternion orientation = 5;\ - }\ - message Quaternion\ - {\ - Header header = 1;\ - double x = 2;\ - double y = 3;\ - double z = 4;\ - double w = 5;\ - }\ - message Double_V\ - {\ - repeated double data = 1;\ - }\ - message Pose_V\ - {\ - Header header = 1;\ - repeated Pose pose = 2;\ - }\ - message Packet\ - {\ - string topic = 1;\ - string type = 2;\ - \ - oneof content\ - {\ - CmdVel2D cmd_vel2d = 3;\ - Image image = 4;\ - StringMsg_V string_msg_v = 5;\ - WebRequest web_request = 6;\ - Pose pose = 7;\ - Double_V doublev = 8;\ - Pose_V pose_v = 9;\ - Time time = 10;\ - Clock clock = 11;\ - }\ - }"; - -function Transport(options) { +/* + * Copyright (C) 2020 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/// \brief The main interface to the Ignition websocket server and +/// data on Ignition Transport. +function Ignition(options) { options = options || {}; this.socket = null; - this.idCounter = 0; this.topics = []; this.isConnected = false; - // Sets unlimited event listeners. - this.setMaxListeners(0); - - // Create a protobuf root object - // \todo: Get proto definitions from the server. - this.root = protobuf.parse(proto, {keepCase: true}).root; - // this.root = new protobuf.Root(); + // Start with a null root protobuf object. This object will be + // created when we get the set of protobuf definitions from the server. + this.root = null; if (options.url) { this.connect(options.url); } - - // This maps protobuf message type to .proto files. - /*this.typeMap = { - "ignition.msgs.Header": "msgs/header.proto", - "ignition.msgs.Time": "msgs/time.proto", - "ignition.msgs.StringMsg_V": "msgs/stringmsg_v.proto" - };*/ } -Transport.prototype.__proto__ = EventEmitter2.prototype; +Ignition.prototype.__proto__ = EventEmitter2.prototype; /// \brief Connect to the specified WebSocket. /// \param url - WebSocket URL for Ignition HTTPServer -Transport.prototype.connect = function(url) { +Ignition.prototype.connect = function(url) { var that = this; /// \brief Emits a 'connection' event on WebSocket connection. /// \param event - the argument to emit with the event. function onOpen(event) { - that.isConnected = true; - that.emit('connection', event); - - // Request the list of topics on start. - var msgType = that.root.lookupType("WebRequest"); - var webRequestMsg = msgType.encode({ - operation: "topic_list", - compression: "none", - }).finish(); - that.socket.send(webRequestMsg); + that.socket.send("message_definitions"); + } /// \brief Emits a 'close' event on WebSocket disconnection. @@ -188,6 +64,32 @@ Transport.prototype.connect = function(url) { // \param message - the JSON message from the Ignition // httpserver. function onMessage(_message) { + + if (that.root === undefined || that.root === null) { + // Read the Blob as an array buffer + var f = new FileReader(); + + f.onloadend = function(event) { + // This is the proto message data + var contents = event.target.result; + that.root = protobuf.parse(contents, {keepCase: true}).root; + that.isConnected = true; + that.emit('connection', event); + + // Request the list of topics on start. + // \todo: Check that the "WebRequest message exists. + var msgType = that.root.lookupType("WebRequest"); + var webRequestMsg = msgType.encode({ + operation: "topic_list", + }).finish(); + that.socket.send(webRequestMsg); + }; + + // Read the blob data as an array buffer. + f.readAsText(_message.data); + return; + } + // \todo: Check that packetMsgType is valid var packetMsgType = that.root.lookup("ignition.msgs.Packet"); @@ -212,9 +114,8 @@ Transport.prototype.connect = function(url) { // console.log("Actual Which: ", which); // console.log("Actual Msg: ", oneOfMsg); + if (packetMsg.topic === "/topic_list") { // The "/topic_list" topic is a special case - if (packetMsg.topic === "/topic_list") - { that.topics = oneOfMsg.data; } // Otherwise emit the message. @@ -235,13 +136,16 @@ Transport.prototype.connect = function(url) { this.socket.onmessage = onMessage; }; -Transport.prototype.sendMsg = function(_msg) { +/// \brief Send a message to the websocket server +/// \param[in] _msg Message to send +Ignition.prototype.sendMsg = function(_msg) { var that = this; var emitter = function(msg){ that.socket.send(msg); }; + // Wait for a connection before sending the message. if (!this.isConnected) { that.on('connection', function() { emitter(_msg); @@ -251,29 +155,13 @@ Transport.prototype.sendMsg = function(_msg) { } }; +/// \brief Interface to Ignition Transport topics. function Topic(options) { options = options || {}; this.ign = options.ign; this.name = options.name; this.messageType = options.messageType; this.isAdvertised = false; - this.compression = options.compression || 'none'; - this.throttle_rate = options.throttle_rate || 0; - - // Check for valid compression types - if (this.compression && this.compression !== 'png' && - this.compression !== 'cbor' && this.compression !== 'cbor-raw' && - this.compression !== 'none') { - this.emit('warning', this.compression + - ' compression is not supported. No compression will be used.'); - this.compression = 'none'; - } - - // Check if throttle rate is negative - if (this.throttle_rate < 0) { - this.emit('warning', this.throttle_rate + ' is not allowed. Set to 0'); - this.throttle_rate = 0; - } // Subscribe immediately if the callback is specified. if (options.callback) { @@ -284,26 +172,33 @@ Topic.prototype.__proto__ = EventEmitter2.prototype // \brief Every time a message is published for the given topic, the callback // will be called with the message object. -// \param callback - function with the following params: +// \param[in] callback - function with the following params: // * message - the published message -Topic.prototype.subscribe = function(callback) { +Topic.prototype.subscribe = function(_callback) { var that = this; - //this.ign.on(this.name, this.messageCallback); - this.ign.on(this.name, callback); + var emitter = function(_cb) { + // Register the callback with the topic name + that.ign.on(that.name, _cb); - // this.ign.idCounter++; + // \todo: Check that requesttMsgType is valid + var requestMsgType = that.ign.root.lookup("ignition.msgs.WebRequest"); - // \todo: Check that requesttMsgType is valid - var requestMsgType = this.ign.root.lookup("ignition.msgs.WebRequest"); + // Create the subscription request message + var webRequestMsg = requestMsgType.encode({ + operation: "subscribe", + topic: that.name, + }).finish(); - // Create the request message - var webRequestMsg = requestMsgType.encode({ - operation: "subscribe", - topic: that.name, - compression: "none", - }).finish(); + // Send the subscription message over the websocket. + that.ign.sendMsg(webRequestMsg); + } - // Send the message over the websocket. - this.ign.sendMsg(webRequestMsg); + if (!this.ign.isConnected) { + this.ign.on('connection', function() { + emitter(_callback); + }); + } else { + emiiter(_callback); + } }; diff --git a/plugins/websocket_server/index2.html b/plugins/websocket_server/index2.html index 4b632267..aae1425a 100644 --- a/plugins/websocket_server/index2.html +++ b/plugins/websocket_server/index2.html @@ -1,37 +1,44 @@ - + WebSocket Test - - - + + + - Hello +
Status: disconnected
+
From 99d9b1414859776cd8860939a244c8788c2968be Mon Sep 17 00:00:00 2001 From: Nate Koenig Date: Tue, 21 Apr 2020 06:27:04 -0700 Subject: [PATCH 05/13] Drop to 30hz --- examples/websocket.ign | 2 +- plugins/websocket_server/WebsocketServer.hh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/websocket.ign b/examples/websocket.ign index d1d489e4..ebdc8e9a 100644 --- a/examples/websocket.ign +++ b/examples/websocket.ign @@ -5,7 +5,7 @@ topic --> - 60 + 30
diff --git a/plugins/websocket_server/WebsocketServer.hh b/plugins/websocket_server/WebsocketServer.hh index c9a6b5fa..f7249325 100644 --- a/plugins/websocket_server/WebsocketServer.hh +++ b/plugins/websocket_server/WebsocketServer.hh @@ -42,7 +42,7 @@ namespace ignition /// filename="libignition-launch-joystick0.so"> /// /// - /// 60 + /// 30 /// class WebsocketServer : public ignition::launch::Plugin { From f1ae31bd55a4dd717dc07895234de505e04a7b9f Mon Sep 17 00:00:00 2001 From: Nate Koenig Date: Wed, 22 Apr 2020 08:49:35 -0700 Subject: [PATCH 06/13] Update websocket frames --- plugins/websocket_server/CMakeLists.txt | 2 +- .../websocket_server/MessageDefinitions.hh.in | 11 ++- plugins/websocket_server/WebsocketServer.cc | 98 +++++++++++-------- plugins/websocket_server/WebsocketServer.hh | 70 ++++++++++++- plugins/websocket_server/ign.js | 73 +++++--------- 5 files changed, 154 insertions(+), 100 deletions(-) diff --git a/plugins/websocket_server/CMakeLists.txt b/plugins/websocket_server/CMakeLists.txt index 3f781d66..47a183a1 100644 --- a/plugins/websocket_server/CMakeLists.txt +++ b/plugins/websocket_server/CMakeLists.txt @@ -4,7 +4,7 @@ if (websockets_FOUND) add_library(${plugin} SHARED ${sources}) - file (READ combined.proto MESSAGE_DEFINITIONS) + file (READ combined.proto WEBSOCKETSERVER_MESSAGE_DEFINITIONS) configure_file("MessageDefinitions.hh.in" "${CMAKE_CURRENT_BINARY_DIR}/MessageDefinitions.hh" @ONLY) # Add a dependency on binary source dir so that you can change the diff --git a/plugins/websocket_server/MessageDefinitions.hh.in b/plugins/websocket_server/MessageDefinitions.hh.in index edb7cb5f..c6038d00 100644 --- a/plugins/websocket_server/MessageDefinitions.hh.in +++ b/plugins/websocket_server/MessageDefinitions.hh.in @@ -1,5 +1,5 @@ /* - * Copyright (C) 2019 Open Source Robotics Foundation + * Copyright (C) 2020 Open Source Robotics Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,8 +14,8 @@ * limitations under the License. * */ -#ifndef IGNITION_LAUNCH_WEBSOCKETSERVER_MESSAGE_DEFINITIONS_HH_ -#define IGNITION_LAUNCH_WEBSOCKETSERVER_MESSAGE_DEFINITIONS_HH_ +#ifndef IGNITION_LAUNCH_WEBSOCKETSERVER_MESSAGEDEFINITIONS_HH_ +#define IGNITION_LAUNCH_WEBSOCKETSERVER_MESSAGEDEFINITIONS_HH_ namespace ignition { @@ -23,10 +23,11 @@ namespace ignition { /// \brief All of the protobuf messages provided by the websocket /// server. + /// /// \todo Get this information from a running simulation. The problem /// right now is that Ignition Transport does not show subscribers. - static const std::string kMessageDefinitions = - R"protos(@MESSAGE_DEFINITIONS@)protos"; + static const std::string kWebSocketServerMessageDefinitions = + R"protos(@WEBSOCKETSERVER_MESSAGE_DEFINITIONS@)protos"; } } #endif diff --git a/plugins/websocket_server/WebsocketServer.cc b/plugins/websocket_server/WebsocketServer.cc index 9419c518..5f57bae6 100644 --- a/plugins/websocket_server/WebsocketServer.cc +++ b/plugins/websocket_server/WebsocketServer.cc @@ -15,6 +15,7 @@ * */ +#include #include #include #include @@ -24,6 +25,9 @@ using namespace ignition::launch; +#define BUILD_HEADER(_op, _topic, _type) ((_op)+","+(_topic)+","+(_type)+",") +#define BUILD_MSG(_op, _topic, _type, _payload) (BUILD_HEADER(_op, _topic, _type) + _payload) + int rootCallback(struct lws *_wsi, enum lws_callback_reasons _reason, void * /*user*/, @@ -266,25 +270,43 @@ void WebsocketServer::OnDisconnect(int _socketId) } } +// Option 1: Encode everything in a Packet. +// Pros: Works, and has serialization. +// Cons: Server need to de-serialize and re-serialize messages. +// +// Option 2: Send header along with payload. Don't encode the header, and +// only copy the serialized payload. +// +// Option 3: Encode the header and payload into a Packet. + ////////////////////////////////////////////////// void WebsocketServer::OnMessage(int _socketId, const std::string &_msg) { - // Handle the case where the client requests the message definitions. - if (_msg == "message_definitions") + // Frame: operation,topic,type,payload + std::vector frameParts = common::split(_msg, ","); + + // Check for a valid frame. + if (frameParts.size() != 4 && + // Count the number of commas to handle a frame like "sub,,," + std::count(_msg.begin(), _msg.end(), ',') != 3) { - igndbg << "Message definitions request recieved\n"; - this->QueueMessage(this->connections[_socketId].get(), - kMessageDefinitions.c_str(), kMessageDefinitions.length()); + ignerr << "Received an invalid frame with " << frameParts.size() + << "components when 4 is expected.\n"; return; } - ignition::msgs::WebRequest requestMsg; - requestMsg.ParseFromString(_msg); - - if (requestMsg.operation() == "topic_list") + // Handle the case where the client requests the message definitions. + if (frameParts[0] == "protos") + { + igndbg << "Protos request received\n"; + this->QueueMessage(this->connections[_socketId].get(), + kWebSocketServerMessageDefinitions.c_str(), + kWebSocketServerMessageDefinitions.length()); + } + else if (frameParts[0] == "topics") { igndbg << "Topic list request recieved\n"; - ignition::msgs::Packet msg; + ignition::msgs::StringMsg_V msg; std::vector topics; @@ -293,25 +315,24 @@ void WebsocketServer::OnMessage(int _socketId, const std::string &_msg) // Store the topics in a message and serialize the message. for (const std::string &topic : topics) - msg.mutable_string_msg_v()->add_data(topic); + msg.add_data(topic); - msg.set_topic("/topic_list"); - msg.set_type("ignition.msgs.StringMsg_V"); - std::string data = msg.SerializeAsString(); + std::string data = BUILD_MSG(this->operations[PUBLISH], frameParts[0], + std::string("ignition.msgs.StringMsg_V"), msg.SerializeAsString()); // Queue the message for delivery. this->QueueMessage(this->connections[_socketId].get(), data.c_str(), data.length()); } - else if (requestMsg.operation() == "subscribe") + else if (frameParts[0] == "sub") { // Store the relation of socketId to subscribed topic. - this->topicConnections[requestMsg.topic()].insert(_socketId); - this->topicTimestamps[requestMsg.topic()] = + this->topicConnections[frameParts[1]].insert(_socketId); + this->topicTimestamps[frameParts[1]] = std::chrono::steady_clock::now() - this->publishPeriod; - igndbg << "Subscribe request to topic[" << requestMsg.topic() << "]\n"; - this->node.SubscribeRaw(requestMsg.topic(), + igndbg << "Subscribe request to topic[" << frameParts[1] << "]\n"; + this->node.SubscribeRaw(frameParts[1], std::bind(&WebsocketServer::OnWebsocketSubscribedMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); @@ -328,44 +349,35 @@ void WebsocketServer::OnWebsocketSubscribedMessage( if (iter != this->topicConnections.end()) { + std::lock_guard mainLock(this->subscriptionMutex); std::chrono::time_point systemTime = std::chrono::steady_clock::now(); - ignition::msgs::Packet msg; - msg.set_topic(_info.Topic()); - msg.set_type(_info.Type()); - std::chrono::nanoseconds timeDelta = systemTime - this->topicTimestamps[_info.Topic()]; if (timeDelta > this->publishPeriod) { - if (_info.Type() == "ignition.msgs.CmdVel2D") - msg.mutable_cmd_vel2d()->ParseFromArray(_data, _size); - else if (_info.Type() == "ignition.msgs.Image") - msg.mutable_image()->ParseFromArray(_data, _size); - else if (_info.Type() == "ignition.msgs.StringMsg_V") - msg.mutable_string_msg_v()->ParseFromArray(_data, _size); - else if (_info.Type() == "ignition.msgs.WebRequest") - msg.mutable_web_request()->ParseFromArray(_data, _size); - else if (_info.Type() == "ignition.msgs.Pose") - msg.mutable_pose()->ParseFromArray(_data, _size); - else if (_info.Type() == "ignition.msgs.Pose_V") - msg.mutable_pose_v()->ParseFromArray(_data, _size); - else if (_info.Type() == "ignition.msgs.Time") - msg.mutable_time()->ParseFromArray(_data, _size); - else if (_info.Type() == "ignition.msgs.Clock") - msg.mutable_clock()->ParseFromArray(_data, _size); - else if (_info.Type() == "ignition.msgs.WorldStatistics") - msg.mutable_world_stats()->ParseFromArray(_data, _size); + // Get the header, or build a new header if it doesn't exist. + auto header = this->publishHeaders.find(_info.Topic()); + if (header == this->publishHeaders.end()) + { + this->publishHeaders[_info.Topic()] = BUILD_HEADER( + this->operations[PUBLISH], _info.Topic(), _info.Type()); + header = this->publishHeaders.find(_info.Topic()); + } + // Store the last time this topic was published. this->topicTimestamps[_info.Topic()] = systemTime; - std::string data = msg.SerializeAsString(); + // Construct the final message. + std::string msg = header->second + std::string(_data, _size); + + // Send the message for (const int &socketId : iter->second) { this->QueueMessage(this->connections[socketId].get(), - data.c_str(), data.length()); + msg.c_str(), msg.length()); } } } diff --git a/plugins/websocket_server/WebsocketServer.hh b/plugins/websocket_server/WebsocketServer.hh index bca14a20..85c1349e 100644 --- a/plugins/websocket_server/WebsocketServer.hh +++ b/plugins/websocket_server/WebsocketServer.hh @@ -37,7 +37,45 @@ namespace ignition /// /// # Websocket Server Interface /// - /// \todo: Describe the websocket API. + /// The websocket server listens for incoming requests and sends + /// messages based on the requests. + /// + /// All messages on the websocket, incoming and outgoing, are structured + /// in a frame that consists of four comma separated components: + /// 1. `operation`: string, + /// 2. `topic_name`: string, + /// 3. `message_type`: string, and + /// 4. `payload`: serialized data. + /// + /// The `operation` component is mandatory and must be one of: + /// 1. "sub": Subscribe to the topic in the `topic_name` component, + /// 2. "pub": Publish a message from the Ingition Transport topic in + /// the `topic_name` component, + /// 3. "topics": Get the list of available topics, and + /// 4. "protos": Get a string containing all the protobuf + /// definitions. + /// + /// The `topic_name` component is mandatory for the "sub" and "pub" + /// operations. If present, it must be the name of an Ignition Transport + /// topic. + /// + /// The `message_type` component is mandatory for the "pub" operation. If + /// present it names the Ignition Message type, such as + /// "ignition.msgs.Clock". + /// + /// The `payload` component is mandatory for the "pub" operation. If + /// present, it contains a serialized string of an Igntion Message. + /// + /// ## Example frames + /// + /// 1. Get the list of topics: `topics,,,` + /// + /// 2. Get the protobuf definitions: `protos,,,` + /// + /// 3. Subscribe to the "/clock" topic: `sub,/clock,,` + /// + /// 4. Websocker server publishing data on the "/clock" topic: + /// `pub,/clock,ignition.msgs.Clock,` /// /// # Example usage /// @@ -80,11 +118,11 @@ namespace ignition public: void OnConnect(int _socketId); public: void OnDisconnect(int _socketId); + public: void OnMessage(int _socketId, const std::string &_msg); public: void OnRequestMessage(int _socketId, const std::string &_msg); - private: ignition::transport::Node node; private: bool run = true; @@ -104,6 +142,7 @@ namespace ignition const char *_data, const size_t _size); public: std::mutex mutex; + public: std::mutex subscriptionMutex; public: std::map> connections; public: std::map> topicConnections; @@ -114,6 +153,33 @@ namespace ignition std::chrono::time_point> topicTimestamps; + /// \brief The set of valid operations. This enum must align with the + /// `operations` member variable. + private: enum Operation + { + /// \brief Subscribe to a topic. + SUBSCRIBE = 0, + + /// \brief Publish a message from a topic. + PUBLISH = 1, + + /// \brief Get the list of topics. + TOPICS = 2, + + /// \brief Get the protobuf definitions. + PROTOS = 3, + }; + + /// \brief The set of valid operations, in string form. These values + /// can be sent in websocket message frames. + /// These valus must align with the `Operation` enum. + private: std::vector operations{ + "sub", "pub", "topics", "protos"}; + + /// \brief Store publish headers for topics. This is here to improve + /// performance. Keys are topic names and values are frame headers. + private: std::map publishHeaders; + /// \brief Period at which messages will be published on the websocket /// for each subscribed topic. /// \sa topicTimestamps. diff --git a/plugins/websocket_server/ign.js b/plugins/websocket_server/ign.js index 810d511d..838dbd30 100644 --- a/plugins/websocket_server/ign.js +++ b/plugins/websocket_server/ign.js @@ -15,6 +15,10 @@ * */ +function buildMsg(_frameParts) { + return _frameParts.join(','); +} + /// \brief The main interface to the Ignition websocket server and /// data on Ignition Transport. function Ignition(options) { @@ -42,8 +46,7 @@ Ignition.prototype.connect = function(url) { /// \brief Emits a 'connection' event on WebSocket connection. /// \param event - the argument to emit with the event. function onOpen(event) { - that.socket.send("message_definitions"); - + that.socket.send(buildMsg(["protos",'','',''])); } /// \brief Emits a 'close' event on WebSocket disconnection. @@ -64,7 +67,6 @@ Ignition.prototype.connect = function(url) { // \param message - the JSON message from the Ignition // httpserver. function onMessage(_message) { - if (that.root === undefined || that.root === null) { // Read the Blob as an array buffer var f = new FileReader(); @@ -77,12 +79,7 @@ Ignition.prototype.connect = function(url) { that.emit('connection', event); // Request the list of topics on start. - // \todo: Check that the "WebRequest message exists. - var msgType = that.root.lookupType("WebRequest"); - var webRequestMsg = msgType.encode({ - operation: "topic_list", - }).finish(); - that.socket.send(webRequestMsg); + that.socket.send(buildMsg(['topics','','',''])); }; // Read the blob data as an array buffer. @@ -90,39 +87,26 @@ Ignition.prototype.connect = function(url) { return; } - // \todo: Check that packetMsgType is valid - var packetMsgType = that.root.lookup("ignition.msgs.Packet"); - - // Read the Blob as an array buffer var f = new FileReader(); f.onloadend = function(event) { - // This is the proto message data - var contents = event.target.result; - - // \todo: Check for error - var error = event.target.error; - - // Get the decoded packet - var packetMsg = packetMsgType.decode(new Uint8Array(contents)); - - // Get the "oneof" message - var which = packetMsg.content; - var oneOfMsg = which !== null ? packetMsg[which] : null; - - // console.log("Actual topic: ", packetMsg.topic); - // console.log("Actual type: ", packetMsg.type); - // console.log("Actual Which: ", which); - // console.log("Actual Msg: ", oneOfMsg); - - if (packetMsg.topic === "/topic_list") { - // The "/topic_list" topic is a special case - that.topics = oneOfMsg.data; - } - // Otherwise emit the message. - else - { + // Decode as UTF-8 to get the header + var str = new TextDecoder("utf-8").decode(event.target.result); + const frameParts = str.split(','); + var msgType = that.root.lookup(frameParts[2]); + var buf = new Uint8Array(event.target.result); + + // Decode the message. The "+3" in the slice accounts for the commas + // in the frame. + var msg = msgType.decode( + buf.slice(frameParts[0].length + frameParts[1].length + + frameParts[2].length+3)); + + // Handle the topic list special case. + if (frameParts[1] == 'topics') { + that.topics = msg.data; + } else { // This will pass along the message on the appropriate topic. - that.emit(packetMsg.topic, oneOfMsg); + that.emit(frameParts[1], msg); } } // Read the blob data as an array buffer. @@ -181,17 +165,8 @@ Topic.prototype.subscribe = function(_callback) { // Register the callback with the topic name that.ign.on(that.name, _cb); - // \todo: Check that requesttMsgType is valid - var requestMsgType = that.ign.root.lookup("ignition.msgs.WebRequest"); - - // Create the subscription request message - var webRequestMsg = requestMsgType.encode({ - operation: "subscribe", - topic: that.name, - }).finish(); - // Send the subscription message over the websocket. - that.ign.sendMsg(webRequestMsg); + that.ign.sendMsg(buildMsg(['sub', that.name, '', ''])); } if (!this.ign.isConnected) { From 88fcea8463aef09e2c1626d5b291876155398dc2 Mon Sep 17 00:00:00 2001 From: Nate Koenig Date: Thu, 23 Apr 2020 06:03:05 -0700 Subject: [PATCH 07/13] Documentation and cleanup --- Changelog.md | 4 ++++ plugins/websocket_server/WebsocketServer.cc | 21 ++++++++++++--------- plugins/websocket_server/WebsocketServer.hh | 9 +++++++++ plugins/websocket_server/ign.js | 6 ++++++ 4 files changed, 31 insertions(+), 9 deletions(-) diff --git a/Changelog.md b/Changelog.md index bbacc280..b6fe746b 100644 --- a/Changelog.md +++ b/Changelog.md @@ -2,6 +2,10 @@ ### Ignition Launch 1.X.X +1. Changed the websocket messages to be more efficient when transmitting + protobuf messages. + * [Pull Request 22](https://github.com/ignitionrobotics/ign-launch/pull/22) + 1. Added ability to set the publication rate of the websocket server. * [Pull Request 17](https://bitbucket.org/ignitionrobotics/ign-launch/pull-requests/17) diff --git a/plugins/websocket_server/WebsocketServer.cc b/plugins/websocket_server/WebsocketServer.cc index 5f57bae6..db818fee 100644 --- a/plugins/websocket_server/WebsocketServer.cc +++ b/plugins/websocket_server/WebsocketServer.cc @@ -25,7 +25,19 @@ using namespace ignition::launch; +/// \brief Construct a websocket frame header. +/// \param[in] _op The operation string. +/// \param[in] _topic The topic name string. +/// \param[in] _type The message type string. +/// \return A string that is the frame header. #define BUILD_HEADER(_op, _topic, _type) ((_op)+","+(_topic)+","+(_type)+",") + +/// \brief Construction a complete websocket frame. +/// \param[in] _op The operation string. +/// \param[in] _topic The topic name string. +/// \param[in] _type The message type string. +/// \param[in] _payload The complete payload string. +/// \return A string that is the frame header. #define BUILD_MSG(_op, _topic, _type, _payload) (BUILD_HEADER(_op, _topic, _type) + _payload) int rootCallback(struct lws *_wsi, @@ -270,15 +282,6 @@ void WebsocketServer::OnDisconnect(int _socketId) } } -// Option 1: Encode everything in a Packet. -// Pros: Works, and has serialization. -// Cons: Server need to de-serialize and re-serialize messages. -// -// Option 2: Send header along with payload. Don't encode the header, and -// only copy the serialized payload. -// -// Option 3: Encode the header and payload into a Packet. - ////////////////////////////////////////////////// void WebsocketServer::OnMessage(int _socketId, const std::string &_msg) { diff --git a/plugins/websocket_server/WebsocketServer.hh b/plugins/websocket_server/WebsocketServer.hh index 85c1349e..1126b8a2 100644 --- a/plugins/websocket_server/WebsocketServer.hh +++ b/plugins/websocket_server/WebsocketServer.hh @@ -142,8 +142,17 @@ namespace ignition const char *_data, const size_t _size); public: std::mutex mutex; + + /// \brief A mutex used in the OnWebsocketSubscribedMessage + /// function. public: std::mutex subscriptionMutex; + + /// \brief All of the websocket connections. public: std::map> connections; + + /// \brief All of the subscribed Ignition topics. + /// The key is the topic name, and the value is the set of websocket + /// connections that have subscribed to the topic. public: std::map> topicConnections; /// \brief Time of last publication for each subscribed topic. The key diff --git a/plugins/websocket_server/ign.js b/plugins/websocket_server/ign.js index 838dbd30..f5efcd9c 100644 --- a/plugins/websocket_server/ign.js +++ b/plugins/websocket_server/ign.js @@ -15,6 +15,12 @@ * */ +/// \brief Construct a complete websocket message. +/// \param[in] _frameParts This must be an array of four strings: +/// 1. operation, +/// 2. topic name, +/// 3. message type, and +/// 4. payload function buildMsg(_frameParts) { return _frameParts.join(','); } From abb06f67e54604df12cf4bb968d71e80a6c1a2c0 Mon Sep 17 00:00:00 2001 From: Nate Koenig Date: Thu, 23 Apr 2020 06:04:13 -0700 Subject: [PATCH 08/13] Move index2.html to index.html and the old index.html to index-old.html Signed-off-by: Nate Koenig --- plugins/websocket_server/index-old.html | 268 +++++++++++++++++++++ plugins/websocket_server/index.html | 308 ++++-------------------- plugins/websocket_server/index2.html | 44 ---- 3 files changed, 310 insertions(+), 310 deletions(-) create mode 100644 plugins/websocket_server/index-old.html delete mode 100644 plugins/websocket_server/index2.html diff --git a/plugins/websocket_server/index-old.html b/plugins/websocket_server/index-old.html new file mode 100644 index 00000000..cf238618 --- /dev/null +++ b/plugins/websocket_server/index-old.html @@ -0,0 +1,268 @@ + + +WebSocket Test + + + + +

WebSocket Test

+ +
+ + + +
+ +
    +
  • By default, the Client will always have just one websocket connection. The Connect button closes the previous websocket.
  • +
  • The Server keeps track of the connections. If we open a new websocket without closing the old one, they will pile up on the Server.
  • +
+ + + + +

+ When you click the Subscribe button, you'll receive messages from the Server. +

+ +
+
+ +
diff --git a/plugins/websocket_server/index.html b/plugins/websocket_server/index.html index cf238618..aae1425a 100644 --- a/plugins/websocket_server/index.html +++ b/plugins/websocket_server/index.html @@ -1,268 +1,44 @@ - -WebSocket Test - - - + + + + - -

WebSocket Test

- -
- - - -
- -
    -
  • By default, the Client will always have just one websocket connection. The Connect button closes the previous websocket.
  • -
  • The Server keeps track of the connections. If we open a new websocket without closing the old one, they will pile up on the Server.
  • -
- - - - -

- When you click the Subscribe button, you'll receive messages from the Server. -

- -
-
- -
+ }); + + // Todo: + // 1. load the gz3d module. + // 2. Show simple shapes. + // 3. Get the meshes and other resource from the server. + + + + +
Status: disconnected
+
+ + diff --git a/plugins/websocket_server/index2.html b/plugins/websocket_server/index2.html deleted file mode 100644 index aae1425a..00000000 --- a/plugins/websocket_server/index2.html +++ /dev/null @@ -1,44 +0,0 @@ - - - - - WebSocket Test - - - - - - - - -
Status: disconnected
-
- - From 8a07d4d29569cd01d3bffcf3de8f8a0baf7dca78 Mon Sep 17 00:00:00 2001 From: Nate Koenig Date: Thu, 23 Apr 2020 06:03:05 -0700 Subject: [PATCH 09/13] Documentation and cleanup Signed-off-by: Nate Koenig --- Changelog.md | 4 ++++ plugins/websocket_server/WebsocketServer.cc | 21 ++++++++++++--------- plugins/websocket_server/WebsocketServer.hh | 9 +++++++++ plugins/websocket_server/ign.js | 6 ++++++ 4 files changed, 31 insertions(+), 9 deletions(-) diff --git a/Changelog.md b/Changelog.md index bbacc280..b6fe746b 100644 --- a/Changelog.md +++ b/Changelog.md @@ -2,6 +2,10 @@ ### Ignition Launch 1.X.X +1. Changed the websocket messages to be more efficient when transmitting + protobuf messages. + * [Pull Request 22](https://github.com/ignitionrobotics/ign-launch/pull/22) + 1. Added ability to set the publication rate of the websocket server. * [Pull Request 17](https://bitbucket.org/ignitionrobotics/ign-launch/pull-requests/17) diff --git a/plugins/websocket_server/WebsocketServer.cc b/plugins/websocket_server/WebsocketServer.cc index 5f57bae6..db818fee 100644 --- a/plugins/websocket_server/WebsocketServer.cc +++ b/plugins/websocket_server/WebsocketServer.cc @@ -25,7 +25,19 @@ using namespace ignition::launch; +/// \brief Construct a websocket frame header. +/// \param[in] _op The operation string. +/// \param[in] _topic The topic name string. +/// \param[in] _type The message type string. +/// \return A string that is the frame header. #define BUILD_HEADER(_op, _topic, _type) ((_op)+","+(_topic)+","+(_type)+",") + +/// \brief Construction a complete websocket frame. +/// \param[in] _op The operation string. +/// \param[in] _topic The topic name string. +/// \param[in] _type The message type string. +/// \param[in] _payload The complete payload string. +/// \return A string that is the frame header. #define BUILD_MSG(_op, _topic, _type, _payload) (BUILD_HEADER(_op, _topic, _type) + _payload) int rootCallback(struct lws *_wsi, @@ -270,15 +282,6 @@ void WebsocketServer::OnDisconnect(int _socketId) } } -// Option 1: Encode everything in a Packet. -// Pros: Works, and has serialization. -// Cons: Server need to de-serialize and re-serialize messages. -// -// Option 2: Send header along with payload. Don't encode the header, and -// only copy the serialized payload. -// -// Option 3: Encode the header and payload into a Packet. - ////////////////////////////////////////////////// void WebsocketServer::OnMessage(int _socketId, const std::string &_msg) { diff --git a/plugins/websocket_server/WebsocketServer.hh b/plugins/websocket_server/WebsocketServer.hh index 85c1349e..1126b8a2 100644 --- a/plugins/websocket_server/WebsocketServer.hh +++ b/plugins/websocket_server/WebsocketServer.hh @@ -142,8 +142,17 @@ namespace ignition const char *_data, const size_t _size); public: std::mutex mutex; + + /// \brief A mutex used in the OnWebsocketSubscribedMessage + /// function. public: std::mutex subscriptionMutex; + + /// \brief All of the websocket connections. public: std::map> connections; + + /// \brief All of the subscribed Ignition topics. + /// The key is the topic name, and the value is the set of websocket + /// connections that have subscribed to the topic. public: std::map> topicConnections; /// \brief Time of last publication for each subscribed topic. The key diff --git a/plugins/websocket_server/ign.js b/plugins/websocket_server/ign.js index 838dbd30..f5efcd9c 100644 --- a/plugins/websocket_server/ign.js +++ b/plugins/websocket_server/ign.js @@ -15,6 +15,12 @@ * */ +/// \brief Construct a complete websocket message. +/// \param[in] _frameParts This must be an array of four strings: +/// 1. operation, +/// 2. topic name, +/// 3. message type, and +/// 4. payload function buildMsg(_frameParts) { return _frameParts.join(','); } From cc2ee824f461098649fa71431b44ffe57a4a75c9 Mon Sep 17 00:00:00 2001 From: Nate Koenig Date: Thu, 23 Apr 2020 06:09:28 -0700 Subject: [PATCH 10/13] changelog --- Changelog.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Changelog.md b/Changelog.md index bbacc280..8408ddbc 100644 --- a/Changelog.md +++ b/Changelog.md @@ -2,6 +2,9 @@ ### Ignition Launch 1.X.X +1. Added ability to get protobuf definitions from the websocket server + * [Pull Request 21](https://github.com/ignitionrobotics/ign-launch/pull/21) + 1. Added ability to set the publication rate of the websocket server. * [Pull Request 17](https://bitbucket.org/ignitionrobotics/ign-launch/pull-requests/17) From 037e33c600355cb962c13e4ea60438512e14640a Mon Sep 17 00:00:00 2001 From: Nate Koenig Date: Sat, 18 Apr 2020 12:42:52 -0700 Subject: [PATCH 11/13] Websocket updates that allow a websocket client to get protobuf message definitions from the server. --- Changelog.md | 3 + plugins/websocket_server/CMakeLists.txt | 11 + .../websocket_server/MessageDefinitions.hh.in | 32 +++ plugins/websocket_server/WebsocketServer.cc | 14 +- plugins/websocket_server/WebsocketServer.hh | 16 ++ plugins/websocket_server/combined.proto | 125 +++++++++++ plugins/websocket_server/eventemitter2.min.js | 1 + plugins/websocket_server/ign.js | 204 ++++++++++++++++++ plugins/websocket_server/index2.html | 44 ++++ 9 files changed, 449 insertions(+), 1 deletion(-) create mode 100644 plugins/websocket_server/MessageDefinitions.hh.in create mode 100644 plugins/websocket_server/combined.proto create mode 100644 plugins/websocket_server/eventemitter2.min.js create mode 100644 plugins/websocket_server/ign.js create mode 100644 plugins/websocket_server/index2.html diff --git a/Changelog.md b/Changelog.md index 43379377..fdbc88ca 100644 --- a/Changelog.md +++ b/Changelog.md @@ -2,6 +2,9 @@ ### Ignition Launch 1.X.X +1. Added ability to get protobuf definitions from the websocket server + * [Pull Request 21](https://github.com/ignitionrobotics/ign-launch/pull/21) + 1. Added ability to set the publication rate of the websocket server. * [Pull Request 17](https://github.com/ignitionrobotics/ign-launch/pull/17) diff --git a/plugins/websocket_server/CMakeLists.txt b/plugins/websocket_server/CMakeLists.txt index 35e85081..3f781d66 100644 --- a/plugins/websocket_server/CMakeLists.txt +++ b/plugins/websocket_server/CMakeLists.txt @@ -3,6 +3,17 @@ if (websockets_FOUND) set (sources WebsocketServer.cc) add_library(${plugin} SHARED ${sources}) + + file (READ combined.proto MESSAGE_DEFINITIONS) + configure_file("MessageDefinitions.hh.in" + "${CMAKE_CURRENT_BINARY_DIR}/MessageDefinitions.hh" @ONLY) + # Add a dependency on binary source dir so that you can change the + # combinded.proto file and `make` will rebuild the plugin. + set_property(DIRECTORY APPEND PROPERTY CMAKE_CONFIGURE_DEPENDS combined.proto) + target_include_directories(${plugin} PRIVATE + $ + ) + target_link_libraries(${plugin} PRIVATE ${websockets_LIBRARIES} diff --git a/plugins/websocket_server/MessageDefinitions.hh.in b/plugins/websocket_server/MessageDefinitions.hh.in new file mode 100644 index 00000000..edb7cb5f --- /dev/null +++ b/plugins/websocket_server/MessageDefinitions.hh.in @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2019 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef IGNITION_LAUNCH_WEBSOCKETSERVER_MESSAGE_DEFINITIONS_HH_ +#define IGNITION_LAUNCH_WEBSOCKETSERVER_MESSAGE_DEFINITIONS_HH_ + +namespace ignition +{ + namespace launch + { + /// \brief All of the protobuf messages provided by the websocket + /// server. + /// \todo Get this information from a running simulation. The problem + /// right now is that Ignition Transport does not show subscribers. + static const std::string kMessageDefinitions = + R"protos(@MESSAGE_DEFINITIONS@)protos"; + } +} +#endif diff --git a/plugins/websocket_server/WebsocketServer.cc b/plugins/websocket_server/WebsocketServer.cc index 8216cde9..9419c518 100644 --- a/plugins/websocket_server/WebsocketServer.cc +++ b/plugins/websocket_server/WebsocketServer.cc @@ -19,6 +19,7 @@ #include #include +#include "MessageDefinitions.hh" #include "WebsocketServer.hh" using namespace ignition::launch; @@ -268,10 +269,19 @@ void WebsocketServer::OnDisconnect(int _socketId) ////////////////////////////////////////////////// void WebsocketServer::OnMessage(int _socketId, const std::string &_msg) { + // Handle the case where the client requests the message definitions. + if (_msg == "message_definitions") + { + igndbg << "Message definitions request recieved\n"; + this->QueueMessage(this->connections[_socketId].get(), + kMessageDefinitions.c_str(), kMessageDefinitions.length()); + return; + } + ignition::msgs::WebRequest requestMsg; requestMsg.ParseFromString(_msg); - if (requestMsg.operation() == "list") + if (requestMsg.operation() == "topic_list") { igndbg << "Topic list request recieved\n"; ignition::msgs::Packet msg; @@ -285,6 +295,8 @@ void WebsocketServer::OnMessage(int _socketId, const std::string &_msg) for (const std::string &topic : topics) msg.mutable_string_msg_v()->add_data(topic); + msg.set_topic("/topic_list"); + msg.set_type("ignition.msgs.StringMsg_V"); std::string data = msg.SerializeAsString(); // Queue the message for delivery. diff --git a/plugins/websocket_server/WebsocketServer.hh b/plugins/websocket_server/WebsocketServer.hh index f7249325..bca14a20 100644 --- a/plugins/websocket_server/WebsocketServer.hh +++ b/plugins/websocket_server/WebsocketServer.hh @@ -35,8 +35,17 @@ namespace ignition /// \brief Reads from a USB joystick device and outputs /// ignition::msgs::WebsocketServer messages. /// + /// # Websocket Server Interface + /// + /// \todo: Describe the websocket API. + /// /// # Example usage /// + /// ## Websocket Server + /// + /// 1. Define a launch file by copying the following contents to a file + /// called `websocket.ign`. + /// /// /// @@ -44,6 +53,13 @@ namespace ignition /// /// 30 /// + /// + /// 2. Run the launch file + /// + /// `ign launch -v 4 websocket.ign` + /// + /// 3. Open the [index.html](https://github.com/ignitionrobotics/ign-launch/blob/master/plugins/websocket_server/index.html) webpage. + /// class WebsocketServer : public ignition::launch::Plugin { /// \brief Constructor diff --git a/plugins/websocket_server/combined.proto b/plugins/websocket_server/combined.proto new file mode 100644 index 00000000..1070d002 --- /dev/null +++ b/plugins/websocket_server/combined.proto @@ -0,0 +1,125 @@ +syntax = "proto3"; + +package ignition.msgs; + +message Time { + int64 sec = 1; + int32 nsec = 2; +} + +message Clock +{ + Header header = 1; + Time system = 2; + Time real = 3; + Time sim = 4; +} +message Header { + message Map { + string key = 1; + repeated string value = 2; + } + Time stamp = 1; + repeated Map data = 2; +} +message WebRequest +{ + Header header = 1; + string operation = 2; + string topic = 3; + string msg_type = 4; + string compression = 5; + double hz = 6; +} +message StringMsg_V +{ + repeated string data = 2; +} +message CmdVel2D +{ + Header header = 1; + double velocity = 2; + double theta = 3; +} +enum PixelFormatType +{ + UNKNOWN_PIXEL_FORMAT = 0; + L_INT8 = 1; + L_INT16 = 2; + RGB_INT8 = 3; + RGBA_INT8 = 4; + BGRA_INT8 = 5; + RGB_INT16 = 6; + RGB_INT32 = 7; + BGR_INT8 = 8; + BGR_INT16 = 9; + BGR_INT32 = 10; + R_FLOAT16 = 11; + RGB_FLOAT16 = 12; + R_FLOAT32 = 13; + RGB_FLOAT32 = 14; + BAYER_RGGB8 = 15; + BAYER_RGGR8 = 16; + BAYER_GBRG8 = 17; + BAYER_GRBG8 = 18; +} + +message Image +{ + Header header = 1; + uint32 width = 2; + uint32 height = 3; + uint32 pixel_format = 4; + uint32 step = 5; + bytes data = 6; +} +message Vector3d +{ + Header header = 1; + double x = 2; + double y = 3; + double z = 4; +} +message Pose +{ + Header header = 1; + string name = 2; + uint32 id = 3; + Vector3d position = 4; + Quaternion orientation = 5; +} +message Quaternion +{ + Header header = 1; + double x = 2; + double y = 3; + double z = 4; + double w = 5; +} +message Double_V +{ + repeated double data = 1; +} +message Pose_V +{ + Header header = 1; + repeated Pose pose = 2; +} +message Packet +{ + string topic = 1; + string type = 2; + + oneof content + { + CmdVel2D cmd_vel2d = 3; + Image image = 4; + StringMsg_V string_msg_v = 5; + WebRequest web_request = 6; + Pose pose = 7; + Double_V doublev = 8; + Pose_V pose_v = 9; + Time time = 10; + Clock clock = 11; + } +} diff --git a/plugins/websocket_server/eventemitter2.min.js b/plugins/websocket_server/eventemitter2.min.js new file mode 100644 index 00000000..a5af1eda --- /dev/null +++ b/plugins/websocket_server/eventemitter2.min.js @@ -0,0 +1 @@ +!function(e){function t(){this._events={},this._conf&&i.call(this,this._conf)}function i(e){e&&(this._conf=e,e.delimiter&&(this.delimiter=e.delimiter),e.maxListeners&&(this._events.maxListeners=e.maxListeners),e.wildcard&&(this.wildcard=e.wildcard),e.newListener&&(this.newListener=e.newListener),this.wildcard&&(this.listenerTree={}))}function s(e){this._events={},this.newListener=!1,i.call(this,e)}function n(e,t,i,s){if(!i)return[];var r,l,o,h,a,f,c,_=[],p=t.length,u=t[s],v=t[s+1];if(s===p&&i._listeners){if("function"==typeof i._listeners)return e&&e.push(i._listeners),[i];for(r=0,l=i._listeners.length;r0&&n._listeners.length>h&&(n._listeners.warned=!0,console.error("(node) warning: possible EventEmitter memory leak detected. %d listeners added. Use emitter.setMaxListeners() to increase limit.",n._listeners.length),console.trace())}}else n._listeners=t;return!0}r=e.shift()}return!0}var l=Array.isArray?Array.isArray:function(e){return"[object Array]"===Object.prototype.toString.call(e)},o=10;s.prototype.delimiter=".",s.prototype.setMaxListeners=function(e){this._events||t.call(this),this._events.maxListeners=e,this._conf||(this._conf={}),this._conf.maxListeners=e},s.prototype.event="",s.prototype.once=function(e,t){return this.many(e,1,t),this},s.prototype.many=function(e,t,i){function s(){0==--t&&n.off(e,s),i.apply(this,arguments)}var n=this;if("function"!=typeof i)throw new Error("many only accepts instances of Function");return s._origin=i,this.on(e,s),n},s.prototype.emit=function(){this._events||t.call(this);var e=arguments[0];if("newListener"===e&&!this.newListener&&!this._events.newListener)return!1;if(this._all){for(var i=arguments.length,s=new Array(i-1),r=1;r1)switch(arguments.length){case 2:l.call(this,arguments[1]);break;case 3:l.call(this,arguments[1],arguments[2]);break;default:for(var i=arguments.length,s=new Array(i-1),r=1;r0||!!this._all}return!!this._all},s.prototype.on=function(e,i){if("function"==typeof e)return this.onAny(e),this;if("function"!=typeof i)throw new Error("on only accepts instances of Function");if(this._events||t.call(this),this.emit("newListener",e,i),this.wildcard)return r.call(this,e,i),this;if(this._events[e]){if("function"==typeof this._events[e])this._events[e]=[this._events[e],i];else if(l(this._events[e])&&(this._events[e].push(i),!this._events[e].warned)){var s=o;void 0!==this._events.maxListeners&&(s=this._events.maxListeners),s>0&&this._events[e].length>s&&(this._events[e].warned=!0,console.error("(node) warning: possible EventEmitter memory leak detected. %d listeners added. Use emitter.setMaxListeners() to increase limit.",this._events[e].length),console.trace())}}else this._events[e]=i;return this},s.prototype.onAny=function(e){if("function"!=typeof e)throw new Error("onAny only accepts instances of Function");return this._all||(this._all=[]),this._all.push(e),this},s.prototype.addListener=s.prototype.on,s.prototype.off=function(e,t){if("function"!=typeof t)throw new Error("removeListener only takes instances of Function");var i,s=[];if(this.wildcard){var r="string"==typeof e?e.split(this.delimiter):e.slice();s=n.call(this,null,r,this.listenerTree,0)}else{if(!this._events[e])return this;i=this._events[e],s.push({_listeners:i})}for(var o=0;o0){for(i=0,s=(t=this._all).length;i + + + + WebSocket Test + + + + + + + + +
Status: disconnected
+
+ + From 57e2bc0a6b648dbea80dc0f4bcbde99a1434a4e2 Mon Sep 17 00:00:00 2001 From: Nate Koenig Date: Wed, 22 Apr 2020 08:49:35 -0700 Subject: [PATCH 12/13] Update websocket frames sent over the websocket. --- Changelog.md | 4 + plugins/websocket_server/CMakeLists.txt | 2 +- .../websocket_server/MessageDefinitions.hh.in | 11 +- plugins/websocket_server/WebsocketServer.cc | 101 ++++++++++-------- plugins/websocket_server/WebsocketServer.hh | 79 +++++++++++++- plugins/websocket_server/ign.js | 79 ++++++-------- 6 files changed, 176 insertions(+), 100 deletions(-) diff --git a/Changelog.md b/Changelog.md index fdbc88ca..3b5630f9 100644 --- a/Changelog.md +++ b/Changelog.md @@ -2,6 +2,10 @@ ### Ignition Launch 1.X.X +1. Changed the websocket messages to be more efficient when transmitting + protobuf messages. + * [Pull Request 22](https://github.com/ignitionrobotics/ign-launch/pull/22) + 1. Added ability to get protobuf definitions from the websocket server * [Pull Request 21](https://github.com/ignitionrobotics/ign-launch/pull/21) diff --git a/plugins/websocket_server/CMakeLists.txt b/plugins/websocket_server/CMakeLists.txt index 3f781d66..47a183a1 100644 --- a/plugins/websocket_server/CMakeLists.txt +++ b/plugins/websocket_server/CMakeLists.txt @@ -4,7 +4,7 @@ if (websockets_FOUND) add_library(${plugin} SHARED ${sources}) - file (READ combined.proto MESSAGE_DEFINITIONS) + file (READ combined.proto WEBSOCKETSERVER_MESSAGE_DEFINITIONS) configure_file("MessageDefinitions.hh.in" "${CMAKE_CURRENT_BINARY_DIR}/MessageDefinitions.hh" @ONLY) # Add a dependency on binary source dir so that you can change the diff --git a/plugins/websocket_server/MessageDefinitions.hh.in b/plugins/websocket_server/MessageDefinitions.hh.in index edb7cb5f..c6038d00 100644 --- a/plugins/websocket_server/MessageDefinitions.hh.in +++ b/plugins/websocket_server/MessageDefinitions.hh.in @@ -1,5 +1,5 @@ /* - * Copyright (C) 2019 Open Source Robotics Foundation + * Copyright (C) 2020 Open Source Robotics Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,8 +14,8 @@ * limitations under the License. * */ -#ifndef IGNITION_LAUNCH_WEBSOCKETSERVER_MESSAGE_DEFINITIONS_HH_ -#define IGNITION_LAUNCH_WEBSOCKETSERVER_MESSAGE_DEFINITIONS_HH_ +#ifndef IGNITION_LAUNCH_WEBSOCKETSERVER_MESSAGEDEFINITIONS_HH_ +#define IGNITION_LAUNCH_WEBSOCKETSERVER_MESSAGEDEFINITIONS_HH_ namespace ignition { @@ -23,10 +23,11 @@ namespace ignition { /// \brief All of the protobuf messages provided by the websocket /// server. + /// /// \todo Get this information from a running simulation. The problem /// right now is that Ignition Transport does not show subscribers. - static const std::string kMessageDefinitions = - R"protos(@MESSAGE_DEFINITIONS@)protos"; + static const std::string kWebSocketServerMessageDefinitions = + R"protos(@WEBSOCKETSERVER_MESSAGE_DEFINITIONS@)protos"; } } #endif diff --git a/plugins/websocket_server/WebsocketServer.cc b/plugins/websocket_server/WebsocketServer.cc index 9419c518..db818fee 100644 --- a/plugins/websocket_server/WebsocketServer.cc +++ b/plugins/websocket_server/WebsocketServer.cc @@ -15,6 +15,7 @@ * */ +#include #include #include #include @@ -24,6 +25,21 @@ using namespace ignition::launch; +/// \brief Construct a websocket frame header. +/// \param[in] _op The operation string. +/// \param[in] _topic The topic name string. +/// \param[in] _type The message type string. +/// \return A string that is the frame header. +#define BUILD_HEADER(_op, _topic, _type) ((_op)+","+(_topic)+","+(_type)+",") + +/// \brief Construction a complete websocket frame. +/// \param[in] _op The operation string. +/// \param[in] _topic The topic name string. +/// \param[in] _type The message type string. +/// \param[in] _payload The complete payload string. +/// \return A string that is the frame header. +#define BUILD_MSG(_op, _topic, _type, _payload) (BUILD_HEADER(_op, _topic, _type) + _payload) + int rootCallback(struct lws *_wsi, enum lws_callback_reasons _reason, void * /*user*/, @@ -269,22 +285,31 @@ void WebsocketServer::OnDisconnect(int _socketId) ////////////////////////////////////////////////// void WebsocketServer::OnMessage(int _socketId, const std::string &_msg) { - // Handle the case where the client requests the message definitions. - if (_msg == "message_definitions") + // Frame: operation,topic,type,payload + std::vector frameParts = common::split(_msg, ","); + + // Check for a valid frame. + if (frameParts.size() != 4 && + // Count the number of commas to handle a frame like "sub,,," + std::count(_msg.begin(), _msg.end(), ',') != 3) { - igndbg << "Message definitions request recieved\n"; - this->QueueMessage(this->connections[_socketId].get(), - kMessageDefinitions.c_str(), kMessageDefinitions.length()); + ignerr << "Received an invalid frame with " << frameParts.size() + << "components when 4 is expected.\n"; return; } - ignition::msgs::WebRequest requestMsg; - requestMsg.ParseFromString(_msg); - - if (requestMsg.operation() == "topic_list") + // Handle the case where the client requests the message definitions. + if (frameParts[0] == "protos") + { + igndbg << "Protos request received\n"; + this->QueueMessage(this->connections[_socketId].get(), + kWebSocketServerMessageDefinitions.c_str(), + kWebSocketServerMessageDefinitions.length()); + } + else if (frameParts[0] == "topics") { igndbg << "Topic list request recieved\n"; - ignition::msgs::Packet msg; + ignition::msgs::StringMsg_V msg; std::vector topics; @@ -293,25 +318,24 @@ void WebsocketServer::OnMessage(int _socketId, const std::string &_msg) // Store the topics in a message and serialize the message. for (const std::string &topic : topics) - msg.mutable_string_msg_v()->add_data(topic); + msg.add_data(topic); - msg.set_topic("/topic_list"); - msg.set_type("ignition.msgs.StringMsg_V"); - std::string data = msg.SerializeAsString(); + std::string data = BUILD_MSG(this->operations[PUBLISH], frameParts[0], + std::string("ignition.msgs.StringMsg_V"), msg.SerializeAsString()); // Queue the message for delivery. this->QueueMessage(this->connections[_socketId].get(), data.c_str(), data.length()); } - else if (requestMsg.operation() == "subscribe") + else if (frameParts[0] == "sub") { // Store the relation of socketId to subscribed topic. - this->topicConnections[requestMsg.topic()].insert(_socketId); - this->topicTimestamps[requestMsg.topic()] = + this->topicConnections[frameParts[1]].insert(_socketId); + this->topicTimestamps[frameParts[1]] = std::chrono::steady_clock::now() - this->publishPeriod; - igndbg << "Subscribe request to topic[" << requestMsg.topic() << "]\n"; - this->node.SubscribeRaw(requestMsg.topic(), + igndbg << "Subscribe request to topic[" << frameParts[1] << "]\n"; + this->node.SubscribeRaw(frameParts[1], std::bind(&WebsocketServer::OnWebsocketSubscribedMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); @@ -328,44 +352,35 @@ void WebsocketServer::OnWebsocketSubscribedMessage( if (iter != this->topicConnections.end()) { + std::lock_guard mainLock(this->subscriptionMutex); std::chrono::time_point systemTime = std::chrono::steady_clock::now(); - ignition::msgs::Packet msg; - msg.set_topic(_info.Topic()); - msg.set_type(_info.Type()); - std::chrono::nanoseconds timeDelta = systemTime - this->topicTimestamps[_info.Topic()]; if (timeDelta > this->publishPeriod) { - if (_info.Type() == "ignition.msgs.CmdVel2D") - msg.mutable_cmd_vel2d()->ParseFromArray(_data, _size); - else if (_info.Type() == "ignition.msgs.Image") - msg.mutable_image()->ParseFromArray(_data, _size); - else if (_info.Type() == "ignition.msgs.StringMsg_V") - msg.mutable_string_msg_v()->ParseFromArray(_data, _size); - else if (_info.Type() == "ignition.msgs.WebRequest") - msg.mutable_web_request()->ParseFromArray(_data, _size); - else if (_info.Type() == "ignition.msgs.Pose") - msg.mutable_pose()->ParseFromArray(_data, _size); - else if (_info.Type() == "ignition.msgs.Pose_V") - msg.mutable_pose_v()->ParseFromArray(_data, _size); - else if (_info.Type() == "ignition.msgs.Time") - msg.mutable_time()->ParseFromArray(_data, _size); - else if (_info.Type() == "ignition.msgs.Clock") - msg.mutable_clock()->ParseFromArray(_data, _size); - else if (_info.Type() == "ignition.msgs.WorldStatistics") - msg.mutable_world_stats()->ParseFromArray(_data, _size); + // Get the header, or build a new header if it doesn't exist. + auto header = this->publishHeaders.find(_info.Topic()); + if (header == this->publishHeaders.end()) + { + this->publishHeaders[_info.Topic()] = BUILD_HEADER( + this->operations[PUBLISH], _info.Topic(), _info.Type()); + header = this->publishHeaders.find(_info.Topic()); + } + // Store the last time this topic was published. this->topicTimestamps[_info.Topic()] = systemTime; - std::string data = msg.SerializeAsString(); + // Construct the final message. + std::string msg = header->second + std::string(_data, _size); + + // Send the message for (const int &socketId : iter->second) { this->QueueMessage(this->connections[socketId].get(), - data.c_str(), data.length()); + msg.c_str(), msg.length()); } } } diff --git a/plugins/websocket_server/WebsocketServer.hh b/plugins/websocket_server/WebsocketServer.hh index bca14a20..1126b8a2 100644 --- a/plugins/websocket_server/WebsocketServer.hh +++ b/plugins/websocket_server/WebsocketServer.hh @@ -37,7 +37,45 @@ namespace ignition /// /// # Websocket Server Interface /// - /// \todo: Describe the websocket API. + /// The websocket server listens for incoming requests and sends + /// messages based on the requests. + /// + /// All messages on the websocket, incoming and outgoing, are structured + /// in a frame that consists of four comma separated components: + /// 1. `operation`: string, + /// 2. `topic_name`: string, + /// 3. `message_type`: string, and + /// 4. `payload`: serialized data. + /// + /// The `operation` component is mandatory and must be one of: + /// 1. "sub": Subscribe to the topic in the `topic_name` component, + /// 2. "pub": Publish a message from the Ingition Transport topic in + /// the `topic_name` component, + /// 3. "topics": Get the list of available topics, and + /// 4. "protos": Get a string containing all the protobuf + /// definitions. + /// + /// The `topic_name` component is mandatory for the "sub" and "pub" + /// operations. If present, it must be the name of an Ignition Transport + /// topic. + /// + /// The `message_type` component is mandatory for the "pub" operation. If + /// present it names the Ignition Message type, such as + /// "ignition.msgs.Clock". + /// + /// The `payload` component is mandatory for the "pub" operation. If + /// present, it contains a serialized string of an Igntion Message. + /// + /// ## Example frames + /// + /// 1. Get the list of topics: `topics,,,` + /// + /// 2. Get the protobuf definitions: `protos,,,` + /// + /// 3. Subscribe to the "/clock" topic: `sub,/clock,,` + /// + /// 4. Websocker server publishing data on the "/clock" topic: + /// `pub,/clock,ignition.msgs.Clock,` /// /// # Example usage /// @@ -80,11 +118,11 @@ namespace ignition public: void OnConnect(int _socketId); public: void OnDisconnect(int _socketId); + public: void OnMessage(int _socketId, const std::string &_msg); public: void OnRequestMessage(int _socketId, const std::string &_msg); - private: ignition::transport::Node node; private: bool run = true; @@ -104,7 +142,17 @@ namespace ignition const char *_data, const size_t _size); public: std::mutex mutex; + + /// \brief A mutex used in the OnWebsocketSubscribedMessage + /// function. + public: std::mutex subscriptionMutex; + + /// \brief All of the websocket connections. public: std::map> connections; + + /// \brief All of the subscribed Ignition topics. + /// The key is the topic name, and the value is the set of websocket + /// connections that have subscribed to the topic. public: std::map> topicConnections; /// \brief Time of last publication for each subscribed topic. The key @@ -114,6 +162,33 @@ namespace ignition std::chrono::time_point> topicTimestamps; + /// \brief The set of valid operations. This enum must align with the + /// `operations` member variable. + private: enum Operation + { + /// \brief Subscribe to a topic. + SUBSCRIBE = 0, + + /// \brief Publish a message from a topic. + PUBLISH = 1, + + /// \brief Get the list of topics. + TOPICS = 2, + + /// \brief Get the protobuf definitions. + PROTOS = 3, + }; + + /// \brief The set of valid operations, in string form. These values + /// can be sent in websocket message frames. + /// These valus must align with the `Operation` enum. + private: std::vector operations{ + "sub", "pub", "topics", "protos"}; + + /// \brief Store publish headers for topics. This is here to improve + /// performance. Keys are topic names and values are frame headers. + private: std::map publishHeaders; + /// \brief Period at which messages will be published on the websocket /// for each subscribed topic. /// \sa topicTimestamps. diff --git a/plugins/websocket_server/ign.js b/plugins/websocket_server/ign.js index 810d511d..f5efcd9c 100644 --- a/plugins/websocket_server/ign.js +++ b/plugins/websocket_server/ign.js @@ -15,6 +15,16 @@ * */ +/// \brief Construct a complete websocket message. +/// \param[in] _frameParts This must be an array of four strings: +/// 1. operation, +/// 2. topic name, +/// 3. message type, and +/// 4. payload +function buildMsg(_frameParts) { + return _frameParts.join(','); +} + /// \brief The main interface to the Ignition websocket server and /// data on Ignition Transport. function Ignition(options) { @@ -42,8 +52,7 @@ Ignition.prototype.connect = function(url) { /// \brief Emits a 'connection' event on WebSocket connection. /// \param event - the argument to emit with the event. function onOpen(event) { - that.socket.send("message_definitions"); - + that.socket.send(buildMsg(["protos",'','',''])); } /// \brief Emits a 'close' event on WebSocket disconnection. @@ -64,7 +73,6 @@ Ignition.prototype.connect = function(url) { // \param message - the JSON message from the Ignition // httpserver. function onMessage(_message) { - if (that.root === undefined || that.root === null) { // Read the Blob as an array buffer var f = new FileReader(); @@ -77,12 +85,7 @@ Ignition.prototype.connect = function(url) { that.emit('connection', event); // Request the list of topics on start. - // \todo: Check that the "WebRequest message exists. - var msgType = that.root.lookupType("WebRequest"); - var webRequestMsg = msgType.encode({ - operation: "topic_list", - }).finish(); - that.socket.send(webRequestMsg); + that.socket.send(buildMsg(['topics','','',''])); }; // Read the blob data as an array buffer. @@ -90,39 +93,26 @@ Ignition.prototype.connect = function(url) { return; } - // \todo: Check that packetMsgType is valid - var packetMsgType = that.root.lookup("ignition.msgs.Packet"); - - // Read the Blob as an array buffer var f = new FileReader(); f.onloadend = function(event) { - // This is the proto message data - var contents = event.target.result; - - // \todo: Check for error - var error = event.target.error; - - // Get the decoded packet - var packetMsg = packetMsgType.decode(new Uint8Array(contents)); - - // Get the "oneof" message - var which = packetMsg.content; - var oneOfMsg = which !== null ? packetMsg[which] : null; - - // console.log("Actual topic: ", packetMsg.topic); - // console.log("Actual type: ", packetMsg.type); - // console.log("Actual Which: ", which); - // console.log("Actual Msg: ", oneOfMsg); - - if (packetMsg.topic === "/topic_list") { - // The "/topic_list" topic is a special case - that.topics = oneOfMsg.data; - } - // Otherwise emit the message. - else - { + // Decode as UTF-8 to get the header + var str = new TextDecoder("utf-8").decode(event.target.result); + const frameParts = str.split(','); + var msgType = that.root.lookup(frameParts[2]); + var buf = new Uint8Array(event.target.result); + + // Decode the message. The "+3" in the slice accounts for the commas + // in the frame. + var msg = msgType.decode( + buf.slice(frameParts[0].length + frameParts[1].length + + frameParts[2].length+3)); + + // Handle the topic list special case. + if (frameParts[1] == 'topics') { + that.topics = msg.data; + } else { // This will pass along the message on the appropriate topic. - that.emit(packetMsg.topic, oneOfMsg); + that.emit(frameParts[1], msg); } } // Read the blob data as an array buffer. @@ -181,17 +171,8 @@ Topic.prototype.subscribe = function(_callback) { // Register the callback with the topic name that.ign.on(that.name, _cb); - // \todo: Check that requesttMsgType is valid - var requestMsgType = that.ign.root.lookup("ignition.msgs.WebRequest"); - - // Create the subscription request message - var webRequestMsg = requestMsgType.encode({ - operation: "subscribe", - topic: that.name, - }).finish(); - // Send the subscription message over the websocket. - that.ign.sendMsg(webRequestMsg); + that.ign.sendMsg(buildMsg(['sub', that.name, '', ''])); } if (!this.ign.isConnected) { From 87d5a5164086c65979e3f1ce39c873c03ea99361 Mon Sep 17 00:00:00 2001 From: Nate Koenig Date: Mon, 1 Jun 2020 10:42:23 -0700 Subject: [PATCH 13/13] Fix function spelling Signed-off-by: Nate Koenig --- plugins/websocket_server/ign.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/websocket_server/ign.js b/plugins/websocket_server/ign.js index f5efcd9c..aa6d74c7 100644 --- a/plugins/websocket_server/ign.js +++ b/plugins/websocket_server/ign.js @@ -180,6 +180,6 @@ Topic.prototype.subscribe = function(_callback) { emitter(_callback); }); } else { - emiiter(_callback); + emitter(_callback); } };