Skip to content

Commit

Permalink
[TACACS+] Add config command for AAA authorization and accounting. (s…
Browse files Browse the repository at this point in the history
…onic-net#1889)

This pull request add config command for AAA authorization & accounting.

    Support TACACS per-command authorization & accounting.

    Change AAA config command to support authorization & accounting.
    Change show AAA command to support authorization & accounting.
    Add UT to cover changed code.

    1. Build following project and pass all UTs:
        make target/python-wheels/sonic_utilities-1.2-py3-none-any.whl
    2. Test new command manually.

    N/A

    Add config command for AAA authorization & accounting.
  • Loading branch information
liuh-80 committed Jul 4, 2023
1 parent 50296b9 commit 1c353ad
Show file tree
Hide file tree
Showing 3 changed files with 330 additions and 0 deletions.
38 changes: 38 additions & 0 deletions config/aaa.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,44 @@ def login(auth_protocol):
add_table_kv('AAA', 'authentication', 'login', val)
authentication.add_command(login)

# cmd: aaa authorization
@click.command()
@click.argument('protocol', nargs=-1, type=click.Choice([ "tacacs+", "local", "tacacs+ local"]))
def authorization(protocol):
"""Switch AAA authorization [tacacs+ | local | '\"tacacs+ local\"']"""
if len(protocol) == 0:
click.echo('Argument "protocol" is required')
return

if len(protocol) == 1 and (protocol[0] == 'tacacs+' or protocol[0] == 'local'):
add_table_kv('AAA', 'authorization', 'login', protocol[0])
elif len(protocol) == 1 and protocol[0] == 'tacacs+ local':
add_table_kv('AAA', 'authorization', 'login', 'tacacs+,local')
else:
click.echo('Not a valid command')
aaa.add_command(authorization)

# cmd: aaa accounting
@click.command()
@click.argument('protocol', nargs=-1, type=click.Choice(["disable", "tacacs+", "local", "tacacs+ local"]))
def accounting(protocol):
"""Switch AAA accounting [disable | tacacs+ | local | '\"tacacs+ local\"']"""
if len(protocol) == 0:
click.echo('Argument "protocol" is required')
return

if len(protocol) == 1:
if protocol[0] == 'tacacs+' or protocol[0] == 'local':
add_table_kv('AAA', 'accounting', 'login', protocol[0])
elif protocol[0] == 'tacacs+ local':
add_table_kv('AAA', 'accounting', 'login', 'tacacs+,local')
elif protocol[0] == 'disable':
del_table_key('AAA', 'accounting', 'login')
else:
click.echo('Not a valid command')
else:
click.echo('Not a valid command')
aaa.add_command(accounting)

@click.group()
def tacacs():
Expand Down
10 changes: 10 additions & 0 deletions show/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -1434,10 +1434,20 @@ def aaa():
'authentication': {
'login': 'local (default)',
'failthrough': 'False (default)'
},
'authorization': {
'login': 'local (default)'
},
'accounting': {
'login': 'disable (default)'
}
}
if 'authentication' in data:
aaa['authentication'].update(data['authentication'])
if 'authorization' in data:
aaa['authorization'].update(data['authorization'])
if 'accounting' in data:
aaa['accounting'].update(data['accounting'])
for row in aaa:
entry = aaa[row]
for key in entry:
Expand Down
282 changes: 282 additions & 0 deletions tests/aaa_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
import imp
import os
import sys

from click.testing import CliRunner
from utilities_common.db import Db

import config.main as config
import show.main as show

test_path = os.path.dirname(os.path.abspath(__file__))
modules_path = os.path.dirname(test_path)
sys.path.insert(0, test_path)
sys.path.insert(0, modules_path)

import mock_tables.dbconnector

show_aaa_default_output="""\
AAA authentication login local (default)
AAA authentication failthrough False (default)
AAA authorization login local (default)
AAA accounting login disable (default)
"""

show_aaa_radius_output="""\
AAA authentication login radius
AAA authentication failthrough False (default)
AAA authorization login local (default)
AAA accounting login disable (default)
"""

show_aaa_radius_local_output="""\
AAA authentication login radius,local
AAA authentication failthrough False (default)
AAA authorization login local (default)
AAA accounting login disable (default)
"""

config_aaa_empty_output="""\
"""

config_aaa_not_a_valid_command_output="""\
Not a valid command
"""

show_aaa_tacacs_authentication_output="""\
AAA authentication login tacacs+
AAA authentication failthrough False (default)
AAA authorization login local (default)
AAA accounting login disable (default)
"""

show_aaa_tacacs_authorization_output="""\
AAA authentication login tacacs+
AAA authentication failthrough False (default)
AAA authorization login tacacs+
AAA accounting login disable (default)
"""

show_aaa_tacacs_local_authorization_output="""\
AAA authentication login tacacs+
AAA authentication failthrough False (default)
AAA authorization login tacacs+,local
AAA accounting login disable (default)
"""

show_aaa_tacacs_accounting_output="""\
AAA authentication login tacacs+
AAA authentication failthrough False (default)
AAA authorization login tacacs+,local
AAA accounting login tacacs+
"""

show_aaa_tacacs_local_accounting_output="""\
AAA authentication login tacacs+
AAA authentication failthrough False (default)
AAA authorization login tacacs+,local
AAA accounting login tacacs+,local
"""

show_aaa_disable_accounting_output="""\
AAA authentication login tacacs+
AAA authentication failthrough False (default)
AAA authorization login tacacs+,local
AAA accounting login disable
"""

class TestAaa(object):
@classmethod
def setup_class(cls):
os.environ['UTILITIES_UNIT_TESTING'] = "1"
print("SETUP")
import config.main
imp.reload(config.main)
import show.main
imp.reload(show.main)

@classmethod
def teardown_class(cls):
os.environ['UTILITIES_UNIT_TESTING'] = "0"
print("TEARDOWN")

def test_show_aaa_default(self):
runner = CliRunner()
result = runner.invoke(show.cli.commands["aaa"], [])
print(result.exit_code)
print(result.output)
assert result.exit_code == 0
assert result.output == show_aaa_default_output

def test_config_aaa_radius(self, get_cmd_module):
(config, show) = get_cmd_module
runner = CliRunner()
db = Db()
db.cfgdb.delete_table("AAA")

result = runner.invoke(config.config.commands["aaa"],\
["authentication", "login", "radius"], obj=db)
print(result.exit_code)
print(result.output)
assert result.exit_code == 0
assert result.output == config_aaa_empty_output

db.cfgdb.mod_entry("AAA", "authentication", {'login' : 'radius'})

result = runner.invoke(show.cli.commands["aaa"], [], obj=db)
assert result.exit_code == 0
assert result.output == show_aaa_radius_output

result = runner.invoke(config.config.commands["aaa"],\
["authentication", "login", "default"], obj=db)
print(result.exit_code)
print(result.output)
assert result.exit_code == 0
assert result.output == config_aaa_empty_output

db.cfgdb.delete_table("AAA")

result = runner.invoke(show.cli.commands["aaa"], [], obj=db)
assert result.exit_code == 0
assert result.output == show_aaa_default_output

def test_config_aaa_radius_local(self, get_cmd_module):
(config, show) = get_cmd_module
runner = CliRunner()
db = Db()
db.cfgdb.delete_table("AAA")

result = runner.invoke(config.config.commands["aaa"],\
["authentication", "login", "radius", "local"], obj=db)
print(result.exit_code)
print(result.output)
assert result.exit_code == 0
assert result.output == config_aaa_empty_output

db.cfgdb.mod_entry("AAA", "authentication", {'login' : 'radius,local'})

result = runner.invoke(show.cli.commands["aaa"], [], obj=db)
assert result.exit_code == 0
assert result.output == show_aaa_radius_local_output

result = runner.invoke(config.config.commands["aaa"],\
["authentication", "login", "default"], obj=db)
print(result.exit_code)
print(result.output)
assert result.exit_code == 0
assert result.output == config_aaa_empty_output

db.cfgdb.delete_table("AAA")

result = runner.invoke(show.cli.commands["aaa"], [], obj=db)
assert result.exit_code == 0
assert result.output == show_aaa_default_output

def test_config_aaa_radius_invalid(self):
runner = CliRunner()
result = runner.invoke(config.config.commands["aaa"],\
["authentication", "login", "radius", "tacacs+"])
print(result.exit_code)
print(result.output)
assert result.exit_code == 0
assert result.output == config_aaa_not_a_valid_command_output

def test_config_aaa_tacacs(self, get_cmd_module):
(config, show) = get_cmd_module
runner = CliRunner()
db = Db()
db.cfgdb.delete_table("AAA")

# test tacacs authentication
result = runner.invoke(config.config.commands["aaa"],\
["authentication", "login", "tacacs+"], obj=db)
print(result.exit_code)
print(result.output)
assert result.exit_code == 0
assert result.output == config_aaa_empty_output

db.cfgdb.mod_entry("AAA", "authentication", {'login' : 'tacacs+'})

result = runner.invoke(show.cli.commands["aaa"], [], obj=db)
assert result.exit_code == 0
assert result.output == show_aaa_tacacs_authentication_output

# test tacacs authorization
result = runner.invoke(config.config.commands["aaa"],\
["authorization", "tacacs+"], obj=db)
print(result.exit_code)
print(result.output)
assert result.exit_code == 0
assert result.output == config_aaa_empty_output

db.cfgdb.mod_entry("AAA", "authorization", {'login' : 'tacacs+'})

result = runner.invoke(show.cli.commands["aaa"], [], obj=db)
assert result.exit_code == 0
assert result.output == show_aaa_tacacs_authorization_output

# test tacacs + local authorization
result = runner.invoke(config.config.commands["aaa"],\
["authorization", "tacacs+ local"], obj=db)
print(result.exit_code)
print(result.output)
assert result.exit_code == 0
assert result.output == config_aaa_empty_output

db.cfgdb.mod_entry("AAA", "authorization", {'login' : 'tacacs+,local'})

result = runner.invoke(show.cli.commands["aaa"], [], obj=db)
assert result.exit_code == 0
assert result.output == show_aaa_tacacs_local_authorization_output

# test tacacs accounting
result = runner.invoke(config.config.commands["aaa"],\
["accounting", "tacacs+"], obj=db)
print(result.exit_code)
print(result.output)
assert result.exit_code == 0
assert result.output == config_aaa_empty_output

db.cfgdb.mod_entry("AAA", "accounting", {'login' : 'tacacs+'})

result = runner.invoke(show.cli.commands["aaa"], [], obj=db)
assert result.exit_code == 0
assert result.output == show_aaa_tacacs_accounting_output

# test tacacs + local accounting
result = runner.invoke(config.config.commands["aaa"],\
["accounting", "tacacs+ local"], obj=db)
print(result.exit_code)
print(result.output)
assert result.exit_code == 0
assert result.output == config_aaa_empty_output

db.cfgdb.mod_entry("AAA", "accounting", {'login' : 'tacacs+,local'})

result = runner.invoke(show.cli.commands["aaa"], [], obj=db)
assert result.exit_code == 0
assert result.output == show_aaa_tacacs_local_accounting_output

# test disable accounting
result = runner.invoke(config.config.commands["aaa"],\
["accounting", "disable"], obj=db)
print(result.exit_code)
print(result.output)
assert result.exit_code == 0
assert result.output == config_aaa_empty_output

db.cfgdb.mod_entry("AAA", "accounting", {'login' : 'disable'})

result = runner.invoke(show.cli.commands["aaa"], [], obj=db)
assert result.exit_code == 0
assert result.output == show_aaa_disable_accounting_output

0 comments on commit 1c353ad

Please sign in to comment.