def _load_index(self):
    """Read the on-disk index and return the decoded JSON object.

    The path comes from ``self.index_path()``. A missing file yields an empty
    dict. Otherwise the MIME type reported by libmagic decides whether the
    file is opened as plain text or through ``gzip``.

    Raises:
        ValueError: if the MIME type is neither plain text nor gzip.
    """
    path = self.index_path()
    if not os.path.exists(path):
        return {}

    mime = magic.from_file(path, mime=True)
    if mime == 'text/plain':
        logger.debug('Detected plaintext encoding for reading index')
        opener = open
    elif mime in ('application/gzip', 'application/x-gzip'):
        logger.debug('Detected gzip encoding for reading index')
        opener = gzip.open
    else:
        raise ValueError('Index is of unknown type', mime)

    # Both openers accept text mode, so json.load always receives str data.
    with opener(path, 'rt') as handle:
        return json.load(handle)
# Example source code for Python's from_file()
def get_plaintext_document_body(fpath, keep_layout=False):
    """Given a file-path to a full-text, return a list of unicode strings
    whereby each string is a line of the fulltext.
    In the case of a plain-text document, this simply means reading the
    contents in from the file. In the case of a PDF however,
    this means converting the document to plaintext.
    It raises UnknownDocumentTypeError if the document is not a PDF or
    plain text.
    @param fpath: (string) - the path to the fulltext file
    @param keep_layout: (boolean) - passed through unchanged to
        convert_PDF_to_plaintext; it has no effect on plain-text files.
    @return: (list) of strings - each string being a line in the document.
    """
    mime_type = magic.from_file(fpath, mime=True)
    if mime_type == "text/plain":
        # Read raw bytes and decode explicitly. Opening in text mode and then
        # calling .decode() only works on Python 2; on Python 3 the lines are
        # already str and have no .decode() method.
        with open(fpath, "rb") as f:
            return [line.decode("utf-8") for line in f]
    if mime_type == "application/pdf":
        return convert_PDF_to_plaintext(fpath, keep_layout)
    raise UnknownDocumentTypeError(mime_type)
def __init__(self, filename):
    """
    Build a file object around a malware sample stored on disk.

    :param filename: The file name of the available malware sample.
    :raises ValueError: when ``filename`` does not exist.
    """
    sample_missing = not os.path.exists(filename)
    if sample_missing:
        raise ValueError("File {0} does not exist!".format(filename))

    # Neutral defaults for every member before anything is loaded.
    self.running_entropy_data = None
    self.running_entropy_window_size = 0
    self.file_size = 0
    self.parsedfile = None

    # Sample identity, an empty data buffer and libmagic's description.
    self.filename = filename
    self.data = []
    self.filetype = magic.from_file(self.filename)

    # The instance's own helpers do the actual reading and parsing.
    self._read_file()
    self._parse_file_type()
def get_type(self):
    """Return a textual description of the file type of ``self.path``.

    Three strategies are tried in order, each one only if the previous
    failed:

    1. the ``magic.open`` / ``load`` / ``file`` API,
    2. ``magic.from_file``,
    3. the external ``file -b`` command.

    :return: the description as a string, or ``''`` if every strategy failed.
    """
    handle = None
    try:
        handle = magic.open(magic.MAGIC_NONE)
        handle.load()
        file_type = handle.file(self.path)
    except Exception:
        # ``except Exception`` rather than a bare ``except`` so that
        # KeyboardInterrupt / SystemExit are not swallowed by the fallbacks.
        try:
            file_type = magic.from_file(self.path)
        except Exception:
            try:
                import subprocess
                file_process = subprocess.Popen(['file', '-b', self.path],
                                                stdout=subprocess.PIPE)
                # communicate() drains the pipe and waits for the child, so
                # neither a zombie process nor an open pipe is left behind.
                output, _ = file_process.communicate()
                file_type = output.strip()
            except Exception:
                return ''
    finally:
        # Only close a handle that was actually opened; closing stays
        # best-effort as in the original code.
        if handle is not None:
            try:
                handle.close()
            except Exception:
                pass

    # The subprocess fallback yields bytes on Python 3. Normalise to text so
    # all strategies return the same type; on Python 2 bytes is str, so this
    # is a no-op there.
    if isinstance(file_type, bytes) and not isinstance(file_type, str):
        file_type = file_type.decode('utf-8', 'replace')
    return file_type
def file_parser(fname, pages=None):
    """Extract the text of ``fname`` as UTF-8 encoded bytes.

    PDFs (as detected by libmagic) are read through ``pdf.Document``; every
    other file goes through ``parser.from_file``.

    :param fname: path of the file to parse.
    :param pages: for PDFs, stop after this many pages; ``None`` reads all.
    :return: the encoded text, or ``None`` if parsing raised (the exception
        is printed, not propagated).
    """
    if magic.from_file(fname, mime=True) == 'application/pdf':
        try:
            text_array = []
            # Start at 0 so the summary below also works for a document that
            # yields no pages; previously ``i`` was unbound in that case.
            page_no = 0
            # NOTE(review): the nesting is presumably page -> flow -> block
            # -> line; confirm against the pdf library in use.
            for page_no, page in enumerate(pdf.Document(fname), start=1):
                for flow in page:
                    for block in flow:
                        for line in block:
                            text_array.append(line.text.encode('UTF-8'))
                if page_no == pages:  # break after x pages
                    break
            print("Processed %i pages" % page_no)
            # The collected lines are encoded bytes, so join with a bytes
            # separator; this is the same value on Python 2 and valid on 3.
            return b'\n'.join(text_array)
        except Exception as e:
            print("PDF Parser Exception:  %s" % (e,))
    else:
        try:
            content = parser.from_file(fname)['content']
            return (content or '').encode('UTF-8')
        except Exception as e:
            print("File Parser Exception:  %s" % (e,))
def save_file(self, msg, msg_type):
    """Store the file attached to ``msg`` and return ``(path, mime)``.

    The file is written under ``storage/<channel_id>/`` by calling
    ``msg['Text']`` with the target path. It is then renamed with an
    extension guessed from the MIME type that libmagic detects.

    :param msg: message mapping; ``msg['NewMsgId']`` and ``msg['Text']`` are used.
    :param msg_type: prefix used in the generated file name.
    :return: tuple of the final path and the detected MIME type.
    """
    directory = os.path.join("storage", self.channel_id)
    if not os.path.exists(directory):
        os.makedirs(directory)

    basename = "%s_%s_%s" % (msg_type, msg['NewMsgId'], int(time.time()))
    raw_path = os.path.join(directory, basename)
    # msg['Text'] is a callable that writes the payload to the given path.
    msg['Text'](raw_path)

    mime = magic.from_file(raw_path, mime=True)
    if isinstance(mime, bytes):
        mime = mime.decode()

    guess_ext = mimetypes.guess_extension(mime) or ".unknown"
    if guess_ext == ".unknown":
        self.logger.warning("File %s with mime %s has no matching extensions.", raw_path, mime)

    # JPEG is pinned to ".jpeg" regardless of what mimetypes suggests.
    if mime == "image/jpeg":
        ext = ".jpeg"
    else:
        ext = guess_ext

    fullpath = "%s%s" % (raw_path, ext)
    os.rename(raw_path, fullpath)
    self.logger.info("File saved from WeChat\nFull path: %s\nMIME: %s", fullpath, mime)
    return fullpath, mime
def file_magic(in_file):
    """Print libmagic's description of ``in_file`` to standard output."""
    # Single-argument print(...) emits the same text on Python 2 and 3; the
    # original print statement is a SyntaxError on Python 3.
    print("\n\t\tFile Type : %s" % (magic.from_file(in_file),))
def do_sample_type_detect(datafile):
    """
    Detect the type of ``datafile`` with libmagic.

    Returns a ``(mime_type, description)`` tuple: the first element comes
    from ``magic.from_file(..., mime=True)``, the second from the default
    call.
    """
    return (
        magic.from_file(datafile, mime=True),
        magic.from_file(datafile),
    )
def _process_cache(self, split="\n", rstrip=True):
    """Yield the contents of ``self.cache`` piece by piece.

    The MIME type of the cached file selects the reader: gzip and zip files
    go through the matching ``csirtg_smrt.decoders`` helper, everything else
    is iterated line by line as a regular file.

    :param split: forwarded to the gzip/zip ``get_lines`` helpers.
    :param rstrip: accepted but not used by this function.
    :raises RuntimeError: if neither libmagic API is available.
    """
    try:
        mime = magic.from_file(self.cache, mime=True)
    except AttributeError:
        # The installed ``magic`` module has no from_file(); try the
        # open()/load()/file() style API instead.
        try:
            detector = magic.open(magic.MAGIC_MIME)
            detector.load()
            mime = detector.file(self.cache)
        except AttributeError:
            raise RuntimeError('unable to detect cached file type')

    if PYVERSION < 3:
        mime = mime.decode('utf-8')

    if mime.startswith(('application/x-gzip', 'application/gzip')):
        from csirtg_smrt.decoders.zgzip import get_lines
        for chunk in get_lines(self.cache, split=split):
            yield chunk
    elif mime == "application/zip":
        from csirtg_smrt.decoders.zzip import get_lines
        for chunk in get_lines(self.cache, split=split):
            yield chunk
    else:
        # all others, mostly txt, etc...
        with open(self.cache) as handle:
            for chunk in handle:
                yield chunk
def get_mimetype(f):
    """Return the MIME type libmagic reports for the file at path ``f``.

    ``magic.from_file`` is tried first. If the installed module lacks it,
    the ``magic.open`` / ``load`` / ``file`` API is used instead.

    :raises RuntimeError: if neither API is available.
    """
    try:
        detected = magic.from_file(f, mime=True)
    except AttributeError:
        try:
            detector = magic.open(magic.MAGIC_MIME)
            detector.load()
            detected = detector.file(f)
        except AttributeError:
            raise RuntimeError('unable to detect cached file type')

    # On Python 2 the result is decoded to a unicode string.
    return detected.decode('utf-8') if PYVERSION < 3 else detected
def preprocess(sample):
    """Preprocess files after upload.

    If the uploaded sample is a zip archive whose MIME type is not in
    ``skip_mimes``, every member is extracted and written to the samples
    directory under its SHA-256. Each member is registered as a child
    ``Sample`` of ``sample``.

    :param sample: :class:`~app.models.Sample`
    :return: None
    """
    archive_path = os.path.join(
        current_app.config['APP_UPLOADS_SAMPLES'],
        sample.sha256
    )
    if not zipfile.is_zipfile(archive_path):
        return None

    mt = magic.from_file(archive_path, mime=True)
    if mt in skip_mimes:
        return None

    current_app.log.debug('Extracting {}'.format(archive_path))
    cfg = current_app.config
    # The context manager closes the archive even if extraction fails.
    with zipfile.ZipFile(archive_path) as zfile:
        for zipfo in zfile.namelist():
            if zfile.getinfo(zipfo).compress_type == 99:  # PK compat. v5.1
                pwd = '-p{}'.format(cfg['INFECTED_PASSWD'])
                # NOTE(review): this 7z call names no member, so it
                # presumably streams the whole archive for each entry.
                with popen('7z', 'e', '-so', pwd, archive_path) as zproc:
                    buf, _ = zproc.communicate()
            else:
                buf = zfile.read(zipfo,
                                 pwd=bytes(cfg['INFECTED_PASSWD'], 'utf-8'))
            digests = get_hashes(buf)
            # A separate name for the extracted file: reusing the archive
            # path variable made later 7z calls target the previous member
            # instead of the archive.
            member_path = os.path.join(cfg['APP_UPLOADS_SAMPLES'],
                                       digests.sha256)
            if not os.path.isfile(member_path):
                with open(member_path, 'wb') as wf:
                    wf.write(buf)
            s = Sample(user_id=sample.user_id, filename=zipfo,
                       parent_id=sample.id,
                       md5=digests.md5, sha1=digests.sha1,
                       sha256=digests.sha256, sha512=digests.sha512,
                       ctph=digests.ctph)
            db.session.add(s)
            db.session.commit()
    return None
def _check(self, file):
    """
    Select a check for `file` and return the result of calling it on the
    file's text.

    The check is looked up by extension in ``self.extension_map``. If the
    extension is unknown, the first ``self.magic_map`` entry whose key occurs
    in libmagic's description of the file is used. Raises an Error when the
    file is missing, its type is unknown, or it cannot be decoded as text.
    """
    if not os.path.exists(file):
        raise Error("file \"{}\" not found".format(file))

    suffix = os.path.splitext(file)[1]
    try:
        check = self.extension_map[suffix[1:]]
    except KeyError:
        description = magic.from_file(file)
        for needle, candidate in self.magic_map.items():
            if needle in description:
                check = candidate
                break
        else:
            raise Error("unknown file type \"{}\", skipping...".format(file))

    try:
        with open(file) as handle:
            code = handle.read()
    except UnicodeDecodeError:
        raise Error("file does not seem to contain text, skipping...")

    # Ensure we don't warn about adding trailing newline; empty files are
    # passed through untouched.
    if code and code[-1] != '\n':
        code += '\n'

    return check(code)
def handle(cls, user, club, file):
    """Accept an uploaded file, store a thumbnail and create its record.

    The upload is saved to a randomly named temp file under ``/tmp`` and its
    MIME type is detected from content. Unsupported types raise
    ``UploadNotSupported``. The temp file is always removed afterwards.

    :param user: stored as the new object's ``uploader``.
    :param club: stored as the new object's ``club``.
    :param file: upload object providing ``save(path)``.
    :return: the result of ``create()`` on the new object.
    """
    filename = os.urandom(8).encode('hex')
    scratch = os.path.join('/tmp', filename)
    file.save(scratch)
    try:
        # Don't use mimetypes.guess_type(scratch) -- Faked extensions
        mime = magic.from_file(scratch, mime=True)
        if mime not in cls._mimedict:
            raise UploadNotSupported
        filename = filename + cls._mimedict[mime]
        destination = cls.mk_internal_path(filename)
        parent = os.path.dirname(destination)
        if not os.path.isdir(parent):
            os.makedirs(parent, 0o755)
        # resize to 600, 450
        cls._thumb(scratch, destination)
        fs.watch(destination)
    finally:
        os.remove(scratch)

    record = cls.new()
    record.club = club
    record.uploader = user
    record._location = filename
    record.mime = mime
    return record.create()
def check(filepath):
    """Return True if libmagic's MIME type for ``filepath`` starts with
    ``application/pdf``, False otherwise."""
    mime = magic.from_file(filepath, mime=True)
    return re.match('application/pdf', mime) is not None
def get_magic(filename):
    """Describe ``filename`` using the module-level ``g_m`` handle when it is
    set, falling back to ``magic.from_file`` otherwise."""
    describe = g_m.file if g_m else magic.from_file
    return describe(filename)
def guess_mime_type_from_file_contents(file_path):
    """ Get type from file magic bytes; None when libmagic's answer is empty. """
    detected = magic.from_file(file_path, mime=True)
    return detected if detected else None
def _compute_default_properties(self):
    """Fill in the default properties derived from ``self['filepath']``.

    Sets ``names``, ``detailed_type``, ``mime``, an empty ``analysis`` list
    and an ``antivirus`` mapping with every antivirus module name set to
    False, then calls ``self._set_type()``.
    """
    filepath = self['filepath']
    self['names'] = [os.path.basename(filepath)]
    self['detailed_type'] = magic.from_file(self['filepath'])
    self['mime'] = magic.from_file(self['filepath'], mime=True)
    self['analysis'] = []

    # Init antivirus status: every known module starts out as False.
    self['antivirus'] = {}
    for av_module in dispatcher.get_antivirus_modules():
        self['antivirus'][av_module.name] = False

    self._set_type()
# Convert mime/types into clearer type
def create_by_old_paste(cls, filehash):
    """Build an instance for an already stored file identified by ``filehash``.

    The file is located via ``get_file_path``; its MIME type is detected from
    content and its size read from ``os.stat``. ``filehash`` is passed to the
    constructor both positionally and as the ``filehash`` keyword.
    """
    stored_path = get_file_path(filehash)
    detected_mime = magic.from_file(stored_path, mime=True)
    byte_count = os.stat(stored_path).st_size
    return cls(filehash, detected_mime, byte_count, filehash=filehash)
def create_by_old_paste(cls, filehash, symlink):
    """Build an instance for an already stored file identified by ``filehash``.

    The file is located via ``get_file_path``; its MIME type is detected from
    content and its size read from ``os.stat``. ``filehash`` is passed to the
    constructor both positionally and as the ``filehash`` keyword, and
    ``symlink`` is forwarded unchanged.
    """
    stored_path = get_file_path(filehash)
    detected_mime = magic.from_file(stored_path, mime=True)
    byte_count = os.stat(stored_path).st_size
    return cls(filehash, detected_mime, byte_count,
               filehash=filehash, symlink=symlink)
def create_by_old_paste(cls, filehash):
    """Build an instance for an already stored file identified by ``filehash``.

    The file is located via ``get_file_path``; its MIME type is detected from
    content and its size read from ``os.stat``. ``filehash`` is passed to the
    constructor both positionally and as the ``filehash`` keyword.
    """
    stored_path = get_file_path(filehash)
    detected_mime = magic.from_file(stored_path, mime=True)
    byte_count = os.stat(stored_path).st_size
    return cls(filehash, detected_mime, byte_count, filehash=filehash)
def create_by_old_paste(cls, filehash):
    """Build an instance for an already stored file identified by ``filehash``.

    The file is located via ``get_file_path``; its MIME type is detected from
    content and its size read from ``os.stat``. ``filehash`` is passed to the
    constructor both positionally and as the ``filehash`` keyword.
    """
    stored_path = get_file_path(filehash)
    detected_mime = magic.from_file(stored_path, mime=True)
    byte_count = os.stat(stored_path).st_size
    return cls(filehash, detected_mime, byte_count, filehash=filehash)
def create_by_old_paste(cls, filehash):
    """Build an instance for an already stored file identified by ``filehash``.

    The file is located via ``get_file_path``; its MIME type is detected from
    content and its size read from ``os.stat``. ``filehash`` is passed to the
    constructor both positionally and as the ``filehash`` keyword.
    """
    stored_path = get_file_path(filehash)
    detected_mime = magic.from_file(stored_path, mime=True)
    byte_count = os.stat(stored_path).st_size
    return cls(filehash, detected_mime, byte_count, filehash=filehash)
def load_pickle(pickle_path, dataset_path):
    """Return the cached list of dataset image paths, building it if needed.

    If ``pickle_path`` exists it is unpickled and returned as is. Otherwise
    ``dataset_path`` is walked, every ``*.JPEG`` file in each directory is
    collected, and the result is pickled to ``pickle_path`` and returned.

    :param pickle_path: path of the cache file to read or create.
    :param dataset_path: root directory searched for ``*.JPEG`` files.
    :return: dict with the single key ``'image_path'``. Its value is a numpy
        array of paths, or an empty list if the walk visited no directories.
    """
    if os.path.exists(pickle_path):
        # NOTE: pickle.load can execute arbitrary code embedded in the file;
        # only point this at cache files this program wrote itself.
        with open(pickle_path, "rb") as cache_file:
            return pickle.load(cache_file)

    # One list of matches per visited directory.
    # The glob pattern may need adjusting to your image files' extension.
    image_files = [
        glob.glob(os.path.join(dirpath, '*.JPEG'))
        for dirpath, _, _ in os.walk(dataset_path)
    ]
    if len(image_files) > 0:
        # Flatten the per-directory lists into one array of paths.
        image_files = np.hstack(image_files)

    dataset_filenames = {'image_path': image_files}
    # ``with`` guarantees the cache is flushed and closed before returning.
    with open(pickle_path, "wb") as cache_file:
        pickle.dump(dataset_filenames, cache_file)
    return dataset_filenames
# return a pd object
def get_executables(files):
    """
    Return, in their original order, the entries of ``files`` whose libmagic
    description contains the word "executable".
    """
    return [path for path in files if "executable" in magic.from_file(path)]
def _get_and_cache(file_path, supported_formats):
    """Look up the format for ``file_path`` by detected MIME type and cache it.

    The format found in ``supported_formats`` is stored in
    ``MagicCharacterizerMixin._cache`` under ``file_path`` and returned.

    :raises UnsupportedFormat: if the MIME type has no entry in
        ``supported_formats`` (raised with ``http_status_code=500``).
    """
    detected = from_file(file_path, mime=True)
    try:
        matched = supported_formats[detected]
        MagicCharacterizerMixin._cache[file_path] = matched
    except KeyError:
        template = '{0} characterized as {1} format, which is not supported'
        raise UnsupportedFormat(template.format(file_path, detected),
                                http_status_code=500)
    return matched
def file_info(self, report):
    """Return descriptive lines about ``self.filename``.

    The file is always read first. When ``report`` equals ``"output"`` an
    empty string is returned. Otherwise the result is a list of strings with
    the file name, size, MIME type, MD5 and SHA1, plus the ssdeep hash when
    ``ssdeep_r`` is truthy.
    """
    with open(self.filename, 'rb') as handle:
        contents = handle.read()

    if report == "output":
        return ""

    info = [
        "File: {}".format(self.filename),
        "Size: {} bytes".format(os.path.getsize(self.filename)),
        "Type: {}".format(magic.from_file(self.filename, mime=True)),
        "MD5: {}".format(hashlib.md5(contents).hexdigest()),
        "SHA1: {}".format(hashlib.sha1(contents).hexdigest()),
    ]
    if ssdeep_r:
        info.append("ssdeep: {}".format(self.get_ssdeep()))
    return info
def file_info(filename):
    """Return a list of descriptive lines about ``filename``.

    The lines give the file name, size in bytes, MIME type, MD5 and SHA1,
    plus the ssdeep hash when ``ssdeep_r`` is truthy.
    """
    with open(filename, 'rb') as handle:
        contents = handle.read()

    info = [
        "File: {}".format(filename),
        "Size: {} bytes".format(os.path.getsize(filename)),
        "Type: {}".format(magic.from_file(filename, mime=True)),
        "MD5: {}".format(hashlib.md5(contents).hexdigest()),
        "SHA1: {}".format(hashlib.sha1(contents).hexdigest()),
    ]
    if ssdeep_r:
        info.append("ssdeep: {}".format(ssdeep.hash_from_file(filename)))
    return info
def post_file():
    """Handle a multipart file upload.

    The upload is stored in a uniquely named temp file under ``/tmp``, then
    hashed and recorded via ``create_or_renew_by_hash``. It is uploaded to
    swift and announced to the worker queue, and the temp file is removed.

    :return: JSON response built from the file record's ``to_dict()``.
    :raises BadRequestException: if the form has no ``file`` part or the
        ``Content-Range`` header cannot be parsed.
    """
    file_uuid = secure_filename(str(uuid.uuid4()))
    filename = '/tmp/%s' % file_uuid
    try:
        upload = request.files['file']
    except Exception:
        raise BadRequestException("Not a valid multipart upload form with "
                                  "key named file.")
    if 'Content-Range' in request.headers:
        # Extract starting byte from Content-Range header string.
        range_str = request.headers['Content-Range']
        try:
            start_bytes = int(range_str.split(' ')[1].split('-')[0])
        except (IndexError, ValueError):
            # The header is client-controlled; reject garbage explicitly.
            raise BadRequestException("Malformed Content-Range header.")
        # Append chunk to the file on disk, or create new. Binary mode is
        # required because the upload stream yields bytes.
        # NOTE(review): in append mode writes go to the end of the file
        # regardless of seek(), so start_bytes has no effect on where the
        # data lands.
        with open(filename, 'ab') as f:
            f.seek(start_bytes)
            f.write(upload.stream.read())
    else:
        # This is not a chunked request, so just save the whole file.
        upload.save(filename)

    try:
        # Generate hash of file, and create new, or renew existing db row.
        file_hashes = get_all_hashes(filename)
        file_size = os.path.getsize(filename)
        file_type = magic.from_file(filename, mime=True)
        record = create_or_renew_by_hash(file_hashes, file_size, file_type)
        file_id = record.file_id
        file_dict = record.to_dict()
        # Upload to swift.
        upload_to_swift(filename, file_uuid)
    finally:
        # Remove the local temp file even when hashing or the upload failed,
        # so failed requests do not leave files behind in /tmp.
        if os.path.exists(filename):
            os.remove(filename)

    # Send message to worker queue with file details.
    worker_msg = {"file_uuid": file_uuid, "file_id": file_id}
    submit_worker_notification(worker_msg)
    return jsonify(file_dict)
def maybe_gunzip(fname, base, ext):
    """Decompress ``fname`` when libmagic describes it as gzip.

    A falsy ``fname`` or a non-gzip file is returned unchanged. Otherwise the
    file is gunzipped into the name produced by ``safe_fname(base, ext)``,
    and that new name is returned.
    """
    if not fname or 'gzip' not in magic.from_file(fname):
        return fname

    started = time.time()
    print("Gunzip file " + str(fname))
    target = safe_fname(base, ext)
    sh("gunzip", fname, "-c >", target)
    print("Gunzip took %g seconds" % (time.time() - started))
    return target
def get_filetype(fpath):
    """Return the MIME-style type string libmagic reports for ``fpath``."""
    mime = magic.from_file(fpath, mime=True)
    return mime