def file_is(file_description, fmt):
    """Tell whether a file or an in-memory buffer looks like a `fmt` document.

    The MIME type reported by libmagic is matched, case-insensitively,
    against `fmt`, which is interpolated into a regular expression as-is.

    :file_description: Full path of a file (``str``) or a buffer holding the
        data itself (``bytes``).
    :fmt: Pattern looked for inside the detected MIME type.
    :returns: True if the MIME type matches `fmt` and False otherwise,
        including when `file_description` is neither ``str`` nor ``bytes``.
    """
    import magic
    logger.debug("Checking filetype")
    pattern = r".*%s.*" % fmt
    if isinstance(file_description, str):
        # A string is treated as a path on disk.
        result = re.match(
            pattern, magic.from_file(file_description, mime=True),
            re.IGNORECASE
        )
        if result:
            logger.debug(
                "File %s appears to be of type %s", file_description, fmt
            )
    elif isinstance(file_description, bytes):
        # Bytes are treated as the content itself; match case-insensitively
        # so both branches give the same answer for the same `fmt`.
        result = re.match(
            pattern, magic.from_buffer(file_description, mime=True),
            re.IGNORECASE
        )
        if result:
            logger.debug("Buffer appears to be of type %s", fmt)
    else:
        # Unsupported input type: report "no match" instead of failing on
        # an unbound local variable.
        result = None
    return bool(result)
# Example source code for Python's from_file()
def register_files(self):
    """Register every regular file found under ``self.extracted_path``.

    Each file is identified by the MD5 of ``"<name>:<content>"``. When a
    ``FileModel`` with that hash already exists, ``self.firmware`` is added
    to it; otherwise a new ``FileModel`` is created, typed with libmagic and
    passed to ``self.find_loots``.
    """
    print("Start registering files")
    for root, _dirs, files in os.walk(self.extracted_path):
        for name in files:
            full_path = os.path.join(root, name)
            if not os.path.isfile(full_path):
                continue
            path = full_path.replace(self.extracted_path, "")
            with open(full_path, "rb") as fd:
                content = fd.read()
            # The digest input is kept exactly as before so previously
            # stored hashes keep matching.
            hash_content = "%s:%s" % (name, content)
            digest = hashlib.md5(hash_content.encode('utf-8')).hexdigest()
            try:
                file_obj = FileModel.objects.get(hash=digest)
                file_obj.firmware.add(self.firmware)
                file_obj.save()
            except FileModel.DoesNotExist:
                try:
                    file_obj = FileModel()
                    file_obj.filepath = full_path
                    file_obj.hash = digest
                    file_obj.filesize = len(content)
                    file_obj.filename = path
                    # Saved before touching ``firmware``.
                    # NOTE(review): presumably a many-to-many relation that
                    # needs a saved row first - confirm against the model.
                    file_obj.save()
                    file_obj.firmware.add(self.firmware)
                    file_obj.file_type = magic.from_file(full_path)
                    file_obj.save()
                    self.find_loots(file_obj)
                    # Performance tweak: keep the loot count on the object.
                    file_obj.nb_loots = file_obj.loots.all().count()
                except Exception:
                    # Best effort: a failure on one file must not abort the
                    # walk, but KeyboardInterrupt/SystemExit now propagate.
                    # NOTE(review): this value is not saved afterwards.
                    file_obj.file_type = "unknown"
    print("Files registered")
def parse_file_info(file_path, dir_path):
    """Hand `file_path` to the parser registered for its MIME type.

    The MIME type detected by libmagic is looked up in
    ``file_mimetype_relation``. The matching callable is invoked with
    ``(file_path, dir_path)`` and its result returned; None is returned when
    no entry exists for the detected type.
    """
    print("entering parse_file_info")
    detected = magic.from_file(file_path, mime=True)
    print(detected)
    print(file_path)
    if detected not in file_mimetype_relation:
        return None
    handler = file_mimetype_relation[detected]
    return handler(file_path, dir_path)
def _get_file_type(full_targ_path):
    """Return the file type of the sample at `full_targ_path` as a string.

    The ``magic.open`` interface is tried first. If that raises
    AttributeError, ``magic.from_file`` is used instead.
    """
    try:
        detector = magic.open(magic.MAGIC_NONE)
        detector.load()
        return str(detector.file(full_targ_path))
    except AttributeError:
        return str(magic.from_file(full_targ_path))
def _get_file_type(full_targ_path):
    """Return the MIME type of the sample at `full_targ_path` as a string.

    A ``magic.Magic`` instance bound to the GnuWin32 magic database is used
    first. If that raises AttributeError, the module-level
    ``magic.from_file`` is used and the printed result is tagged with an
    error marker. The detected value is printed in both cases.
    """
    try:
        magic_obj = magic.Magic(
            magic_file=r'C:/Program Files (x86)/GnuWin32/share/misc/magic',
            mime=True)
        magic_out = str(magic_obj.from_file(full_targ_path))
        # Parenthesised single-argument print behaves the same on
        # Python 2 and Python 3.
        print(magic_out)
    except AttributeError:
        magic_out = str(magic.from_file(full_targ_path))
        print(magic_out + " ERROR?!?!?!!?")
    return magic_out
def get_type(self):
    """Get the file type description of ``self.file_path``.

    Tries, in order: the ``magic.open`` interface (following symlinks),
    ``magic.from_file``, and finally the ``file -b -L`` command. Every step
    is best effort; a failing step falls through to the next one.
    @return: file type, or None when every method failed.
    """
    file_type = None
    if HAVE_MAGIC:
        ms = None
        try:
            ms = magic.open(magic.MAGIC_SYMLINK)
            ms.load()
            file_type = ms.file(self.file_path)
        except Exception:
            try:
                file_type = magic.from_file(self.file_path)
            except Exception:
                pass
        finally:
            # Only close a handle that was actually opened.
            if ms is not None:
                try:
                    ms.close()
                except Exception:
                    pass

    if file_type is None:
        try:
            p = subprocess.Popen(["file", "-b", "-L", self.file_path],
                                 stdout=subprocess.PIPE)
            # communicate() drains stdout, closes the pipe and reaps the
            # child, so no zombie process or open descriptor is left behind.
            file_type = p.communicate()[0].strip()
        except Exception:
            pass

    return file_type
def get_content_type(self):
    """Get MIME content file type (example: image/jpeg).

    Tries, in order: the ``magic.open`` interface with the MIME and symlink
    flags, ``magic.from_file(..., mime=True)``, and finally the
    ``file -b -L --mime-type`` command. Every step is best effort; a failing
    step falls through to the next one.
    @return: file content type, or None when every method failed.
    """
    file_type = None
    if HAVE_MAGIC:
        ms = None
        try:
            ms = magic.open(magic.MAGIC_MIME | magic.MAGIC_SYMLINK)
            ms.load()
            file_type = ms.file(self.file_path)
        except Exception:
            try:
                file_type = magic.from_file(self.file_path, mime=True)
            except Exception:
                pass
        finally:
            # Only close a handle that was actually opened.
            if ms is not None:
                try:
                    ms.close()
                except Exception:
                    pass

    if file_type is None:
        try:
            p = subprocess.Popen(
                ["file", "-b", "-L", "--mime-type", self.file_path],
                stdout=subprocess.PIPE)
            # communicate() drains stdout, closes the pipe and reaps the
            # child, so no zombie process or open descriptor is left behind.
            file_type = p.communicate()[0].strip()
        except Exception:
            pass

    return file_type
def processDownload(tmpFilePath, fileName, fileUrl):
    """Vet a downloaded temporary file and upload it to Viper.

    The file is rejected (cleaned up, False returned) when its size shifted
    right by 20 bits exceeds 10, when ``isAcceptedHash`` refuses its SHA-256,
    or when the MIME type detected from its content is not in the accepted
    list. Otherwise it is renamed to its hash inside
    ``baseConfig.outputFolder``, uploaded, added to the hash cache and
    cleaned up; the result of ``uploadToViper`` is returned.
    """
    logging.info('Downloaded as temporary file: {0}. Beginning processing...'.format(tmpFilePath))

    sizeInMb = os.path.getsize(tmpFilePath) >> 20
    if sizeInMb > 10:
        logging.error('File is {0}MB. Too large to process.'.format(sizeInMb))
        cleanUp(tmpFilePath)
        return False

    digest = sha256SumFile(tmpFilePath)
    if not isAcceptedHash(digest):
        cleanUp(tmpFilePath)
        return False

    storedPath = os.path.join(baseConfig.outputFolder, digest)
    os.rename(tmpFilePath, storedPath)

    # Trust only the content type of the downloaded file.
    detectedType = magic.from_file(storedPath, mime=True)
    acceptedTypes = (
        'application/octet-stream',
        'application/x-dosexec',
        'application/x-msdownload',
        'application/x-ms-installer',
        'application/pdf',
        'application/x-pdf',
        'application/msword',
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        'application/vnd.openxmlformats-officedocument.wordprocessingml.template',
        'application/vnd.ms-word.document.macroEnabled',
        'application/vnd.ms-excel',
        'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        'application/vnd.openxmlformats-officedocument.spreadsheetml.template',
        'application/vnd.ms-excel.sheet.macroEnabled',
        'application/vnd.ms-excel.template.macroEnabled',
        'application/vnd.ms-excel.addin.macroEnabled',
        'application/vnd.ms-excel.sheet.binary.macroEnabled',
        'application/x-shockwave-flash',
    )
    if detectedType not in acceptedTypes:
        logging.error('Detected non-binary or executable file type ({0}). Skipping: {1}'.format(detectedType, storedPath))
        cleanUp(storedPath)
        return False

    logging.info('File with hash: {0} identified as type: {1}'.format(digest, detectedType))
    wasUploaded = uploadToViper(storedPath, fileName, digest, fileUrl)
    addToHashCache(digest)
    cleanUp(storedPath)
    return wasUploaded
def validate_elm_make(ctx, param, value):
    """Check that `value` names a real (non-text) elm-make executable.

    None is passed through untouched. Otherwise `value` is resolved as a
    path, falling back to a PATH lookup; ``click.BadParameter`` is raised if
    no file is found or if the file's MIME type starts with ``text``.
    Returns `value` unchanged on success.
    """
    if value is None:
        return value

    resolved = os.path.realpath(value)
    if not os.path.isfile(resolved):
        # Not a direct path: try to find it on PATH.
        resolved = shutil.which(value)
    if resolved is None or not os.path.isfile(resolved):
        raise click.BadParameter('{} not found'.format(value))

    detected = magic.from_file(resolved, mime=True)
    if detected.startswith('text'):
        # A text file here is not the binary itself; suggest where the
        # real one may live relative to it.
        wrapper_dir = os.path.dirname(resolved)
        perhaps_binwrap_of = os.path.normpath(
            os.path.join(
                wrapper_dir,
                os.pardir,
                'elm',
                'Elm-Platform',
                '*',
                '.cabal-sandbox',
                'bin',
                'elm-make'))
        raise click.BadParameter('''should be the real elm-make binary; this looks like a text file.
if you installed Elm through npm, then try {}'''.format(perhaps_binwrap_of))

    return value
def libmagic_file_type(self):
    """
    Ask libmagic for the MIME type of the file at ``self.path``.

    Returns:
        str: The libmagic-parsed file type.
    """
    detected = magic.from_file(self.path, mime=True)
    return detected
def get_type(self):
    """Get the file type description of ``self.file_path``.

    Tries, in order: the ``magic.open`` interface, ``magic.from_file``, and
    finally the ``file -b`` command. Every step is best effort; failures of
    the last two are logged at debug level.
    @return: file type, or None when every method failed.
    """
    file_type = None
    if HAVE_MAGIC:
        ms = None
        try:
            ms = magic.open(magic.MAGIC_NONE)
            ms.load()
            file_type = ms.file(self.file_path)
        except Exception:
            try:
                file_type = magic.from_file(self.file_path)
            except Exception as e:
                log.debug("Error getting magic from file %s: %s",
                          self.file_path, e)
        finally:
            # Only close a handle that was actually opened.
            if ms is not None:
                try:
                    ms.close()
                except Exception:
                    pass

    if file_type is None:
        try:
            p = subprocess.Popen(["file", "-b", self.file_path],
                                 stdout=subprocess.PIPE)
            # communicate() drains stdout, closes the pipe and reaps the
            # child, so no zombie process or open descriptor is left behind.
            file_type = p.communicate()[0].strip()
        except Exception as e:
            log.debug("Error running file(1) on %s: %s",
                      self.file_path, e)

    return file_type
def get_content_type(self):
    """Get MIME content file type (example: image/jpeg).

    Tries, in order: the ``magic.open`` interface with the MIME flag,
    ``magic.from_file(..., mime=True)``, and finally the
    ``file -b --mime-type`` command. Every step is best effort; a failing
    step falls through to the next one.
    @return: file content type, or None when every method failed.
    """
    file_type = None
    if HAVE_MAGIC:
        ms = None
        try:
            ms = magic.open(magic.MAGIC_MIME)
            ms.load()
            file_type = ms.file(self.file_path)
        except Exception:
            try:
                file_type = magic.from_file(self.file_path, mime=True)
            except Exception:
                pass
        finally:
            # Only close a handle that was actually opened.
            if ms is not None:
                try:
                    ms.close()
                except Exception:
                    pass

    if file_type is None:
        try:
            args = ["file", "-b", "--mime-type", self.file_path]
            file_type = subprocess.check_output(args).strip()
        except Exception:
            # Best effort: KeyboardInterrupt/SystemExit are no longer
            # swallowed, everything else still is.
            pass

    return file_type
def guess_mimetype(path):
    """Return the MIME type of the file at `path` as text.

    libmagic's answer is used as-is, except that ``audio/x-m4a`` is reported
    as ``audio/mp4``. The result of ``magic.from_file`` is accepted whether
    it arrives as ``bytes`` or as ``str``.
    """
    magic_mimetype = magic.from_file(str(path), mime=True)
    if isinstance(magic_mimetype, bytes):
        # Normalise once so the comparison and the return value below work
        # for both return types.
        magic_mimetype = magic_mimetype.decode("utf-8")
    if magic_mimetype == "audio/x-m4a":
        return "audio/mp4"
    return magic_mimetype
def inspect(self, sample):
    """Record libmagic's description and MIME type of ``sample.path``.

    Both values are stored in ``sample.info[self.NAME]`` under the
    ``"magic"`` and ``"mime"`` keys.
    """
    description = magic.from_file(sample.path)
    mime_type = magic.from_file(sample.path, mime=True)
    sample.info[self.NAME] = {"magic": description, "mime": mime_type}
def get_mime(self):
    """Return the MIME type of ``self.path``, or '' when it cannot be found.

    The ``magic.open`` interface is tried first; on any error a
    ``magic.Magic(mime=True)`` instance is used instead. If that fails too,
    an empty string is returned.
    """
    try:
        ms = magic.open(magic.MIME)
        ms.load()
        mime_type = ms.file(self.path)
    except Exception:
        # Still best effort, but KeyboardInterrupt/SystemExit are no longer
        # swallowed.
        try:
            mime = magic.Magic(mime=True)
            mime_type = mime.from_file(self.path)
        except Exception:
            return ''
    return mime_type
def mime(self):
    """Return the MIME type of the file named by ``self.fetch('filename')``.

    Works with either flavour of the ``magic`` module: the one exposing
    ``from_file`` or the one exposing ``open``. Raises ImportError when the
    imported module offers neither.
    """
    if hasattr(magic, "from_file"):
        # PyPI flavour: https://pypi.python.org/pypi/python-magic
        target = self.fetch('filename')
        return magic.from_file(target, mime=True)

    if hasattr(magic, "open"):
        # Flavour shipped with the `file` command and found in distro
        # repos - http://www.darwinsys.com/file/
        detector = magic.open(magic.MAGIC_MIME)
        detector.load()
        return detector.file(self.fetch('filename'))

    raise ImportError(
        'The `magic` module that was found is not the expected pypi '
        'package python-magic (https://pypi.python.org/pypi/python-magic) '
        'nor file\'s (http://www.darwinsys.com/file/) package.')
def create_pads_from_files(job_id, attachment, email, client_id, client_secret):
    """Create one pad per HTML file found in the zipped `attachment`.

    The archive is unpacked into ``./data/<archive name>``. Every entry whose
    detected MIME type is ``text/html`` is handed to
    ``insert_pad_from_file``; other entries are logged and skipped without
    being counted. ``email_error`` is called when the counted files do not
    add up to the number of directory entries.

    Returns a ``(pads_created, pads_skipped)`` tuple.
    Raises AttributeError when `attachment` does not look like
    ``.../attachments/<name>.zip``.
    """
    logging.info("Opening attached zip %s." % attachment)
    # Raw string: "\." is not a valid escape in a normal string literal.
    m = re.search(r'^.+attachments/(.+)\.zip$', attachment)
    directory = './data/' + m.group(1)
    unzip_attachment(attachment, directory)
    files = os.listdir(directory)

    hackpad = Hackpad(api_scheme=os.getenv('HACKPAD_API_SCHEME') or 'http',
                      api_domain=os.getenv('HACKPAD_API_DOMAIN') or 'hackpad.dev',
                      sub_domain=os.getenv('HACKPAD_SUB_DOMAIN') or '',
                      consumer_key=client_id,
                      consumer_secret=client_secret)

    pads_created = pads_skipped = 0

    for file_name in files:
        file_path = directory + '/' + file_name
        # check if it is really an html file
        file_type = magic.from_file(file_path, mime=True)
        if file_type != 'text/html':
            logging.info('Invalid file type for file %s :%s' % (file_path, file_type))
            continue

        # The context manager closes the handle even if the import raises.
        with open(file_path) as fh:
            logging.info('importing for %s: %s' % (email, file_name))
            if insert_pad_from_file(job_id, hackpad, fh, file_name, client_id, client_secret):
                pads_created += 1
            else:
                pads_skipped += 1

    # Check if all files are imported
    if pads_created + pads_skipped != len(files):
        email_error("Not all files were processed", job_id)

    return pads_created, pads_skipped
def attachFile(attachList, filename, pos=None, replace=False):
    """Sanity-check `filename` and record it in `attachList`.

    Without `pos` the file is appended. With `pos` (1-based) the file is
    inserted at that position when `replace` equals False, and overwrites
    the entry at that position otherwise. An out-of-range `pos` prints a
    message and leaves the list untouched. Problems reading the file only
    produce warnings: the file is added to the list regardless.
    """
    import errno

    index = None
    if pos is not None:
        if pos > len(attachList) or pos < 1:
            print("Bad position. {} not between 1 and {}".format(pos, len(attachList)))
            return
        # Convert the human-facing position to a list index.
        index = pos - 1

    try:
        info = os.stat(filename)
    except OSError as err:
        # Could not stat it; tell a missing file apart from other failures.
        if err.errno == errno.ENOENT:
            print("WARNING: Given file doesn't currently exist. Adding to list anyway. We'll try reading it again when completing the message")
        else:
            print("WARNING: Couldn't get information about the file: %s" % err.strerror)
            print("Adding to list anyway. We'll try reading it again when completing the message.")
    else:
        if os.access(filename, os.R_OK):
            print("Attachment added to list. Raw size is currently %i bytes. Note: we'll actually read the data when completing the message" % info.st_size)
            mtype = magic.from_file(filename, mime=True)
            print("Mime type appears to be %s" % mtype)
        else:
            print("WARNING: Can't read existing file. Adding to list anyway. We'll try again when completing the message.")

    if index is None:
        attachList.append(filename)
    elif replace == False:
        attachList.insert(index, filename)
    else:
        attachList[index] = filename
def _download_file(self, tg_msg, file_obj, msg_type):
    """
    Download media file from telegram platform.

    The file is stored under ``storage/<channel_id>`` and renamed with an
    extension guessed from its MIME type (``.unknown`` when none matches).

    Args:
        tg_msg: Telegram message instance
        file_obj: File object
        msg_type: Type of message

    Returns:
        tuple of str[2]: Full path of the file, MIME type

    Raises:
        EFBMessageError: When the reported file size exceeds the download
            limit.
    """
    path = os.path.join("storage", self.channel_id)
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(path, exist_ok=True)
    size = getattr(file_obj, "file_size", None)
    file_id = file_obj.file_id
    if size and size > telegram.constants.MAX_FILESIZE_DOWNLOAD:
        raise EFBMessageError("Attachment is too large. Maximum 20 MB. (AT01)")
    f = self.bot.bot.getFile(file_id)
    fname = "%s_%s_%s_%s" % (msg_type, tg_msg.chat.id, tg_msg.message_id, int(time.time()))
    fullpath = os.path.join(path, fname)
    f.download(fullpath)
    # Use the declared MIME type when there is one; sniff the file only when
    # it is missing or empty.
    mime = getattr(file_obj, "mime_type", None) or magic.from_file(fullpath, mime=True)
    if isinstance(mime, bytes):
        mime = mime.decode()
    guess_ext = mimetypes.guess_extension(mime) or ".unknown"
    if guess_ext == ".unknown":
        self.logger.warning("File %s with mime %s has no matching extensions.", fullpath, mime)
    # Always use ".jpeg" for JPEG images, whatever guess_extension returned.
    ext = ".jpeg" if mime == "image/jpeg" else guess_ext
    renamed = "%s%s" % (fullpath, ext)
    os.rename(fullpath, renamed)
    return renamed, mime
def _produce_one_sample(self):
    """Build the tensors for one input/output image pair.

    ``self.path`` is a text file listing one file name per line; each name
    is looked up in the ``input`` and ``output`` directories next to it.
    Bit depth is taken from libmagic's description of the first file of
    each list, and the decoder (JPEG or PNG) from that file's extension.
    Both images are scaled by their white level, concatenated along axis 2
    and passed through ``self._augment_data``.

    Returns a dict with the keys ``lowres_input``, ``lowres_output``,
    ``image_input`` and ``image_output``.
    Raises ValueError when the directory of ``self.path`` fails
    ``check_dir``.
    """
    dirname = os.path.dirname(self.path)
    if not check_dir(dirname):
        raise ValueError("Invalid data path.")
    with open(self.path, 'r') as fid:
        # Iterating the file object works on Python 2 and 3;
        # xreadlines() exists on Python 2 only.
        flist = [l.strip() for l in fid]

    if self.shuffle:
        random.shuffle(flist)

    input_files = [os.path.join(dirname, 'input', f) for f in flist]
    output_files = [os.path.join(dirname, 'output', f) for f in flist]

    self.nsamples = len(input_files)

    # 0o123 is the portable spelling of the former Python 2 octal literal
    # 0123, so the seed value (83) is unchanged.
    input_queue, output_queue = tf.train.slice_input_producer(
        [input_files, output_files], shuffle=self.shuffle,
        seed=0o123, num_epochs=self.num_epochs)

    # Pick dtype and white level from the first file's description.
    if '16-bit' in magic.from_file(input_files[0]):
        input_dtype = tf.uint16
        input_wl = 65535.0
    else:
        input_wl = 255.0
        input_dtype = tf.uint8
    if '16-bit' in magic.from_file(output_files[0]):
        output_dtype = tf.uint16
        output_wl = 65535.0
    else:
        output_wl = 255.0
        output_dtype = tf.uint8

    input_file = tf.read_file(input_queue)
    output_file = tf.read_file(output_queue)

    if os.path.splitext(input_files[0])[-1] == '.jpg':
        im_input = tf.image.decode_jpeg(input_file, channels=3)
    else:
        im_input = tf.image.decode_png(input_file, dtype=input_dtype, channels=3)

    if os.path.splitext(output_files[0])[-1] == '.jpg':
        im_output = tf.image.decode_jpeg(output_file, channels=3)
    else:
        im_output = tf.image.decode_png(output_file, dtype=output_dtype, channels=3)

    # normalize input/output
    sample = {}
    with tf.name_scope('normalize_images'):
        im_input = tf.to_float(im_input)/input_wl
        im_output = tf.to_float(im_output)/output_wl

    # Augment both images together, then split them at channel 3.
    inout = tf.concat([im_input, im_output], 2)
    fullres, inout = self._augment_data(inout, 6)

    sample['lowres_input'] = inout[:, :, :3]
    sample['lowres_output'] = inout[:, :, 3:]
    sample['image_input'] = fullres[:, :, :3]
    sample['image_output'] = fullres[:, :, 3:]
    return sample