def read_header(dispout):
    """ Read header (first 3 words) from disp.dat

    :param dispout: disp.dat filename
    :returns: header (num_nodes, num_dims, num_timesteps)
    """
    import struct

    word_size = 4  # bytes
    if dispout.endswith('.xz'):
        import lzma
        d = lzma.open(dispout, 'rb')
    else:
        d = open(dispout, 'rb')
    # the three header words are stored as 32-bit floats, so unpack them
    # as floats and cast to int
    num_nodes = struct.unpack('f', d.read(word_size))
    num_dims = struct.unpack('f', d.read(word_size))
    num_timesteps = struct.unpack('f', d.read(word_size))
    header = {'num_nodes': int(num_nodes[0]),
              'num_dims': int(num_dims[0]),
              'num_timesteps': int(num_timesteps[0])}
    d.close()
    return header
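# Usage sketch (not part of the original source): read the header from an
# existing disp.dat (or disp.dat.xz); the filename here is an assumption.
header = read_header('disp.dat')
print('{num_nodes} nodes x {num_dims} dims x {num_timesteps} timesteps'.format(**header))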
def extract_dt(dyn_file):
    """ Extract the time step (dt) from a dyna input deck.

    Assumes that the input deck is comma-delimited.

    :param dyn_file: input.dyn filename
    :returns: dt specified on the card following *DATABASE_NODOUT
    """
    found_database = False
    with open(dyn_file, 'r') as d:
        for dyn_line in d:
            if found_database:
                line_items = dyn_line.split(',')
                # skip comment cards (lines starting with '$')
                if line_items[0].startswith('$'):
                    continue
                else:
                    dt = float(line_items[0])
                    break
            elif '*DATABASE_NODOUT' in dyn_line:
                found_database = True
    return dt
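# A hedged sketch of extract_dt() in action (not from the original source):
# write a minimal comma-delimited deck containing a *DATABASE_NODOUT card,
# then parse the dt value from the card that follows it.
with open('input.dyn', 'w') as deck:
    deck.write('$ comment line\n'
               '*DATABASE_NODOUT\n'
               '$ dt, binary\n'
               '1.0E-5,3\n')
print(extract_dt('input.dyn'))  # -> 1e-05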
def save_companies(self):
    """
    Reads the dataset file at self.path and creates a Company object for
    each row. It creates the related activities when needed.
    """
    skip = ('main_activity', 'secondary_activity')
    # compare the field *names* against the skip list, not the field objects
    keys = tuple(f.name for f in Company._meta.fields if f.name not in skip)
    with lzma.open(self.path, mode='rt', encoding='utf-8') as file_handler:
        for row in csv.DictReader(file_handler):
            main, secondary = self.save_activities(row)
            filtered = {k: v for k, v in row.items() if k in keys}
            obj = Company.objects.create(**self.serialize(filtered))
            for activity in main:
                obj.main_activity.add(activity)
            for activity in secondary:
                obj.secondary_activity.add(activity)
            obj.save()
            self.count += 1
            self.print_count(Company, count=self.count)
def load_embeddings(self, filename, xz=False):
    if not os.path.isfile(filename):
        print(filename, "does not exist")
        return self

    if xz:
        f = lzma.open(filename, "rt", encoding="utf-8", errors="ignore")
    else:
        f = open(filename, "r")

    found_set = set()
    for line in f:
        fields = line.split()
        word = strong_normalize(fields[0])
        vec = [float(x) for x in fields[1:]]
        if word in self._vocab:
            found_set.add(word)
            self._word_lookup.init_row(self._vocab[word], vec)
    f.close()

    print("Loaded embeddings from", filename)
    print(len(found_set), "hits with vocab size of", len(self._vocab))
    return self
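# Sketch of the whitespace-delimited embedding format load_embeddings()
# expects: one token per line followed by its vector components. The file
# name, contents, and the `parser` instance are illustrative assumptions.
with open('embeddings.txt', 'w') as f:
    f.write('the 0.1 0.2 0.3\n')
    f.write('cat 0.4 0.5 0.6\n')
# parser.load_embeddings('embeddings.txt')               # plain text
# parser.load_embeddings('embeddings.txt.xz', xz=True)   # xz-compressed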
def _token_to_filenames(token):
    if token[0] == '!':
        pattern = token[1:]
        filenames = glob.glob(pattern)
        if not filenames:
            raise RuntimeError('No filenames matched "%s" pattern' % pattern)
    elif token[0] == '@':
        # '@-' means the list of filenames is read from stdin
        if token == '@-':
            filenames = [line.rstrip('\n') for line in sys.stdin]
        else:
            with open(token[1:]) as filelist:
                filenames = [line.rstrip('\n') for line in filelist]
        directory = os.path.dirname(token[1:])
        if directory and directory != '.':
            # prepend the filelist's directory to relative paths only
            filenames = [f if f.startswith('/') else directory + '/' + f
                         for f in filenames]
    else:
        filenames = token
    return filenames
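# Illustrative calls (not from the original source) for the three token forms
# handled above; the paths are assumptions:
_token_to_filenames('!data/*.conllu')  # glob pattern -> list of matching files
_token_to_filenames('@filelist.txt')   # filelist -> one filename per line
_token_to_filenames('train.conllu')    # plain token -> returned as-is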
def next_filehandle(self):
    """Go to the next file and return its filehandle or None (meaning no more files)."""
    filename = self.next_filename()
    if filename is None:
        fhandle = None
    elif filename == '-':
        fhandle = sys.stdin
    else:
        filename_extension = filename.split('.')[-1]
        if filename_extension == 'gz':
            myopen = gzip.open
        elif filename_extension == 'xz':
            myopen = lzma.open
        elif filename_extension == 'bz2':
            myopen = bz2.open
        else:
            myopen = open
        fhandle = myopen(filename, 'rt', encoding=self.encoding)
    self.filehandle = fhandle
    return fhandle
def open_regular_or_compressed(filename):
    if filename is None:
        return sys.stdin
    if hasattr(filename, 'read'):
        # already a file-like object; pass it through
        fobj = filename
    else:
        f = filename.lower()
        ext = f.rsplit('.', 1)[-1]
        # note: the compressed openers below return binary-mode objects,
        # while the plain open() falls back to text mode
        if ext == 'gz':
            import gzip
            fobj = gzip.GzipFile(filename)
        elif ext == 'bz2':
            import bz2
            fobj = bz2.BZ2File(filename)
        elif ext == 'xz':
            import lzma
            fobj = lzma.open(filename)
        else:
            fobj = open(filename)
    return fobj
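# The three input kinds accepted above, shown as a sketch (the filenames are
# assumptions):
open_regular_or_compressed('data.csv.xz')     # path -> decompressing file object
open_regular_or_compressed(open('data.csv'))  # file-like object -> passed through
open_regular_or_compressed(None)              # None -> sys.stdin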
def _convert_any_to_vw(source, format, output, weights, preprocessor, columnspec, named_labels, remap_label, ignoreheader):
    if named_labels is not None:
        # named_labels must be a collection of labels, not a single string
        assert not isinstance(named_labels, str)
        named_labels = set(named_labels)
    rows_source = open_anything(source, format, ignoreheader=ignoreheader)
    output = open(output, 'w')  # text mode: vw lines are written as str
    for row in rows_source:
        try:
            vw_line = convert_row_to_vw(row, columnspec, preprocessor=preprocessor, weights=weights, named_labels=named_labels, remap_label=remap_label)
        except Exception:
            log_always('Failed to parse: %r', row)
            raise
        output.write(vw_line)
    flush_and_close(output)
def open_compressed_file(filename, mode):
    """Open a compressed file, determining the compression type based on the
    file name.

    Args:
        filename: The file to open.
        mode: The file open mode.

    Returns:
        The opened file.
    """
    # splitext() returns a (root, ext) tuple; only the extension is needed
    ext = os.path.splitext(filename)[1]
    opener = get_file_opener(ext)
    if not opener:
        raise ValueError("{} is not a recognized compression format".format(ext))
    return opener(filename, mode)
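# get_file_opener() is not shown in this excerpt; a minimal sketch consistent
# with the call above could map extensions to opener callables (this mapping
# is an assumption, not the original implementation):
def get_file_opener(ext):
    import bz2
    import gzip
    import lzma
    return {'.gz': gzip.open, '.bz2': bz2.open, '.xz': lzma.open}.get(ext)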
def __init__(self, path, mode='w'):
    self.outfile = open(path, mode)
    self.devnull = open(os.devnull, 'w')
    self.closed = False
    # Setting close_fds to True in the Popen arguments is necessary due to
    # <http://bugs.python.org/issue12786>.
    kwargs = dict(stdin=PIPE, stdout=self.outfile, stderr=self.devnull, close_fds=True)
    try:
        self.process = Popen(['pigz'], **kwargs)
        self.program = 'pigz'
    except OSError:
        # pigz binary not found; try regular gzip
        try:
            self.process = Popen(['gzip'], **kwargs)
            self.program = 'gzip'
        except (IOError, OSError):
            self.outfile.close()
            self.devnull.close()
            raise
    except IOError:
        self.outfile.close()
        self.devnull.close()
        raise
def download(self):
    """
    Downloads the latest iOS gadget.

    :return:
    """
    download_url = self._get_download_url()

    # stream the download using requests
    dylib = requests.get(download_url, stream=True)

    # save the requests stream to file
    with open(self.ios_dylib_gadget_archive_path, 'wb') as f:
        click.secho('Downloading iOS dylib to {0}...'.format(self.ios_dylib_gadget_archive_path),
                    fg='green', dim=True)
        shutil.copyfileobj(dylib.raw, f)

    return self
def set_application_binary(self, binary: str = None) -> None:
    """
    Sets the binary that will be patched.

    If a binary is not defined, the application's Info.plist is parsed
    and the CFBundleExecutable key is used to locate the binary.

    :param binary:
    :return:
    """
    if binary is not None:
        click.secho('Using user provided binary name of: {0}'.format(binary))
        self.app_binary = os.path.join(self.app_folder, binary)
        return

    with open(os.path.join(self.app_folder, 'Info.plist'), 'rb') as f:
        info_plist = plistlib.load(f)

    # print the bundle identifier
    click.secho('Bundle identifier is: {0}'.format(info_plist['CFBundleIdentifier']),
                fg='green', bold=True)

    self.app_binary = os.path.join(self.app_folder, info_plist['CFBundleExecutable'])
def download(self):
    """
    Downloads the latest Android gadget for this
    architecture.

    :return:
    """
    download_url = self._get_download_url()

    # stream the download using requests
    library = requests.get(download_url, stream=True)
    library_destination = self.get_frida_library_path(packed=True)

    # save the requests stream to file
    with open(library_destination, 'wb') as f:
        click.secho('Downloading {0} library to {1}...'.format(self.architecture,
                                                               library_destination), fg='green', dim=True)
        shutil.copyfileobj(library.raw, f)

    return self
def open_dispout(dispout):
    """open dispout file for reading

    :param dispout: (str) dispout filename (disp.dat)
    :return: dispout file object
    """
    if dispout.endswith('.xz'):
        import lzma
        dispout = lzma.open(dispout, 'rb')
    else:
        dispout = open(dispout, 'rb')
    return dispout
def create_dat(nodout="nodout", dispout="disp.dat", legacynodes=False):
    """create binary data file

    :param str nodout: nodout file created by ls-dyna (default="nodout")
    :param str dispout: default = "disp.dat"
    :param boolean legacynodes: node IDs written every timestep (default=False)
    """
    header_written = False
    timestep_read = False
    timestep_count = 0
    writenode = True
    with open(nodout, 'r') as nodout:
        with open_dispout(dispout) as dispout:
            for line in nodout:
                if 'nodal' in line:
                    timestep_read = True
                    timestep_count += 1
                    data = []
                    continue
                if timestep_read is True:
                    if line[0:2] == '\n':  # done reading the time step
                        timestep_read = False
                        # if this was the first timestep, everything needed
                        # to be read to get the node count for the header
                        if not header_written:
                            header = generate_header(data, nodout)
                            write_headers(dispout, header)
                            header_written = True
                            print('Time Step: ', end="", flush=True)
                        if timestep_count > 1 and not legacynodes:
                            writenode = False
                        print("%i, " % timestep_count, end="", flush=True)
                        process_timestep_data(data, dispout, writenode)
                    else:
                        raw_data = parse_line(line)
                        data.append(list(raw_data))
    print("done.", flush=True)
    return 0
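# End-to-end sketch (assumes a valid ls-dyna nodout file exists and that
# generate_header/write_headers/process_timestep_data are defined elsewhere
# in the original module):
create_dat(nodout='nodout', dispout='disp.dat')
print(read_header('disp.dat'))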
def count_timesteps(outfile):
    """count timesteps written to nodout

    searches for 'time' in lines, and then removes 1 extra entry that occurs
    for t = 0

    grep will be used on linux systems (way faster)

    :param outfile: usually 'nodout'
    :returns: int ts_count
    """
    from sys import platform
    print("Reading number of time steps... ", end="", flush=True)
    if platform == "linux":
        from subprocess import PIPE, Popen
        p = Popen('grep time %s | wc -l' % outfile, shell=True, stdout=PIPE)
        ts_count = int(p.communicate()[0].strip().decode())
    else:
        print("Non-linux OS detected -> using slower python implementation",
              flush=True)
        ts_count = 0
        with open(outfile, 'r') as f:
            for line in f:
                if 'time' in line:
                    ts_count += 1
    ts_count -= 1  # remove the extra entry counted for t = 0 (both branches)
    print('there are {}.'.format(ts_count), flush=True)
    return ts_count
def receipts(self):
    """Returns a Generator with batches of receipts text."""
    print('Loading receipts text dataset…', end='\r')
    with lzma.open(self.path, mode='rt') as file_handler:
        batch = []
        for row in csv.DictReader(file_handler):
            batch.append(self.serialize(row))
            if len(batch) >= self.batch_size:
                yield batch
                batch = []
        if batch:  # don't yield an empty trailing batch
            yield batch
def suspicions(self):
    """Returns a Generator with batches of suspicions."""
    print('Loading suspicions dataset…', end='\r')
    with lzma.open(self.path, mode='rt', encoding='utf-8') as file_handler:
        batch = []
        for row in csv.DictReader(file_handler):
            batch.append(self.serialize(row))
            if len(batch) >= self.batch_size:
                yield batch
                batch = []
        if batch:  # don't yield an empty trailing batch
            yield batch
def reimbursements(self):
    """Returns a Generator with a dict object for each row."""
    with lzma.open(self.path, 'rt') as file_handler:
        yield from DictReader(file_handler)
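# The pattern shared by the three readers above, shown standalone: stream a
# .xz-compressed CSV without decompressing it to disk (the filename is an
# assumption).
import csv
import lzma
with lzma.open('reimbursements.xz', mode='rt', encoding='utf-8') as file_handler:
    for row in csv.DictReader(file_handler):
        print(row)  # each row is a dict keyed by the CSV header
        break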
def verify_contents(thefile, tgt_hostname=None, callback=None):
    """
    Given a sysstat binary data file, verify that it contains a set of
    well-formed data values.

    The optional 'tgt_hostname' argument is checked against the file header's
    stored hostname value.

    The optional 'callback' argument, if provided, should be an instance of
    the ContentAction class, where for each magic structure, file header, file
    activity set, record header and record payload read the appropriate method
    will be invoked, with the 'eof' method invoked at the end.

    One of the following exceptions will be raised if a problem is found with
    the file:

        Invalid:    The file header or record header metadata values do not
                    make sense in relation to each other
        Corruption: The file appears to be corrupted in some way
        Truncated:  The file does not appear to contain all the data as
                    described by the file header or a given record header
    """
    try:
        # try the xz-compressed form first, falling back to uncompressed
        with lzma.open(thefile, "rb") as fp:
            verify_contents_fp(fp, tgt_hostname, callback)
    except lzma.LZMAError:
        with open(thefile, "rb") as fp:
            verify_contents_fp(fp, tgt_hostname, callback)
def fetch_fileheader(thefile):
    """
    Fetch the sysstat FileHeader object for the given file path.
    """
    try:
        with lzma.open(thefile, "rb") as fp:
            res = fetch_fileheader_with_fp(fp)
    except lzma.LZMAError:
        with open(thefile, "rb") as fp:
            res = fetch_fileheader_with_fp(fp)
    return res
def load_vocab(self, filename):
    with open(filename, "rb") as f:
        vocab = pickle.load(f)
    self._load_vocab(vocab)
    return self
def save_vocab(self, filename):
    with open(filename, "wb") as f:
        pickle.dump(self._fullvocab, f)
    return self
def save_model(self, filename):
    self.save_vocab(filename + ".vocab")
    with open(filename + ".params", "wb") as f:
        pickle.dump(self._args, f)
    self._model.save(filename + ".model")
    return self
def load_model(self, filename, **kwargs):
    self.load_vocab(filename + ".vocab")
    with open(filename + ".params", "rb") as f:
        args = pickle.load(f)
    args.update(kwargs)
    self.create_parser(**args)
    self.init_model()
    self._model.load(filename + ".model")
    return self
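# Round-trip sketch for the persistence helpers above; `parser` and the
# keyword override are illustrative assumptions about the surrounding class.
parser.save_model('en')                       # writes en.vocab, en.params, en.model
parser = parser.load_model('en', test=True)   # kwargs override the pickled args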
def write_file(filename, data):
    if isinstance(data, list):
        data = ''.join(data)
    else:
        assert isinstance(data, str), type(data)
    if filename in STDOUT_NAMES:
        sys.stdout.write(data)
    else:
        fobj = open(filename, 'w')
        fobj.write(data)
        flush_and_close(fobj)
def get_num_features(filename):
    # count the lines that follow the ':0' marker line
    counting = False
    count = 0
    with open(filename) as f:
        for line in f:
            if counting:
                count += 1
            elif line.strip() == ':0':
                counting = True
    return count
def _load_erdm_ground_truth(outdir):
    """A helper function to load Legal TREC 2009 data"""
    with open(os.path.join(outdir, 'seed_relevant.txt'), 'rt') as fh:
        relevant_files = [el.strip() for el in fh.readlines()]
    with open(os.path.join(outdir, 'seed_non_relevant.txt'), 'rt') as fh:
        non_relevant_files = [el.strip() for el in fh.readlines()]
    if platform.system() == 'Windows':
        relevant_files = [el.replace('/', '\\') for el in relevant_files]
        non_relevant_files = [el.replace('/', '\\') for el in non_relevant_files]
    return non_relevant_files, relevant_files
def __init__(self, path, mode='w'):
    self.name = path
    self.outfile = open(path, mode)
    self.devnull = open(os.devnull, 'w')
    self.closed = False
    try:
        # Setting close_fds to True is necessary due to
        # http://bugs.python.org/issue12786
        self.process = Popen(
            [get_program_path('gzip')], stdin=PIPE, stdout=self.outfile,
            stderr=self.devnull, close_fds=True)
    except IOError:
        self.outfile.close()
        self.devnull.close()
        raise
def open_gzip_file(filename, mode, use_system=True):
    """Open a gzip file, preferring the system gzip program if `use_system`
    is True, falling back to the gzip python library.

    Args:
        filename: The file to open.
        mode: The file open mode.
        use_system: Whether to try to use the system gzip program.
    """
    if use_system:
        try:
            if 'r' in mode:
                gzfile = GzipReader(filename)
            else:
                gzfile = GzipWriter(filename)
            if 't' in mode:
                gzfile = io.TextIOWrapper(gzfile)
            return gzfile
        except Exception:
            # fall through to the pure-python implementation below
            pass
    gzfile = gzip.open(filename, mode)
    if 'b' in mode:
        # wrap the raw GzipFile in a buffered interface
        if 'r' in mode:
            gzfile = io.BufferedReader(gzfile)
        else:
            gzfile = io.BufferedWriter(gzfile)
    return gzfile
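# Usage sketch for open_gzip_file() (the filename is an assumption): 't' in
# the mode yields a text stream whichever backend ends up being used.
with open_gzip_file('reads.fastq.gz', 'rt') as fh:
    first = fh.readline()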