Skip to content

Commit

Permalink
enh(FileStream): Add FileStreamBuf::resizeBuffer to set larger intern…
Browse files Browse the repository at this point in the history
…al buffers.

Larger buffers improve performance significantly when streaming large quantity of data on very fast devices.
  • Loading branch information
matejk committed Jul 30, 2024
1 parent 669be63 commit e4b1538
Show file tree
Hide file tree
Showing 8 changed files with 224 additions and 53 deletions.
52 changes: 34 additions & 18 deletions Foundation/include/Poco/BufferedBidirectionalStreamBuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,15 @@ class BasicBufferedBidirectionalStreamBuf: public std::basic_streambuf<ch, tr>
/// for implementing an iostream.
{
protected:
typedef std::basic_streambuf<ch, tr> Base;
typedef std::basic_ios<ch, tr> IOS;
typedef ch char_type;
typedef tr char_traits;
typedef ba Allocator;
typedef typename Base::int_type int_type;
typedef typename Base::pos_type pos_type;
typedef typename Base::off_type off_type;
typedef typename IOS::openmode openmode;
using Base = std::basic_streambuf<ch, tr>;
using IOS = std::basic_ios<ch, tr>;
using char_type = ch;
using char_traits = tr;
using Allocator = ba;
using int_type = typename Base::int_type;
using pos_type = typename Base::pos_type;
using off_type = typename Base::off_type;
using openmode = typename IOS::openmode;

public:
BasicBufferedBidirectionalStreamBuf(std::streamsize bufferSize, openmode mode):
Expand All @@ -69,6 +69,9 @@ class BasicBufferedBidirectionalStreamBuf: public std::basic_streambuf<ch, tr>
Allocator::deallocate(_pWriteBuffer, _bufsize);
}

BasicBufferedBidirectionalStreamBuf(const BasicBufferedBidirectionalStreamBuf&) = delete;
BasicBufferedBidirectionalStreamBuf& operator = (const BasicBufferedBidirectionalStreamBuf&) = delete;

virtual int_type overflow(int_type c)
{
if (!(_mode & IOS::out)) return char_traits::eof();
Expand Down Expand Up @@ -130,6 +133,22 @@ class BasicBufferedBidirectionalStreamBuf: public std::basic_streambuf<ch, tr>
this->setp(_pWriteBuffer, _pWriteBuffer + _bufsize);
}

virtual bool resizeBuffer(std::streamsize bufferSize)
{
if (_bufsize != bufferSize)
{
Allocator::deallocate(_pReadBuffer, _bufsize);
Allocator::deallocate(_pWriteBuffer, _bufsize);

_bufsize = bufferSize;
_pReadBuffer = Allocator::allocate(_bufsize);
_pWriteBuffer = Allocator::allocate(_bufsize);
}
resetBuffers();

return true;
}

private:
virtual int readFromDevice(char_type* /*buffer*/, std::streamsize /*length*/)
{
Expand All @@ -152,13 +171,10 @@ class BasicBufferedBidirectionalStreamBuf: public std::basic_streambuf<ch, tr>
return -1;
}

std::streamsize _bufsize;
char_type* _pReadBuffer;
char_type* _pWriteBuffer;
openmode _mode;

BasicBufferedBidirectionalStreamBuf(const BasicBufferedBidirectionalStreamBuf&);
BasicBufferedBidirectionalStreamBuf& operator = (const BasicBufferedBidirectionalStreamBuf&);
std::streamsize _bufsize {0};
char_type* _pReadBuffer {nullptr};
char_type* _pWriteBuffer {nullptr};
openmode _mode {0};
};


Expand All @@ -172,8 +188,8 @@ class BasicBufferedBidirectionalStreamBuf: public std::basic_streambuf<ch, tr>
#if defined(_MSC_VER) && defined(POCO_DLL) && !defined(Foundation_EXPORTS)
template class Foundation_API BasicBufferedBidirectionalStreamBuf<char, std::char_traits<char>>;
#endif
typedef BasicBufferedBidirectionalStreamBuf<char, std::char_traits<char>> BufferedBidirectionalStreamBuf;

using BufferedBidirectionalStreamBuf
= BasicBufferedBidirectionalStreamBuf<char, std::char_traits<char>>;

} // namespace Poco

Expand Down
8 changes: 4 additions & 4 deletions Foundation/include/Poco/FileStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class Foundation_API FileIOS: public virtual std::ios
FileIOS();
/// Creates the basic stream.

~FileIOS();
~FileIOS() override;
/// Destroys the stream.

virtual void open(const std::string& path, std::ios::openmode mode);
Expand Down Expand Up @@ -115,7 +115,7 @@ class Foundation_API FileInputStream: public FileIOS, public std::istream
/// Throws a FileNotFoundException (or a similar exception) if the file
/// does not exist or is not accessible for other reasons.

~FileInputStream();
~FileInputStream() override;
/// Destroys the stream.

void open(const std::string& path, std::ios::openmode mode = std::ios::in) override;
Expand Down Expand Up @@ -158,7 +158,7 @@ class Foundation_API FileOutputStream: public FileIOS, public std::ostream
/// for std::ofstream, which is std::ios::out only. This is for backwards compatibility
/// with earlier POCO versions.

~FileOutputStream();
~FileOutputStream() override;
/// Destroys the FileOutputStream.

void open(const std::string& path, std::ios::openmode mode = std::ios::out | std::ios::trunc) override;
Expand Down Expand Up @@ -203,7 +203,7 @@ class Foundation_API FileStream: public FileIOS, public std::iostream
/// for std::fstream, which is std::ios::out only. This is for backwards compatibility
/// with earlier POCO versions.

~FileStream();
~FileStream() override;
/// Destroys the FileOutputStream.

void open(const std::string& path, std::ios::openmode mode = std::ios::out | std::ios::in) override;
Expand Down
18 changes: 12 additions & 6 deletions Foundation/include/Poco/FileStream_POSIX.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class Foundation_API FileStreamBuf: public BufferedBidirectionalStreamBuf
FileStreamBuf();
/// Creates a FileStreamBuf.

~FileStreamBuf();
~FileStreamBuf() override;
/// Destroys the FileStream.

void open(const std::string& path, std::ios::openmode mode);
Expand All @@ -49,10 +49,16 @@ class Foundation_API FileStreamBuf: public BufferedBidirectionalStreamBuf
/// Closes the File stream buffer. Returns true if successful,
/// false otherwise.

std::streampos seekoff(std::streamoff off, std::ios::seekdir dir, std::ios::openmode mode = std::ios::in | std::ios::out);
bool resizeBuffer(std::streamsize bufferSize) override;
/// Resizes internal buffer. Minimum size is BUFFER_SIZE.
/// Minimum is used when requested size is smaller.
/// Buffer can be resized only when the file is not open.
/// Returns true if resize succeeded.

std::streampos seekoff(std::streamoff off, std::ios::seekdir dir, std::ios::openmode mode = std::ios::in | std::ios::out) override;
/// Change position by offset, according to way and mode.

std::streampos seekpos(std::streampos pos, std::ios::openmode mode = std::ios::in | std::ios::out);
std::streampos seekpos(std::streampos pos, std::ios::openmode mode = std::ios::in | std::ios::out) override;
/// Change to specified position, according to mode.

void flushToDisk();
Expand All @@ -70,13 +76,13 @@ class Foundation_API FileStreamBuf: public BufferedBidirectionalStreamBuf
BUFFER_SIZE = 4096
};

int readFromDevice(char* buffer, std::streamsize length);
int writeToDevice(const char* buffer, std::streamsize length);
int readFromDevice(char* buffer, std::streamsize length) override;
int writeToDevice(const char* buffer, std::streamsize length) override;

private:
std::string _path;
NativeHandle _fd;
std::streamoff _pos;
std::streamoff _pos {0};
};


Expand Down
17 changes: 11 additions & 6 deletions Foundation/include/Poco/FileStream_WIN32.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

namespace Poco {


class Foundation_API FileStreamBuf: public BufferedBidirectionalStreamBuf
/// This stream buffer handles Fileio
{
Expand All @@ -48,10 +47,16 @@ class Foundation_API FileStreamBuf: public BufferedBidirectionalStreamBuf
/// Closes the File stream buffer. Returns true if successful,
/// false otherwise.

std::streampos seekoff(std::streamoff off, std::ios::seekdir dir, std::ios::openmode mode = std::ios::in | std::ios::out);
bool resizeBuffer(std::streamsize bufferSize) override;
/// Resizes internal buffer. Minimum size is BUFFER_SIZE.
/// Minimum is used when requested size is smaller.
/// Buffer can be resized only when the file is not open.
/// Returns true if resize succeeded.

std::streampos seekoff(std::streamoff off, std::ios::seekdir dir, std::ios::openmode mode = std::ios::in | std::ios::out) override;
/// change position by offset, according to way and mode

std::streampos seekpos(std::streampos pos, std::ios::openmode mode = std::ios::in | std::ios::out);
std::streampos seekpos(std::streampos pos, std::ios::openmode mode = std::ios::in | std::ios::out) override;
/// change to specified position, according to mode

void flushToDisk();
Expand All @@ -69,13 +74,13 @@ class Foundation_API FileStreamBuf: public BufferedBidirectionalStreamBuf
BUFFER_SIZE = 4096
};

int readFromDevice(char* buffer, std::streamsize length);
int writeToDevice(const char* buffer, std::streamsize length);
int readFromDevice(char* buffer, std::streamsize length) override;
int writeToDevice(const char* buffer, std::streamsize length) override;

private:
std::string _path;
NativeHandle _handle;
UInt64 _pos;
UInt64 _pos {0};
};


Expand Down
26 changes: 19 additions & 7 deletions Foundation/src/FileStream_POSIX.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ int FileStreamBuf::readFromDevice(char* buffer, std::streamsize length)
if (getMode() & std::ios::out)
sync();

int n = read(_fd, buffer, length);
int n = ::read(_fd, buffer, length);
if (n == -1)
File::handleLastError(_path);
_pos += n;
Expand All @@ -108,9 +108,9 @@ int FileStreamBuf::writeToDevice(const char* buffer, std::streamsize length)
if (_fd == -1) return -1;

#if defined(POCO_VXWORKS)
int n = write(_fd, const_cast<char*>(buffer), length);
int n = ::write(_fd, const_cast<char*>(buffer), length);
#else
int n = write(_fd, buffer, length);
int n = ::write(_fd, buffer, length);
#endif
if (n == -1)
File::handleLastError(_path);
Expand Down Expand Up @@ -139,6 +139,18 @@ bool FileStreamBuf::close()
}


bool FileStreamBuf::resizeBuffer(std::streamsize bufferSize)
{
if (_fd != -1)
return false;

if (bufferSize < BUFFER_SIZE)
bufferSize = BUFFER_SIZE;

return BufferedBidirectionalStreamBuf::resizeBuffer(bufferSize);
}


std::streampos FileStreamBuf::seekoff(std::streamoff off, std::ios::seekdir dir, std::ios::openmode mode)
{
if (_fd == -1 || !(getMode() & mode))
Expand All @@ -165,7 +177,7 @@ std::streampos FileStreamBuf::seekoff(std::streamoff off, std::ios::seekdir dir,
{
whence = SEEK_END;
}
_pos = lseek(_fd, off, whence);
_pos = ::lseek(_fd, off, whence);
return _pos;
}

Expand All @@ -180,7 +192,7 @@ std::streampos FileStreamBuf::seekpos(std::streampos pos, std::ios::openmode mod

resetBuffers();

_pos = lseek(_fd, pos, SEEK_SET);
_pos = ::lseek(_fd, pos, SEEK_SET);
return _pos;
}

Expand All @@ -190,7 +202,7 @@ void FileStreamBuf::flushToDisk()
if (getMode() & std::ios::out)
{
sync();
if (fsync(_fd) != 0)
if (::fsync(_fd) != 0)
File::handleLastError(_path);
}
}
Expand All @@ -204,7 +216,7 @@ FileStreamBuf::NativeHandle FileStreamBuf::nativeHandle() const
Poco::UInt64 FileStreamBuf::size() const
{
struct stat stat_buf;
int rc = fstat(_fd, &stat_buf);
int rc = ::fstat(_fd, &stat_buf);
if (rc < 0)
{
Poco::SystemException(strerror(errno), errno);
Expand Down
36 changes: 24 additions & 12 deletions Foundation/src/FileStream_WIN32.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ void FileStreamBuf::open(const std::string& path, std::ios::openmode mode)

std::wstring utf16Path;
FileImpl::convertPath(path, utf16Path);
_handle = CreateFileW(utf16Path.c_str(), access, shareMode, NULL, creationDisp, flags, NULL);
_handle = ::CreateFileW(utf16Path.c_str(), access, shareMode, NULL, creationDisp, flags, NULL);

if (_handle == INVALID_HANDLE_VALUE)
File::handleLastError(_path);
Expand Down Expand Up @@ -98,10 +98,10 @@ int FileStreamBuf::readFromDevice(char* buffer, std::streamsize length)
sync();

DWORD bytesRead(0);
BOOL rc = ReadFile(_handle, buffer, static_cast<DWORD>(length), &bytesRead, NULL);
BOOL rc = ::ReadFile(_handle, buffer, static_cast<DWORD>(length), &bytesRead, NULL);
if (rc == 0)
{
if (GetLastError() == ERROR_BROKEN_PIPE)
if (::GetLastError() == ERROR_BROKEN_PIPE)
{
// Read from closed pipe -> treat as EOF
return 0;
Expand All @@ -124,14 +124,14 @@ int FileStreamBuf::writeToDevice(const char* buffer, std::streamsize length)
{
LARGE_INTEGER li;
li.QuadPart = 0;
li.LowPart = SetFilePointer(_handle, li.LowPart, &li.HighPart, FILE_END);
if (li.LowPart == INVALID_SET_FILE_POINTER && GetLastError() != NO_ERROR)
li.LowPart = ::SetFilePointer(_handle, li.LowPart, &li.HighPart, FILE_END);
if (li.LowPart == INVALID_SET_FILE_POINTER && ::GetLastError() != NO_ERROR)
File::handleLastError(_path);
_pos = li.QuadPart;
}

DWORD bytesWritten(0);
BOOL rc = WriteFile(_handle, buffer, static_cast<DWORD>(length), &bytesWritten, NULL);
BOOL rc = ::WriteFile(_handle, buffer, static_cast<DWORD>(length), &bytesWritten, NULL);
if (rc == 0)
File::handleLastError(_path);

Expand All @@ -155,13 +155,25 @@ bool FileStreamBuf::close()
{
success = false;
}
CloseHandle(_handle);
::CloseHandle(_handle);
_handle = INVALID_HANDLE_VALUE;
}
return success;
}


bool FileStreamBuf::resizeBuffer(std::streamsize bufferSize)
{
if (_handle != INVALID_HANDLE_VALUE)
return false;

if (bufferSize < BUFFER_SIZE)
bufferSize = BUFFER_SIZE;

return BufferedBidirectionalStreamBuf::resizeBuffer(bufferSize);
}


std::streampos FileStreamBuf::seekoff(std::streamoff off, std::ios::seekdir dir, std::ios::openmode mode)
{
if (INVALID_HANDLE_VALUE == _handle || !(getMode() & mode))
Expand Down Expand Up @@ -191,9 +203,9 @@ std::streampos FileStreamBuf::seekoff(std::streamoff off, std::ios::seekdir dir,

LARGE_INTEGER li;
li.QuadPart = off;
li.LowPart = SetFilePointer(_handle, li.LowPart, &li.HighPart, offset);
li.LowPart = ::SetFilePointer(_handle, li.LowPart, &li.HighPart, offset);

if (li.LowPart == INVALID_SET_FILE_POINTER && GetLastError() != NO_ERROR)
if (li.LowPart == INVALID_SET_FILE_POINTER && ::GetLastError() != NO_ERROR)
File::handleLastError(_path);
_pos = li.QuadPart;
return std::streampos(static_cast<std::streamoff>(_pos));
Expand All @@ -212,9 +224,9 @@ std::streampos FileStreamBuf::seekpos(std::streampos pos, std::ios::openmode mod

LARGE_INTEGER li;
li.QuadPart = pos;
li.LowPart = SetFilePointer(_handle, li.LowPart, &li.HighPart, FILE_BEGIN);
li.LowPart = ::SetFilePointer(_handle, li.LowPart, &li.HighPart, FILE_BEGIN);

if (li.LowPart == INVALID_SET_FILE_POINTER && GetLastError() != NO_ERROR)
if (li.LowPart == INVALID_SET_FILE_POINTER && ::GetLastError() != NO_ERROR)
File::handleLastError(_path);
_pos = li.QuadPart;
return std::streampos(static_cast<std::streamoff>(_pos));
Expand All @@ -226,7 +238,7 @@ void FileStreamBuf::flushToDisk()
if (getMode() & std::ios::out)
{
sync();
if (FlushFileBuffers(_handle) == 0)
if (::FlushFileBuffers(_handle) == 0)
File::handleLastError(_path);
}
}
Expand Down
Loading

0 comments on commit e4b1538

Please sign in to comment.