Skip to content

Commit

Permalink
Stream reflectiontable msgpack directly to file (#1115)
Browse files Browse the repository at this point in the history
The previous implementation held the reflection table up to 3 times in
memory during the output phase. This implementation passes the python
filehandle into C++ space and then writes the msgpack output directly
to the file. As a result memory consumption is significantly reduced.
  • Loading branch information
Anthchirp committed Feb 3, 2020
1 parent 878f08d commit 5255a86
Show file tree
Hide file tree
Showing 9 changed files with 779 additions and 1 deletion.
13 changes: 13 additions & 0 deletions array_family/boost_python/flex_reflection_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <boost/python.hpp>
#include <boost/python/def.hpp>
#include <boost/python/suite/indexing/map_indexing_suite.hpp>
#include <dials/util/python_streambuf.h>
#include <numeric>
#include <dials/array_family/boost_python/flex_table_suite.h>
#include <dials/array_family/reflection_table.h>
Expand All @@ -28,6 +29,7 @@
namespace dials { namespace af { namespace boost_python {

using namespace boost::python;
using dials::util::streambuf;
using dials::algorithms::profile_model::gaussian_rs::CoordinateSystem;
using dials::model::Observation;
using dials::model::Shoebox;
Expand Down Expand Up @@ -846,6 +848,16 @@ namespace dials { namespace af { namespace boost_python {
return data_bytes;
}

/**
* Pack the reflection table in msgpack format into a streambuf object
* @param self The reflection table
* @param output A streambuf object encapsulating a Python file-like object
*/
void reflection_table_as_msgpack_to_file(reflection_table self, streambuf& output) {
streambuf::ostream os(output);
msgpack::pack(os, self);
}

/**
* Override default reference func to avoid copying
*/
Expand Down Expand Up @@ -1010,6 +1022,7 @@ namespace dials { namespace af { namespace boost_python {
&split_indices_by_experiment_id<flex_table_type>)
.def("compute_phi_range", &compute_phi_range<flex_table_type>)
.def("as_msgpack", &reflection_table_as_msgpack)
.def("as_msgpack_to_file", &reflection_table_as_msgpack_to_file)
.def("from_msgpack", &reflection_table_from_msgpack)
.staticmethod("from_msgpack")
.def("experiment_identifiers", &T::experiment_identifiers)
Expand Down
3 changes: 2 additions & 1 deletion array_family/flex.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import cctbx.array_family.flex
import cctbx.miller
import dials_array_family_flex_ext
import dials.util.ext
import libtbx.smart_open
import six
import six.moves.cPickle as pickle
Expand Down Expand Up @@ -222,7 +223,7 @@ def as_msgpack_file(self, filename):
if filename and hasattr(filename, "__fspath__"):
filename = filename.__fspath__()
with libtbx.smart_open.for_writing(filename, "wb") as outfile:
outfile.write(self.as_msgpack())
self.as_msgpack_to_file(dials.util.ext.streambuf(python_file_obj=outfile))

@staticmethod
def from_msgpack_file(filename):
Expand Down
1 change: 1 addition & 0 deletions newsfragments/1115.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Significantly reduce the amount of memory required to write .refl output files
4 changes: 4 additions & 0 deletions util/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,7 @@ if os.name == "nt":
env.Prepend(LIBS=["Advapi32"])

env.SharedLibrary(target="#/lib/dials_util_ext", source=sources, LIBS=env["LIBS"])
env.SharedLibrary(
target="#/lib/dials_util_streambuf_test_ext",
source="boost_python/streambuf_test_ext.cpp",
)
31 changes: 31 additions & 0 deletions util/boost_python/ext.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,36 @@
#include <dials/util/scale_down_array.h>
#include <dials/util/masking.h>
#include <dials/util/export_mtz_helpers.h>
#include <dials/util/python_streambuf.h>

std::size_t dials::util::streambuf::default_buffer_size = 1024;
namespace dials { namespace util { namespace boost_python {
struct python_streambuf_wrapper {
typedef dials::util::streambuf wt;

static void wrap() {
using namespace boost::python;
class_<wt, boost::noncopyable>("streambuf", no_init)
.def(init<boost::python::object &, std::size_t>(
(arg("python_file_obj"), arg("buffer_size") = 0)))
.def_readwrite("default_buffer_size",
wt::default_buffer_size,
"The default size of the buffer sitting "
"between a Python file object and a C++ stream.");
}
};

struct python_ostream_wrapper {
typedef dials::util::ostream wt;

static void wrap() {
using namespace boost::python;
class_<std::ostream, boost::noncopyable>("std_ostream", no_init);
class_<wt, boost::noncopyable, bases<std::ostream> >("ostream", no_init)
.def(init<boost::python::object &, std::size_t>(
(arg("python_file_obj"), arg("buffer_size") = 0)));
}
};

using namespace boost::python;
BOOST_PYTHON_MODULE(dials_util_ext) {
Expand Down Expand Up @@ -32,5 +60,8 @@ namespace dials { namespace util { namespace boost_python {
class_<ResolutionMaskGenerator>("ResolutionMaskGenerator", no_init)
.def(init<const BeamBase &, const Panel &>())
.def("apply", &ResolutionMaskGenerator::apply);

python_streambuf_wrapper::wrap();
python_ostream_wrapper::wrap();
}
}}} // namespace dials::util::boost_python
121 changes: 121 additions & 0 deletions util/boost_python/streambuf_test_ext.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
#include <boost/python/module.hpp>
#include <boost/python/def.hpp>

#include <dials/util/python_streambuf.h>
#include <fstream>

namespace dials { namespace util { namespace {

template <class StreamType>
boost::python::object append_status(StreamType const& s, std::string& result) {
if (!s.good()) result += "[ ";
if (s.bad()) result += "bad, ";
if (s.fail()) result += "fail, ";
if (s.eof()) result += "eof";
if (!s.good()) result += " ]";
boost::python::object data_bytes(boost::python::handle<>(
PyBytes_FromStringAndSize(result.c_str(), result.size())));
return data_bytes;
}

// Coding should be fun
// 012345678901234567890

boost::python::object read_word(streambuf& input) {
streambuf::istream is(input);
std::string word, result;

while (is >> word) {
result += word + ", ";
};

return append_status(is, result);
}

boost::python::object read_and_seek(streambuf& input) {
streambuf::istream is(input);
std::string word, result;

is.seekg(6);
is >> word;
result += word + ", "; // should
is.seekg(6, std::ios_base::beg);
is >> word;
result += word + ", "; // should
is.seekg(-3, std::ios_base::cur);
is >> word;
result += word + ", "; // uld
is.seekg(-11, std::ios_base::cur);
is >> word;
result += word + ", "; // ding
is.seekg(-4, std::ios_base::end);
is >> word;
result += word + ", "; // fun

return append_status(is, result);
}

boost::python::object partial_read(streambuf& input) {
streambuf::istream is(input);
std::string word, result;

is >> word;
result += word + ", ";
is >> word;
result += word + ", ";

return append_status(is, result);
}

boost::python::object write_word_ostream(std::ostream& os) {
std::string result;

os << 2 << " times " << 1.6 << " equals " << 3.2;
os.flush();

return append_status(os, result);
}

boost::python::object write_and_seek_ostream(std::ostream& os) {
std::string result;

os << 1000 << " timEs " << 5555 << " equalS " << 1000000;
// 1000 timEs 5555 equalS 1700000
// 0123456789012345678901234567890
os.seekp(-19, std::ios_base::cur);
os << 1000;
os.seekp(6, std::ios_base::cur);
os << "s";
os.seekp(-14, std::ios_base::cur);
os << "e";
os.flush();

return append_status(os, result);
}

boost::python::object write_word(streambuf& output) {
streambuf::ostream os(output);
return write_word_ostream(os);
}

boost::python::object write_and_seek(streambuf& output) {
streambuf::ostream os(output);
return write_and_seek_ostream(os);
}

void wrap_all() {
using namespace boost::python;
def("read_word", read_word);
def("read_and_seek", read_and_seek);
def("partial_read", partial_read);
def("write_word", write_word);
def("write_word", write_word_ostream);
def("write_and_seek", write_and_seek);
def("write_and_seek", write_and_seek_ostream);
}

}}} // namespace dials::util::

BOOST_PYTHON_MODULE(dials_util_streambuf_test_ext) {
dials::util::wrap_all();
}
2 changes: 2 additions & 0 deletions util/ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@
"add_dials_batches",
"dials_u_to_mosflm",
"scale_down_array",
"streambuf",
"ostream",
)
Loading

0 comments on commit 5255a86

Please sign in to comment.