Skip to content

Commit

Permalink
disable zero copy partial write combined with multibuffering (#1163) (#…
Browse files Browse the repository at this point in the history
…1168)

core: disable zero copy partial write combined with multibuffering (#1163)
  • Loading branch information
rex-schilasky authored Jul 31, 2023
1 parent db15d93 commit 50699d8
Show file tree
Hide file tree
Showing 7 changed files with 287 additions and 36 deletions.
4 changes: 2 additions & 2 deletions ecal/core/src/io/ecal_memfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,15 +314,15 @@ namespace eCAL
}
}

size_t CMemoryFile::WritePayload(CPayloadWriter& payload_, const size_t len_, const size_t offset_)
size_t CMemoryFile::WritePayload(CPayloadWriter& payload_, const size_t len_, const size_t offset_, bool force_full_write_ /*= false*/)
{
if (!m_created) return(0);

void* wbuf(nullptr);
if (GetWriteAddress(wbuf, len_ + offset_) != 0u)
{
// (re)write complete buffer
if (!m_payload_initialized)
if (!m_payload_initialized || force_full_write_)
{
bool const success = payload_.Write(static_cast<char*>(wbuf) + offset_, len_);
if (!success)
Expand Down
11 changes: 6 additions & 5 deletions ecal/core/src/io/ecal_memfile.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,14 @@ namespace eCAL
/**
* @brief Apply payload on the memory file.
*
* @param payload_ The payload.
* @param len_ The number of bytes to write.
* @param offset_ The offset for writing the data.
* @param payload_ The payload.
* @param len_ The number of bytes to write.
* @param offset_ The offset for writing the data.
* @param force_full_write_ Force full write action.
*
* @return Number of bytes access (len if succeeded otherwise zero).
* @return Number of bytes accessed (len if succeeded otherwise zero).
**/
size_t WritePayload(CPayloadWriter& payload_, size_t len_, size_t offset_);
size_t WritePayload(CPayloadWriter& payload_, size_t len_, size_t offset_, bool force_full_write_ = false);

/**
* @brief Maximum data size of the whole memory file.
Expand Down
4 changes: 2 additions & 2 deletions ecal/core/src/io/ecal_memfile_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ namespace eCAL
return false;
}

bool CSyncMemoryFile::Write(CPayloadWriter& payload_, const SWriterAttr& data_)
bool CSyncMemoryFile::Write(CPayloadWriter& payload_, const SWriterAttr& data_, bool force_full_write_/* = false*/)
{
if (!m_created)
{
Expand Down Expand Up @@ -197,7 +197,7 @@ namespace eCAL
// write the buffer
if (data_.len > 0)
{
written &= m_memfile.WritePayload(payload_, data_.len, wbytes) > 0;
written &= m_memfile.WritePayload(payload_, data_.len, wbytes, force_full_write_) > 0;
}
// release write access
m_memfile.ReleaseWriteAccess();
Expand Down
2 changes: 1 addition & 1 deletion ecal/core/src/io/ecal_memfile_sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ namespace eCAL
bool Disconnect(const std::string& process_id_);

bool CheckSize(size_t size_);
bool Write(CPayloadWriter& payload_, const SWriterAttr& data_);
bool Write(CPayloadWriter& payload_, const SWriterAttr& data_, bool force_full_write_ = false);

std::string GetName() const;
size_t GetSize() const;
Expand Down
35 changes: 9 additions & 26 deletions ecal/core/src/readwrite/ecal_writer_shm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,17 @@ namespace eCAL

bool CDataWriterSHM::SetBufferCount(size_t buffer_count_)
{
// no need to adapt anything
if (m_memory_file_vec.size() == buffer_count_) return true;

// buffer count zero not allowed
if (buffer_count_ < 1)
{
Logging::Log(log_level_error, m_topic_name + "::CDataWriterSHM::SetBufferCount minimal number of memory files is 1 !");
return false;
}

// retrieve the memory file size of existing files
size_t memory_file_size(0);
if (!m_memory_file_vec.empty())
{
Expand All @@ -119,37 +124,14 @@ namespace eCAL
memory_file_size = m_memory_file_attr.min_size;
}

// ----------------------------------------------------------------------
// REMOVE ME IN ECAL6
// ----------------------------------------------------------------------
// recreate memory buffer list to stay compatible to older versions
// for the case that we have ONE existing buffer
// and that single buffer is communicated with an older shm datareader
// in this case we need to invalidate (destroy) the existing buffer
// and the old datareader will get blind (fail safe)
// otherwise it would still receive every n-th write
// this state change will lead to some lost samples
if ((m_memory_file_vec.size() == 1) && (m_memory_file_vec.size() < buffer_count_))
{
m_memory_file_vec.clear();
}
// ----------------------------------------------------------------------
// REMOVE ME IN ECAL6
// ----------------------------------------------------------------------

// increase buffer count
// recreate memory file vector
m_memory_file_vec.clear();
while (m_memory_file_vec.size() < buffer_count_)
{
auto sync_memfile = std::make_shared<CSyncMemoryFile>(m_memfile_base_name, memory_file_size, m_memory_file_attr);
m_memory_file_vec.push_back(sync_memfile);
}

// decrease buffer count
while (m_memory_file_vec.size() > buffer_count_)
{
m_memory_file_vec.pop_back();
}

return true;
}

Expand Down Expand Up @@ -185,7 +167,8 @@ namespace eCAL
if (!m_created) return false;

// write content
const bool sent = m_memory_file_vec[m_write_idx]->Write(payload_, attr_);
const bool force_full_write(m_memory_file_vec.size() > 1);
const bool sent = m_memory_file_vec[m_write_idx]->Write(payload_, attr_, force_full_write);

// and increment file index
m_write_idx++;
Expand Down
1 change: 1 addition & 0 deletions testing/ecal/pubsub_test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ find_package(GTest REQUIRED)

set(pubsub_test_src
src/pubsub_gettopics.cpp
src/pubsub_multibuffer
src/pubsub_test.cpp
src/pubsub_receive_test.cpp
)
Expand Down
Loading

0 comments on commit 50699d8

Please sign in to comment.