def _fixup_pe_header(self, pe):
"""Fixes the PE header from an in-memory representation to an
on-disk representation."""
for section in pe.sections:
section.PointerToRawData = section.VirtualAddress
section.SizeOfRawData = max(
section.Misc_VirtualSize, section.SizeOfRawData
)
reloc = pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_BASERELOC"]
    if len(pe.OPTIONAL_HEADER.DATA_DIRECTORY) <= reloc:
        return
reloc = pe.OPTIONAL_HEADER.DATA_DIRECTORY[reloc]
if not reloc.VirtualAddress or not reloc.Size:
return
# Disable relocations as those have already been applied.
reloc.VirtualAddress = reloc.Size = 0
pe.FILE_HEADER.Characteristics |= \
pefile.IMAGE_CHARACTERISTICS["IMAGE_FILE_RELOCS_STRIPPED"]
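# Standalone usage sketch (not part of the project above; "sample.exe" is a
# placeholder path): pefile.DIRECTORY_ENTRY maps directory names to indices
# into OPTIONAL_HEADER.DATA_DIRECTORY, which is how the fixup above locates
# the base-relocation entry before zeroing it.
import pefile

pe = pefile.PE("sample.exe", fast_load=True)
idx = pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_BASERELOC"]
if idx < len(pe.OPTIONAL_HEADER.DATA_DIRECTORY):
    entry = pe.OPTIONAL_HEADER.DATA_DIRECTORY[idx]
    print(hex(entry.VirtualAddress), entry.Size)
pe.close()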
def initialize(self, sample):
    if self.already_initialized:
return self.library
self.already_initialized = True
try:
self.library = pefile.PE(data=sample.getBinary(), fast_load=True)
        # See whether these initializations can be moved into the plugins.
self.library.parse_data_directories(directories=[
pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_IMPORT'],
pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_EXPORT'],
pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_TLS'],
pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_SECURITY'],
pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_RESOURCE']])
except pefile.PEFormatError:
# print("parse fail")
self.library = None
# print(traceback.format_exc())
logging.error("Error parsing pefileModule with sample:%s",
sample.getID(), exc_info=True)
def get_import_size_stats(self):
    # self.pe.parse_data_directories()  # only needed if the PE was fast-loaded.
total = 0
if (self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_IMPORT']].VirtualAddress == 0):
return 0, 0, 0
for entry in self.pe.DIRECTORY_ENTRY_IMPORT:
total = total + len(entry.imports)
# print entry.dll
# for imp in entry.imports:
# print '\t', hex(imp.address), imp.name
cant_librerias = (len(self.pe.DIRECTORY_ENTRY_IMPORT))
total_imports = total
promedio = total / cant_librerias
return total_imports, cant_librerias, promedio
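# Standalone usage sketch ("sample.exe" is a placeholder): the same per-DLL
# import counts that get_import_size_stats() aggregates above.
import pefile

pe = pefile.PE("sample.exe", fast_load=True)
pe.parse_data_directories(directories=[
    pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_IMPORT']])
for entry in getattr(pe, 'DIRECTORY_ENTRY_IMPORT', []):
    print(entry.dll, len(entry.imports))
pe.close()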
def getImports(self):
if (self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_IMPORT']].VirtualAddress == 0):
return None
d = {}
# print(self.pe.DIRECTORY_ENTRY_IMPORT)
for entry in self.pe.DIRECTORY_ENTRY_IMPORT:
aux = []
        # Replace non-ASCII bytes in the DLL name with '.' so the name
        # can be decoded safely below.
        for i in range(len(entry.dll)):
            if ord(entry.dll[i]) >= 128:
                aux.append('.')
            else:
                aux.append(entry.dll[i])
dll_name = "".join(aux)
# print entry.dll
# print entry.imports
l = []
for imp in entry.imports:
l.append(str(imp.name))
# print '\t', hex(imp.address), imp.name
d[unicode(str(dll_name), "utf-8")] = l
return d
def test_write_header_fields(self):
"""Verify correct field data modification."""
# Test version information writing
control_file = os.path.join(REGRESSION_TESTS_DIR, 'MSVBVM60.DLL')
pe = pefile.PE(control_file, fast_load=True)
pe.parse_data_directories(
directories=[
pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_RESOURCE']])
original_data = pe.write()
str1 = b'string1'
str2 = b'str2'
str3 = b'string3'
pe.FileInfo[0].StringTable[0].entries['FileDescription'] = str1
pe.FileInfo[0].StringTable[0].entries['FileVersion'] = str2
pe.FileInfo[0].StringTable[0].entries['InternalName'] = str3
new_data = pe.write()
diff, differences = 0, list()
for idx in range(len(original_data)):
if original_data[idx] != new_data[idx]:
diff += 1
# Skip the zeroes that pefile automatically adds to pad a new,
# shorter string, into the space occupied by a longer one.
if new_data[idx] != 0:
differences.append(chr(new_data[idx]))
# Verify all modifications in the file were the ones we just made
#
self.assertEqual(''.join(differences).encode('utf-8', 'backslashreplace'), str1 + str2 + str3)
pe.close()
def ScanFile(filename, signatures, minimumEntropy):
global oLogger
if not FileContentsStartsWithMZ(filename):
return
try:
pe = GetPEObject(filename)
except pefile.PEFormatError:
oLogger.PrintAndLog(('%s', '%s'), (filename, 'PEFormatError'))
return
except TypeError:
oLogger.PrintAndLog(('%s', '%s'), (filename, 'TypeError'))
return
try:
raw = pe.write()
except MemoryError:
oLogger.PrintAndLog(('%s', '%s'), (filename, 'MemoryError'))
return
entropy = pe.sections[0].entropy_H(raw)
if entropy >= minimumEntropy:
countFlagsExecute = 0
countFlagsExecuteAndWrite = 0
for section in pe.sections:
if section.IMAGE_SCN_MEM_EXECUTE:
countFlagsExecute += 1
if section.IMAGE_SCN_MEM_EXECUTE and section.IMAGE_SCN_MEM_WRITE:
countFlagsExecuteAndWrite += 1
calculatedCRC = pe.generate_checksum()
crcDifferent = pe.OPTIONAL_HEADER.CheckSum != 0 and pe.OPTIONAL_HEADER.CheckSum != calculatedCRC
info = GetVersionInfo(pe)
        oLogger.PrintAndLog(
            ('%s', '%f', '%d', '%d', '%d', '%d', '%08X', '%08X', '%d', '%s', '%s', '%s', '%s'),
            (filename, entropy, len(pe.sections), countFlagsExecute, countFlagsExecuteAndWrite,
             pe.OPTIONAL_HEADER.DATA_DIRECTORY[pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_SECURITY']].Size,
             pe.OPTIONAL_HEADER.CheckSum, calculatedCRC, crcDifferent,
             time.asctime(time.gmtime(pe.FILE_HEADER.TimeDateStamp)),
             repr(RVOES(info, 'CompanyName')), repr(RVOES(info, 'ProductName')), hashlib.md5(raw).hexdigest()))
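# Standalone sketch ("sample.exe" is a placeholder): per-section entropy and a
# checksum comparison, the two signals ScanFile() reports above.
import pefile

pe = pefile.PE("sample.exe")
for section in pe.sections:
    print(section.Name.rstrip(b'\x00'), section.get_entropy())
print(hex(pe.OPTIONAL_HEADER.CheckSum), hex(pe.generate_checksum()))
pe.close()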
def get_reloc_diff(rinstrs):
"""When reordering relocatable instructions, we also need to update
the relocation info. This function returns a byte diff of the relocation
section."""
diff = []
relocations = {}
# TODO cache relocations
pe = pefile.PE(inp_dump.get_input_file_path(), fast_load=True)
pe.parse_data_directories(directories=[
pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_BASERELOC']])
    if pe.OPTIONAL_HEADER.DATA_DIRECTORY[
            pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_BASERELOC']].Size > 0:
for base_reloc in pe.DIRECTORY_ENTRY_BASERELOC:
for reloc in filter(lambda x: x.type == 3, base_reloc.entries): # HIGHLOW
relocations[reloc.rva] = reloc.struct.get_file_offset()
base = pe.OPTIONAL_HEADER.ImageBase
# now check if we reordered any relocatable data
for rins in filter(lambda x: x.inst_len >= 5, rinstrs):
# print rins.disas, hex(rins.addr), hex(rins.raddr)
# a relocatable ref can be found after the first byte (opcode) and is
# 4 bytes long (no need to check the last three bytes of the instruction)
for rva in xrange(rins.addr - base + 1, rins.addr - base + rins.inst_len - 3):
if rva in relocations:
foff = relocations[rva]
new_rva = rva + rins.raddr - rins.addr
new_rva_h = ((new_rva >> 8) & 0xf) | 3 << 4 # 3 is HIGHLOW
# print "relocations: %x %x %x %x %x %x %x" % (rva, new_rva, rva & 0xff,
# new_rva & 0xff, (rva >> 8) & 0xff, (new_rva >> 8) & 0xff, new_rva_h)
diff.append((foff + 1, chr((rva >> 8) & 0xff), chr(new_rva_h)))
diff.append((foff, chr(rva & 0xff), chr(new_rva & 0xff)))
return diff
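# Standalone sketch ("sample.exe" is a placeholder): enumerate the HIGHLOW base
# relocations that get_reloc_diff() patches when instructions are reordered.
import pefile

pe = pefile.PE("sample.exe", fast_load=True)
pe.parse_data_directories(directories=[
    pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_BASERELOC']])
for base_reloc in getattr(pe, 'DIRECTORY_ENTRY_BASERELOC', []):
    for reloc in base_reloc.entries:
        if reloc.type == 3:  # IMAGE_REL_BASED_HIGHLOW
            print(hex(reloc.rva))
pe.close()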
def _getImports_pe(pth):
"""
Find the binary dependencies of PTH.
This implementation walks through the PE header
and uses library pefile for that and supports
32/64bit Windows
"""
import pefile
dlls = set()
# By default library pefile parses all PE information.
# We are only interested in the list of dependent dlls.
# Performance is improved by reading only needed information.
# https://code.google.com/p/pefile/wiki/UsageExamples
pe = pefile.PE(pth, fast_load=True)
pe.parse_data_directories(directories=[
pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_IMPORT'],
pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_EXPORT'],
])
# Some libraries have no other binary dependencies. Use empty list
# in that case. Otherwise pefile would return None.
# e.g. C:\windows\system32\kernel32.dll on Wine
for entry in getattr(pe, 'DIRECTORY_ENTRY_IMPORT', []):
dll_str = winutils.convert_dll_name_to_str(entry.dll)
dlls.add(dll_str)
# We must also read the exports table to find forwarded symbols:
# http://blogs.msdn.com/b/oldnewthing/archive/2006/07/19/671238.aspx
exportSymbols = getattr(pe, 'DIRECTORY_ENTRY_EXPORT', None)
if exportSymbols:
for sym in exportSymbols.symbols:
if sym.forwarder is not None:
# sym.forwarder is for example 'KERNEL32.EnterCriticalSection'
dll, _ = sym.forwarder.split('.')
dlls.add(winutils.convert_dll_name_to_str(dll) + ".dll")
return dlls
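# Hypothetical invocation of the helper above (the path is a placeholder); it
# returns a set of DLL names, including the targets of forwarded exports.
deps = _getImports_pe(r"C:\Windows\System32\kernel32.dll")
print(sorted(deps))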
def checkTSL(self):
    """Return the RVA of the TLS directory, or None if the file has none."""
    _tls = self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[
        pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_TLS']].VirtualAddress
    if _tls:
        return _tls
    return None
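# Standalone sketch ("sample.exe" is a placeholder): once the TLS directory is
# parsed, pefile exposes it as DIRECTORY_ENTRY_TLS, whose struct carries the
# callback table address.
import pefile

pe = pefile.PE("sample.exe", fast_load=True)
pe.parse_data_directories(directories=[
    pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_TLS']])
tls = getattr(pe, 'DIRECTORY_ENTRY_TLS', None)
if tls is not None:
    print(hex(tls.struct.AddressOfCallBacks))
pe.close()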
def process(self):
pelib = self._getLibrary(PEFileModule().getName())
    if pelib is None:
        return ""
    try:
        if pelib.OPTIONAL_HEADER.DATA_DIRECTORY[
                pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_IMPORT']].VirtualAddress == 0:
            return ""
    except Exception as e:
        print(str(e))
        return ""
d = []
dir_ent_imp = None
try:
dir_ent_imp = pelib.DIRECTORY_ENTRY_IMPORT
    except Exception as e:
        print(str(e))
return ""
for entry in dir_ent_imp:
dll_name = repr(entry.dll).lower()
l = []
for imp in entry.imports:
l.append(repr(imp.name).lower())
# aux={}
# aux["name"]=imp.name
# aux["ordinal"]=imp.ordinal
# l.append(aux)
dic_ent = {"lib": dll_name, "functions": l}
d.append(dic_ent)
return d
def hash_imports(hashfun, path, data=None):
if data:
pe = pefile.PE(data=data)
else:
pe = pefile.PE(path)
dll_name = os.path.split(path)[-1].split('.')[0]
    # pefile exposes the parsed export table as DIRECTORY_ENTRY_EXPORT;
    # parse the directory on demand if it has not been loaded yet.
    if not hasattr(pe, 'DIRECTORY_ENTRY_EXPORT'):
        if pe.OPTIONAL_HEADER.DATA_DIRECTORY[pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_EXPORT']].VirtualAddress != 0:
            pe.parse_data_directories(
                directories=[pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_EXPORT']])
ret = {}
# write name of library as well
h = hashfun(dll_name)
ret[h] = dll_name
h = hashfun(dll_name.lower())
ret[h] = dll_name.lower()
h = hashfun(dll_name.upper())
ret[h] = dll_name.upper()
    if dll_name[-3:].lower() != 'dll':
        h = hashfun(dll_name.lower() + '.dll')
        ret[h] = dll_name.lower() + '.dll'
        h = hashfun(dll_name.upper() + '.DLL')
        ret[h] = dll_name.upper() + '.DLL'
    exports = getattr(pe, 'DIRECTORY_ENTRY_EXPORT', None)
    for entry in (exports.symbols if exports else []):
        if entry.name is not None:
for n in [entry.name.lower(), entry.name.upper(), entry.name]:
h = hashfun(n)
ret[h] = n
return {dll_name: ret}
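# Usage sketch: hashfun can be any callable mapping a name to a value; the
# CRC32-based helper and the kernel32 path below are illustrative only.
import zlib

def _crc32(name):
    data = name.encode('utf-8') if isinstance(name, str) else name
    return zlib.crc32(data) & 0xffffffff

tables = hash_imports(_crc32, r"C:\Windows\System32\kernel32.dll")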
def get_debug_data(pe, type=DEBUG_TYPE[u'IMAGE_DEBUG_TYPE_CODEVIEW']):
retval = None
if not hasattr(pe, u'DIRECTORY_ENTRY_DEBUG'):
# fast loaded - load directory
        pe.parse_data_directories(
            directories=[DIRECTORY_ENTRY[u'IMAGE_DIRECTORY_ENTRY_DEBUG']])
if not hasattr(pe, u'DIRECTORY_ENTRY_DEBUG'):
raise PENoDebugDirectoryEntriesError()
else:
for entry in pe.DIRECTORY_ENTRY_DEBUG:
off = entry.struct.PointerToRawData
size = entry.struct.SizeOfData
if entry.struct.Type == type:
retval = pe.__data__[off:off+size]
break
return retval
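# Follow-on sketch ("pe" is assumed to be an already-loaded pefile.PE): a
# CodeView 7.0 blob returned above starts with the b"RSDS" signature, then a
# 16-byte GUID and a 4-byte age, followed by the NUL-terminated PDB path.
data = get_debug_data(pe)
if data and data[:4] == b'RSDS':
    pdb_path = data[24:].split(b'\x00', 1)[0]
    print(pdb_path)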
def reorder_imports(input_dir, output_dir, architecture):
"""Swap chrome_elf.dll to be the first import of chrome.exe.
Also copy over any related files that might be needed
(pdbs, manifests etc.).
"""
# TODO(thakis): See if there is a reliable way to write the
# correct executable in the first place, so that this script
# only needs to verify that and not write a whole new exe.
input_image = os.path.join(input_dir, 'chrome.exe')
output_image = os.path.join(output_dir, 'chrome.exe')
# pefile mmap()s the whole executable, and then parses parts of
# it into python data structures for ease of processing.
# To write the file again, only the mmap'd data is written back,
# so modifying the parsed python objects generally has no effect.
# However, parsed raw data ends up in pe.Structure instances,
# and these all get serialized back when the file gets written.
# So things that are in a Structure must have their data set
    # through the Structure, while other data must be set through
# the set_bytes_*() methods.
pe = pefile.PE(input_image, fast_load=True)
if architecture == 'x64':
assert pe.PE_TYPE == pefile.OPTIONAL_HEADER_MAGIC_PE_PLUS
else:
assert pe.PE_TYPE == pefile.OPTIONAL_HEADER_MAGIC_PE
pe.parse_data_directories(directories=[
pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_IMPORT']])
found_elf = False
for i, peimport in enumerate(pe.DIRECTORY_ENTRY_IMPORT):
if peimport.dll.lower() == 'chrome_elf.dll':
assert not found_elf, 'only one chrome_elf.dll import expected'
found_elf = True
if i > 0:
swap = pe.DIRECTORY_ENTRY_IMPORT[0]
# Morally we want to swap peimport.struct and swap.struct here,
# but the pe module doesn't expose a public method on Structure
# to get all data of a Structure without explicitly listing all
# field names.
                # NB: OriginalFirstThunk and Characteristics are a union, both at
                # offset 0, so handling just one of them is enough.
peimport.struct.OriginalFirstThunk, swap.struct.OriginalFirstThunk = \
swap.struct.OriginalFirstThunk, peimport.struct.OriginalFirstThunk
peimport.struct.TimeDateStamp, swap.struct.TimeDateStamp = \
swap.struct.TimeDateStamp, peimport.struct.TimeDateStamp
peimport.struct.ForwarderChain, swap.struct.ForwarderChain = \
swap.struct.ForwarderChain, peimport.struct.ForwarderChain
peimport.struct.Name, swap.struct.Name = \
swap.struct.Name, peimport.struct.Name
peimport.struct.FirstThunk, swap.struct.FirstThunk = \
swap.struct.FirstThunk, peimport.struct.FirstThunk
assert found_elf, 'chrome_elf.dll import not found'
pe.write(filename=output_image)
for fname in glob.iglob(os.path.join(input_dir, 'chrome.exe.*')):
shutil.copy(fname, os.path.join(output_dir, os.path.basename(fname)))
return 0
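# Hypothetical invocation (directory names are placeholders): read
# out/Release/chrome.exe, make chrome_elf.dll the first import, and write the
# result plus related files into out/Release/reordered.
reorder_imports('out/Release', 'out/Release/reordered', 'x64')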
def _get_signature(self):
"""If this executable is signed, get its signature(s)."""
dir_index = pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_SECURITY"]
    if len(self.pe.OPTIONAL_HEADER.DATA_DIRECTORY) <= dir_index:
        return []
dir_entry = self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[dir_index]
if not dir_entry or not dir_entry.VirtualAddress or not dir_entry.Size:
return []
if not HAVE_MCRYPTO:
log.critical("You do not have the m2crypto library installed "
"preventing certificate extraction: "
"pip install m2crypto")
return []
signatures = self.pe.write()[dir_entry.VirtualAddress+8:]
bio = M2Crypto.BIO.MemoryBuffer(signatures)
if not bio:
return []
pkcs7_obj = M2Crypto.m2.pkcs7_read_bio_der(bio.bio_ptr())
if not pkcs7_obj:
return []
ret = []
p7 = M2Crypto.SMIME.PKCS7(pkcs7_obj)
for cert in p7.get0_signers(M2Crypto.X509.X509_Stack()) or []:
subject = cert.get_subject()
ret.append({
"serial_number": "%032x" % cert.get_serial_number(),
"common_name": subject.CN,
"country": subject.C,
"locality": subject.L,
"organization": subject.O,
"email": subject.Email,
"sha1": "%040x" % int(cert.get_fingerprint("sha1"), 16),
"md5": "%032x" % int(cert.get_fingerprint("md5"), 16),
})
if subject.GN and subject.SN:
ret[-1]["full_name"] = "%s %s" % (subject.GN, subject.SN)
elif subject.GN:
ret[-1]["full_name"] = subject.GN
elif subject.SN:
ret[-1]["full_name"] = subject.SN
return ret
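# Standalone sketch ("sample.exe" is a placeholder): the security directory's
# VirtualAddress is a raw file offset rather than an RVA, and the PKCS#7 blob
# starts 8 bytes in, after the WIN_CERTIFICATE header.
import pefile

pe = pefile.PE("sample.exe", fast_load=True)
idx = pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_SECURITY']
entry = pe.OPTIONAL_HEADER.DATA_DIRECTORY[idx]
if entry.VirtualAddress and entry.Size:
    blob = pe.write()[entry.VirtualAddress + 8:entry.VirtualAddress + entry.Size]
    print(len(blob))
pe.close()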
def process2(self):
pe = self._getLibrary(PEFileModule().getName())
if(pe is None):
return ""
# get the security directory entry
address = pe.OPTIONAL_HEADER.DATA_DIRECTORY[
pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_SECURITY']].VirtualAddress
if address > 0:
# Always in DER format AFAIK
derData = pe.write()[address + 8:]
else:
logging.debug("address 0")
return
(contentInfo, rest) = decoder.decode(
derData, asn1Spec=rfc2315.ContentInfo())
contentType = contentInfo.getComponentByName('contentType')
if contentType == rfc2315.signedData:
signedData = decode(contentInfo.getComponentByName(
'content'), asn1Spec=rfc2315.SignedData())
for sd in signedData:
if sd == '':
continue
signerInfos = sd.getComponentByName('signerInfos')
for si in signerInfos:
issuerAndSerial = si.getComponentByName(
'issuerAndSerialNumber')
issuer = issuerAndSerial.getComponentByName(
'issuer').getComponent()
for i in issuer:
for r in i:
at = r.getComponentByName('type')
if rfc2459.id_at_countryName == at:
cn = decode(r.getComponentByName(
'value'), asn1Spec=rfc2459.X520countryName())
print(cn[0])
elif rfc2459.id_at_organizationName == at:
on = decode(r.getComponentByName(
'value'), asn1Spec=rfc2459.X520OrganizationName())
print(on[0].getComponent())
elif rfc2459.id_at_organizationalUnitName == at:
ou = decode(r.getComponentByName(
'value'), asn1Spec=rfc2459.X520OrganizationalUnitName())
print(ou[0].getComponent())
elif rfc2459.id_at_commonName == at:
cn = decode(r.getComponentByName(
'value'), asn1Spec=rfc2459.X520CommonName())
print(cn[0].getComponent())
else:
                            print(at)
def main():
# set current dir
set_home()
# read file
data = get_file_data(path)
# get pefile infos
pe_info = pefile.PE(data = data, fast_load=False)
#--------------------------------------
#TimeDateStamp
    # zero the timestamp wherever it appears in the whole file
TimeDateStamp = pe_info.FILE_HEADER.TimeDateStamp
#raw to byte string
TimeDateStamp = struct.pack('I',TimeDateStamp)
new_data = data.replace(TimeDateStamp,'\x00\x00\x00\x00')
#--------------------------------------
#DEBUG info
if hasattr(pe_info,'DIRECTORY_ENTRY_DEBUG'):
#clean debug datas
for debug in pe_info.DIRECTORY_ENTRY_DEBUG:
d_addr = debug.struct.PointerToRawData
d_size = debug.struct.SizeOfData
new_data = fill_zero(new_data,d_addr,d_size)
#clean debug dir
dir = pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_DEBUG']
debug_offset = pe_info.get_offset_from_rva(pe_info.OPTIONAL_HEADER.DATA_DIRECTORY[dir].VirtualAddress)
debug_size = pe_info.OPTIONAL_HEADER.DATA_DIRECTORY[dir].Size
new_data = fill_zero(new_data,debug_offset,debug_size)
#clean links to dir
offset = pe_info.OPTIONAL_HEADER.DATA_DIRECTORY[dir].__file_offset__
size = pe_info.OPTIONAL_HEADER.DATA_DIRECTORY[dir].__format_length__
new_data = fill_zero(new_data,offset,size)
SaveFile(path,new_data)
    print('[OK] post-compile\n')
def _get_digital_signers(self):
if not self.pe:
return None
retlist = None
if HAVE_CRYPTO:
        address = self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[
            pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_SECURITY']].VirtualAddress
        # Check whether the file is digitally signed.
if address == 0:
return retlist
signature = self.pe.write()[address+8:]
# BIO.MemoryBuffer expects an argument of type 'str'
if type(signature) is bytearray:
signature = str(signature)
bio = BIO.MemoryBuffer(signature)
if bio:
swig_pkcs7 = m2.pkcs7_read_bio_der(bio.bio_ptr())
if swig_pkcs7:
p7 = SMIME.PKCS7(swig_pkcs7)
xst = p7.get0_signers(X509.X509_Stack())
retlist = []
if xst:
for cert in xst:
sn = cert.get_serial_number()
sha1_fingerprint = cert.get_fingerprint('sha1').lower().rjust(40, '0')
md5_fingerprint = cert.get_fingerprint('md5').lower().rjust(32, '0')
subject_str = str(cert.get_subject())
try:
cn = subject_str[subject_str.index("/CN=")+len("/CN="):]
                        except ValueError:
continue
retlist.append({
"sn": str(sn),
"cn": cn,
"sha1_fingerprint": sha1_fingerprint,
"md5_fingerprint": md5_fingerprint
})
return retlist