Skip to content

Commit

Permalink
Development (#2)
Browse files Browse the repository at this point in the history
* sfety/transfer

* Added a stop() function for the receiver

* Update CMakeLists.txt

Bump version to 0.10.0
  • Loading branch information
kuehnhammer authored Feb 21, 2022
1 parent 19e3313 commit d15b169
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 6 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.16)

project (libflute VERSION 0.9.0)
project (libflute VERSION 0.10.0)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED True)
Expand Down
2 changes: 1 addition & 1 deletion examples/flute-receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ auto main(int argc, char **argv) -> int {
arguments.flute_interface,
arguments.mcast_target,
(short)arguments.mcast_port,
0,
16,
io);

// Configure IPSEC, if enabled
Expand Down
9 changes: 9 additions & 0 deletions include/Receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,19 @@ namespace LibFlute {
*/
void remove_expired_files(unsigned max_age);

/**
* Remove a file from the list that matches the passed content location
*/
void remove_file_with_content_location(const std::string& cl);

/**
* Register a callback for file reception notifications
*
* @param cb Function to call on file completion
*/
void register_completion_callback(completion_callback_t cb) { _completion_cb = cb; };

void stop() { _running = false; }
private:

void handle_receive_from(const boost::system::error_code& error,
Expand All @@ -95,5 +102,7 @@ namespace LibFlute {
std::string _mcast_address;

completion_callback_t _completion_cb = nullptr;

bool _running = true;
};
};
6 changes: 6 additions & 0 deletions src/File.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ LibFlute::File::File(LibFlute::FileDeliveryTable::FileEntry entry)
: _meta( std::move(entry) )
, _received_at( time(nullptr) )
{
spdlog::debug("Creating File from FileEntry");
// Allocate a data buffer
spdlog::debug("Allocating buffer");
_buffer = (char*)malloc(_meta.fec_oti.transfer_length);
if (_buffer == nullptr)
{
Expand All @@ -51,7 +53,9 @@ LibFlute::File::File(uint32_t toi,
size_t length,
bool copy_data)
{
spdlog::debug("Creating File from data");
if (copy_data) {
spdlog::debug("Allocating buffer");
_buffer = (char*)malloc(length);
if (_buffer == nullptr)
{
Expand Down Expand Up @@ -87,8 +91,10 @@ LibFlute::File::File(uint32_t toi,

LibFlute::File::~File()
{
spdlog::debug("Destroying File");
if (_own_buffer && _buffer != nullptr)
{
spdlog::debug("Freeing buffer");
free(_buffer);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/FileDeliveryTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ LibFlute::FileDeliveryTable::FileDeliveryTable(uint32_t instance_id, char* buffe
auto fdt_instance = doc.FirstChildElement("FDT-Instance");
_expires = std::stoull(fdt_instance->Attribute("Expires"));

spdlog::debug("Received new FDT with instance ID {}", instance_id);
spdlog::debug("Received new FDT with instance ID {}: {}", instance_id, buffer);

uint8_t def_fec_encoding_id = 0;
auto val = fdt_instance->Attribute("FEC-OTI-FEC-Encoding-ID");
Expand Down
23 changes: 20 additions & 3 deletions src/Receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,11 @@ auto LibFlute::Receiver::enable_ipsec(uint32_t spi, const std::string& key) -> v
auto LibFlute::Receiver::handle_receive_from(const boost::system::error_code& error,
size_t bytes_recvd) -> void
{
if (!_running) return;

if (!error)
{
spdlog::debug("Received {} bytes", bytes_recvd);
spdlog::trace("Received {} bytes", bytes_recvd);
try {
auto alc = LibFlute::AlcPacket(_data, bytes_recvd);

Expand Down Expand Up @@ -106,6 +108,7 @@ auto LibFlute::Receiver::handle_receive_from(const boost::system::error_code& er
spdlog::debug("File with TOI {} completed", alc.toi());
if (alc.toi() != 0 && _completion_cb) {
_completion_cb(_files[alc.toi()]);
_files.erase(alc.toi());
}

if (alc.toi() == 0) { // parse complete FDT
Expand All @@ -116,14 +119,15 @@ auto LibFlute::Receiver::handle_receive_from(const boost::system::error_code& er
for (const auto& file_entry : _fdt->file_entries()) {
// automatically receive all files in the FDT
if (_files.find(file_entry.toi) == _files.end()) {
spdlog::debug("Starting reception for file with TOI {}", file_entry.toi);
spdlog::debug("Starting reception for file with TOI {}: {} ({})", file_entry.toi,
file_entry.content_location, file_entry.content_type);
_files.emplace(file_entry.toi, std::make_shared<LibFlute::File>(file_entry));
}
}
}
}
} else {
spdlog::debug("Discarding packet for unknown or already completed file with TOI {}", alc.toi());
spdlog::trace("Discarding packet for unknown or already completed file with TOI {}", alc.toi());
}
} catch (std::exception ex) {
spdlog::warn("Failed to decode ALC/FLUTE packet: {}", ex.what());
Expand Down Expand Up @@ -163,3 +167,16 @@ auto LibFlute::Receiver::remove_expired_files(unsigned max_age) -> void
}
}
}

auto LibFlute::Receiver::remove_file_with_content_location(const std::string& cl) -> void
{
const std::lock_guard<std::mutex> lock(_files_mutex);
for (auto it = _files.cbegin(); it != _files.cend();)
{
if ( it->second->meta().content_location == cl) {
it = _files.erase(it);
} else {
++it;
}
}
}

0 comments on commit d15b169

Please sign in to comment.