Skip to content

Commit

Permalink
fix(modem): Added support for inflatable buffer
Browse files Browse the repository at this point in the history
As a configurable option, if disabled we report an error.

Closes #272
  • Loading branch information
david-cermak committed Aug 28, 2023
1 parent 2e42b9b commit cb6e03a
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 34 deletions.
11 changes: 11 additions & 0 deletions components/esp_modem/Kconfig
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,17 @@ menu "esp-modem"
in command mode might come fragmented in rare cases so might need to retry
AT commands.

config ESP_MODEM_USE_INFLATABLE_BUFFER_IF_NEEDED
bool "Use inflatable buffer in DCE"
default n
help
If enabled we will process the ongoing AT command by growing the current
buffer (if we've run out the preconfigured buffer).
If disabled, we simply report a failure.
Use this if additional allocation is not a problem and you need to reliably process
all commands, usually with sporadically longer responses than the configured buffer.
Could be also used to defragment AT replies in CMUX mode if CMUX_DEFRAGMENT_PAYLOAD=n

config ESP_MODEM_CMUX_DELAY_AFTER_DLCI_SETUP
int "Delay in ms to wait before creating another virtual terminal"
default 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ extern "C" void app_main(void)
#endif
assert(dce);

/* Try to connect to the network and publish an mqtt topic */
StatusHandler handler;

if (dte_config.uart_config.flow_control == ESP_MODEM_FLOW_CONTROL_HW) {
if (command_result::OK != dce->set_flow_control(2, 2)) {
ESP_LOGE(TAG, "Failed to set the set_flow_control mode");
Expand Down Expand Up @@ -215,8 +218,6 @@ extern "C" void app_main(void)
}
#endif

/* Try to connect to the network and publish an mqtt topic */
StatusHandler handler;
if (!handler.wait_for(StatusHandler::IP_Event, 60000)) {
ESP_LOGE(TAG, "Cannot get IP within specified timeout... exiting");
return;
Expand Down
76 changes: 73 additions & 3 deletions components/esp_modem/include/cxx_include/esp_modem_dte.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ class DTE : public CommandableIf {
*/
void set_read_cb(std::function<bool(uint8_t *data, size_t len)> f);

/**
* @brief Sets read callback for manual command processing
* Note that this API also locks the command API, which can only be used
* after you remove the callback by dte->on_read(nullptr)
*
* @param on_data Function to be called when a command response is available
*/
void on_read(got_line_cb on_data) override;

/**
Expand Down Expand Up @@ -122,7 +129,6 @@ class DTE : public CommandableIf {
}
friend class Scoped<DTE>; /*!< Declaring "Scoped<DTE> lock(dte)" locks this instance */
private:
static const size_t GOT_LINE = SignalGroup::bit0; /*!< Bit indicating response available */

[[nodiscard]] bool setup_cmux(); /*!< Internal setup of CMUX mode */
[[nodiscard]] bool exit_cmux(); /*!< Exit of CMUX mode and cleanup */
Expand All @@ -134,9 +140,73 @@ class DTE : public CommandableIf {
std::shared_ptr<Terminal> primary_term; /*!< Reference to the primary terminal (mostly for sending commands) */
std::shared_ptr<Terminal> secondary_term; /*!< Secondary terminal for this DTE */
modem_mode mode; /*!< DTE operation mode */
SignalGroup signal; /*!< Event group used to signal request-response operations */
command_result result; /*!< Command result of the currently exectuted command */
std::function<bool(uint8_t *data, size_t len)> on_data; /*!< on data callback for current terminal */

#ifdef CONFIG_ESP_MODEM_USE_INFLATABLE_BUFFER_IF_NEEDED
/**
* @brief Implements an extra buffer that is used to capture partial reads from underlying terminals
* when we run out of the standard buffer
*/
struct extra_buffer {
extra_buffer(): buffer(nullptr) {}
~extra_buffer()
{
delete buffer;
}
std::vector<uint8_t> *buffer;
size_t consumed{0};
void grow(size_t need_size);
void deflate()
{
grow(0);
consumed = 0;
}
[[nodiscard]] uint8_t *begin() const
{
return &buffer->at(0);
}
[[nodiscard]] uint8_t *current() const
{
return &buffer->at(0) + consumed;
}
} inflatable;
#endif // CONFIG_ESP_MODEM_USE_INFLATABLE_BUFFER_IF_NEEDED

/**
* @brief Set internal command callbacks to the underlying terminal.
* Here we capture command replies to be processed by supplied command callbacks in struct command_cb.
*/
void set_command_callbacks();

/**
* @brief This abstracts command callback processing and implements its locking, signaling of completion and timeouts.
*/
struct command_cb {
static const size_t GOT_LINE = SignalGroup::bit0; /*!< Bit indicating response available */
got_line_cb got_line; /*!< Supplied command callback */
Lock line_lock{}; /*!< Command callback locking mechanism */
char separator{}; /*!< Command reply separator (end of line/processing unit) */
command_result result{}; /*!< Command return code */
SignalGroup signal; /*!< Event group used to signal request-response operations */
bool process_line(uint8_t *data, size_t consumed, size_t len); /*!< Lets the processing callback handle one line (processing unit) */
bool wait_for_line(uint32_t time_ms); /*!< Waiting for command processing */
void set(got_line_cb l, char s = '\n') /*!< Sets the command callback atomically */
{
Scoped<Lock> lock(line_lock);
if (l) {
// if we set the line callback, we have to reset the signal and the result
signal.clear(GOT_LINE);
result = command_result::TIMEOUT;
}
got_line = std::move(l);
separator = s;
}
void give_up() /*!< Reports other than timeout error when processing replies (out of buffer) */
{
result = command_result::FAIL;
signal.set(GOT_LINE);
}
} command_cb; /*!< Command callback utility class */
};

/**
Expand Down
1 change: 1 addition & 0 deletions components/esp_modem/src/esp_modem_dce.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ static bool exit_data(DTE &dte, ModuleIf &device, Netif &netif)
});
netif.wait_until_ppp_exits();
if (!signal->wait(1, 2000)) {
dte.set_read_cb(nullptr);
if (!device.set_mode(modem_mode::COMMAND_MODE)) {
return false;
}
Expand Down
157 changes: 132 additions & 25 deletions components/esp_modem/src/esp_modem_dte.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,53 +17,118 @@ static const size_t dte_default_buffer_size = 1000;
DTE::DTE(const esp_modem_dte_config *config, std::unique_ptr<Terminal> terminal):
buffer(config->dte_buffer_size),
cmux_term(nullptr), primary_term(std::move(terminal)), secondary_term(primary_term),
mode(modem_mode::UNDEF) {}
mode(modem_mode::UNDEF)
{
set_command_callbacks();
}

DTE::DTE(std::unique_ptr<Terminal> terminal):
buffer(dte_default_buffer_size),
cmux_term(nullptr), primary_term(std::move(terminal)), secondary_term(primary_term),
mode(modem_mode::UNDEF) {}
mode(modem_mode::UNDEF)
{
set_command_callbacks();
}

DTE::DTE(const esp_modem_dte_config *config, std::unique_ptr<Terminal> t, std::unique_ptr<Terminal> s):
buffer(config->dte_buffer_size),
cmux_term(nullptr), primary_term(std::move(t)), secondary_term(std::move(s)),
mode(modem_mode::DUAL_MODE) {}
mode(modem_mode::UNDEF)
{
set_command_callbacks();
}

DTE::DTE(std::unique_ptr<Terminal> t, std::unique_ptr<Terminal> s):
buffer(dte_default_buffer_size),
cmux_term(nullptr), primary_term(std::move(t)), secondary_term(std::move(s)),
mode(modem_mode::DUAL_MODE) {}
mode(modem_mode::UNDEF)
{
set_command_callbacks();
}

command_result DTE::command(const std::string &command, got_line_cb got_line, uint32_t time_ms, const char separator)
void DTE::set_command_callbacks()
{
Scoped<Lock> l(internal_lock);
result = command_result::TIMEOUT;
signal.clear(GOT_LINE);
primary_term->set_read_cb([this, got_line, separator](uint8_t *data, size_t len) {
if (!data) {
primary_term->set_read_cb([this](uint8_t *data, size_t len) {
Scoped<Lock> l(command_cb.line_lock);
if (command_cb.got_line == nullptr) {
return false;
}
if (data) {
// For terminals which post data directly with the callback (CMUX)
// we cannot defragment unless we allocate, but
// we'll try to process the data on the actual buffer
#ifdef CONFIG_ESP_MODEM_USE_INFLATABLE_BUFFER_IF_NEEDED
if (inflatable.consumed != 0) {
inflatable.grow(inflatable.consumed + len);
std::memcpy(inflatable.current(), data, len);
data = inflatable.begin();
}
if (command_cb.process_line(data, inflatable.consumed, len)) {
return true;
}
// at this point we're sure that the data processing hasn't finished,
// and we have to grow the inflatable buffer (if enabled) or give up
if (inflatable.consumed == 0) {
inflatable.grow(len);
std::memcpy(inflatable.begin(), data, len);
}
inflatable.consumed += len;
return false;
#else
if (command_cb.process_line(data, 0, len)) {
return true;
}
// cannot inflate and the processing hasn't finishes in the first iteration -> report a failure
command_cb.give_up();
return true;
#endif
}
// data == nullptr: Terminals which request users to read current data
// we're able to use DTE's buffer to defragment it; as long as we consume less that the buffer size
if (buffer.size > buffer.consumed) {
data = buffer.get();
len = primary_term->read(data + buffer.consumed, buffer.size - buffer.consumed);
} else {
buffer.consumed = 0; // if the underlying terminal contains data, we cannot fragment
}
if (memchr(data + buffer.consumed, separator, len)) {
result = got_line(data, buffer.consumed + len);
if (result == command_result::OK || result == command_result::FAIL) {
signal.set(GOT_LINE);
if (command_cb.process_line(data, buffer.consumed, len)) {
return true;
}
buffer.consumed += len;
return false;
}
// we have used the entire DTE's buffer, need to use the inflatable buffer to continue
#ifdef CONFIG_ESP_MODEM_USE_INFLATABLE_BUFFER_IF_NEEDED
if (inflatable.consumed == 0) {
inflatable.grow(buffer.size + len);
std::memcpy(inflatable.begin(), buffer.get(), buffer.size);
inflatable.consumed = buffer.size;
} else {
inflatable.grow(inflatable.consumed + len);
}
len = primary_term->read(inflatable.current(), len);
if (command_cb.process_line(inflatable.begin(), inflatable.consumed, len)) {
return true;
}
buffer.consumed += len;
inflatable.consumed += len;
return false;
#else
// cannot inflate -> report a failure
command_cb.give_up();
return true;
#endif
});
}

command_result DTE::command(const std::string &command, got_line_cb got_line, uint32_t time_ms, const char separator)
{
Scoped<Lock> l1(internal_lock);
command_cb.set(got_line, separator);
primary_term->write((uint8_t *)command.c_str(), command.length());
auto got_lf = signal.wait(GOT_LINE, time_ms);
if (got_lf && result == command_result::TIMEOUT) {
ESP_MODEM_THROW_IF_ERROR(ESP_ERR_INVALID_STATE);
}
command_cb.wait_for_line(time_ms);
command_cb.set(nullptr);
buffer.consumed = 0;
primary_term->set_read_cb(nullptr);
return result;
#ifdef CONFIG_ESP_MODEM_USE_INFLATABLE_BUFFER_IF_NEEDED
inflatable.deflate();
#endif
return command_cb.result;
}

command_result DTE::command(const std::string &cmd, got_line_cb got_line, uint32_t time_ms)
Expand Down Expand Up @@ -91,6 +156,7 @@ void DTE::exit_cmux_internal()
primary_term = std::move(ejected.first);
buffer = std::move(ejected.second);
secondary_term = primary_term;
set_command_callbacks();
}

bool DTE::setup_cmux()
Expand All @@ -113,7 +179,7 @@ bool DTE::setup_cmux()
cmux_term = nullptr;
return false;
}

set_command_callbacks();
return true;
}

Expand All @@ -135,6 +201,7 @@ bool DTE::set_mode(modem_mode m)
if (mode == modem_mode::CMUX_MODE || mode == modem_mode::CMUX_MANUAL_MODE || mode == modem_mode::DUAL_MODE) {
// mode stays the same, but need to swap terminals (as command has been switched)
secondary_term.swap(primary_term);
set_command_callbacks();
} else {
mode = m;
}
Expand Down Expand Up @@ -177,6 +244,7 @@ bool DTE::set_mode(modem_mode m)
// manual CMUX transitions: Swap terminals
if (m == modem_mode::CMUX_MANUAL_SWAP && mode == modem_mode::CMUX_MANUAL_MODE) {
secondary_term.swap(primary_term);
set_command_callbacks();
return true;
}
mode = modem_mode::UNDEF;
Expand All @@ -185,6 +253,10 @@ bool DTE::set_mode(modem_mode m)

void DTE::set_read_cb(std::function<bool(uint8_t *, size_t)> f)
{
if (f == nullptr) {
set_command_callbacks();
return;
}
on_data = std::move(f);
secondary_term->set_read_cb([this](uint8_t *data, size_t len) {
if (!data) { // if no data available from terminal callback -> need to explicitly read some
Expand Down Expand Up @@ -246,6 +318,41 @@ void DTE::on_read(got_line_cb on_read_cb)
});
}

bool DTE::command_cb::process_line(uint8_t *data, size_t consumed, size_t len)
{
if (memchr(data + consumed, separator, len)) {
result = got_line(data, consumed + len);
if (result == command_result::OK || result == command_result::FAIL) {
signal.set(GOT_LINE);
return true;
}
}
return false;
}

bool DTE::command_cb::wait_for_line(uint32_t time_ms)
{
auto got_lf = signal.wait(command_cb::GOT_LINE, time_ms);
if (got_lf && result == command_result::TIMEOUT) {
ESP_MODEM_THROW_IF_ERROR(ESP_ERR_INVALID_STATE);
}
return got_lf;
}

#ifdef CONFIG_ESP_MODEM_USE_INFLATABLE_BUFFER_IF_NEEDED
void DTE::extra_buffer::grow(size_t need_size)
{
if (need_size == 0) {
delete buffer;
buffer = nullptr;
} else if (buffer == nullptr) {
buffer = new std::vector<uint8_t>(need_size);
} else {
buffer->resize(need_size);
}
}
#endif

/**
* Implemented here to keep all headers C++11 compliant
*/
Expand Down
4 changes: 0 additions & 4 deletions components/esp_modem/src/esp_modem_uart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ class UartTerminal : public Terminal {

void set_read_cb(std::function<bool(uint8_t *data, size_t len)> f) override
{
ESP_MODEM_THROW_IF_FALSE(signal.wait(TASK_PARAMS, 1000), "Failed to set UART task params");
on_read = std::move(f);
}

Expand All @@ -91,7 +90,6 @@ class UartTerminal : public Terminal {
static const size_t TASK_INIT = BIT0;
static const size_t TASK_START = BIT1;
static const size_t TASK_STOP = BIT2;
static const size_t TASK_PARAMS = BIT3;

QueueHandle_t event_queue;
uart_resource uart;
Expand All @@ -118,9 +116,7 @@ void UartTerminal::task()
return; // exits to the static method where the task gets deleted
}
while (signal.is_any(TASK_START)) {
signal.set(TASK_PARAMS);
if (get_event(event, 100)) {
signal.clear(TASK_PARAMS);
switch (event.type) {
case UART_DATA:
uart_get_buffered_data_len(uart.port, &len);
Expand Down

0 comments on commit cb6e03a

Please sign in to comment.