def open_lzma_file(filename, mode, **kwargs):
    """Open an LZMA (xz) compressed file.

    Parameters
    ----------
    filename : str or file-like
        Path to (or handle of) the file to open.
    mode : str
        Open mode, e.g. ``'rb'`` or ``'wt'``.
    **kwargs
        Extra keyword arguments forwarded to ``lzma.open``
        (e.g. ``encoding`` for text mode).

    Returns
    -------
    A file object as returned by ``lzma.open``.
    """
    # Bug fix: **kwargs were previously accepted but silently dropped.
    return lzma.open(filename, mode, **kwargs)
Example source code for Python's open() function, collected from various projects.
def load(filename):
    """Yield every pickled object stored in an xz-compressed file.

    Objects are deserialized one at a time until the end of the
    archive is reached.
    """
    with lzma.open(filename, 'rb') as stream:
        while True:
            try:
                yield pickle.load(stream)
            except EOFError:
                return
def main():
    """Command-line entry point: split an xz-compressed corpus into
    xz-compressed train and test datasets via ``separate``."""
    parser = argparse.ArgumentParser(description='dataset generator')
    parser.add_argument('-p', '--possibility', type=float, default=0.9,
                        help='possibility to add train dataset')
    parser.add_argument('source',
                        help='path to mecab-processed corpus (xz compressed)')
    parser.add_argument('train',
                        help='path for writing training dataset (xz compressed)')
    parser.add_argument('test',
                        help='path for writing testing dataset (xz compressed)')
    opts = parser.parse_args()

    # Source is read as text; both output datasets are written as raw bytes.
    with lzma.open(opts.source, 'rt') as corpus, \
            lzma.open(opts.train, 'wb') as train_out, \
            lzma.open(opts.test, 'wb') as test_out:
        separate(corpus, opts.possibility, train_out, test_out)
Source file: test_chamber_of_deputies_dataset.py
Project: serenata-toolbox
Author: datasciencebr
Project source code
File source code
Views: 27
Bookmarks: 0
Likes: 0
Comments: 0
def test_translate_csv_with_reimbursement_with_net_value_with_decimal_comma(self):
    """Translating a CSV whose net values use decimal commas must yield
    output identical to the decimal-point fixture file."""
    source_csv = os.path.join(self.fixtures_path, 'Ano-with-decimal-comma.csv')
    expected_csv = os.path.join(self.fixtures_path,
                                'reimbursements-with-decimal-point.csv')
    with open(expected_csv, 'r') as handle:
        expected = handle.read()
    translated_xz = Dataset('')._translate_file(source_csv)
    with lzma.open(translated_xz) as compressed:
        output = compressed.read().decode('utf-8')
    self.assertEqual(output, expected)
def glove_():
    """Load the small GloVe embedding table.

    Returns a dict mapping each word in ``glovewords.txt`` to its
    300-dimensional float32 vector, memory-mapped from ``glovesmall.arr``.
    """
    vecs = np.memmap("glovesmall.arr", np.float32).reshape((-1, 300))
    # Fix: use a context manager so the word-list handle is closed
    # (the original leaked it via open(...).read()).
    with open("glovewords.txt") as fh:
        words = fh.read().splitlines()
    return dict(zip(words, vecs))
def germanw2v_():
    """Load the German word2vec embedding table.

    Returns a dict mapping each word in ``german.words`` to its
    300-dimensional float32 vector, memory-mapped from ``german.vecbin``.
    """
    vecs = np.memmap("german.vecbin", np.float32).reshape((-1, 300))
    # Fix: use a context manager so the word-list handle is closed
    # (the original leaked it via open(...).read()).
    with open("german.words") as fh:
        words = fh.read().splitlines()
    return dict(zip(words, vecs))
def get_book(name, language):
    """Load a verse-per-line book and embed each word via *language*.

    Each line is tokenized with ``words`` (module-level helper) and
    each token mapped through ``language.get`` (falling back to the
    module-level ``veczero``), then padded with ``veczero`` up to
    ``n_steps`` entries.

    Returns (book, lens) where *lens* is the pre-padding verse length
    as an int32 numpy array — NOTE(review): padded verse length, since
    it is computed after padding; confirm against callers.
    """
    # Fix: close the xz handle (original leaked it via lzma.open(...).read()).
    with lzma.open('../data/{}-common.vpl.xz'.format(name), 'rt') as fh:
        book = fh.read().splitlines()
    book = [
        [language.get(w, veczero) for w in words(l)]
        + ([veczero] * (n_steps - len(words(l))))
        for l in book]
    lens = np.array([len(l) for l in book], dtype=np.int32)
    for verse in book:
        assert len(verse) <= n_steps, "n_steps should be at least {}".format(len(verse))
    return (book, lens)
def get_book(name, language):
    """Load a verse-per-line book, embedding each character/token of a
    line via ``language(w, 0)`` and padding with the module-level
    ``veczero`` up to ``n_steps`` entries per line.

    Returns the list of padded verses.
    """
    # Fix: close the xz handle (original leaked it via lzma.open(...).read()).
    with lzma.open('../data/{}-common.vpl.xz'.format(name), 'rt') as fh:
        book = fh.read().splitlines()
    book = [
        [language(w, 0) for w in l]
        + ([veczero] * (n_steps - len(l)))
        for l in book]
    for verse in book:
        assert len(verse) <= n_steps, "n_steps should be at least {}".format(len(verse))
    return book
def get_book(name, language):
    """Load a verse-per-line book, embedding each character/token via
    ``language(w)`` and padding with ``language(' ')`` up to
    ``n_steps`` entries per line.

    Returns (book, lens) where *lens* holds the per-verse length as an
    int32 numpy array (computed after padding).
    """
    # Fix: close the xz handle (original leaked it via lzma.open(...).read()).
    with lzma.open('../data/{}-common.vpl.xz'.format(name), 'rt') as fh:
        book = fh.read().splitlines()
    book = [
        [language(w) for w in l]
        + ([language(' ')] * (n_steps - len(l)))
        for l in book]
    lens = np.array([len(l) for l in book], dtype=np.int32)
    for verse in book:
        assert len(verse) <= n_steps, "n_steps should be at least {}".format(len(verse))
    return (book, lens)
def benchmark_screed(fn):
    """Benchmark sequence-reading throughput of the ``screed`` parser.

    Streams every record in *fn*, printing a running MB/s figure every
    ``REFRESH_RATE`` records and, at the end, the total entry count.
    """
    import screed
    total_seq = 0
    # Fix: initialize i so an empty input reports "0 entries" instead of
    # raising NameError at the final print.
    i = -1
    t0 = time.time()
    it = screed.open(fn)
    for i, e in enumerate(it):
        total_seq += len(e.sequence)
        if i % REFRESH_RATE == 0:
            t1 = time.time()
            print('\r%.2fMB/s' % (total_seq/(1E6)/(t1-t0)), end='', flush=True)
    print()
    print('%i entries' % (i+1))
def _opener(filename):
if filename.endswith('.gz'):
import gzip
return gzip.open
elif filename.endswith('.bz2'):
import bz2
return bz2.open
elif filename.endswith('.lzma'):
import lzma
return lzma.open
else:
return open
def _screed_iter(fn):
    """Yield (index, ascii-name, ascii-sequence) tuples for each record
    that ``screed`` parses from *fn*."""
    import screed
    for idx, record in enumerate(screed.open(fn)):
        yield (idx,
               record.name.encode('ascii'),
               str(record.sequence).encode('ascii'))
def _ngs_plumbing_iter(fn, mode, buffering):
    """Yield (index, header-without-marker, sequence) tuples from a
    FASTQ file parsed with ``ngs_plumbing``.

    The raw file is opened with the given *mode* and *buffering*, then
    wrapped by the decompressor that ``_opener`` selects for *fn*.
    """
    import ngs_plumbing.fastq
    decompress = _opener(fn)
    with open(fn, mode, buffering=buffering) as raw:
        with decompress(raw) as stream:
            records = ngs_plumbing.fastq.read_fastq(stream)
            for idx, entry in enumerate(records):
                # header[1:] drops the leading '@' record marker
                yield (idx, entry.header[1:], entry.sequence)
def _fastqandfurious_iter(fn, mode, buffering):
    """Yield (index, header, sequence) tuples from a FASTQ file parsed
    with ``fastqandfurious``.

    The raw file is opened with the given *mode* and *buffering*, then
    wrapped by the decompressor that ``_opener`` selects for *fn*.
    """
    from fastqandfurious import fastqandfurious
    chunk_size = int(5E4)  # parser read-buffer size in bytes
    decompress = _opener(fn)
    with open(fn, mode, buffering=buffering) as raw:
        with decompress(raw) as stream:
            records = fastqandfurious.readfastq_iter(stream, chunk_size)
            for idx, entry in enumerate(records):
                yield (idx, entry.header, entry.sequence)
def hashFile(file):
    """Return the SHA-256 hex digest of *file*, read in 64 KiB chunks
    so arbitrarily large files never load fully into memory."""
    digest = hashlib.sha256()
    chunk_size = 64 * 1024
    with open(file, 'rb') as handle:
        while True:
            chunk = handle.read(chunk_size)
            if not chunk:
                break
            digest.update(chunk)
    return digest.hexdigest()
def unpack(self):
    """
    Decompress the downloaded .xz gadget archive into the dylib path.

    :return: self, to allow call chaining.
    """
    click.secho('Unpacking {0}...'.format(self.ios_dylib_gadget_archive_path), dim=True)
    with lzma.open(self.ios_dylib_gadget_archive_path) as archive:
        payload = archive.read()
    with open(self.ios_dylib_gadget_path, 'wb') as destination:
        destination.write(payload)
    return self
def unpack(self):
    """
    Decompress the downloaded .xz Frida library archive into the
    unpacked library path.

    :return: self, to allow call chaining.
    """
    click.secho('Unpacking {0}...'.format(self.get_frida_library_path(packed=True)), dim=True)
    with lzma.open(self.get_frida_library_path(packed=True)) as archive:
        payload = archive.read()
    with open(self.get_frida_library_path(), 'wb') as destination:
        destination.write(payload)
    return self
def get_temp_file(suffix="", name=None, delete=False):
    """Create a temporary file under /tmp and register it for cleanup.

    When *name* is given, the file is /tmp/<name> opened in text-write
    mode; otherwise a NamedTemporaryFile with an "_nmtpy_<pid>" suffix
    (plus *suffix*, if any) is created. Returns the open file object.
    """
    if name:
        path = os.path.join("/tmp", name)
        handle = open(path, "w")
        cleanup.register_tmp_file(path)
        return handle

    tmp_suffix = "_nmtpy_%d" % os.getpid()
    if suffix != "":
        tmp_suffix += suffix
    handle = tempfile.NamedTemporaryFile(suffix=tmp_suffix, delete=delete)
    cleanup.register_tmp_file(handle.name)
    return handle
def fopen(filename, mode=None):
    """Open *filename* for reading, transparently decompressing
    .gz / .bz2 / .xz / .lzma files.

    The *mode* parameter is accepted but ignored (kept so existing
    iterator code does not break); files are always opened in text
    read mode.
    """
    if filename.endswith('.gz'):
        opener = gzip.open
    elif filename.endswith('.bz2'):
        opener = bz2.open
    elif filename.endswith(('.xz', '.lzma')):
        opener = lzma.open
    else:
        # Plain text file.
        return open(filename, 'r')
    return opener(filename, 'rt')
def split_file(source, nfolds=None, ignoreheader=False, importance=0, minfoldsize=10000):
    """Split *source* into up to *nfolds* temporary fold files on disk.

    Returns ``(folds, total_lines)`` where *folds* is the list of fold
    filenames written and *total_lines* is the number of data lines read.

    NOTE(review): Python 2 code (``basestring``, ``source.next()``).
    Relies on helpers defined elsewhere in the project:
    ``get_real_ext``, ``open_regular_or_compressed``,
    ``get_temp_filename`` and ``flush_and_close``.
    The *importance* parameter is accepted but never used here.
    """
    if nfolds is None:
        nfolds = 10
    # A string source is a path: keep its real extension so fold files
    # carry the same one; file-like sources get a dummy extension.
    if isinstance(source, basestring):
        ext = get_real_ext(source)
    else:
        ext = 'xxx'
    if hasattr(source, 'seek'):
        source.seek(0)
    # XXX already have examples_count
    # First pass: count lines so the fold size can be computed.
    total_lines = 0
    for line in open_regular_or_compressed(source):
        total_lines += 1
    if hasattr(source, 'seek'):
        source.seek(0)
    source = open_regular_or_compressed(source)
    if ignoreheader:
        # Skip the header line (Python 2 iterator protocol).
        source.next()
        total_lines -= 1
    # Fold size is bounded below by minfoldsize; nfolds is then
    # recomputed so the folds stay balanced after the bound applies.
    foldsize = int(math.ceil(total_lines / float(nfolds)))
    foldsize = max(foldsize, minfoldsize)
    nfolds = int(math.ceil(total_lines / float(foldsize)))
    folds = []
    current_fold = -1
    count = foldsize  # primed so the first iteration opens fold 0
    current_fileobj = None
    total_count = 0
    # Second pass: stream lines into successive fold files.
    for line in source:
        if count >= foldsize:
            if current_fileobj is not None:
                flush_and_close(current_fileobj)
                current_fileobj = None
            current_fold += 1
            if current_fold >= nfolds:
                break
            fname = get_temp_filename('fold%s.%s' % (current_fold, ext))
            current_fileobj = open(fname, 'w')
            count = 0
            folds.append(fname)
        current_fileobj.write(line)
        count += 1
        total_count += 1
    if current_fileobj is not None:
        flush_and_close(current_fileobj)
    # Sanity check: every counted line must have been written out.
    if total_count != total_lines:
        sys.exit('internal error: total_count=%r total_lines=%r source=%r' % (total_count, total_lines, source))
    return folds, total_lines