Skip to content

Commit

Permalink
feat(udf): top_n_value_ratio_cate & top_n_value_key_ratio_cate (#3329)
Browse files Browse the repository at this point in the history
- top_n_value_ratio_cate
- top_n_key_ratio_cate
- list_except_by_key
- list_except_by_value
  • Loading branch information
aceforeverd authored and dl239 committed Jun 29, 2023
1 parent 2816e21 commit ea0cbde
Show file tree
Hide file tree
Showing 12 changed files with 512 additions and 93 deletions.
44 changes: 44 additions & 0 deletions cases/function/function/test_udaf_function.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2719,3 +2719,47 @@ cases:
200, 1-2, 2, NULL
300, 1-2, 2, 2
400, 1-2, 2, 2
- id: 66
desc: top_n_value_ratio_cate/top_n_key_ratio_cate
sql: |
select
idx,
top_n_value_ratio_cate(val, val > 100, cate, 2) over w as ratio_by_value,
top_n_key_ratio_cate(val, val > 100, cate, 2) over w as ratio_by_key,
top_n_value_ratio_cate(val, val > 100, cate, -1) over w as ratio_by_value_full,
top_n_key_ratio_cate(val, val > 100, cate, -2) over w as ratio_by_key_full,
top_n_value_ratio_cate(val, val > 100, cate, 0) over w as ratio_by_value_empty,
top_n_key_ratio_cate(val, val > 100, cate, 0) over w as ratio_by_key_empty,
from t1
window w as (
partition by gp order by ts
rows_range between 10s preceding and 1s preceding)
inputs:
- name: t1
columns: ["idx int", "gp int", "val int", "cate string", "ts timestamp"]
indexs: ['idx:gp:ts']
data: |
0, 1, 200, a, 1000
100, 1, 300, b, 2000
200, 1, 200, NULL, 3000
300, 1, 10, b, 4000
400, 1, 101, c, 5000
500, 1, 101, c, 6000
expect:
columns:
- idx int
- ratio_by_value string
- ratio_by_key string
- ratio_by_value_full string
- ratio_by_key_full string
- ratio_by_value_empty string
- ratio_by_key_empty string
order: idx
rows:
- [0, "", "", "", "", "", ""]
- [100, "a:1.000000", "a:1.000000", "a:1.000000", "a:1.000000", "", ""]
- [200, "b:1.000000,a:1.000000", "b:1.000000,a:1.000000", "b:1.000000,a:1.000000", "b:1.000000,a:1.000000", "", ""]
- [300, "b:1.000000,a:1.000000", "b:1.000000,a:1.000000", "b:1.000000,a:1.000000", "b:1.000000,a:1.000000", "", ""]
- [400, "a:1.000000,b:0.500000", "b:0.500000,a:1.000000", "a:1.000000,b:0.500000", "b:0.500000,a:1.000000", "", ""]
- [500, "c:1.000000,a:1.000000", "c:1.000000,b:0.500000", "c:1.000000,a:1.000000,b:0.500000", "c:1.000000,b:0.500000,a:1.000000", "", ""]
33 changes: 33 additions & 0 deletions cases/function/test_feature_zero_function.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,36 @@ cases:
- [3, 3, 1, 1]
- [4, 0, 0, 0]
- [5, 2, 2, 2]

- id: 7
desc: list_except_by_key/list_except_by_value
inputs:
- name: t1
columns: ["idx int", "gp int", "val string", "ts timestamp"]
indexs: ['idx:gp:ts']
rows:
- [100, 1, "a:1,b:2,c:0", 1000]
- [101, 1, "abc", 1000]
- [102, 1, "a,b,c", 1000]
# note
# 1. 'abc' parsed as a kv pair becomes `abc=`, i.e. its value is an empty string
# 2. if the list is not made of kv pairs but is simply 'k1,k2,k3', it falls back to (gives the same result as) filtering on the whole kv pair
sql: |
select idx,
`join`(list_except_by_key(split(val, ','), 'a,b'), " ") as keys_filterd,
`join`(list_except_by_value(split(val, ','), '1,2'), " ") as values_filterd,
`join`(list_except_by_key(split(val, ','), ''), " ") as filter_nothing1,
`join`(list_except_by_value(split(val, ','), ''), " ") as filter_nothing2,
from t1
expect:
order: idx
columns:
- idx int
- keys_filterd string
- values_filterd string
- filter_nothing1 string
- filter_nothing2 string
rows:
- [100, "c:0", "c:0", "a:1 b:2 c:0", "a:1 b:2 c:0"]
- [101, "abc", "abc", "abc", ""]
- [102, "c", "a b c", "a b c", ""]
91 changes: 52 additions & 39 deletions hybridse/src/udf/containers.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ struct ContainerStorageTypeTrait {

template <>
struct ContainerStorageTypeTrait<openmldb::base::StringRef> {
// FIXME: StringRef does not own its data, see #2944
using type = codec::StringRef;
static codec::StringRef to_stored_value(codec::StringRef* t) {
return t == nullptr ? codec::StringRef() : *t;
Expand Down Expand Up @@ -164,57 +165,57 @@ class TopKContainer {
BoundT bound_ = -1; // delayed to be set by first push
};

// Default "less than" functor for (key, value) pairs.
//
// Pairs are ordered primarily by their value; when two values compare equal
// the key decides. K is the key type and V the value type of the pairs.
template <typename K, typename V>
struct DefaultPairCmp {
    // Type trait: true only for instantiations of std::pair.
    template <typename>
    struct is_pair : std::false_type {};
    template <typename... T>
    struct is_pair<std::pair<T...>> : std::true_type {};

    // Overload for values that are not themselves a std::pair: the value is
    // compared directly.
    // Example of the resulting ascending order: (4, 2), (1, 4), (2, 4)
    template <typename U = V>
    std::enable_if_t<!is_pair<U>::value, bool> operator()(const std::pair<K, U>& lhs,
                                                          const std::pair<K, U>& rhs) const {
        const bool same_value = (lhs.second == rhs.second);
        return same_value ? (lhs.first < rhs.first) : (lhs.second < rhs.second);
    }

    // Overload for AVG cate, where the stored value is pair(int64_t, double).
    // The quantity compared is `value.second / value.first`
    // (presumably sum / count -- confirm against the AVG cate caller).
    template <typename U = V>
    std::enable_if_t<std::is_same_v<U, std::pair<int64_t, double>>, bool> operator()(
        const std::pair<K, U>& lhs, const std::pair<K, U>& rhs) const {
        const double left_avg = lhs.second.second / lhs.second.first;
        const double right_avg = rhs.second.second / rhs.second.first;
        return (left_avg == right_avg) ? (lhs.first < rhs.first) : (left_avg < right_avg);
    }
};

template <typename K, typename V,
typename StorageV = typename ContainerStorageTypeTrait<V>::type>
typename StorageV = typename ContainerStorageTypeTrait<V>::type,
template <typename, typename> typename PairCmp = DefaultPairCmp>
class BoundedGroupByDict {
public:
// forward & export K & StorageV type
// export data types
using Key = K;
using Value = V;
using StorageValue = StorageV;

// actual input type
using InputK = typename DataTypeTrait<K>::CCallArgType;
using InputV = typename DataTypeTrait<V>::CCallArgType;

// actual stored type
using StorageK = typename ContainerStorageTypeTrait<K>::type;

// self type
using ContainerT = BoundedGroupByDict<K, V, StorageV>;
using ContainerT = BoundedGroupByDict<K, V, StorageV, PairCmp>;

using FormatValueF = std::function<uint32_t(const StorageV&, char*, size_t)>;

template <typename>
struct is_pair : std::false_type {};
template <typename... T>
struct is_pair<std::pair<T...>> : std::true_type {};

struct PairCmp {
// (4, 2), (1, 4), (2, 4)
template <typename U = StorageV>
std::enable_if_t<!is_pair<U>::value, bool> operator()(
const std::pair<StorageK, U>& lhs, const std::pair<StorageK, U>& rhs) const {
if (lhs.second == rhs.second) {
return lhs.first < rhs.first;
}

return lhs.second < rhs.second;
}

// StorageV is pair(int, double)
template <typename U = StorageV>
std::enable_if_t<std::is_same_v<U, std::pair<int64_t, double>>, bool> operator()(
const std::pair<StorageK, U>& lhs, const std::pair<StorageK, U>& rhs) const {
double lavg = lhs.second.second / lhs.second.first;
double ravg = rhs.second.second / rhs.second.first;
if (lavg == ravg) {
return lhs.first < rhs.first;
}

return lavg < ravg;
}
};

// convert to internal key and value
static inline StorageK to_stored_key(const InputK& key) {
return ContainerStorageTypeTrait<K>::to_stored_value(key);
Expand Down Expand Up @@ -283,6 +284,12 @@ class BoundedGroupByDict {
}
}

if (str_len == 0) {
output->size_ = 0;
output->data_ = "";
return;
}

// allocate string buffer
char* buffer = udf::v1::AllocManagedStringBuf(str_len);
if (buffer == nullptr) {
Expand Down Expand Up @@ -349,11 +356,11 @@ class BoundedGroupByDict {
output->data_ = "";
return;
}
std::set<std::pair<StorageK, StorageV>, PairCmp> ordered_set;
std::set<std::pair<StorageK, StorageV>, PairCmp<StorageK, StorageV>> ordered_set;
for (auto& kv : map_) {
ordered_set.emplace(kv.first, kv.second);

if (topn >= 0 && ordered_set.size() > static_cast<size_t>(topn)) {
if (topn >= 0 && ordered_set.size() > static_cast<uint64_t>(topn)) {
ordered_set.erase(ordered_set.begin());
}
}
Expand All @@ -375,6 +382,12 @@ class BoundedGroupByDict {
}
}

if (outlen == 0) {
output->size_ = 0;
output->data_ = "";
return;
}

// allocate string buffer
char* buffer = udf::v1::AllocManagedStringBuf(outlen);
if (buffer == nullptr) {
Expand Down
Loading

0 comments on commit ea0cbde

Please sign in to comment.