python类open()的实例源码

compression.py 文件源码 项目:atropos 作者: jdidion 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def open_lzma_file(filename, mode, **kwargs):
    """Return a file handle for an LZMA (xz) compressed file.

    Args:
        filename: Target passed straight to ``lzma.open``.
        mode: Mode string passed straight to ``lzma.open``.
        **kwargs: Accepted so the signature tolerates extra options; they
            are not forwarded to ``lzma.open``.

    Returns:
        The object returned by ``lzma.open(filename, mode)``.
    """
    handle = lzma.open(filename, mode)
    return handle
dataset.py 文件源码 项目:rnn-morpheme-analyzer 作者: mitaki28 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def load(filename):
    """Lazily yield every pickled object stored in an xz-compressed file.

    Objects are read back-to-back with ``pickle.load`` until the stream
    raises ``EOFError``, which ends the generator.

    NOTE: ``pickle.load`` can execute arbitrary code; only use this on
    files from a trusted source.
    """
    with lzma.open(filename, 'rb') as stream:
        try:
            while True:
                yield pickle.load(stream)
        except EOFError:
            # End of the compressed stream: nothing more to yield.
            pass
dataset.py 文件源码 项目:rnn-morpheme-analyzer 作者: mitaki28 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def main():
    """Command-line entry point for the dataset generator.

    Parses one optional float (``-p/--possibility``, default 0.9) and three
    positional paths (``source``, ``train``, ``test``), opens ``source`` for
    xz text reading and the other two for xz binary writing, then hands all
    of them to ``separate``.
    """
    cli = argparse.ArgumentParser(description='dataset generator')
    cli.add_argument(
        '-p', '--possibility',
        default=0.9,
        type=float,
        help='possibility to add train dataset'
    )
    positionals = (
        ('source', 'path to mecab-processed corpus (xz compressed)'),
        ('train', 'path for writing training dataset (xz compressed)'),
        ('test', 'path for writing testing dataset (xz compressed)'),
    )
    for dest, help_text in positionals:
        cli.add_argument(dest, help=help_text)
    opts = cli.parse_args()

    # Files are opened in the order source -> train -> test and closed in
    # reverse order when the blocks exit.
    with lzma.open(opts.source, 'rt') as source:
        with lzma.open(opts.train, 'wb') as train:
            with lzma.open(opts.test, 'wb') as test:
                separate(source, opts.possibility, train, test)
test_chamber_of_deputies_dataset.py 文件源码 项目:serenata-toolbox 作者: datasciencebr 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_translate_csv_with_reimbursement_with_net_value_with_decimal_comma(self):
        """Translating the decimal-comma fixture must match the decimal-point fixture."""
        source_csv = os.path.join(self.fixtures_path, 'Ano-with-decimal-comma.csv')
        expected_csv = os.path.join(
            self.fixtures_path, 'reimbursements-with-decimal-point.csv')
        with open(expected_csv, 'r') as handle:
            expected = handle.read()

        # _translate_file returns the path of an xz file; decompress it and
        # compare its UTF-8 text with the expected fixture.
        translated_path = Dataset('')._translate_file(source_csv)
        with lzma.open(translated_path) as compressed:
            raw = compressed.read()
        output = raw.decode('utf-8')
        self.assertEqual(output, expected)
06-lstm-tensorflow.py 文件源码 项目:albemarle 作者: SeanTater 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def glove_():
    """Build a word -> vector dict from ``glovesmall.arr`` and ``glovewords.txt``.

    ``glovesmall.arr`` is memory-mapped as float32 and viewed as rows of 300
    values; ``glovewords.txt`` supplies one word per line. Both paths are
    relative to the current working directory. Words and rows are paired
    positionally with ``zip``, so pairing stops at the shorter of the two.

    Returns:
        dict: maps each word to its row of the memory-mapped array.
    """
    vecs = np.memmap("glovesmall.arr", np.float32).reshape((-1, 300))
    # Read the vocabulary inside a context manager so the file handle is
    # closed deterministically instead of being left to the garbage collector.
    with open("glovewords.txt") as word_file:
        words = word_file.read().splitlines()
    return dict(zip(words, vecs))
06-lstm-tensorflow.py 文件源码 项目:albemarle 作者: SeanTater 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def germanw2v_():
    """Build a word -> vector dict from ``german.vecbin`` and ``german.words``.

    ``german.vecbin`` is memory-mapped as float32 and viewed as rows of 300
    values; ``german.words`` supplies one word per line. Both paths are
    relative to the current working directory. Words and rows are paired
    positionally with ``zip``, so pairing stops at the shorter of the two.

    Returns:
        dict: maps each word to its row of the memory-mapped array.
    """
    vecs = np.memmap("german.vecbin", np.float32).reshape((-1, 300))
    # Read the vocabulary inside a context manager so the file handle is
    # closed deterministically instead of being left to the garbage collector.
    with open("german.words") as word_file:
        words = word_file.read().splitlines()
    return dict(zip(words, vecs))
06-lstm-tensorflow.py 文件源码 项目:albemarle 作者: SeanTater 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_book(name, language):
    """Load a book and turn every line into a padded list of word vectors.

    Reads ``../data/<name>-common.vpl.xz`` as text, tokenizes each line with
    ``words`` and looks every token up in ``language`` (falling back to
    ``veczero``), then right-pads each line with ``veczero`` up to
    ``n_steps`` entries.

    Args:
        name: Book identifier substituted into the data path.
        language: Mapping offering ``.get(word, default)``.

    Returns:
        tuple: ``(book, lens)`` where ``book`` is the list of padded lines
        and ``lens`` is an int32 array of ``len()`` of each padded line.

    Raises:
        AssertionError: if any line has more than ``n_steps`` tokens.
    """
    path = '../data/{}-common.vpl.xz'.format(name)
    # Close the compressed file as soon as its contents are in memory.
    with lzma.open(path, 'rt') as book_file:
        lines = book_file.read().splitlines()

    book = []
    for line in lines:
        # Tokenize once and reuse the result for both lookup and padding.
        tokens = words(line)
        verse = [language.get(w, veczero) for w in tokens]
        verse += [veczero] * (n_steps - len(tokens))
        book.append(verse)
    # NOTE(review): lengths are measured after padding, so every entry is at
    # least n_steps; confirm whether the unpadded lengths were intended.
    lens = np.array([len(l) for l in book], dtype=np.int32)

    for verse in book:
        assert len(verse) <= n_steps, "n_steps should be at least {}".format(len(verse))
    return (book, lens)
07-lstm-tensorflow.py 文件源码 项目:albemarle 作者: SeanTater 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_book(name, language):
    """Load a book and encode every line as a padded list.

    Reads ``../data/<name>-common.vpl.xz`` as text and, for each line, calls
    ``language(w, 0)`` on every element ``w`` obtained by iterating the line
    (lines are strings, so these are single characters). Each line is then
    right-padded with ``veczero`` up to ``n_steps`` entries.

    Args:
        name: Book identifier substituted into the data path.
        language: Callable invoked as ``language(w, 0)``.

    Returns:
        list: one padded list per line of the book.

    Raises:
        AssertionError: if any line is longer than ``n_steps``.
    """
    path = '../data/{}-common.vpl.xz'.format(name)
    # Close the compressed file as soon as its contents are in memory.
    with lzma.open(path, 'rt') as book_file:
        lines = book_file.read().splitlines()

    book = [
        [language(w, 0) for w in l]
        + ([veczero] * (n_steps - len(l)))
        for l in lines]
    for verse in book:
        assert len(verse) <= n_steps, "n_steps should be at least {}".format(len(verse))
    return book
08-lstm-tensorflow.py 文件源码 项目:albemarle 作者: SeanTater 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_book(name, language):
    """Load a book and encode every line as a padded list.

    Reads ``../data/<name>-common.vpl.xz`` as text and, for each line, calls
    ``language(w)`` on every element ``w`` obtained by iterating the line
    (lines are strings, so these are single characters). Each line is then
    right-padded with ``language(' ')`` up to ``n_steps`` entries.

    Args:
        name: Book identifier substituted into the data path.
        language: Callable invoked with a single character.

    Returns:
        tuple: ``(book, lens)`` where ``book`` is the list of padded lines
        and ``lens`` is an int32 array of ``len()`` of each padded line.

    Raises:
        AssertionError: if any line is longer than ``n_steps``.
    """
    path = '../data/{}-common.vpl.xz'.format(name)
    # Close the compressed file as soon as its contents are in memory.
    with lzma.open(path, 'rt') as book_file:
        lines = book_file.read().splitlines()

    book = [
        [language(w) for w in l]
        + ([language(' ')] * (n_steps - len(l)))
        for l in lines]
    # NOTE(review): lengths are measured after padding, so every entry is at
    # least n_steps; confirm whether the unpadded lengths were intended.
    lens = np.array([len(l) for l in book], dtype=np.int32)

    for verse in book:
        assert len(verse) <= n_steps, "n_steps should be at least {}".format(len(verse))
    return (book, lens)
benchmark.py 文件源码 项目:fastq-and-furious 作者: lgautier 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def benchmark_screed(fn):
    """Iterate a sequence file with ``screed`` and report throughput.

    Every ``REFRESH_RATE`` records the running rate (megabases of sequence
    per second of wall-clock time) is re-printed on the same line; the total
    number of records is printed at the end.

    Args:
        fn: Path handed to ``screed.open``.
    """
    import screed
    total_seq = 0
    # Tracked separately from the loop variable so an input with no records
    # reports "0 entries" instead of raising NameError.
    n_entries = 0
    t0 = time.time()
    it = screed.open(fn)
    for i, e in enumerate(it):
        n_entries = i + 1
        total_seq += len(e.sequence)
        if i % REFRESH_RATE == 0:
            elapsed = time.time() - t0
            # A coarse clock can report zero elapsed time on the first
            # record; skip that refresh rather than divide by zero.
            if elapsed > 0:
                print('\r%.2fMB/s' % (total_seq / 1E6 / elapsed), end='', flush=True)
    print()
    print('%i entries' % n_entries)
benchmark.py 文件源码 项目:fastq-and-furious 作者: lgautier 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _opener(filename):
    """Pick the ``open``-like function matching a file name's extension.

    Args:
        filename: File name whose suffix selects the opener.

    Returns:
        ``gzip.open`` for ``.gz``, ``bz2.open`` for ``.bz2``, ``lzma.open``
        for ``.xz`` / ``.lzma``, otherwise the builtin ``open``.
    """
    # Compression modules are imported lazily so only the needed one loads.
    if filename.endswith('.gz'):
        import gzip
        return gzip.open
    if filename.endswith('.bz2'):
        import bz2
        return bz2.open
    # lzma.open auto-detects both the .xz and legacy .lzma containers.
    if filename.endswith(('.xz', '.lzma')):
        import lzma
        return lzma.open
    return open
benchmark.py 文件源码 项目:fastq-and-furious 作者: lgautier 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _screed_iter(fn):
    """Yield ``(index, name, sequence)`` for each record ``screed`` reads from ``fn``.

    The record name and the string form of its sequence are both encoded to
    ASCII bytes.
    """
    import screed
    records = screed.open(fn)
    for idx, record in enumerate(records):
        name_bytes = record.name.encode('ascii')
        seq_bytes = str(record.sequence).encode('ascii')
        yield (idx, name_bytes, seq_bytes)
benchmark.py 文件源码 项目:fastq-and-furious 作者: lgautier 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _ngs_plumbing_iter(fn, mode, buffering):
    """Yield ``(index, header, sequence)`` for each FASTQ entry in ``fn``.

    The file is opened with the builtin ``open`` using the requested
    ``mode`` / ``buffering`` and, when ``_opener`` selects a decompressor
    for the file name, wrapped in it before being parsed by
    ``ngs_plumbing.fastq.read_fastq``. The first character of each entry's
    header is dropped.

    Args:
        fn: Path of the FASTQ file.
        mode: Mode passed to the builtin ``open``.
        buffering: Buffering argument passed to the builtin ``open``.
    """
    import ngs_plumbing.fastq
    openfunc = _opener(fn)
    with open(fn, mode, buffering=buffering) as f:
        # For uncompressed files _opener returns the builtin open, which
        # cannot wrap an already-open file object (TypeError); read from
        # the raw handle directly in that case.
        fh = f if openfunc is open else openfunc(f)
        try:
            it = ngs_plumbing.fastq.read_fastq(fh)
            for i, e in enumerate(it):
                yield (i, e.header[1:], e.sequence)
        finally:
            # The outer ``with`` closes ``f``; only close the wrapper here.
            if fh is not f:
                fh.close()
benchmark.py 文件源码 项目:fastq-and-furious 作者: lgautier 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _fastqandfurious_iter(fn, mode, buffering):
    """Yield ``(index, header, sequence)`` for each FASTQ entry in ``fn``.

    The file is opened with the builtin ``open`` using the requested
    ``mode`` / ``buffering`` and, when ``_opener`` selects a decompressor
    for the file name, wrapped in it before being parsed by
    ``fastqandfurious.readfastq_iter`` with a 50000-byte buffer size.

    Args:
        fn: Path of the FASTQ file.
        mode: Mode passed to the builtin ``open``.
        buffering: Buffering argument passed to the builtin ``open``.
    """
    from fastqandfurious import fastqandfurious
    bufsize = int(5E4)
    openfunc = _opener(fn)
    with open(fn, mode, buffering=buffering) as f:
        # For uncompressed files _opener returns the builtin open, which
        # cannot wrap an already-open file object (TypeError); read from
        # the raw handle directly in that case.
        fh = f if openfunc is open else openfunc(f)
        try:
            it = fastqandfurious.readfastq_iter(fh, bufsize)
            for i, e in enumerate(it):
                yield (i, e.header, e.sequence)
        finally:
            # The outer ``with`` closes ``f``; only close the wrapper here.
            if fh is not f:
                fh.close()
gen-PKGBUILD.py 文件源码 项目:archlinux-amdgpu 作者: corngood 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def hashFile(file):
    """Return the hexadecimal SHA-256 digest of the file at ``file``.

    The file is read in binary mode in 64 KiB chunks, so its size does not
    bound memory use.
    """
    chunk_size = 64 * 1024
    digest = hashlib.sha256()
    with open(file, 'rb') as stream:
        # read() returns b'' at end of file, which stops the iteration.
        for chunk in iter(lambda: stream.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest()
ios.py 文件源码 项目:objection 作者: sensepost 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def unpack(self):
        """
            Decompresses the downloaded .xz gadget archive into the gadget path.

            :return: this instance, to allow chaining
        """

        notice = 'Unpacking {0}...'.format(self.ios_dylib_gadget_archive_path)
        click.secho(notice, dim=True)

        # The archive is opened first, then the destination file.
        with lzma.open(self.ios_dylib_gadget_archive_path) as archive, \
                open(self.ios_dylib_gadget_path, 'wb') as target:
            target.write(archive.read())

        return self
android.py 文件源码 项目:objection 作者: sensepost 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def unpack(self):
        """
            Decompresses the downloaded .xz gadget into the unpacked library path.

            :return: this instance, to allow chaining
        """

        notice = 'Unpacking {0}...'.format(self.get_frida_library_path(packed=True))
        click.secho(notice, dim=True)

        # The packed archive is opened first, then the destination file.
        with lzma.open(self.get_frida_library_path(packed=True)) as archive, \
                open(self.get_frida_library_path(), 'wb') as target:
            target.write(archive.read())

        return self
sysutils.py 文件源码 项目:nmtpy 作者: lium-lst 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_temp_file(suffix="", name=None, delete=False):
    """Create a temporary file and register it with ``cleanup``.

    With ``name`` given, ``/tmp/<name>`` is opened for text writing and
    ``suffix`` / ``delete`` are not used. Otherwise a
    ``tempfile.NamedTemporaryFile`` is created whose suffix is
    ``_nmtpy_<pid>`` followed by ``suffix``, honouring ``delete``.

    Returns:
        The open file object.
    """
    if not name:
        full_suffix = "_nmtpy_%d" % os.getpid()
        if suffix != "":
            full_suffix = full_suffix + suffix

        handle = tempfile.NamedTemporaryFile(suffix=full_suffix, delete=delete)
        cleanup.register_tmp_file(handle.name)
        return handle

    path = os.path.join("/tmp", name)
    handle = open(path, "w")
    cleanup.register_tmp_file(path)
    return handle
sysutils.py 文件源码 项目:nmtpy 作者: lium-lst 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def fopen(filename, mode=None):
    """Open ``filename`` for text reading, decompressing by extension.

    ``.gz`` uses gzip, ``.bz2`` uses bz2, ``.xz`` / ``.lzma`` use lzma; any
    other name is opened as plain text.
    """
    # NOTE: ``mode`` is ignored; the parameter only exists so existing
    # iterators that pass it keep working.
    text_mode = 'rt'
    if filename.endswith('.gz'):
        return gzip.open(filename, text_mode)
    if filename.endswith('.bz2'):
        return bz2.open(filename, text_mode)
    if filename.endswith('.xz') or filename.endswith('.lzma'):
        return lzma.open(filename, text_mode)
    # No known compressed extension: plain text.
    return open(filename, 'r')
vwoptimize.py 文件源码 项目:vwoptimize 作者: denik 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def split_file(source, nfolds=None, ignoreheader=False, importance=0, minfoldsize=10000):
    """Split ``source`` line by line into consecutive fold files.

    The input is read twice: once to count its lines and once to distribute
    them, in order, into temporary files named via ``get_temp_filename``.

    Parameters:
        source: a path string or an object accepted by
            ``open_regular_or_compressed``; if it has ``seek`` it is rewound
            before each pass.
        nfolds: requested number of folds (10 when ``None``); recomputed
            below once ``minfoldsize`` has been applied.
        ignoreheader: when true, the first line is skipped and not counted.
        importance: not used in this function body.
        minfoldsize: lower bound on the number of lines per fold.

    Returns:
        ``(folds, total_lines)``: the list of fold file names and the number
        of lines distributed.

    Exits the process through ``sys.exit`` if the number of lines written
    differs from the number counted.
    """
    if nfolds is None:
        nfolds = 10

    # The extension is reused for the fold file names; non-string sources
    # get a placeholder extension.
    if isinstance(source, basestring):
        ext = get_real_ext(source)
    else:
        ext = 'xxx'

    if hasattr(source, 'seek'):
        source.seek(0)

    # XXX already have examples_count
    # First pass: count the lines.
    total_lines = 0
    for line in open_regular_or_compressed(source):
        total_lines += 1

    # Rewind so the second pass starts from the beginning again.
    if hasattr(source, 'seek'):
        source.seek(0)

    source = open_regular_or_compressed(source)

    if ignoreheader:
        # Consume the header line (Python 2 iterator protocol) and exclude
        # it from the count.
        source.next()
        total_lines -= 1

    # Even split into nfolds, raised to at least minfoldsize; nfolds is then
    # recomputed to match the final fold size.
    foldsize = int(math.ceil(total_lines / float(nfolds)))
    foldsize = max(foldsize, minfoldsize)
    nfolds = int(math.ceil(total_lines / float(foldsize)))

    folds = []

    current_fold = -1
    # Starting at foldsize forces the first fold file to be opened on the
    # first line.
    count = foldsize
    current_fileobj = None
    total_count = 0
    for line in source:
        if count >= foldsize:
            # Current fold is full (or none is open yet): close it and
            # start the next one.
            if current_fileobj is not None:
                flush_and_close(current_fileobj)
                current_fileobj = None
            current_fold += 1
            if current_fold >= nfolds:
                break
            fname = get_temp_filename('fold%s.%s' % (current_fold, ext))
            current_fileobj = open(fname, 'w')
            count = 0
            folds.append(fname)
        current_fileobj.write(line)
        count += 1
        total_count += 1

    if current_fileobj is not None:
        flush_and_close(current_fileobj)

    # Sanity check: every counted line must have been written to a fold.
    if total_count != total_lines:
        sys.exit('internal error: total_count=%r total_lines=%r source=%r' % (total_count, total_lines, source))

    return folds, total_lines


问题


面经


文章

微信
公众号

扫码关注公众号