Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modifying --runstats and validating arguments #2

Open
wants to merge 7 commits into
base: phase-2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions src/cls/tabular/cls_tabular.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1933,13 +1933,11 @@ int exec_runstats_op(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
return -EINVAL;
}

CLS_LOG(20, "exec_runstats_op: db_schema=%s", op.db_schema.c_str());
CLS_LOG(20, "exec_runstats_op: table_name=%s", op.table_name.c_str());
CLS_LOG(20, "exec_runstats_op: runstats_args=%s", op.runstats_args.c_str());
CLS_LOG(20, "exec_runstats_op: data_schema=%s", op.data_schema.c_str());

using namespace Tables;
std::string dbschema = op.db_schema;
std::string table_name = op.table_name;
std::string runstats_args = op.runstats_args;
schema_vec data_schema = schemaFromString(op.data_schema);

return 0;
Expand Down
16 changes: 6 additions & 10 deletions src/cls/tabular/cls_tabular.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,37 +305,33 @@ WRITE_CLASS_ENCODER(test_op)

struct stats_op {

std::string db_schema;
std::string table_name;
std::string runstats_args;
std::string data_schema;

stats_op() {}
stats_op(std::string dbscma, std::string tname, std::string dtscma) :
db_schema(dbscma), table_name(tname), data_schema(dtscma) { }
stats_op(std::string runstats_args, std::string dtscma) :
runstats_args(runstats_args), data_schema(dtscma) { }

// serialize the fields into bufferlist to be sent over the wire
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
::encode(db_schema, bl);
::encode(table_name, bl);
::encode(runstats_args, bl);
::encode(data_schema, bl);
ENCODE_FINISH(bl);
}

// deserialize the fields from the bufferlist into this struct
void decode(bufferlist::iterator& bl) {
DECODE_START(1, bl);
::decode(db_schema, bl);
::decode(table_name, bl);
::decode(runstats_args, bl);
::decode(data_schema, bl);
DECODE_FINISH(bl);
}

std::string toString() {
std::string s;
s.append("stats_op:");
s.append(" .db_schema=" + db_schema);
s.append(" .table_name=" + table_name);
s.append(" .runstats_args=" + runstats_args);
s.append(" .data_schema=" + data_schema);
return s;
}
Expand Down
68 changes: 67 additions & 1 deletion src/cls/tabular/cls_tabular_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,23 @@ static inline int strtou64(const std::string value, uint64_t *out)
try {
v = boost::lexical_cast<uint64_t>(value);
} catch (boost::bad_lexical_cast &) {
CLS_ERR("converting key into numeric value %s", value.c_str());
return -EIO;
}

*out = v;
return 0;
}

/*
* Convert string into float value.
*/
static inline int strtofloat(const std::string value, float *out)
{
float v;

try {
v = boost::lexical_cast<float>(value);
} catch (boost::bad_lexical_cast &) {
return -EIO;
}

Expand Down Expand Up @@ -490,6 +506,56 @@ class TypedPredicate : public PredicateBase
}
};

template <class T>
class LimitValue
{
public:
T val;
LimitValue(T v) : val(v) {};
LimitValue(const LimitValue& rhs);
LimitValue& operator=(const LimitValue& rhs);
};

template <typename T>
class StatsArgument
{
private:
const std::string col_name;
LimitValue<T> min;
LimitValue<T> max;
const uint64_t buckets;
const float sampling;

public:
StatsArgument(std::string col, const T& min_val, const T& max_val, uint64_t b, float s) :
col_name(col), min(min_val), max(max_val), buckets(b), sampling(s) {}

~StatsArgument() { }
StatsArgument& getThis() {return *this;}
const StatsArgument& getThis() const {return *this;}
T MinVal() {return min.val;}
T MaxVal() {return max.val;}

std::string toString() {
std::string s("StatsArgument:\n");
s.append(" col_name=" + col_name + "\n");
s.append(" min=");
std::stringstream ss_min;
ss_min << this->MinVal();
s.append(ss_min.str());
s.append("\n");
s.append(" max=");
std::stringstream ss_max;
ss_max << this->MaxVal();
s.append(ss_max.str());
s.append("\n");
s.append(" buckets=" + std::to_string(buckets) + "\n");
s.append(" sampling=" + std::to_string(sampling) + "\n");
s.append("\n");
return s;
}
};

// col metadata used for the schema
const int NUM_COL_INFO_FIELDS = 5;
struct col_info {
Expand Down
144 changes: 142 additions & 2 deletions src/progly/query.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ std::string qop_groupby_cols;
std::string qop_orderby_cols;
std::string qop_index_preds;
std::string qop_index2_preds;
std::string qop_runstats_args;

// build index op params for flatbufs
bool idx_op_idx_unique;
Expand All @@ -87,7 +88,7 @@ std::string qop_file_name;
std::string qop_tree_name;

// other exec flags
bool runstats;
string runstats;
std::string project_cols;

// prints full record header and metadata
Expand Down Expand Up @@ -404,8 +405,147 @@ void worker_exec_runstats_op(librados::IoCtx *ioctx, stats_op op)
}
std::string oid = target_objects.back();
target_objects.pop_back();
std::cout << "computing stats...table: " << op.table_name << " oid: "
std::cout << "computing stats...table: " << op.runstats_args << " oid: "
<< oid << std::endl;

// Validating input
// Step-1 : Check if runstats_args has 5 arguments (col, min, max, bucket, sampling)
std::string arguments = op.runstats_args;
boost::trim(arguments);
vector<std::string> args;
boost::split(args, arguments, boost::is_any_of(","),
boost::token_compress_on);
assert(args.size() == 5);

// Step-2 : Check if the passed column is valid
std::string col = args[0];
std::string data_schema = op.data_schema;

boost::trim(col);
boost::trim(data_schema);

boost::to_upper(col);

assert (!data_schema.empty());

Tables::schema_vec table_schema = Tables::schemaFromString(data_schema);
Tables::schema_vec sv = schemaFromColNames(table_schema, col);
if (sv.empty()) {
cerr << "Error: colname=" << col << " not present in schema."
<< std::endl;
assert (Tables::TablesErrCodes::RequestedColNotPresent == 0);
}

// Step-3: Validate sampling argument: should be a float > 0 && <= 1
float sampling;
int err = Tables::strtofloat(args[4], &sampling);
if (err != 0) {
cerr << "Error: Invalid sampling data type." << std::endl;
exit(1);
}
if (!(sampling > 0 && sampling <= 1)) {
cerr << "Error: Invalid sampling value = " << sampling << " Should lie in range (0, 1]." << std::endl;
exit(1);
}

// Step-4 : Check number of buckets: should be int
uint64_t number_of_buckets;
err = Tables::strtou64(args[3], &number_of_buckets);
if (err != 0) {
cerr << "Error: Invalid number_of_buckets data type." << std::endl;
exit(1);
}

// Step-5: Get column data type, pack all arguments together and pass to function
Tables::col_info column = sv[0];
std::string min = args[1];
std::string max = args[2];
switch (column.type) {
case Tables::SkyDataType::SDT_INT8: {
Tables::StatsArgument<int8_t>* s = \
new Tables::StatsArgument<int8_t> \
(col, static_cast<int8_t>(std::stol(min)), static_cast<int8_t>(std::stol(max)),
number_of_buckets, sampling);
std::cout << s->toString() << "\n";
break;
}
case Tables::SkyDataType::SDT_INT16: {
Tables::StatsArgument<int16_t>* s = \
new Tables::StatsArgument<int16_t> \
(col, static_cast<int16_t>(std::stol(min)), static_cast<int16_t>(std::stol(max)),
number_of_buckets, sampling);
std::cout << s->toString() << "\n";
break;
}
case Tables::SkyDataType::SDT_INT32: {
Tables::StatsArgument<int32_t>* s = \
new Tables::StatsArgument<int32_t> \
(col, static_cast<int32_t>(std::stol(min)), static_cast<int32_t>(std::stol(max)),
number_of_buckets, sampling);
std::cout << s->toString() << "\n";
break;
}
case Tables::SkyDataType::SDT_INT64: {
Tables::StatsArgument<int64_t>* s = \
new Tables::StatsArgument<int64_t> \
(col, static_cast<int64_t>(std::stol(min)), static_cast<int64_t>(std::stol(max)),
number_of_buckets, sampling);
std::cout << s->toString() << "\n";
break;
}
case Tables::SkyDataType::SDT_UINT8: {
Tables::StatsArgument<uint8_t>* s = \
new Tables::StatsArgument<uint8_t> \
(col, static_cast<uint8_t>(std::stol(min)), static_cast<uint8_t>(std::stol(max)),
number_of_buckets, sampling);
std::cout << s->toString() << "\n";
break;
}
case Tables::SkyDataType::SDT_UINT16: {
Tables::StatsArgument<uint16_t>* s = \
new Tables::StatsArgument<uint16_t> \
(col, static_cast<uint16_t>(std::stol(min)), static_cast<uint16_t>(std::stol(max)),
number_of_buckets, sampling);
std::cout << s->toString() << "\n";
break;
}
case Tables::SkyDataType::SDT_UINT32: {
Tables::StatsArgument<uint32_t>* s = \
new Tables::StatsArgument<uint32_t> \
(col, static_cast<uint32_t>(std::stol(min)), static_cast<uint32_t>(std::stol(max)),
number_of_buckets, sampling);
std::cout << s->toString() << "\n";
break;
}
case Tables::SkyDataType::SDT_UINT64: {
Tables::StatsArgument<uint64_t>* s = \
new Tables::StatsArgument<uint64_t> \
(col, static_cast<uint64_t>(std::stol(min)), static_cast<uint64_t>(std::stol(max)),
number_of_buckets, sampling);
std::cout << s->toString() << "\n";
break;
}
case Tables::SkyDataType::SDT_FLOAT: {
Tables::StatsArgument<float>* s = \
new Tables::StatsArgument<float> \
(col, static_cast<float>(std::stol(min)), static_cast<float>(std::stol(max)),
number_of_buckets, sampling);
std::cout << s->toString() << "\n";
break;
}
case Tables::SkyDataType::SDT_DOUBLE: {
Tables::StatsArgument<double>* s = \
new Tables::StatsArgument<double> \
(col, static_cast<double>(std::stol(min)), static_cast<double>(std::stol(max)),
number_of_buckets, sampling);
std::cout << s->toString() << "\n";
break;
}
default: assert (Tables::TablesErrCodes::UnknownSkyDataType == 0);
}

// bucket_min | bucket_max | count | dead_count | star_representation

work_lock.unlock();

ceph::bufferlist inbl, outbl;
Expand Down
3 changes: 2 additions & 1 deletion src/progly/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ extern std::string qop_groupby_cols;
extern std::string qop_orderby_cols;
extern std::string qop_index_preds;
extern std::string qop_index2_preds;
extern std::string qop_runstats_args;

extern bool idx_op_idx_unique;
extern bool idx_op_ignore_stopwords;
Expand All @@ -118,7 +119,7 @@ extern std::string qop_file_name;
extern std::string qop_tree_name;

// other exec flags
extern bool runstats;
extern std::string runstats;
extern std::string project_cols;

// for debugging, prints full record header and metadata
Expand Down
10 changes: 6 additions & 4 deletions src/progly/run-query.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ int main(int argc, char **argv)
std::string index2_preds;
std::string index_cols;
std::string index2_cols;
std::string runstats_args;
bool lock_obj_free;
bool lock_obj_init;
bool lock_obj_get;
Expand Down Expand Up @@ -157,7 +158,7 @@ int main(int argc, char **argv)
("index-delims", po::value<std::string>(&text_index_delims)->default_value(""), "Use delim for text indexes (def=whitespace")
("index-ignore-stopwords", po::bool_switch(&text_index_ignore_stopwords)->default_value(false), "Ignore stopwords when building text index. (def=false)")
("index-plan-type", po::value<int>(&index_plan_type)->default_value(Tables::SIP_IDX_STANDARD), "If 2 indexes, for intersection plan use '2', for union plan use '3' (def='1')")
("runstats", po::bool_switch(&runstats)->default_value(false), "Run statistics on the specified table name")
("runstats", po::value<std::string>(&runstats_args)->default_value(""), "Run statistics on the specified table name")
("transform-format-type", po::value<std::string>(&trans_format_str)->default_value("SFT_FLATBUF_FLEX_ROW"), "Destination format type ")
("verbose", po::bool_switch(&print_verbose)->default_value(false), "Print detailed record metadata.")
("header", po::bool_switch(&header)->default_value(false), "Print row header (i.e., row schema")
Expand Down Expand Up @@ -357,7 +358,7 @@ int main(int argc, char **argv)
assert (!index_cols.empty());
assert (use_cls);
}
if (runstats) {
if (runstats_args != "") {
assert (use_cls);
}

Expand Down Expand Up @@ -845,10 +846,11 @@ int main(int argc, char **argv)

// for RUNSTATS job
// launch run statistics on given table here.
if (query == "flatbuf" && runstats) {
if (query == "flatbuf" && runstats_args != "") {

// create idx_op for workers
stats_op op(qop_db_schema_name, qop_table_name, qop_data_schema);
qop_runstats_args = runstats_args;
stats_op op(qop_runstats_args, qop_data_schema);

if (debug)
cout << "DEBUG: stats op=" << op.toString() << endl;
Expand Down