- Refactoring stuff into a package.
This commit is contained in:
301
gui/Scyther/XMLReader.py
Normal file
301
gui/Scyther/XMLReader.py
Normal file
@@ -0,0 +1,301 @@
|
||||
#
|
||||
# XMLReader
|
||||
#
|
||||
# Note:
|
||||
# This requires python elementtree to work
|
||||
# See: http://effbot.org/zone/element-index.htm
|
||||
#
|
||||
# On Fedora Core you can install this by installing the python-elementtree rpm
|
||||
# Things will be a lot faster and consume less memory if you install the
|
||||
# cElementTree module
|
||||
#
|
||||
|
||||
# Check for cElementTree presence. Otherwise use ElementTree.
|
||||
useiter = True
|
||||
try:
|
||||
import cElementTree
|
||||
except ImportError:
|
||||
useiter = False
|
||||
from elementtree import ElementTree
|
||||
|
||||
## Simply pick cElementTree
|
||||
#import cElementTree
|
||||
## Simply pick ElementTree
|
||||
#useiter = False
|
||||
#from elementtree import ElementTree
|
||||
|
||||
import Term
|
||||
import Attack
|
||||
import Trace
|
||||
import Claim
|
||||
import sys
|
||||
|
||||
class XMLReader(object):
|
||||
|
||||
def __init__ (self):
|
||||
self.varlist = []
|
||||
pass
|
||||
|
||||
def readXML(self, input):
|
||||
# Use iter parse when possble so we can clear the attack after reading
|
||||
# it in order to preserve memory (this requires cElementTree)
|
||||
|
||||
attackbuffer = []
|
||||
claims = []
|
||||
|
||||
if useiter:
|
||||
parser = cElementTree.iterparse(input)
|
||||
else:
|
||||
parser = ElementTree.parse(input).findall('*')
|
||||
|
||||
for elem in parser:
|
||||
# The iter parser receives the input in tuples (event and element)
|
||||
# we only need the event
|
||||
if useiter:
|
||||
elem = elem[1]
|
||||
|
||||
if elem.tag == 'state':
|
||||
attack = self.readAttack(elem)
|
||||
attackbuffer.append(attack)
|
||||
if useiter:
|
||||
elem.clear()
|
||||
|
||||
if elem.tag == 'claimstatus':
|
||||
claim = self.readClaim(elem)
|
||||
claim.attacks = attackbuffer
|
||||
claims.append(claim)
|
||||
|
||||
# link to parent
|
||||
for attack in claim.attacks:
|
||||
attack.claim = claim
|
||||
|
||||
attackbuffer = []
|
||||
if useiter:
|
||||
elem.clear()
|
||||
|
||||
return claims
|
||||
|
||||
# Read a term from XML
|
||||
def readTerm(self,xml):
|
||||
# If xml is None the term should also be none
|
||||
if xml == None:
|
||||
return None
|
||||
# If this is a term variable read it directly
|
||||
if (xml.tag in ('tuple','const','apply','encrypt','var')):
|
||||
return self.readSubTerm(xml)
|
||||
# Otherwise read from it's first child
|
||||
children = xml.getchildren()
|
||||
assert(len(children) == 1)
|
||||
return self.readSubTerm(children[0])
|
||||
|
||||
def readSubTerm(self, tag):
|
||||
if tag.tag == 'tuple':
|
||||
return Term.TermTuple(self.readTerm(tag.find('op1')),self.readTerm(tag.find('op2')))
|
||||
elif tag.tag == 'const':
|
||||
return Term.TermConstant(tag.text)
|
||||
elif tag.tag == 'apply':
|
||||
return Term.TermApply(self.readTerm(tag.find('function')),self.readTerm(tag.find('arg')))
|
||||
elif tag.tag == 'encrypt':
|
||||
return Term.TermEncrypt(self.readTerm(tag.find('op')),self.readTerm(tag.find('key')))
|
||||
elif tag.tag == 'var':
|
||||
name = Term.TermConstant(tag.get('name'))
|
||||
# Instantiate this variable if possible (note this list is empty while reading
|
||||
# the variables section of the XML file)
|
||||
for inst in self.varlist:
|
||||
if inst.name == name:
|
||||
return inst
|
||||
# If it is not instantiated in varlist, just return a variable with this name and no
|
||||
# value
|
||||
return Term.TermVariable(name,None)
|
||||
else:
|
||||
raise Term.InvalidTerm, "Invalid term type in XML: %s" % tag.tag
|
||||
|
||||
def readEvent(self,xml):
|
||||
label = self.readTerm(xml.find('label'))
|
||||
follows = xml.findall('follows')
|
||||
followlist = []
|
||||
for follow in follows:
|
||||
follow = follow.find('after')
|
||||
if follow == None:
|
||||
# Ignore follow definitions that do not contain after
|
||||
continue
|
||||
follow = (int(follow.get('run')),int(follow.get('index')))
|
||||
followlist.append(follow)
|
||||
|
||||
(etype,index) = (xml.get('type'),int(xml.get('index')))
|
||||
if etype in ('send','read'):
|
||||
fr = self.readTerm(xml.find('from'))
|
||||
to = self.readTerm(xml.find('to'))
|
||||
message = self.readTerm(xml.find('message'))
|
||||
if (etype == 'send'):
|
||||
return Trace.EventSend(index,label,followlist,fr,to,message)
|
||||
else:
|
||||
return Trace.EventRead(index,label,followlist,fr,to,message)
|
||||
elif xml.get('type') == 'claim':
|
||||
role = self.readTerm(xml.find('role'))
|
||||
etype = self.readTerm(xml.find('type'))
|
||||
argument = self.readTerm(xml.find('argument'))
|
||||
# Freshness claims are implemented as Empty claims with
|
||||
# (Fresh,Value) as arguments
|
||||
try:
|
||||
if etype == 'Empty' and argument[0] == 'Fresh':
|
||||
etype = Term.TermConstant('Fresh')
|
||||
argument = argument[1]
|
||||
elif etype == 'Empty' and argument[0] == 'Compromised':
|
||||
etype = Term.TermConstant('Compromised')
|
||||
argument = argument[1]
|
||||
except:
|
||||
pass
|
||||
return Trace.EventClaim(index,label,followlist,role,etype,argument)
|
||||
else:
|
||||
raise Trace.InvalidAction, "Invalid action in XML: %s" % (xml.get('type'))
|
||||
|
||||
def readRun(self,xml):
|
||||
assert(xml.tag == 'run')
|
||||
run = Trace.Run()
|
||||
run.id = int(xml.find('runid').text)
|
||||
# TODO why is protocol name a term??
|
||||
run.protocol = str(self.readTerm(xml.find('protocol')))
|
||||
run.intruder = xml.find('protocol').get('intruder') == 'true'
|
||||
run.role = xml.find('rolename').text
|
||||
for role in xml.find('roleagents'):
|
||||
name = role.find('rolename').text
|
||||
agent = self.readTerm(role.find('agent'))
|
||||
run.roleAgents[name] = agent
|
||||
for eventxml in xml.find('eventlist'):
|
||||
action = self.readEvent(eventxml)
|
||||
action.run = run
|
||||
run.eventList.append(action)
|
||||
return run
|
||||
|
||||
# Read protocol description for a certain role
|
||||
def readRoleDescr(self,xml):
|
||||
assert(xml.tag == 'role')
|
||||
run = Trace.Run()
|
||||
# We will need the last label later on to see if a
|
||||
# run is complete
|
||||
run.lastLabel = None
|
||||
run.role = xml.find('rolename').text
|
||||
for eventxml in xml.find('eventlist'):
|
||||
action = self.readEvent(eventxml)
|
||||
action.run = run
|
||||
run.eventList.append(action)
|
||||
run.lastLabel = action.label
|
||||
return run
|
||||
|
||||
def readTypeList(self,xml):
|
||||
result = []
|
||||
vartypes = xml.find('type').find('termlist')
|
||||
for vartype in vartypes:
|
||||
# We will assume that types are simple strings
|
||||
result.append(str(self.readTerm(vartype)))
|
||||
return result
|
||||
|
||||
def readClaim(self, xml):
|
||||
claim = Claim.Claim()
|
||||
for event in xml.getchildren():
|
||||
if event.tag == 'claimtype':
|
||||
claim.claimtype = self.readTerm(event)
|
||||
elif event.tag == 'label':
|
||||
# We store the full protocol,label construct for
|
||||
# consistency with the technical parts, so it is left to
|
||||
# the __str__ of claim to select the right element
|
||||
claim.label = self.readTerm(event)
|
||||
elif event.tag == 'protocol':
|
||||
claim.protocol = self.readTerm(event)
|
||||
elif event.tag == 'role':
|
||||
claim.role = self.readTerm(event)
|
||||
elif event.tag == 'parameter':
|
||||
claim.parameter = self.readTerm(event)
|
||||
|
||||
elif event.tag == 'failed':
|
||||
claim.failed = int(event.text)
|
||||
elif event.tag == 'count':
|
||||
claim.count = int(event.text)
|
||||
elif event.tag == 'states':
|
||||
claim.states = int(event.text)
|
||||
|
||||
elif event.tag == 'complete':
|
||||
claim.complete = True
|
||||
elif event.tag == 'timebound':
|
||||
claim.timebound = True
|
||||
else:
|
||||
print >>sys.stderr,"Warning unknown tag in claim: %s" % claim.tag
|
||||
|
||||
claim.analyze()
|
||||
return claim
|
||||
|
||||
def readAttack(self, xml):
|
||||
self.varlist = []
|
||||
attack = Attack.Attack()
|
||||
attack.id = int(xml.get('id'))
|
||||
# A state contains 4 direct child nodes:
|
||||
# broken, system, variables and semitrace
|
||||
# optionally a fifth: dot
|
||||
for event in xml.getchildren():
|
||||
if event.tag == 'broken':
|
||||
attack.broken.append((self.readTerm(event.find('claim')),
|
||||
self.readTerm(event.find('label'))))
|
||||
elif event.tag == 'system':
|
||||
attack.match = int(event.find('match').text)
|
||||
for term in event.find('commandline'):
|
||||
if attack.commandline != '':
|
||||
attack.commandline += ' '
|
||||
attack.commandline += term.text
|
||||
for term in event.find('untrusted').find('termlist'):
|
||||
attack.untrusted.append(str(self.readTerm(term)))
|
||||
for term in event.find('initialknowledge').find('termlist'):
|
||||
attack.initialKnowledge.append(self.readTerm(term))
|
||||
for keypair in event.find('inversekeys'):
|
||||
inverse = []
|
||||
for term in keypair:
|
||||
inverse.append(self.readTerm(term))
|
||||
assert(len(inverse) == 0 or len(inverse) == 2)
|
||||
attack.inverseKeys.append(inverse)
|
||||
# TODO why is protocol name a term??
|
||||
for protocolxml in event.findall('protocol'):
|
||||
protocol = str(self.readTerm(protocolxml.find('name')))
|
||||
descr = Trace.ProtocolDescription(protocol)
|
||||
attack.protocoldescr[protocol] = descr
|
||||
for rolexml in protocolxml.findall('role'):
|
||||
roledescr = self.readRoleDescr(rolexml)
|
||||
descr.roledescr[roledescr.role] = roledescr
|
||||
|
||||
elif event.tag == 'semitrace':
|
||||
for runxml in event:
|
||||
run = self.readRun(runxml)
|
||||
run.attack = attack
|
||||
attack.semiTrace.runs.append(run)
|
||||
|
||||
elif event.tag == 'dot':
|
||||
# Apparently Scyther already generated dot output,
|
||||
# store
|
||||
attack.scytherDot = event.text
|
||||
|
||||
elif event.tag == 'variables':
|
||||
# Read the variables one by one
|
||||
for varxml in event:
|
||||
if varxml.get('typeflaw') == 'true':
|
||||
attack.typeflaws = True
|
||||
var = self.readTerm(varxml.find('name').find('term'))
|
||||
var.types = self.readTypeList(varxml.find('name'))
|
||||
|
||||
substxml = varxml.find('substitution')
|
||||
# Read substitution if present
|
||||
if substxml != None:
|
||||
subst = self.readTerm(substxml.find('term'))
|
||||
subst.types = self.readTypeList(substxml)
|
||||
newvar = Term.TermVariable(var.name,subst)
|
||||
newvar.types = var.types
|
||||
var = newvar
|
||||
|
||||
attack.variables.append(var)
|
||||
|
||||
# When all have been read set self.varlist so that when
|
||||
# we read terms in the attacks they can be filled in using
|
||||
# this list
|
||||
self.varlist = attack.variables
|
||||
else:
|
||||
print >>sys.stderr,"Warning unknown tag in attack: %s" % event.tag
|
||||
return attack
|
||||
|
||||
Reference in New Issue
Block a user