def open(self):
self.__inPos = 0
self.__inPosOld = 0
self.__outFile = None
self.__current = DirHasher.FileIndex.Stat()
try:
if os.path.exists(self.__cachePath):
self.__inFile = open(self.__cachePath, "rb")
sig = self.__inFile.read(4)
if sig == DirHasher.FileIndex.SIGNATURE:
self.__mismatch = False
self.__inPos = self.__inPosOld = 4
self.__readEntry() # prefetch first entry
else:
logging.getLogger(__name__).info(
"Wrong signature at '%s': %s", self.__cachePath, sig)
self.__inFile.close()
self.__inFile = None
self.__mismatch = True
else:
self.__inFile = None
self.__mismatch = True
except OSError as e:
raise BuildError("Error opening hash cache: " + str(e))
python类open()的实例源码
def parse_icon(self, icon_path=None):
"""
parse icon.
:param icon_path: icon storage path
"""
if not icon_path:
icon_path = os.path.dirname(os.path.abspath(__file__))
pkg_name_path = os.path.join(icon_path, self.package)
if not os.path.exists(pkg_name_path):
os.mkdir(pkg_name_path)
aapt_line = "aapt dump badging %s | grep 'application-icon' | awk -F ':' '{print $2}'" % self.get_filename()
parse_icon_rt = os.popen(aapt_line).read()
icon_paths = [icon.replace("'", '') for icon in parse_icon_rt.split('\n') if icon]
zfile = zipfile.ZipFile(StringIO.StringIO(self.__raw), mode='r')
for icon in icon_paths:
icon_name = icon.replace('/', '_')
data = zfile.read(icon)
with open(os.path.join(pkg_name_path, icon_name), 'w+b') as icon_file:
icon_file.write(data)
print "APK ICON in: %s" % pkg_name_path
def readrewritelist(rewritelist):
## rewrite is a hash. Key is sha256 of the file.
rewrite = {}
try:
rewritefile = open(rewritelist, 'r')
rewritelines = rewritefile.readlines()
rewritefile.close()
for r in rewritelines:
rs = r.strip().split()
## format error, bail out
if len(rs) != 7:
return {}
(package, version, filename, origin, sha256, newp, newv) = rs
## dupe, skip
if sha256 in rewrite:
continue
rewrite[sha256] = {'package': package, 'version': version, 'filename': filename, 'origin': origin, 'newpackage': newp, 'newversion': newv}
except:
return {}
return rewrite
## split on the special characters, plus remove special control characters that are
## at the beginning and end of the string in escaped form.
## Return a list of strings.
def get_filetype(data):
"""There are two versions of python-magic floating around, and annoyingly, the interface
changed between versions, so we try one method and if it fails, then we try the other.
NOTE: you may need to alter the magic_file for your system to point to the magic file."""
if sys.modules.has_key('magic'):
try:
ms = magic.open(magic.MAGIC_NONE)
ms.load()
return ms.buffer(data)
except:
try:
return magic.from_buffer(data)
except magic.MagicException:
magic_custom = magic.Magic(magic_file='C:\windows\system32\magic')
return magic_custom.from_buffer(data)
return ''
def parse_icon(self, icon_path=None):
"""
parse icon.
:param icon_path: icon storage path
"""
if not icon_path:
icon_path = os.path.dirname(os.path.abspath(__file__))
pkg_name_path = os.path.join(icon_path, self.package)
if not os.path.exists(pkg_name_path):
os.mkdir(pkg_name_path)
aapt_line = "aapt dump badging %s | grep 'application-icon' | awk -F ':' '{print $2}'" % self.get_filename()
parse_icon_rt = os.popen(aapt_line).read()
icon_paths = [icon.replace("'", '') for icon in parse_icon_rt.split('\n') if icon]
zfile = zipfile.ZipFile(StringIO.StringIO(self.__raw), mode='r')
for icon in icon_paths:
icon_name = icon.replace('/', '_')
data = zfile.read(icon)
with open(os.path.join(pkg_name_path, icon_name), 'w+b') as icon_file:
icon_file.write(data)
print "APK ICON in: %s" % pkg_name_path
def _get_filetype(self, data):
"""Gets filetype, uses libmagic if available.
@param data: data to be analyzed.
@return: file type or None.
"""
if not HAVE_MAGIC:
return None
try:
ms = magic.open(magic.MAGIC_NONE)
ms.load()
file_type = ms.buffer(data)
except:
try:
file_type = magic.from_buffer(data)
except Exception:
return None
finally:
try:
ms.close()
except:
pass
return file_type
def magic(indata, mime=False):
"""
Performs file magic while maintaining compatibility with different
libraries.
"""
try:
if mime:
mymagic = magic.open(magic.MAGIC_MIME_TYPE)
else:
mymagic = magic.open(magic.MAGIC_NONE)
mymagic.load()
except AttributeError:
mymagic = magic.Magic(mime)
mymagic.file = mymagic.from_file
return mymagic.file(indata)
def smart_open(filename):
'''
Returns an open file object if `filename` is plain text, else assumes
it is a bzip2 compressed file and returns a file-like object to
handle it.
'''
if isplaintext(filename):
f = open(filename, 'rt')
else:
file_type = mimetype(filename)
if file_type.find('gzip') > -1:
f = gzip.GzipFile(filename, 'rt')
elif file_type.find('bzip2') > -1:
f = bz2file.open(filename, 'rt')
else:
pass # Not supported format
return f
def srm_download_to_file(url, file_):
'''
Download the file in `url` storing it in the `file_` file-like
object.
'''
logger = logging.getLogger('dumper.__init__')
ctx = gfal2.creat_context() # pylint: disable=no-member
infile = ctx.open(url, 'r')
try:
chunk = infile.read(CHUNK_SIZE)
except GError as e:
if e[1] == 70:
logger.debug('GError(70) raised, using GRIDFTP PLUGIN:STAT_ON_OPEN=False workarround to download %s', url)
ctx.set_opt_boolean('GRIDFTP PLUGIN', 'STAT_ON_OPEN', False)
infile = ctx.open(url, 'r')
chunk = infile.read(CHUNK_SIZE)
else:
raise
while chunk:
file_.write(chunk)
chunk = infile.read(CHUNK_SIZE)
def _hardcode_setup():
# A user may want to hardcode in and out paths for their analysis files. If no config file exists, it will ask if you want to store your in/out options in one and use them from then on out.
sane = 0
home_dir = str(os.path.expanduser('~'))
input_conf = raw_input('-- : -- Please enter full path to input sample directory: ')
output_conf = raw_input('-- : -- Please enter full path to sample/analysis output directory: ')
if not os.path.exists(input_conf) or not os.path.exists(output_conf):
sys.exit("-- : -- Invalid paths detected. Please check input. Run again to re-enter paths.")
print '-- : -- Is this input directory correct "'+str(input_conf)+'"?'
print '-- : -- Is this output directory correct "'+str(output_conf)+'"?'
while sane == 0:
dir_choice = raw_input('-- : -- Yes or No: ')
if dir_choice == 'Yes':
conf_file = open(home_dir+'/automal_conf.conf', 'w')
conf_file.write('ipath='+str(input_conf)+'\n')
conf_file.write('opath='+str(output_conf)+'\n')
conf_file.close()
sane = 1
if dir_choice == 'No':
sys.exit('-- : -- Exited on incorrect paths. Run again to re-enter.')
if dir_choice != 'No' and dir_choice != 'Yes':
print '-- : -- Invalid choice, try again. Yes or No.'
def _get_guest_digital_signers(self):
retdata = dict()
cert_data = dict()
cert_info = os.path.join(CUCKOO_ROOT, "storage", "analyses",
str(self.results["info"]["id"]), "aux",
"DigiSig.json")
if os.path.exists(cert_info):
with open(cert_info, "r") as cert_file:
buf = cert_file.read()
if buf:
cert_data = json.loads(buf)
if cert_data:
retdata = {
"aux_sha1": cert_data["sha1"],
"aux_timestamp": cert_data["timestamp"],
"aux_valid": cert_data["valid"],
"aux_error": cert_data["error"],
"aux_error_desc": cert_data["error_desc"],
"aux_signers": cert_data["signers"]
}
return retdata
def fuzzy_hash(self):
if not hasattr(self, '_fuzzy_hash'):
# tlsh is not meaningful with files smaller than 512 bytes
if os.stat(self.path).st_size >= 512:
h = tlsh.Tlsh()
with open(self.path, 'rb') as f:
for buf in iter(lambda: f.read(32768), b''):
h.update(buf)
h.final()
try:
self._fuzzy_hash = h.hexdigest()
except ValueError:
# File must contain a certain amount of randomness.
self._fuzzy_hash = None
else:
self._fuzzy_hash = None
return self._fuzzy_hash
def has_same_content_as(self, other):
logger.debug('File.has_same_content: %s %s', self, other)
if os.path.isdir(self.path) or os.path.isdir(other.path):
return False
# try comparing small files directly first
try:
my_size = os.path.getsize(self.path)
other_size = os.path.getsize(other.path)
except OSError:
# files not readable (e.g. broken symlinks) or something else,
# just assume they are different
return False
if my_size == other_size and my_size <= SMALL_FILE_THRESHOLD:
try:
with profile('command', 'cmp (internal)'):
with open(self.path, 'rb') as file1, open(other.path, 'rb') as file2:
return file1.read() == file2.read()
except OSError:
# one or both files could not be opened for some reason,
# assume they are different
return False
return self.cmp_external(other)
def magic(indata, mime=False):
"""
Performs file magic while maintaining compatibility with different
libraries.
"""
try:
if mime:
mymagic = magic.open(magic.MAGIC_MIME_TYPE)
else:
mymagic = magic.open(magic.MAGIC_NONE)
mymagic.load()
except AttributeError:
mymagic = magic.Magic(mime)
mymagic.file = mymagic.from_file
return mymagic.file(indata)
def get_type(self):
try:
ms = magic.open(magic.MAGIC_NONE)
ms.load()
file_type = ms.file(self.path)
except:
try:
file_type = magic.from_file(self.path)
except:
try:
import subprocess
file_process = subprocess.Popen(['file', '-b', self.path], stdout = subprocess.PIPE)
file_type = file_process.stdout.read().strip()
except:
return ''
finally:
try:
ms.close()
except:
pass
return file_type
def get_type(data):
try:
ms = magic.open(magic.MAGIC_NONE)
ms.load()
file_type = ms.buffer(data)
except:
try:
file_type = magic.from_buffer(data)
except:
return ''
finally:
try:
ms.close()
except:
pass
return file_type
def generate_html(p, templateFile, templateDict, outputFile, overwrite, logger):
if not Util.check_file_exists(templateFile):
logger.error("Template file does not exist: '%s'" %(templateFile))
os._exit(-1)
try:
template = jinja2.Environment(loader = jinja2.FileSystemLoader(searchpath=os.path.split(templateFile)[0])).get_template(os.path.split(templateFile)[1])
except Exception, e:
logger.error("Failed load template file '%s'" %(templateFile), exc_info=True)
os._exit(-1)
html = bokeh.embed.file_html(models=p, resources=bokeh.resources.INLINE, title=templateDict["title"] , template=template, template_variables=templateDict)
if Util.check_file_exists(outputFile) and not overwrite:
logger.error("Html file does exist: '%s'. Delete or use overwrite flag." %(outputFile))
os._exit(-1)
try:
file = open(outputFile,"w")
file.write(html)
file.close()
except Exception, e:
logger.error("Error while writing html file '%s'" %(outputFile), exc_info=True)
os._exit(-1)
# generates bokeh histogram_data
# gets data from every "LatencyList"
# data2 is just data/2.0
# commented out code is old and better to read but much slower due to "key not in" - if
def openCompressedFile(ycsbfile, dict, key, decompress, overwrite, logger):
try:
file = gzip.open(ycsbfile,"r")
dict[key]=cPickle.load(file)
file.close()
except Exception, e:
logger.error("Can't open '%s'. Is it really a compressed .ydc file?" %(ycsbfile), exc_info=True)
os._exit(-1)
# if you truly just want to decompress it, stop after saving plain ycsb file
if decompress:
try:
newFileName=os.path.splitext(os.path.basename(ycsbfile))[0]+".log"
if (not Util.check_file_exists(newFileName) or overwrite) and os.access(".", os.W_OK):
if key in dict.keys() and dict[key] != None:
decompressFile(dict[key], newFileName, logger)
else:
logger.error("Dictionary does not have filecontent or is null." , exc_info=True)
os._exit(-1)
else:
logger.error("Can't create '%s' to write. Does it already exist?" %(newFileName), exc_info=True)
os._exit(-1)
except Exception, e:
logger.error("Can't open '%s'." %("%s.log.log" %(os.path.basename(ycsbfile))), exc_info=True)
os._exit(-1)
def file_type(file_loc):
try:
import magic
except:
print('[W] ACBS cannot find libmagic bindings, will use bundled one instead.')
import lib.magic as magic
mco = magic.open(magic.MIME_TYPE | magic.MAGIC_SYMLINK)
mco.load()
try:
tp = mco.file(file_loc)
tp_list = tp.decode('utf-8').split('/')
except:
print('[W] Unable to determine the file type!')
return ['unknown', 'unknown']
return tp_list
def file_type_full(file_loc):
try:
import magic
except:
print('[W] ACBS cannot find libmagic bindings, will use bundled one instead.')
import lib.magic as magic
mco = magic.open(magic.NONE | magic.MAGIC_SYMLINK)
mco.load()
try:
tp = mco.file(file_loc)
except:
print('[W] Unable to determine the file type!')
return 'data'
return tp.decode('utf-8')
def hashFile(path):
m = hashlib.sha1()
try:
with open(path, 'rb', buffering=0) as f:
buf = f.read(16384)
while len(buf) > 0:
m.update(buf)
buf = f.read(16384)
except OSError as e:
logging.getLogger(__name__).warning("Cannot hash file: %s", str(e))
return m.digest()
def open(self):
pass
def hashDirectory(self, path):
self.__index.open()
try:
return self.__hashDir(os.fsencode(path))
finally:
self.__index.close()
def hashPath(self, path):
path = os.fsencode(path)
try:
s = os.lstat(path)
except OSError as err:
logging.getLogger(__name__).warning("Cannot stat '%s': %s", path, str(err))
return b''
self.__index.open()
try:
return self.__hashEntry(path, b'', s)
finally:
self.__index.close()
def _process_cache(self, split="\n", rstrip=True):
try:
ftype = magic.from_file(self.cache, mime=True)
except AttributeError:
try:
mag = magic.open(magic.MAGIC_MIME)
mag.load()
ftype = mag.file(self.cache)
except AttributeError as e:
raise RuntimeError('unable to detect cached file type')
if PYVERSION < 3:
ftype = ftype.decode('utf-8')
if ftype.startswith('application/x-gzip') or ftype.startswith('application/gzip'):
from csirtg_smrt.decoders.zgzip import get_lines
for l in get_lines(self.cache, split=split):
yield l
return
if ftype == "application/zip":
from csirtg_smrt.decoders.zzip import get_lines
for l in get_lines(self.cache, split=split):
yield l
return
# all others, mostly txt, etc...
with open(self.cache) as f:
for l in f:
yield l
def _cache_write(self, s):
with open(self.cache, 'wb') as f:
auth = False
if self.username:
auth = (self.username, self.password)
resp = self._cache_refresh(s, auth)
if not resp:
return
for block in resp.iter_content(1024):
f.write(block)
def get_mimetype(f):
try:
ftype = magic.from_file(f, mime=True)
except AttributeError:
try:
mag = magic.open(magic.MAGIC_MIME)
mag.load()
ftype = mag.file(f)
except AttributeError as e:
raise RuntimeError('unable to detect cached file type')
if PYVERSION < 3:
ftype = ftype.decode('utf-8')
return ftype
def get_type(f, mime=None):
if not mime:
mime = get_mimetype(f)
if isinstance(f, str):
f = open(f)
t = None
for tt in TESTS:
f.seek(0)
t = tt(f, mime)
if t:
return t
def detect_magic():
global g_m
if hasattr(magic, 'open'):
g_m = magic.open(magic.MAGIC_SYMLINK)
g_m.load()
def gethash(path, filename):
scanfile = open("%s/%s" % (path, filename), 'r')
h = hashlib.new('sha256')
scanfile.seek(0)
hashdata = scanfile.read(10000000)
while hashdata != '':
h.update(hashdata)
hashdata = scanfile.read(10000000)
scanfile.close()
return h.hexdigest()
## method to compare binaries. Returns the amount of bytes that differ
## according to bsdiff, or 0 if the files are identical