Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TEST-28 #328

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 73 additions & 100 deletions bin/fe
Original file line number Diff line number Diff line change
Expand Up @@ -5,123 +5,96 @@
fe - the File Editor. Uses RCS to maintain ACL policy versioning.
"""

__author__ = 'Jathan McCollum, Mark Ellzey Thomas, Michael Shields'
__author__ = ('Jathan McCollum, Mark Ellzey Thomas, Michael Shields, '
'Michael Harding')
__maintainer__ = 'Jathan McCollum'
__email__ = '[email protected]'
__copyright__ = 'Copyright 2002-2013, AOL Inc.'
__version__ = '1.2.1'


import os
import re
from simpleparse.error import ParserSyntaxError
import sys
import termios

from trigger import acl
from trigger.acl.parser import TIP
import os
#import trigger.acl.fe as fe
from trigger.utils.cli import yesno
from trigger.conf import settings
from simpleparse.error import ParserSyntaxError
from trigger.exceptions import ParseError
import trigger.acl.fe as fe
import re

psa_checks = (
(lambda t: 'source-port' in t.match, 'source port'),
(lambda t: 'tos' in t.match, 'ToS'),
(lambda t: 'precedence' in t.match, 'precedence'),
(lambda t: 'protocol' in t.match and 'igmp' in t.match['protocol'], 'IGMP')
)

def gsr_checks(a):
'''PSA and SALSA checks, if the ACL is tagged appropriately.'''
ok = True
max_lines, psa = None, False
if [c for c in a.comments if 'SALSA' in c]:
max_lines = 128
elif [c for c in a.comments if 'PSA-128' in c]:
max_lines, psa = 128, True
elif [c for c in a.comments if 'PSA-448' in c]:
max_lines, psa = 448, True
if max_lines and len(a.terms) > max_lines:
print 'ACL has %d lines, max %d (GSR limit)' \
% (len(a.terms), max_lines)
ok = False
if psa:
for t in a.terms:
for check, reason in psa_checks:
if check(t):
print 'Match on %s not permitted in PSA mode' % reason
for line in t.output_ios():
print ' ' + line
ok = False
return ok

def normalize(a):
'''Fix up the ACL, and return False if there are problems.'''

if isinstance(a, acl.PolicerGroup):
return True

ok = True

# JunOS counter policy and duplicate term names.
if a.format == 'junos':
names = set()
for t in a.terms:
if t.name in names:
print 'Duplicate term name', t.name
ok = False
else:
names.add(t.name)

# GSR checks.
if a.format == 'ios': # not ios_named
if not gsr_checks(a):
ok = False

# Check for 10/8.
for t in a.terms:
for addr in ('address', 'source-address', 'destination-address'):
for block in t.match.get(addr, []):
if block == TIP('10.0.0.0/8'):
print 'Matching on 10.0.0.0/8 is never correct'
for line in t.output(a.format):
print ' ' + line
ok = False

# Here is the place to put other policy checks; for example, we could
# check blue to green HTTP and make sure all those terms have a comment
# in a standardized format saying it was approved.

return ok
# end built-in acl parsing
sys.path.append(os.path.dirname(__file__))

def edit(editfile):
"""
Edits the file and calls normalize(). Loops until it passes or the user
confirms they want to bypass failed normalization. Returns a file object of
the diff output.
"""
editor = os.environ.get('EDITOR', 'vim')
if editor.startswith('vi'):
os.spawnlp(os.P_WAIT, editor, editor, '+set sw=4', '+set softtabstop=4',
editfile)
else:
os.spawnlp(os.P_WAIT, editor, editor, editfile)

if os.path.basename(editfile): #.startswith('acl.'):
print 'Normalizing ACL...'
a = None
accepted = False
# following line is essentially a no-op
# do plug in processing here
# figure out the base file name, so we can do regular
# expression parsing
fn = os.path.basename(editfile)
try:
#import fe_extensions
#tests = fe_extensions.tests
try:
a = acl.parse(file(editfile))
except ParserSyntaxError, e:
print e
except TypeError, e:
print e
except Exception, e:
print 'ACL parse failed: ', sys.exc_type, ':', e
if a and normalize(a):
output = '\n'.join(a.output(replace=True)) + '\n'
file(editfile, 'w').write(output)
sys.path.extend(settings.FE_EXT_DIRS)
except:
pass
ctests = settings.FE_TESTS
tl = []
tests = []
for c in ctests:
if isinstance(c,basestring):
tl.append(c)
else:
assert(len(c)==2)
if re.match(c[0],fn):
tl.extend(c[1])
else:
continue
# convert the strings to functions
for s in tl:
s = s.split('.')
m = __import__(s[0])
for n in s[1:]:
m = getattr(m, n)
tests.append(m)
except (AttributeError, NameError):
tests = (fe.acltest,)
while not accepted:
editor = os.environ.get('EDITOR', 'vim')
if editor.startswith('vi'):
os.spawnlp(os.P_WAIT, editor, editor, '+set sw=4',
'+set softtabstop=4', editfile)
else:
if yesno('Re-edit?'):
return edit(editfile)
os.spawnlp(os.P_WAIT, editor, editor, editfile)
for test in tests:
# match means we think we have the right test,
# output is either 'False' or the transformed
# input
# alternately, a syntax error can be thrown up,
# meaning we got far enough that this is probably
# the actual error
try:
matches, output = test(editfile)
except (ParserSyntaxError, ParseError) as e:
print e
break
if not matches:
continue
# do a 'None' check to allow empty files...
if output is not None:
file(editfile, 'w').write(output)
accepted = True
break
# can't parse the output
if not accepted and not yesno("can't parse, retry?"):
accepted = True
# should we allow a write here?

# should use a list version of popen
return os.popen('rcsdiff -u -b -q ' + editfile).read()
Expand Down
Empty file added conf/__init__.py
Empty file.
65 changes: 65 additions & 0 deletions conf/fe_extensions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""
fe_extensions.py - Contains parser extensions for the 'fe' utility
"""
from trigger import acl
import IPy
from simpleparse.parser import Parser
import trigger.exceptions

declaration = r'''
root := 'policy-options',! "problem parsing policy options",ws,'{',ws,plb,ws,'}',ws
<ws> := [ \t\n]*
replace := 'replace:'
identifier := [-a-zA-Z]+
plb := replace?,ws,'prefix-list',ws,identifier,ws,'{',ples,ws,'}'
ples := ple+
ple := ws,?-"}",! "problem around line %(line)s",ip,ws,';'
ip := [0-9]+,'.',[0-9]+,'.',[0-9]+,'.',[0-9]+,('/',[0-9]+)?
'''

from simpleparse.dispatchprocessor import *
class MyProcessorClass( DispatchProcessor ):
# example
def production_name( self, (tag,start,stop,subtags), buffer ):
"""Process the given production and its children"""
pass

def plb( self, (tag,start,stop,subtags), buffer):
return [dispatchList(self,subtags,buffer)]
pass

def identifier( self, (tag,start,stop,subtags),buffer):
return buffer[start:stop]

def ip( self, (tag,start,stop,subtags), buffer):
try:
return IPy.IP(buffer[start:stop])
except ValueError as ve:
e = trigger.exceptions.ParseError("Can't parse this as an IP address: %s "
"(around line %d)" %
(buffer[start:stop], lines(0,start,buffer)))
raise e

def replace( self, (tag,start,stop,subtags), buffer):
return 'replace'

def ples( self, (tag,start,stop,subtags), buffer):
return ['ples',dispatchList(self,subtags,buffer)]

def ple( self, (tag,start,stop,subtags), buffer):
return dispatch(self,subtags[0],buffer)

class MyParser(Parser):
def buildProcessor( self ):
return MyProcessorClass()

def parse_prefixlist(filename):
import pdb; pdb.set_trace()
fc = open(filename).read()
# add quick test here... like a regular expression test
parser = MyParser(declaration)
x = parser.parse(fc)
if x[-1] == len(fc):
# success, we 'ate' the whole file
return True, fc
return False, None
29 changes: 29 additions & 0 deletions conf/trigger_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,3 +505,32 @@ def _create_cm_ticket_stub(**args):
NOTIFICATION_HANDLERS = [
'trigger.utils.notifications.handlers.email_handler',
]

# extention directories for additional parsing/tests
# add the configuration directory, at least
FE_EXT_DIRS = (
os.path.dirname(__file__),
)

# a list of functions to use when running fe - functions
# are run in order...

import re

# alternative form for plugins - regular expression for the filename,
# followed by list of parsers.
# make sure that parser list is iterable, i.e. ('x',) vs ('x') or 'x'
# regular expression filtered plugins can be mixed with single
# entries, as shows
#FE_TESTS = (
# (r'^acl\..*$', ('fe_extensions.parse_prefixlist',
# 'trigger.acl.fe.acltest')),
# 'some.other.function',
#)

# list of parsers, including plugins - trigger.acl.fe.acltest
# is the test that used to be run by default.
FE_TESTS = (
'fe_extensions.parse_prefixlist',
'trigger.acl.fe.acltest'
)
Loading