def PE_CompilationTimestamp(self):
    """Return the compilation timestamp stored in the parsed PE header.

    The value is read from ``self.pedict`` under
    ``FILE_HEADER -> TimeDateStamp -> Value``.

    Returns:
        The stored timestamp value, or ``None`` when ``self.pe`` is falsy.
    """
    if not self.pe:
        return None
    timestamp_entry = self.pedict['FILE_HEADER']['TimeDateStamp']
    return timestamp_entry['Value']
# PE: OS Version #
python类PE的实例源码
def PE_sections(self):
    """Summarise every section of the loaded PE file.

    Returns:
        A list with one dict per entry of ``self.pe.sections`` holding the
        section name (as ``str``), the raw-data size as a hex string, the
        entropy and the MD5 / SHA1 / SHA256 / SHA512 digests.  ``None`` is
        returned when ``self.pe`` is falsy.
    """
    if not self.pe:
        return None
    return [
        {
            'entryname': str(section.Name),
            'SizeOfRawData': hex(section.SizeOfRawData),
            'Entropy': section.get_entropy(),
            'MD5': section.get_hash_md5(),
            'SHA1': section.get_hash_sha1(),
            'SHA256': section.get_hash_sha256(),
            'SHA512': section.get_hash_sha512(),
        }
        for section in self.pe.sections
    ]
# PE :Return dump_dict() for debug only #
def __init__(self, sample):
    """Set up the PE-info task state for *sample*.

    Copies the sample id and storage path onto the task, resets the result
    holders, and flags the task as not interested when the sample's MIME
    type does not contain ``application/x-dosexec``.
    """
    super(task_peinfo, self).__init__()

    # Identity of the sample being processed.
    self.sid = sample.id
    self.fpath = sample.storage_file
    self.tmessage = "PEINFO TASK %d :: " % (sample.id)

    # Result holders, filled in later by the task.
    self.compile_timestamp = None
    self.import_hash = ""
    self.matches = []
    self.metadata_extracted = []
    self.tstart = None

    # Only PE (DOS executable) samples are of interest to this task.
    is_pe_sample = "application/x-dosexec" in sample.mime_type
    if not is_pe_sample:
        self.is_interrested = False
    return
def test_selective_loading_integrity(self):
    """Check that piecewise loading yields the same result as a full load.

    The control file is opened once with ``fast_load=True`` followed by an
    explicit parse of all 16 data directories, and once with a regular
    single-pass load; both ``dump_info()`` outputs must be equal.
    """
    sample_path = os.path.join(REGRESSION_TESTS_DIR, 'MSVBVM60.DLL')

    # Fast-load first, then parse the 16 data directories explicitly.
    partial = pefile.PE(sample_path, fast_load=True)
    partial.parse_data_directories(directories=list(range(0x10)))

    # Parse everything in one go.
    complete = pefile.PE(sample_path, fast_load=False)

    # Both approaches must produce identical dumps.
    complete_dump = complete.dump_info()
    partial_dump = partial.dump_info()
    self.assertEqual(complete_dump, partial_dump)

    partial.close()
    complete.close()
def test_imphash(self):
    """Check get_imphash() against known values for three control files."""
    expected_hashes = (
        ('mfc40.dll', 'b0f969ff16372d95ef57f05aa8f69409'),
        ('kernel32.dll', '437d147ea3f4a34fff9ac2110441696a'),
        ('cmd.exe', 'd0058544e4588b1b2290b7f4d830eb0a'),
    )
    for file_name, expected in expected_hashes:
        sample_path = os.path.join(REGRESSION_TESTS_DIR, file_name)
        self.assertEqual(pefile.PE(sample_path).get_imphash(), expected)
def GetPEObject(filename):
    """Build a ``pefile.PE`` object from a file, a zip archive or stdin.

    Args:
        filename: Path of the input.  A name ending in ``.zip`` is treated
            as an archive whose first member is read using the password
            ``infected``.  An empty string means "read the PE image from
            standard input".  Anything else is passed to ``pefile.PE`` as a
            path.

    Returns:
        The parsed ``pefile.PE`` object.

    Note:
        When the archive or its first member cannot be opened, the error is
        printed and the process exits through ``sys.exit()``.
    """
    if filename.lower().endswith('.zip'):
        try:
            oZipfile = zipfile.ZipFile(filename, 'r')
            try:
                member = oZipfile.open(oZipfile.infolist()[0], 'r', C2BIP3('infected'))
            except Exception:
                # Do not leak the archive handle when the member is unreadable.
                oZipfile.close()
                raise
        except Exception:
            # ``Exception`` (not a bare except) so Ctrl-C is not reported as
            # a file error.
            print('Error opening file %s' % filename)
            print(sys.exc_info()[1])
            sys.exit()
        try:
            oPE = pefile.PE(data=member.read())
        finally:
            # Release both handles even if parsing the PE image fails.
            member.close()
            oZipfile.close()
    elif filename == '':
        if sys.platform == "win32":
            import msvcrt
            msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)
        # On Python 3 the raw bytes live on ``sys.stdin.buffer``; Python 2's
        # ``sys.stdin`` has no such attribute and already yields bytes.
        binary_stdin = getattr(sys.stdin, 'buffer', sys.stdin)
        oPE = pefile.PE(data=binary_stdin.read())
    else:
        oPE = pefile.PE(filename)
    return oPE
def sections(self):
    """Log a table describing every section of the opened PE file.

    Returns immediately when ``self.__check_session()`` is falsy.
    Otherwise an ``'info'`` heading and a ``'table'`` payload are sent to
    ``self.log``.  Each row holds the section name, the RVA and virtual
    size as hex strings, the raw data size and the entropy.
    """
    if not self.__check_session():
        return

    table_rows = [
        [
            sec.Name,
            hex(sec.VirtualAddress),
            hex(sec.Misc_VirtualSize),
            sec.SizeOfRawData,
            sec.get_entropy(),
        ]
        for sec in self.pe.sections
    ]
    column_names = ['Name', 'RVA', 'VirtualSize', 'RawDataSize', 'Entropy']

    self.log('info', "PE Sections:")
    self.log('table', dict(header=column_names, rows=table_rows))
def _patch_pe(self, output, DEBUG=False):
    """Write a patched copy of the target PE to *output*.

    Per the original author's note, the generated PE gains a new section,
    '.ropf', and using patch_size makes it possible to create separate
    executables for testing purposes.

    The method collects the relocation entries, prints the MD5 and the
    essential optional-header info of the input, lets ``peLib.AdjustPE``
    apply the displaced regions, writes the result and prints the same
    information for the output file.

    Args:
        output: Path of the PE file to write.
        DEBUG: Forwarded unchanged to ``AdjustPE.update_section``.
    """
    import hashlib

    def md5sum(path):
        # Read through a context manager so the handle is closed promptly.
        with open(path, 'rb') as handle:
            return hashlib.md5(handle.read()).hexdigest()

    self._get_reloc_entries()
    # print(...) with a single pre-formatted argument behaves the same on
    # Python 2 and Python 3.
    print("\t[Before] %s (%s)" % (self.target, md5sum(self.target)))
    self.peinfo.printEssentialOptionalInfo()

    # Write new PE applying all displaced regions
    adjPE = peLib.AdjustPE(self.pe)
    adjPE.update_section(self.moving_regions, self.moving_bin_total,
                         self.reloc_entries, self.selected_diffs, DEBUG)
    self.pe.write(filename=output)

    print("\t[After] %s (%s)" % (output, md5sum(output)))
    peLib.PEInfo(pefile.PE(output)).printEssentialOptionalInfo()
def check_args(args):
    """Validate the positional command-line arguments.

    Exactly one argument is expected and it must name an existing regular
    file that ``pefile`` recognises as an EXE or a DLL.

    Args:
        args: List of positional arguments left after option parsing.

    Returns:
        ``True`` when the argument is acceptable.  Every failure path
        terminates the process, either through ``parser.error`` or through
        ``sys.exit(1)``.
    """
    # check if an input file is given
    if len(args) == 0:
        parser.error("no input file")
    elif len(args) > 1:
        parser.error("more than one input files")

    # check if the input file exists
    if not os.path.exists(args[0]):
        parser.error("cannot access input file '%s'" % args[0])
        sys.exit(1)

    # check if the input file is executable
    if not os.path.isfile(args[0]):
        print('The given arg is not a file.')
        sys.exit(1)

    try:
        pe = pefile.PE(args[0])
    except pefile.PEFormatError:
        # A file that is not a PE image at all gets the same diagnostic as
        # a PE that is neither EXE nor DLL, instead of a raw traceback.
        print('Input file should be executable (PE format: exe or dll)')
        sys.exit(1)
    try:
        is_executable = pe.is_exe() or pe.is_dll()
    finally:
        # The PE object is only needed for this check; release it.
        pe.close()
    if not is_executable:
        print('Input file should be executable (PE format: exe or dll)')
        sys.exit(1)
    return True
def get_sections(binary_file):
    """Split the sections of a PE image into executable and data groups.

    A section counts as executable only when all three of
    IMAGE_SCN_MEM_EXECUTE, IMAGE_SCN_MEM_READ and IMAGE_SCN_CNT_CODE are
    set in its ``Characteristics``; every other section is treated as data.

    Args:
        binary_file: Raw content of the PE file, handed to ``pefile.PE``
            through its ``data`` argument.

    Returns:
        The list ``[binary_file, sections_exe, sections_data]``.
    """
    # 0x20000000 IMAGE_SCN_MEM_EXECUTE
    # 0x40000000 IMAGE_SCN_MEM_READ
    # 0x00000020 IMAGE_SCN_CNT_CODE
    executable_mask = 0x20000000 | 0x40000000 | 0x00000020

    sections_exe, sections_data = [], []
    for section in pefile.PE(data=binary_file).sections:
        has_all_flags = (section.Characteristics & executable_mask) == executable_mask
        target = sections_exe if has_all_flags else sections_data
        target.append(section)
    return [binary_file, sections_exe, sections_data]
def check_verinfo(self, pe):
    """Collect the version information stored in a PE file.

    Args:
        pe: Parsed PE object.  Nothing is collected unless it exposes both
            ``VS_VERSIONINFO`` and ``FileInfo``.

    Returns:
        A newline-joined string of ``key: value`` lines built from the
        ``StringTable`` entries and from the first item of each ``Var``
        entry; an empty string when no version information is present.
    """
    ret = []
    if not (hasattr(pe, 'VS_VERSIONINFO') and hasattr(pe, 'FileInfo')):
        return '\n'.join(ret)

    for entry in pe.FileInfo:
        if hasattr(entry, 'StringTable'):
            for st_entry in entry.StringTable:
                for key, value in st_entry.entries.items():
                    ret.append(convert_to_printable(key) + ': ' + convert_to_printable(value))
        elif hasattr(entry, 'Var'):
            for var_entry in entry.Var:
                if not hasattr(var_entry, 'entry'):
                    continue
                # Only the first item is reported.  Iterating items() avoids
                # indexing keys()/values(), which are not subscriptable on
                # Python 3, and quietly skips an empty dict.
                for key, value in var_entry.entry.items():
                    ret.append(convert_to_printable(key) + ': ' + value)
                    break
    return '\n'.join(ret)
def getImports(pth):
    """
    Forwards to the correct getImports implementation for the platform.

    On Windows and Cygwin, ``.manifest`` files yield an empty list and any
    failure of the PE-based implementation is logged as a warning and
    turned into an empty list.  On macOS the macholib-based implementation
    is used; every other platform goes through the ldd-based one.
    """
    if is_win or is_cygwin:
        if pth.lower().endswith(".manifest"):
            return []
        try:
            return _getImports_pe(pth)
        except Exception:
            # Assemblies can pull in files which aren't necessarily PE,
            # but are still needed by the assembly. Any additional binary
            # dependencies should already have been handled by
            # selectAssemblies in that case, so just warn, return an empty
            # list and continue.
            # ``warning`` replaces the deprecated ``warn`` alias.
            logger.warning('Can not get binary dependencies for file: %s', pth, exc_info=1)
            return []
    elif is_darwin:
        return _getImports_macholib(pth)
    else:
        return _getImports_ldd(pth)
def overlay(self):
    """Print a report about the overlay (data appended after the PE image).

    When ``self.pe`` reports no overlay offset, a single notice is printed.
    Otherwise the start offset, size (absolute, human readable and as a
    percentage of the file), MD5 / SHA-256 of the overlay, its first four
    bytes (hex and printable form) and the MD5 / SHA-256 of the file
    without the overlay are printed.
    """
    overlayOffset = self.pe.get_overlay_data_start_offset()
    raw = self.pe.write()
    if overlayOffset is None:
        print (' No overlay Data Present')
        return

    overlayData = raw[overlayOffset:]
    overlaySize = len(overlayData)
    print ('Overlay Data is present which is often associated with malware')
    print(' Start offset: 0x%08x' % overlayOffset)
    print(' Size: 0x%08x %s %.2f%%' % (overlaySize, self.NumberOfBytesHumanRepresentation(overlaySize), float(overlaySize) / float(len(raw)) * 100.0))
    print(' MD5: %s' % hashlib.md5(overlayData).hexdigest())
    print(' SHA-256: %s' % hashlib.sha256(overlayData).hexdigest())

    magicBytes = overlayData[:4]
    # Hex-dump the raw bytes directly.  Converting them to text and
    # re-encoding as UTF-8 turns every byte >= 0x80 into two bytes, which
    # corrupts the dump.
    magicHex = binascii.b2a_hex(magicBytes)
    # bytearray() yields integers for str (Python 2), bytes and bytearray.
    magicChars = ''.join([chr(b) for b in bytearray(magicBytes)])
    print(' MAGIC: %s %s' % (magicHex, ''.join([self.IFF(ord(c) >= 32, c, '.') for c in magicChars])))

    print(' PE file without overlay:')
    print(' MD5: %s' % hashlib.md5(raw[:overlayOffset]).hexdigest())
    print(' SHA-256: %s' % hashlib.sha256(raw[:overlayOffset]).hexdigest())
# Added by Yang
def run(self):
    """Run the static PE analysis.

    @return: dict of analysis results; an empty dict when the file does
             not exist or pefile rejects it with PEFormatError.
    """
    if not os.path.exists(self.file_path):
        return {}

    try:
        self.pe = pefile.PE(self.file_path)
    except pefile.PEFormatError:
        return {}

    # The helpers are invoked in the order the keys are listed here.
    results = {
        "peid_signatures": self._get_peid_signatures(),
        "pe_imports": self._get_imported_symbols(),
        "pe_exports": self._get_exported_symbols(),
        "pe_sections": self._get_sections(),
        "pe_resources": self._get_resources(),
        "pe_versioninfo": self._get_versioninfo(),
        "pe_imphash": self._get_imphash(),
        "pe_timestamp": self._get_timestamp(),
        "pdb_path": self._get_pdb_path(),
        "signature": self._get_signature(),
    }

    # Count only the import entries that carry a truthy "dll" value.
    named_imports = [imp for imp in results["pe_imports"] if imp.get("dll")]
    results["imported_dll_count"] = len(named_imports)
    return results
def _fixup_pe_header(self, pe):
    """Fixes the PE header from an in-memory representation to an
    on-disk representation.

    Every section's raw-data pointer is set to its virtual address and its
    raw size to the larger of the virtual and raw sizes.  If a base
    relocation directory is present and non-empty, it is zeroed out and
    IMAGE_FILE_RELOCS_STRIPPED is set in the file header.

    Args:
        pe: The ``pefile.PE`` object to modify in place.
    """
    for section in pe.sections:
        section.PointerToRawData = section.VirtualAddress
        section.SizeOfRawData = max(
            section.Misc_VirtualSize, section.SizeOfRawData
        )

    reloc = pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_BASERELOC"]
    # ``<=`` rather than ``<``: index ``reloc`` is only valid when the list
    # holds at least ``reloc + 1`` entries.
    if len(pe.OPTIONAL_HEADER.DATA_DIRECTORY) <= reloc:
        return

    reloc = pe.OPTIONAL_HEADER.DATA_DIRECTORY[reloc]
    if not reloc.VirtualAddress or not reloc.Size:
        return

    # Disable relocations as those have already been applied.
    reloc.VirtualAddress = reloc.Size = 0
    pe.FILE_HEADER.Characteristics |= \
        pefile.IMAGE_CHARACTERISTICS["IMAGE_FILE_RELOCS_STRIPPED"]
def __init__(self, file_path):
    """Load a PE executable and derive its basic properties.

    Parses ``self.fp`` with pefile, identifies the architecture, converts
    the PE sections with ``section_from_pe_section`` and records the names
    of the imported DLLs (an empty list when there is no import directory).

    Raises:
        Exception: when ``_identify_arch()`` returns ``None``.
    """
    super(PEExecutable, self).__init__(file_path)

    self.helper = pefile.PE(self.fp)
    self.architecture = self._identify_arch()
    if self.architecture is None:
        raise Exception('Architecture is not recognized')

    init_message = 'Initialized {} {} with file \'{}\''.format(self.architecture, type(self).__name__, file_path)
    logging.debug(init_message)

    self.pack_endianness = '<'
    self.sections = [
        section_from_pe_section(pe_section, self.helper)
        for pe_section in self.helper.sections
    ]
    self.libraries = (
        [imported.dll for imported in self.helper.DIRECTORY_ENTRY_IMPORT]
        if hasattr(self.helper, 'DIRECTORY_ENTRY_IMPORT')
        else []
    )
def check_aslr():
    """Report whether any of a few system DLLs is loaded at a rebased address.

    For ``user32.dll``, ``kernel32.dll`` and ``ntdll.dll`` the in-memory
    module base returned by ``GetModuleHandleW`` is compared against the
    ``ImageBase`` stored in the DLL file on disk.  Each difference is
    logged at debug level.

    Returns:
        ``True`` when at least one checked module has a non-zero offset
        between its file ImageBase and its load address, else ``False``.
    """
    # create_unicode_buffer lives in ctypes itself; ctypes.wintypes only
    # re-exported it on Python 2.
    from ctypes import create_unicode_buffer
    from ctypes import windll
    from ctypes import wintypes

    kernel32 = windll.kernel32
    kernel32.GetModuleHandleW.restype = wintypes.HMODULE
    kernel32.GetModuleHandleW.argtypes = [wintypes.LPCWSTR]
    kernel32.GetModuleFileNameW.restype = wintypes.DWORD
    kernel32.GetModuleFileNameW.argtypes = [wintypes.HANDLE, wintypes.LPWSTR, wintypes.DWORD]

    is_aslr = False
    for dll_name in ("user32.dll", "kernel32.dll", "ntdll.dll"):
        h_module_base = kernel32.GetModuleHandleW(dll_name)
        if not h_module_base:
            # A NULL handle comes back as None/0; skip the module rather
            # than fail on the subtraction below.
            LOG.debug("Module %s is not loaded; skipping ASLR check for it", dll_name)
            continue

        # next get the module's file path
        module_path = create_unicode_buffer(wintypes.MAX_PATH)
        kernel32.GetModuleFileNameW(h_module_base, module_path, wintypes.MAX_PATH)

        # then the ImageBase recorded in the module's file on disk
        pe = pefile.PE(module_path.value)
        try:
            pe_header_base_addr = pe.OPTIONAL_HEADER.ImageBase
        finally:
            pe.close()

        offset = pe_header_base_addr - h_module_base
        LOG.debug("Memory vs. File ImageBase offset (%s): 0x%x", dll_name, offset)
        is_aslr |= offset != 0
    return is_aslr
def exeImportsFuncs(filename, allstrings):
    """Remove imported DLL and function names from a list of strings.

    Args:
        filename: Path of the PE file whose import table is read.
        allstrings: List of extracted strings.  It is modified in place:
            the first occurrence of every import name is removed.

    Returns:
        A de-duplicated list of the remaining strings.  If the file cannot
        be parsed or has no import directory, *allstrings* is returned as
        it stands at that point.

    Note:
        When nothing remains after filtering, a message is printed and the
        process exits with status 1.  The previous bare ``except`` caught
        that ``SystemExit`` too, so the exit never took effect.
    """
    try:
        pe = pefile.PE(filename)
        importlist = []
        for entry in pe.DIRECTORY_ENTRY_IMPORT:
            importlist.append(entry.dll)
            for imp in entry.imports:
                importlist.append(imp.name)
        for imp in importlist:
            if imp in allstrings:
                allstrings.remove(imp)
        if len(allstrings) > 0:
            return list(set(allstrings))
        print('[!] No Extractable Attributes Present in Hash: '+str(md5sum(filename)) + ' Please Remove it from the Sample Set and Try Again!')
        sys.exit(1)
    except Exception:
        # SystemExit is not an Exception subclass, so the exit above now
        # propagates while parsing problems still fall back to the input.
        return allstrings
#EML File parsing, and comparison based on dictionary entries .... plus regexes looking for domains/links in text/html
def initialize(self, sample):
    """Parse *sample* with pefile once and cache the result on the instance.

    On the first call the sample's binary is fast-loaded and the import,
    export, TLS, security and resource directories are parsed; the result
    is stored in ``self.library`` (``None`` if pefile raises
    ``PEFormatError``, which is logged).  Later calls return the cached
    ``self.library``.

    NOTE(review): the first call returns ``None`` implicitly, even when
    parsing succeeded; only subsequent calls return ``self.library``.
    """
    if self.already_initialized:
        return self.library
    self.already_initialized = True

    wanted_directories = (
        'IMAGE_DIRECTORY_ENTRY_IMPORT',
        'IMAGE_DIRECTORY_ENTRY_EXPORT',
        'IMAGE_DIRECTORY_ENTRY_TLS',
        'IMAGE_DIRECTORY_ENTRY_SECURITY',
        'IMAGE_DIRECTORY_ENTRY_RESOURCE',
    )
    try:
        self.library = pefile.PE(data=sample.getBinary(), fast_load=True)
        # see if this initializations can be done on plugins.
        directory_ids = [pefile.DIRECTORY_ENTRY[name] for name in wanted_directories]
        self.library.parse_data_directories(directories=directory_ids)
    except pefile.PEFormatError:
        self.library = None
        logging.error("Error parsing pefileModule with sample:%s",
                      sample.getID(), exc_info=True)
def check_verinfo(self, pe):
    """Collect the version information stored in a PE file.

    Args:
        pe: Parsed PE object.  Nothing is collected unless it exposes both
            ``VS_VERSIONINFO`` and ``FileInfo``.

    Returns:
        A newline-joined string of ``key: value`` lines built from the
        ``StringTable`` entries and from the first item of each ``Var``
        entry; an empty string when no version information is present.
    """
    ret = []
    if not (hasattr(pe, 'VS_VERSIONINFO') and hasattr(pe, 'FileInfo')):
        return '\n'.join(ret)

    for entry in pe.FileInfo:
        if hasattr(entry, 'StringTable'):
            for st_entry in entry.StringTable:
                for key, value in st_entry.entries.items():
                    ret.append(convert_to_printable(key) + ': ' + convert_to_printable(value))
        elif hasattr(entry, 'Var'):
            for var_entry in entry.Var:
                if not hasattr(var_entry, 'entry'):
                    continue
                # Only the first item is reported.  Iterating items() avoids
                # indexing keys()/values(), which are not subscriptable on
                # Python 3, and quietly skips an empty dict.
                for key, value in var_entry.entry.items():
                    ret.append(convert_to_printable(key) + ': ' + value)
                    break
    return '\n'.join(ret)
def _get_overlay(self):
    """Get information on the PE overlay
    @return: overlay dict with zero-padded hex "offset" and "size" strings,
             or None when no PE is loaded, the offset lookup fails, or the
             file has no overlay.
    """
    if not self.pe:
        return None

    try:
        off = self.pe.get_overlay_data_start_offset()
    except Exception:
        # Catch Exception rather than everything so that KeyboardInterrupt
        # and SystemExit are not reported as an outdated-library problem.
        log.error("Your version of pefile is out of date. Please update to the latest version on https://github.com/erocarrera/pefile")
        return None

    if off is None:
        return None

    # The overlay runs from ``off`` to the end of the raw file data.
    return {
        "offset": "0x{0:08x}".format(off),
        "size": "0x{0:08x}".format(len(self.pe.__data__) - off),
    }
def get_codedconfig(data):
    """Extract the encoded configuration blob from a sample.

    The sample is first parsed with ``pype32`` and the resource named
    ``Data.bin`` is looked up through data directory 14.  If that raises,
    the sample is re-parsed with ``pefile`` and the entries of the
    ``RC_DATA`` / ``RCData`` resource directories are read instead.

    Args:
        data: Raw bytes of the sample.

    Returns:
        The configuration data, or ``None`` when nothing was found.

    NOTE(review): the resource-name test used to read
    ``== "RC_DATA" or "RCData"``, which is always true, so every resource
    directory was scanned.  It now matches only the two names; confirm
    against real samples.
    """
    coded_config = None
    try:
        pe = pype32.PE(data=data)
        m = pe.ntHeaders.optionalHeader.dataDirectory[14].info
        for i in m.directory.resources.info:
            if i['name'] == "Data.bin":
                coded_config = i["data"]
    except Exception:
        pe = pefile.PE(data=data)
        for entry in pe.DIRECTORY_ENTRY_RESOURCE.entries:
            if str(entry.name) not in ("RC_DATA", "RCData"):
                continue
            for res in entry.directory.entries:
                res_struct = res.directory.entries[0].data.struct
                data_rva = res_struct.OffsetToData
                size = res_struct.Size
                coded_config = pe.get_memory_mapped_image()[data_rva:data_rva + size]
                # Icons can get in the way.
                # A bytes literal keeps the prefix test valid on Python 3,
                # where the mapped image is bytes.
                if coded_config.startswith(b'\x28\x00\x00'):
                    break
    return coded_config
def extract_config(rawData):
    """Return the raw bytes of the ``XTREME`` RT_RCDATA resource.

    Args:
        rawData: Raw bytes of the PE sample.

    Returns:
        The resource data sliced out of the memory-mapped image, or
        ``None`` when the sample cannot be parsed, has no RT_RCDATA
        directory, or contains no resource named ``XTREME``.
    """
    try:
        pe = pefile.PE(data=rawData)
        try:
            resource_ids = [entry.id for entry in pe.DIRECTORY_ENTRY_RESOURCE.entries]
            rt_string_idx = resource_ids.index(pefile.RESOURCE_TYPE['RT_RCDATA'])
        except (ValueError, AttributeError):
            # ValueError: no RT_RCDATA entry; AttributeError: no resource
            # directory at all.
            return None

        rt_string_directory = pe.DIRECTORY_ENTRY_RESOURCE.entries[rt_string_idx]
        for entry in rt_string_directory.directory.entries:
            if str(entry.name) == 'XTREME':
                res_struct = entry.directory.entries[0].data.struct
                data_rva = res_struct.OffsetToData
                size = res_struct.Size
                return pe.get_memory_mapped_image()[data_rva:data_rva + size]
    except Exception:
        # Best effort: any parsing problem means "no config".
        return None
    return None
def config(raw_data):
    """Parse the configuration stored in the ``.bss`` section of a sample.

    Args:
        raw_data: Raw bytes of the PE sample.

    Returns:
        The result of ``parse_config`` on the NUL-separated section fields,
        or ``None`` when there is no ``.bss`` section or its first field is
        not ``ADVAPI32.DLL``.
    """
    pe = pefile.PE(data=raw_data, fast_load=False)

    # Start from None so a sample without a .bss section returns None
    # instead of raising on an unbound local.  If several sections match,
    # the last one wins.
    bss_data = None
    for section in pe.sections:
        # Bytes literals behave identically on Python 2 and also work on
        # Python 3, where section names and data are bytes.
        if section.Name.rstrip(b'\x00') == b".bss":
            bss_data = section.get_data()[:section.SizeOfRawData]
    if bss_data is None:
        return None

    config_list = bss_data.split(b'\x00')
    # Crude check to make sure we have a decrypted section
    if config_list[0] == b"ADVAPI32.DLL":
        return parse_config(config_list)
    return None
def config(raw_data):
    """Locate, decrypt and parse the RC4-encrypted configuration strings.

    The ``$opcodes03`` yara match supplies the virtual address of a
    16-byte key; every ``$opcodes04`` match supplies the length and
    virtual address of one encrypted string.  Each string is decrypted
    with ``decrypt_rc4``, cut at its first NUL, and the resulting list is
    handed to ``parse_config``.

    Args:
        raw_data: Raw bytes of the PE sample.

    Returns:
        The result of ``parse_config``, or ``None`` when the key signature
        is not found in the sample.
    """
    pe = pefile.PE(data=raw_data, fast_load=False)

    key_matches = yara_scan(raw_data, '$opcodes03')
    if not key_matches:
        # Without the key signature there is nothing to decrypt.
        return None
    # '<i': the operands embedded in the matched code are little-endian
    # 32-bit values regardless of the host running this decoder.
    key_va = struct.unpack('<i', key_matches[0][19:23])[0]
    key_hex = pe_data(pe, key_va, 16)

    config_list = []
    for match in yara_scan(raw_data, '$opcodes04'):
        length = struct.unpack('<i', match[9:13])[0]
        data_va = struct.unpack('<i', match[17:21])[0]
        sec_data = pe_data(pe, data_va, length)
        dec = decrypt_rc4(key_hex, sec_data)
        # Keep only the part before the first NUL terminator.
        if '\x00' in dec:
            dec = dec[:dec.index('\x00')]
        config_list.append(dec)
    return parse_config(config_list)
def get_config(data):
    """Return the fields of the ``CFG`` RT_RCDATA resource.

    Args:
        data: Raw bytes of the PE sample.

    Returns:
        The resource content with NUL characters removed and split on
        ``##``, or ``None`` when the sample cannot be parsed, has no
        RT_RCDATA directory, or contains no resource named ``CFG``.
    """
    try:
        pe = pefile.PE(data=data)
        try:
            resource_ids = [entry.id for entry in pe.DIRECTORY_ENTRY_RESOURCE.entries]
            rt_string_idx = resource_ids.index(pefile.RESOURCE_TYPE['RT_RCDATA'])
        except (ValueError, AttributeError):
            # ValueError: no RT_RCDATA entry; AttributeError: no resource
            # directory at all.
            return None

        rt_string_directory = pe.DIRECTORY_ENTRY_RESOURCE.entries[rt_string_idx]
        for entry in rt_string_directory.directory.entries:
            if str(entry.name) == "CFG":
                res_struct = entry.directory.entries[0].data.struct
                data_rva = res_struct.OffsetToData
                size = res_struct.Size
                resource = pe.get_memory_mapped_image()[data_rva:data_rva + size]
                cleaned = resource.replace('\x00', '')
                return cleaned.split('##')
    except Exception:
        # Best effort: any parsing problem means "no config".
        return None
    return None
def get_config(data):
    """Return the fields of the ``GREAME`` RT_RCDATA resource.

    Args:
        data: Raw bytes of the PE sample.

    Returns:
        The resource content split on ``####@####``, or ``None`` when the
        sample cannot be parsed, has no RT_RCDATA directory, or contains
        no resource named ``GREAME``.
    """
    try:
        pe = pefile.PE(data=data)
        try:
            resource_ids = [entry.id for entry in pe.DIRECTORY_ENTRY_RESOURCE.entries]
            rt_string_idx = resource_ids.index(pefile.RESOURCE_TYPE['RT_RCDATA'])
        except (ValueError, AttributeError):
            # ValueError: no RT_RCDATA entry; AttributeError: no resource
            # directory at all.
            return None

        rt_string_directory = pe.DIRECTORY_ENTRY_RESOURCE.entries[rt_string_idx]
        for entry in rt_string_directory.directory.entries:
            if str(entry.name) == "GREAME":
                res_struct = entry.directory.entries[0].data.struct
                data_rva = res_struct.OffsetToData
                size = res_struct.Size
                resource = pe.get_memory_mapped_image()[data_rva:data_rva + size]
                return resource.split('####@####')
    except Exception:
        # Best effort: any parsing problem means "no config".
        return None
    return None
def get_long_line(data):
    """Locate the raw configuration blob and report which layout was found.

    Two layouts are tried in order:

    * ``V1`` - a resource named ``0`` inside a resource directory named
      ``RT_RCDATA``.
    * ``V2`` - a byte pattern searched for in *data* with a regular
      expression; the match is trimmed by 4 leading and 12 trailing
      characters.

    Args:
        data: Raw content of the sample.

    Returns:
        ``(raw_config, 'V1')``, ``(raw_config, 'V2')`` or ``(None, None)``
        when neither layout is present.
    """
    raw_config = None
    try:
        pe = pefile.PE(data=data)
        for rsrc_type in pe.DIRECTORY_ENTRY_RESOURCE.entries:
            if str(rsrc_type.name) != "RT_RCDATA":
                continue
            for rsrc in rsrc_type.directory.entries:
                if str(rsrc.name) == '0':
                    res_struct = rsrc.directory.entries[0].data.struct
                    data_rva = res_struct.OffsetToData
                    size = res_struct.Size
                    # Kept in its own variable: rebinding ``data`` here made
                    # the V2 fallback search the resource instead of the
                    # whole sample if a later iteration failed.
                    raw_config = pe.get_memory_mapped_image()[data_rva:data_rva + size]
    except Exception:
        raw_config = None

    if raw_config is not None:
        return raw_config, 'V1'

    try:
        m = re.search('\x69\x00\x6F\x00\x6E\x00\x00\x59(.*)\x6F\x43\x00\x61\x00\x6E', data)
    except Exception:
        # e.g. a str pattern applied to bytes input on Python 3.
        return None, None
    if m is None:
        return None, None
    return m.group(0)[4:-12], 'V2'
def extract_config(raw_data):
    """Return the raw bytes of the ``CFG`` RT_RCDATA resource.

    Args:
        raw_data: Raw bytes of the PE sample.

    Returns:
        The resource data sliced out of the memory-mapped image, or
        ``None`` when the sample has no RT_RCDATA directory or no resource
        named ``CFG``.

    Note:
        Errors raised by ``pefile.PE`` itself are not caught here.
    """
    pe = pefile.PE(data=raw_data)
    try:
        resource_ids = [entry.id for entry in pe.DIRECTORY_ENTRY_RESOURCE.entries]
        rt_string_idx = resource_ids.index(pefile.RESOURCE_TYPE['RT_RCDATA'])
    except (ValueError, AttributeError):
        # ValueError: no RT_RCDATA entry; AttributeError: no resource
        # directory at all.  Narrowed from a bare except to match the
        # sibling extractors and avoid swallowing unrelated errors.
        return None

    rt_string_directory = pe.DIRECTORY_ENTRY_RESOURCE.entries[rt_string_idx]
    for entry in rt_string_directory.directory.entries:
        if str(entry.name) == 'CFG':
            res_struct = entry.directory.entries[0].data.struct
            data_rva = res_struct.OffsetToData
            size = res_struct.Size
            return pe.get_memory_mapped_image()[data_rva:data_rva + size]
    return None
def main(args=None):
    """Command-line entry point: dispatch on the parsed options.

    * input/output object given - strip the object file via ``obj.strip``.
    * input/output image given  - open the image, call its
      ``default_timestamp()`` and write it to the output path.
    * input/output lib given    - call ``lib.fix_lib_timestamps``.
    * otherwise                 - print the parser's help text.

    Args:
        args: Argument list handed to ``parser.parse_args``; ``None`` lets
            the parser fall back to its default source.
    """
    (options, args) = parser.parse_args(args)
    if options.input_object and options.output_object:
        print("stripping an object file")
        obj.strip(options.input_object, options.output_object)
    elif options.input_image and options.output_image:
        print("fixing an image")
        pe = pefile.PE(options.input_image)
        try:
            pe.default_timestamp()
            pe.write(options.output_image)
        finally:
            # Release the input file even if patching or writing fails.
            pe.close()
    elif options.input_lib and options.output_lib:
        lib.fix_lib_timestamps(options.input_lib, options.output_lib)
    else:
        parser.print_help()