Skip to content

Commit

Permalink
sched: use object pool to store indication PDUs in scheduler and avoid mallocs
Browse files Browse the repository at this point in the history
  • Loading branch information
frankist authored and codebot committed Oct 30, 2024
1 parent 4627e44 commit eddeb3e
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 82 deletions.
1 change: 1 addition & 0 deletions include/srsran/scheduler/scheduler_feedback_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ struct uci_indication {

struct srs_indication {
struct srs_indication_pdu {
srs_indication_pdu() = default;
srs_indication_pdu(const du_ue_index_t ue_idx,
const rnti_t ue_rnti_,
std::optional<phy_time_unit> ta,
Expand Down
219 changes: 146 additions & 73 deletions lib/scheduler/ue_scheduling/ue_event_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "../logging/scheduler_metrics_handler.h"
#include "../srs/srs_scheduler.h"
#include "../uci_scheduling/uci_scheduler_impl.h"
#include "srsran/support/memory_pool/unbounded_object_pool.h"

using namespace srsran;

Expand Down Expand Up @@ -107,6 +108,69 @@ class ue_event_manager::ue_dl_buffer_occupancy_manager final : public scheduler_
ue_event_queue pending_evs;
};

/// \brief Pool of indication PDU objects used by the scheduler to avoid mallocs in the indication fast path.
///
/// Each indication type (UCI, PHR, CRC, SRS, BSR) gets its own unbounded_object_pool, pre-sized with a
/// conservative estimate of the number of PDUs received per slot. The create_* helpers fetch an object from
/// the corresponding pool and copy the caller's PDU into it; ownership is conveyed through the pool's smart
/// pointer type, which presumably returns the object to the pool on release — see unbounded_object_pool.
class srsran::pdu_indication_pool
{
  // Initial number of objects pre-allocated per pool.
  constexpr static size_t UCI_INITIAL_POOL_SIZE = MAX_PUCCH_PDUS_PER_SLOT;
  constexpr static size_t PHR_INITIAL_POOL_SIZE = 8;
  constexpr static size_t CRC_INITIAL_POOL_SIZE = MAX_PUSCH_PDUS_PER_SLOT;
  constexpr static size_t SRS_INITIAL_POOL_SIZE = MAX_SRS_PDUS_PER_SLOT;
  constexpr static size_t BSR_INITIAL_POOL_SIZE = MAX_PUSCH_PDUS_PER_SLOT;

public:
  using uci_ptr = unbounded_object_pool<uci_indication::uci_pdu>::ptr;
  using phr_ptr = unbounded_object_pool<ul_phr_indication_message>::ptr;
  using crc_ptr = unbounded_object_pool<ul_crc_pdu_indication>::ptr;
  using srs_ptr = unbounded_object_pool<srs_indication::srs_indication_pdu>::ptr;
  using bsr_ptr = unbounded_object_pool<ul_bsr_indication_message>::ptr;

  pdu_indication_pool() :
    pending_ucis(UCI_INITIAL_POOL_SIZE),
    pending_phrs(PHR_INITIAL_POOL_SIZE),
    pending_crcs(CRC_INITIAL_POOL_SIZE),
    pending_srss(SRS_INITIAL_POOL_SIZE),
    pending_bsrs(BSR_INITIAL_POOL_SIZE)
  {
  }

  /// Allocate a pooled copy of a UCI indication PDU.
  [[nodiscard]] uci_ptr create_uci(const uci_indication::uci_pdu& pdu) { return make_pooled_copy(pending_ucis, pdu); }
  /// Allocate a pooled copy of a PHR indication message.
  [[nodiscard]] phr_ptr create_phr(const ul_phr_indication_message& phr_ind)
  {
    return make_pooled_copy(pending_phrs, phr_ind);
  }
  /// Allocate a pooled copy of a CRC indication PDU.
  [[nodiscard]] crc_ptr create_crc(const ul_crc_pdu_indication& crc_ind)
  {
    return make_pooled_copy(pending_crcs, crc_ind);
  }
  /// Allocate a pooled copy of an SRS indication PDU.
  [[nodiscard]] srs_ptr create_srs(const srs_indication::srs_indication_pdu& srs_ind)
  {
    return make_pooled_copy(pending_srss, srs_ind);
  }
  /// Allocate a pooled copy of a BSR indication message.
  [[nodiscard]] bsr_ptr create_bsr(const ul_bsr_indication_message& bsr_ind)
  {
    return make_pooled_copy(pending_bsrs, bsr_ind);
  }

private:
  /// Fetch an object from \c pool and copy \c value into it. Common implementation for all create_* helpers,
  /// replacing five identical copy-pasted bodies.
  template <typename Pool, typename T>
  static auto make_pooled_copy(Pool& pool, const T& value)
  {
    auto obj = pool.get();
    *obj     = value;
    return obj;
  }

  unbounded_object_pool<uci_indication::uci_pdu>            pending_ucis;
  unbounded_object_pool<ul_phr_indication_message>          pending_phrs;
  unbounded_object_pool<ul_crc_pdu_indication>              pending_crcs;
  unbounded_object_pool<srs_indication::srs_indication_pdu> pending_srss;
  unbounded_object_pool<ul_bsr_indication_message>          pending_bsrs;
};

// Initial capacity for the common and cell event lists, in order to avoid std::vector reallocations. We use the max
// nof UEs as a conservative estimate of the expected number of events per slot.
static constexpr size_t COMMON_EVENT_LIST_SIZE = MAX_NOF_DU_UES * 2;
Expand All @@ -115,6 +179,7 @@ static constexpr size_t CELL_EVENT_LIST_SIZE = MAX_NOF_DU_UES * 2;
ue_event_manager::ue_event_manager(ue_repository& ue_db_) :
ue_db(ue_db_),
logger(srslog::fetch_basic_logger("SCHED")),
ind_pdu_pool(std::make_unique<pdu_indication_pool>()),
common_events(COMMON_EVENT_LIST_SIZE),
dl_bo_mng(std::make_unique<ue_dl_buffer_occupancy_manager>(*this))
{
Expand Down Expand Up @@ -298,68 +363,69 @@ void ue_event_manager::handle_ul_bsr_indication(const ul_bsr_indication_message&
{
  srsran_sanity_check(cell_exists(bsr_ind.cell_index), "Invalid cell index");

  // NOTE(review): the rendered diff interleaved old and new lines here; this is the reconstructed post-change
  // implementation. The BSR PDU is copied once into the indication pool; the deferred callback only captures
  // the pool pointer, so no malloc happens when the event is enqueued.
  auto handle_ul_bsr_ind_impl = [this, bsr_ind = ind_pdu_pool->create_bsr(bsr_ind)]() {
    if (not ue_db.contains(bsr_ind->ue_index)) {
      log_invalid_ue_index(bsr_ind->ue_index, "BSR");
      return;
    }
    auto&           u         = ue_db[bsr_ind->ue_index];
    du_cell_index_t pcell_idx = u.get_pcell().cell_index;

    // Handle event.
    u.handle_bsr_indication(*bsr_ind);

    if (u.get_pcell().is_in_fallback_mode()) {
      // Signal SRB fallback scheduler with the new SRB0/SRB1 buffer state.
      du_cells[pcell_idx].fallback_sched->handle_ul_bsr_indication(bsr_ind->ue_index, *bsr_ind);
    }

    // Log event.
    if (du_cells[pcell_idx].ev_logger->enabled()) {
      scheduler_event_logger::bsr_event event{};
      event.ue_index             = bsr_ind->ue_index;
      event.rnti                 = bsr_ind->crnti;
      event.type                 = bsr_ind->type;
      event.reported_lcgs        = bsr_ind->reported_lcgs;
      event.tot_ul_pending_bytes = units::bytes{u.pending_ul_newtx_bytes()};
      du_cells[pcell_idx].ev_logger->enqueue(event);
    }

    // Notify metrics handler.
    du_cells[pcell_idx].metrics->handle_ul_bsr_indication(*bsr_ind);
  };

  // Move the lambda (it owns a pool pointer, so it is move-only in spirit and cheap to move).
  if (not common_events.try_push(common_event_t{bsr_ind.ue_index, std::move(handle_ul_bsr_ind_impl)})) {
    logger.warning("ue={}: Discarding UE BSR. Cause: Event queue is full", bsr_ind.ue_index);
  }
}

void ue_event_manager::handle_ul_phr_indication(const ul_phr_indication_message& phr_ind)
{
for (const cell_ph_report& cell_phr : phr_ind.phr.get_phr()) {
srsran_sanity_check(cell_exists(cell_phr.serv_cell_id), "Invalid serving cell index={}", cell_phr.serv_cell_id);

if (not cell_specific_events[cell_phr.serv_cell_id].try_push(
cell_event_t{phr_ind.ue_index,
[this, cell_phr, phr_ind](ue_cell& ue_cc) {
ue_cc.channel_state_manager().handle_phr(cell_phr);

// Log event.
scheduler_event_logger::phr_event event{};
event.ue_index = phr_ind.ue_index;
event.rnti = phr_ind.rnti;
event.cell_index = cell_phr.serv_cell_id;
event.ph = cell_phr.ph;
event.p_cmax = cell_phr.p_cmax;
du_cells[cell_phr.serv_cell_id].ev_logger->enqueue(event);

// Notify metrics handler.
du_cells[cell_phr.serv_cell_id].metrics->handle_ul_phr_indication(phr_ind);
},
"UL PHR",
true})) {
logger.warning("Discarding PHR. Cause: Event queue is full");
auto handle_phr_impl = [this, phr_ind = ind_pdu_pool->create_phr(phr_ind)]() {
auto& u = ue_db[phr_ind->ue_index];
for (const cell_ph_report& cell_phr : phr_ind->phr.get_phr()) {
srsran_sanity_check(
u.nof_cells() <= cell_phr.serv_cell_id, "Invalid serving cell index={}", cell_phr.serv_cell_id);
auto& ue_cc = u.get_cell(to_ue_cell_index(cell_phr.serv_cell_id));

ue_cc.channel_state_manager().handle_phr(cell_phr);

// Log event.
scheduler_event_logger::phr_event event{};
event.ue_index = phr_ind->ue_index;
event.rnti = phr_ind->rnti;
event.cell_index = cell_phr.serv_cell_id;
event.ph = cell_phr.ph;
event.p_cmax = cell_phr.p_cmax;
du_cells[cell_phr.serv_cell_id].ev_logger->enqueue(event);
}

// Notify metrics handler.
du_cells[u.get_pcell().cfg().cell_cfg_common.cell_index].metrics->handle_ul_phr_indication(*phr_ind);
};

if (not common_events.try_push(common_event_t{phr_ind.ue_index, std::move(handle_phr_impl)})) {
logger.warning("Discarding PHR. Cause: Event queue is full");
}
}

Expand All @@ -369,29 +435,35 @@ void ue_event_manager::handle_crc_indication(const ul_crc_indication& crc_ind)
for (unsigned i = 0, e = crc_ind.crcs.size(); i != e; ++i) {
if (not cell_specific_events[crc_ind.cell_index].try_push(cell_event_t{
crc_ind.crcs[i].ue_index,
[this, sl_rx = crc_ind.sl_rx, crc = crc_ind.crcs[i]](ue_cell& ue_cc) {
[this, sl_rx = crc_ind.sl_rx, crc_ptr = ind_pdu_pool->create_crc(crc_ind.crcs[i])](ue_cell& ue_cc) {
const double delay_ms =
static_cast<double>(last_sl - sl_rx) *
(static_cast<double>(10) / static_cast<double>(du_cells[ue_cc.cell_index].cfg->nof_slots_per_frame));

const int tbs = ue_cc.handle_crc_pdu(sl_rx, crc);
const int tbs = ue_cc.handle_crc_pdu(sl_rx, *crc_ptr);
if (tbs < 0) {
return;
}

// Process Timing Advance Offset.
if (crc.tb_crc_success and crc.time_advance_offset.has_value() and crc.ul_sinr_dB.has_value()) {
if (crc_ptr->tb_crc_success and crc_ptr->time_advance_offset.has_value() and
crc_ptr->ul_sinr_dB.has_value()) {
ue_db[ue_cc.ue_index].handle_ul_n_ta_update_indication(
ue_cc.cell_index, crc.ul_sinr_dB.value(), crc.time_advance_offset.value());
ue_cc.cell_index, crc_ptr->ul_sinr_dB.value(), crc_ptr->time_advance_offset.value());
}

// Log event.
du_cells[ue_cc.cell_index].ev_logger->enqueue(scheduler_event_logger::crc_event{
crc.ue_index, crc.rnti, ue_cc.cell_index, sl_rx, crc.harq_id, crc.tb_crc_success, crc.ul_sinr_dB});
du_cells[ue_cc.cell_index].ev_logger->enqueue(scheduler_event_logger::crc_event{crc_ptr->ue_index,
crc_ptr->rnti,
ue_cc.cell_index,
sl_rx,
crc_ptr->harq_id,
crc_ptr->tb_crc_success,
crc_ptr->ul_sinr_dB});

// Notify metrics handler.
du_cells[ue_cc.cell_index].metrics->handle_crc_indication(crc, units::bytes{(unsigned)tbs});
du_cells[ue_cc.cell_index].metrics->handle_ul_delay(crc.ue_index, delay_ms);
du_cells[ue_cc.cell_index].metrics->handle_crc_indication(*crc_ptr, units::bytes{(unsigned)tbs});
du_cells[ue_cc.cell_index].metrics->handle_ul_delay(crc_ptr->ue_index, delay_ms);
},
"CRC",
true})) {
Expand Down Expand Up @@ -442,12 +514,13 @@ void ue_event_manager::handle_uci_indication(const uci_indication& ind)
srsran_sanity_check(cell_exists(ind.cell_index), "Invalid cell index");

for (unsigned i = 0, e = ind.ucis.size(); i != e; ++i) {
const uci_indication::uci_pdu& uci = ind.ucis[i];
auto uci_ptr = ind_pdu_pool->create_uci(ind.ucis[i]);

if (not cell_specific_events[ind.cell_index].try_push(cell_event_t{
uci.ue_index,
[this, uci_sl = ind.slot_rx, uci_pdu = uci](ue_cell& ue_cc) {
if (const auto* pucch_f0f1 = std::get_if<uci_indication::uci_pdu::uci_pucch_f0_or_f1_pdu>(&uci_pdu.pdu)) {
ind.ucis[i].ue_index,
[this, uci_sl = ind.slot_rx, uci_pdu = std::move(uci_ptr)](ue_cell& ue_cc) {
if (const auto* pucch_f0f1 =
std::get_if<uci_indication::uci_pdu::uci_pucch_f0_or_f1_pdu>(&uci_pdu->pdu)) {
// Process DL HARQ ACKs.
if (not pucch_f0f1->harqs.empty()) {
handle_harq_ind(ue_cc, uci_sl, pucch_f0f1->harqs, pucch_f0f1->ul_sinr_dB);
Expand All @@ -471,7 +544,7 @@ void ue_event_manager::handle_uci_indication(const uci_indication& ind)
ue_db[ue_cc.ue_index].handle_ul_n_ta_update_indication(
ue_cc.cell_index, pucch_f0f1->ul_sinr_dB.value(), pucch_f0f1->time_advance_offset.value());
}
} else if (const auto* pusch_pdu = std::get_if<uci_indication::uci_pdu::uci_pusch_pdu>(&uci_pdu.pdu)) {
} else if (const auto* pusch_pdu = std::get_if<uci_indication::uci_pdu::uci_pusch_pdu>(&uci_pdu->pdu)) {
// Process DL HARQ ACKs.
if (not pusch_pdu->harqs.empty()) {
handle_harq_ind(ue_cc, uci_sl, pusch_pdu->harqs, std::nullopt);
Expand All @@ -482,7 +555,7 @@ void ue_event_manager::handle_uci_indication(const uci_indication& ind)
handle_csi(ue_cc, *pusch_pdu->csi);
}
} else if (const auto* pucch_f2f3f4 =
std::get_if<uci_indication::uci_pdu::uci_pucch_f2_or_f3_or_f4_pdu>(&uci_pdu.pdu)) {
std::get_if<uci_indication::uci_pdu::uci_pucch_f2_or_f3_or_f4_pdu>(&uci_pdu->pdu)) {
// Process DL HARQ ACKs.
if (not pucch_f2f3f4->harqs.empty()) {
handle_harq_ind(ue_cc, uci_sl, pucch_f2f3f4->harqs, pucch_f2f3f4->ul_sinr_dB);
Expand Down Expand Up @@ -517,13 +590,13 @@ void ue_event_manager::handle_uci_indication(const uci_indication& ind)
}

// Report the UCI PDU to the metrics handler.
du_cells[ue_cc.cell_index].metrics->handle_uci_pdu_indication(uci_pdu);
du_cells[ue_cc.cell_index].metrics->handle_uci_pdu_indication(*uci_pdu);
},
"UCI",
// Note: We do not warn if the UE is not found, because there is this transient period when the UE
// is about to receive and process the RRC Release, but it is still sending CSI or SR in the PUCCH. If we
// stop the PUCCH scheduling for the UE about to be released, we could risk interference between UEs in the
// PUCCH.
// stop the PUCCH scheduling for the UE about to be released, we could risk interference between UEs in
// the PUCCH.
false})) {
logger.warning("UCI discarded. Cause: Event queue is full");
}
Expand All @@ -537,27 +610,27 @@ void ue_event_manager::handle_srs_indication(const srs_indication& ind)
for (unsigned i = 0, e = ind.srss.size(); i != e; ++i) {
const srs_indication::srs_indication_pdu& srs_pdu = ind.srss[i];

if (not cell_specific_events[ind.cell_index].try_push(cell_event_t{
srs_pdu.ue_index,
[this, channel_matrix = srs_pdu.channel_matrix, time_advance_offset = srs_pdu.time_advance_offset](
ue_cell& ue_cc) {
// Indicate the channel matrix.
ue_cc.handle_srs_channel_matrix(channel_matrix);

// Handle time aligment measurement if present.
if (time_advance_offset.has_value()) {
// Assume some SINR for the TA feedback using the channel matrix topology and near zero noise variance.
float frobenius_norm = channel_matrix.frobenius_norm();
float noise_var = near_zero;
float sinr_dB = convert_power_to_dB(frobenius_norm * frobenius_norm / noise_var);

// Notify UL TA update.
ue_db[ue_cc.ue_index].handle_ul_n_ta_update_indication(
ue_cc.cell_index, sinr_dB, time_advance_offset.value());
}
},
"SRS",
false})) {
if (not cell_specific_events[ind.cell_index].try_push(
cell_event_t{srs_pdu.ue_index,
[this, srs_ptr = ind_pdu_pool->create_srs(ind.srss[i])](ue_cell& ue_cc) {
// Indicate the channel matrix.
ue_cc.handle_srs_channel_matrix(srs_ptr->channel_matrix);

// Handle time aligment measurement if present.
if (srs_ptr->time_advance_offset.has_value()) {
// Assume some SINR for the TA feedback using the channel matrix topology and near zero
// noise variance.
float frobenius_norm = srs_ptr->channel_matrix.frobenius_norm();
float noise_var = near_zero;
float sinr_dB = convert_power_to_dB(frobenius_norm * frobenius_norm / noise_var);

// Notify UL TA update.
ue_db[ue_cc.ue_index].handle_ul_n_ta_update_indication(
ue_cc.cell_index, sinr_dB, srs_ptr->time_advance_offset.value());
}
},
"SRS",
false})) {
logger.warning("SRS indication discarded. Cause: Event queue is full");
}
}
Expand Down
Loading

0 comments on commit eddeb3e

Please sign in to comment.