python类open()的实例源码

utils.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def open(self):
            """Reset the index state and open the on-disk hash cache, if present.

            If the cache file exists and starts with the expected signature, the
            first entry is prefetched. Otherwise the index is flagged as
            mismatched and no input file is kept.

            Raises:
                BuildError: if an OSError occurs while accessing the cache.
            """
            self.__inPos = 0
            self.__inPosOld = 0
            self.__outFile = None
            self.__current = DirHasher.FileIndex.Stat()
            try:
                if not os.path.exists(self.__cachePath):
                    # No cache file: nothing to read from.
                    self.__inFile = None
                    self.__mismatch = True
                    return

                self.__inFile = open(self.__cachePath, "rb")
                sig = self.__inFile.read(4)
                if sig != DirHasher.FileIndex.SIGNATURE:
                    # Unknown file header: discard the handle and flag a mismatch.
                    logging.getLogger(__name__).info(
                        "Wrong signature at '%s': %s", self.__cachePath, sig)
                    self.__inFile.close()
                    self.__inFile = None
                    self.__mismatch = True
                    return

                self.__mismatch = False
                # The 4 signature bytes have already been consumed.
                self.__inPos = self.__inPosOld = 4
                self.__readEntry() # prefetch first entry
            except OSError as e:
                raise BuildError("Error opening hash cache: " + str(e))
apk.py 文件源码 项目:ApkParser 作者: yigitozgumus 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def parse_icon(self, icon_path=None):
        """
        Extract the application icons of the APK into a per-package directory.

        The icon entries are discovered by running ``aapt dump badging`` on
        the APK file; each entry is then read from the in-memory archive and
        written to ``<icon_path>/<package>/`` with ``/`` replaced by ``_`` in
        the file name.

        :param icon_path: icon storage path; defaults to the directory that
                          contains this module
        """
        import subprocess

        if not icon_path:
            icon_path = os.path.dirname(os.path.abspath(__file__))

        pkg_name_path = os.path.join(icon_path, self.package)
        if not os.path.exists(pkg_name_path):
            os.mkdir(pkg_name_path)

        # Run aapt with an argument list instead of a shell pipeline so that a
        # file name containing spaces or shell metacharacters cannot break or
        # inject into the command.
        try:
            aapt = subprocess.Popen(
                ['aapt', 'dump', 'badging', self.get_filename()],
                stdout=subprocess.PIPE, universal_newlines=True)
            badging = aapt.communicate()[0]
        except OSError:
            # aapt could not be started: treat as "no icons reported".
            badging = ''

        # Equivalent of: grep 'application-icon' | awk -F ':' '{print $2}'
        icon_paths = []
        for line in badging.split('\n'):
            if 'application-icon' not in line:
                continue
            fields = line.split(':')
            raw_icon = fields[1] if len(fields) > 1 else ''
            if raw_icon:
                icon_paths.append(raw_icon.replace("'", ''))

        zfile = zipfile.ZipFile(StringIO.StringIO(self.__raw), mode='r')
        try:
            for icon in icon_paths:
                icon_name = icon.replace('/', '_')
                data = zfile.read(icon)
                with open(os.path.join(pkg_name_path, icon_name), 'w+b') as icon_file:
                    icon_file.write(data)
        finally:
            zfile.close()
        print("APK ICON in: %s" % pkg_name_path)
createdb.py 文件源码 项目:binaryanalysis 作者: armijnhemel 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def readrewritelist(rewritelist):
    """Parse a rewrite list file into a dictionary keyed by SHA256.

    Each line must consist of exactly seven whitespace separated fields:
    package, version, filename, origin, sha256, new package, new version.
    Only the first line seen for a given sha256 is kept.

    Returns an empty dictionary if the file cannot be read or if any line
    does not have exactly seven fields.
    """
    ## rewrite is a hash. Key is sha256 of the file.
    rewrite = {}
    try:
        ## the context manager closes the file even if reading fails
        with open(rewritelist, 'r') as rewritefile:
            rewritelines = rewritefile.readlines()
    except Exception:
        return {}
    for r in rewritelines:
        rs = r.strip().split()
        ## format error, bail out
        if len(rs) != 7:
            return {}
        (package, version, filename, origin, sha256, newp, newv) = rs
        ## dupe, skip
        if sha256 in rewrite:
            continue
        rewrite[sha256] = {'package': package, 'version': version, 'filename': filename, 'origin': origin, 'newpackage': newp, 'newversion': newv}
    return rewrite

## split on the special characters, plus remove special control characters that are
## at the beginning and end of the string in escaped form.
## Return a list of strings.
pescanner.py 文件源码 项目:analyst-scripts 作者: Te-k 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_filetype(data):
    """Return a file type description for the buffer `data`, or '' if the
    magic module has not been imported.

    There are two versions of python-magic floating around, and annoyingly, the interface
    changed between versions, so we try one method and if it fails, then we try the other.
    NOTE: you may need to alter the magic_file for your system to point to the magic file."""
    # `in` works on both Python 2 and 3; dict.has_key() exists only on Python 2.
    if 'magic' not in sys.modules:
        return ''
    try:
        ms = magic.open(magic.MAGIC_NONE)
        ms.load()
        return ms.buffer(data)
    except Exception:
        try:
            return magic.from_buffer(data)
        except magic.MagicException:
            # Raw string: the backslashes are path separators, not escapes.
            magic_custom = magic.Magic(magic_file=r'C:\windows\system32\magic')
            return magic_custom.from_buffer(data)
apk.py 文件源码 项目:Tacita 作者: Quantika14 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def parse_icon(self, icon_path=None):
        """
        Extract the application icons of the APK into a per-package directory.

        The icon entries are discovered by running ``aapt dump badging`` on
        the APK file; each entry is then read from the in-memory archive and
        written to ``<icon_path>/<package>/`` with ``/`` replaced by ``_`` in
        the file name.

        :param icon_path: icon storage path; defaults to the directory that
                          contains this module
        """
        import subprocess

        if not icon_path:
            icon_path = os.path.dirname(os.path.abspath(__file__))

        pkg_name_path = os.path.join(icon_path, self.package)
        if not os.path.exists(pkg_name_path):
            os.mkdir(pkg_name_path)

        # Run aapt with an argument list instead of a shell pipeline so that a
        # file name containing spaces or shell metacharacters cannot break or
        # inject into the command.
        try:
            aapt = subprocess.Popen(
                ['aapt', 'dump', 'badging', self.get_filename()],
                stdout=subprocess.PIPE, universal_newlines=True)
            badging = aapt.communicate()[0]
        except OSError:
            # aapt could not be started: treat as "no icons reported".
            badging = ''

        # Equivalent of: grep 'application-icon' | awk -F ':' '{print $2}'
        icon_paths = []
        for line in badging.split('\n'):
            if 'application-icon' not in line:
                continue
            fields = line.split(':')
            raw_icon = fields[1] if len(fields) > 1 else ''
            if raw_icon:
                icon_paths.append(raw_icon.replace("'", ''))

        zfile = zipfile.ZipFile(StringIO.StringIO(self.__raw), mode='r')
        try:
            for icon in icon_paths:
                icon_name = icon.replace('/', '_')
                data = zfile.read(icon)
                with open(os.path.join(pkg_name_path, icon_name), 'w+b') as icon_file:
                    icon_file.write(data)
        finally:
            zfile.close()
        print("APK ICON in: %s" % pkg_name_path)
static.py 文件源码 项目:cuckoodroid-2.0 作者: idanr1986 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _get_filetype(self, data):
        """Gets filetype, uses libmagic if available.

        The ``magic.open`` interface is tried first; if anything goes wrong
        the ``magic.from_buffer`` interface is tried instead.

        @param data: data to be analyzed.
        @return: file type or None.
        """
        if not HAVE_MAGIC:
            return None

        # Track the handle explicitly so cleanup does not depend on catching
        # a NameError when magic.open() itself failed.
        ms = None
        try:
            ms = magic.open(magic.MAGIC_NONE)
            ms.load()
            file_type = ms.buffer(data)
        except Exception:
            try:
                file_type = magic.from_buffer(data)
            except Exception:
                return None
        finally:
            if ms is not None:
                try:
                    ms.close()
                except Exception:
                    # Closing is best effort only.
                    pass

        return file_type
extractor.py 文件源码 项目:firmflaws 作者: Ganapati 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def magic(indata, mime=False):
        """
        Run file magic on `indata`, coping with the two different `magic`
        module interfaces: the one exposing `open()`/`load()`/`file()` and the
        one exposing a `Magic` class with `from_file()`.
        """

        try:
            mymagic = magic.open(magic.MAGIC_MIME_TYPE if mime else magic.MAGIC_NONE)
            mymagic.load()
        except AttributeError:
            # Fall back to the Magic class and alias from_file as file so the
            # return statement below works for both interfaces.
            mymagic = magic.Magic(mime)
            mymagic.file = mymagic.from_file
        return mymagic.file(indata)
__init__.py 文件源码 项目:rucio 作者: rucio01 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def smart_open(filename):
    '''
    Returns an open file object if `filename` is plain text, otherwise a
    file-like object that transparently decompresses a gzip or bzip2
    compressed file (selected by the detected mime type).

    Raises ValueError if the file is neither plain text, gzip nor bzip2.
    '''
    if isplaintext(filename):
        return open(filename, 'rt')

    file_type = mimetype(filename)
    if 'gzip' in file_type:
        # gzip.open accepts text mode; GzipFile itself rejects 'rt' on Python 3.
        return gzip.open(filename, 'rt')
    if 'bzip2' in file_type:
        return bz2file.open(filename, 'rt')

    # Previously this fell through to `return f` with `f` unbound.
    raise ValueError('Unsupported file format %r: %s' % (file_type, filename))
__init__.py 文件源码 项目:rucio 作者: rucio01 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def srm_download_to_file(url, file_):
    '''
    Copy the content found at `url` into the `file_` file-like object,
    reading it through gfal2 in pieces of CHUNK_SIZE.

    If the first read fails with a GError whose second item is 70, the
    source is reopened with the GRIDFTP PLUGIN STAT_ON_OPEN option disabled
    and the read is retried; any other GError is re-raised.
    '''
    log = logging.getLogger('dumper.__init__')
    ctx = gfal2.creat_context()  # pylint: disable=no-member
    source = ctx.open(url, 'r')

    try:
        chunk = source.read(CHUNK_SIZE)
    except GError as e:
        if e[1] != 70:
            raise
        log.debug('GError(70) raised, using GRIDFTP PLUGIN:STAT_ON_OPEN=False workarround to download %s', url)
        ctx.set_opt_boolean('GRIDFTP PLUGIN', 'STAT_ON_OPEN', False)
        source = ctx.open(url, 'r')
        chunk = source.read(CHUNK_SIZE)

    # An empty read marks the end of the data.
    while chunk:
        file_.write(chunk)
        chunk = source.read(CHUNK_SIZE)
auto_mal.py 文件源码 项目:auto_mal 作者: 0xhughes 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _hardcode_setup():
    """Interactively ask for input/output directories and store them.

    A user may want to hardcode in and out paths for their analysis files.
    Both paths are read from the console and must exist, otherwise the
    program exits. After the user confirms with 'Yes' the paths are written
    to 'automal_conf.conf' in the home directory as 'ipath=' and 'opath='
    lines; answering 'No' exits the program, and any other answer asks again.
    """
    home_dir = str(os.path.expanduser('~'))
    input_conf = raw_input('-- : --  Please enter full path to input sample directory: ')
    output_conf = raw_input('-- : --  Please enter full path to sample/analysis output directory: ')
    if not os.path.exists(input_conf) or not os.path.exists(output_conf):
        sys.exit("-- : --  Invalid paths detected. Please check input. Run again to re-enter paths.")
    print('-- : --  Is this input directory correct "'+str(input_conf)+'"?')
    print('-- : --  Is this output directory correct "'+str(output_conf)+'"?')
    while True:
        dir_choice = raw_input('-- : --  Yes or No: ')
        if dir_choice == 'Yes':
            # The context manager closes the file even if a write fails.
            with open(os.path.join(home_dir, 'automal_conf.conf'), 'w') as conf_file:
                conf_file.write('ipath='+str(input_conf)+'\n')
                conf_file.write('opath='+str(output_conf)+'\n')
            return
        if dir_choice == 'No':
            sys.exit('-- : --  Exited on incorrect paths. Run again to re-enter.')
        print('-- : --  Invalid choice, try again. Yes or No.')
static.py 文件源码 项目:CAPE 作者: ctxis 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _get_guest_digital_signers(self):
        """Load the auxiliary "DigiSig.json" of the current analysis.

        @return: dict with the "aux_*" entries taken from the JSON file, or
                 an empty dict when the file is missing, empty, or decodes
                 to an empty value.
        """
        analysis_id = str(self.results["info"]["id"])
        cert_info = os.path.join(CUCKOO_ROOT, "storage", "analyses",
                                 analysis_id, "aux", "DigiSig.json")

        cert_data = dict()
        if os.path.exists(cert_info):
            with open(cert_info, "r") as cert_file:
                raw = cert_file.read()
            if raw:
                cert_data = json.loads(raw)

        if not cert_data:
            return dict()

        return {
            "aux_sha1": cert_data["sha1"],
            "aux_timestamp": cert_data["timestamp"],
            "aux_valid": cert_data["valid"],
            "aux_error": cert_data["error"],
            "aux_error_desc": cert_data["error_desc"],
            "aux_signers": cert_data["signers"]
        }
file.py 文件源码 项目:diffoscope 作者: ReproducibleBuilds 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def fuzzy_hash(self):
            """Return the TLSH hex digest of the file at `self.path`, or None.

            The result is computed once and stored in `self._fuzzy_hash`.
            None is stored when the file is smaller than 512 bytes or when
            the digest cannot be produced (ValueError from hexdigest()).
            """
            if hasattr(self, '_fuzzy_hash'):
                return self._fuzzy_hash

            # tlsh is not meaningful with files smaller than 512 bytes
            if os.stat(self.path).st_size < 512:
                self._fuzzy_hash = None
                return self._fuzzy_hash

            hasher = tlsh.Tlsh()
            with open(self.path, 'rb') as f:
                chunk = f.read(32768)
                while chunk != b'':
                    hasher.update(chunk)
                    chunk = f.read(32768)
            hasher.final()
            try:
                self._fuzzy_hash = hasher.hexdigest()
            except ValueError:
                # File must contain a certain amount of randomness.
                self._fuzzy_hash = None
            return self._fuzzy_hash
file.py 文件源码 项目:diffoscope 作者: ReproducibleBuilds 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def has_same_content_as(self, other):
        """Tell whether this file and `other` have the same content.

        Directories and files whose size cannot be determined are reported
        as different. Equally sized files up to SMALL_FILE_THRESHOLD bytes
        are compared in memory; everything else is delegated to
        `cmp_external`.
        """
        logger.debug('File.has_same_content: %s %s', self, other)
        if os.path.isdir(self.path) or os.path.isdir(other.path):
            return False

        # try comparing small files directly first
        try:
            sizes = (os.path.getsize(self.path), os.path.getsize(other.path))
        except OSError:
            # files not readable (e.g. broken symlinks) or something else,
            # just assume they are different
            return False

        same_size = sizes[0] == sizes[1]
        if same_size and sizes[0] <= SMALL_FILE_THRESHOLD:
            try:
                with profile('command', 'cmp (internal)'):
                    with open(self.path, 'rb') as mine, open(other.path, 'rb') as theirs:
                        return mine.read() == theirs.read()
            except OSError:
                # one or both files could not be opened for some reason,
                # assume they are different
                return False

        return self.cmp_external(other)
extractor.py 文件源码 项目:firmanal 作者: kyechou 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def magic(indata, mime=False):
        """
        Run file magic on `indata`, coping with the two different `magic`
        module interfaces: the one exposing `open()`/`load()`/`file()` and the
        one exposing a `Magic` class with `from_file()`.
        """

        try:
            mymagic = magic.open(magic.MAGIC_MIME_TYPE if mime else magic.MAGIC_NONE)
            mymagic.load()
        except AttributeError:
            # Fall back to the Magic class and alias from_file as file so the
            # return statement below works for both interfaces.
            mymagic = magic.Magic(mime)
            mymagic.file = mymagic.from_file
        return mymagic.file(indata)
objects.py 文件源码 项目:Snakepit 作者: K4lium 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_type(self):
        """Return a file type description for `self.path`.

        Three strategies are tried in order: the ``magic.open`` interface,
        ``magic.from_file``, and finally the output of the external
        ``file -b`` command. An empty string is returned if all of them fail.
        """
        # Track the handle explicitly so cleanup does not depend on catching
        # a NameError when magic.open() itself failed.
        ms = None
        try:
            ms = magic.open(magic.MAGIC_NONE)
            ms.load()
            file_type = ms.file(self.path)
        except Exception:
            try:
                file_type = magic.from_file(self.path)
            except Exception:
                try:
                    import subprocess
                    file_process = subprocess.Popen(['file', '-b', self.path], stdout = subprocess.PIPE)
                    # communicate() reads the output, waits for the child and
                    # closes the pipe.
                    file_type = file_process.communicate()[0].strip()
                except Exception:
                    return ''
        finally:
            if ms is not None:
                try:
                    ms.close()
                except Exception:
                    # Closing is best effort only.
                    pass

        return file_type
utils.py 文件源码 项目:Snakepit 作者: K4lium 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_type(data):
    """Return a file type description for the buffer `data`.

    The ``magic.open`` interface is tried first and ``magic.from_buffer``
    second. An empty string is returned if both fail.
    """
    # Track the handle explicitly so cleanup does not depend on catching a
    # NameError when magic.open() itself failed.
    ms = None
    try:
        ms = magic.open(magic.MAGIC_NONE)
        ms.load()
        file_type = ms.buffer(data)
    except Exception:
        try:
            file_type = magic.from_buffer(data)
        except Exception:
            return ''
    finally:
        if ms is not None:
            try:
                ms.close()
            except Exception:
                # Closing is best effort only.
                pass

    return file_type
ProcessYcsbLog.py 文件源码 项目:Overlord 作者: TSDBBench 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def generate_html(p, templateFile, templateDict, outputFile, overwrite, logger):
    """Render the bokeh models `p` into a standalone html file.

    `templateFile` is loaded as a jinja2 template and passed, together with
    `templateDict` (which must contain a "title" entry), to
    bokeh.embed.file_html with inline resources. The result is written to
    `outputFile`.

    Every failure (missing template, template load error, existing output
    file without `overwrite`, write error) is logged through `logger` and
    terminates the process via os._exit(-1).
    """
    if not Util.check_file_exists(templateFile):
        logger.error("Template file does not exist: '%s'" %(templateFile))
        os._exit(-1)
    try:
        templateDir, templateName = os.path.split(templateFile)
        template = jinja2.Environment(loader = jinja2.FileSystemLoader(searchpath=templateDir)).get_template(templateName)
    except Exception:
        logger.error("Failed load template file '%s'" %(templateFile), exc_info=True)
        os._exit(-1)
    html = bokeh.embed.file_html(models=p, resources=bokeh.resources.INLINE, title=templateDict["title"] , template=template, template_variables=templateDict)
    if Util.check_file_exists(outputFile) and not overwrite:
        logger.error("Html file does exist: '%s'. Delete or use overwrite flag." %(outputFile))
        os._exit(-1)
    try:
        # The context manager closes the file even if the write fails.
        with open(outputFile,"w") as htmlFile:
            htmlFile.write(html)
    except Exception:
        logger.error("Error while writing html file '%s'" %(outputFile), exc_info=True)
        os._exit(-1)

# generates bokeh histogram_data
# gets data from every "LatencyList"
# data2 is just data/2.0
# commented out code is old and better to read but much slower due to "key not in" - if
ProcessYcsbLog.py 文件源码 项目:Overlord 作者: TSDBBench 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def openCompressedFile(ycsbfile, dict, key, decompress, overwrite, logger):
    """Load a gzip-compressed pickle into `dict[key]`, optionally decompressing it.

    `ycsbfile` is opened with gzip and unpickled; the result is stored in
    `dict` under `key`. If `decompress` is true, the loaded content is
    additionally handed to decompressFile() to be written to
    "<basename of ycsbfile without extension>.log", provided the current
    directory is writable and the target does not exist or `overwrite` is set.

    Every failure is logged through `logger` and terminates the process via
    os._exit(-1).
    """
    try:
        compressed = gzip.open(ycsbfile,"r")
        try:
            # NOTE(review): unpickling executes arbitrary code from a crafted
            # file; only use this on trusted input.
            dict[key]=cPickle.load(compressed)
        finally:
            # Close the handle even when unpickling fails.
            compressed.close()
    except Exception:
        logger.error("Can't open '%s'. Is it really a compressed .ydc file?" %(ycsbfile), exc_info=True)
        os._exit(-1)
    # if you truly just want to decompress it, stop after saving plain ycsb file
    if decompress:
        newFileName=os.path.splitext(os.path.basename(ycsbfile))[0]+".log"
        try:
            if (not Util.check_file_exists(newFileName) or  overwrite) and os.access(".", os.W_OK):
                if key in dict and dict[key] is not None:
                    decompressFile(dict[key], newFileName, logger)
                else:
                    logger.error("Dictionary does not have filecontent or is null." , exc_info=True)
                    os._exit(-1)
            else:
                logger.error("Can't create '%s' to write. Does it already exist?" %(newFileName), exc_info=True)
                os._exit(-1)
        except Exception:
            # Report the file name that was actually being written.
            logger.error("Can't open '%s'." %(newFileName), exc_info=True)
            os._exit(-1)
acbs_src_process.py 文件源码 项目:acbs 作者: AOSC-Dev 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def file_type(file_loc):
    """Return the MIME type of `file_loc` split at '/'.

    The system libmagic bindings are used when importable, otherwise the
    bundled `lib.magic`. If the type cannot be determined,
    ['unknown', 'unknown'] is returned.
    """
    try:
        import magic
    except Exception:
        print('[W] ACBS cannot find libmagic bindings, will use bundled one instead.')
        import lib.magic as magic
    mco = magic.open(magic.MIME_TYPE | magic.MAGIC_SYMLINK)
    try:
        mco.load()
        try:
            tp = mco.file(file_loc)
            tp_list = tp.decode('utf-8').split('/')
        except Exception:
            print('[W] Unable to determine the file type!')
            return ['unknown', 'unknown']
        return tp_list
    finally:
        # Release the magic handle on every path.
        mco.close()
acbs_src_process.py 文件源码 项目:acbs 作者: AOSC-Dev 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def file_type_full(file_loc):
    """Return the textual file type description of `file_loc`.

    The system libmagic bindings are used when importable, otherwise the
    bundled `lib.magic`. If the type cannot be determined or decoded,
    'data' is returned.
    """
    try:
        import magic
    except Exception:
        print('[W] ACBS cannot find libmagic bindings, will use bundled one instead.')
        import lib.magic as magic
    mco = magic.open(magic.NONE | magic.MAGIC_SYMLINK)
    try:
        mco.load()
        try:
            # Decode inside the try block: a lookup that yields no bytes
            # result must fall back to 'data' instead of raising.
            return mco.file(file_loc).decode('utf-8')
        except Exception:
            print('[W] Unable to determine the file type!')
            return 'data'
    finally:
        # Release the magic handle on every path.
        mco.close()
utils.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def hashFile(path):
    """Return the binary SHA-1 digest of the file at `path`.

    The file is read unbuffered in 16 KiB pieces. If an OSError occurs a
    warning is logged and the digest of whatever was read so far is returned.
    """
    digest = hashlib.sha1()
    try:
        with open(path, 'rb', buffering=0) as f:
            for chunk in iter(lambda: f.read(16384), b''):
                digest.update(chunk)
    except OSError as e:
        logging.getLogger(__name__).warning("Cannot hash file: %s", str(e))
    return digest.digest()
utils.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def open(self):
            """Do nothing; this implementation has nothing to open."""
            pass
utils.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def hashDirectory(self, path):
        """Return the result of the private directory hashing helper for `path`.

        The index is opened before hashing and is closed again afterwards,
        even if hashing raises.
        """
        self.__index.open()
        try:
            # The helper receives the path in its file system encoded form.
            return self.__hashDir(os.fsencode(path))
        finally:
            self.__index.close()
utils.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def hashPath(self, path):
        """Return the result of the private entry hashing helper for `path`.

        The path is stat'ed without following symlinks. If that fails a
        warning is logged and b'' is returned. Otherwise the index is opened
        for the duration of the hashing and closed afterwards, even on error.
        """
        encoded = os.fsencode(path)
        try:
            st = os.lstat(encoded)
        except OSError as err:
            log = logging.getLogger(__name__)
            log.warning("Cannot stat '%s': %s", encoded, str(err))
            return b''

        self.__index.open()
        try:
            return self.__hashEntry(encoded, b'', st)
        finally:
            self.__index.close()
fetcher.py 文件源码 项目:csirtg-smrt-py 作者: csirtgadgets 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _process_cache(self, split="\n", rstrip=True):
        """Yield the cached file (`self.cache`) piece by piece.

        The mime type is detected with whichever `magic` interface is
        available. Gzip and zip content is delegated to the matching
        csirtg_smrt decoder (using `split`); anything else is opened as a
        plain file and yielded line by line. `rstrip` is accepted but not
        used here.

        Raises RuntimeError if neither `magic` interface is usable.
        """
        try:
            ftype = magic.from_file(self.cache, mime=True)
        except AttributeError:
            try:
                detector = magic.open(magic.MAGIC_MIME)
                detector.load()
                ftype = detector.file(self.cache)
            except AttributeError:
                raise RuntimeError('unable to detect cached file type')

        if PYVERSION < 3:
            ftype = ftype.decode('utf-8')

        if ftype.startswith(('application/x-gzip', 'application/gzip')):
            from csirtg_smrt.decoders.zgzip import get_lines
            for line in get_lines(self.cache, split=split):
                yield line
        elif ftype == "application/zip":
            from csirtg_smrt.decoders.zzip import get_lines
            for line in get_lines(self.cache, split=split):
                yield line
        else:
            # all others, mostly txt, etc...
            with open(self.cache) as f:
                for line in f:
                    yield line
fetcher.py 文件源码 项目:csirtg-smrt-py 作者: csirtgadgets 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _cache_write(self, s):
        with open(self.cache, 'wb') as f:
            auth = False
            if self.username:
                auth = (self.username, self.password)

            resp = self._cache_refresh(s, auth)
            if not resp:
                return

            for block in resp.iter_content(1024):
                f.write(block)
zcontent.py 文件源码 项目:csirtg-smrt-py 作者: csirtgadgets 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_mimetype(f):
    """Return the mime type of the file `f` as reported by `magic`.

    `magic.from_file` is tried first; if that attribute is missing the
    `magic.open` interface is used instead. On PYVERSION < 3 the result is
    decoded from UTF-8.

    Raises RuntimeError if neither interface is usable.
    """
    try:
        detected = magic.from_file(f, mime=True)
    except AttributeError:
        try:
            handle = magic.open(magic.MAGIC_MIME)
            handle.load()
            detected = handle.file(f)
        except AttributeError:
            raise RuntimeError('unable to detect cached file type')

    return detected.decode('utf-8') if PYVERSION < 3 else detected
zcontent.py 文件源码 项目:csirtg-smrt-py 作者: csirtgadgets 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_type(f, mime=None):
    """Return the first truthy result of the TESTS probes for `f`.

    `f` may be a file name (str), in which case it is opened, or a file
    object. When `mime` is not given it is determined with get_mimetype().
    The file is rewound before each probe. None is returned when no probe
    matches.
    """
    if not mime:
        mime = get_mimetype(f)

    if isinstance(f, str):
        f = open(f)

    for probe in TESTS:
        f.seek(0)
        found = probe(f, mime)
        if found:
            return found
    return None
filter_exe.py 文件源码 项目:guest-images 作者: S2E 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def detect_magic():
    """Initialise the global `g_m` magic handle when `magic.open` exists.

    Nothing happens if the imported `magic` module has no `open` attribute.
    """
    global g_m
    if not hasattr(magic, 'open'):
        return
    g_m = magic.open(magic.MAGIC_SYMLINK)
    g_m.load()
comparebinaries.py 文件源码 项目:binaryanalysis 作者: armijnhemel 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def gethash(path, filename):
    """Return the hexadecimal SHA256 digest of the file `path`/`filename`.

    The file is read in binary mode in chunks of 10000000 bytes so that
    large files do not have to fit into memory at once.
    """
    h = hashlib.new('sha256')
    ## binary mode: hashing needs the raw bytes, and the context manager
    ## closes the file even if reading fails
    with open("%s/%s" % (path, filename), 'rb') as scanfile:
        for hashdata in iter(lambda: scanfile.read(10000000), b''):
            h.update(hashdata)
    return h.hexdigest()

## method to compare binaries. Returns the amount of bytes that differ
## according to bsdiff, or 0 if the files are identical


问题


面经


文章

微信
公众号

扫码关注公众号