def __init__(self, file, dest):
    """Unpack or read an input file according to what libmagic reports for it.

    :param file: Path of the input file; it is identified with
        ``magic.Magic(uncompress=False).from_file``.
    :param dest: For zip input, the directory passed to
        ``ZipFile.extractall``. For gzip input, the path that is opened with
        ``gzip.open`` (see the review note below).
    """
    pymagic = magic.Magic(uncompress=False)
    description = pymagic.from_file(file).lower()
    if 'zip archive' in description:
        # The context manager releases the archive handle even when the
        # extraction fails part-way through.
        with zipfile.ZipFile(file) as archive:
            archive.extractall(dest)
    elif 'gzip' in description:
        # NOTE(review): this opens ``dest`` rather than ``file``; that looks
        # unintended but is kept as-is -- confirm against the callers.
        with gzip.open(dest) as f:
            # The decompressed content is not used in the visible part of
            # this method; the read is kept so the stream is still consumed.
            sp3file = f.read()
# python类Magic()的实例源码 (example source code using Python's Magic() class)
def get(self, request, *args, **kwargs):
    """Return the decrypted content addressed by the ``path`` keyword argument.

    The content is fetched with ``requests`` when ``self._is_url(path)`` is
    true, otherwise it is read from disk below ``settings.MEDIA_ROOT``. It is
    then passed through ``Cryptographer.decrypted`` and returned in an
    ``HttpResponse`` whose content type is sniffed from the decrypted bytes.

    :raises Http404: if no path was given, the mapped path does not start
        with ``settings.MEDIA_ROOT``, or the file does not exist.
    """
    path = kwargs.get("path")

    # Nothing requested, nothing to serve.
    if not path:
        raise Http404

    if self._is_url(path):
        remote = requests.get(path, stream=True)
        content = remote.raw.read()
    else:
        # Collapse "../"-style segments first, then map the first occurrence
        # of the media URL onto the media root directory.
        normalised = os.path.normpath(path)
        path = normalised.replace(settings.MEDIA_URL, settings.MEDIA_ROOT, 1)

        # NOTE(review): this is a plain string-prefix check, so a sibling
        # directory sharing the prefix would also pass -- confirm that
        # MEDIA_ROOT ends with a path separator.
        if not path.startswith(settings.MEDIA_ROOT):
            raise Http404

        # A genuinely missing file is an ordinary 404.
        if not os.path.exists(path):
            raise Http404

        with open(path, "rb") as f:
            content = f.read()

    content = Cryptographer.decrypted(content)
    mime_type = magic.Magic(mime=True).from_buffer(content)
    return HttpResponse(content, content_type=mime_type)
def get_filetype(data):
    """Return the file type that the ``magic`` module reports for a buffer.

    There are two versions of python-magic floating around, and annoyingly, the
    interface changed between versions, so we try one method and if it fails,
    then we try the other.

    NOTE: you may need to alter the magic_file for your system to point to the
    magic file.

    :param data: Buffer whose type should be identified.
    :return: The type reported by ``magic``, or ``''`` when the ``magic``
        module has not been imported.
    """
    # ``dict.has_key`` no longer exists on Python 3; ``in`` works everywhere.
    if 'magic' not in sys.modules:
        return ''
    try:
        # Interface of the libmagic-bundled bindings.
        ms = magic.open(magic.MAGIC_NONE)
        ms.load()
        return ms.buffer(data)
    except Exception:
        # Any failure above (typically a missing ``magic.open``) falls back to
        # the python-magic interface. ``Exception`` rather than a bare
        # ``except`` so KeyboardInterrupt/SystemExit still propagate.
        try:
            return magic.from_buffer(data)
        except magic.MagicException:
            # Raw string: same value as before, without relying on invalid
            # escape sequences such as ``\w`` being passed through.
            magic_custom = magic.Magic(magic_file=r'C:\windows\system32\magic')
            return magic_custom.from_buffer(data)
def create_task_log(self, task_id, case_task_log):
    """
    Post a log entry, with an optional file attachment, to a case task.

    :param task_id: Task identifier
    :param case_task_log: TheHive log
    :type case_task_log: CaseTaskLog defined in models.py
    :return: The response object returned by ``requests.post``
    :raises SystemExit: when the request fails with a
        ``requests.exceptions.RequestException``
    """
    req = self.url + "/api/case/task/{}/log".format(task_id)
    message = json.dumps({"message": case_task_log.message})

    try:
        if case_task_log.file:
            mime_type = magic.Magic(mime=True).from_file(case_task_log.file)
            # Keep the attachment open only for the duration of the upload so
            # the handle is always released afterwards.
            with open(case_task_log.file, 'rb') as attachment:
                f = {'attachment': (os.path.basename(case_task_log.file), attachment, mime_type)}
                return requests.post(req, data={'_json': message}, files=f,
                                     proxies=self.proxies, auth=self.auth, verify=self.cert)
        return requests.post(req, headers={'Content-Type': 'application/json'}, data=message,
                             proxies=self.proxies, auth=self.auth, verify=self.cert)
    except requests.exceptions.RequestException as e:
        sys.exit("Error: {}".format(e))
def magic(indata, mime=False):
    """
    Performs file magic while maintaining compatibility with different
    libraries.

    Tries the ``magic.open``/``load`` interface first and, when that raises
    ``AttributeError``, falls back to ``magic.Magic`` whose ``from_file`` is
    aliased to ``file``. Returns whatever ``file(indata)`` reports.
    """
    # NOTE(review): if this function lives at module level next to
    # ``import magic``, its own name shadows the module -- confirm.
    try:
        mymagic = magic.open(magic.MAGIC_MIME_TYPE) if mime else magic.open(magic.MAGIC_NONE)
        mymagic.load()
    except AttributeError:
        mymagic = magic.Magic(mime)
        # Give both interfaces the same ``file`` entry point.
        mymagic.file = mymagic.from_file
    result = mymagic.file(indata)
    return result
def get_input_file_type(input_file: str) -> str:
    """Classify a file as ``"pdf"``, ``"image"`` or ``"text"`` by MIME type.

    The magic database is taken from ``$CONDA_PREFIX/share/misc/magic.mgc``
    when ``CONDA_PREFIX`` is set, otherwise from ``LIBMAGIC_FILE_PATH``. When
    neither yields a path, a warning is emitted through ``eprint`` and
    ``magic.Magic`` is created without an explicit database.

    Returns the full MIME type string when it matches none of the three
    categories.
    """
    conda_prefix = os.environ.get("CONDA_PREFIX")
    if conda_prefix:
        magic_file = "{}/share/misc/magic.mgc".format(os.environ["CONDA_PREFIX"])
    else:
        magic_file = os.environ.get("LIBMAGIC_FILE_PATH")

    if not magic_file:
        eprint("Magic file was not found so python-magic will probably fail. Set LIBMAGIC_FILE_PATH environment variable with path to 'magic.mgc' file (usually '/usr/share/misc/magic.mgc'.")
        detector = magic.Magic(mime=True)
    else:
        detector = magic.Magic(magic_file=magic_file, mime=True)
    mime_type = detector.from_file(input_file)

    parts = mime_type.split("/")
    # The subtype is checked first, exactly as before.
    if parts[1] == "pdf":
        return "pdf"
    if parts[0] in ("image", "text"):
        return parts[0]
    return mime_type
def magic(indata, mime=False):
    """
    Performs file magic while maintaining compatibility with different
    libraries.

    The ``magic.open`` style interface is attempted first; an
    ``AttributeError`` switches to ``magic.Magic`` with ``from_file`` exposed
    under the name ``file``. The result of ``file(indata)`` is returned.
    """
    # NOTE(review): a module-level function called ``magic`` would hide an
    # imported ``magic`` module inside its own body -- confirm the scope.
    try:
        if mime:
            handle = magic.open(magic.MAGIC_MIME_TYPE)
        else:
            handle = magic.open(magic.MAGIC_NONE)
        handle.load()
    except AttributeError:
        handle = magic.Magic(mime)
        # Alias so the call below works for either interface.
        handle.file = handle.from_file
    return handle.file(indata)
def __init__(self, filepath):
    """Load a file and, when it is identified as a DOS/PE executable, parse it.

    Sets ``path``, ``filename`` and ``stream`` (the file content). When the
    MIME type reported by ``magic`` is ``application/x-dosexec``, also sets
    ``pe`` and ``pedict`` from ``pefile``; a parsing failure is reported on
    stdout and otherwise ignored.

    :param filepath: Path of the file to load.
    """
    self.path = filepath
    self.filename = os.path.basename(filepath)
    # NOTE(review): the mode is kept as 'r' to preserve the type of
    # ``self.stream``; binary samples would normally want 'rb' -- confirm.
    with open(filepath, 'r') as handle:
        self.stream = handle.read()
    if magic.Magic(mime=True).from_file(filepath) == 'application/x-dosexec':
        try:
            self.pe = pefile.PE(filepath)
            self.pedict = self.pe.dump_dict()
        except Exception:
            # Format inside the call: ``print(...) % filepath`` applies ``%``
            # to print's return value (None) on Python 3 and raises TypeError.
            print('Failed processing %s' % filepath)
# Magic
def magic(self):
    """Return what ``magic.Magic().from_file`` reports for ``self.path``."""
    detector = magic.Magic()
    return detector.from_file(self.path)
def mimetype(self):
    """Return the MIME type that ``magic`` reports for ``self.path``."""
    detector = magic.Magic(mime=True)
    return detector.from_file(self.path)
# FileType
def __init__(self):
    """Create the MIME-mode ``Magic`` detector and keep it on ``self.magic``."""
    mime_detector = Magic(mime=True)
    self.magic = mime_detector
def __init__(self, config):
    """Store the configuration and prepare a MIME-mode ``Magic`` detector.

    :param config: Configuration object, kept as ``self.config``.
    """
    self.config = config
    # Starts out unset; nothing in this method assigns a client.
    self.twitter = None
    mime_detector = Magic(mime=True)
    self.magic = mime_detector
def main(argv=None):
    """Command-line entry point: scan one file through a SAVAPI service.

    :param argv: Full argument vector including the program name; defaults to
        ``sys.argv``.
    :return: ``2`` when argument parsing exits; otherwise falls off the end
        and returns ``None``.
    """
    argv = sys.argv if argv is None else argv

    parser = argparse.ArgumentParser(
        description='SAVAPI Client',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    parser.add_argument('--savapi-host', dest='host', default='127.0.0.1',
                        help='SAVAPI service host')
    parser.add_argument('--savapi-port', dest='port', default=9999, type=int,
                        help='SAVAPI service port')
    parser.add_argument('file', help='Absolute path of file')

    try:
        args = parser.parse_args(argv[1:])
    except SystemExit:
        # argparse exits on bad input (and on --help); report that as 2.
        return 2

    log = logging.getLogger()
    log.addHandler(logging.StreamHandler())
    log.setLevel(logging.DEBUG)

    archive_mimes = ('application/zip', 'application/x-tar', 'application/rar')

    # Content sniffing when ``magic`` is truthy, otherwise the MimeTypes guess.
    if magic:
        mimetype = magic.Magic(mime=True).from_file(args.file)
    else:
        mimetype, _ = MimeTypes().guess_type(args.file)

    with SAVAPIClient(args.host, args.port) as savapi:
        log.info(savapi.command('SET', 'PRODUCT', '10776'))

        if mimetype in archive_mimes:
            log.info(savapi.command('SET', 'ARCHIVE_SCAN', '1'))

        for r in savapi.scan(args.file):
            print('{} {} <<< {}'.format(r.code, r.definition, r.data))
            if int(r.code) in [319, 350]:
                savapi.command('QUIT')
def __init__(self):
    """Initialise the ``magic.Magic`` base with fixed MIME-only options."""
    options = {
        'mime': True,
        'mime_encoding': False,
        'keep_going': False,
    }
    magic.Magic.__init__(self, **options)
def get(self, request, *args, **kwargs):
    """Serve the data of the most recent ``FileVersion`` of ``self.object``.

    The newest version by ``created_at`` is looked up, its MIME type is
    detected from the stored file with the database at ``settings.MAGIC_BIN``,
    and the content is returned with that content type.

    :raises Http404: when the file has no version at all.
    """
    # Fetch the latest version of the file.
    versions = FileVersion.objects.filter(file_id=self.object.id)
    try:
        latest = versions.latest('created_at')
    except FileVersion.DoesNotExist:
        raise Http404

    detector = magic.Magic(mime=True, magic_file=settings.MAGIC_BIN)
    content_type = detector.from_file(latest.data.path)
    return HttpResponse(latest.data.read(), content_type=content_type)
def clean_data(self):
    """Validate the MIME type and the size of the uploaded ``data`` file.

    :return: The uploaded file from ``cleaned_data`` (``None`` is passed
        through unvalidated).
    :raises forms.ValidationError: when the detected MIME type is not among
        ``self.folder.allowed_types`` or the file exceeds the configured
        ``file_max_size``.
    """
    file = self.cleaned_data['data']
    # Check for a missing upload *before* touching the file object; the
    # original only tested for None after it had already been dereferenced.
    if file is None:
        return file

    # Ensure the file has an allowed MIME type.
    allowed_mime_types = [allowed.mime_type for allowed in self.folder.allowed_types.all()]
    mime = magic.Magic(mime=True, magic_file=settings.MAGIC_BIN)
    if hasattr(file, 'temporary_file_path'):
        detected = mime.from_file(file.temporary_file_path())
    else:
        # Uploads held in memory have no path on disk, so sniff the content
        # itself and rewind so later consumers read from the start.
        detected = mime.from_buffer(file.read())
        file.seek(0)
    if detected not in allowed_mime_types:
        raise forms.ValidationError("Ce type de fichier n'est pas autorisé")

    max_size = settings.PORTAILVA_APP['file']['file_max_size']
    if file.size > max_size:
        raise forms.ValidationError("Votre fichier est trop lourd, la limite autorisée est de " +
                                    str(max_size // (1024 * 1024)) + "Mo")
    return file
def get_file_encoding(file_path):
    """Detect the encoding of a file with ``magic`` in mime-encoding mode.

    :param file_path: Path to file
    :type file_path: str
    :returns: encoding as reported by ``from_file``
    :rtype: str
    """
    return magic.Magic(mime_encoding=True).from_file(file_path)
def __init__(self, robotname, session, datalayer):
    """Set up per-robot state, a MIME detector and the optional robots log.

    :param robotname: Stored as ``self.robotname``.
    :param session: Stored as ``self.session``.
    :param datalayer: Stored as ``self.datalayer``.
    """
    self.robotname = robotname
    self.session = session
    self.datalayer = datalayer

    self.max_tries = config.read('Robots', 'MaxTries')
    self.in_progress = set()
    self.magic = magic.Magic(flags=magic.MAGIC_MIME_TYPE)

    # The log file is opened for appending only when a path is configured.
    self.robotslog = config.read('Logging', 'Robotslog')
    self.robotslogfd = open(self.robotslog, 'a') if self.robotslog else None
def test_magic():
    '''
    The python filemagic package requires the OS libmagic package,
    so identify two tiny samples to be sure nothing's missing.
    '''
    # (sample buffer, MIME type it must be identified as), checked in order.
    expectations = (
        ('%PDF-1.3\n', 'application/pdf'),
        ('<html>\n', 'text/html'),
    )
    with magic.Magic(flags=magic.MAGIC_MIME_TYPE) as m:
        for sample, expected in expectations:
            assert m.id_buffer(sample) == expected
def get_mime_type(filename):
    """ Returns the MIME type associated with a particular audio file.

    When the ``magic`` package can be imported, the type is read from the file
    content, following symlinks to their targets. Otherwise the type is
    guessed from the file extension and ``'unknown'`` is returned for
    extensions that are not recognised.
    """
    try:
        import magic
    except ImportError:
        # Warn once only; tolerate the flag not having been initialised yet.
        if getattr(get_mime_type, 'warn', True):
            logger.warning('Python package "magic" could not be loaded, '
                'possibly because system library "libmagic" could not be '
                'found. We are falling back on our own heuristics.')
            get_mime_type.warn = False
        ext = os.path.splitext(filename)[1].lower()
        return {
            '.wav' : 'audio/x-wav',
            '.mp3' : 'audio/mpeg',
            '.flac' : 'audio/x-flac'
        }.get(ext, 'unknown')

    # Read off magic numbers and return MIME types
    mime_magic = magic.Magic(mime=True)

    def _detect(path):
        # Some python-magic versions return bytes; normalise to text.
        ftype = mime_magic.from_file(path)
        return ftype.decode('utf-8') if isinstance(ftype, bytes) else ftype

    current_filename = filename
    ftype = _detect(current_filename)
    # If we are dealing with a symlink, read the link and try again with the
    # target, repeating for symlinks that point to other symlinks. The number
    # of hops is bounded so a symlink cycle cannot spin forever.
    max_hops = 40
    for _ in range(max_hops):
        if ftype != 'inode/symlink':
            break
        target = os.readlink(current_filename)
        # A relative link target is relative to the directory holding the
        # link, not to the current working directory. ``join`` leaves an
        # absolute target untouched.
        current_filename = os.path.join(os.path.dirname(current_filename), target)
        ftype = _detect(current_filename)
    return ftype
def get(self, request, key=None, **kwargs):
    """Return the image stored under ``key`` with a sniffed content type.

    The content type is detected from the first 1024 bytes; the file object
    is then rewound and handed to ``HttpResponse`` as the body.
    """
    image_fp = _get_image_fp(key)
    detector = Magic(mime=True)
    head = image_fp.read(1024)
    content_type = detector.from_buffer(head)
    # Rewind so the response starts at the beginning of the image.
    image_fp.seek(0)
    return HttpResponse(image_fp, content_type=content_type)
def __init__(self, **attributes):
    """Populate the instance from keyword arguments or from a ``json`` dict.

    A truthy ``json`` keyword replaces the keyword arguments as the source of
    values. Recognised keys: ``dataType``, ``message``, ``tlp`` (default 2),
    ``tags`` (default ``[]``), ``ioc`` (default ``False``) and ``data``.

    When ``dataType`` is ``'file'``, ``data[0]`` is treated as a path and
    ``self.data`` becomes a one-element list holding an ``attachment`` tuple
    of (base name, open binary handle, MIME type).
    """
    source = attributes['json'] if attributes.get('json', False) else attributes
    read = source.get

    self.dataType = read('dataType', None)
    self.message = read('message', None)
    self.tlp = read('tlp', 2)
    self.tags = read('tags', [])
    self.ioc = read('ioc', False)

    data = read('data', [])
    if self.dataType != 'file':
        self.data = data
        return

    path = data[0]
    # The handle is deliberately left open: it is stored inside self.data.
    self.data = [{'attachment': (os.path.basename(path), open(path, 'rb'), magic.Magic(mime=True).from_file(path))}]
def _prepare_file_data(self, file_path):
    """Return ``"<file name>;<MIME type>;<base64 content>"`` for a file.

    :param file_path: Path of the file to encode.
    """
    with open(file_path, "rb") as artifact:
        name = os.path.basename(file_path)
        mime_type = magic.Magic(mime=True).from_file(file_path)
        payload = base64.b64encode(artifact.read()).decode()
    return "{};{};{}".format(name, mime_type, payload)
def run_restore(self, args):
    """Run the restore operation.

    Reads QR codes from ``args.infile`` (a directory handled by
    ``PngDirImporter`` or a single PNG handled by ``ImageImporter``), decodes
    every frame and writes its payload at the position carried by the frame
    into ``args.outfile``. When ``args.sha256`` is set, the SHA-256 of the
    finished output file is printed.

    :raises Exception: when ``args.infile`` is a file that is not detected as
        ``image/png``.
    """
    if os.path.isdir(args.infile):
        self.logger.info('Setting up PngDirImporter')
        importer = PngDirImporter(args.infile,
                                  args.fnamepattern)
    else:
        m = magic.Magic(mime=True, uncompress=True)
        ftype = m.from_file(args.infile)
        if ftype != "image/png":
            raise Exception('Could not detect import type of file %s' % args.infile)
        importer = ImageImporter(args.infile)

    decodefunc = base64.b85decode #FIXME: add arg

    with open(args.outfile, "wb") as outfile:
        for image in importer:
            # scan_codes yields None rather than an empty list when an image
            # holds no readable code; skip such images instead of crashing.
            encdatalist = zbarlight.scan_codes('qrcode', image)
            if not encdatalist:
                self.logger.warning('No QR code found in an input image; skipping it')
                continue
            for encdata in encdatalist:
                frame = decodefunc(encdata)
                bindata, position = self.unframe_data(frame)
                # Frames may arrive out of order, so each one is written at
                # its own offset.
                outfile.seek(position)
                outfile.write(bindata)
    self.logger.info('Finished importing')

    # Cant calculate during writing because we may be reading pieces
    # out of order
    if args.sha256:
        hashfunc = hashlib.sha256()
        with open(args.outfile, "rb") as outfile:
            while True:
                data = outfile.read()
                if not data:
                    break
                hashfunc.update(data)
        print('SHA-256 of output: %s' % hashfunc.hexdigest())
def mimeFrom(self, path):
    """Return the MIME type that ``magic`` reports for the file at ``path``."""
    return magic.Magic(mime=True).from_file(path)
def process(self):
    """Return the MIME type that ``fmagic`` reports for ``self.buffer``."""
    return fmagic.Magic(mime=True).from_buffer(self.buffer)
def upload(self, filename, configuration=None, metadata=None, transcript=None):
    """
    Upload new media to the service as an attachment.
    HTTP POST on /media
    :param filename: Path of the media file attached to the request.
    :param configuration: VoicebaseMediaConfiguration
    :param metadata: VoicebaseMediaMeta
    :param transcript: attached transcript (accepted, but not used by this
        method)
    :return: VoicebaseMedia
    :raises requests.HTTPError: from ``raise_for_status`` when the service
        answers with an error status.
    """
    data = {}
    if metadata:
        data['metadata'] = str(metadata)
    if configuration:
        data['configuration'] = str(configuration)

    # Determine mime type
    m = magic.Magic(mime=True)
    mime_type = m.from_file(filename)

    # Open file and pipe to request. Binary mode: media content must not go
    # through text decoding or newline translation.
    with open(filename, 'rb') as handle:
        file_info = [('media', (filename, handle, mime_type))]
        # Native string for the method; a bytes literal is not a str on
        # Python 3.
        rq = requests.Request('POST', self.full_url('base'), data=data, headers=self.session.headers,
                              files=file_info)
        prepared_rq = rq.prepare()
        response = self.session.send(prepared_rq)
        response.raise_for_status()
    jsn = response.json()
    log.debug('Upload response: {}'.format(jsn))
    return VoicebaseMedia(jsn, api=self.api)
def _get_file_type(full_targ_path):
    """Determine the file type of a target sample via python-magic.

    The type is printed as a side effect and returned as a ``str``.

    :param full_targ_path: Full path of the target sample.
    :return: The MIME type from a ``magic.Magic`` built on the GnuWin32 magic
        database, or the result of ``magic.from_file`` when that interface
        raises ``AttributeError``.
    """
    try:
        # Alternative interface, kept for reference:
        #   magicObj = magic.open(magic.MAGIC_NONE); magicObj.load()
        #   magic_out = str(magicObj.file(full_targ_path))
        magicObj = magic.Magic(magic_file=r'C:/Program Files (x86)/GnuWin32/share/misc/magic', mime=True)
        magic_out = str(magicObj.from_file(full_targ_path))
        # print() call form: same output on Python 2 for a single argument,
        # and valid syntax on Python 3.
        print(magic_out)
    except AttributeError:
        magic_out = str(magic.from_file(full_targ_path))
        print(magic_out + " ERROR?!?!?!!?")
    return magic_out
def contenttype(payload):
    """Return the MIME type that ``magic`` reports for an in-memory payload."""
    return magic.Magic(mime=True).from_buffer(payload)
def _importMagic():
    """Return a MIME-mode ``magic.Magic`` detector, or ``None``.

    ``None`` is returned when ``config.enable_magic`` is false, or when
    importing ``magic`` or building the detector from ``config.magic_file``
    fails.
    """
    if config.enable_magic:
        try:
            import magic
            detector = magic.Magic(magic_file=config.magic_file, mime=True)
        except Exception:
            # Covers a missing package as well as a failure to build the
            # detector, without swallowing KeyboardInterrupt/SystemExit.
            l.debug("libmagic cannot be imported")
        else:
            # Logged before returning; the original placed this line after
            # the return statement, where it could never run.
            l.debug("libmagic enabled.")
            return detector
    return None