Skip to content

Commit

Permalink
Refactor, add file and stream wrapper classes
Browse files Browse the repository at this point in the history
  • Loading branch information
davidbrochart committed Dec 28, 2020
1 parent 86ec45e commit b0fd657
Show file tree
Hide file tree
Showing 18 changed files with 406 additions and 240 deletions.
5 changes: 3 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ set(XTENSOR_IO_HEADERS
${XTENSOR_IO_INCLUDE_DIR}/xtensor-io/xio_gcs_handler.hpp
${XTENSOR_IO_INCLUDE_DIR}/xtensor-io/xio_gdal_handler.hpp
${XTENSOR_IO_INCLUDE_DIR}/xtensor-io/xio_gzip.hpp
${XTENSOR_IO_INCLUDE_DIR}/xtensor-io/xio_file.hpp
${XTENSOR_IO_INCLUDE_DIR}/xtensor-io/xio_vsilfile.hpp
${XTENSOR_IO_INCLUDE_DIR}/xtensor-io/xio_file_wrapper.hpp
${XTENSOR_IO_INCLUDE_DIR}/xtensor-io/xio_vsilfile_wrapper.hpp
${XTENSOR_IO_INCLUDE_DIR}/xtensor-io/xio_stream_wrapper.hpp
${XTENSOR_IO_INCLUDE_DIR}/xtensor-io/xnpz.hpp
${XTENSOR_IO_INCLUDE_DIR}/xtensor-io/xtensor-io.hpp
${XTENSOR_IO_INCLUDE_DIR}/xtensor-io/xtensor_io_config.hpp
Expand Down
7 changes: 5 additions & 2 deletions include/xtensor-io/xio_aws_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "xtensor/xarray.hpp"
#include "xtensor/xexpression.hpp"
#include "xfile_array.hpp"
#include "xio_stream_wrapper.hpp"
#include <aws/core/Aws.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/GetObjectRequest.h>
Expand Down Expand Up @@ -70,7 +71,8 @@ namespace xt
request.SetKey(path2);

std::shared_ptr<Aws::IOStream> writer = Aws::MakeShared<Aws::FStream>("SampleAllocationTag", path, std::ios_base::in | std::ios_base::binary);
dump_file(*writer, expression, m_format_config);
auto s = xostream_wrapper(*writer);
dump_file(s, expression, m_format_config);

request.SetBody(writer);

Expand Down Expand Up @@ -109,7 +111,8 @@ namespace xt
}

auto& reader = outcome.GetResultWithOwnership().GetBody();
load_file<ET>(reader, array, m_format_config);
auto s = xistream_wrapper(reader);
load_file<ET>(s, array, m_format_config);
}

template <class C>
Expand Down
91 changes: 33 additions & 58 deletions include/xtensor-io/xio_binary.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,39 +15,17 @@
#include "xtensor/xadapt.hpp"
#include "xtensor-io.hpp"
#include "xfile_array.hpp"
#include "xio_file.hpp"
#include "xio_stream_wrapper.hpp"

namespace xt
{
namespace detail
{
// load_bin_imp overload for file-like objects.
// It takes part in overload resolution only when `file.ftell()` is a valid
// expression (see the trailing return type).
// The size is obtained by seeking to the end and asking for the position;
// the file is then rewound and that many bytes are read into `buffer`.
// The template parameter `T` is not used by this overload.
template <typename T, class I>
auto load_bin_imp(I& file, std::string& buffer)
    -> decltype(file.ftell(), void())
{
    file.fseek(0, SEEK_END);
    std::size_t byte_count = file.ftell();
    buffer.resize(byte_count);

    // Back to the beginning before pulling the bytes in.
    file.rewind();
    file.fread(&buffer[0], 1, byte_count);
}

// load_bin_imp overload for stream-like objects.
// It takes part in overload resolution only when `stream.tellg()` is a valid
// expression (see the trailing return type).
// Replaces the contents of `buffer` with every character that can still be
// read from `stream`. The template parameter `T` is not used by this overload.
template <typename T, class I>
auto load_bin_imp(I& stream, std::string& buffer)
    -> decltype(stream.tellg(), void())
{
    using char_iterator = std::istreambuf_iterator<char>;
    buffer.assign(char_iterator{stream}, char_iterator{});
}

template <typename T, class I>
inline xt::svector<T> load_bin(I& stream, bool as_big_endian)
{
std::string buffer;
load_bin_imp<T>(stream, buffer);
stream.read_all(buffer);
std::size_t uncompressed_size = buffer.size() / sizeof(T);
xt::svector<T> uncompressed_buffer(uncompressed_size);
std::copy((const T*)(buffer.data()), (const T*)(buffer.data()) + uncompressed_size, uncompressed_buffer.begin());
Expand All @@ -58,26 +36,6 @@ namespace xt
return uncompressed_buffer;
}

// dump_bin_imp overload for file-like objects.
// It takes part in overload resolution only when `file.ftell()` is a valid
// expression (see the trailing return type).
// Writes `uncompressed_size` one-byte items starting at `uncompressed_buffer`
// through `file.fwrite`, then calls `file.fflush()`.
template <class O>
auto dump_bin_imp(O& file, const char* uncompressed_buffer, std::size_t uncompressed_size)
    -> decltype(file.ftell(), void())
{
    file.fwrite(uncompressed_buffer, 1, uncompressed_size);
    file.fflush();
}

// dump_bin_imp overload for stream-like objects.
// It takes part in overload resolution only when `stream.tellp()` is a valid
// expression (see the trailing return type).
// Writes `uncompressed_size` bytes starting at `uncompressed_buffer` to
// `stream`, then flushes it.
template <class O>
auto dump_bin_imp(O& stream, const char* uncompressed_buffer, std::size_t uncompressed_size)
    -> decltype(stream.tellp(), void())
{
    // The byte count is converted to std::streamsize before the write call.
    stream.write(uncompressed_buffer, static_cast<std::streamsize>(uncompressed_size));
    stream.flush();
}

template <class O, class E>
inline void dump_bin(O& stream, const xexpression<E>& e, bool as_big_endian)
{
Expand All @@ -100,26 +58,28 @@ namespace xt
{
uncompressed_buffer = reinterpret_cast<const char*>(eval_ex.data());
}
dump_bin_imp(stream, uncompressed_buffer, uncompressed_size);
stream.write(uncompressed_buffer, uncompressed_size);
stream.flush();
}
} // namespace detail

template <typename E, class O>
inline void dump_bin(O& stream, const xexpression<E>& e, bool as_big_endian=is_big_endian())
{
detail::dump_bin(stream, e, as_big_endian);
}

/**
* Save xexpression to binary format
*
* @param stream An output stream to which to dump the data
* @param e the xexpression
*/
template <typename E, class O>
inline void dump_bin(O& stream, const xexpression<E>& e, bool as_big_endian=is_big_endian())
{
detail::dump_bin(stream, e, as_big_endian);
}

template <typename E>
inline void dump_bin(std::ofstream& stream, const xexpression<E>& e, bool as_big_endian=is_big_endian())
{
detail::dump_bin(stream, e, as_big_endian);
auto s = xostream_wrapper(stream);
detail::dump_bin(s, e, as_big_endian);
}

/**
Expand All @@ -129,14 +89,21 @@ namespace xt
* @param e the xexpression
*/
template <typename E>
inline void dump_bin(const std::string& filename, const xexpression<E>& e, bool as_big_endian=is_big_endian())
inline void dump_bin(const char* filename, const xexpression<E>& e, bool as_big_endian=is_big_endian())
{
std::ofstream stream(filename, std::ofstream::binary);
if (!stream.is_open())
{
std::runtime_error("IO Error: failed to open file");
}
detail::dump_bin(stream, e, as_big_endian);
auto s = xostream_wrapper(stream);
detail::dump_bin(s, e, as_big_endian);
}

/**
 * Overload of dump_bin taking the file name as a std::string.
 *
 * Forwards `filename.c_str()`, `e` and `as_big_endian` unchanged to
 * dump_bin<E>.
 *
 * @param filename file name, forwarded as a C string
 * @param e the xexpression
 * @param as_big_endian forwarded as-is; defaults to is_big_endian()
 */
template <typename E>
inline void dump_bin(const std::string& filename, const xexpression<E>& e, bool as_big_endian=is_big_endian())
{
// NOTE(review): `filename.c_str()` is passed as a temporary. A generic overload
// declared as `dump_bin(O& stream, ...)` takes a non-const lvalue reference and
// cannot bind to a temporary; naming the pointer in a local first would make
// that overload a candidate as well.
dump_bin<E>(filename.c_str(), e, as_big_endian);
}

/**
Expand All @@ -148,7 +115,8 @@ namespace xt
inline std::string dump_bin(const xexpression<E>& e, bool as_big_endian=is_big_endian())
{
std::stringstream stream;
detail::dump_bin(stream, e, as_big_endian);
auto s = xostream_wrapper(stream);
detail::dump_bin(s, e, as_big_endian);
return stream.str();
}

Expand Down Expand Up @@ -180,14 +148,21 @@ namespace xt
* @return xarray with contents from binary file
*/
template <typename T, layout_type L = layout_type::dynamic>
inline auto load_bin(const std::string& filename, bool as_big_endian=is_big_endian())
inline auto load_bin(const char* filename, bool as_big_endian=is_big_endian())
{
std::ifstream stream(filename, std::ifstream::binary);
if (!stream.is_open())
{
std::runtime_error("load_bin: failed to open file " + filename);
std::runtime_error(std::string("load_bin: failed to open file ") + filename);
}
return load_bin<T, L>(stream, as_big_endian);
auto s = xistream_wrapper(stream);
return load_bin<T, L>(s, as_big_endian);
}

/**
 * Overload of load_bin taking the file name as a std::string.
 *
 * Forwards `filename.c_str()` and `as_big_endian` unchanged to
 * load_bin<T, L> and returns what that call returns.
 *
 * @param filename file name, forwarded as a C string
 * @param as_big_endian forwarded as-is; defaults to is_big_endian()
 * @return the value returned by the forwarded load_bin<T, L> call
 */
template <typename T, layout_type L = layout_type::dynamic>
inline auto load_bin(const std::string& filename, bool as_big_endian=is_big_endian())
{
return load_bin<T, L>(filename.c_str(), as_big_endian);
}

struct xio_binary_config
Expand Down
66 changes: 45 additions & 21 deletions include/xtensor-io/xio_blosc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "xtensor-io.hpp"
#include "xfile_array.hpp"
#include "blosc.h"
#include "xio_stream_wrapper.hpp"

namespace xt
{
Expand All @@ -31,11 +32,12 @@ namespace xt
}
}

template <typename T>
inline xt::svector<T> load_blosc_file(std::istream& stream, bool as_big_endian)
template <typename T, class I>
inline xt::svector<T> load_blosc(I& stream, bool as_big_endian)
{
init_blosc();
std::string compressed_buffer{std::istreambuf_iterator<char>{stream}, {}};
std::string compressed_buffer;
stream.read_all(compressed_buffer);
auto compressed_size = compressed_buffer.size();
std::size_t uncompressed_size = 0;
int res = blosc_cbuffer_validate(compressed_buffer.data(), compressed_size, &uncompressed_size);
Expand All @@ -60,7 +62,7 @@ namespace xt
}

template <class O, class E>
inline void dump_blosc_stream(O& stream, const xexpression<E>& e, bool as_big_endian, int clevel, int shuffle, const char* cname, std::size_t blocksize)
inline void dump_blosc(O& stream, const xexpression<E>& e, bool as_big_endian, int clevel, int shuffle, const char* cname, std::size_t blocksize)
{
init_blosc();
using value_type = typename E::value_type;
Expand Down Expand Up @@ -99,8 +101,8 @@ namespace xt
{
XTENSOR_THROW(std::runtime_error, "Blosc: compression error");
}
stream.write(compressed_buffer,
std::streamsize(true_compressed_size));
stream.write(compressed_buffer, std::streamsize(true_compressed_size));
stream.flush();
char_allocator.deallocate(compressed_buffer, max_compressed_size);
}
} // namespace detail
Expand All @@ -111,10 +113,17 @@ namespace xt
* @param stream An output stream to which to dump the data
* @param e the xexpression
*/
// Generic entry point: accepts any stream type `O` and the compression
// parameters (defaults: clevel=5, shuffle=1, cname="blosclz", blocksize=0).
template <typename E, class O>
inline void dump_blosc(O& stream, const xexpression<E>& e, bool as_big_endian=is_big_endian(), int clevel=5, int shuffle=1, const char* cname="blosclz", std::size_t blocksize=0)
{
// `stream` is handed to detail::dump_blosc as-is (no wrapper is created here),
// with every other argument forwarded unchanged. detail::dump_blosc calls
// `stream.write(...)` and `stream.flush()`, so `O` has to provide both.
detail::dump_blosc(stream, e, as_big_endian, clevel, shuffle, cname, blocksize);
}

template <typename E>
inline void dump_blosc(std::ostream& stream, const xexpression<E>& e, bool as_big_endian=is_big_endian(), int clevel=5, int shuffle=1, const char* cname="blosclz", std::size_t blocksize=0)
{
detail::dump_blosc_stream(stream, e, as_big_endian, clevel, shuffle, cname, blocksize);
auto s = xostream_wrapper(stream);
detail::dump_blosc(s, e, as_big_endian, clevel, shuffle, cname, blocksize);
}

/**
Expand All @@ -124,14 +133,21 @@ namespace xt
* @param e the xexpression
*/
template <typename E>
inline void dump_blosc(const std::string& filename, const xexpression<E>& e, bool as_big_endian=is_big_endian(), int clevel=5, int shuffle=1, const char* cname="blosclz", std::size_t blocksize=0)
inline void dump_blosc(const char* filename, const xexpression<E>& e, bool as_big_endian=is_big_endian(), int clevel=5, int shuffle=1, const char* cname="blosclz", std::size_t blocksize=0)
{
std::ofstream stream(filename, std::ofstream::binary);
if (!stream.is_open())
{
XTENSOR_THROW(std::runtime_error, "Blosc: failed to open file " + filename);
XTENSOR_THROW(std::runtime_error, std::string("Blosc: failed to open file ") + filename);
}
detail::dump_blosc_stream(stream, e, as_big_endian, clevel, shuffle, cname, blocksize);
auto s = xostream_wrapper(stream);
detail::dump_blosc(s, e, as_big_endian, clevel, shuffle, cname, blocksize);
}

/**
 * Overload of dump_blosc taking the file name as a std::string.
 *
 * Forwards `filename.c_str()`, `e` and all remaining arguments unchanged to
 * dump_blosc<E>.
 *
 * @param filename file name, forwarded as a C string
 * @param e the xexpression
 * @param as_big_endian forwarded as-is; defaults to is_big_endian()
 * @param clevel forwarded as-is; defaults to 5
 * @param shuffle forwarded as-is; defaults to 1
 * @param cname forwarded as-is; defaults to "blosclz"
 * @param blocksize forwarded as-is; defaults to 0
 */
template <typename E>
inline void dump_blosc(const std::string& filename, const xexpression<E>& e, bool as_big_endian=is_big_endian(), int clevel=5, int shuffle=1, const char* cname="blosclz", std::size_t blocksize=0)
{
// NOTE(review): `filename.c_str()` is passed as a temporary. A generic overload
// declared as `dump_blosc(O& stream, ...)` takes a non-const lvalue reference and
// cannot bind to a temporary; naming the pointer in a local first would make
// that overload a candidate as well.
dump_blosc<E>(filename.c_str(), e, as_big_endian, clevel, shuffle, cname, blocksize);
}

/**
Expand All @@ -143,7 +159,8 @@ namespace xt
inline std::string dump_blosc(const xexpression<E>& e, bool as_big_endian=is_big_endian(), int clevel=5, int shuffle=1, const char* cname="blosclz", std::size_t blocksize=0)
{
std::stringstream stream;
detail::dump_blosc_stream(stream, e, as_big_endian, clevel, shuffle, cname, blocksize);
auto s = xostream_wrapper(stream);
detail::dump_blosc(s, e, as_big_endian, clevel, shuffle, cname, blocksize);
return stream.str();
}

Expand All @@ -156,10 +173,10 @@ namespace xt
* Fortran format
* @return xarray with contents from blosc file
*/
template <typename T, layout_type L = layout_type::dynamic>
inline auto load_blosc(std::istream& stream, bool as_big_endian=is_big_endian())
template <typename T, layout_type L = layout_type::dynamic, class I>
inline auto load_blosc(I& stream, bool as_big_endian=is_big_endian())
{
xt::svector<T> uncompressed_buffer = detail::load_blosc_file<T>(stream, as_big_endian);
xt::svector<T> uncompressed_buffer = detail::load_blosc<T>(stream, as_big_endian);
std::vector<std::size_t> shape = {uncompressed_buffer.size()};
auto array = adapt(std::move(uncompressed_buffer), shape);
return array;
Expand All @@ -175,14 +192,21 @@ namespace xt
* @return xarray with contents from blosc file
*/
template <typename T, layout_type L = layout_type::dynamic>
inline auto load_blosc(const std::string& filename, bool as_big_endian=is_big_endian())
inline auto load_blosc(const char* filename, bool as_big_endian=is_big_endian())
{
std::ifstream stream(filename, std::ifstream::binary);
if (!stream.is_open())
{
XTENSOR_THROW(std::runtime_error, "Blosc: failed to open file " + filename);
XTENSOR_THROW(std::runtime_error, std::string("Blosc: failed to open file ") + filename);
}
return load_blosc<T, L>(stream, as_big_endian);
auto s = xistream_wrapper(stream);;
return load_blosc<T, L>(s, as_big_endian);
}

/**
 * Overload of load_blosc taking the file name as a std::string.
 *
 * Forwards `filename.c_str()` and `as_big_endian` unchanged to
 * load_blosc<T, L> and returns what that call returns.
 *
 * @param filename file name, forwarded as a C string
 * @param as_big_endian forwarded as-is; defaults to is_big_endian()
 * @return the value returned by the forwarded load_blosc<T, L> call
 */
template <typename T, layout_type L = layout_type::dynamic>
inline auto load_blosc(const std::string& filename, bool as_big_endian=is_big_endian())
{
// NOTE(review): `filename.c_str()` is passed as a temporary. A generic overload
// declared as `load_blosc(I& stream, ...)` takes a non-const lvalue reference and
// cannot bind to a temporary; naming the pointer in a local first would make
// that overload a candidate as well.
return load_blosc<T, L>(filename.c_str(), as_big_endian);
}

struct xio_blosc_config
Expand Down Expand Up @@ -230,8 +254,8 @@ namespace xt
}
};

template <class E>
void load_file(std::istream& stream, xexpression<E>& e, const xio_blosc_config& config)
template <class E, class I>
void load_file(I& stream, xexpression<E>& e, const xio_blosc_config& config)
{
E& ex = e.derived_cast();
auto shape = ex.shape();
Expand All @@ -246,8 +270,8 @@ namespace xt
}
}

template <class E>
void dump_file(std::ostream& stream, const xexpression<E> &e, const xio_blosc_config& config)
template <class E, class O>
void dump_file(O& stream, const xexpression<E> &e, const xio_blosc_config& config)
{
dump_blosc(stream, e, config.big_endian, config.clevel, config.shuffle, config.cname.c_str(), config.blocksize);
}
Expand Down
7 changes: 5 additions & 2 deletions include/xtensor-io/xio_disk_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <xtensor/xarray.hpp>
#include <xtensor/xexpression.hpp>
#include <xtensor-io/xio_stream_wrapper.hpp>

namespace xt
{
Expand Down Expand Up @@ -39,7 +40,8 @@ namespace xt
std::ofstream out_file(path, std::ofstream::binary);
if (out_file.is_open())
{
dump_file(out_file, expression, m_format_config);
auto s = xostream_wrapper(out_file);
dump_file(s, expression, m_format_config);
}
else
{
Expand All @@ -55,7 +57,8 @@ namespace xt
std::ifstream in_file(path, std::ifstream::binary);
if (in_file.is_open())
{
load_file<ET>(in_file, array, m_format_config);
auto s = xistream_wrapper(in_file);
load_file<ET>(s, array, m_format_config);
}
else
{
Expand Down
Loading

0 comments on commit b0fd657

Please sign in to comment.