diff --git a/etc/SciPass.xml b/etc/SciPass.xml index 6b2c087..0d1be53 100644 --- a/etc/SciPass.xml +++ b/etc/SciPass.xml @@ -2,7 +2,7 @@ diff --git a/python/Ryu.py b/python/Ryu.py index 75c5c33..88e77b7 100644 --- a/python/Ryu.py +++ b/python/Ryu.py @@ -173,7 +173,7 @@ def __init__(self,*args, **kwargs): self.balance_thread = hub.spawn(self._balance_loop) self.ports = defaultdict(dict); - self.prefix_bytes = defaultdict(lambda: defaultdict(int)) + self.prefix_bytes = defaultdict(lambda: defaultdict(dict)) self.lastStatsTime = {} self.flowmods = {} @@ -1010,13 +1010,18 @@ def process_flow_stats_of13(self, stats, dp): for prefix in prefix_bytes: for d in ("rx","tx"): - old_bytes = self.prefix_bytes[prefix][d] + old_bytes = 0 + if(self.prefix_bytes[dpid][prefix].has_key(d)): + old_bytes = self.prefix_bytes[dpid][prefix][d] new_bytes = prefix_bytes[prefix][d] bytes = new_bytes - old_bytes + self.logger.debug("Switch : " + str(dpid)) + self.logger.debug("Prefix : " + str(prefix)) + self.logger.debug("Bytes %s : %d ", d, bytes) #if we are less than the previous counter then we re-balanced #set back to 0 and start again if(bytes < 0): - self.prefix_bytes[prefix][d] = 0 + self.prefix_bytes[dpid][prefix][d] = 0 bytes = 0 if(stats_et == None): @@ -1029,7 +1034,7 @@ def process_flow_stats_of13(self, stats, dp): rate = 0 prefix_bps[prefix][d] = rate - self.prefix_bytes[prefix][d] = prefix_bytes[prefix][d] + self.prefix_bytes[dpid][prefix][d] = prefix_bytes[prefix][d] #--- update the balancer for prefix in prefix_bps.keys(): @@ -1153,13 +1158,18 @@ def process_flow_stats_of10(self, stats, dp): for prefix in prefix_bytes: for dir in ("rx","tx"): - old_bytes = self.prefix_bytes[prefix][dir] + old_bytes = 0 + if(self.prefix_bytes[dpid][prefix].has_key(dir)): + old_bytes = self.prefix_bytes[dpid][prefix][dir] new_bytes = prefix_bytes[prefix][dir] bytes = new_bytes - old_bytes #if we are less than the previous counter then we re-balanced #set back to 0 and start again + self.logger.debug("Switch : " + str(dpid)) + self.logger.debug("Prefix : " + str(prefix)) + self.logger.debug("Bytes %s : %d ", dir, bytes) if(bytes < 0): - self.prefix_bytes[prefix][dir] = 0 + self.prefix_bytes[dpid][prefix][dir] = 0 bytes = 0 if(stats_et == None): @@ -1172,7 +1182,7 @@ def process_flow_stats_of10(self, stats, dp): rate = 0 prefix_bps[prefix][dir] = rate - self.prefix_bytes[prefix][dir] = prefix_bytes[prefix][dir] + self.prefix_bytes[dpid][prefix][dir] = prefix_bytes[prefix][dir] #--- update the balancer diff --git a/python/SimpleBalancer.py b/python/SimpleBalancer.py index d9d1df1..a9ddca4 100644 --- a/python/SimpleBalancer.py +++ b/python/SimpleBalancer.py @@ -192,52 +192,6 @@ def getPrefixPriority(self, prefix): if(pfix.Contains(prefix)): return self.prefixPriorities[pfix] - def showStatus(self): - """returns text represention in show format of the current status balancing""" - - mode = "Sensor Load and Prefix Bandwidth" - - if(self.ignoreSensorLoad > 0): - mode = "Prefix Bandwidth" - - if(self.ignoreSensorLoad > 0 and self.ignorePrefixBW > 0): - mode = "IP Space" - - status = ""; - totalHosts = 0 - totalBW = 0; - - status = "Balance Method: %s:\n" % mode - - sensorHosts = defaultdict(int) - sensorBW = defaultdict(int) - for prefix in self.prefixSensor: - totalHosts = totalHosts + prefix.numhosts - totalBW = totalBW + self.prefixBW[prefix] - sensor = self.prefixSensor[prefix] - sensorBW[sensor] = sensorBW[sensor] + self.prefixBW[prefix] - - lastCount = sensorHosts[sensor] - sensorHosts[sensor] = lastCount + prefix.numhosts - - for sensor in self.sensorLoad: - sensorHostVal = sensorHosts[sensor] / float(totalHosts) - try: - sensorBwPer = sensorBW[sensor] / float(totalBW) - except ZeroDivisionError: - sensorBwPer = 0 - - status = status + "sensor: '"+sensor+"' bw: %.2f load: %.3f hosts: %.3f"%(sensorBwPer,self.sensorLoad[sensor],sensorHostVal )+"\n" - - for prefix in self.prefixSensor: - if(self.prefixSensor[prefix] == sensor): - prefixBW = self.prefixBW[prefix] - status = status + " "+str(prefix)+": %.3f "%(prefixBW/1000000.0)+"mbps\n" - - status = status + "\n" - - return status - #distributes prefixes through all groups #this is not going to be event but it is at least a start @@ -779,11 +733,11 @@ def merge(self): if(not self.getGroupStatus(group)): continue - load = self.getGroupLoad(group) + load = self.getGroupLoad(group) - if(load < minLoad): - minLoad = load - minSensor = group; + if(load < minLoad): + minLoad = load + minSensor = group; self.logger.debug("Min Group: " + str(minSensor)) group_a = self.getPrefixGroup(prefix_a) diff --git a/python/t/SimpleBalancerTest.py b/python/t/SimpleBalancerTest.py index f37ad22..ad6dae3 100644 --- a/python/t/SimpleBalancerTest.py +++ b/python/t/SimpleBalancerTest.py @@ -5,8 +5,13 @@ import unittest import xmlrunner import logging +import os +import json +import time from SimpleBalancer import SimpleBalancer,MaxPrefixlenError +from SciPass import SciPass from collections import defaultdict +from mock import Mock class TestInit(unittest.TestCase): @@ -531,10 +536,60 @@ def test_to_string(self): self.balancer = SimpleBalancer() self +class TestStateChange(unittest.TestCase): + + def setUp(self): + self.api = SciPass( logger = logging.getLogger(__name__), + config = str(os.getcwd()) + "/t/etc/SciPass_balancer_only.xml", + readState = False) + self.datapath = Mock(id=1) + self.api.switchJoined(self.datapath) + self.state = "/var/run/" + "%016x" % self.datapath.id + "IUPUI" + ".json" + + def tearDown(self): + os.remove(self.state) + + + def test_initial_config(self): + assert(os.path.isfile(self.state) == 1) + with open(self.state) as data_file: + data = json.load(data_file) + data = data[0] + switches = data["switch"].keys() + assert(switches[0] == "%016x" % self.datapath.id) + domain = data["switch"]["%016x" % self.datapath.id]["domain"].keys() + assert(domain[0] == "IUPUI") + mode = data["switch"]["%016x" % self.datapath.id]["domain"][domain[0]]["mode"].keys() + assert(mode[0] == "Balancer") + + def test_state_restore(self): + net1 = ipaddr.IPv4Network("192.168.0.0/24") + group = self.api.getBalancer("%016x" % self.datapath.id, "IUPUI").getPrefixGroup(net1) + res = self.api.getBalancer("%016x" % self.datapath.id, "IUPUI").splitSensorPrefix(group,net1,check=False) + self.assertTrue(res == 1) + net2 = ipaddr.IPv6Network("2001:0DB8::/48") + group = self.api.getBalancer("%016x" % self.datapath.id, "IUPUI").getPrefixGroup(net2) + res = self.api.getBalancer("%016x" % self.datapath.id, "IUPUI").splitSensorPrefix(group,net2,check=False) + self.assertTrue(res == 1) + self.api.switchLeave(self.datapath) + time.sleep(3) + self.api = SciPass( logger = logging.getLogger(__name__), + config = str(os.getcwd()) + "/t/etc/SciPass_balancer_only.xml", + readState = True) + self.api.switchJoined(self.datapath) + prefixes = self.api.getBalancer("%016x" % self.datapath.id, "IUPUI").getPrefixes() + prefixList = prefixes.keys() + assert(net1 not in prefixList) + assert(net2 not in prefixList) + net = [ipaddr.IPv4Network('192.168.0.0/25'), ipaddr.IPv4Network('192.168.0.128/25')] + assert(n in prefixList for n in net) + net = [ipaddr.IPv6Network('2001:db8::/49'), ipaddr.IPv6Network('2001:db8:0:8000::/49')] + assert(n in prefixList for n in net) def suite(): suite = unittest.TestLoader().loadTestsFromTestCase(TestInit) suite.addTests(unittest.TestLoader().loadTestsFromTestCase(TestSensorMods)) suite.addTests(unittest.TestLoader().loadTestsFromTestCase(TestPrefix)) suite.addTests(unittest.TestLoader().loadTestsFromTestCase(TestBalance)) + suite.addTests(unittest.TestLoader().loadTestsFromTestCase(TestStateChange)) return suite diff --git a/python/t/Test.py b/python/t/Test.py index 8ee669e..629fdc9 100644 --- a/python/t/Test.py +++ b/python/t/Test.py @@ -21,6 +21,6 @@ simple_balancer_only_tests = SimpleBalancerOnlyTest.suite() inline_tests = InlineTest.suite() suite = unittest.TestSuite([scipasstests, simplebalancertests, balancer_only_tests, inline_tests, simple_balancer_only_tests]) - + xmlrunner.XMLTestRunner(output='test-reports').run(suite)