Skip to content

Commit

Permalink
[ntcore] Networking improvements (#5659)
Browse files Browse the repository at this point in the history
- Utilize TrySend to properly backpressure network traffic
- Split updates into reasonable sized frames using WS fragmentation
- Use WS pings for network aliveness (requires 4.1 protocol revision)
- Measure RTT only at start of connection, rather than periodically
  (this avoids them being affected by other network traffic)
- Refactor network queue
- Refactor network ping, ping from server as well
- Improve meta topic performance
- Implement unified approach for network value updates (currently client and server use very different approaches) that respects requested subscriber update frequency

This adds a new protocol version (4.1) due to WS bugs in prior versions.
  • Loading branch information
PeterJohnson authored Oct 5, 2023
1 parent 1d19e09 commit 8b7c685
Show file tree
Hide file tree
Showing 21 changed files with 1,352 additions and 933 deletions.
51 changes: 45 additions & 6 deletions ntcore/doc/networktables4.adoc
Original file line number Diff line number Diff line change
@@ -1,16 +1,37 @@
= Network Tables Protocol Specification, Version 4.0
= Network Tables Protocol Specification, Version 4.1
WPILib Developers <wpilib@wpi.edu>
Protocol Revision 4.0, 2/14/2021
Protocol Revision 4.1, 10/1/2023
:toc:
:toc-placement: preamble
:sectanchors:

A pub/sub WebSockets protocol based on NetworkTables concepts.

[[motivation4.1]]
== Motivation for Version 4.1

While NetworkTables 4.0 made a large number of improvements to the 3.0 protocol, a few weaknesses have been discovered in "real world" use:

* Keep alives are not required. This can result in very long timeframes before a disconnect is detected.
* Periodic synchronization of timestamps is impacted by high variability of round trip time measurements on a stream connection shared with other data (due to network queueing in adverse network connections), resulting in values being "too old" even if actually more recent due to a change in base time
* Disconnect loops can be caused by large amounts of data values being sent in response to a "subscribe all" type of message (e.g. subscribe with empty or `$` prefix), resulting in data transmission being blocked for an excessive amount of time
* Publishing operations are not clearly subscriber-driven; the information is available via metatopics but not automatically sent to clients when clients publish

Version 4.1 makes the following key changes to address these weaknesses:

* Mandate the server and client send periodic WebSockets PING messages and track PONG responses
* Recommend that timestamp synchronization occur immediately following connection establishment and prior to any other control messages
* Recommend text and binary combining into a single WebSockets frame be limited to the network MTU (unless necessary to transport the message)
* Recommend WebSockets fragmentation be used on large frames to enable rapid handling of PING messages
* Add an option for topics to be marked transient (in which case no last value is retained by the server or sent to clients on initial subscription)
* Recommend clients subscribe to the `$sub$<topic>` meta-topic for each topic published by the client, and use this information to control what value updates are sent over the network to the server

Version 4.1 uses a different WebSockets subprotocol string than version 4.0, so it is easy for both clients and servers to simultaneously support both versions 4.0 and 4.1. Due to WebSockets implementation bugs in version 4.0, version 4.1 implementations must not send WebSockets PING messages on version 4.0 connections.

[[motivation]]
== Motivation
== Motivation for Version 4.0

Currently in NetworkTables there is no way to synchronize user value updates and NT update sweeps, and if user value updates occur more frequently than NT update sweeps, the intermediate values are lost. This prevents NetworkTables from being a viable transport layer for seeing all value changes (e.g. for plotting) at rates higher than the NetworkTables update rate (e.g. for capturing high frequency PID changes). While custom code can work around the second issue, it is more difficult to work around the first issue (unless full timestamps are also sent).
In <<networktables3,NetworkTables 3.0>>, there is no way to synchronize user value updates and NT update sweeps, and if user value updates occur more frequently than NT update sweeps, the intermediate values are lost. This prevents NetworkTables from being a viable transport layer for seeing all value changes (e.g. for plotting) at rates higher than the NetworkTables update rate (e.g. for capturing high frequency PID changes). While custom code can work around the second issue, it is more difficult to work around the first issue (unless full timestamps are also sent).

Adding built-in support for capturing and communicating all timestamped data value changes with minimal additional user code changes will make it much easier for inexperienced teams to get high resolution, accurate data to dashboard displays with the minimal possible bandwidth and airtime usage. Assuming the dashboard performs record and playback of NT updates, this also meets the desire to provide teams a robust data capture and playback mechanism.

Expand Down Expand Up @@ -67,6 +88,15 @@ When the client receives an -1 ID message from the server, it shall compute the

Due to the fact there can be multiple publishers for a single topic and unpredictable network delays / clock drift, there is no global total order for timestamps either globally or on a per-topic basis. While single publishers for real-time data will be the norm, and in that case the timestamps will usually be in order, applications that use timestamps need to be able to handle out-of-order timestamps.

[[aliveness]]
=== Connection Aliveness Checking

With a version 4.1 connection, both the client and the server should send periodic WebSockets PING messages and look for a PONG response within a reasonable period of time. On version 4.0 connections, or if this is not possible (e.g. the underlying WebSockets implementation does not have the ability to send PING messages), the client should use timestamp messages for aliveness testing. If no response is received after an appropriate amount of time, the client or server shall disconnect the WebSockets connection and try to re-establish a new connection.

As the WebSockets protocol allows PONG responses to be sent in the middle of another message stream, WebSockets PING messages are preferred, as this allows for a shorter timeout period that is not dependent on the size of the transmitted messages. Sending a ping every 200 ms with a timeout of 1 second is recommended in this case.

If using timestamp messages for aliveness checking, the client should use a timeout long enough to account for the largest expected message size (as the server can only respond after such a message has been completely transmitted). Sending a ping every 1 second with a timeout of 3 seconds is recommended in this case.

[[reconnection]]
=== Caching and Reconnection Handling

Expand Down Expand Up @@ -127,10 +157,12 @@ The server may operate in-process to an application (e.g. a robot program). In

Clients are responsible for keeping server connections established (e.g. via retries when a connection is lost). Topic IDs must be treated as connection-specific; if the connection to the server is lost, the client is responsible for sending new <<msg-publish,`publish`>> and <<msg-subscribe,`subscribe`>> messages as required for the application when a new connection is established, and not using old topic IDs, but rather waiting for new <<msg-announce,`announce`>> messages to be received.

Except for offline-published values with timestamps of 0, the client shall not send any other published values to the server until its clock is synchronized with the server per the <<timestamps>> section.
Except for offline-published values with timestamps of 0, the client shall not send any other published values to the server until its clock is synchronized with the server per the <<timestamps>> section. Clients should measure RTT prior to sending any control messages (to avoid other traffic disrupting the measurement).

Clients may publish a value at any time following clock synchronization. Clients may subscribe to meta-topics to determine whether or not to publish a value change (e.g. based on whether there are any subscribers, or based on specific <<sub-options>>).

Clients should subscribe to the `$sub$<topic>` meta topic for each topic published and use this metadata to determine how frequently to send updates to the network. However, this is not required--clients may choose to ignore this and send updates at any time.

[[meta-topics]]
=== Server-Published Meta Topics

Expand Down Expand Up @@ -300,10 +332,17 @@ Both clients and servers shall support unsecure connections (`ws:`) and may supp

Servers shall support a resource name of `/nt/<name>`, where `<name>` is an arbitrary string representing the client name. The client name does not need to be unique; multiple connections to the same name are allowed; the server shall ensure the name is unique (for the purposes of meta-topics) by appending a '@' and a unique number (if necessary). To support this, the name provided by the client should not contain an embedded '@'. Clients should provide a way to specify the resource name (in particular, the client name portion).

Both clients and servers shall support/use subprotocol `networktables.first.wpi.edu` for this protocol. Clients and servers shall terminate the connection in accordance with the WebSocket protocol unless both sides support this subprotocol.
Both clients and servers should support/use subprotocol `v4.1.networktables.first.wpi.edu` (for version 4.1) and `networktables.first.wpi.edu` (for version 4.0). Version 4.1 should be preferred, with version 4.0 as a fallback, using standard WebSockets subprotocol negotiation. Clients and servers shall terminate the connection in accordance with the WebSocket protocol unless both sides support a common subprotocol.

The unsecure standard server port number shall be 5810, the secure standard port number shall be 5811.

[[fragmentation]]
=== Fragmentation

Combining multiple text or binary messages into a single WebSockets frame should be limited such that the WebSockets frame does not exceed the MTU unless otherwise required to fit the total binary data size.

Client and server implementations should fragment WebSockets messages to roughly the network MTU in order to facilitate rapid handling of PING and PONG messages.

[[data-types]]
== Supported Data Types

Expand Down
50 changes: 27 additions & 23 deletions ntcore/src/main/native/cpp/NetworkClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ void NetworkClientBase::DoDisconnect(std::string_view reason) {
if (m_readLocalTimer) {
m_readLocalTimer->Stop();
}
if (m_sendValuesTimer) {
m_sendValuesTimer->Stop();
if (m_sendOutgoingTimer) {
m_sendOutgoingTimer->Stop();
}
m_localStorage.ClearNetwork();
m_localQueue.ClearQueue();
Expand All @@ -150,9 +150,9 @@ NetworkClient3::NetworkClient3(int inst, std::string_view id,
loop, kReconnectRate, m_logger,
[this](uv::Tcp& tcp) { TcpConnected(tcp); });

m_sendValuesTimer = uv::Timer::Create(loop);
if (m_sendValuesTimer) {
m_sendValuesTimer->timeout.connect([this] {
m_sendOutgoingTimer = uv::Timer::Create(loop);
if (m_sendOutgoingTimer) {
m_sendOutgoingTimer->timeout.connect([this] {
if (m_clientImpl) {
HandleLocal();
m_clientImpl->SendPeriodic(m_loop.Now().count(), false);
Expand Down Expand Up @@ -206,9 +206,9 @@ void NetworkClient3::TcpConnected(uv::Tcp& tcp) {
auto clientImpl = std::make_shared<net3::ClientImpl3>(
m_loop.Now().count(), m_inst, *wire, m_logger, [this](uint32_t repeatMs) {
DEBUG4("Setting periodic timer to {}", repeatMs);
if (m_sendValuesTimer) {
m_sendValuesTimer->Start(uv::Timer::Time{repeatMs},
uv::Timer::Time{repeatMs});
if (m_sendOutgoingTimer) {
m_sendOutgoingTimer->Start(uv::Timer::Time{repeatMs},
uv::Timer::Time{repeatMs});
}
});
clientImpl->Start(
Expand Down Expand Up @@ -302,18 +302,18 @@ NetworkClient::NetworkClient(
m_readLocalTimer->timeout.connect([this] {
if (m_clientImpl) {
HandleLocal();
m_clientImpl->SendControl(m_loop.Now().count());
m_clientImpl->SendOutgoing(m_loop.Now().count(), false);
}
});
m_readLocalTimer->Start(uv::Timer::Time{100}, uv::Timer::Time{100});
}

m_sendValuesTimer = uv::Timer::Create(loop);
if (m_sendValuesTimer) {
m_sendValuesTimer->timeout.connect([this] {
m_sendOutgoingTimer = uv::Timer::Create(loop);
if (m_sendOutgoingTimer) {
m_sendOutgoingTimer->timeout.connect([this] {
if (m_clientImpl) {
HandleLocal();
m_clientImpl->SendValues(m_loop.Now().count(), false);
m_clientImpl->SendOutgoing(m_loop.Now().count(), false);
}
});
}
Expand All @@ -324,7 +324,7 @@ NetworkClient::NetworkClient(
m_flush->wakeup.connect([this] {
if (m_clientImpl) {
HandleLocal();
m_clientImpl->SendValues(m_loop.Now().count(), true);
m_clientImpl->SendOutgoing(m_loop.Now().count(), true);
}
});
}
Expand Down Expand Up @@ -369,37 +369,41 @@ void NetworkClient::TcpConnected(uv::Tcp& tcp) {
wpi::SmallString<128> idBuf;
auto ws = wpi::WebSocket::CreateClient(
tcp, fmt::format("/nt/{}", wpi::EscapeURI(m_id, idBuf)), "",
{{"networktables.first.wpi.edu"}}, options);
{"v4.1.networktables.first.wpi.edu", "networktables.first.wpi.edu"},
options);
ws->SetMaxMessageSize(kMaxMessageSize);
ws->open.connect([this, &tcp, ws = ws.get()](std::string_view) {
ws->open.connect([this, &tcp, ws = ws.get()](std::string_view protocol) {
if (m_connList.IsConnected()) {
ws->Terminate(1006, "no longer needed");
return;
}
WsConnected(*ws, tcp);
WsConnected(*ws, tcp, protocol);
});
}

void NetworkClient::WsConnected(wpi::WebSocket& ws, uv::Tcp& tcp) {
void NetworkClient::WsConnected(wpi::WebSocket& ws, uv::Tcp& tcp,
std::string_view protocol) {
if (m_parallelConnect) {
m_parallelConnect->Succeeded(tcp);
}

ConnectionInfo connInfo;
uv::AddrToName(tcp.GetPeer(), &connInfo.remote_ip, &connInfo.remote_port);
connInfo.protocol_version = 0x0400;
connInfo.protocol_version =
protocol == "v4.1.networktables.first.wpi.edu" ? 0x0401 : 0x0400;

INFO("CONNECTED NT4 to {} port {}", connInfo.remote_ip, connInfo.remote_port);
m_connHandle = m_connList.AddConnection(connInfo);

m_wire = std::make_shared<net::WebSocketConnection>(ws);
m_wire =
std::make_shared<net::WebSocketConnection>(ws, connInfo.protocol_version);
m_clientImpl = std::make_unique<net::ClientImpl>(
m_loop.Now().count(), m_inst, *m_wire, m_logger, m_timeSyncUpdated,
[this](uint32_t repeatMs) {
DEBUG4("Setting periodic timer to {}", repeatMs);
if (m_sendValuesTimer) {
m_sendValuesTimer->Start(uv::Timer::Time{repeatMs},
uv::Timer::Time{repeatMs});
if (m_sendOutgoingTimer) {
m_sendOutgoingTimer->Start(uv::Timer::Time{repeatMs},
uv::Timer::Time{repeatMs});
}
});
m_clientImpl->SetLocal(&m_localStorage);
Expand Down
5 changes: 3 additions & 2 deletions ntcore/src/main/native/cpp/NetworkClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class NetworkClientBase : public INetworkClient {
// used only from loop
std::shared_ptr<wpi::ParallelTcpConnector> m_parallelConnect;
std::shared_ptr<wpi::uv::Timer> m_readLocalTimer;
std::shared_ptr<wpi::uv::Timer> m_sendValuesTimer;
std::shared_ptr<wpi::uv::Timer> m_sendOutgoingTimer;
std::shared_ptr<wpi::uv::Async<>> m_flushLocal;
std::shared_ptr<wpi::uv::Async<>> m_flush;

Expand Down Expand Up @@ -138,7 +138,8 @@ class NetworkClient final : public NetworkClientBase {
private:
void HandleLocal();
void TcpConnected(wpi::uv::Tcp& tcp) final;
void WsConnected(wpi::WebSocket& ws, wpi::uv::Tcp& tcp);
void WsConnected(wpi::WebSocket& ws, wpi::uv::Tcp& tcp,
std::string_view protocol);
void ForceDisconnect(std::string_view reason) override;
void DoDisconnect(std::string_view reason) override;

Expand Down
Loading

0 comments on commit 8b7c685

Please sign in to comment.