def find_unusual_xors(functions):
    """Return the functions containing an XOR that is not a security-cookie check.

    Every function is walked instruction by instruction from its start
    address up to the end reported by ``idc.FindFuncEnd``.  An ``xor`` whose
    two operand strings are equal is ignored.  Any other ``xor`` is logged
    and, unless ``is_security_cookie`` attributes it to a security cookie,
    the function is recorded and scanning of that function stops.

    :param functions: iterable of function start addresses.
    :return: list of start addresses of the flagged functions, in input order.
    """
    # TODO find xors in tight loops
    candidate_functions = []
    for fva in functions:
        # The end of the function does not change while it is being scanned,
        # so look it up once instead of once per instruction.
        func_end = idc.FindFuncEnd(fva)
        cva = fva
        while cva != idaapi.BADADDR and cva < func_end:
            if idc.GetMnem(cva) == "xor" and idc.GetOpnd(cva, 0) != idc.GetOpnd(cva, 1):
                disasm = idc.GetDisasm(cva)
                g_logger.debug("suspicious XOR instruction at 0x%08X in function 0x%08X: %s", cva, fva,
                               disasm)
                ph = idc.PrevHead(cva)
                nh = idc.NextHead(cva)
                ip = idc.GetDisasm(ph)
                ia = idc.GetDisasm(nh)
                if ip and ia:
                    g_logger.debug("Instructions: %s; %s; %s", ip, disasm, ia)
                if ph or nh:
                    if is_security_cookie(cva, ph, nh):
                        g_logger.debug("XOR related to security cookie: %s", disasm)
                    else:
                        g_logger.debug("unusual XOR: %s", disasm)
                        candidate_functions.append(fva)
                        # one hit is enough to flag the function
                        break
            cva = idc.NextHead(cva)
    return candidate_functions
# Python example source code using GetDisasm()
def _is_suspicious_mov(cva, stack_regs, immediate_types):
    """Tell whether *cva* holds a ``mov`` that writes a non-immediate through a base/index operand."""
    if idc.GetMnem(cva) != "mov":
        return False
    # destinations mentioning one of the stack/frame registers are skipped
    if is_list_item_in_s(stack_regs, idc.GetOpnd(cva, 0)):
        return False
    # identify register dereferenced writes to memory, e.g. mov [eax], cl
    if idc.GetOpType(cva, 0) != OP_TYPE.BASE_INDEX.value:
        return False
    return idc.GetOpType(cva, 1) not in immediate_types


def find_suspicous_movs(functions):
    """Return the functions whose tight loops contain a suspicious ``mov``.

    For every function, the address ranges yielded by ``find_tight_loops``
    are scanned.  A ``mov`` is suspicious when its first operand has the
    ``OP_TYPE.BASE_INDEX`` type, does not involve esp/ebp/rsp/rbp, and its
    second operand is not one of the immediate operand types.

    A function is recorded once: after the first hit its remaining loops are
    not scanned.  (Previously only the current loop was abandoned, so a
    function with several matching loops was appended several times.)

    :param functions: iterable of function start addresses.
    :return: list of start addresses of the flagged functions, in input order.
    """
    candidate_functions = []
    regs = ["esp", "ebp", "rsp", "rbp"]
    immediate_types = [OP_TYPE.IMMEDIATE.value, OP_TYPE.IMMEDIATE_FAR.value,
                       OP_TYPE.IMMEDIATE_NEAR.value]
    for fva in functions:
        flagged = False
        for (loopStart, loopEnd) in find_tight_loops(fva):
            cva = loopStart
            while cva <= loopEnd:
                if _is_suspicious_mov(cva, regs, immediate_types):
                    g_logger.debug("suspicious MOV instruction at 0x%08X in function 0x%08X: %s", cva, fva,
                                   idc.GetDisasm(cva))
                    candidate_functions.append(fva)
                    flagged = True
                    break
                cva = idc.NextHead(cva)
            if flagged:
                # the function is already listed; skip its other loops
                break
    return candidate_functions
def find_xrefs_from( self, func_ea ):
    """Collect the cross-references that leave the function at *func_ea*.

    Every item address of the function is inspected.  A reference is kept
    when its type is a key of ``XrefsFromFinder.XREF_TYPE2STR`` and its
    target is not itself one of the function's item addresses.

    :param func_ea: address passed to ``idautils.FuncItems``.
    :return: list of ``XrefFrom`` objects built from
        (item address, target, reference type, disassembly of the item).
    """
    ALL_XREFS = 0
    # Enumerate the function once. The original re-ran FuncItems() for every
    # single reference just to test membership, which is quadratic.
    func_items = list( idautils.FuncItems( func_ea ) )
    own_items = set( func_items )

    xrefs = []
    for item in func_items:
        for ref in idautils.XrefsFrom( item, ALL_XREFS ):
            if ref.type not in XrefsFromFinder.XREF_TYPE2STR:
                continue
            # references that stay inside the function are not reported
            if ref.to in own_items:
                continue
            disas = idc.GetDisasm( item )
            xrefs.append( XrefFrom( item, ref.to, ref.type, disas ) )
    return xrefs
def get_bad_addresses(verbose=True):
    """Get all the unmapped addresses from IDA's database.

    Starting at ``idc.MinEA()``, ``get_next_bad_addr`` is called repeatedly
    until it returns ``idc.BADADDR``.  For every address it reports, each
    ``0x``-prefixed token of at least eight word characters found in that
    line's disassembly is parsed as a hexadecimal number.

    :param verbose: when true, print progress messages.
    :return: list of the parsed integer addresses, in discovery order.
    """
    # Raw string: "\w" inside a plain literal is an invalid escape sequence.
    # Compiled once because the pattern is reused for every line.
    hex_token = re.compile(r"0x\w{8,}")

    ret = []
    curEa = idc.MinEA()
    while True:
        if verbose:
            print("[+] getting more bad addresses 0x%08X" % (curEa))
        # the regex "(DC[DQ]| B.*) +0x" will retrieve the following:
        # 1. DCD 0x...
        # 2. DCQ 0x...
        # 3. B 0x.....
        # 4. BL 0x....
        curEa = get_next_bad_addr(curEa, "(DC[DQ]| B.*) +0x")
        if curEa == idc.BADADDR:
            break
        if verbose:
            print("[+] found bad address at 0x%08X" % (curEa))
        dcd = idc.GetDisasm(curEa)
        ret.extend(int(token, 16) for token in hex_token.findall(dcd))
    if verbose:
        print("[+] found %d bad addresses" % len(ret))
    return ret
def main():
    """Gather code references to the configured send APIs and back-trace WSASendTo's argument.

    First, for every name in ``ibt.send_api`` the code references to that
    name's address are stored in ``ibt.xrefs``.  Then, for every stored
    reference whose API name is "WSASendTo", the address returned by
    ``ibt.get_arg(address, 2)`` is printed and, when its first operand is one
    of ``ibt.registers``, traced with ``ibt.trace_reg``.

    NOTE(review): the nesting below was reconstructed from a listing that had
    lost its indentation -- confirm against the original file.
    """
    ibt = IdaBackTracer()
    # The loop target is the attribute ibt.api itself, so ibt.api is
    # rebound on every iteration (and stays set after the loop).
    for ibt.api in ibt.send_api:
        adr = idc.LocByName(ibt.api)
        # NOTE(review): this reset is overwritten by the assignment right
        # after it -- looks redundant, confirm intent.
        if ibt.api in ibt.xrefs:
            ibt.xrefs[ibt.api] = []
        ibt.xrefs[ibt.api] = CodeRefsTo(adr, 1)
    for ibt.api, ref in ibt.xrefs.iteritems():
        for address in list(ref):
            # only WSASendTo call sites are processed
            if ibt.api == "WSASendTo":
                arg_adr = ibt.get_arg(address, 2)
                print idc.GetDisasm(address)
                print idc.GetDisasm(arg_adr)
                print GetOpnd(arg_adr, 0)
                # TODO: Add trace function for none reg arguments like push 0, push [eax], push [0x40000000]
                if GetOpnd(arg_adr, 0) in ibt.registers:
                    ibt.trace_reg(arg_adr, GetOpnd(arg_adr, 0))
                #print '%d st occurance of %s in %s : %s'%(count[ibt.api], ibt.api, hex(adr),idc.GetDisasm(adr))
                #print 'send buffer is %d arg of %s : %s' % (2, format(buffer,'%x'), idc.GetDisasm(buffer))
                #ibt.trace_reg(buffer,GetOpnd(buffer, 0))
def resolveStackAddress(self, address, symbol):
    """Describe a stack address as a module name plus a symbol text.

    *symbol* is indexed as (base address as hex text, module name, symbol
    name).  A base of "0x0" yields ``None``.  Otherwise a dict with the keys
    'module' and 'symbol' is returned:

    * when a segment named after the module exists, the address is rebased
      onto that segment and 'symbol' is the enclosing function's name, or
      the disassembly at that location when there is no function;
    * otherwise 'symbol' is the supplied symbol name, except that
      '<redacted>' is replaced by the "+0x..." offset from the base and an
      empty name stays empty.
    """
    if symbol[0] == "0x0":
        return None

    module_name = str(symbol[1])
    segment = get_segm_by_name(module_name)

    if segment is not None:
        segment_start = segment.startEA
        local_ea = address - int(symbol[0], 16) + segment_start
        if get_func(local_ea) is None:
            resolved = str(GetDisasm(local_ea))
        else:
            resolved = str(get_func_name(local_ea))
    elif symbol[2] == '':
        resolved = ''
    elif symbol[2] == '<redacted>':
        resolved = "+0x%X" % (address - int(symbol[0], 16))
    else:
        resolved = str(symbol[2])

    return {'module': module_name, 'symbol': resolved}
def get_call_name(head):
    """Return the textual target of the call instruction at *head*.

    * Not a call instruction: ``None``.
    * Operand is not listed in ``registers``: the operand text with every
      "ds:" removed.
    * Operand is a register: the text following the first ';' of the
      disassembly line, with all spaces removed.  When the line has no ';'
      at all, ``None`` is returned.

    The last case used to return the whole instruction text (for example
    "calleax"), because ``str.find`` yields -1 when nothing is found and the
    slice then started at index 0.
    """
    if GetInstructionType(head) != CALL_INSTRUCTION:
        return None

    opnd = idc.GetOpnd(head, 0)
    if opnd not in registers:
        return opnd.replace("ds:", "")

    # Register call: the only available name is whatever follows the
    # comment marker on the disassembly line.
    _, marker, comment = idc.GetDisasm(head).partition(";")
    if not marker:
        return None
    return comment.replace(" ", "")
def get_disasm(self, ea):
    """Return whatever ``idc.GetDisasm`` produces for the address *ea*."""
    return idc.GetDisasm(ea)
def highlight_non_zero_xor(self, ea):
    """Colour the instruction at *ea* when it is an XOR with differing operands.

    The instruction is coloured with ``self.color`` (through ``MySetColor``)
    when its mnemonic is "xor", its two operand strings differ, and
    ``self.is_security_cookie`` does not attribute it to a security cookie.

    :param ea: address of the instruction to inspect.
    :return: ``[ea]`` when the instruction was highlighted, otherwise ``[]``.
    """
    highlight_eas = []
    if self.get_mnem(ea) != "xor":
        return highlight_eas
    if idc.GetOpnd(ea, 0) == idc.GetOpnd(ea, 1):
        return highlight_eas

    ph = idc.PrevHead(ea)
    nh = idc.NextHead(ea)
    # The disassembly of the neighbouring instructions was fetched here but
    # never used; those two lookups were dropped.
    if (ph or nh) and not self.is_security_cookie(ea, ph, nh):
        highlight_eas.append(ea)
        MySetColor(ea, self.color)
    return highlight_eas
def is_security_cookie(self, va, ph, nh):
    """Tell whether the XOR at *va* looks like part of a security-cookie check.

    :param va: address of the xor instruction.
    :param ph: address of the preceding instruction.
    :param nh: address of the following instruction.
    :return: True when the xor's second operand is esp/ebp/rsp/rbp and the
        text "security" occurs in the second operand of *ph*, in the
        disassembly of *nh*, or in the disassembly of the instruction after
        *nh*; False otherwise.
    """
    frame_registers = ["esp", "ebp", "rsp", "rbp"]
    # for security cookie check the xor should use ESP or EBP
    if idc.GetOpnd(va, 1) not in frame_registers:
        return False

    keyword = "security"
    # evaluated left to right; later lookups only happen when needed
    return (keyword in idc.GetOpnd(ph, 1)
            or keyword in idc.GetDisasm(nh)
            or keyword in idc.GetDisasm(idc.NextHead(nh)))
def is_security_cookie(va, ph, nh):
    """Tell whether the XOR at *va* looks like part of a security-cookie check.

    The xor must have esp, ebp, rsp or rbp as its second operand, and the
    text "security" must occur in the second operand of the instruction at
    *ph*, in the disassembly at *nh*, or in the disassembly of the
    instruction following *nh*.

    :return: True when both conditions hold, False otherwise.
    """
    # for security cookie check the xor should use ESP or EBP
    if idc.GetOpnd(va, 1) in ["esp", "ebp", "rsp", "rbp"]:
        marker = "security"
        if marker in idc.GetOpnd(ph, 1):
            return True
        if marker in idc.GetDisasm(nh):
            return True
        return marker in idc.GetDisasm(idc.NextHead(nh))
    return False
def refine_results(self):
    """Re-tag the stored results according to marker substrings in their predicate.

    For every address listed in ``self.functions_candidates`` the stored
    result is re-evaluated.  ``val`` counts how many of the markers
    "(0 :: 2)", "7x", "7y" and u"²" occur in ``res.predicate``:

    * NOT_OPAQUE with at least one marker becomes OPAQUE;
    * OPAQUE with no marker becomes NOT_OPAQUE;
    * LIKELY becomes NOT_OPAQUE with no marker, OPAQUE otherwise.

    Whenever a result is switched to OPAQUE its alive/dead branches are
    recomputed and ``res.dependency + [addr]`` is added to
    ``self.functions_spurious_instrs[rtn_addr]``.  Every visited result is
    replaced by a fresh ``AddrRet``; three counters are printed at the end.

    NOTE(review): nesting was reconstructed from a listing without
    indentation; in particular the position of ``likely_retag += 1`` (once
    per LIKELY result, as written here) should be confirmed.
    """
    likely_retag = 0
    fp_retag = 0
    fn_retag = 0
    for rtn_addr, candidates in self.functions_candidates.items():
        for addr in sorted(candidates):
            res = self.results[addr]
            # number of marker substrings present in the predicate text
            val = sum([x in res.predicate for x in ["(0 :: 2)", "7x", "7y", u"²"]])
            # defaults: keep the stored status and branches
            final_status = res.status
            alive, dead = res.alive_branch, res.dead_branch
            if res.status == self.po.NOT_OPAQUE:
                if val != 0:
                    fn_retag += 1
                    final_status = self.po.OPAQUE
                    # first code reference obtained with flow=0
                    jmp_target = [x for x in idautils.CodeRefsFrom(addr, 0)][0]
                    # first reference obtained with flow=1 that is not jmp_target
                    # (presumably the fall-through -- TODO confirm)
                    next_target = [x for x in idautils.CodeRefsFrom(addr, 1) if x != jmp_target][0]
                    # for a line starting with "jz" the fall-through is alive, otherwise the jump target
                    alive, dead = (next_target, jmp_target) if idc.GetDisasm(addr)[:2] == "jz" else (jmp_target, next_target)
                    self.functions_spurious_instrs[rtn_addr].update(res.dependency+[addr])
            elif res.status == self.po.OPAQUE:
                if val == 0:
                    fp_retag += 1
                    final_status = self.po.NOT_OPAQUE
            elif res.status == self.po.LIKELY:
                if val == 0:
                    final_status = self.po.NOT_OPAQUE
                else:
                    final_status = self.po.OPAQUE
                    # same branch recomputation as in the NOT_OPAQUE case above
                    jmp_target = [x for x in idautils.CodeRefsFrom(addr, 0)][0]
                    next_target = [x for x in idautils.CodeRefsFrom(addr, 1) if x != jmp_target][0]
                    alive, dead = (next_target, jmp_target) if idc.GetDisasm(addr)[:2] == "jz" else (jmp_target, next_target)
                    self.functions_spurious_instrs[rtn_addr].update(res.dependency+[addr])
                likely_retag += 1
            # store the result again with the (possibly) updated status and branches
            self.results[addr] = AddrRet(final_status, res.k, res.dependency, res.predicate, res.distance, alive, dead)
    print "Retag: FP->OK:%d" % fp_retag
    print "Retag: FN->OP:%d" % fn_retag
    print "Retag: Lkl->OK:%d" % likely_retag
def dump(self):
    """Return the disassembly of every address in ``self.instrs``, one per line."""
    lines = []
    for ea in self.instrs:
        lines.append(idc.GetDisasm(ea))
    return '\n'.join(lines)
def dump_alive(self):
    """Return the disassembly of the addresses whose status is ``Status.ALIVE``, one per line."""
    lines = []
    for ea in self.instrs:
        if self.instrs_status[ea] == Status.ALIVE:
            lines.append(idc.GetDisasm(ea))
    return '\n'.join(lines)
def safe_path_to(self, addr):
    """Return the tail of the full path to *addr* that is considered safe.

    The path from ``self.full_path_to`` is scanned front to back.  The cut
    index moves to every address that has more than one code reference to
    it, and to just after every address whose disassembly starts with
    "call".  The cut index is printed; the path from that index on is
    returned, or the whole path when no such address was found.
    """
    path = self.full_path_to(addr)  # Start from the full path
    cut = -1
    for position, ea in enumerate(path):  # Compute the index from which the path is safe
        predecessors = list(idautils.CodeRefsTo(ea, True))
        if len(predecessors) > 1:
            cut = position
        elif idc.GetDisasm(ea).startswith("call"):
            cut = position + 1
    print(cut)
    return path if cut == -1 else path[cut:]
def handleQuickInstHook(self, address, once, breakpoint=False):
    """Install an instruction hook at *address* without showing a dialog.

    If a function hook is already registered for the address the user is
    asked whether to remove it; any answer other than 1 aborts.  An
    ``InstHook`` is then filled in, sent to the client as a set-hook request,
    the instruction is coloured, and the hook is recorded in
    ``self.idbHookMap`` and shown in the hooks view.

    :param address: address of the instruction to hook.
    :param once: value stored in the hook's ``once`` field.
    :param breakpoint: value stored in the hook's ``breakpoint`` field.
    """
    # safety checks, can be start of the function
    has_func_hook = address in self.idbHookMap and self.idbHookMap[address].hook.type == "func"
    if has_func_hook:
        if AskYN(0, "Address contains function hook!\nDo you want to remove it?") != 1:
            return
        # remove function hook
        self.handleUnhookFunc(address)

    offset, moduleName = self.getAddressDetails(address)

    hook = InstHook()
    hook.id = address
    hook.mnemonic = GetDisasm(address)
    hook.address = offset
    hook.module = moduleName
    hook.once = once
    hook.breakpoint = breakpoint

    entry = HookEntry(hook)
    request = {
        "req_id": kFridaLink_SetHookRequest,
        "data": entry.genSetRequest()
    }
    outJSON = json.dumps(request)

    # mark the instruction in the disassembly view before notifying the client
    SetColor(address, CIC_ITEM, kIDAViewColor_HookedInst)
    refresh_idaview_anyway()
    self.clientSocket.sendto(outJSON, self.clientAddress)

    self.idbHookMap[address] = entry
    self.idbHooksView.setContent(self.idbHookMap)
def handleHookInstEdit(self, screenEA = None):
    """Let the user edit the instruction hook at an address and send the update.

    Does nothing when ``self.hookedInstruction()`` equals False or when the
    dialog is not confirmed (``Execute()`` returns something other than 1).
    Otherwise the trigger mode, the recent script file and the script are
    copied from the dialog into the hook, and an update request carrying
    flags for the changed parts is sent to the client.

    :param screenEA: address of the hooked instruction; ``ScreenEA()`` is
        used when it is None.
    """
    if self.hookedInstruction() == False:
        return

    address = screenEA if screenEA is not None else ScreenEA()
    entry = self.idbHookMap[address]
    entry.hook.mnemonic = GetDisasm(address)

    hookDlg = InstructionHookDialog(entry.hook.module, "%X" % entry.hook.id, entry.hook.mnemonic, entry.hook.recentSrcFile)
    hookDlg.Compile()
    hookDlg.script.value = entry.hook.script
    # dialog trigger value 0 corresponds to a hook with once set
    if entry.hook.once == True:
        hookDlg.trigger.value = 0
    else:
        hookDlg.trigger.value = 1

    if hookDlg.Execute() != 1:
        return

    flags = HookEntry.UDP_NONE

    once = True if hookDlg.trigger.value == 0 else False
    if entry.hook.once != once:
        entry.hook.once = once
        flags |= HookEntry.UPD_TRIGGER

    entry.hook.recentSrcFile = hookDlg.recentScriptFile

    if entry.hook.script != hookDlg.script.value:
        entry.hook.script = hookDlg.script.value
        flags |= HookEntry.UPD_SCRIPT

    request = {
        "req_id": kFridaLink_UpdHookRequest,
        "data": entry.genUpdRequest(flags)
    }
    self.clientSocket.sendto(json.dumps(request), self.clientAddress)
def handleGetRealAddress(self, screenEA = None):
    """Log the address an IDA address corresponds to in the matching target module.

    The address is split into (offset, module name) by
    ``self.getAddressDetails``.  For the first entry of
    ``self.targetModules`` whose 'name' equals that module name, the entry's
    'base' (hex text) plus the offset is logged through
    ``self.handleFraplLog`` together with the instruction's disassembly.
    Nothing is logged when no module matches.

    :param screenEA: address to translate; ``ScreenEA()`` is used when None.
    """
    address = ScreenEA() if screenEA is None else screenEA
    offset, moduleName = self.getAddressDetails(address)

    for module in self.targetModules:
        if module['name'] != moduleName:
            continue
        realAddr = int(module['base'], 16) + offset
        message = "[ %s ] 0x%X => 0x%X %s" % (moduleName, address, realAddr, GetDisasm(address))
        self.handleFraplLog("info", message)
        # only the first matching module is reported
        break
def GetInstruction(ea):
    """Return the disassembly at *ea* with any trailing ';' comment removed.

    :param ea: address of the instruction.
    :return: the text before the first ';' of the disassembly line (the
        whole line when there is no ';'), or None when that text is empty.
    :raises IdaPythonError: when *ea* is None.
    """
    if ea is None:
        raise IdaPythonError("Address cannot be None")

    # everything up to the first ';'; the full text when no ';' is present
    code, _, _ = idc.GetDisasm(ea).partition(';')
    if code == '':
        return None
    return code
def map_shared_bridges(dsc_file, adrfind):
    """ finds branch islands in a given dyld_shared_cache file,
    maps them to IDA's db and extract its addresses

    dsc_file -- open file object of the cache; it is memory-mapped for
                scanning and also read with seek()/read().
    adrfind  -- passed through unchanged to label_and_fix_branch_islands.

    NOTE(review): nesting was reconstructed from a listing without
    indentation -- confirm against the original file.
    """
    # determine the file size by seeking to the end, then rewind
    dsc_file.seek(0, 2)
    filesize = dsc_file.tell()
    dsc_file.seek(0)
    # local constant handed to mmap as the access mode
    # (presumably the same value as mmap.ACCESS_READ -- TODO confirm)
    ACCESS_READ = 1
    # NOTE(review): this mapping is never closed inside this function
    a = mmap.mmap(dsc_file.fileno(), length=filesize, access=ACCESS_READ)
    # bytes cf fa ed fe (presumably a little-endian 64-bit Mach-O magic),
    # then 340-360 characters, then the branch-islands marker text.
    # NOTE(review): "." does not match a newline byte without re.DOTALL.
    reexp = re.compile("\xcf\xfa\xed\xfe.{340,360}dyld_shared_cache_branch_islands")
    print "[+] scanning dsc for BRANCH ISLANDS"
    # this list will hold all our branch_islands segments
    # (it is only appended to here; nothing in this function reads it)
    branch_islands_segments = []
    # branch target address -> list of addresses of the branches to it
    jmp_to_code = collections.defaultdict(list)
    for ma in reexp.finditer(a):
        print "[+] WRITING BRANCH ISLAND: 0x%08X" % (ma.start())
        # parse the image found at the match offset inside the cache file
        fif = FileInFile(dsc_file, ma.start())
        m = MachO_patched(fif)
        if _IN_IDA:
            for seg in m.segments:
                for sec in seg.sections:
                    # create a segment covering the section and name it
                    idc.AddSegEx(sec.addr,
                                 sec.addr + sec.size, 0, 0,
                                 idaapi.saRelPara, idaapi.scPub,
                                 idc.ADDSEG_FILLGAP)
                    name = "branch_islands_%X%s%s" % (ma.start(), seg.segname, sec.sectname)
                    idc.RenameSeg(sec.addr, name)
                    idc.SetSegClass(sec.addr, "CODE")
                    idc.SetSegAddressing(sec.addr, 2)
                    # copy the section's bytes from the file to sec.addr
                    dsc_file.seek(sec.offset)
                    memcpy(sec.addr, dsc_file.read(sec.size))
                    branch_islands_segments.append(sec.addr)
                    # make code
                    codeea = sec.addr
                    print "Going through the code!"
                    while codeea < (sec.addr + sec.size):
                        res = idc.MakeCode(codeea)
                        if not res:
                            print "[!] EA:0x%X ERR while making code" % codeea
                            # skip ahead 4 bytes and try again
                            codeea += 4
                            continue
                        d = idc.GetDisasm(codeea)
                        # if it's a "B 0x4dd13550"
                        if d.startswith("B "):
                            addr = d.split()[1]
                            if addr.startswith("0x"):
                                branchaddr = int(addr, 16)
                                jmp_to_code[branchaddr].append(codeea)
                                # idc.MakeRptCmt(codeea, "0x%X was taken!" % branchaddr)
                        # continue at the next unexplored address below codeea
                        codeea = idc.FindUnexplored(codeea, idc.SEARCH_DOWN)
    label_and_fix_branch_islands(dsc_file, adrfind, jmp_to_code)
def trace_reg(self, adr, reg):
    """Walk backwards from *adr* looking for the instruction that sets *reg*, then recurse.

    Instructions before *adr* are visited one by one while they stay inside
    the bounds of the function containing *adr*.  At the first
    mov/movsx/movzx/xchg/lea whose first operand text contains *reg*, the
    trace continues recursively from that instruction with a new name
    derived from its second operand.

    Returns None when *adr* is the function start; otherwise the result of
    the recursive call, or None implicitly when the loop ends without a
    recursive call.

    NOTE(review): nesting was reconstructed from a listing without
    indentation -- confirm against the original file.
    """
    start = GetFunctionAttr(adr, FUNCATTR_START)
    end = GetFunctionAttr(adr, FUNCATTR_END)
    func_args = self.get_func_args_cmnt(start)
    print func_args
    address = PrevHead(adr, minea=0)
    if adr == start:
        return None
    while start <= address <= end:
        mn = GetMnem(address)
        op1 = GetOpnd(address,0)
        # only data-movement instructions whose destination text mentions reg
        if reg in op1 and mn in ['mov', 'movsx', 'movzx', 'xchg', 'lea']:
            op2 = GetOpnd(address,1)
            idaapi.decode_insn(address)
            if idaapi.cmd.Op2.type == idaapi.o_displ:
                # characters 1..3 of the operand text
                # (presumably the register inside "[reg+...]" -- TODO confirm)
                next_reg = op2[1:4]
                if 'ebp' in op2:
                    # operand text without its first five and its last character
                    # (presumably the name in "[ebp+name]" -- TODO confirm)
                    op_2 = op2[5:-1]
                    print '1. %s: %s %s -> %s' % (hex(address),mn,op1,op_2)
                    for s in func_args:
                        if op_2.lower() in s.lower():
                            print '%s found in arguments of sub_%s' % (op_2,format(start, 'x'))
                            # continue the trace at the first code reference to this function
                            list_xref = list(CodeRefsTo(start, 1))
                            index = func_args.index(s) + 1
                            buffer_arg = self.get_arg(list_xref[0], index)
                            print 'send buffer is %d arg of sub_%s : %s' % (index, format(list_xref[0], 'x'),
                                                                            idc.GetDisasm(buffer_arg))
                            return self.trace_reg(buffer_arg,GetOpnd(buffer_arg, 0))
                    # no entry of func_args matched: keep tracing the extracted name
                    return self.trace_reg(address,op_2)
                elif next_reg in self.registers:
                    print '2. %s: %s %s -> %s' % (hex(address),mn,op1,op2)
                    return self.trace_reg(address,next_reg)
            else:
                if idaapi.cmd.Op2.type is idaapi.o_reg and 'eax' in GetOpnd(address,1):
                    # NOTE: this rebinds the parameter adr to the third value
                    # returned by has_call_inst
                    has_call, c, adr = self.has_call_inst(address,0)
                    if has_call:
                        print '%s found as a candidate for DS initialization %d instructions after %s' % (
                            GetFunctionName(GetOperandValue(address,0)), c, idc.GetDisasm(address))
                        if self.check_init(GetOperandValue(adr,0)):
                            print '%s contains pointer to a heap allocated memory region %s' % (
                                GetOpnd(address,1) , GetDisasm(address))
                print '%s: %s %s -> %s' % (hex(address),mn,op1,op2)
                return self.trace_reg(address,op2)
        address=PrevHead(address,minea=0)
def handleHookInstCust(self, screenEA = None):
    """Ask the user for a custom instruction hook and install it.

    If a function hook is already registered for the address the user is
    asked whether to remove it; any answer other than 1 aborts.  The hook
    dialog is then shown with an empty script; when it is confirmed
    (``Execute()`` returns 1) an ``InstHook`` is built from the dialog
    values, sent to the client as a set-hook request, the instruction is
    coloured, and the hook is recorded in ``self.idbHookMap`` and shown in
    the hooks view.

    :param screenEA: address of the instruction to hook; ``ScreenEA()`` is
        used when it is None.
    """
    address = screenEA if screenEA is not None else ScreenEA()

    # safety checks, can be start of the function
    if address in self.idbHookMap and self.idbHookMap[address].hook.type == "func":
        dlg = AskYN(0, "Address contains function hook!\nDo you want to remove it?")
        if dlg != 1:
            return
        # remove function hook
        self.handleUnhookFunc(address)

    offset, moduleName = self.getAddressDetails(address)

    hookDlg = InstructionHookDialog(moduleName, "%X" % address, GetDisasm(address), None)
    hookDlg.Compile()
    hookDlg.script.value = ""
    ok = hookDlg.Execute()
    if ok != 1:
        return

    hook = InstHook()
    hook.id = address
    hook.mnemonic = GetDisasm(address)
    hook.address = offset
    hook.module = moduleName
    hook.once = True if hookDlg.trigger.value == 0 else False
    # handleHookInstEdit reads and writes hook.recentSrcFile, whereas this
    # method only ever set hook.recentScriptFile, so the path chosen here
    # never reached the edit dialog.  Store it under both names: the old
    # attribute is kept for any existing reader.
    recent_script = hookDlg.recentScriptFile
    hook.recentScriptFile = recent_script
    hook.recentSrcFile = recent_script
    hook.script = hookDlg.script.value

    entry = HookEntry(hook)
    outJSON = json.dumps({
        "req_id": kFridaLink_SetHookRequest,
        "data": entry.genSetRequest()
    })

    SetColor(address, CIC_ITEM, kIDAViewColor_HookedInst)
    refresh_idaview_anyway()
    self.clientSocket.sendto(outJSON, self.clientAddress)

    self.idbHookMap[address] = entry
    self.idbHooksView.setContent(self.idbHookMap)