def by(**type):
    """Search through all the enumerations within the database and return the first result.

    The keyword arguments in ``type`` are handed unmodified to ``iterate``:

    like = glob match
    regex = regular expression
    index = particular index
    identifier or id = internal id number

    When more than one enumeration matches, every candidate is logged at the
    info level, a warning is logged, and the first match is returned.
    Raises LookupError when nothing matches.
    """
    searchstring = ', '.join("{:s}={!r}".format(k, v) for k, v in type.iteritems())

    res = __builtin__.list(iterate(**type))
    if len(res) > 1:
        # An explicit loop is used because these calls are made purely for
        # their side effect (one log record per candidate).
        for n in res:
            note = comment(n)
            logging.info("[{:d}] {:s} & {:#x} ({:d} members){:s}".format(idaapi.get_enum_idx(n), idaapi.get_enum_name(n), mask(n), len(__builtin__.list(members(n))), " // {:s}".format(note) if note else ''))
        # logging.warning is the documented spelling; logging.warn is a deprecated alias
        logging.warning("{:s}.search({:s}) : Found {:d} matching results, returning the first one.".format(__name__, searchstring, len(res)))

    res = next(iter(res), None)
    if res is None:
        raise LookupError("{:s}.search({:s}) : Found 0 matching results.".format(__name__, searchstring))
    return res
# Example source code for Python's map() (original heading: "python类map()的实例源码")
def list(**type):
    """List all the enumerations within the database.

    Search type can be identified by providing a named argument.
    like = glob match
    regex = regular expression
    index = particular index
    identifier = particular id number
    pred = function predicate

    Prints one line per matching enumeration and returns nothing.
    """
    res = __builtin__.list(iterate(**type))

    # Nothing matched: leave before the max() calls below, which raise
    # ValueError when they are given an empty sequence.
    if not res:
        return

    maxindex = max(idaapi.get_enum_idx(n) for n in res)
    maxname = max(len(idaapi.get_enum_name(n)) for n in res)

    # column widths: decimal digits for the index, hexadecimal digits for the mask
    cindex = math.ceil(math.log(maxindex or 1)/math.log(10))
    cmask = max(math.ceil((1.0/math.log(16)) * math.log(mask(n))) for n in res)

    for n in res:
        print("[{:{:d}d}] {:>{:d}s} & {:#<{:d}x} ({:d} members){:s}".format(idaapi.get_enum_idx(n), int(cindex), idaapi.get_enum_name(n), maxname, mask(n), int(cmask), len(__builtin__.list(members(n))), " // {:s}".format(comment(n)) if comment(n) else ''))
    return
## members
def search(cls, **type):
    """Search through all of the functions within the database and return the first result.

    Please review the help for functions.list for the definition of ``type``.

    When several functions match, each one is logged at the info level, a
    warning naming the first match is logged, and the first match is returned.
    Raises LookupError when nothing matches.
    """
    query_s = ', '.join("{:s}={!r}".format(k, v) for k, v in type.iteritems())

    res = __builtin__.list(cls.iterate(**type))
    if len(res) > 1:
        # explicit loop: the logging calls are wanted only for their side effect
        for i, ea in enumerate(res):
            logging.info("[{:d}] {:s}".format(i, function.name(ea)))
        f = utils.compose(function.by, function.name)
        # logging.warning is the documented spelling; logging.warn is a deprecated alias
        logging.warning("{:s}.search({:s}) : Found {:d} matching results, returning the first one. : {!r}".format('.'.join((__name__, cls.__name__)), query_s, len(res), f(res[0])))

    res = __builtin__.next(iter(res), None)
    if res is None:
        raise LookupError("{:s}.search({:s}) : Found 0 matching results.".format('.'.join((__name__, cls.__name__)), query_s))
    return res
def list(cls, **type):
    """List all of the names in the database that match ``type``.

    Search can be constrained by the named argument ``type``.
    like = glob match against name
    ea, address = name is at address
    name = exact name match
    regex = regular-expression against name
    index = name at index
    pred = function predicate

    Prints one line per matching name (index, address, name) and returns nothing.
    """
    res = __builtin__.list(cls.__iterate__(**type))

    maxindex = max(res or [1])
    maxaddr = max([idaapi.get_nlist_ea(index) for index in res] or [idaapi.BADADDR])

    # Column widths are the digit counts of the largest values. Counting the
    # formatted digits avoids math.log(0), which raises ValueError when the
    # only match is index 0 or address 0, and gives the exact width.
    cindex = len("{:d}".format(maxindex))
    caddr = len("{:x}".format(maxaddr))

    for index in res:
        print("[{:>{:d}d}] {:0{:d}x} {:s}".format(index, cindex, idaapi.get_nlist_ea(index), caddr, idaapi.get_nlist_name(index)))
    return
def search(cls, **type):
    """Search through all of the names within the database and return the first result.

    Please review the help for names.list for the definition of ``type``.

    The matches are name-list indices; the value returned is the address
    that ``idaapi.get_nlist_ea`` reports for the first one. When several
    names match, each is logged at the info level and a warning is logged.
    Raises LookupError when nothing matches.
    """
    query_s = ', '.join("{:s}={!r}".format(k, v) for k, v in type.iteritems())

    res = __builtin__.list(cls.__iterate__(**type))
    if len(res) > 1:
        # explicit loop: the logging calls are wanted only for their side effect
        for idx in res:
            logging.info("[{:d}] {:x} {:s}".format(idx, idaapi.get_nlist_ea(idx), idaapi.get_nlist_name(idx)))
        f1, f2 = idaapi.get_nlist_ea, idaapi.get_nlist_name
        # logging.warning is the documented spelling; logging.warn is a deprecated alias
        logging.warning("{:s}.search({:s}) : Found {:d} matching results, returning the first one. : {:x} {!r}".format('.'.join((__name__, cls.__name__)), query_s, len(res), f1(res[0]), f2(res[0])))

    res = __builtin__.next(iter(res), None)
    if res is None:
        raise LookupError("{:s}.search({:s}) : Found 0 matching results.".format('.'.join((__name__, cls.__name__)), query_s))
    return idaapi.get_nlist_ea(res)
def map(l, *args, **kwds):
    """Execute provided callback on all functions in database. Synonymous to map(l,db.functions()).

    ``l`` is defined as a function(address, *args, **kwds).
    Any other arguments are passed to ``l`` unmodified.

    Returns the list of values returned by ``l``. A KeyboardInterrupt stops
    the iteration early and the results gathered so far are returned. The
    location reported by ``here()`` on entry is handed back to ``go()``
    before leaving, including when ``l`` raises.
    """
    i, x = 0, here()
    current = x
    everything = functions()
    result = []

    try:
        for i, x in enumerate(everything):
            go(x)
            print("{:x}: processing # {:d} of {:d} : {:s}".format(x, i+1, len(everything), name(x)))
            result.append(l(x, *args, **kwds))
    except KeyboardInterrupt:
        print("{:x}: terminated at # {:d} of {:d} : {:s}".format(x, i+1, len(everything), name(x)))
    finally:
        # Go back to where we started even if the callback raised; without
        # this an exception from ``l`` would leave us at the failing function.
        go(current)
    return result
def search(cls, **type):
    """Search through all of the entry-points within the database and return the first result.

    Please review the help for entry.list for the definition of ``type``.

    The matches are entry-point indices; the value returned is
    ``cls.__address__`` of the first one. When several entry-points match,
    each is logged at the info level and a warning is logged.
    Raises LookupError when nothing matches.
    """
    query_s = ', '.join("{:s}={!r}".format(k, v) for k, v in type.iteritems())

    res = __builtin__.list(cls.__iterate__(**type))
    if len(res) > 1:
        # explicit loop: the logging calls are wanted only for their side effect
        for idx in res:
            logging.info("[{:d}] {:x} : ({:x}) {:s}".format(idx, cls.__address__(idx), cls.__entryordinal__(idx), cls.__entryname__(idx)))
        f = utils.compose(idaapi.get_entry_ordinal, idaapi.get_entry)
        # logging.warning is the documented spelling; logging.warn is a deprecated alias
        logging.warning("{:s}.search({:s}) : Found {:d} matching results, returning the first one. : {:x}".format('.'.join((__name__,cls.__name__)), query_s, len(res), f(res[0])))

    res = __builtin__.next(iter(res), None)
    if res is None:
        raise LookupError("{:s}.search({:s}) : Found 0 matching results.".format('.'.join((__name__,cls.__name__)), query_s))
    return cls.__address__(res)
def list(cls, **type):
    """List all of the imports in the database that match ``type``.

    Search can be constrained by the named argument ``type``.
    like = glob match against import short name
    ea, address = import is at address
    fullname = glob match against import long name -> MODULE!function
    module = glob match against module
    ordinal = exact match against import ordinal number
    name = exact match against import name
    regex = regular-expression against import name
    index = import name at index
    pred = function predicate

    Each result is an ``(ea, (module, name, ordinal))`` pair. Prints one
    line per match and returns nothing.
    """
    res = __builtin__.list(cls.iterate(**type))

    maxaddr = max([ea for ea, _ in res] or [idaapi.BADADDR])
    # fallback is an int so it can safely take part in the padding arithmetic
    maxmodule = max([len(module) for _, (module, _, _) in res] or [0])
    cordinal = max([len("{:d}".format(ordinal)) for _, (_, _, ordinal) in res] or [1])

    # Width of the address column is the digit count of the largest address;
    # counting digits avoids math.log(0) and gives the exact width.
    caddr = len("{:x}".format(maxaddr))

    for ea, (module, name, ordinal) in res:
        # pad after "<ordinal>" so that the names line up in one column
        padding = ' '*(cordinal-len("{:d}".format(ordinal)) + (maxmodule-len(module)))
        print("{:0{:d}x} {:s}<{:<d}>{:s} {:s}".format(ea, caddr, module, ordinal, padding, name))
    return
def search(cls, **type):
    """Search through all of the imports within the database and return the first result.

    Please review the help for imports.list for the definition of ``type``.

    Each match is an ``(ea, (module, name, ordinal))`` pair; the value
    returned is the first element of the first match. When several imports
    match, each is logged at the info level and a warning is logged.
    Raises LookupError when nothing matches.
    """
    query_s = ', '.join("{:s}={!r}".format(k, v) for k, v in type.iteritems())

    res = __builtin__.list(cls.iterate(**type))
    if len(res) > 1:
        # explicit loop: the logging calls are wanted only for their side effect
        for ea, (module, name, ordinal) in res:
            logging.info("{:x} {:s}<{:d}> {:s}".format(ea, module, ordinal, name))
        f = utils.compose(utils.second, cls.__formatl__)
        # logging.warning is the documented spelling; logging.warn is a deprecated alias
        logging.warning("{:s}.search({:s}) : Found {:d} matching results, returning the first one. : {!r}".format('.'.join((__name__,cls.__name__)), query_s, len(res), f(res[0])))

    res = __builtin__.next(iter(res), None)
    if res is None:
        raise LookupError("{:s}.search({:s}) : Found 0 matching results.".format('.'.join((__name__,cls.__name__)), query_s))
    return res[0]
def prev(cls, ea, count):
    """Return the address found by stepping backwards from ``ea`` ``count`` times.

    Each step normally uses ``address.prev(ea)``. When ``invalidQ`` flags
    that address (built from "is not code" and "has the CF_STOP feature"),
    the first code reference to ``ea`` is followed instead. If more than
    one such reference exists the choice is ambiguous: a fatal message is
    logged and None is returned.
    """
    ea = interface.address.within(ea)

    def isStop(ea):
        return (_instruction.feature(ea) & idaapi.CF_STOP) == idaapi.CF_STOP

    invalidQ = utils.compose(utils.fap(utils.compose(type.is_code, operator.not_), isStop), any)
    refs = [ref for ref in xref.up(ea) if type.is_code(ref)]
    if len(refs) > 1 and invalidQ(address.prev(ea)):
        logging.fatal("{:s}.prev({:x}, count={:d}) : Unable to determine previous address due to multiple previous references being available : {:s}".format('.'.join((__name__, cls.__name__)), ea, count, ', '.join(__builtin__.map("{:x}".format,refs))))
        return None

    try:
        if invalidQ(address.prev(ea)):
            res = refs[0]
            # following a reference does not consume one of the requested steps
            count += 1
        else:
            res = address.prev(ea)
    except Exception:
        # Best effort: stay at the current address when no previous one can
        # be determined. Catching Exception rather than using a bare except
        # lets KeyboardInterrupt and SystemExit propagate.
        res = ea
    return cls.prev(res, count-1) if count > 1 else res
def __start_monitoring(self, stdout, stderr=None):
    """Start monitoring threads. **used internally**

    ``stdout`` (and ``stderr`` when given) are paired with the matching
    pipes of ``self.program`` and handed to ``process.monitorPipe``. Every
    thread it yields is recorded in the set of monitoring threads and
    started.
    """
    program = self.program
    name = "thread-{:x}".format(program.pid)

    # create monitoring threads + coroutines
    if stderr:
        res = process.monitorPipe(self.taskQueue, (stdout,program.stdout),(stderr,program.stderr), name=name)
    else:
        res = process.monitorPipe(self.taskQueue, (stdout,program.stdout), name=name)

    # collect the pairs into a list so they can be walked more than once
    pairs = [pair for pair in res]

    # attach a method for injecting data into a monitor
    for t, q in pairs:
        t.send = q.send
    threads = [t for t, _ in pairs]

    # update threads for destruction later
    self.__threads.update(threads)

    # set things off
    for t in threads:
        t.start()
def monitor(send, pipe, blocksize=1, daemon=True, name=None):
    """Spawn a thread that reads `blocksize` bytes from `pipe` and dispatches it to `send`

    For every single byte, `send` is called. The thread is named according to
    the `name` parameter; a false `name` leaves the default thread name.
    The thread's ``daemon`` attribute is set from `daemon`. The thread is
    created but not started.

    Returns the monitoring threading.thread instance
    """
    def shuffle(send, pipe):
        while not pipe.closed:
            data = pipe.read(blocksize)
            if len(data) == 0:
                # pipe.read syscall was interrupted. so since we can't really
                # determine why (cause...y'know..python), stop dancing so
                # the parent will actually be able to terminate us
                break
            # Dispatch with an explicit loop: map() must not be relied on
            # for side effects, as a lazy map() would never call `send`.
            for item in data:
                send(item)
        return

    # ``name or None`` keeps the default thread name for any false value
    monitorThread = threading.Thread(target=shuffle, name=name or None, args=(send, pipe))
    monitorThread.daemon = daemon
    return monitorThread
def list(**type):
    """Print every structure in the database that matches ``type``.

    The search is narrowed by passing one of these named arguments:
    like = glob match
    regex = regular expression
    index = particular index
    identifier = particular id number
    pred = function predicate

    One line is printed per structure; nothing is returned.
    """
    structures = __builtin__.list(iterate(**type))

    # widest rendering of each column, with 1 as the floor when nothing matched
    index_widths = [len("{:d}".format(st.index)) for st in structures]
    name_widths = [len(st.name) for st in structures]
    size_widths = [len("{:x}".format(st.size)) for st in structures]
    maxindex = max(index_widths or [1])
    maxname = max(name_widths or [1])
    maxsize = max(size_widths or [1])

    for st in structures:
        print("[{:{:d}d}] {:>{:d}s} {:<+{:d}x} ({:d} members){:s}".format(idaapi.get_struc_idx(st.id), maxindex, st.name, maxname, st.size, maxsize, len(st.members), " // {:s}".format(st.comment) if st.comment else ''))
    return
def up(self):
    '''Return all the structures that reference this specific structure.

    An empty tuple is returned when nothing references the structure;
    otherwise the result is a list with one ``instance`` per reference.
    '''
    xr = idaapi.xrefblk_t()

    # bail out right away if there is not even a first reference
    if not xr.first_to(self.id, 0):
        return ()

    # gather the first reference and every one that follows it
    found = []
    while True:
        found.append((xr.frm, xr.iscode, xr.type))
        if not xr.next_to():
            break

    # wrap each reference in an OREF, then resolve its first field to a structure
    orefs = [interface.OREF(frm, iscode, interface.ref_t.of(kind)) for frm, iscode, kind in found]
    return [instance(oref[0]) for oref in orefs]
def down(self):
    '''Return all the structures that are referenced by this specific structure.

    The result is a list with one ``instance`` per reference, or an empty
    list when the structure references nothing.
    '''
    xr = idaapi.xrefblk_t()

    # bail out right away if there is not even a first reference
    if not xr.first_from(self.id, 0):
        return []

    # gather the first reference and every one that follows it
    found = []
    while True:
        found.append((xr.to, xr.iscode, xr.type))
        if not xr.next_from():
            break

    # wrap each reference in an OREF, then resolve its first field to a structure
    orefs = [interface.OREF(to, iscode, interface.ref_t.of(kind)) for to, iscode, kind in found]
    return [instance(oref[0]) for oref in orefs]
def list(self, **type):
    """Print every member of the structure that matches ``type``.

    The search is narrowed by passing one of these named arguments:
    like = glob match
    regex = regular expression
    index = particular index
    identifier = particular id number
    predicate = function predicate

    One line is printed per member; nothing is returned.
    """
    found = __builtin__.list(self.iterate(**type))
    escape = repr

    # widest rendering of each column, with 1 as the floor when nothing matched
    maxindex = max([len("{:d}".format(m.index)) for m in found] or [1])
    maxoffset = max([len("{:x}".format(m.offset)) for m in found] or [1])
    maxsize = max([len("{:x}".format(m.size)) for m in found] or [1])
    maxname = max([len(escape(m.name)) for m in found] or [1])
    maxtype = max([len(repr(m.type)) for m in found] or [1])

    for m in found:
        typeid = '' if m.typeid is None else ",typeid={:x}".format(m.typeid)
        remark = " // {:s}".format(m.comment) if m.comment else ''
        print("[{:{:d}d}] {:>{:d}x}:+{:<{:d}x} {:<{:d}s} {:{:d}s} (flag={:x},dt_type={:x}{:s}){:s}".format(m.index, maxindex, m.offset, int(maxoffset), m.size, maxsize, escape(m.name), int(maxname), m.type, int(maxtype), m.flag, m.dt_type, typeid, remark))
    return
def near_offset(self, offset):
    '''Return the member near to the specified ``offset``.

    ``offset`` includes the base offset of these members. A warning is
    logged when it lies outside the first/last offsets of the structure,
    but the lookup is attempted anyway. If no member sits exactly at the
    offset, ``idaapi.get_best_fit_member`` is tried.
    Raises LookupError when neither lookup finds a member.
    '''
    # bounds of the structure, shifted by our base offset (local names avoid
    # shadowing the min/max builtins)
    lower = idaapi.get_struc_first_offset(self.owner.ptr) + self.baseoffset
    upper = idaapi.get_struc_last_offset(self.owner.ptr) + self.baseoffset
    if (offset < lower) or (offset >= upper):
        # logging.warning is the documented spelling; logging.warn is a deprecated alias
        logging.warning("{:s}.instance({:s}).members.near_offset : Requested offset {:+#x} not within bounds ({:#x},{:#x}). Trying anyways..".format(__name__, self.owner.name, offset, lower, upper))

    # the idaapi lookups expect an offset relative to the structure itself
    res = offset - self.baseoffset
    mem = idaapi.get_member(self.owner.ptr, res)
    if mem is None:
        logging.info("{:s}.instance({:s}).members.near_offset : Unable to locate member at offset {:+#x}. Trying get_best_fit_member instead.".format(__name__, self.owner.name, res))
        mem = idaapi.get_best_fit_member(self.owner.ptr, res)

    if mem is None:
        raise LookupError("{:s}.instance({:s}).members.near_offset : Unable to find member near offset : {:+#x}".format(__name__, self.owner.name, offset))

    index = self.index(mem)
    return self[index]
def flatmap(f, items):
    """Apply ``f`` to every element of ``items`` and chain the resulting iterables together."""
    mapped = map(f, items)
    return chain.from_iterable(mapped)
def lmap(*args, **kwargs):
    """Call ``map`` with the given arguments and return the outcome as a list."""
    mapped = map(*args, **kwargs)
    return list(mapped)
def list(cls, enum):
    """List the members of the enumeration ``enum``.

    ``enum`` is resolved with ``by``. One line is printed per member with
    its position, its value in hexadecimal, and its name. Nothing is
    returned.
    """
    # FIXME: make this consistent with every other .list
    eid = by(enum)
    res = __builtin__.list(cls.iterate(eid))

    # Width of the index column is the digit count of the last position, so
    # the column lines up the same way the other .list functions do.
    cindex = len("{:d}".format(max(len(res) - 1, 0)))
    maxvalue = max([len("{:x}".format(cls.value(mid))) for mid in res] or [1])

    for i, mid in enumerate(res):
        print("[{:{:d}d}] {:>0{:d}x} {:s}".format(i, cindex, cls.value(mid), maxvalue, cls.name(mid)))
    return
def list(cls, **type):
    """List all of the entry-points within the database that match ``type``.

    Search can be constrained by the named argument ``type``.
    like = glob match against entry-point name
    ea, address = exact address match
    name = exact entry-point name match
    regex = regular-expression against entry-point name
    index = particular index
    greater, less = greater-or-equal against address, less-or-equal against address
    pred = function predicate

    Prints one line per entry-point (index, address, ordinal, name) and
    returns nothing.
    """
    res = __builtin__.list(cls.__iterate__(**type))

    to_address = utils.compose(idaapi.get_entry_ordinal, idaapi.get_entry)

    maxindex = max(res+[1])
    maxaddr = max([to_address(index) for index in res] or [idaapi.BADADDR])
    maxordinal = max([idaapi.get_entry_ordinal(index) for index in res] or [1])

    # Column widths are the digit counts of the largest values. Counting the
    # formatted digits avoids math.log(0), which raises ValueError when the
    # largest address or ordinal is 0, and gives the exact width.
    cindex = len("{:d}".format(maxindex))
    caddr = len("{:x}".format(maxaddr))
    cordinal = len("{:x}".format(maxordinal))

    for index in res:
        # the ordinal column is padded with its own width (cordinal), not the index width
        print("[{:{:d}d}] {:>{:d}x} : ({:{:d}x}) {:s}".format(index, cindex, to_address(index), caddr, cls.__entryordinal__(index), cordinal, cls.__entryname__(index)))
    return
def prevreg(cls, ea, reg, *regs, **modifiers):
    """Walk backwards from ``ea`` to the first address that uses one of the given registers.

    ``reg`` and ``regs`` are the registers to look for. ``modifiers`` is
    handed to ``interface.regmatch.modifier`` to produce the operand numbers
    that get checked at each address. The ``count`` modifier (default 1)
    repeats the search that many times.

    Returns ``ea`` unchanged, after logging a fatal message, when there is
    no previous address to start from. Raises ValueError when the walk ends
    on BADADDR or, for the ``address`` class, before the lower boundary.
    """
    regs = (reg,) + regs
    count = modifiers.get('count', 1)
    args = ', '.join(["{:x}".format(ea)] + __builtin__.map("\"{:s}\"".format, regs) + __builtin__.map(utils.unbox("{:s}={!r}".format), modifiers.items()))

    # generate each helper using the regmatch class
    iterops = interface.regmatch.modifier(**modifiers)
    uses_register = interface.regmatch.use(regs)

    # if within a function, then sure we're within the chunk's bounds.
    if function.within(ea):
        (start, _) = function.chunk(ea)
        fwithin = functools.partial(operator.le, start)

    # otherwise ensure that we're not in the function and we're a code type.
    else:
        fwithin = utils.compose(utils.fap(utils.compose(function.within, operator.not_), type.is_code), all)

        # NOTE(review): nesting under ``else`` is assumed, so that the chunk
        # start found above is not overwritten - confirm against the original.
        start = cls.walk(ea, cls.prev, fwithin)
        start = top() if start == idaapi.BADADDR else start

    # define a function for cls.walk to continue looping while
    F = lambda ea: fwithin(ea) and not any(uses_register(ea, opnum) for opnum in iterops(ea))

    # skip the current address
    prevea = cls.prev(ea)
    if prevea is None:
        # FIXME: include registers in message
        logging.fatal("{:s}.prevreg({:s}, ...) : Unable to start walking from previous address. : {:x}".format('.'.join((__name__, cls.__name__)), args, ea))
        return ea

    # now walk while none of our registers match
    res = cls.walk(prevea, cls.prev, F)
    if res == idaapi.BADADDR or (cls == address and res < start):
        # FIXME: include registers in message
        # pluralize "register" only when more than one was requested
        plural = 's' if len(regs) > 1 else ''
        raise ValueError("{:s}.prevreg({:s}, ...) : Unable to find register{:s} within chunk. {:x}:{:x} : {:x}".format('.'.join((__name__, cls.__name__)), args, plural, start, ea, res))

    # recurse if the user specified it
    modifiers['count'] = count - 1
    return cls.prevreg( cls.prev(res), *regs, **modifiers) if count > 1 else res
def nextreg(cls, ea, reg, *regs, **modifiers):
    """Walk forwards from ``ea`` to the first address that uses one of the given registers.

    ``reg`` and ``regs`` are the registers to look for. ``modifiers`` is
    handed to ``interface.regmatch.modifier`` to produce the operand numbers
    that get checked at each address. The ``count`` modifier (default 1)
    repeats the search that many times.

    Returns ``ea`` unchanged, after logging a fatal message, when there is
    no next address to start from. Raises ValueError when the walk ends on
    BADADDR or, for the ``address`` class, at or past the upper boundary.
    """
    regs = (reg,) + regs
    count = modifiers.get('count',1)
    args = ', '.join(["{:x}".format(ea)] + __builtin__.map("\"{:s}\"".format, regs) + __builtin__.map(utils.unbox("{:s}={!r}".format), modifiers.items()))

    # generate each helper using the regmatch class
    iterops = interface.regmatch.modifier(**modifiers)
    uses_register = interface.regmatch.use(regs)

    # if within a function, then sure we're within the chunk's bounds.
    if function.within(ea):
        (_,end) = function.chunk(ea)
        fwithin = functools.partial(operator.gt, end)

    # otherwise ensure that we're not in a function and we're a code type.
    else:
        fwithin = utils.compose(utils.fap(utils.compose(function.within, operator.not_), type.is_code), all)

        # NOTE(review): nesting under ``else`` is assumed, so that the chunk
        # end found above is not overwritten - confirm against the original.
        end = cls.walk(ea, cls.next, fwithin)
        end = bottom() if end == idaapi.BADADDR else end

    # define a function for cls.walk to continue looping while
    F = lambda ea: fwithin(ea) and not any(uses_register(ea, opnum) for opnum in iterops(ea))

    # skip the current address
    nextea = cls.next(ea)
    if nextea is None:
        # FIXME: include registers in message
        logging.fatal("{:s}.nextreg({:s}) : Unable to start walking from next address. : {:x}".format('.'.join((__name__, cls.__name__)), args, ea))
        return ea

    # now walk while none of our registers match
    res = cls.walk(nextea, cls.next, F)
    if res == idaapi.BADADDR or (cls == address and res >= end):
        # FIXME: include registers in message
        # pluralize "register" only when more than one was requested
        plural = 's' if len(regs) > 1 else ''
        raise ValueError("{:s}.nextreg({:s}, ...) : Unable to find register{:s} within chunk {:x}:{:x} : {:x}".format('.'.join((__name__, cls.__name__)), args, plural, end, ea, res))

    # recurse if the user specified it
    modifiers['count'] = count - 1
    return cls.nextreg(cls.next(res), *regs, **modifiers) if count > 1 else res
def document(cls, name, cache):
    """Build a text summary of every ``(func, types, _)`` entry in ``cache``.

    Each entry contributes the result of ``cls.prototype(func, types)``. A
    single-line docstring is appended after " -> " on the same line. A
    multi-line docstring is emitted one stripped line at a time, each
    indented by ``len(name) + 1`` spaces. The lines are joined with
    newlines and returned as one string.
    """
    lines = []
    for func, types, _ in cache:
        doc = (func.__doc__ or '').split('\n')
        prototype = cls.prototype(func, types)

        # multi-line docstring: prototype first, then each line indented beneath it
        if len(doc) > 1:
            lines.append("{:s} ->".format(prototype))
            for text in doc:
                text = text.strip()
                lines.append("{: >{padding:d}s}".format(text, padding=len(name)+len(text)+1))
            continue

        # str.split always yields at least one element, so this is the one-line case
        summary = doc[0]
        lines.append(prototype + (" -> {:s}".format(summary) if len(summary) else ''))
    return '\n'.join(lines)
def __stop_monitoring(self):
    """Cleanup monitoring threads

    Raises RuntimeError if the process is still running. Otherwise closes
    the process's pipes, joins every monitoring thread, removes them from
    the set of tracked threads, and finally stops and clears the updater
    thread.
    """
    P = self.program
    if P.poll() is None:
        raise RuntimeError("Unable to stop monitoring while process {!r} is still running.".format(P))

    # stop the update thread
    self.eventWorking.clear()

    # forcefully close pipes that still open, this should terminate the monitor threads
    # also, this fixes a resource leak since python doesn't do this on subprocess death
    for p in (P.stdin, P.stdout, P.stderr):
        while p and not p.closed:
            try:
                p.close()
            except Exception:
                # Best effort: a failed close is retried by the loop. Only
                # Exception is caught so that KeyboardInterrupt can still
                # break out if the pipe never closes.
                pass

    # join all monitoring threads (explicit loop, the call is made for its side effect)
    for th in self.threads:
        th.join()

    # now spin until none of them are alive
    while len(self.threads) > 0:
        for th in self.threads[:]:
            if not th.is_alive():
                self.__threads.discard(th)

    # join the updater thread, and then remove it
    self.taskQueue.put(None)
    self.updater.join()
    assert not self.updater.is_alive()
    self.__updater = None
    return
def __getstate__(self):
    """Return the state to serialize: ``(name, (comment, comment), members)``.

    The two comments come from ``idaapi.get_struc_cmt`` called with True
    and with False (presumably the repeatable and non-repeatable comment -
    confirm against the IDA SDK).
    """
    sid = self.id
    cmtt = idaapi.get_struc_cmt(sid, True)
    cmtf = idaapi.get_struc_cmt(sid, False)
    # FIXME: perhaps we should preserve the get_struc_idx result too
    return (self.name, (cmtt, cmtf), self.members)
def __getstate__(self):
    """Return the state to serialize: ``(owner name, base offset, list of every member)``."""
    # fetch each member by position, in order
    return (self.owner.name, self.baseoffset, [self.__getitem__(position) for position in range(len(self))])
def by_offset(self, offset):
    '''Return the member at the specified ``offset``.

    ``offset`` includes the base offset of these members. Raises
    LookupError when it falls outside the structure or when no member is
    found at it.
    '''
    first = idaapi.get_struc_first_offset(self.owner.ptr)
    last = idaapi.get_struc_last_offset(self.owner.ptr)
    lower, upper = first + self.baseoffset, last + self.baseoffset

    # the valid range is extended by the size of the member at the last offset
    tail = idaapi.get_member(self.owner.ptr, upper - self.baseoffset)
    limit = upper + idaapi.get_member_size(tail)
    if not (lower <= offset < limit):
        raise LookupError("{:s}.instance({:s}).members.by_offset : Requested offset {:+#x} not within bounds ({:#x},{:#x})".format(__name__, self.owner.name, offset, lower, limit))

    # the idaapi lookup expects an offset relative to the structure itself
    found = idaapi.get_member(self.owner.ptr, offset - self.baseoffset)
    if found is None:
        raise LookupError("{:s}.instance({:s}).members.by_offset : Unable to find member at offset : {:+#x}".format(__name__, self.owner.name, offset))

    return self[self.index(found)]
def __repr__(self):
    '''Render the owner followed by one aligned line per field of the structure.'''
    rows = []
    mn = ms = 0
    for position in range(len(self)):
        member = self[position]
        name, kind, ofs, size, comment = member.name, member.type, member.offset, member.size, member.comment
        rows.append((position, name, kind, ofs, size, comment))

        # track the widest name and the widest hexadecimal size seen so far
        mn = max(mn, len(name))
        ms = max(ms, len("{:x}".format(size)))

    # width of the index column, and of the widest possible offset
    mi = len(str(len(self)))
    mo = max(len("{:x}".format(self.baseoffset)), len("{:x}".format(self.baseoffset+self.owner.size)))

    lines = []
    for i, n, t, o, s, c in rows:
        lines.append("[{:{:d}d}] {:>{:d}x}:+{:<{:d}x} {:<{:d}s} {!r} {:s}".format(i, mi, o, mo, s, ms, "'{:s}'".format(n), mn+2, t, " // {:s}".format(c) if c else ''))
    return "{!r}\n{:s}".format(self.owner, '\n'.join(lines))
def iterate(**type):
    '''Yield each segment defined in the database that passes every filter in ``type``.

    With no filters given, a predicate that accepts everything is used.
    Each segment yielded carries its position as an ``index`` attribute.
    '''
    if not type:
        type = {'predicate': lambda n: True}

    def fetch(position):
        # grab the segment at this position and remember where it came from
        segment = idaapi.getnseg(position)
        segment.index = position
        return segment

    segments = [fetch(position) for position in xrange(idaapi.get_segm_qty())]

    # narrow the candidates with one matcher pass per keyword
    for key, value in type.iteritems():
        segments = __builtin__.list(__matcher__.match(key, value, segments))

    for segment in segments:
        yield segment