def run(self):
    """Run the PE analysis.

    @return: dict of PE metadata; an empty dict when the file is
        missing or cannot be parsed as a PE.
    """
    # Bail out early when the sample is gone or unparseable.
    if not os.path.exists(self.file_path):
        return {}
    try:
        self.pe = pefile.PE(self.file_path)
    except pefile.PEFormatError:
        return {}

    results = {
        "peid_signatures": self._get_peid_signatures(),
        "pe_imports": self._get_imported_symbols(),
        "pe_exports": self._get_exported_symbols(),
        "pe_sections": self._get_sections(),
        "pe_resources": self._get_resources(),
        "pe_versioninfo": self._get_versioninfo(),
        "pe_imphash": self._get_imphash(),
        "pe_timestamp": self._get_timestamp(),
        "pdb_path": self._get_pdb_path(),
        "signature": self._get_signature(),
    }
    # Count only import entries that actually name a DLL; guard against
    # _get_imported_symbols() having returned None.
    imports = results["pe_imports"] or []
    results["imported_dll_count"] = len([x for x in imports if x.get("dll")])
    return results
# Python usage examples for the pefile.PEFormatError() class
def initialize(self, sample):
    """Parse *sample*'s binary with pefile (once) and cache the result.

    @param sample: object exposing getBinary() and getID().
    @return: the cached pefile.PE instance, or None when parsing failed.
    """
    if self.already_initialized:
        return self.library
    self.already_initialized = True
    try:
        self.library = pefile.PE(data=sample.getBinary(), fast_load=True)
        # Only parse the directories the plugins actually consume.
        # see if this initializations can be done on plugins.
        self.library.parse_data_directories(directories=[
            pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_IMPORT'],
            pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_EXPORT'],
            pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_TLS'],
            pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_SECURITY'],
            pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_RESOURCE']])
    except pefile.PEFormatError:
        self.library = None
        logging.error("Error parsing pefileModule with sample:%s",
                      sample.getID(), exc_info=True)
    # Bug fix: the original returned None on the first (successful) call
    # and the parsed library only on subsequent calls.
    return self.library
def __read_props(self):
    """Read version metadata from the PE file at self.__path into
    self.__props.

    Raises:
        ValueError: when the file cannot be parsed as a PE.
    """
    def _loword(dword):
        # Low-order 16 bits of a 32-bit value.
        return dword & 0x0000ffff

    def _hiword(dword):
        # High-order 16 bits of a 32-bit value.
        return dword >> 16

    self.__props = {}
    try:
        pe = pefile.PE(self.__path)
    except pefile.PEFormatError as e:
        # pefile's PEFormatError keeps its message in .value; surface it
        # as a plain ValueError to callers.
        raise ValueError(e.value)
    else:
        # ProductVersion is packed into two DWORDs: MS carries the
        # major/minor pair, LS the build/private pair.
        ms = pe.VS_FIXEDFILEINFO.ProductVersionMS
        ls = pe.VS_FIXEDFILEINFO.ProductVersionLS
        self.__props['fixed_version'] = '.'.join(map(str, (_hiword(ms), _loword(ms), _hiword(ls), _loword(ls))))
        # NOTE(review): scalar pe.VS_FIXEDFILEINFO / flat pe.FileInfo
        # matches older pefile releases; newer pefile returns lists here
        # -- confirm against the pinned pefile version.
        for file_info in pe.FileInfo:
            if file_info.Key == b'StringFileInfo':
                for st in file_info.StringTable:
                    for entry in st.entries.items():
                        self.__props[entry[0].decode('latin_1')] = entry[1].decode('latin_1')
# noinspection PyAbstractClass
def test_nt_headers_exception(self):
    """pefile should fail parsing invalid data (missing NT headers)"""
    # Start from a file that is known to parse correctly.
    good_path = os.path.join(REGRESSION_TESTS_DIR, 'MSVBVM60.DLL')
    good_pe = pefile.PE(good_path, fast_load=True)
    # Keep everything up to the PE header offset, then append 10 KiB of
    # zeroes so the NT headers become garbage.
    nt_offset = good_pe.DOS_HEADER.e_lfanew
    corrupted = good_pe.__data__[:nt_offset] + b'\0' * (1024 * 10)
    self.assertRaises(pefile.PEFormatError, pefile.PE, data=corrupted)
def test_dos_header_exception_large_data(self):
    """pefile should fail parsing 10KiB of invalid data
    (missing DOS header).
    """
    # A 10 KiB all-zero buffer has no DOS header and must be rejected
    # with PEFormatError.
    zeroes = bytes(1024 * 10)
    self.assertRaises(pefile.PEFormatError, pefile.PE, data=zeroes)
def test_dos_header_exception_small_data(self):
    """pefile should fail parsing 64 bytes of invalid data
    (missing DOS header).
    """
    # Even a tiny all-zero buffer must raise rather than mis-parse.
    zeroes = bytes(64)
    self.assertRaises(pefile.PEFormatError, pefile.PE, data=zeroes)
def test_empty_file_exception(self):
    """pefile should fail parsing empty files."""
    # A zero-byte file cannot contain a DOS header.
    empty_path = os.path.join(REGRESSION_TESTS_DIR, 'empty_file')
    self.assertRaises(pefile.PEFormatError, pefile.PE, empty_path)
def ScanFile(filename, signatures, minimumEntropy):
    """Scan one candidate PE file and log its entropy/section/CRC stats.

    Only files whose entropy over the rebuilt image is at least
    *minimumEntropy* get the full report line.

    @param filename: path of the file to scan.
    @param signatures: not referenced in this body (kept for the
        caller's interface).
    @param minimumEntropy: float threshold below which nothing is logged.
    """
    global oLogger
    # Cheap pre-filter: skip anything without the "MZ" DOS magic.
    if not FileContentsStartsWithMZ(filename):
        return
    try:
        pe = GetPEObject(filename)
    except pefile.PEFormatError:
        oLogger.PrintAndLog(('%s', '%s'), (filename, 'PEFormatError'))
        return
    except TypeError:
        oLogger.PrintAndLog(('%s', '%s'), (filename, 'TypeError'))
        return
    try:
        # Rebuild the full image; can exhaust memory on corrupt headers.
        raw = pe.write()
    except MemoryError:
        oLogger.PrintAndLog(('%s', '%s'), (filename, 'MemoryError'))
        return
    # entropy_H appears to compute entropy of the buffer passed in;
    # sections[0] is only used to reach the method -- TODO confirm.
    entropy = pe.sections[0].entropy_H(raw)
    if entropy >= minimumEntropy:
        countFlagsExecute = 0
        countFlagsExecuteAndWrite = 0
        # Count executable sections, and the suspicious RWX combination.
        for section in pe.sections:
            if section.IMAGE_SCN_MEM_EXECUTE:
                countFlagsExecute += 1
            if section.IMAGE_SCN_MEM_EXECUTE and section.IMAGE_SCN_MEM_WRITE:
                countFlagsExecuteAndWrite += 1
        # Flag samples whose stored checksum is set but does not match
        # the recomputed one.
        calculatedCRC = pe.generate_checksum()
        crcDifferent = pe.OPTIONAL_HEADER.CheckSum != 0 and pe.OPTIONAL_HEADER.CheckSum != calculatedCRC
        info = GetVersionInfo(pe)
        oLogger.PrintAndLog(('%s', '%f', '%d', '%d', '%d', '%d', '%08X', '%08X', '%d', '%s', '%s', '%s', '%s'), (filename, entropy, len(pe.sections), countFlagsExecute, countFlagsExecuteAndWrite, pe.OPTIONAL_HEADER.DATA_DIRECTORY[pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_SECURITY']].Size, pe.OPTIONAL_HEADER.CheckSum, calculatedCRC, crcDifferent, time.asctime(time.gmtime(pe.FILE_HEADER.TimeDateStamp)), repr(RVOES(info, 'CompanyName')), repr(RVOES(info, 'ProductName')), hashlib.md5(raw).hexdigest()))
def __check_session(self):
    """Ensure a session is open and self.pe holds a parsed PE.

    @return: True when analysis can proceed, False otherwise.
    """
    if not __sessions__.is_set():
        self.log('error', "No open session")
        return False
    # Already parsed on a previous call.
    if self.pe:
        return True
    try:
        self.pe = pefile.PE(__sessions__.current.file.path)
    except pefile.PEFormatError as e:
        self.log('error', "Unable to parse PE file: {0}".format(e))
        return False
    return True
def getArquitecture(self):
    """Classify the PE architecture from the optional-header magic.

    @return: "PE+", "PE", "IMG_ROM", "UNKNOWN" for any other magic,
        or "FORMAT" when reading the header raises PEFormatError.
    """
    # Optional-header magic values mapped to their architecture labels.
    magic_names = {0x20B: "PE+", 0x10B: "PE", 0x107: "IMG_ROM"}
    try:
        return magic_names.get(self.pe.OPTIONAL_HEADER.Magic, "UNKNOWN")
    except pefile.PEFormatError:
        return "FORMAT"
def __check_session(self):
    """Verify an open session exists, lazily parsing the current file.

    @return: False (after logging the reason) on failure, True once
        self.pe is available.
    """
    if not __sessions__.is_set():
        self.log('error', "No open session")
        return False
    if not self.pe:
        # First use in this session: parse the file on demand.
        try:
            self.pe = pefile.PE(__sessions__.current.file.path)
            return True
        except pefile.PEFormatError as e:
            self.log('error', "Unable to parse PE file: {0}".format(e))
            return False
    return True
def check_pe_header(filepath):
    """Return True when *filepath* parses as a PE with valid magics.

    @param filepath: path to the candidate file.
    @return: True only when both the DOS magic ("MZ") and the NT
        signature ("PE\\0\\0") check out; False on any failure.
    """
    try:
        pe = pefile.PE(filepath)
        # 0x5a4d == "MZ", 0x4550 == "PE\0\0".
        if pe.DOS_HEADER.e_magic == 0x5a4d and pe.NT_HEADERS.Signature == 0x4550:
            return True
    except pefile.PEFormatError:
        return False
    except Exception as e:
        print("LOG - Something weird %s - %s" % (str(e), filepath))
        return False
    return False
def process(self):
    """Walk the PE resource directory and describe every resource entry.

    @return: list of dicts (name/rva/size/type/lang/sublang/sha1),
        "" when no PE library is available, or "corrupt" when resource
        data cannot be read.
    """
    pelib = self._getLibrary(PEFileModule().getName())
    if pelib is None:
        return ""

    ret = []
    if not hasattr(pelib, 'DIRECTORY_ENTRY_RESOURCE'):
        return ret

    for resource_type in pelib.DIRECTORY_ENTRY_RESOURCE.entries:
        # Named resources take precedence; otherwise map the numeric id
        # to a well-known type name, falling back to the raw id.
        if resource_type.name is not None:
            name = "%s" % resource_type.name
        else:
            # Bug fix: the original formatted the (possibly None) lookup
            # result with "%s" first, which turned None into the string
            # "None" and made the numeric fallback unreachable.
            name = pefile.RESOURCE_TYPE.get(resource_type.struct.Id)
            if name is None:
                name = "%d" % resource_type.struct.Id
        if not hasattr(resource_type, 'directory'):
            continue
        for resource_id in resource_type.directory.entries:
            if not hasattr(resource_id, 'directory'):
                continue
            for resource_lang in resource_id.directory.entries:
                try:
                    data = pelib.get_data(
                        resource_lang.data.struct.OffsetToData,
                        resource_lang.data.struct.Size)
                except pefile.PEFormatError:
                    return "corrupt"
                filetype = MIME_TYPE(data, False)
                lang = pefile.LANG.get(
                    resource_lang.data.lang, 'unknown')
                sublang = pefile.get_sublang_name_for_lang(
                    resource_lang.data.lang, resource_lang.data.sublang)
                entry = {
                    "name": self._normalize(name),
                    "rva": self._normalize(
                        hex(resource_lang.data.struct.OffsetToData)),
                    "size": self._normalize(
                        hex(resource_lang.data.struct.Size)),
                    "type": self._normalize(filetype),
                    "lang": self._normalize(lang),
                    "sublang": self._normalize(sublang),
                    "sha1": SHA1(data),
                }
                ret.append(entry)
    return ret
def main():
    """CLI driver: compute the requested pehash variants for each binary."""
    parser = argparse.ArgumentParser(description='Process pehash in different ways')
    # (flag, dest, help) triples for the boolean variant switches.
    flag_specs = [
        ('--totalhash', 'totalhash', "Generate totalhash pehash"),
        ('--anymaster', 'anymaster', "Generate anymaster pehash"),
        ('--anymaster_v1', 'anymaster_v1', "Generate anymaster v1.0.1 pehash"),
        ('--endgame', 'endgame', "Generate endgame pehash"),
        ('--crits', 'crits', "Generate crits pehash"),
        ('--pehashng', 'pehashng', "Generate pehashng (https://github.com/AnyMaster/pehashng)"),
    ]
    for flag, dest, help_text in flag_specs:
        parser.add_argument(flag, dest=dest, action='store_true',
                            default=False, help=help_text)
    parser.add_argument('-v', dest='verbose', action='count', default=0,
                        help="Raise pehash exceptions instead of ignoring.")
    parser.add_argument('binaries', metavar='binaries', type=str, nargs='+',
                        help='list of pe files to process')
    args = parser.parse_args()

    for binary in args.binaries:
        try:
            pe = pefile.PE(binary)
            # With no explicit variant selected, emit every hash.
            do_all = not any([args.totalhash, args.anymaster,
                              args.anymaster_v1, args.endgame,
                              args.crits, args.pehashng])
            raise_on_error = args.verbose > 0
            if args.totalhash or do_all:
                print("{}\tTotalhash\t{}".format(binary, totalhash_hex(pe=pe, raise_on_error=raise_on_error)))
            if args.anymaster or do_all:
                print("{}\tAnyMaster\t{}".format(binary, anymaster_hex(pe=pe, raise_on_error=raise_on_error)))
            if args.anymaster_v1 or do_all:
                print("{}\tAnyMaster_v1.0.1\t{}".format(binary, anymaster_v1_0_1_hex(pe=pe, raise_on_error=raise_on_error)))
            if args.endgame or do_all:
                print("{}\tEndGame\t{}".format(binary, endgame_hex(pe=pe, raise_on_error=raise_on_error)))
            if args.crits or do_all:
                print("{}\tCrits\t{}".format(binary, crits_hex(pe=pe, raise_on_error=raise_on_error)))
            if args.pehashng or do_all:
                print("{}\tpeHashNG\t{}".format(binary, pehashng_hex(pe=pe, raise_on_error=raise_on_error)))
        except pefile.PEFormatError:
            print("ERROR: {} is not a PE file".format(binary))
def main():
    """
    arg1 - path to files
    arg2 - path to res dir
    """
    # NOTE(review): Python 2 code (print statements, list-returning
    # filter()) -- do not run under Python 3 without porting.
    ben_files = []
    corrupted_files = []
    # Build an entropy tree for every regular file in the input dir.
    for file_name in os.listdir(sys.argv[1]):
        path_to_file = join(sys.argv[1], file_name)
        if not os.path.isfile(path_to_file): continue
        print "Try to ent-split file", file_name
        try:
            ben_files.append([file_name, build_tree(
                get_execution_block(path_to_file))])
        except pefile.PEFormatError:
            print "Can't process file"
            corrupted_files.append(file_name)
    # Append the quad-Fourier signature: each record becomes
    # [name, tree, signature].
    for ben_file in ben_files:
        ben_file.append(quad_fourier(ben_file[1]))
    ben_sorted_groups = []
    # Group files whose first two signature components agree within the
    # relative tolerance `limit` (module-level constant -- TODO confirm).
    for ben_file in ben_files:
        was_found = False
        if(ben_file[2][0] == 0.0): continue
        for group in ben_sorted_groups:
            if abs(group[0][0] - ben_file[2][0]) / group[0][0] < limit and \
                    abs(group[0][1] - ben_file[2][1]) / group[0][1] < limit:
                was_found = True
                group[1].append(ben_file[0])
                break
        if not was_found:
            ben_sorted_groups.append([ben_file[2], [ben_file[0]]])
    # Singletons are split off; only multi-member groups get copied out.
    unsorted_groups = filter(lambda x: len(x[1]) == 1, ben_sorted_groups)
    ben_sorted_groups = filter(lambda x: len(x[1]) > 1, ben_sorted_groups)
    res_dir = sys.argv[2]
    # One result directory per group, named by size and signature values.
    for group in ben_sorted_groups:
        dir_name = join(res_dir, "[{0}]V={1},E={2}".format(len(group[1]), "%.2f"%group[0][0], "%.2f"%group[0][1]))
        os.mkdir(join(dir_name))
        for file_name in group[1]:
            shutil.copyfile(join(sys.argv[1], file_name), join(dir_name, os.path.basename(file_name)))
def get_attr_pe(r, sha256):
    """Extract PE metadata for the sample keyed by *sha256* and index it
    in redis.

    @param r: redis client (hash/zset/set commands are used).
    @param sha256: sample hash whose 'path' field points to the file.
    @return: True when the file parsed as a PE, False otherwise.
    """
    path = r.hget(sha256, 'path')
    try:
        pe = pefile.PE(path)
    except (pefile.PEFormatError):
        print("{} not a PE file".format(path))
        return False
    r.hset(sha256, 'is_pefile', True)
    if hasattr(pe, 'FILE_HEADER'):
        # Index the compile timestamp both raw and as ISO-8601.
        # NOTE(review): two-argument zincrby matches redis-py < 3.0;
        # redis-py 3.x requires zincrby(name, amount, value) -- confirm
        # the pinned client version (applies to every zincrby below).
        r.hset(sha256, 'timestamp', pe.FILE_HEADER.TimeDateStamp)
        r.hset(sha256, 'timestamp_iso', datetime.datetime.fromtimestamp(pe.FILE_HEADER.TimeDateStamp).isoformat())
        r.zincrby('timestamps', pe.FILE_HEADER.TimeDateStamp)
        r.sadd('timestamp:{}'.format(pe.FILE_HEADER.TimeDateStamp), sha256)
    # Import hash indexed unconditionally.
    imphash = pe.get_imphash()
    r.hset(sha256, 'imphash', imphash)
    r.zincrby('imphashs', imphash)
    r.sadd('imphash:{}'.format(imphash), sha256)
    if hasattr(pe, 'OPTIONAL_HEADER'):
        r.hset(sha256, 'entrypoint', pe.OPTIONAL_HEADER.AddressOfEntryPoint)
        r.zincrby('entrypoints', pe.OPTIONAL_HEADER.AddressOfEntryPoint)
        r.sadd('entrypoint:{}'.format(pe.OPTIONAL_HEADER.AddressOfEntryPoint), sha256)
    if hasattr(pe, 'FILE_HEADER'):
        r.hset(sha256, 'secnumber', pe.FILE_HEADER.NumberOfSections)
        r.zincrby('secnumbers', pe.FILE_HEADER.NumberOfSections)
        r.sadd('secnumber:{}'.format(pe.FILE_HEADER.NumberOfSections), sha256)
    if hasattr(pe, 'VS_VERSIONINFO'):
        # Pull OriginalFilename out of the version-info string tables.
        for entry in pe.FileInfo:
            if hasattr(entry, 'StringTable'):
                for st_entry in entry.StringTable:
                    ofn = st_entry.entries.get(b'OriginalFilename')
                    if ofn:
                        if isinstance(ofn, bytes):
                            o = ofn.decode()
                        else:
                            o = ofn
                        r.hset(sha256, 'originalfilename', o)
                        r.zincrby('originalfilenames', o)
                        r.sadd(u'originalfilename:{}'.format(o), sha256)
    # Section info: names, sizes, entropy vals
    for section in pe.sections:
        name = section.Name.decode('utf-8', 'ignore').replace('\x00', '')
        r.sadd('{}:secnames'.format(sha256), name)
        r.hset('{}:{}'.format(sha256, name), 'size', section.SizeOfRawData)
        r.hset('{}:{}'.format(sha256, name), 'entropy', H(section.get_data()))
    # adding section info to PE data
    r.hset(sha256, 'nb_tls', check_tls(pe))
    r.hset(sha256, 'ep_section', check_ep_section(pe))
    return True
# Returns Entropy value for given data chunk