Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce memory usage estimation mode in data_frame_analyzer #584

Merged
merged 6 commits into from
Aug 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions bin/data_frame_analyzer/CCmdLineParser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const std::string CCmdLineParser::DESCRIPTION = "Usage: data_frame_analyzer [opt
bool CCmdLineParser::parse(int argc,
const char* const* argv,
std::string& configFile,
bool& memoryUsageEstimationOnly,
std::string& logProperties,
std::string& logPipe,
bool& lengthEncodedInput,
Expand All @@ -35,6 +36,7 @@ bool CCmdLineParser::parse(int argc,
("version", "Display version information and exit")
("config", boost::program_options::value<std::string>(),
"The configuration file")
("memoryUsageEstimationOnly", "Whether to perform memory usage estimation only")
("logProperties", boost::program_options::value<std::string>(),
"Optional logger properties file")
("logPipe", boost::program_options::value<std::string>(),
Expand Down Expand Up @@ -66,6 +68,9 @@ bool CCmdLineParser::parse(int argc,
if (vm.count("config") > 0) {
configFile = vm["config"].as<std::string>();
}
if (vm.count("memoryUsageEstimationOnly") > 0) {
memoryUsageEstimationOnly = true;
}
if (vm.count("logProperties") > 0) {
logProperties = vm["logProperties"].as<std::string>();
}
Expand Down
1 change: 1 addition & 0 deletions bin/data_frame_analyzer/CCmdLineParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class CCmdLineParser {
static bool parse(int argc,
const char* const* argv,
std::string& configFile,
bool& memoryUsageEstimationOnly,
std::string& logProperties,
std::string& logPipe,
bool& lengthEncodedInput,
Expand Down
31 changes: 22 additions & 9 deletions bin/data_frame_analyzer/Main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <api/CDataFrameOutliersRunner.h>
#include <api/CIoManager.h>
#include <api/CLengthEncodedInputParser.h>
#include <api/CMemoryUsageEstimationResultJsonWriter.h>

#include "CCmdLineParser.h"

Expand Down Expand Up @@ -86,6 +87,7 @@ int main(int argc, char** argv) {

// Read command line options
std::string configFile;
bool memoryUsageEstimationOnly(false);
std::string logProperties;
std::string logPipe;
bool lengthEncodedInput(false);
Expand All @@ -94,8 +96,9 @@ int main(int argc, char** argv) {
std::string outputFileName;
bool isOutputFileNamedPipe(false);
if (ml::data_frame_analyzer::CCmdLineParser::parse(
argc, argv, configFile, logProperties, logPipe, lengthEncodedInput, inputFileName,
isInputFileNamedPipe, outputFileName, isOutputFileNamedPipe) == false) {
argc, argv, configFile, memoryUsageEstimationOnly, logProperties,
logPipe, lengthEncodedInput, inputFileName, isInputFileNamedPipe,
outputFileName, isOutputFileNamedPipe) == false) {
return EXIT_FAILURE;
}

Expand Down Expand Up @@ -127,13 +130,6 @@ int main(int argc, char** argv) {
}

using TInputParserUPtr = std::unique_ptr<ml::api::CInputParser>;
auto inputParser{[lengthEncodedInput, &ioMgr]() -> TInputParserUPtr {
if (lengthEncodedInput) {
return std::make_unique<ml::api::CLengthEncodedInputParser>(ioMgr.inputStream());
}
return std::make_unique<ml::api::CCsvInputParser>(
ioMgr.inputStream(), ml::api::CCsvInputParser::COMMA);
}()};

std::string analysisSpecificationJson;
bool couldReadConfigFile;
Expand All @@ -145,6 +141,16 @@ int main(int argc, char** argv) {

auto analysisSpecification =
std::make_unique<ml::api::CDataFrameAnalysisSpecification>(analysisSpecificationJson);

if (memoryUsageEstimationOnly) {
auto outStream = [&ioMgr]() {
return std::make_unique<ml::core::CJsonOutputStreamWrapper>(ioMgr.outputStream());
}();
ml::api::CMemoryUsageEstimationResultJsonWriter writer(*outStream);
analysisSpecification->estimateMemoryUsage(writer);
return EXIT_SUCCESS;
}

if (analysisSpecification->numberThreads() > 1) {
ml::core::startDefaultAsyncExecutor(analysisSpecification->numberThreads());
}
Expand All @@ -156,6 +162,13 @@ int main(int argc, char** argv) {

CCleanUpOnExit::add(dataFrameAnalyzer.dataFrameDirectory());

auto inputParser{[lengthEncodedInput, &ioMgr]() -> TInputParserUPtr {
if (lengthEncodedInput) {
return std::make_unique<ml::api::CLengthEncodedInputParser>(ioMgr.inputStream());
}
return std::make_unique<ml::api::CCsvInputParser>(
ioMgr.inputStream(), ml::api::CCsvInputParser::COMMA);
}()};
if (inputParser->readStreamIntoVecs(
[&dataFrameAnalyzer](const auto& fieldNames, const auto& fieldValues) {
return dataFrameAnalyzer.handleRecord(fieldNames, fieldValues);
Expand Down
6 changes: 6 additions & 0 deletions include/api/CDataFrameAnalysisRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class CRowRef;
}
namespace api {
class CDataFrameAnalysisSpecification;
class CMemoryUsageEstimationResultJsonWriter;

//! \brief Hierarchy for running a specific core::CDataFrame analyses.
//!
Expand Down Expand Up @@ -75,6 +76,11 @@ class API_EXPORT CDataFrameAnalysisRunner {
//! number of rows per subset.
void computeAndSaveExecutionStrategy();

//! Estimates memory usage in two cases: one partition (the whole data frame
//! fits in main memory) and the maximum tolerable number of partitions (only
//! one partition needs to be loaded to main memory).
//!
//! \param[in,out] writer The writer to which both estimates are passed, as
//! strings, in a single call to its write method.
void estimateMemoryUsage(CMemoryUsageEstimationResultJsonWriter& writer) const;

//! Check if the data frame for this analysis should use in or out of core
//! storage.
bool storeDataFrameInMainMemory() const;
Expand Down
5 changes: 5 additions & 0 deletions include/api/CDataFrameAnalysisSpecification.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ class API_EXPORT CDataFrameAnalysisSpecification {
//! calling thread until the runner has finished.
CDataFrameAnalysisRunner* run(core::CDataFrame& frame) const;

//! Estimates memory usage in two cases: one partition (the whole data frame
//! fits in main memory) and the maximum tolerable number of partitions (only
//! one partition needs to be loaded to main memory).
//!
//! The estimation itself is delegated to the analysis runner. If no runner
//! is available the problem is reported via HANDLE_FATAL and nothing is
//! passed to \p writer.
//!
//! \param[in,out] writer The writer handed to the runner to output the
//! estimates.
void estimateMemoryUsage(CMemoryUsageEstimationResultJsonWriter& writer) const;

private:
void initializeRunner(const rapidjson::Value& jsonAnalysis);

Expand Down
42 changes: 42 additions & 0 deletions include/api/CMemoryUsageEstimationResultJsonWriter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
#ifndef INCLUDED_ml_api_CMemoryUsageEstimationResultJsonWriter_h
#define INCLUDED_ml_api_CMemoryUsageEstimationResultJsonWriter_h

#include <core/CJsonOutputStreamWrapper.h>
#include <core/CNonCopyable.h>
#include <core/CRapidJsonConcurrentLineWriter.h>

#include <api/ImportExport.h>

#include <string>

namespace ml {
namespace api {

//! \brief
//! Write memory usage estimation result in JSON format
//!
//! DESCRIPTION:\n
//! Outputs the memory usage estimation result: the expected memory usage
//! with one partition and the expected memory usage with the maximum number
//! of partitions.
//!
class API_EXPORT CMemoryUsageEstimationResultJsonWriter : private core::CNonCopyable {
public:
    //! The constructor is explicit so that an output stream wrapper is never
    //! silently converted into a writer.
    //!
    //! \param[in] strmOut The wrapped stream to which to write output.
    explicit CMemoryUsageEstimationResultJsonWriter(core::CJsonOutputStreamWrapper& strmOut);

    //! Writes the given memory usage estimation result in JSON format.
    //!
    //! \param[in] expectedMemoryUsageWithOnePartition The estimate, already
    //! formatted as a string, for the case of a single partition.
    //! \param[in] expectedMemoryUsageWithMaxPartitions The estimate, already
    //! formatted as a string, for the case of the maximum number of partitions.
    void write(const std::string& expectedMemoryUsageWithOnePartition,
               const std::string& expectedMemoryUsageWithMaxPartitions);

private:
    //! JSON line writer
    core::CRapidJsonConcurrentLineWriter m_Writer;
};
}
}

#endif // INCLUDED_ml_api_CMemoryUsageEstimationResultJsonWriter_h
38 changes: 30 additions & 8 deletions lib/api/CDataFrameAnalysisRunner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <core/CScopedFastLock.h>

#include <api/CDataFrameAnalysisSpecification.h>
#include <api/CMemoryUsageEstimationResultJsonWriter.h>

#include <boost/iterator/counting_iterator.hpp>

Expand All @@ -24,6 +25,13 @@ std::size_t memoryLimitWithSafetyMargin(const CDataFrameAnalysisSpecification& s
return static_cast<std::size_t>(0.9 * static_cast<double>(spec.memoryLimit()) + 0.5);
}

std::size_t maximumNumberPartitions(const CDataFrameAnalysisSpecification& spec) {
    // The partition count is capped at the square root of the number of rows,
    // rounded to the nearest integer. Running with a very large number of
    // partitions would be slow, so in that case it is preferable to tell the
    // user to allocate more resources for the job.
    const double numberRows{static_cast<double>(spec.numberRows())};
    const double roundedSquareRoot{std::sqrt(numberRows) + 0.5};
    return static_cast<std::size_t>(roundedSquareRoot);
}

const std::size_t MAXIMUM_FRACTIONAL_PROGRESS{std::size_t{1}
<< ((sizeof(std::size_t) - 2) * 8)};
}
Expand All @@ -36,6 +44,25 @@ CDataFrameAnalysisRunner::~CDataFrameAnalysisRunner() {
this->waitToFinish();
}

// Writes two memory usage estimates to the supplied writer: one assuming the
// whole data frame is processed as a single partition and one assuming it is
// split into the maximum permitted number of partitions. Each estimate is
// formatted as a whole number of kilobytes, rounded up, with a "kB" suffix.
void CDataFrameAnalysisRunner::estimateMemoryUsage(CMemoryUsageEstimationResultJsonWriter& writer) const {
    std::size_t numberRows{m_Spec.numberRows()};
    std::size_t numberColumns{m_Spec.numberColumns() + this->numberExtraColumns()};
    std::size_t maxNumberPartitions{maximumNumberPartitions(m_Spec)};
    if (maxNumberPartitions == 0) {
        // Nothing to partition; this also guards the division below.
        writer.write("0", "0");
        return;
    }
    std::size_t expectedMemoryUsageWithOnePartition{
        this->estimateMemoryUsage(numberRows, numberRows, numberColumns)};
    std::size_t expectedMemoryUsageWithMaxPartitions{this->estimateMemoryUsage(
        numberRows, numberRows / maxNumberPartitions, numberColumns)};
    auto roundUpToNearestKilobyte = [](std::size_t bytes) {
        // Ceiling division written so that it cannot wrap around: adding
        // 1023 before dividing would overflow for values of bytes within
        // 1023 of the maximum std::size_t and report a tiny estimate.
        std::size_t kilobytes{bytes / 1024 + (bytes % 1024 == 0 ? 0 : 1)};
        return std::to_string(kilobytes) + "kB";
    };
    writer.write(roundUpToNearestKilobyte(expectedMemoryUsageWithOnePartition),
                 roundUpToNearestKilobyte(expectedMemoryUsageWithMaxPartitions));
}

void CDataFrameAnalysisRunner::computeAndSaveExecutionStrategy() {

std::size_t numberRows{m_Spec.numberRows()};
Expand All @@ -45,17 +72,12 @@ void CDataFrameAnalysisRunner::computeAndSaveExecutionStrategy() {
LOG_TRACE(<< "memory limit = " << memoryLimit);

// Find the smallest number of partitions such that the size per partition
// is less than the memory limit. We limit this to rows^(1/2) because very
// large numbers of partitions are going to be slow and it is better to tell
// user to allocate more resources for the job in this case.

std::size_t maximumNumberPartitions{
static_cast<std::size_t>(std::sqrt(static_cast<double>(numberRows)) + 0.5)};
// is less than the memory limit.

std::size_t maxNumberPartitions{maximumNumberPartitions(m_Spec)};
std::size_t memoryUsage{0};

for (m_NumberPartitions = 1; m_NumberPartitions < maximumNumberPartitions;
++m_NumberPartitions) {
for (m_NumberPartitions = 1; m_NumberPartitions < maxNumberPartitions; ++m_NumberPartitions) {
std::size_t partitionNumberRows{numberRows / m_NumberPartitions};
memoryUsage = this->estimateMemoryUsage(numberRows, partitionNumberRows, numberColumns);
LOG_TRACE(<< "partition number rows = " << partitionNumberRows);
Expand Down
9 changes: 9 additions & 0 deletions lib/api/CDataFrameAnalysisSpecification.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <api/CDataFrameAnalysisConfigReader.h>
#include <api/CDataFrameBoostedTreeRunner.h>
#include <api/CDataFrameOutliersRunner.h>
#include <api/CMemoryUsageEstimationResultJsonWriter.h>

#include <rapidjson/document.h>
#include <rapidjson/ostreamwrapper.h>
Expand Down Expand Up @@ -183,6 +184,14 @@ CDataFrameAnalysisRunner* CDataFrameAnalysisSpecification::run(core::CDataFrame&
return nullptr;
}

// Forwards the memory usage estimation request to the analysis runner, or
// reports a fatal error if there is no runner to forward it to.
void CDataFrameAnalysisSpecification::estimateMemoryUsage(CMemoryUsageEstimationResultJsonWriter& writer) const {
    if (m_Runner != nullptr) {
        m_Runner->estimateMemoryUsage(writer);
        return;
    }
    HANDLE_FATAL(<< "Internal error: no runner available so can't estimate memory. Please report this problem.");
}

void CDataFrameAnalysisSpecification::initializeRunner(const rapidjson::Value& jsonAnalysis) {
// We pass off the interpretation of the parameters object to the appropriate
// analysis runner.
Expand Down
35 changes: 35 additions & 0 deletions lib/api/CMemoryUsageEstimationResultJsonWriter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

#include <api/CMemoryUsageEstimationResultJsonWriter.h>

namespace ml {
namespace api {
namespace {

// Names of the two fields of the JSON object that is written out
const std::string EXPECTED_MEMORY_USAGE_WITH_ONE_PARTITION{"expected_memory_usage_with_one_partition"};
const std::string EXPECTED_MEMORY_USAGE_WITH_MAX_PARTITIONS{"expected_memory_usage_with_max_partitions"};
}

// Constructs the JSON line writer member from the supplied output stream
// wrapper. Nothing is written here; all output happens in write().
CMemoryUsageEstimationResultJsonWriter::CMemoryUsageEstimationResultJsonWriter(core::CJsonOutputStreamWrapper& strmOut)
: m_Writer(strmOut) {
// Don't write any output in the constructor because, as things currently
// work, the output stream might be redirected after construction.
}

void CMemoryUsageEstimationResultJsonWriter::write(const std::string& expectedMemoryUsageWithOnePartition,
const std::string& expectedMemoryUsageWithMaxPartitions) {
m_Writer.StartObject();
m_Writer.Key(EXPECTED_MEMORY_USAGE_WITH_ONE_PARTITION);
m_Writer.String(expectedMemoryUsageWithOnePartition);
m_Writer.Key(EXPECTED_MEMORY_USAGE_WITH_MAX_PARTITIONS);
m_Writer.String(expectedMemoryUsageWithMaxPartitions);
m_Writer.EndObject();
m_Writer.flush();
}
}
}
1 change: 1 addition & 0 deletions lib/api/Makefile.first
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ CInputParser.cc \
CIoManager.cc \
CJsonOutputWriter.cc \
CLengthEncodedInputParser.cc \
CMemoryUsageEstimationResultJsonWriter.cc \
CModelPlotDataJsonWriter.cc \
CModelSizeStatsJsonWriter.cc \
CModelSnapshotJsonWriter.cc \
Expand Down
Loading