python类from_file()的实例源码

local.py 文件源码 项目:S4 作者: MichaelAquilina 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _load_index(self):
        """Load the JSON index stored at ``self.index_path()``.

        The file's MIME type (as reported by ``magic.from_file``) selects
        the opener: plain text uses ``open``, gzip uses ``gzip.open``.

        Returns an empty dict when the index file does not exist.
        Raises ValueError for any other detected MIME type.
        """
        path = self.index_path()
        if not os.path.exists(path):
            return {}

        gzip_entry = (gzip.open, 'Detected gzip encoding for reading index')
        openers = {
            'text/plain': (
                open, 'Detected plaintext encoding for reading index'),
            'application/gzip': gzip_entry,
            'application/x-gzip': gzip_entry,
        }

        detected = magic.from_file(path, mime=True)
        entry = openers.get(detected)
        if entry is None:
            raise ValueError('Index is of unknown type', detected)

        opener, message = entry
        logger.debug(message)

        # 'rt' makes both openers hand text to the JSON parser.
        with opener(path, 'rt') as index_file:
            return json.load(index_file)
engine.py 文件源码 项目:refextract 作者: inspirehep 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_plaintext_document_body(fpath, keep_layout=False):
    """Given a file-path to a full-text, return a list of unicode strings
       whereby each string is a line of the fulltext.
       In the case of a plain-text document, this simply means reading the
       contents in from the file. In the case of a PDF however,
       this means converting the document to plaintext.
       It raises UnknownDocumentTypeError if the document is not a PDF or
       plain text.
       @param fpath: (string) - the path to the fulltext file
       @param keep_layout: (bool) - passed through unchanged to
        convert_PDF_to_plaintext; unused for plain-text input.
       @return: (list) of strings - each string being a line in the document.
    """
    textbody = []
    mime_type = magic.from_file(fpath, mime=True)

    if mime_type == "text/plain":
        # Read raw bytes and decode explicitly: a text-mode handle yields
        # already-decoded `str` lines on Python 3, which have no .decode().
        # Binary mode gives byte lines on both Python 2 and 3.
        with open(fpath, "rb") as f:
            textbody = [line.decode("utf-8") for line in f]

    elif mime_type == "application/pdf":
        textbody = convert_PDF_to_plaintext(fpath, keep_layout)

    else:
        raise UnknownDocumentTypeError(mime_type)

    return textbody
files.py 文件源码 项目:malgazer 作者: keithjjones 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, filename):
        """
        Build a file object around a malware sample on disk.

        :param filename:  Path of the malware sample to load.
        :raises ValueError:  If ``filename`` does not exist.
        """
        if not os.path.exists(filename):
            raise ValueError("File {0} does not exist!".format(filename))

        # Neutral defaults; set before the helper calls below run.
        self.parsedfile = None
        self.file_size = 0
        self.running_entropy_window_size = 0
        self.running_entropy_data = None

        # Sample-specific state.
        self.data = []
        self.filename = filename
        self.filetype = magic.from_file(filename)

        # Order matters: read first, then parse.
        self._read_file()
        self._parse_file_type()
objects.py 文件源码 项目:Snakepit 作者: K4lium 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_type(self):
        """Return a description of the file type of ``self.path``.

        Three detection strategies are tried in order, each one only when
        the previous one raised:

        1. ``magic.open(...)`` / ``ms.file(...)``
        2. ``magic.from_file(...)``
        3. the external ``file -b`` command

        Returns whatever the first successful strategy produced, or ``''``
        when all of them fail. The ``file`` fallback returns the raw,
        stripped process output.
        """
        ms = None
        try:
            ms = magic.open(magic.MAGIC_NONE)
            ms.load()
            return ms.file(self.path)
        except Exception:
            # Best effort: fall through to the next strategy.
            pass
        finally:
            # Only close a handle that was actually created.
            if ms is not None:
                try:
                    ms.close()
                except Exception:
                    pass

        try:
            return magic.from_file(self.path)
        except Exception:
            pass

        try:
            import subprocess
            file_process = subprocess.Popen(['file', '-b', self.path],
                                            stdout=subprocess.PIPE)
            # communicate() waits for the child and closes the pipe, so no
            # zombie process or open descriptor is left behind.
            output, _ = file_process.communicate()
            return output.strip()
        except Exception:
            return ''
file_parsing.py 文件源码 项目:open-wob-api 作者: openstate 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def file_parser(fname, pages=None):
    """Extract the text of ``fname`` as a UTF-8 encoded byte string.

    PDFs (detected by MIME type) are walked with ``pdf.Document``; every
    other file goes through ``parser.from_file``.

    :param fname: path of the file to parse.
    :param pages: stop after this many PDF pages; ``None`` reads them all.
        Ignored for non-PDF input.
    :return: the extracted text, or ``None`` when parsing raised. The
        exception is printed rather than propagated.
    """
    if magic.from_file(fname, mime=True) == 'application/pdf':
        try:
            text_array = []
            # Pre-set the counter so the summary below also works for a
            # document that yields no pages; otherwise `i` would be unbound.
            i = 0
            d = pdf.Document(fname)
            # NOTE(review): the nesting below presumably follows the pdf
            # library's page/flow/block/line layout - confirm.
            for i, p in enumerate(d, start=1):
                for f in p:
                    for b in f:
                        for l in b:
                            text_array.append(l.text.encode('UTF-8'))

                if i == pages:  # break after x pages
                    break

            # Single-argument print() gives the same output on Python 2 and 3.
            print("Processed %i pages" % (i))
            # The collected items are encoded bytes, so join with bytes.
            return b'\n'.join(text_array)
        except Exception as e:
            print("PDF Parser Exception:  %s" % (e,))
    else:
        try:
            content = parser.from_file(fname)['content']
            return (content or '').encode('UTF-8')
        except Exception as e:
            print("File Parser Exception:  %s" % (e,))
eh_wechat_slave.py 文件源码 项目:ehForwarderBot 作者: blueset 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def save_file(self, msg, msg_type):
        """Save a message's attachment and name it by detected MIME type.

        ``msg['Text']`` is called with the target path to write the file.
        The file is then renamed to carry an extension derived from its
        detected MIME type.

        Returns a ``(final_path, mime)`` tuple.
        """
        storage_dir = os.path.join("storage", self.channel_id)
        if not os.path.exists(storage_dir):
            os.makedirs(storage_dir)

        stem = "%s_%s_%s" % (msg_type, msg['NewMsgId'], int(time.time()))
        raw_path = os.path.join(storage_dir, stem)
        msg['Text'](raw_path)

        mime = magic.from_file(raw_path, mime=True)
        if isinstance(mime, bytes):
            mime = mime.decode()

        guessed = mimetypes.guess_extension(mime)
        if not guessed:
            guessed = ".unknown"
        if guessed == ".unknown":
            self.logger.warning("File %s with mime %s has no matching extensions.", raw_path, mime)

        # JPEG always gets ".jpeg", whatever mimetypes suggested.
        if mime == "image/jpeg":
            suffix = ".jpeg"
        else:
            suffix = guessed

        final_path = "%s%s" % (raw_path, suffix)
        os.rename(raw_path, final_path)
        self.logger.info("File saved from WeChat\nFull path: %s\nMIME: %s", final_path, mime)
        return final_path, mime
file_scan.py 文件源码 项目:gibbersense 作者: smxlabs 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def file_magic(in_file):
    """Print the libmagic description of ``in_file`` to stdout."""
    # A single pre-formatted argument keeps the output identical to the old
    # print statement while also being valid Python 3.
    print("\n\t\tFile Type : %s" % (magic.from_file(in_file),))
sample.py 文件源码 项目:polichombr 作者: ANSSI-FR 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def do_sample_type_detect(datafile):
        """
            Detect the type of ``datafile`` twice: as a MIME type and as a
            plain description. Returns them as a ``(mime, description)``
            tuple.
        """
        return (
            magic.from_file(datafile, mime=True),
            magic.from_file(datafile),
        )
fetcher.py 文件源码 项目:csirtg-smrt-py 作者: csirtgadgets 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _process_cache(self, split="\n", rstrip=True):
        """Yield the contents of ``self.cache`` one item at a time.

        The cache file's MIME type picks the reader: gzip and zip content
        go through the matching ``csirtg_smrt.decoders`` ``get_lines``
        helper, everything else is iterated as a regular text file.

        :param split: separator forwarded to the ``get_lines`` helpers.
        :param rstrip: accepted for interface compatibility; not used here.
        :raises RuntimeError: when neither magic API is available.
        """
        try:
            mime = magic.from_file(self.cache, mime=True)
        except AttributeError:
            # No from_file() on this magic module; try the open()/file() API.
            try:
                cookie = magic.open(magic.MAGIC_MIME)
                cookie.load()
                mime = cookie.file(self.cache)
            except AttributeError:
                raise RuntimeError('unable to detect cached file type')

        if PYVERSION < 3:
            mime = mime.decode('utf-8')

        if mime.startswith(('application/x-gzip', 'application/gzip')):
            from csirtg_smrt.decoders.zgzip import get_lines as gzip_lines
            for item in gzip_lines(self.cache, split=split):
                yield item
        elif mime == "application/zip":
            from csirtg_smrt.decoders.zzip import get_lines as zip_lines
            for item in zip_lines(self.cache, split=split):
                yield item
        else:
            # all others, mostly txt, etc...
            with open(self.cache) as handle:
                for item in handle:
                    yield item
zcontent.py 文件源码 项目:csirtg-smrt-py 作者: csirtgadgets 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def get_mimetype(f):
    """Return the MIME type that libmagic reports for the file ``f``.

    ``magic.from_file`` is tried first. If that raises AttributeError, the
    ``magic.open`` API is used instead. Raises RuntimeError when that also
    raises AttributeError. On Python 2 the result is decoded from UTF-8.
    """
    try:
        detected = magic.from_file(f, mime=True)
    except AttributeError:
        try:
            cookie = magic.open(magic.MAGIC_MIME)
            cookie.load()
            detected = cookie.file(f)
        except AttributeError:
            raise RuntimeError('unable to detect cached file type')

    return detected.decode('utf-8') if PYVERSION < 3 else detected
analysis.py 文件源码 项目:do-portal 作者: certeu 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def preprocess(sample):
    """Preprocess files after upload.

    When the uploaded sample is a zip archive (and its MIME type is not in
    ``skip_mimes``), every member is extracted, stored under its own SHA-256
    in the samples directory and recorded as a child ``Sample``.

    :param sample: :class:`~app.models.Sample`
    :return: None
    """
    cfg = current_app.config
    archive_path = os.path.join(cfg['APP_UPLOADS_SAMPLES'], sample.sha256)
    if not zipfile.is_zipfile(archive_path):
        return None

    mt = magic.from_file(archive_path, mime=True)
    if mt in skip_mimes:
        return None

    current_app.log.debug('Extracting {}'.format(archive_path))
    # Context manager ensures the archive handle is closed even on error.
    with zipfile.ZipFile(archive_path) as zfile:
        for zipfo in zfile.namelist():
            if zfile.getinfo(zipfo).compress_type == 99:  # PK compat. v5.1
                pwd = '-p{}'.format(cfg['INFECTED_PASSWD'])
                # Always pass the archive itself to 7z. The path of the
                # extracted member lives in its own variable below, so it
                # can no longer clobber the archive path on later
                # iterations.
                # NOTE(review): no member name is passed, so 7z presumably
                # streams every entry to stdout for each member - confirm.
                with popen('7z', 'e', '-so', pwd, archive_path) as zproc:
                    buf, _ = zproc.communicate()
            else:
                buf = zfile.read(zipfo,
                                 pwd=bytes(cfg['INFECTED_PASSWD'], 'utf-8'))
            digests = get_hashes(buf)
            member_path = os.path.join(cfg['APP_UPLOADS_SAMPLES'],
                                       digests.sha256)
            # Content-addressed storage: skip the write if already present.
            if not os.path.isfile(member_path):
                with open(member_path, 'wb') as wf:
                    wf.write(buf)
            s = Sample(user_id=sample.user_id, filename=zipfo,
                       parent_id=sample.id,
                       md5=digests.md5, sha1=digests.sha1,
                       sha256=digests.sha256, sha512=digests.sha512,
                       ctph=digests.ctph)
            db.session.add(s)
            db.session.commit()
style50.py 文件源码 项目:style50 作者: cs50 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _check(self, file):
        """
        Pick the check class for `file` and return it instantiated with the
        file's contents.

        The extension is looked up in `self.extension_map` first. On a miss,
        the libmagic description is matched against the keys of
        `self.magic_map`. Raises Error when the file is missing, its type is
        unknown, or it cannot be decoded as text.
        """

        if not os.path.exists(file):
            raise Error("file \"{}\" not found".format(file))

        extension = os.path.splitext(file)[1]
        try:
            check = self.extension_map[extension[1:]]
        except KeyError:
            description = magic.from_file(file)
            unmatched = object()
            check = next(
                (cls for name, cls in self.magic_map.items()
                 if name in description),
                unmatched,
            )
            if check is unmatched:
                raise Error("unknown file type \"{}\", skipping...".format(file))

        try:
            with open(file) as source:
                code = source.read()
        except UnicodeDecodeError:
            raise Error("file does not seem to contain text, skipping...")

        # Ensure we don't warn about adding trailing newline
        if code and not code.endswith('\n'):
            code += '\n'

        return check(code)
upload.py 文件源码 项目:oclubs 作者: SHSIDers 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def handle(cls, user, club, file):
        """Store an uploaded file and return the created upload object.

        The upload is first saved under a random name in /tmp and its MIME
        type is sniffed from the content. ``cls._thumb`` then writes the
        permanent copy. The temporary file is always removed afterwards.

        Raises UploadNotSupported when the sniffed MIME type is not a key
        of ``cls._mimedict``.
        """
        filename = os.urandom(8).encode('hex')
        staging_path = os.path.join('/tmp', filename)
        file.save(staging_path)

        try:
            # Sniff the content rather than trusting the extension
            # (mimetypes.guess_type would be fooled by a faked one).
            mime = magic.from_file(staging_path, mime=True)
            if mime not in cls._mimedict:
                raise UploadNotSupported

            filename += cls._mimedict[mime]
            final_path = cls.mk_internal_path(filename)
            final_dir = os.path.dirname(final_path)
            if not os.path.isdir(final_dir):
                os.makedirs(final_dir, 0o755)

            # resize to 600, 450
            cls._thumb(staging_path, final_path)
            fs.watch(final_path)
        finally:
            os.remove(staging_path)

        upload = cls.new()
        upload.club = club
        upload.uploader = user
        upload._location = filename
        upload.mime = mime
        return upload.create()
pdf.py 文件源码 项目:validatemyfile 作者: daisieh 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def check(filepath):
    """Return True when the detected MIME type of ``filepath`` starts with
    ``application/pdf``, False otherwise."""
    detected = magic.from_file(filepath, mime=True)
    return bool(re.match('application/pdf', detected))
filter_exe.py 文件源码 项目:guest-images 作者: S2E 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def get_magic(filename):
    """Describe ``filename`` via the module-level ``g_m`` handle when it is
    set, otherwise via ``magic.from_file``."""
    return g_m.file(filename) if g_m else magic.from_file(filename)
files.py 文件源码 项目:PeekabooAV 作者: scVENUS 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def guess_mime_type_from_file_contents(file_path):
    """Return the MIME type sniffed from the file's magic bytes, or None
    when the detector yields an empty result."""
    detected = magic.from_file(file_path, mime=True)
    return detected if detected else None
file.py 文件源码 项目:fame 作者: certsocietegenerale 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _compute_default_properties(self):
        """Fill in the default properties derived from ``self['filepath']``.

        Sets the name list, the detailed type and MIME type, an empty
        analysis list and a per-antivirus-module status of False, then
        calls ``self._set_type()``.
        """
        filepath = self['filepath']

        self['names'] = [os.path.basename(filepath)]
        self['detailed_type'] = magic.from_file(filepath)
        self['mime'] = magic.from_file(filepath, mime=True)
        self['analysis'] = []

        # Every known antivirus module starts out as False.
        self['antivirus'] = {}
        for av_module in dispatcher.get_antivirus_modules():
            self['antivirus'][av_module.name] = False

        self._set_type()

    # Convert mime/types into clearer type
models.py 文件源码 项目:web_develop 作者: dongweiming 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def create_by_old_paste(cls, filehash):
        """Build an instance for an already-stored file known by its hash.

        The MIME type is sniffed from the file content and the size is read
        from the file's stat record.
        """
        stored_path = get_file_path(filehash)
        detected_mime = magic.from_file(stored_path, mime=True)
        byte_count = os.stat(stored_path).st_size
        return cls(filehash, detected_mime, byte_count, filehash=filehash)
app.py 文件源码 项目:web_develop 作者: dongweiming 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def create_by_old_paste(cls, filehash, symlink):
        """Build an instance for an already-stored file known by its hash.

        The MIME type is sniffed from the file content and the size is read
        from the file's stat record. ``symlink`` is forwarded to the
        constructor unchanged.
        """
        stored_path = get_file_path(filehash)
        detected_mime = magic.from_file(stored_path, mime=True)
        byte_count = os.stat(stored_path).st_size
        return cls(filehash, detected_mime, byte_count,
                   filehash=filehash, symlink=symlink)
models.py 文件源码 项目:web_develop 作者: dongweiming 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def create_by_old_paste(cls, filehash):
        """Build an instance for an already-stored file known by its hash.

        The MIME type is sniffed from the file content and the size is read
        from the file's stat record.
        """
        stored_path = get_file_path(filehash)
        detected_mime = magic.from_file(stored_path, mime=True)
        byte_count = os.stat(stored_path).st_size
        return cls(filehash, detected_mime, byte_count, filehash=filehash)
models.py 文件源码 项目:web_develop 作者: dongweiming 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def create_by_old_paste(cls, filehash):
        """Build an instance for an already-stored file known by its hash.

        The MIME type is sniffed from the file content and the size is read
        from the file's stat record.
        """
        stored_path = get_file_path(filehash)
        detected_mime = magic.from_file(stored_path, mime=True)
        byte_count = os.stat(stored_path).st_size
        return cls(filehash, detected_mime, byte_count, filehash=filehash)
models.py 文件源码 项目:web_develop 作者: dongweiming 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def create_by_old_paste(cls, filehash):
        """Build an instance for an already-stored file known by its hash.

        The MIME type is sniffed from the file content and the size is read
        from the file's stat record.
        """
        stored_path = get_file_path(filehash)
        detected_mime = magic.from_file(stored_path, mime=True)
        byte_count = os.stat(stored_path).st_size
        return cls(filehash, detected_mime, byte_count, filehash=filehash)
load_imagenet.py 文件源码 项目:OneNet 作者: image-science-lab 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def load_pickle(pickle_path, dataset_path):
    """Return the dataset's image file list, cached in a pickle file.

    When ``pickle_path`` exists, its contents are loaded and returned.
    Otherwise ``dataset_path`` is walked recursively, every ``*.JPEG`` file
    is collected, and the result is written to ``pickle_path`` before being
    returned.

    :param pickle_path: path of the cache file to read or create.
    :param dataset_path: root directory searched for images on a cache miss.
    :return: dict with a single key ``'image_path'`` holding the collected
        paths (a NumPy array produced by ``np.hstack``).
    """
    if os.path.exists(pickle_path):
        # NOTE: pickle executes code on load; only point this at cache
        # files this function wrote itself.
        with open(pickle_path, "rb") as cache_file:
            return pickle.load(cache_file)

    image_files = []
    for dirpath, _, _ in os.walk(dataset_path):
        # may be JPEG, depending on your image files
        image_files.append(glob.glob(os.path.join(dirpath, '*.JPEG')))

    # A content check (e.g. libmagic reporting 'image/jpeg') could be added
    # here to reject mislabelled files; none is performed, so no magic
    # import is needed.

    if len(image_files) > 0:
        image_files = np.hstack(image_files)

    dataset_filenames = {'image_path': image_files}
    # Context manager closes (and therefore flushes) the cache file.
    with open(pickle_path, "wb") as cache_file:
        pickle.dump(dataset_filenames, cache_file)
    return dataset_filenames


# return a pd object
code_caves.py 文件源码 项目:find_pe_caves 作者: marcoramilli 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_executables(files):
    """
    Return the entries of ``files`` whose libmagic description contains
    the word "executable", in their original order.
    """
    return [
        candidate for candidate in files
        if "executable" in magic.from_file(candidate)
    ]
magic_characterizer_mixin.py 文件源码 项目:loris-redux 作者: jpstroop 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def _get_and_cache(file_path, supported_formats):
        """Detect the format of ``file_path``, cache it and return it.

        The detected MIME type is looked up in ``supported_formats``. A hit
        is stored in ``MagicCharacterizerMixin._cache`` under the file path.
        A miss raises UnsupportedFormat with HTTP status code 500.
        """
        detected = from_file(file_path, mime=True)
        try:
            resolved = supported_formats[detected]
            MagicCharacterizerMixin._cache[file_path] = resolved
            return resolved
        except KeyError:
            template = '{0} characterized as {1} format, which is not supported'
            raise UnsupportedFormat(template.format(file_path, detected),
                                    http_status_code=500)
check_file.py 文件源码 项目:SSMA 作者: secrary 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def file_info(self, report):
        """Collect descriptive lines about ``self.filename``.

        The file is always opened and read. When ``report`` equals
        ``"output"`` an empty string is returned. Otherwise a list of
        formatted lines (name, size, MIME type, MD5, SHA1 and, when
        ``ssdeep_r`` is truthy, the ssdeep hash) is returned.
        """
        with open(self.filename, 'rb') as handle:
            payload = handle.read()

        if report == "output":
            return ""

        details = [
            "File: {}".format(self.filename),
            "Size: {} bytes".format(os.path.getsize(self.filename)),
            "Type: {}".format(magic.from_file(self.filename, mime=True)),
            "MD5: {}".format(hashlib.md5(payload).hexdigest()),
            "SHA1: {}".format(hashlib.sha1(payload).hexdigest()),
        ]
        if ssdeep_r:
            details.append("ssdeep: {}".format(self.get_ssdeep()))
        return details
check_file.py 文件源码 项目:SSMA 作者: secrary 项目源码 文件源码 阅读 64 收藏 0 点赞 0 评论 0
def file_info(filename):
    """Return a list of formatted lines describing ``filename``.

    The lines cover name, size, MIME type, MD5 and SHA1, plus the ssdeep
    hash when ``ssdeep_r`` is truthy.
    """
    with open(filename, 'rb') as handle:
        payload = handle.read()

    details = [
        "File: {}".format(filename),
        "Size: {} bytes".format(os.path.getsize(filename)),
        "Type: {}".format(magic.from_file(filename, mime=True)),
        "MD5:  {}".format(hashlib.md5(payload).hexdigest()),
        "SHA1: {}".format(hashlib.sha1(payload).hexdigest()),
    ]
    if ssdeep_r:
        details.append("ssdeep: {}".format(ssdeep.hash_from_file(filename)))
    return details
__init__.py 文件源码 项目:nemesis 作者: openstack 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def post_file():
    """Accept a multipart file upload and queue it for processing.

    The upload is written to a temporary file under /tmp, then hashed,
    sized and MIME-sniffed. A database row is created or renewed from those
    values, the file is uploaded to swift and a worker notification is
    submitted.

    :return: JSON response built from the file row's ``to_dict()``.
    :raises BadRequestException: when the request has no ``file`` part.
    """
    file_uuid = secure_filename(str(uuid.uuid4()))
    filename = '/tmp/%s' % file_uuid

    try:
        file = request.files['file']
    except Exception:
        raise BadRequestException("Not a valid multipart upload form with "
                                  "key named file.")

    try:
        if 'Content-Range' in request.headers:
            # Extract starting byte from Content-Range header string.
            range_str = request.headers['Content-Range']
            start_bytes = int(range_str.split(' ')[1].split('-')[0])

            # Append chunk to the file on disk, or create new.
            # Binary mode: the stream yields bytes, which a text-mode
            # handle rejects on Python 3.
            # NOTE(review): in append mode every write goes to the end of
            # the file, so the seek() does not position the write. The
            # file name is also freshly generated per request, so chunks of
            # one upload never share a file - confirm intended design.
            with open(filename, 'ab') as f:
                f.seek(start_bytes)
                f.write(file.stream.read())

        else:
            # This is not a chunked request, so just save the whole file.
            file.save(filename)

        # Generate hash of file, and create new, or renew existing db row.
        file_hashes = get_all_hashes(filename)
        file_size = os.path.getsize(filename)
        file_type = magic.from_file(filename, mime=True)
        file = create_or_renew_by_hash(file_hashes, file_size, file_type)
        file_id = file.file_id
        file_dict = file.to_dict()

        # Upload to swift; the local temp file is removed below.
        upload_to_swift(filename, file_uuid)
    finally:
        # Remove the temp file on failure too, so aborted requests do not
        # pile up in /tmp.
        if os.path.exists(filename):
            os.remove(filename)

    # Send message to worker queue with file details.
    worker_msg = {"file_uuid": file_uuid, "file_id": file_id}
    submit_worker_notification(worker_msg)

    return jsonify(file_dict)
dx-starseqr.py 文件源码 项目:STAR-SEQR 作者: ExpressionAnalysis 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def maybe_gunzip(fname, base, ext):
    """Decompress ``fname`` when libmagic describes it as gzip.

    Returns ``fname`` unchanged when it is falsy or not gzip. Otherwise the
    data is gunzipped into the name produced by ``safe_fname(base, ext)``
    and that new name is returned. Progress and timing are printed.
    """
    if not (fname and 'gzip' in magic.from_file(fname)):
        return fname

    started = time.time()
    print("Gunzip file " + str(fname))
    unzipped = safe_fname(base, ext)
    sh("gunzip", fname, "-c >", unzipped)
    print("Gunzip took %g seconds" % (time.time() - started))
    return unzipped
download_test_data.py 文件源码 项目:earthio 作者: ContinuumIO 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def get_filetype(fpath):
    """Return the MIME type string that libmagic reports for ``fpath``."""
    mime_type = magic.from_file(fpath, mime=True)
    return mime_type


问题


面经


文章

微信
公众号

扫码关注公众号