def main(argv):
parser = OptionParser()
parser.add_option("-f", "--filedir", action="store", dest="filedir", help="path to directory containing files to unpack", metavar="DIR")
(options, args) = parser.parse_args()
if options.filedir == None:
parser.error("Specify dir with files")
else:
try:
filelist = open(os.path.join(options.filedir, "LIST")).readlines()
except:
parser.error("'LIST' not found in file dir")
## first process the LIST file
pkgmeta = []
for unpackfile in filelist:
try:
unpacks = unpackfile.strip().split()
if len(unpacks) == 3:
origin = "unknown"
(package, version, filename) = unpacks
else:
(package, version, filename, origin) = unpacks
pkgmeta.append((options.filedir, filename))
except Exception, e:
# oops, something went wrong
print >>sys.stderr, e
pool = multiprocessing.Pool()
unpackresults = pool.map(unpack, pkgmeta, 1)
pool.terminate()
for i in unpackresults:
if i != None:
(filename, result) = i
if not result:
print "corrupt archive: %s" % filename
def scanArchitecture(filename, tags, cursor, conn, filehashes, blacklist=[], scanenv={}, scandebug=False, unpacktempdir=None):
if not 'elf' in tags:
return
archres = elfcheck.getArchitecture(filename, tags)
if archres != None:
return (['architecture'], archres)
## search for lzip compressed data and unpack it
def searchUnpackLzip(filename, tempdir=None, blacklist=[], offsets={}, scanenv={}, debug=False):
hints = {}
if not 'lzip' in offsets:
return ([], blacklist, [], hints)
if offsets['lzip'] == []:
return ([], blacklist, [], hints)
filesize = os.stat(filename).st_size
if filesize < 5:
return ([], blacklist, [], hints)
diroffsets = []
tags = []
counter = 1
for offset in offsets['lzip']:
blacklistoffset = extractor.inblacklist(offset, blacklist)
if blacklistoffset != None:
continue
## sanity check, only versions 0 or 1 are supported
lzipfile = open(filename, 'rb')
lzipfile.seek(offset+4)
lzipversion = lzipfile.read(1)
lzipfile.close()
if struct.unpack('<B', lzipversion)[0] > 1:
continue
tmpdir = dirsetup(tempdir, filename, "lzip", counter)
(res, lzipsize) = unpackLzip(filename, offset, tmpdir)
if res != None:
diroffsets.append((res, offset, lzipsize))
blacklist.append((offset, offset+lzipsize))
counter = counter + 1
if offset == 0 and lzipsize == filesize:
tags.append("compressed")
tags.append("lzip")
else:
## cleanup
os.rmdir(tmpdir)
return (diroffsets, blacklist, tags, hints)
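## A minimal sketch of the version check performed above, assuming the standard
## lzip member layout: a 4-byte "LZIP" magic, a one-byte version number (only 0
## and 1 exist) and a coded dictionary size byte. The helper name is made up.
def is_supported_lzip_header(filename, offset=0):
    lzipfile = open(filename, 'rb')
    lzipfile.seek(offset)
    header = lzipfile.read(6)
    lzipfile.close()
    if len(header) < 6:
        return False
    if header[:4] != 'LZIP':
        return False
    ## the version byte sits right after the magic
    return ord(header[4]) <= 1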
def gzipcrc32(filename):
datafile = open(filename, 'rb')
datafile.seek(0)
databuffer = datafile.read(10000000)
crc32 = binascii.crc32('')
while databuffer != '':
crc32 = binascii.crc32(databuffer, crc32)
databuffer = datafile.read(10000000)
datafile.close()
crc32 = crc32 & 0xffffffff
return crc32
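## A minimal usage sketch (not part of the original code): the value returned by
## gzipcrc32() for an unpacked file can be compared against the CRC-32 stored in
## the gzip member trailer, whose last 8 bytes are the CRC-32 and ISIZE fields,
## both little-endian 32-bit values. File names below are hypothetical.
def gzip_trailer_matches(gzipfilename, unpackedfilename):
    gzipfile = open(gzipfilename, 'rb')
    gzipfile.seek(-8, os.SEEK_END)
    (storedcrc32, isize) = struct.unpack('<LL', gzipfile.read(8))
    gzipfile.close()
    return storedcrc32 == gzipcrc32(unpackedfilename)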
def searchUnpackKnownGzip(filename, tempdir=None, scanenv={}, debug=False):
## first check if the file actually could be a valid gzip file
gzipfile = open(filename, 'rb')
gzipfile.seek(0)
gzipheader = gzipfile.read(3)
gzipfile.close()
if gzipheader != fsmagic.fsmagic['gzip']:
return ([], [], [], {})
## then try unpacking it.
res = searchUnpackGzip(filename, tempdir, [], {'gzip': [0]}, scanenv, debug)
(diroffsets, blacklist, newtags, hints) = res
failed = False
## there were results, so check if they were successful
if diroffsets != []:
if len(diroffsets) != 1:
failed = True
else:
(dirpath, startoffset, endoffset) = diroffsets[0]
if startoffset != 0 or endoffset != os.stat(filename).st_size:
failed = True
if failed:
for i in diroffsets:
(dirpath, startoffset, endoffset) = i
try:
shutil.rmtree(dirpath)
except:
pass
return ([], [], [], {})
else:
return (diroffsets, blacklist, newtags, hints)
return ([], [], [], {})
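## For reference (an assumption, since the fsmagic module is not shown here):
## a gzip member starts with the magic bytes 0x1f 0x8b followed by the deflate
## compression method byte 0x08, so the three-byte comparison above would be
## checking against a value like the one below.
GZIP_HEADER_EXAMPLE = '\x1f\x8b\x08'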
def searchUnpackKnownBzip2(filename, tempdir=None, scanenv={}, debug=False):
	## first check if the file actually could be a valid bzip2 file
bzip2file = open(filename, 'rb')
bzip2file.seek(0)
bzip2header = bzip2file.read(3)
bzip2file.close()
if bzip2header != fsmagic.fsmagic['bz2']:
return ([], [], [], {})
## then try unpacking it.
res = searchUnpackBzip2(filename, tempdir, [], {'bz2': [0]}, scanenv, debug)
(diroffsets, blacklist, newtags, hints) = res
failed = False
## there were results, so check if they were successful
if diroffsets != []:
if len(diroffsets) != 1:
failed = True
else:
(dirpath, startoffset, endoffset) = diroffsets[0]
if startoffset != 0 or endoffset != os.stat(filename).st_size:
failed = True
if failed:
for i in diroffsets:
(dirpath, startoffset, endoffset) = i
try:
shutil.rmtree(dirpath)
except:
pass
return ([], [], [], {})
else:
return (diroffsets, blacklist, newtags, hints)
return ([], [], [], {})
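## For reference (an assumption, since the fsmagic module is not shown here):
## a bzip2 stream starts with the bytes 'BZh' followed by a block-size digit
## between '1' and '9', so the three-byte comparison above would be checking
## against a value like the one below.
BZIP2_HEADER_EXAMPLE = 'BZh'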
## search and unpack rzip compressed files
def searchUnpackRZIP(filename, tempdir=None, blacklist=[], offsets={}, scanenv={}, debug=False):
hints = {}
if not 'rzip' in offsets:
return ([], blacklist, [], hints)
if offsets['rzip'] == []:
return ([], blacklist, [], hints)
if offsets['rzip'][0] != 0:
return ([], blacklist, [], hints)
if os.stat(filename).st_size < 10:
return ([], blacklist, [], hints)
diroffsets = []
tags = []
offset = 0
rzipfile = open(filename, 'rb')
rzipfile.seek(0)
rzipdata = rzipfile.read(10)
rzipfile.close()
rzipsize = struct.unpack('>L', rzipdata[6:10])[0]
blacklistoffset = extractor.inblacklist(offset, blacklist)
if blacklistoffset != None:
return (diroffsets, blacklist, tags, hints)
tmpdir = dirsetup(tempdir, filename, "rzip", 1)
res = unpackRZIP(filename, offset, rzipsize, tmpdir)
if res != None:
rzipdir = res
diroffsets.append((rzipdir, offset, 0))
#blacklist.append((offset, offset + unpackrzipsize))
#if offset == 0:
# tags.append("compressed")
# tags.append("rzip")
else:
## cleanup
os.rmdir(tmpdir)
return (diroffsets, blacklist, tags, hints)
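## A minimal sketch of the header read performed above, assuming the layout the
## scan relies on: a 4-byte "RZIP" magic, two version bytes, and the expected
## uncompressed size as a big-endian 32-bit value at offset 6. The helper name
## is made up.
def rzip_expected_size(filename):
    rzipfile = open(filename, 'rb')
    rzipdata = rzipfile.read(10)
    rzipfile.close()
    if len(rzipdata) < 10 or rzipdata[:4] != 'RZIP':
        return None
    return struct.unpack('>L', rzipdata[6:10])[0]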
def searchUnpackAndroidSparse(filename, tempdir=None, blacklist=[], offsets={}, scanenv={}, debug=False):
hints = {}
if not 'android-sparse' in offsets:
return ([], blacklist, [], hints)
if offsets['android-sparse'] == []:
return ([], blacklist, [], hints)
diroffsets = []
counter = 1
tags = []
for offset in offsets['android-sparse']:
blacklistoffset = extractor.inblacklist(offset, blacklist)
if blacklistoffset != None:
continue
## first see if the major version is correct
sparsefile = open(filename, 'rb')
sparsefile.seek(offset+4)
sparsedata = sparsefile.read(2)
sparsefile.close()
if len(sparsedata) != 2:
break
majorversion = struct.unpack('<H', sparsedata)[0]
if not majorversion == 1:
continue
tmpdir = dirsetup(tempdir, filename, "android-sparse", counter)
res = unpackAndroidSparse(filename, offset, tmpdir)
if res != None:
(sparsesize, sparsedir) = res
diroffsets.append((sparsedir, offset, sparsesize))
blacklist.append((offset, offset + sparsesize))
counter = counter + 1
else:
## cleanup
os.rmdir(tmpdir)
return (diroffsets, blacklist, tags, hints)
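## A minimal sketch of the full header behind the version check above, assuming
## the usual Android sparse image layout (all fields little-endian): a 28-byte
## header with magic 0xed26ff3a, major/minor version, header sizes, block size,
## block count, chunk count and checksum. The helper name is made up.
def parse_sparse_header(filename, offset=0):
    sparsefile = open(filename, 'rb')
    sparsefile.seek(offset)
    header = sparsefile.read(28)
    sparsefile.close()
    if len(header) != 28:
        return None
    (magic, major, minor, file_hdr_sz, chunk_hdr_sz,
     blk_sz, total_blks, total_chunks, image_checksum) = struct.unpack('<LHHHHLLLL', header)
    if magic != 0xed26ff3a or major != 1:
        return None
    return (blk_sz, total_blks, total_chunks)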
def searchUnpackIHex(filename, tempdir=None, blacklist=[], offsets={}, scanenv={}, debug=False):
hints = {}
tags = []
diroffsets = []
counter = 1
filesize = os.stat(filename).st_size
tmpdir = dirsetup(tempdir, filename, "ihex", counter)
tmpfile = tempfile.mkstemp(dir=tmpdir)
datafile = open(filename, 'r')
foundend = False
offset = 0
for d in datafile:
if foundend:
os.fdopen(tmpfile[0]).close()
datafile.close()
os.rmdir(tmpdir)
return (diroffsets, blacklist, tags, hints)
		b = d.strip()
		if not b.startswith(':'):
			## some Intel HEX files contain comment lines starting with '#', skip those
			if b.startswith('#'):
				continue
			break
		## a record needs at least a byte count, address, record type and checksum
		if len(b) < 11:
			break
		bytecount = ord(b[1:3].decode('hex'))
		address = struct.unpack('>H', b[3:7].decode('hex'))[0]
		recordtype = ord(b[7:9].decode('hex'))
if recordtype == 1:
foundend = True
break
if recordtype != 0:
continue
databytes = b[9:9+bytecount*2].decode('hex')
os.write(tmpfile[0], databytes)
	os.fdopen(tmpfile[0]).close()
	datafile.close()
	## only report a result if an end-of-file record was actually seen
	if not foundend:
		os.unlink(tmpfile[1])
		os.rmdir(tmpdir)
		return (diroffsets, blacklist, tags, hints)
	diroffsets.append((tmpdir, offset, filesize))
	blacklist.append((offset, offset + filesize))
	return (diroffsets, blacklist, tags, hints)
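## A worked example (not from the original code) of the record format parsed
## above: in the Intel HEX record ":10010000214601360121470136007EFE09D2190140"
## the fields are byte count 0x10, load address 0x0100, record type 00 (data),
## sixteen data bytes and a trailing checksum byte. The helper name is made up.
def parse_ihex_record(record):
    record = record.strip()
    if not record.startswith(':') or len(record) < 11:
        return None
    bytecount = ord(record[1:3].decode('hex'))
    address = struct.unpack('>H', record[3:7].decode('hex'))[0]
    recordtype = ord(record[7:9].decode('hex'))
    databytes = record[9:9 + bytecount * 2].decode('hex')
    return (bytecount, address, recordtype, databytes)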
# extract script blocks embedded in a sample and decode JScript.Encode/VBScript.Encode content where needed
def run(self):
ret = []
source = open(self.filepath, "rb").read()
# Get rid of superfluous comments.
source = re.sub("/\\*.*?\\*/", "", source, flags=re.S)
for script in re.findall(self.script_re, source, re.I | re.S):
try:
x = bs4.BeautifulSoup(script, "html.parser")
language = x.script.attrs.get("language", "").lower()
except:
language = None
# We can't rely on bs4 or any other HTML/XML parser to provide us
# with the raw content of the xml tag as they decode html entities
# and all that, leaving us with a corrupted string.
source = re.match("<.*>(.*)</.*>$", script, re.S).group(0)
# Decode JScript.Encode encoding.
if language in ("jscript.encode", "vbscript.encode"):
source = self.decode(source)
ret.append(to_unicode(source))
return ret
def _get_keys(self):
"""Get any embedded plaintext public and/or private keys."""
buf = open(self.file_path).read()
ret = set()
ret.update(re.findall(self.PUBKEY_RE, buf))
ret.update(re.findall(self.PRIVKEY_RE, buf))
return list(ret)
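# A minimal sketch (hypothetical patterns, not the actual PUBKEY_RE/PRIVKEY_RE
# class attributes used above) of regular expressions matching PEM-style key
# blocks as returned by _get_keys().
EXAMPLE_PUBKEY_RE = "(-----BEGIN PUBLIC KEY-----[\\s\\S]+?-----END PUBLIC KEY-----)"
EXAMPLE_PRIVKEY_RE = "(-----BEGIN (?:RSA )?PRIVATE KEY-----[\\s\\S]+?-----END (?:RSA )?PRIVATE KEY-----)"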
def get_filetype(data):
"""There are two versions of python-magic floating around, and annoyingly, the interface
changed between versions, so we try one method and if it fails, then we try the other"""
if sys.modules.has_key('magic'):
try:
ms = magic.open(magic.MAGIC_NONE)
ms.load()
return ms.buffer(data)
except:
return magic.from_buffer(data)
def io_dd(indir, offset, size, outdir):
"""
Given a path to a target file, extract size bytes from specified offset
to given output file.
"""
if not size:
return
with open(indir, "rb") as ifp:
with open(outdir, "wb") as ofp:
ifp.seek(offset, 0)
ofp.write(ifp.read(size))
def io_md5(target):
"""
Performs MD5 with a block size of 64kb.
"""
blocksize = 65536
hasher = hashlib.md5()
with open(target, 'rb') as ifp:
buf = ifp.read(blocksize)
while buf:
hasher.update(buf)
buf = ifp.read(blocksize)
return hasher.hexdigest()
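# A minimal usage sketch (not part of the original code) combining io_dd() and
# io_md5(): carve `size` bytes at `offset` out of a source file into a scratch
# file, then hash the carved region. The names and scratch path are made up;
# assumes size is non-zero.
def md5_of_region(source, offset, size, scratch="/tmp/carved.bin"):
    io_dd(source, offset, size, scratch)
    return io_md5(scratch)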
def http_download(url, filename):
'''
Download the file in `url` storing it in the path given by `filename`.
'''
with open(filename, 'w') as f:
http_download_to_file(url, f)
def srm_download(url, filename):
'''
Download the file in `url` storing it in the path given by `filename`.
'''
with open(filename, 'w') as f:
srm_download_to_file(url, f)
def _get_hc_dirs():
try:
read_conf = open(str(os.path.expanduser('~'))+'/automal_conf.conf', 'r').readlines()
except IOError:
sys.exit("-- : -- Unable to access conf. file. Run and re-enter new conf. file paths.")
for line in read_conf:
if line.startswith('ipath'):
input_path = str(line.split('=')[1]).strip()
if line.startswith('opath'):
output_path = str(line.split('=')[1]).strip()
return(input_path, output_path)
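# A minimal sketch of what ~/automal_conf.conf is assumed to look like, based on
# the parsing above: one "ipath=" line and one "opath=" line. The paths are
# made-up examples.
#
# ipath=/home/analyst/automal/input
# opath=/home/analyst/automal/output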
def _get_file_type(full_targ_path):
# This function takes the full path of a target sample and determines/returns the file type via python-magic.
try:
magicObj = magic.open(magic.MAGIC_NONE)
magicObj.load()
magic_out = str(magicObj.file(full_targ_path))
except AttributeError:
magic_out = str(magic.from_file(full_targ_path))
return(magic_out)
def _swf_analysis(full_targ_path):
    # This function calls swftools utilities such as swfdump and swfextract against SWF samples to extract data and/or perform analysis as needed.
command_out = subprocess.Popen(["swfdump", "-a", full_targ_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()[0]
command2_out = subprocess.Popen(["swfextract", full_targ_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()[0]
command2_list = command2_out.split('\n')
command_out_list = command_out.split('\n')
swf_ioc_res = ""
for out in command_out_list:
strOut = str(out)
ioc_list = ["http", "www", ".com", ".net", ".info", "GetVariable", "GetURL", 'String:"_post"', 'String:"send"', "\\\\", "pushstring", "url.split", ".php", "urlmon", ".exe"]
for indi in ioc_list:
if indi in strOut:
swf_ioc_res = "Present"
if len(swf_ioc_res) == 0:
swf_ioc_res = "None"
extract_list_fns = []
for out in command2_list:
if "JPEG" in out:
j_id = out.rfind(' ')+1
j_id = int(out[j_id:len(out)])
            # Picture extraction does not always work correctly, so the output is suppressed; if it succeeds, great, and if not, the failure is ignored for now.
os_null = open(os.devnull, 'wb')
subprocess.Popen(['swfextract', full_targ_path, '-j', str(j_id), '-o', '/tmp/automal/'+str(j_id)+'.jpg'], stdout=os_null, stderr=os_null)
subprocess.Popen(['swfextract', full_targ_path, '-p', str(j_id), '-o', '/tmp/automal/'+str(j_id)+'.png'], stdout=os_null, stderr=os_null)
extract_list_fns.append('/tmp/automal/'+str(j_id))
return(command_out, extract_list_fns, command2_out, swf_ioc_res)
def _c_sample_out_dir(targ, automal_dir):
    # When we analyze samples, an output directory named after the MD5 hash of the sample is created and/or used for the sample's specific exports, output info, etc.
out_md5 = str(hashlib.md5(targ).hexdigest())
out_full_path = automal_dir+'/'+out_md5
if not os.path.exists(out_full_path):
os.makedirs(out_full_path)
out_file_Obj = open(out_full_path+'/Output.txt', 'a')
return (out_file_Obj, out_full_path, out_md5)