Skip to content

Commit

Permalink
Merge pull request #571 from schlimmchen/switch-context-on-huawei-mqt…
Browse files Browse the repository at this point in the history
…t-message

Fix: switch context when handling AC charger MQTT messages
  • Loading branch information
helgeerbe authored Dec 31, 2023
2 parents 08bc181 + 4632260 commit ef1aec3
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 89 deletions.
22 changes: 21 additions & 1 deletion include/MqttHandleHuawei.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,40 @@
#include <Huawei_can.h>
#include <espMqttClient.h>
#include <TaskSchedulerDeclarations.h>
#include <mutex>
#include <deque>
#include <functional>

class MqttHandleHuaweiClass {
public:
void init(Scheduler& scheduler);

private:
void loop();
void onMqttMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total);

enum class Topic : unsigned {
LimitOnlineVoltage,
LimitOnlineCurrent,
LimitOfflineVoltage,
LimitOfflineCurrent,
Mode
};

void onMqttMessage(Topic t,
const espMqttClientTypes::MessageProperties& properties,
const char* topic, const uint8_t* payload, size_t len,
size_t index, size_t total);

Task _loopTask;

uint32_t _lastPublishStats;
uint32_t _lastPublish;

// MQTT callbacks to process updates on subscribed topics are executed in
// the MQTT thread's context. we use this queue to switch processing the
// user requests into the main loop's context (TaskScheduler context).
mutable std::mutex _mqttMutex;
std::deque<std::function<void()>> _mqttCallbacks;
};

extern MqttHandleHuaweiClass MqttHandleHuawei;
184 changes: 96 additions & 88 deletions src/MqttHandleHuawei.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,6 @@
#include "WebApi_Huawei.h"
#include <ctime>

#define TOPIC_SUB_LIMIT_ONLINE_VOLTAGE "limit_online_voltage"
#define TOPIC_SUB_LIMIT_ONLINE_CURRENT "limit_online_current"
#define TOPIC_SUB_LIMIT_OFFLINE_VOLTAGE "limit_offline_voltage"
#define TOPIC_SUB_LIMIT_OFFLINE_CURRENT "limit_offline_current"
#define TOPIC_SUB_MODE "mode"

MqttHandleHuaweiClass MqttHandleHuawei;

void MqttHandleHuaweiClass::init(Scheduler& scheduler)
Expand All @@ -25,19 +19,22 @@ void MqttHandleHuaweiClass::init(Scheduler& scheduler)
_loopTask.setIterations(TASK_FOREVER);
_loopTask.enable();

using std::placeholders::_1;
using std::placeholders::_2;
using std::placeholders::_3;
using std::placeholders::_4;
using std::placeholders::_5;
using std::placeholders::_6;
String const& prefix = MqttSettings.getPrefix();

auto subscribe = [&prefix, this](char const* subTopic, Topic t) {
String fullTopic(prefix + "huawei/cmd/" + subTopic);
MqttSettings.subscribe(fullTopic.c_str(), 0,
std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, t,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3, std::placeholders::_4,
std::placeholders::_5, std::placeholders::_6));
};

String topic = MqttSettings.getPrefix();
MqttSettings.subscribe(String(topic + "huawei/cmd/" + TOPIC_SUB_LIMIT_ONLINE_VOLTAGE).c_str(), 0, std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6));
MqttSettings.subscribe(String(topic + "huawei/cmd/" + TOPIC_SUB_LIMIT_ONLINE_CURRENT).c_str(), 0, std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6));
MqttSettings.subscribe(String(topic + "huawei/cmd/" + TOPIC_SUB_LIMIT_OFFLINE_VOLTAGE).c_str(), 0, std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6));
MqttSettings.subscribe(String(topic + "huawei/cmd/" + TOPIC_SUB_LIMIT_OFFLINE_CURRENT).c_str(), 0, std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6));
MqttSettings.subscribe(String(topic + "huawei/cmd/" + TOPIC_SUB_MODE).c_str(), 0, std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6));
subscribe("limit_online_voltage", Topic::LimitOnlineVoltage);
subscribe("limit_online_current", Topic::LimitOnlineCurrent);
subscribe("limit_offline_voltage", Topic::LimitOfflineVoltage);
subscribe("limit_offline_current", Topic::LimitOfflineCurrent);
subscribe("mode", Topic::Mode);

_lastPublish = millis();

Expand All @@ -46,13 +43,21 @@ void MqttHandleHuaweiClass::init(Scheduler& scheduler)

void MqttHandleHuaweiClass::loop()
{
if (!MqttSettings.getConnected() ) {
const CONFIG_T& config = Configuration.get();

std::unique_lock<std::mutex> mqttLock(_mqttMutex);

if (!config.Huawei.Enabled) {
_mqttCallbacks.clear();
return;
}

const CONFIG_T& config = Configuration.get();
for (auto& callback : _mqttCallbacks) { callback(); }
_mqttCallbacks.clear();

if (!config.Huawei.Enabled) {
mqttLock.unlock();

if (!MqttSettings.getConnected() ) {
return;
}

Expand All @@ -78,76 +83,79 @@ void MqttHandleHuaweiClass::loop()
}


void MqttHandleHuaweiClass::onMqttMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total)
void MqttHandleHuaweiClass::onMqttMessage(Topic t,
const espMqttClientTypes::MessageProperties& properties,
const char* topic, const uint8_t* payload, size_t len,
size_t index, size_t total)
{
const CONFIG_T& config = Configuration.get();

// ignore messages if Huawei is disabled
if (!config.Huawei.Enabled) {
return;
std::string strValue(reinterpret_cast<const char*>(payload), len);
float payload_val = -1;
try {
payload_val = std::stof(strValue);
}

char token_topic[MQTT_MAX_TOPIC_STRLEN + 40]; // respect all subtopics
strncpy(token_topic, topic, MQTT_MAX_TOPIC_STRLEN + 40); // convert const char* to char*

char* setting;
char* rest = &token_topic[strlen(config.Mqtt.Topic)];

strtok_r(rest, "/", &rest); // Remove "huawei"
strtok_r(rest, "/", &rest); // Remove "cmd"

setting = strtok_r(rest, "/", &rest);

if (setting == NULL) {
catch (std::invalid_argument const& e) {
MessageOutput.printf("Huawei MQTT handler: cannot parse payload of topic '%s' as float: %s\r\n",
topic, strValue.c_str());
return;
}

char* strlimit = new char[len + 1];
memcpy(strlimit, payload, len);
strlimit[len] = '\0';
float payload_val = strtof(strlimit, NULL);
delete[] strlimit;

if (!strcmp(setting, TOPIC_SUB_LIMIT_ONLINE_VOLTAGE)) {
// Set voltage limit
MessageOutput.printf("Limit Voltage: %f V\r\n", payload_val);
HuaweiCan.setValue(payload_val, HUAWEI_ONLINE_VOLTAGE);

} else if (!strcmp(setting, TOPIC_SUB_LIMIT_OFFLINE_VOLTAGE)) {
// Set current limit
MessageOutput.printf("Offline Limit Voltage: %f V\r\n", payload_val);
HuaweiCan.setValue(payload_val, HUAWEI_OFFLINE_VOLTAGE);

} else if (!strcmp(setting, TOPIC_SUB_LIMIT_ONLINE_CURRENT)) {
// Set current limit
MessageOutput.printf("Limit Current: %f A\r\n", payload_val);
HuaweiCan.setValue(payload_val, HUAWEI_ONLINE_CURRENT);

} else if (!strcmp(setting, TOPIC_SUB_LIMIT_OFFLINE_CURRENT)) {
// Set current limit
MessageOutput.printf("Offline Limit Current: %f A\r\n", payload_val);
HuaweiCan.setValue(payload_val, HUAWEI_OFFLINE_CURRENT);

} else if (!strcmp(setting, TOPIC_SUB_MODE)) {
// Control power on/off
if(payload_val == 3) {
MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Full internal control");
HuaweiCan.setMode(HUAWEI_MODE_AUTO_INT);
}

if(payload_val == 2) {
MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Internal on/off control, external power limit");
HuaweiCan.setMode(HUAWEI_MODE_AUTO_EXT);
}

if(payload_val == 1) {
MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Turned ON");
HuaweiCan.setMode(HUAWEI_MODE_ON);
}

if(payload_val == 0) {
MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Turned OFF");
HuaweiCan.setMode(HUAWEI_MODE_OFF);
}
}
std::lock_guard<std::mutex> mqttLock(_mqttMutex);

switch (t) {
case Topic::LimitOnlineVoltage:
MessageOutput.printf("Limit Voltage: %f V\r\n", payload_val);
_mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setValue,
&HuaweiCan, payload_val, HUAWEI_ONLINE_VOLTAGE));
break;

case Topic::LimitOfflineVoltage:
MessageOutput.printf("Offline Limit Voltage: %f V\r\n", payload_val);
_mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setValue,
&HuaweiCan, payload_val, HUAWEI_OFFLINE_VOLTAGE));
break;

case Topic::LimitOnlineCurrent:
MessageOutput.printf("Limit Current: %f A\r\n", payload_val);
_mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setValue,
&HuaweiCan, payload_val, HUAWEI_ONLINE_CURRENT));
break;

case Topic::LimitOfflineCurrent:
MessageOutput.printf("Offline Limit Current: %f A\r\n", payload_val);
_mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setValue,
&HuaweiCan, payload_val, HUAWEI_OFFLINE_CURRENT));
break;

case Topic::Mode:
switch (static_cast<int>(payload_val)) {
case 3:
MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Full internal control");
_mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setMode,
&HuaweiCan, HUAWEI_MODE_AUTO_INT));
break;

case 2:
MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Internal on/off control, external power limit");
_mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setMode,
&HuaweiCan, HUAWEI_MODE_AUTO_EXT));
break;

case 1:
MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Turned ON");
_mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setMode,
&HuaweiCan, HUAWEI_MODE_ON));
break;

case 0:
MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Turned OFF");
_mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setMode,
&HuaweiCan, HUAWEI_MODE_OFF));
break;

default:
MessageOutput.printf("[Huawei MQTT::] Invalid mode %.0f\r\n", payload_val);
break;
}
break;
}
}

0 comments on commit ef1aec3

Please sign in to comment.