Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ntcore] Add cached topic property #5494

Merged
merged 15 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions ntcore/doc/networktables4.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Version 4.1 makes the following key changes to address these weaknesses:
* Recommend that timestamp synchronization occur immediately following connection establishment and prior to any other control messages
* Recommend text and binary combining into a single WebSockets frame be limited to the network MTU (unless necessary to transport the message)
* Recommend WebSockets fragmentation be used on large frames to enable rapid handling of PING messages
* Add an option for topics to be marked transient (in which case no last value is retained by the server or sent to clients on initial subscription)
* Add a (default true) option for topics to be marked cached (in which case the last value is retained by the server and sent to clients on initial subscription)
* Recommend clients subscribe to the `$sub$<topic>` meta-topic for each topic published by the client, and use this information to control what value updates are sent over the network to the server

Version 4.1 uses a different WebSockets subprotocol string than version 4.0, so it is easy for both clients and servers to simultaneously support both versions 4.0 and 4.1. Due to WebSockets implementation bugs in version 4.0, version 4.1 implementations must not send WebSockets PING messages on version 4.0 connections.
Expand Down Expand Up @@ -100,7 +100,7 @@ If using timestamp messages for aliveness checking on the primary connection, th
[[reconnection]]
=== Caching and Reconnection Handling

Servers shall keep a retained value for each topic for the purposes of <<msg-subscribe>> requests; the retained value shall be the value in the largest timestamp (greater-than or equal-to) message received for that topic. This retained value is deleted if the topic is deleted (e.g. there are no more publishers).
Servers shall keep a retained value for each cached topic for the purposes of <<msg-subscribe>> requests; the retained value shall be the value in the largest timestamp (greater-than or equal-to) message received for that topic. This retained value is deleted if the topic is deleted (e.g. there are no more publishers).

Clients may similarly keep a retained value for each topic for ease of use by user code. If this is done, this retained value shall be updated by both locally published values and received messages for that topic with greater-than/equal-to timestamps, and the retained value shall be deleted when a <<msg-unannounce>> is received.

Expand Down Expand Up @@ -419,6 +419,7 @@ Each published topic may also have properties associated to it. Properties are
|Property|Type|Description|Notes
|`persistent`|boolean|Persistent Flag|If true, the last set value will be periodically saved to persistent storage on the server and be restored during server startup. Topics with this property set to true will not be deleted by the server when the last publisher stops publishing.
|`retained`|boolean|Retained Flag|Topics with this property set to true will not be deleted by the server when the last publisher stops publishing.
|`cached`|boolean|Cached Flag|If false, the server and clients will not store the value of the topic. This means that only value updates will be available for the topic.
|===

[[sub-options]]
Expand Down Expand Up @@ -619,7 +620,7 @@ If a property is not included in the update map, its value is not changed. If a
[[msg-subscribe]]
==== Subscribe Message (`subscribe`)

Sent from a client to the server to indicate the client wants to subscribe to value changes for the specified topics / groups of topics. The server shall send MessagePack messages containing the current values for any existing topics upon receipt, and continue sending MessagePack messages for future value changes. If a topic does not yet exist, no message is sent until it is created (via a publish), at which point a <<msg-announce>> will be sent and MessagePack messages will automatically follow as they are published.
Sent from a client to the server to indicate the client wants to subscribe to value changes for the specified topics / groups of topics. The server shall send MessagePack messages containing the current values for any existing cached topics upon receipt, and continue sending MessagePack messages for future value changes. If a topic does not yet exist, no message is sent until it is created (via a publish), at which point a <<msg-announce>> will be sent and MessagePack messages will automatically follow as they are published.

Subscriptions may overlap; only one MessagePack message is sent per value change regardless of the number of subscriptions. Sending a `subscribe` message with the same subscription UID as a previous `subscribe` message results in updating the subscription (replacing the array of identifiers and updating any specified options).

Expand Down
4 changes: 4 additions & 0 deletions ntcore/src/generate/main/java/NetworkTablesJNI.java.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ public final class NetworkTablesJNI {

public static native boolean getTopicRetained(int topic);

public static native void setTopicCached(int topic, boolean value);

public static native boolean getTopicCached(int topic);

public static native String getTopicTypeString(int topic);

public static native boolean getTopicExists(int topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ public static native TopicInfo[] getTopicInfosStr(

public static native boolean getTopicRetained(int topic);

public static native void setTopicCached(int topic, boolean value);

public static native boolean getTopicCached(int topic);

public static native String getTopicTypeString(int topic);

public static native boolean getTopicExists(int topic);
Expand Down
19 changes: 19 additions & 0 deletions ntcore/src/main/java/edu/wpi/first/networktables/Topic.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,25 @@ public boolean isRetained() {
return NetworkTablesJNI.getTopicRetained(m_handle);
}

/**
* Allow storage of the topic's last value, allowing the value to be read (and not just accessed
* through event queues and listeners).
*
* @param cached True for cached, false for not cached.
*/
public void setCached(boolean cached) {
NetworkTablesJNI.setTopicCached(m_handle, cached);
}

/**
* Returns whether the topic's last value is stored.
*
* @return True if the topic is cached.
*/
public boolean isCached() {
return NetworkTablesJNI.getTopicCached(m_handle);
}

/**
* Determines if the topic is currently being published.
*
Expand Down
99 changes: 81 additions & 18 deletions ntcore/src/main/native/cpp/LocalStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,22 +175,26 @@ void LocalStorage::Impl::CheckReset(TopicData* topic) {
}

bool LocalStorage::Impl::SetValue(TopicData* topic, const Value& value,
unsigned int eventFlags, bool isDuplicate,
unsigned int eventFlags,
bool suppressIfDuplicate,
const PublisherData* publisher) {
const bool isDuplicate = topic->IsCached() && topic->lastValue == value;
DEBUG4("SetValue({}, {}, {}, {})", topic->name, value.time(), eventFlags,
isDuplicate);
if (topic->type != NT_UNASSIGNED && topic->type != value.type()) {
return false;
}
// Make sure value isn't older than last value
if (!topic->lastValue || topic->lastValue.time() == 0 ||
value.time() >= topic->lastValue.time()) {
// TODO: notify option even if older value
if (!(suppressIfDuplicate && isDuplicate)) {
topic->type = value.type();
topic->lastValue = value;
topic->lastValueFromNetwork = false;
NotifyValue(topic, eventFlags, isDuplicate, publisher);
if (topic->IsCached()) {
topic->lastValue = value;
topic->lastValueFromNetwork = false;
}
NotifyValue(topic, value, eventFlags, isDuplicate, publisher);
if (topic->datalogType == value.type()) {
for (auto&& datalog : topic->datalogs) {
datalog.Append(value);
Expand All @@ -202,8 +206,8 @@ bool LocalStorage::Impl::SetValue(TopicData* topic, const Value& value,
return true;
}

void LocalStorage::Impl::NotifyValue(TopicData* topic, unsigned int eventFlags,
bool isDuplicate,
void LocalStorage::Impl::NotifyValue(TopicData* topic, const Value& value,
unsigned int eventFlags, bool isDuplicate,
const PublisherData* publisher) {
bool isNetwork = (eventFlags & NT_EVENT_VALUE_REMOTE) != 0;
for (auto&& subscriber : topic->localSubscribers) {
Expand All @@ -213,11 +217,11 @@ void LocalStorage::Impl::NotifyValue(TopicData* topic, unsigned int eventFlags,
(!isNetwork && !subscriber->config.disableLocal)) &&
(!publisher || (publisher && (subscriber->config.excludePublisher !=
publisher->handle)))) {
subscriber->pollStorage.emplace_back(topic->lastValue);
subscriber->pollStorage.emplace_back(value);
subscriber->handle.Set();
if (!subscriber->valueListeners.empty()) {
m_listenerStorage.Notify(subscriber->valueListeners, eventFlags,
topic->handle, 0, topic->lastValue);
topic->handle, 0, value);
}
}
}
Expand All @@ -227,7 +231,7 @@ void LocalStorage::Impl::NotifyValue(TopicData* topic, unsigned int eventFlags,
subscriber->handle.Set();
if (!subscriber->valueListeners.empty()) {
m_listenerStorage.Notify(subscriber->valueListeners, eventFlags,
topic->handle, 0, topic->lastValue);
topic->handle, 0, value);
}
}
}
Expand All @@ -249,6 +253,22 @@ void LocalStorage::Impl::SetFlags(TopicData* topic, unsigned int flags) {
topic->properties.erase("retained");
update["retained"] = wpi::json();
}
if ((flags & NT_UNCACHED) != 0) {
topic->properties["cached"] = false;
update["cached"] = false;
} else {
topic->properties.erase("cached");
update["cached"] = wpi::json();
}
if ((flags & NT_UNCACHED) != 0) {
topic->lastValue = {};
topic->lastValueNetwork = {};
topic->lastValueFromNetwork = false;
}
if ((flags & NT_UNCACHED) != 0 && (flags & NT_PERSISTENT) != 0) {
WARN("topic {}: disabling cached property disables persistent storage",
topic->name);
}
topic->flags = flags;
if (!update.empty()) {
PropertiesUpdated(topic, update, NT_EVENT_NONE, true, false);
Expand Down Expand Up @@ -283,6 +303,20 @@ void LocalStorage::Impl::SetRetained(TopicData* topic, bool value) {
PropertiesUpdated(topic, update, NT_EVENT_NONE, true, false);
}

void LocalStorage::Impl::SetCached(TopicData* topic, bool value) {
wpi::json update = wpi::json::object();
if (value) {
topic->flags &= ~NT_UNCACHED;
topic->properties.erase("cached");
update["cached"] = wpi::json();
} else {
topic->flags |= NT_UNCACHED;
topic->properties["cached"] = false;
update["cached"] = false;
}
PropertiesUpdated(topic, update, NT_EVENT_NONE, true, false);
}

void LocalStorage::Impl::SetProperties(TopicData* topic,
const wpi::json& update,
bool sendNetwork) {
Expand Down Expand Up @@ -328,6 +362,28 @@ void LocalStorage::Impl::PropertiesUpdated(TopicData* topic,
}
}
}
it = topic->properties.find("cached");
if (it != topic->properties.end()) {
if (auto val = it->get_ptr<bool*>()) {
if (*val) {
topic->flags &= ~NT_UNCACHED;
} else {
topic->flags |= NT_UNCACHED;
}
}
}

if ((topic->flags & NT_UNCACHED) != 0) {
topic->lastValue = {};
topic->lastValueNetwork = {};
topic->lastValueFromNetwork = false;
}

if ((topic->flags & NT_UNCACHED) != 0 &&
(topic->flags & NT_PERSISTENT) != 0) {
WARN("topic {}: disabling cached property disables persistent storage",
topic->name);
}
}

topic->propertiesStr = topic->properties.dump();
Expand Down Expand Up @@ -895,20 +951,22 @@ bool LocalStorage::Impl::PublishLocalValue(PublisherData* publisher,
return false;
}
if (publisher->active) {
bool isDuplicate, isNetworkDuplicate, suppressDuplicates;
bool isNetworkDuplicate, suppressDuplicates;
if (force || publisher->config.keepDuplicates) {
suppressDuplicates = false;
isNetworkDuplicate = false;
} else {
suppressDuplicates = true;
isNetworkDuplicate = (publisher->topic->lastValueNetwork == value);
isNetworkDuplicate = publisher->topic->IsCached() &&
(publisher->topic->lastValueNetwork == value);
}
isDuplicate = (publisher->topic->lastValue == value);
if (!isNetworkDuplicate && m_network) {
publisher->topic->lastValueNetwork = value;
if (publisher->topic->IsCached()) {
publisher->topic->lastValueNetwork = value;
}
m_network->SetValue(publisher->handle, value);
}
return SetValue(publisher->topic, value, NT_EVENT_VALUE_LOCAL, isDuplicate,
return SetValue(publisher->topic, value, NT_EVENT_VALUE_LOCAL,
suppressDuplicates, publisher);
} else {
return false;
Expand Down Expand Up @@ -940,6 +998,10 @@ bool LocalStorage::Impl::SetDefaultEntryValue(NT_Handle pubsubentryHandle,
return false;
}
if (auto topic = GetTopic(pubsubentryHandle)) {
if (!topic->IsCached()) {
WARN("ignoring default value on non-cached topic '{}'", topic->name);
return false;
}
if (!topic->lastValue &&
(topic->type == NT_UNASSIGNED || topic->type == value.type() ||
IsNumericCompatible(topic->type, value.type()))) {
Expand Down Expand Up @@ -1026,10 +1088,11 @@ void LocalStorage::NetworkPropertiesUpdate(std::string_view name,
void LocalStorage::NetworkSetValue(NT_Topic topicHandle, const Value& value) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.m_topics.Get(topicHandle)) {
if (m_impl.SetValue(topic, value, NT_EVENT_VALUE_REMOTE,
value == topic->lastValue, false, nullptr)) {
topic->lastValueNetwork = value;
topic->lastValueFromNetwork = true;
if (m_impl.SetValue(topic, value, NT_EVENT_VALUE_REMOTE, false, nullptr)) {
if (topic->IsCached()) {
topic->lastValueNetwork = value;
topic->lastValueFromNetwork = true;
}
}
}
}
Expand Down
27 changes: 23 additions & 4 deletions ntcore/src/main/native/cpp/LocalStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,22 @@ class LocalStorage final : public net::ILocalStorage {
}
}

void SetTopicCached(NT_Topic topicHandle, bool value) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.m_topics.Get(topicHandle)) {
m_impl.SetCached(topic, value);
}
}

bool GetTopicCached(NT_Topic topicHandle) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.m_topics.Get(topicHandle)) {
return (topic->flags & NT_UNCACHED) == 0;
} else {
return false;
}
}

bool GetTopicExists(NT_Handle handle) {
std::scoped_lock lock{m_mutex};
TopicData* topic = m_impl.GetTopic(handle);
Expand Down Expand Up @@ -361,6 +377,8 @@ class LocalStorage final : public net::ILocalStorage {

bool Exists() const { return onNetwork || !localPublishers.empty(); }

bool IsCached() const { return (flags & NT_UNCACHED) == 0; }

TopicInfo GetTopicInfo() const;

// invariants
Expand Down Expand Up @@ -565,14 +583,15 @@ class LocalStorage final : public net::ILocalStorage {
void CheckReset(TopicData* topic);

bool SetValue(TopicData* topic, const Value& value, unsigned int eventFlags,
bool isDuplicate, bool suppressIfDuplicate,
const PublisherData* publisher);
void NotifyValue(TopicData* topic, unsigned int eventFlags,
bool isDuplicate, const PublisherData* publisher);
bool suppressIfDuplicate, const PublisherData* publisher);
void NotifyValue(TopicData* topic, const Value& value,
unsigned int eventFlags, bool isDuplicate,
const PublisherData* publisher);

void SetFlags(TopicData* topic, unsigned int flags);
void SetPersistent(TopicData* topic, bool value);
void SetRetained(TopicData* topic, bool value);
void SetCached(TopicData* topic, bool value);
void SetProperties(TopicData* topic, const wpi::json& update,
bool sendNetwork);
void PropertiesUpdated(TopicData* topic, const wpi::json& update,
Expand Down
24 changes: 24 additions & 0 deletions ntcore/src/main/native/cpp/jni/NetworkTablesJNI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,30 @@ Java_edu_wpi_first_networktables_NetworkTablesJNI_getTopicRetained
return nt::GetTopicRetained(topic);
}

/*
* Class: edu_wpi_first_networktables_NetworkTablesJNI
* Method: setTopicCached
* Signature: (IZ)V
*/
JNIEXPORT void JNICALL
Java_edu_wpi_first_networktables_NetworkTablesJNI_setTopicCached
(JNIEnv*, jclass, jint topic, jboolean value)
{
nt::SetTopicCached(topic, value);
}

/*
* Class: edu_wpi_first_networktables_NetworkTablesJNI
* Method: getTopicCached
* Signature: (I)Z
*/
JNIEXPORT jboolean JNICALL
Java_edu_wpi_first_networktables_NetworkTablesJNI_getTopicCached
(JNIEnv*, jclass, jint topic)
{
return nt::GetTopicCached(topic);
}

/*
* Class: edu_wpi_first_networktables_NetworkTablesJNI
* Method: getTopicTypeString
Expand Down
Loading