diff --git a/bessctl/conf/testing/module_tests/drr.py b/bessctl/conf/testing/module_tests/drr.py index 6330e4d61..434958678 100644 --- a/bessctl/conf/testing/module_tests/drr.py +++ b/bessctl/conf/testing/module_tests/drr.py @@ -30,16 +30,15 @@ import scapy.all as scapy -# ensures that given infinite input that the module does not crash. +#ensures that given infinite input that the module does not crash. +def crash_test(queue_type, queue_dict): + if queue_type == 1: + return [DRR(codel=queue_dict), 1, 1] + else: + return [DRR(queue=queue_dict), 1, 1] - -def crash_test(): - return [DRR(), 1, 1] - -# tests to make that inividual packets gets through the module - - -def basic_output_test(): +# tests to make that individual packets gets through the module +def basic_output_test(queue_type, queue_dict): # produces the number of duplicate packets specified by num_pkts and uses the provided # the specifications in the packet along with dummy ports. @@ -50,8 +49,11 @@ def gen_packet_list(protocol, input_ip, output_ip, num_pkts): packet_list.append({'input_port': 0, 'input_packet': cur_pkt, 'output_port': 0, "output_packet": cur_pkt}) return packet_list - - single_basic = DRR(num_flows=2, max_flow_queue_size=100) + if queue_type== 1: + single_basic = DRR(num_flows=2, max_flow_queue_size= 100, codel=queue_dict); + else: + single_basic = DRR(num_flows=2, max_flow_queue_size= 100, queue=queue_dict); + monitor_task(single_basic, 0) out = [] @@ -59,7 +61,11 @@ def gen_packet_list(protocol, input_ip, output_ip, num_pkts): out.append([single_basic, # test this module 1, 1, # it has one input port and one output port single_packet]) - batch_basic = DRR(num_flows=4, max_flow_queue_size=100) + if queue_type== 1: + batch_basic = DRR(num_flows=4, max_flow_queue_size= 100, codel=queue_dict); + else: + batch_basic = DRR(num_flows=4, max_flow_queue_size= 100, queue=queue_dict); + monitor_task(batch_basic, 0) packet_list = gen_packet_list(scapy.TCP, '22.22.22.1', '22.22.22.1', 2) packet_list += gen_packet_list(scapy.TCP, '22.22.11.1', '22.22.11.1', 2) @@ -75,7 +81,7 @@ def fairness_test(): # Takes the number of flows n, the quantum to give drr, the list packet rates for each flow # and the packet rate for the module. runs this setup for five seconds and tests that # throughput for each flow had a jaine fairness of atleast .95. - def fairness_n_flow_test(n, quantum, rates, module_rate): + def fairness_n_flow_test(n, quantum, rates, module_rate, queue_type, queue_dict): err = bess.reset_all() packets = [] @@ -98,7 +104,11 @@ def fairness_n_flow_test(n, quantum, rates, module_rate): me_out = Measure() snk = Sink() - q = DRR(num_flows=n + 1, quantum=quantum) + if queue_type == 1: + q = DRR(num_flows= n+1, quantum=quantum, codel=queue_dict) + else: + q = DRR(num_flows= n+1, quantum=quantum, queue=queue_dict) + me_in -> q -> me_out -> exm measure_out = [] @@ -127,16 +137,18 @@ def fairness_n_flow_test(n, quantum, rates, module_rate): if square_sum == 0: fair = 0 else: - fair = f(me_out) / square_sum - assert abs(.99 - fair) <= .05 - - fairness_n_flow_test(2, 1000, [80000, 20000], 30000) - fairness_n_flow_test( - 5, 1000, [110000, 200000, 70000, 60000, 40000], 150000) - - ten_flows = [210000, 120000, 130000, 160000, - 100000, 105000, 90000, 70000, 60000, 40000] - fairness_n_flow_test(10, 1000, ten_flows, 300000) + fair = f(me_out)/square_sum + assert abs(.99 - fair) <=.05 + llqueue_dict = {} # all default values + codel_dict = {} # all default values + fairness_n_flow_test(2, 1000, [80000, 20000], 30000, 0, llqueue_dict) + fairness_n_flow_test(2, 1000, [80000, 20000], 30000, 1, codel_dict) + fairness_n_flow_test(5, 1000, [110000, 200000, 70000, 60000, 40000], 150000, 0, llqueue_dict) + fairness_n_flow_test(5, 1000, [110000, 200000, 70000, 60000, 40000], 150000, 1, codel_dict) + + ten_flows = [210000, 120000, 130000, 160000, 100000, 105000, 90000, 70000, 60000, 40000] + fairness_n_flow_test(10, 1000, ten_flows , 300000, 0, llqueue_dict) + fairness_n_flow_test(10, 1000, ten_flows , 300000, 1, codel_dict) # hund_flows= [] # cur = 200000 @@ -145,6 +157,10 @@ def fairness_n_flow_test(n, quantum, rates, module_rate): # cur -= 1600 # fairness_n_flow_test(100, 1000, hund_flows, 300000) -OUTPUT_TEST_INPUTS += basic_output_test() +llqueue_dict = {} # all default values llqueue queue type: 0 +codel_dict = {} # all default values codel queue type: 1 +OUTPUT_TEST_INPUTS += basic_output_test(0, llqueue_dict) +OUTPUT_TEST_INPUTS += basic_output_test(1, codel_dict) CUSTOM_TEST_FUNCTIONS.append(fairness_test) -CRASH_TEST_INPUTS.append(crash_test()) +CRASH_TEST_INPUTS.append(crash_test(0, llqueue_dict)) +CRASH_TEST_INPUTS.append(crash_test(1, codel_dict)) diff --git a/core/modules/drr.cc b/core/modules/drr.cc index 662d95281..69ccd8c4e 100644 --- a/core/modules/drr.cc +++ b/core/modules/drr.cc @@ -38,6 +38,12 @@ #include "../utils/ether.h" #include "../utils/ip.h" #include "../utils/udp.h" +#include "../utils/common.h" +#include "../utils/codel.h" + +using bess::utils::Codel; +using bess::utils::LockLessQueue; +using bess::pb::DRRArg; uint32_t RoundToPowerTwo(uint32_t v) { v--; @@ -60,20 +66,21 @@ DRR::DRR() : quantum_(kDefaultQuantum), max_queue_size_(kFlowQueueMax), max_number_flows_(kDefaultNumFlows), - flow_ring_(nullptr), - current_flow_(nullptr) { + flow_queue_(), + current_flow_(nullptr), + codel_target_(0), + codel_window_(0), + init_queue_size_(0) { is_task_ = true; } - DRR::~DRR() { for (auto it = flows_.begin(); it != flows_.end();) { RemoveFlow(it->second); it++; } - std::free(flow_ring_); } -CommandResponse DRR::Init(const bess::pb::DRRArg& arg) { +CommandResponse DRR::Init(const DRRArg& arg) { CommandResponse err; task_id_t tid; @@ -95,16 +102,29 @@ CommandResponse DRR::Init(const bess::pb::DRRArg& arg) { } } - /* register task */ + // register task tid = RegisterTask(nullptr); if (tid == INVALID_TASK_ID) { return CommandFailure(ENOMEM, "task creation failed"); } - int err_num = 0; - flow_ring_ = AddQueue(max_number_flows_, &err_num); - if (err_num != 0) { - return CommandFailure(-err_num); + // get flow queue arguments + if (arg.has_codel()) { + const DRRArg::Codel& codel = arg.codel(); + if (!(codel_target_ = codel.target())) { + codel_target_ = Codel::kDefaultTarget; + } + + if (!(codel_window_ = codel.window())) { + codel_window_ = Codel::kDefaultWindow; + } + + } else { + const DRRArg::DropTailQueue& drop_queue = arg.queue(); + + if (!(init_queue_size_ = drop_queue.size())) { + init_queue_size_ = LockLessQueue::kDefaultRingSize; + } } return CommandSuccess(); @@ -133,7 +153,7 @@ void DRR::ProcessBatch(bess::PacketBatch* batch) { // if the Flow doesn't exist create one // and add the packet to the new Flow if (it == nullptr) { - if (llring_full(flow_ring_)) { + if (flow_queue_.size() >= max_number_flows_) { bess::Packet::Free(pkt); } else { AddNewFlow(pkt, id, &err); @@ -158,10 +178,7 @@ struct task_result DRR::RunTask(void*) { bess::PacketBatch batch; int err = 0; batch.clear(); - uint32_t total_bytes = 0; - if (flow_ring_ != NULL) { - total_bytes = GetNextBatch(&batch, &err); - } + uint32_t total_bytes = GetNextBatch(&batch, &err); assert(err >= 0); // TODO(joshua) do proper error checking if (total_bytes > 0) { @@ -177,7 +194,7 @@ struct task_result DRR::RunTask(void*) { uint32_t DRR::GetNextBatch(bess::PacketBatch* batch, int* err) { Flow* f; uint32_t total_bytes = 0; - uint32_t count = llring_count(flow_ring_); + uint32_t count = flow_queue_.size(); if (current_flow_) { count++; } @@ -192,35 +209,34 @@ uint32_t DRR::GetNextBatch(bess::PacketBatch* batch, int* err) { if (batch_size == batch->cnt()) { break; } else { - count = llring_count(flow_ring_); + count = flow_queue_.size(); batch_size = batch->cnt(); } } count--; f = GetNextFlow(err); - if (*err != 0) { + if (*err) { return total_bytes; } else if (f == nullptr) { continue; } - uint32_t bytes = GetNextPackets(batch, f, err); + uint32_t bytes = GetNextPackets(batch, f); total_bytes += bytes; - if (*err != 0) { - return total_bytes; - } - if (llring_empty(f->queue) && !f->next_packet) { + if (f->queue->Empty() && !f->next_packet) { f->deficit = 0; } // if the flow doesn't have any more packets to give, reenqueue it if (!f->next_packet || f->next_packet->total_len() > f->deficit) { - *err = llring_enqueue(flow_ring_, f); - if (*err != 0) { - return total_bytes; - } + if (flow_queue_.size() < max_number_flows_) { + flow_queue_.push_back(f); + } else { + *err = -1; + return total_bytes; + } } else { // knowing that the while statement will exit, keep the flow that still // has packets at the front @@ -235,18 +251,24 @@ DRR::Flow* DRR::GetNextFlow(int* err) { double now = get_epoch_time(); if (!current_flow_) { - *err = llring_dequeue(flow_ring_, reinterpret_cast(&f)); - if (*err < 0) { + + if (!flow_queue_.empty()) { + f = flow_queue_.front(); + flow_queue_.pop_front(); + } else { + *err = -1; return nullptr; } - if (llring_empty(f->queue) && !f->next_packet) { + if (f->queue->Empty() && !f->next_packet) { // if the flow expired, remove it if (now - f->timer > kTtl) { RemoveFlow(f); } else { - *err = llring_enqueue(flow_ring_, f); - if (*err < 0) { + if (flow_queue_.size() < max_number_flows_) { + flow_queue_.push_back(f); + } else { + *err = -1; return nullptr; } } @@ -261,15 +283,15 @@ DRR::Flow* DRR::GetNextFlow(int* err) { return f; } -uint32_t DRR::GetNextPackets(bess::PacketBatch* batch, Flow* f, int* err) { +uint32_t DRR::GetNextPackets(bess::PacketBatch* batch, Flow* f) { uint32_t total_bytes = 0; bess::Packet* pkt; - while (!batch->full() && (!llring_empty(f->queue) || f->next_packet)) { + while (!batch->full() && (!f->queue->Empty() || f->next_packet)) { // makes sure there isn't already a packet at the front if (!f->next_packet) { - *err = llring_dequeue(f->queue, reinterpret_cast(&pkt)); - if (*err < 0) { + int err = f->queue->Pop(pkt); + if (err) { return total_bytes; } } else { @@ -311,21 +333,26 @@ void DRR::AddNewFlow(bess::Packet* pkt, FlowId id, int* err) { Flow* f = new Flow(id); // TODO(joshua) do proper error checking - f->queue = AddQueue(static_cast(kFlowQueueSize), err); - - if (*err != 0) { - return; + if(!init_queue_size_) { + f->queue = new Codel(bess::Packet::Free, max_queue_size_, + codel_target_, codel_window_); + } else { + f->queue = new LockLessQueue(init_queue_size_); } flows_.Insert(id, f); Enqueue(f, pkt, err); - if (*err != 0) { + if (*err) { return; } // puts flow in round robin - *err = llring_enqueue(flow_ring_, f); + if (flow_queue_.size() < max_number_flows_) { + flow_queue_.push_back(f); + } else { + *err = -1; + } } void DRR::RemoveFlow(Flow* f) { @@ -336,77 +363,34 @@ void DRR::RemoveFlow(Flow* f) { delete f; } -llring* DRR::AddQueue(uint32_t slots, int* err) { - int bytes = llring_bytes_with_slots(slots); - int ret; - - llring* queue = static_cast(aligned_alloc(alignof(llring), bytes)); - if (!queue) { - *err = -ENOMEM; - return nullptr; - } - - ret = llring_init(queue, slots, 1, 1); - if (ret) { - std::free(queue); - *err = -EINVAL; - return nullptr; - } - return queue; -} void DRR::Enqueue(Flow* f, bess::Packet* newpkt, int* err) { // if the queue is full. drop the packet. - if (llring_count(f->queue) >= max_queue_size_) { + if (f->queue->Size() >= max_queue_size_) { bess::Packet::Free(newpkt); return; } // creates a new queue if there is not enough space for the new packet // in the old queue - if (llring_full(f->queue)) { + if (f->queue->Full()) { uint32_t slots = - RoundToPowerTwo(llring_count(f->queue) * kQueueGrowthFactor); - f->queue = ResizeQueue(f->queue, slots, err); - if (*err != 0) { + RoundToPowerTwo(f->queue->Size() * kQueueGrowthFactor); + *err = f->queue->Resize(slots); + if (*err) { bess::Packet::Free(newpkt); return; } } - *err = llring_enqueue(f->queue, reinterpret_cast(newpkt)); - if (*err == 0) { + *err = f->queue->Push(newpkt); + if (!*err) { f->timer = get_epoch_time(); } else { bess::Packet::Free(newpkt); } } -llring* DRR::ResizeQueue(llring* old_queue, uint32_t new_size, int* err) { - llring* new_queue = AddQueue(new_size, err); - if (*err != 0) { - return nullptr; - } - - // migrates packets from the old queue - if (old_queue) { - bess::Packet* pkt; - - while (llring_dequeue(old_queue, reinterpret_cast(&pkt)) == 0) { - *err = llring_enqueue(new_queue, pkt); - if (*err == -LLRING_ERR_NOBUF) { - bess::Packet::Free(pkt); - *err = 0; - } else if (*err != 0) { - std::free(new_queue); - return nullptr; - } - } - - std::free(old_queue); - } - return new_queue; -} CommandResponse DRR::SetQuantumSize(uint32_t size) { if (size == 0) { diff --git a/core/modules/drr.h b/core/modules/drr.h index fc6eb6aa8..afdeca385 100644 --- a/core/modules/drr.h +++ b/core/modules/drr.h @@ -32,18 +32,21 @@ #define BESS_MODULES_DRR_H_ #include +#include #include -#include "../kmod/llring.h" #include "../module.h" #include "../pb/module_msg.pb.h" #include "../pktbatch.h" #include "../utils/cuckoo_map.h" #include "../utils/ip.h" +#include "../utils/queue.h" +#include "../utils/lock_less_queue.h" using bess::utils::Ipv4Prefix; using bess::utils::CuckooMap; +using bess::utils::Queue; /* * This module implements Deficit Round Robin, a fair queueing algorithm, for @@ -105,7 +108,7 @@ class DRR final : public Module { int deficit; // the allocated bytes to the flow double timer; // to determine if TTL should be used FlowId id; // allows the flow to remove itself from the map - struct llring* queue; // queue to store current packets for flow + Queue* queue; // queue to store current packets for flow bess::Packet* next_packet; // buffer to store next packet from the queue. Flow() : deficit(0), timer(0), id(), next_packet(nullptr){}; Flow(FlowId new_id) @@ -113,7 +116,7 @@ class DRR final : public Module { ~Flow() { if (queue) { bess::Packet* pkt; - while (llring_sc_dequeue(queue, reinterpret_cast(&pkt)) == 0) { + while (!queue->Pop(pkt)) { bess::Packet::Free(pkt); } @@ -186,19 +189,7 @@ class DRR final : public Module { CommandResponse SetMaxFlowQueueSize(uint32_t queue_size); /* - Creates a new larger llring queue of the specifed size and moves over all - of the entries from the old queue and frees the old_queue. Takes a pointer - to the - location of the current queue, the new size of the queue and integer - pointer - to set on error. If the integer pointer is set than the return value will be - a - nullptr. Returns a pointer to the new llring otherwise. - */ - llring* ResizeQueue(llring* old_queue, uint32_t new_size, int* err); - - /* - Puts the packet into the llring queue within the flow. Takes the flow to + Puts the packet into the queue within the flow. Takes the flow to enqueue the packet into, the packet to enqueue into the flow's queue and integer pointer to be set on error. */ @@ -238,7 +229,7 @@ class DRR final : public Module { from and integer pointer to set on error. Returns the total bytes put in batch */ - uint32_t GetNextPackets(bess::PacketBatch* batch, Flow* f, int* err); + uint32_t GetNextPackets(bess::PacketBatch* batch, Flow* f); /* gets the next flow from the queue of flows. Returns nullptr if the next @@ -247,14 +238,6 @@ class DRR final : public Module { */ Flow* GetNextFlow(int* err); - /* - allocates llring queue space and adds the queue to the specified flow with - size indicated by slots. Takes the Flow to add the queue, the number - of slots for the queue to have and the integer pointer to set on error. - Returns a llring queue. - */ - llring* AddQueue(uint32_t slots, int* err); - // the number of bytes to allocate to each flow in each round. uint32_t quantum_; @@ -267,7 +250,14 @@ class DRR final : public Module { // state map used to reunite packets with their flow CuckooMap flows_; - llring* flow_ring_; // llring used for round robin. + std::deque flow_queue_; // queue used for round robin. Flow* current_flow_; // store current flow between batch rounds. + + // given codel is the queue type for flows + uint64_t codel_target_; // desired delay in microseconds that codel tries to maintain + uint64_t codel_window_; // window in mircroseconds between changing states in codel + + // given llring is the queue type for flows + uint32_t init_queue_size_; // the initial size of each flow's queue }; #endif // BESS_MODULES_DRR_H_ diff --git a/protobuf/module_msg.proto b/protobuf/module_msg.proto index 03908af1d..c4cd93af8 100644 --- a/protobuf/module_msg.proto +++ b/protobuf/module_msg.proto @@ -239,6 +239,21 @@ message DRRArg { uint32 num_flows = 1; /// Number of flows to handle in module uint64 quantum = 2; /// the number of bytes to allocate to each on every round uint32 max_flow_queue_size = 3; /// the max size that any Flows queue can get + + + message DropTailQueue{ + uint32 size = 1; /// the initial size of each flow's queue + } + + message Codel { + uint64 target = 1; /// the desired delay in microseconds that codel tries to maintain + uint64 window = 2; /// window in microseconds between changing states in codel + } + + oneof params { + DropTailQueue queue = 4; /// Llring arguments if llring chosen for each flow's queue + Codel codel = 5; /// Codel arguments if Codel chosen for each flow's queue + } } /**