Skip to content

Commit

Permalink
fix: update mqtt subscriptions when topic changed (#1156)
Browse files Browse the repository at this point in the history
* update mqtt subscriptions when topic was changed
* DPL/Huawei: manage MQTT subscriptions in map

---------

Co-authored-by: Bernhard Kirchen <[email protected]>
  • Loading branch information
AndreasBoehm and schlimmchen authored Aug 20, 2024
1 parent e7d454f commit 65407db
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 23 deletions.
16 changes: 16 additions & 0 deletions include/MqttHandleHuawei.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,18 @@
#include <mutex>
#include <deque>
#include <functional>
#include <frozen/map.h>
#include <frozen/string.h>

class MqttHandleHuaweiClass {
public:
void init(Scheduler& scheduler);

void forceUpdate();

void subscribeTopics();
void unsubscribeTopics();

private:
void loop();

Expand All @@ -24,6 +31,15 @@ class MqttHandleHuaweiClass {
Mode
};

static constexpr frozen::string _cmdtopic = "huawei/cmd/";
static constexpr frozen::map<frozen::string, Topic, 5> _subscriptions = {
{ "limit_online_voltage", Topic::LimitOnlineVoltage },
{ "limit_online_current", Topic::LimitOnlineCurrent },
{ "limit_offline_voltage", Topic::LimitOfflineVoltage },
{ "limit_offline_current", Topic::LimitOfflineCurrent },
{ "mode", Topic::Mode },
};

void onMqttMessage(Topic t,
const espMqttClientTypes::MessageProperties& properties,
const char* topic, const uint8_t* payload, size_t len,
Expand Down
21 changes: 21 additions & 0 deletions include/MqttHandlePowerLimiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,18 @@
#include <mutex>
#include <deque>
#include <functional>
#include <frozen/map.h>
#include <frozen/string.h>

class MqttHandlePowerLimiterClass {
public:
void init(Scheduler& scheduler);

void forceUpdate();

void subscribeTopics();
void unsubscribeTopics();

private:
void loop();

Expand All @@ -28,6 +35,20 @@ class MqttHandlePowerLimiterClass {
TargetPowerConsumption
};

static constexpr frozen::string _cmdtopic = "powerlimiter/cmd/";
static constexpr frozen::map<frozen::string, MqttPowerLimiterCommand, 10> _subscriptions = {
{ "threshold/soc/start", MqttPowerLimiterCommand::BatterySoCStartThreshold },
{ "threshold/soc/stop", MqttPowerLimiterCommand::BatterySoCStopThreshold },
{ "threshold/soc/full_solar_passthrough", MqttPowerLimiterCommand::FullSolarPassthroughSoC },
{ "threshold/voltage/start", MqttPowerLimiterCommand::VoltageStartThreshold },
{ "threshold/voltage/stop", MqttPowerLimiterCommand::VoltageStopThreshold },
{ "threshold/voltage/full_solar_passthrough_start", MqttPowerLimiterCommand::FullSolarPassThroughStartVoltage },
{ "threshold/voltage/full_solar_passthrough_stop", MqttPowerLimiterCommand::FullSolarPassThroughStopVoltage },
{ "mode", MqttPowerLimiterCommand::Mode },
{ "upper_power_limit", MqttPowerLimiterCommand::UpperPowerLimit },
{ "target_power_consumption", MqttPowerLimiterCommand::TargetPowerConsumption },
};

void onMqttCmd(MqttPowerLimiterCommand command, const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total);

Task _loopTask;
Expand Down
33 changes: 23 additions & 10 deletions src/MqttHandleHuawei.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#include "MessageOutput.h"
#include "MqttSettings.h"
#include "Huawei_can.h"
// #include "Failsafe.h"
#include "WebApi_Huawei.h"
#include <ctime>

Expand All @@ -19,27 +18,41 @@ void MqttHandleHuaweiClass::init(Scheduler& scheduler)
_loopTask.setIterations(TASK_FOREVER);
_loopTask.enable();

subscribeTopics();

_lastPublish = millis();
}

void MqttHandleHuaweiClass::forceUpdate()
{
_lastPublish = 0;
}

void MqttHandleHuaweiClass::subscribeTopics()
{
String const& prefix = MqttSettings.getPrefix();

auto subscribe = [&prefix, this](char const* subTopic, Topic t) {
String fullTopic(prefix + "huawei/cmd/" + subTopic);
String fullTopic(prefix + _cmdtopic.data() + subTopic);
MqttSettings.subscribe(fullTopic.c_str(), 0,
std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, t,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3, std::placeholders::_4,
std::placeholders::_5, std::placeholders::_6));
};

subscribe("limit_online_voltage", Topic::LimitOnlineVoltage);
subscribe("limit_online_current", Topic::LimitOnlineCurrent);
subscribe("limit_offline_voltage", Topic::LimitOfflineVoltage);
subscribe("limit_offline_current", Topic::LimitOfflineCurrent);
subscribe("mode", Topic::Mode);

_lastPublish = millis();

for (auto const& s : _subscriptions) {
subscribe(s.first.data(), s.second);
}
}

void MqttHandleHuaweiClass::unsubscribeTopics()
{
String const prefix = MqttSettings.getPrefix() + _cmdtopic.data();
for (auto const& s : _subscriptions) {
MqttSettings.unsubscribe(prefix + s.first.data());
}
}

void MqttHandleHuaweiClass::loop()
{
Expand Down
36 changes: 23 additions & 13 deletions src/MqttHandlePowerLimiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,31 +25,41 @@ void MqttHandlePowerLimiterClass::init(Scheduler& scheduler)
using std::placeholders::_5;
using std::placeholders::_6;

subscribeTopics();

_lastPublish = millis();
}

void MqttHandlePowerLimiterClass::forceUpdate()
{
_lastPublish = 0;
}

void MqttHandlePowerLimiterClass::subscribeTopics()
{
String const& prefix = MqttSettings.getPrefix();

auto subscribe = [&prefix, this](char const* subTopic, MqttPowerLimiterCommand command) {
String fullTopic(prefix + "powerlimiter/cmd/" + subTopic);
String fullTopic(prefix + _cmdtopic.data() + subTopic);
MqttSettings.subscribe(fullTopic.c_str(), 0,
std::bind(&MqttHandlePowerLimiterClass::onMqttCmd, this, command,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3, std::placeholders::_4,
std::placeholders::_5, std::placeholders::_6));
};

subscribe("threshold/soc/start", MqttPowerLimiterCommand::BatterySoCStartThreshold);
subscribe("threshold/soc/stop", MqttPowerLimiterCommand::BatterySoCStopThreshold);
subscribe("threshold/soc/full_solar_passthrough", MqttPowerLimiterCommand::FullSolarPassthroughSoC);
subscribe("threshold/voltage/start", MqttPowerLimiterCommand::VoltageStartThreshold);
subscribe("threshold/voltage/stop", MqttPowerLimiterCommand::VoltageStopThreshold);
subscribe("threshold/voltage/full_solar_passthrough_start", MqttPowerLimiterCommand::FullSolarPassThroughStartVoltage);
subscribe("threshold/voltage/full_solar_passthrough_stop", MqttPowerLimiterCommand::FullSolarPassThroughStopVoltage);
subscribe("mode", MqttPowerLimiterCommand::Mode);
subscribe("upper_power_limit", MqttPowerLimiterCommand::UpperPowerLimit);
subscribe("target_power_consumption", MqttPowerLimiterCommand::TargetPowerConsumption);

_lastPublish = millis();
for (auto const& s : _subscriptions) {
subscribe(s.first.data(), s.second);
}
}

void MqttHandlePowerLimiterClass::unsubscribeTopics()
{
String const prefix = MqttSettings.getPrefix() + _cmdtopic.data();
for (auto const& s : _subscriptions) {
MqttSettings.unsubscribe(prefix + s.first.data());
}
}

void MqttHandlePowerLimiterClass::loop()
{
Expand Down
15 changes: 15 additions & 0 deletions src/WebApi_mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@
*/
#include "WebApi_mqtt.h"
#include "Configuration.h"
#include "MqttHandleBatteryHass.h"
#include "MqttHandleHass.h"
#include "MqttHandlePowerLimiterHass.h"
#include "MqttHandleInverter.h"
#include "MqttHandleHuawei.h"
#include "MqttHandlePowerLimiter.h"
#include "MqttHandleVedirectHass.h"
#include "MqttHandleVedirect.h"
#include "MqttSettings.h"
Expand Down Expand Up @@ -307,17 +311,28 @@ void WebApiMqttClass::onMqttAdminPost(AsyncWebServerRequest* request)
// Check if base topic was changed
if (strcmp(config.Mqtt.Topic, root["mqtt_topic"].as<String>().c_str())) {
MqttHandleInverter.unsubscribeTopics();
MqttHandleHuawei.unsubscribeTopics();
MqttHandlePowerLimiter.unsubscribeTopics();

strlcpy(config.Mqtt.Topic, root["mqtt_topic"].as<String>().c_str(), sizeof(config.Mqtt.Topic));
MqttHandleInverter.subscribeTopics();
MqttHandleHuawei.subscribeTopics();
MqttHandlePowerLimiter.subscribeTopics();
}

WebApi.writeConfig(retMsg);

WebApi.sendJsonResponse(request, response, __FUNCTION__, __LINE__);

MqttSettings.performReconnect();

MqttHandleBatteryHass.forceUpdate();
MqttHandleHass.forceUpdate();
MqttHandlePowerLimiterHass.forceUpdate();
MqttHandleVedirectHass.forceUpdate();

MqttHandleHuawei.forceUpdate();
MqttHandlePowerLimiter.forceUpdate();
MqttHandleVedirect.forceUpdate();
}

Expand Down

0 comments on commit 65407db

Please sign in to comment.