Skip to content

Commit

Permalink
sender fairness -> servicepolicy
Browse files Browse the repository at this point in the history
  • Loading branch information
ccanel committed Feb 29, 2024
1 parent 35841ff commit c48f036
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 45 deletions.
6 changes: 3 additions & 3 deletions ratemon/model/gen_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def parse_opened_exp(
out_flp,
skip_smoothed,
select_tail_percent=None,
sender_fairness=False,
servicepolicy=False,
):
"""Parse an experiment. Return the smallest safe window size."""
print(f"Parsing: {exp_flp}")
Expand Down Expand Up @@ -1314,7 +1314,7 @@ def parse_exp(
out_dir,
skip_smoothed,
select_tail_percent,
sender_fairness=False,
servicepolicy=False,
always_reparse=False,
parse_func=parse_opened_exp,
):
Expand All @@ -1336,7 +1336,7 @@ def parse_exp(
out_flp,
skip_smoothed,
select_tail_percent,
sender_fairness,
servicepolicy,
)
except AssertionError:
traceback.print_exc()
Expand Down
32 changes: 16 additions & 16 deletions ratemon/runtime/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def smooth(labels):
return label


def make_decision_sender_fairness(
def make_decision_servicepolicy(
args, flowkeys, min_rtt_us, fets, label, flow_to_decisions
):
"""Make a fairness decision for all flows from a sender.
Expand Down Expand Up @@ -360,8 +360,8 @@ def make_decision(
Base the decision on the flow's label and existing decision. Use the flow's features
to calculate any necessary flow metrics, such as the throughput.
"""
if args.sender_fairness:
new_decision = make_decision_sender_fairness(
if args.servicepolicy:
new_decision = make_decision_servicepolicy(
args, flowkeys, min_rtt_us, fets, label, flow_to_decisions
)
else:
Expand Down Expand Up @@ -514,7 +514,7 @@ def parse_from_inference_queue(
val, flow_to_rwnd, flow_to_decisions, flow_to_prev_features
):
"""Parse a message from the inference queue."""
sender_fairness = False
servicepolicy = False
epoch = num_flows_expected = None
opcode, fourtuple = val[:2]
flowkey = flow_utils.FlowKey(*fourtuple)
Expand All @@ -528,8 +528,8 @@ def parse_from_inference_queue(
win_to_loss_event_rate,
) = val[2:]

if opcode.startswith("inference-sender-fairness"):
sender_fairness = True
if opcode.startswith("inference-servicepolicy"):
servicepolicy = True
epoch, num_flows_expected = opcode.split("-")[-2:]
epoch = int(epoch)
num_flows_expected = int(num_flows_expected)
Expand All @@ -553,7 +553,7 @@ def parse_from_inference_queue(
win_to_loss_event_rate,
fourtuple,
flowkey,
sender_fairness,
servicepolicy,
epoch,
num_flows_expected,
)
Expand Down Expand Up @@ -621,7 +621,7 @@ def build_features(

def wait_or_batch(
net,
sender_fairness,
servicepolicy,
batch,
waiting_room,
fourtuple,
Expand All @@ -633,8 +633,8 @@ def wait_or_batch(
epoch,
num_flows_expected,
):
"""Decide whether a flow should wait (if doing sender fairness) or be batched."""
if not sender_fairness:
"""Decide whether a flow should wait (if doing servicepolicy) or be batched."""
if not servicepolicy:
batch.append(([fourtuple], [flowkey], min_rtt_us, all_fets, in_fets))
logging.info(
"Adding %d packets from flow %s to batch.",
Expand All @@ -654,7 +654,7 @@ def wait_or_batch(
[],
)
logging.info(
"Adding %d packets from flow %s to sender fairness waiting room.",
"Adding %d packets from flow %s to servicepolicy waiting room.",
len(in_fets),
flowkey,
)
Expand Down Expand Up @@ -686,7 +686,7 @@ def wait_or_batch(
)
)
logging.info(
"Sender fairness waiting room for sender %s is full. "
"Servicepolicy waiting room for sender %s is full. "
"Adding %d merged packets to batch.",
utils.int_to_ip_str(merged_flowkeys[0].remote_addr),
len(merged_in_fets),
Expand Down Expand Up @@ -781,7 +781,7 @@ def loop_iteration(
Includes: checking if the current batch is ready and running it, pulling a
message from the inference queue, computing features, and deciding if the
flow should wait (sender fairness) or be batched immediately.
flow should wait (servicepolicy) or be batched immediately.
"""
# First, check whether we should run inference on the current batch.
if maybe_run_batch(
Expand Down Expand Up @@ -847,7 +847,7 @@ def loop_iteration(
win_to_loss_event_rate,
fourtuple,
flowkey,
sender_fairness,
servicepolicy,
epoch,
num_flows_expected,
) = parse_res
Expand Down Expand Up @@ -876,7 +876,7 @@ def loop_iteration(

packets_covered_by_batch += wait_or_batch(
net,
sender_fairness,
servicepolicy,
batch,
waiting_room,
fourtuple,
Expand All @@ -898,7 +898,7 @@ def loop_iteration(
def inference_loop(args, flow_to_rwnd, que, inference_flags, done):
"""Receive packets and run inference on them."""
logging.info("Loading model: %s", args.model_file)
if args.sender_fairness:
if args.servicepolicy:
net = models.MathisFairness()
else:
net = models.load_model(args.model_file)
Expand Down
18 changes: 9 additions & 9 deletions ratemon/runtime/ratemon_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ def check_flows(args, longest_window, que, inference_flags):
with FLOWS.lock:
for fourtuple, flow in FLOWS.items():
# A fourtuple might add other fourtuples to to_check if
# args.sender_fairness is True.
# args.servicepolicy is True.
if fourtuple in to_check:
continue

Expand All @@ -216,7 +216,7 @@ def check_flows(args, longest_window, que, inference_flags):
logging.info("No incoming packets for flow %s", flow)

if flow.is_ready(args.smoothing_window, longest_window):
if args.sender_fairness:
if args.servicepolicy:
# Only want to add this flow if all the flows from this
# sender are ready.
if FLOWS.sender_okay(
Expand Down Expand Up @@ -334,11 +334,11 @@ def check_flow(fourtuple, args, longest_window, que, inference_flags, epoch=0):
flow.min_rtt_us,
win_to_loss_event_rate,
)
if args.sender_fairness:
if args.servicepolicy:
que.put(
(
# inference-sender-fairness-<epoch>-<sender IP>-<num flows to expect>
f"inference-sender-fairness-{epoch}-{flow.flowkey.remote_addr}-{len(FLOWS.get_flows_from_sender(flow.flowkey.remote_addr))}",
# inference-servicepolicy-<epoch>-<sender IP>-<num flows to expect>
f"inference-servicepolicy-{epoch}-{flow.flowkey.remote_addr}-{len(FLOWS.get_flows_from_sender(flow.flowkey.remote_addr))}",
*info,
),
block=False,
Expand Down Expand Up @@ -530,7 +530,7 @@ def parse_args():
type=int,
)
parser.add_argument(
"--sender-fairness",
"--servicepolicy",
action="store_true",
help=(
"Combine all flows from one sender and enforce fairness between "
Expand All @@ -556,8 +556,8 @@ def parse_args():

assert path.isdir(args.cgroup), f'"--cgroup={args.cgroup}" is not a directory.'
assert (
args.sender_fairness or args.model_file is not None
), "Specify one of '--model-file' or '--sender-fairness'. "
args.servicepolicy or args.model_file is not None
), "Specify one of '--model-file' or '--servicepolicy'. "
return args


Expand All @@ -566,7 +566,7 @@ def run(args):
# Need to load the model to check the input features to see the longest window.
in_spc = (
models.MathisFairness()
if args.sender_fairness
if args.servicepolicy
else models.load_model(args.model_file)
).in_spc

Expand Down
34 changes: 17 additions & 17 deletions ratemon/scripts/eval.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def plot_lines(
ys,
alpha=0.75,
linestyle=(
# If this is a sender fairness graph but not the first
# If this is a servicepolicy graph but not the first
# sender, or a cubic flow in a flow fairness graph...
"solid"
if "Service 2" in label or label == "cubic"
Expand Down Expand Up @@ -210,7 +210,7 @@ def plot_flows_over_time(
out_flp,
flw_to_pkts,
flw_to_cca,
sender_fairness=False,
servicepolicy=False,
flw_to_sender=None,
xlim=None,
bottleneck_Mbps=None,
Expand Down Expand Up @@ -257,9 +257,9 @@ def plot_flows_over_time(

lines.append((throughputs, flw))

# If sender_fairness, then graph the total throughput of each sender instead of the
# If servicepolicy, then graph the total throughput of each sender instead of the
# throughput of each flow.
if sender_fairness and flw_to_sender is not None:
if servicepolicy and flw_to_sender is not None:
sender_to_tputs = dict()
# Accumulate the throughput of each sender.
for throughputs, flw in lines:
Expand Down Expand Up @@ -301,7 +301,7 @@ def plot_flows_over_time(
else:
lines = [(throughputs, flw_to_cca[flw]) for (throughputs, flw) in lines]

colors = [COLORS_MAP["blue"], COLORS_MAP["red"]] if sender_fairness else None
colors = [COLORS_MAP["blue"], COLORS_MAP["red"]] if servicepolicy else None

# If we are supposed to mark the bottleneck bandwidth, then create a horizontal
# line and prepend it to the lines.
Expand All @@ -326,11 +326,11 @@ def plot_flows_over_time(
None,
exp.bw_Mbps if exp.use_bess else None,
out_flp,
legendloc=("center" if sender_fairness else "upper right"),
linewidth=(1 if sender_fairness else 1),
legendloc=("center" if servicepolicy else "upper right"),
linewidth=(1 if servicepolicy else 1),
colors=colors,
bbox_to_anchor=((0.5, 1.15) if sender_fairness else None),
legend_ncol=(2 if sender_fairness else 1),
bbox_to_anchor=((0.5, 1.15) if servicepolicy else None),
legend_ncol=(2 if servicepolicy else 1),
figsize=(5, 2.6),
)

Expand Down Expand Up @@ -421,7 +421,7 @@ def parse_opened_exp(
out_flp,
skip_smoothed,
select_tail_percent,
sender_fairness,
servicepolicy,
):
# skip_smoothed is not used but is kept to maintain API compatibility
# with gen_features.parse_opened_exp().
Expand Down Expand Up @@ -518,7 +518,7 @@ def parse_opened_exp(
out_flp[:-4] + "_flows.pdf",
flw_to_pkts,
flw_to_cca,
sender_fairness,
servicepolicy,
flw_to_sender,
)
# Plot each sender separately.
Expand Down Expand Up @@ -575,7 +575,7 @@ def parse_opened_exp(
out = (
exp,
params,
get_jfi(flw_to_pkts, sender_fairness, flw_to_sender),
get_jfi(flw_to_pkts, servicepolicy, flw_to_sender),
overall_util,
class_to_util,
bneck_to_maxmin_ratios,
Expand Down Expand Up @@ -748,12 +748,12 @@ def calculate_maxmin_ratios(params, flw_to_pkts, flw_to_sender, sender_to_flws):
return bneck_to_maxmin_ratios


def get_jfi(flw_to_pkts, sender_fairness=False, flw_to_sender=None):
def get_jfi(flw_to_pkts, servicepolicy=False, flw_to_sender=None):
flw_to_tput_bps = {
flw: 0 if len(pkts) == 0 else utils.safe_tput_bps(pkts, 0, len(pkts) - 1)
for flw, pkts in flw_to_pkts.items()
}
if sender_fairness:
if servicepolicy:
assert flw_to_sender is not None
sender_to_tput_bps = collections.defaultdict(float)
for flw, tput_bps in flw_to_tput_bps.items():
Expand Down Expand Up @@ -891,7 +891,7 @@ def main(args):
print("Logging to:", log_flp)
logging.info("Evaluating experiments in: %s", args.exp_dir)

our_label = "ServicePolicy" if args.sender_fairness else "FlowPolicy"
our_label = "ServicePolicy" if args.servicepolicy else "FlowPolicy"

# Find all experiments.
pcaps = [
Expand All @@ -901,7 +901,7 @@ def main(args):
path.join(args.out_dir, "individual_results"),
False, # skip_smoothed
args.select_tail_percent,
args.sender_fairness,
args.servicepolicy,
True, # always_reparse
parse_opened_exp,
)
Expand Down Expand Up @@ -1664,7 +1664,7 @@ def parse_args():
type=str,
)
parser.add_argument(
"--sender-fairness",
"--servicepolicy",
action="store_true",
help="Evaluate fairness across senders instead of across flows.",
)
Expand Down

0 comments on commit c48f036

Please sign in to comment.