def parse(f: IO[Any]) -> Result:
    """
    Parse a shell script and return a ShellScript.

    :param f: TextIOBase handle to the shell script file
    :return: Result with Ok or Err
    """
    comments = []
    commands = []
    interpreter = ""
    buf = f.readlines()
    for line in buf:
        trimmed = line.strip()
        if trimmed.startswith("#!"):
            interpreter = trimmed
        elif trimmed.startswith("#"):
            comments.append(trimmed)
        elif trimmed:
            # Skip blank lines; everything else is a command.
            commands.append(trimmed)
    return Ok(ShellScript(interpreter=interpreter,
                          comments=comments,
                          commands=commands))
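A quick usage sketch; the file name is hypothetical, and Ok, Result, and ShellScript come from the surrounding module (the assumption here is that Ok exposes the wrapped value as .value):

# Hypothetical usage; "deploy.sh" is an example path.
with open("deploy.sh") as fh:
    result = parse(fh)
script = result.value  # assumption: Ok wraps the ShellScript as .value
print(script.interpreter)  # e.g. "#!/bin/bash"
print(len(script.commands), "commands,", len(script.comments), "comments")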
def parse_entries(self, file: IO[Any]) -> Result:
    """
    Parse fstab entries.

    :param file: TextIOWrapper file handle to the fstab
    :return: Result with Ok or Err
    """
    entries = []
    contents = file.readlines()
    for line in contents:
        # Skip comments and anything that is not a complete 6-field entry.
        if line.startswith("#"):
            continue
        parts = line.split()
        if len(parts) != 6:
            continue
        entries.append(FsEntry(
            fs_spec=parts[0],
            mountpoint=parts[1],  # os.path.join() with one argument is a no-op
            vfs_type=parts[2],
            mount_options=parts[3].split(","),
            dump=parts[4] != "0",
            fsck_order=int(parts[5])))
    return Ok(entries)
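A minimal usage sketch; `parser` stands in for an instance of the enclosing class, and the .value attribute on Ok is an assumption, as above:

# Hypothetical usage: parse the system fstab and list its entries.
with open("/etc/fstab") as fh:
    result = parser.parse_entries(fh)
for entry in result.value:  # assumption: Ok wraps the entry list as .value
    print(entry.fs_spec, entry.mountpoint, entry.vfs_type)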
def __init__(self, *, process: subprocess.Popen, encoding: str, tmp_input: FilesystemIPC) -> None:
    self.process = process
    self.stdout_encoding = encoding
    self.iterating = False
    #
    # We need to capture stderr in a background thread to avoid deadlocks.
    # (The deadlock would occur when dlvhex2 blocks because the OS buffer on the stderr
    # pipe is full, so we have to read continuously from *both* stdout and stderr.)
    self.stderr_capture_thread = StreamCaptureThread(self.process.stderr)
    self.stderr_capture_thread.start()
    #
    # Set up finalization. Using weakref.finalize seems to work more robustly than __del__.
    # (One problem that occurred with __del__: Python seemed to call __del__ on self.process
    # and its IO streams first, which resulted in ResourceWarnings even though we were
    # closing the streams properly in our own __del__.)
    self._finalize = weakref.finalize(self, DlvhexLineReader.__close, process, self.stderr_capture_thread, encoding, tmp_input)  # type: ignore
    # Make sure the subprocess is terminated if it is still running when the Python process exits.
    self._finalize.atexit = True
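StreamCaptureThread itself is defined elsewhere in the project. As a rough illustration of the drain-the-pipe technique the comment describes, here is a minimal sketch; the class name matches, but the body is an assumption, not the project's actual code:

import threading

class StreamCaptureThread(threading.Thread):
    """Hypothetical sketch: continuously drain a pipe so the child never blocks on a full buffer."""

    def __init__(self, stream):
        super().__init__(daemon=True)
        self.stream = stream  # the child's stderr pipe (binary mode)
        self.data = b''       # everything captured so far

    def run(self):
        # read() returns b'' once the child closes its end of the pipe.
        chunks = []
        for chunk in iter(lambda: self.stream.read(4096), b''):
            chunks.append(chunk)
        self.data = b''.join(chunks)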
def parse(self, file):
    # typing.IO cannot be used reliably with isinstance() (real file objects do not
    # subclass it), so file handles are detected via io.IOBase instead.
    if not isinstance(file, (str, io.IOBase)):
        raise TypeError("file is not str or IO")
    if isinstance(file, str):
        try:
            with open(file, encoding='windows-1251') as f:
                content = f.readlines()
        except FileNotFoundError:
            raise FileNotFoundError("Not found " + file)
    else:
        content = file.readlines()
    self._data = [x.strip() for x in content]
    self.clear()
    return self
def _read_in_chunks(file_object: IO[bytes], chunk_size: int = 2 * MB) -> Generator[bytes, None, None]:
    """Read a file in fixed-size chunks (to minimize memory usage for large files).

    Args:
        file_object: An opened file-like object supporting read().
        chunk_size: Max size (in bytes) of each file chunk.

    Yields:
        File chunks, each of size at most chunk_size.
    """
    while True:
        chunk = file_object.read(chunk_size)
        if chunk:
            yield chunk
        else:
            return  # End of file.
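Usage sketch with a hypothetical file name; MB is a constant from the surrounding module, presumably 1024 * 1024:

# Hypothetical usage: hash a large file without loading it into memory at once.
import hashlib

digest = hashlib.sha256()
with open("backup.tar", "rb") as fh:  # example path
    for chunk in _read_in_chunks(fh):
        digest.update(chunk)
print(digest.hexdigest())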
def __init__(
        self,
        _=None,  # type: Optional[Union[AnyStr, typing.Mapping, typing.Sequence, typing.IO]]
):
    self._meta = None
    if _ is not None:
        if isinstance(_, HTTPResponse):
            meta.get(self).url = _.url
        _ = deserialize(_)
        for k, v in _.items():
            try:
                self[k] = v
            except KeyError as e:
                # Re-raise with the offending key and the full payload for easier debugging.
                if e.args and len(e.args) == 1:
                    e.args = (
                        '%s.%s: %s' % (type(self).__name__, e.args[0], json.dumps(_)),
                    )
                raise e
def rawstream(fp):
    # type: (IO[Any]) -> IO[bytes]
    if PY3:
        try:
            return fp.buffer  # type: ignore
        except AttributeError:
            # There might be a BytesIO behind fp.
            pass
    return fp  # type: ignore
def write(s, fp=None):
    # type: (Union[str, bytes], Optional[IO[Any]]) -> None
    """Write s to the binary stream fp (default is stdout)."""
    efp = fp if fp is not None else sys.stdout
    rawstream(efp).write(bytestr(s))

def outline(s=b'', end=b'\n', fp=None):
    # type: (Union[str, bytes], Union[str, bytes], Optional[IO]) -> None
    write(bytestr(s) + bytestr(end), fp=fp)
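Both helpers accept str or bytes; a short usage sketch, assuming bytestr is the module's str-to-bytes encoder:

# Hypothetical usage: both calls end up as bytes on the underlying binary stream.
outline('hello')                    # str input, encoded by bytestr()
outline(b'raw bytes')               # bytes pass through unchanged
with open('out.log', 'wb') as log:  # example path
    write('to a file\n', fp=log)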
def category_print(categories, categorytype, category, s, prefix='', end='\n', fp=None):
    # type: (Set[str], str, str, Union[str, bytes], str, str, Optional[IO]) -> None
    if category not in categories:
        return
    if categorytype == 'info':
        msg = prefix
    else:
        msg = '%s%s_%s: ' % (prefix, categorytype, category)
    if MESSAGE_CATEGORY_FILES is not None:
        logfilename = 'whatstyle_%s_%s.log' % (categorytype, category)
        fp = MESSAGE_CATEGORY_FILES.get(logfilename)
        if fp is None:
            path = os.path.join(tempfile.gettempdir(), logfilename)
            fp = open(path, 'wb')
            MESSAGE_CATEGORY_FILES[logfilename] = fp
    if fp is None and LOGFILE:
        global LOGFILEFP
        if not LOGFILEFP:
            LOGFILEFP = open(LOGFILE, 'wb')
        fp = LOGFILEFP
    if fp is None:
        fp = rawstream(sys.stderr if STDERR_OUTPUT else sys.stdout)
    write(msg, fp=fp)
    write(s, fp=fp)
    if end:
        write(end, fp=fp)

def iprint(category, s, prefix='', end='\n', fp=None):
    # type: (str, AnyStr, str, str, Optional[IO[AnyStr]]) -> None
    category_print(args_info, 'info', category, s, prefix, end, fp=fp)

def reporterror(s, fp=None):
    # type: (str, Optional[IO[AnyStr]]) -> None
    if fp is None:
        fp = rawstream(sys.stderr)  # type: ignore
    reportmessage(s, fp=fp)

def soutline(s='', enc='utf-8', fp=None):
    # type: (str, str, Optional[IO[Any]]) -> None
    data = unescape_ill_surrencode(s, enc=enc)
    write(data + b'\n', fp=fp)
def __init__(self, namespace):
    self.__answers = namespace.answers  # type: IO[Any]
    self.__verbose = namespace.verbose  # type: int
    self.__execute = namespace.execute  # type: bool

def answers(self):
    # type: () -> IO[Any]
    return self.__answers

def __load_answers(gateway, target):
    # type: (AnswersGateway, IO) -> Answers
    return gateway.read_answers_from_file(target)

def write_answers_to_file(self, answers, target_file):
    # type: (Answers, IO) -> None
    raw_answers = {}
    if answers.installer() is not None:
        raw_answers['installer'] = answers.installer().raw_options()
    if answers.fqdn_configuration() is not None:
        raw_answers['fqdn'] = answers.fqdn_configuration().raw_options()
    if answers.csrattrs_configuration() is not None:
        raw_answers['csr-attributes'] = answers.csrattrs_configuration().raw_options()
    yaml = ruamel.yaml.dump(raw_answers, Dumper=ruamel.yaml.RoundTripDumper)
    target_file.write(yaml)

def read_answers_from_file(self, target):
    # type: (IO) -> Answers
    pass
def _run(predictor: Predictor,
         input_file: IO,
         output_file: Optional[IO],
         batch_size: int,
         print_to_console: bool,
         cuda_device: int) -> None:

    def _run_predictor(batch_data):
        if len(batch_data) == 1:
            result = predictor.predict_json(batch_data[0], cuda_device)
            # Batch results come back as a list of JSON objects, so in order to
            # iterate over the results below we wrap the single result in a list.
            results = [result]
        else:
            results = predictor.predict_batch_json(batch_data, cuda_device)
        for model_input, output in zip(batch_data, results):
            string_output = json.dumps(output)
            if print_to_console:
                print("input: ", model_input)
                print("prediction: ", string_output)
            if output_file:
                output_file.write(string_output + "\n")

    batch_json_data = []
    for line in input_file:
        if not line.isspace():
            # Collect batch_size worth of lines before predicting.
            json_data = json.loads(line)
            batch_json_data.append(json_data)
            if len(batch_json_data) == batch_size:
                _run_predictor(batch_json_data)
                batch_json_data = []
    # The dataset may not divide evenly by the batch size,
    # so tidy up the scraps.
    if batch_json_data:
        _run_predictor(batch_json_data)
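A hedged invocation sketch; `predictor` is an already-constructed Predictor from the surrounding library, and the file names are examples:

# Hypothetical usage: one JSON object per input line, one prediction per output line.
with open('inputs.jsonl') as inp, open('outputs.jsonl', 'w') as out:
    _run(predictor, inp, out, batch_size=32, print_to_console=False, cuda_device=-1)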
def file(self, mode: str = 'w+b', buffering: int = -1, encoding: typing.Optional[str] = None,
         newline: typing.Optional[str] = None, suffix: typing.Optional[str] = DEFAULT_SUFFIX,
         prefix: typing.Optional[str] = DEFAULT_PREFIX, dir: typing.Optional[str] = None) -> typing.IO:
    """
    Create a new temporary file within the scratch dir.

    This returns the result of :func:`~tempfile.TemporaryFile`, which returns a nameless,
    file-like object that will cease to exist once it is closed.

    :param mode: (Optional) mode to open the file with
    :type mode: :class:`~str`
    :param buffering: (Optional) size of the file buffer
    :type buffering: :class:`~int`
    :param encoding: (Optional) encoding to open the file with
    :type encoding: :class:`~str` or :class:`~NoneType`
    :param newline: (Optional) newline argument to open the file with
    :type newline: :class:`~str` or :class:`~NoneType`
    :param suffix: (Optional) filename suffix
    :type suffix: :class:`~str` or :class:`~NoneType`
    :param prefix: (Optional) filename prefix
    :type prefix: :class:`~str` or :class:`~NoneType`
    :param dir: (Optional) relative path to directory within the scratch dir where the file should exist
    :type dir: :class:`~str` or :class:`~NoneType`
    :return: file-like object as returned by :func:`~tempfile.TemporaryFile`
    :rtype: :class:`~_io.BufferedRandom`
    """
    return tempfile.TemporaryFile(mode, buffering, encoding, newline,
                                  suffix, prefix, self.join(dir))
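Usage sketch, assuming `scratch` is an instance of the class that defines this method:

# Hypothetical usage: the file disappears as soon as the with-block exits.
with scratch.file(mode='w+') as tmp:
    tmp.write('intermediate data')
    tmp.seek(0)
    print(tmp.read())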
def named(self, mode: str = 'w+b', buffering: int = -1, encoding: typing.Optional[str] = None,
          newline: typing.Optional[str] = None, suffix: typing.Optional[str] = DEFAULT_SUFFIX,
          prefix: typing.Optional[str] = DEFAULT_PREFIX, dir: typing.Optional[str] = None,
          delete: bool = True) -> typing.IO:
    """
    Create a new named temporary file within the scratch dir.

    This returns the result of :func:`~tempfile.NamedTemporaryFile`, which returns a named,
    file-like object that will cease to exist once it is closed unless `delete` is set to `False`.

    :param mode: (Optional) mode to open the file with
    :type mode: :class:`~str`
    :param buffering: (Optional) size of the file buffer
    :type buffering: :class:`~int`
    :param encoding: (Optional) encoding to open the file with
    :type encoding: :class:`~str` or :class:`~NoneType`
    :param newline: (Optional) newline argument to open the file with
    :type newline: :class:`~str` or :class:`~NoneType`
    :param suffix: (Optional) filename suffix
    :type suffix: :class:`~str` or :class:`~NoneType`
    :param prefix: (Optional) filename prefix
    :type prefix: :class:`~str` or :class:`~NoneType`
    :param dir: (Optional) relative path to directory within the scratch dir where the file should exist
    :type dir: :class:`~str` or :class:`~NoneType`
    :param delete: (Optional) flag to indicate if the file should be deleted from disk when it is closed
    :type delete: :class:`~bool`
    :return: file-like object as returned by :func:`~tempfile.NamedTemporaryFile`
    :rtype: :class:`~_io.TemporaryFileWrapper`
    """
    return tempfile.NamedTemporaryFile(mode, buffering, encoding, newline,
                                       suffix, prefix, self.join(dir), delete)
def spooled(self, max_size: int = 0, mode: str = 'w+b', buffering: int = -1,
            encoding: typing.Optional[str] = None, newline: typing.Optional[str] = None,
            suffix: typing.Optional[str] = DEFAULT_SUFFIX, prefix: typing.Optional[str] = DEFAULT_PREFIX,
            dir: typing.Optional[str] = None) -> typing.IO:
    """
    Create a new spooled temporary file within the scratch dir.

    This returns a :class:`~tempfile.SpooledTemporaryFile`, a specialized object that wraps a
    :class:`StringIO`/:class:`BytesIO` instance and transparently overflows into a file on disk
    once it reaches a certain size.

    By default, a spooled file will never roll over to disk.

    :param max_size: (Optional) max size before the in-memory buffer rolls over to disk
    :type max_size: :class:`~int`
    :param mode: (Optional) mode to open the file with
    :type mode: :class:`~str`
    :param buffering: (Optional) size of the file buffer
    :type buffering: :class:`~int`
    :param encoding: (Optional) encoding to open the file with
    :type encoding: :class:`~str` or :class:`~NoneType`
    :param newline: (Optional) newline argument to open the file with
    :type newline: :class:`~str` or :class:`~NoneType`
    :param suffix: (Optional) filename suffix
    :type suffix: :class:`~str` or :class:`~NoneType`
    :param prefix: (Optional) filename prefix
    :type prefix: :class:`~str` or :class:`~NoneType`
    :param dir: (Optional) relative path to directory within the scratch dir where the file should exist
    :type dir: :class:`~str` or :class:`~NoneType`
    :return: SpooledTemporaryFile instance
    :rtype: :class:`~tempfile.SpooledTemporaryFile`
    """
    return tempfile.SpooledTemporaryFile(max_size, mode, buffering, encoding,
                                         newline, suffix, prefix, self.join(dir))
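Usage sketch showing the rollover behavior, again assuming a `scratch` instance; note that `_rolled` is a CPython implementation detail, shown here only to make the rollover visible:

# Hypothetical usage: stays in memory below max_size, spills to disk beyond it.
with scratch.spooled(max_size=1024) as tmp:
    tmp.write(b'x' * 2048)  # exceeds max_size, triggers rollover
    print(tmp._rolled)      # True: data now lives in a real file on disk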
def sha1_from_file_object(file_object: typing.IO[bytes]) -> str:
    block_size = 65536
    hasher = hashlib.sha1()
    # Stream the file through the hasher one block at a time.
    buf = file_object.read(block_size)
    while len(buf) > 0:
        hasher.update(buf)
        buf = file_object.read(block_size)
    file_object.close()
    return hasher.hexdigest()
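Usage sketch with an example path; note that the function closes the handle itself:

# Hypothetical usage; the file must be opened in binary mode.
with open('archive.zip', 'rb') as fh:  # example path
    print(sha1_from_file_object(fh))   # the extra close from the with-block is harmless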
def __init__(self, candidates: List[int], lock_dir: str) -> None:
    self.candidates = candidates
    self.lock_dir = lock_dir
    self.lock_file = None  # type: Optional[IO[Any]]
    self.lock_file_path = None  # type: Optional[str]
    self.gpu_id = None  # type: Optional[int]
    self._acquired_lock = False
def print_multilines(cls, name: str, value: str, file: IO):
    if value:
        lines = value.split('\n')
        if len(lines) == 1:
            print(" * {name}: {value}".format(name=name, value=value), file=file)
        else:
            print(" * {name}:".format(name=name), file=file)
            for line in lines:
                print(" - {line}".format(line=line), file=file)

def print_leaf(cls, commit: Commit, file: IO) -> None:
    print("* subject: {subject}".format(subject=commit.subject or ''), file=file)
    cls.print_multilines(name='body', value=commit.body, file=file)
    print(" * date: {date}".format(date=datetools.date2str(commit.date)), file=file)
    print(" * author: {author}".format(author=commit.author), file=file)
    print(" * commit: {id}".format(id=commit.id), file=file)

def print_header(self, node: 'Node', file: IO):
    print(
        "{header} {criterion_name}: {name}".format(
            header="#" * (self.depth_level() + 1),
            criterion_name=Commit.property_name(node.criterion),
            name=node.name
        ),
        file=file
    )
    print(file=file)
def __init__(self, view: View, info: Dict[str, Any], api: TwitchAPI, quality: str, temp_dir: str = '.') -> None:
    # Load and cache the JSON schema once per class.
    if not TwitchVideo._schema:
        with open('video_info.schema') as json_data:
            TwitchVideo._schema = json.load(json_data)
    self._validate_info(info)
    self.info = info
    self.api = api
    self.quality = quality
    self.temp_dir = temp_dir
    self.view = view
    self.download_done: bool = False
    self.file: Optional[IO[bytes]] = None
def open_file(path: Union[str, IO], mode='rb'):
    # Accept either a filesystem path or an already-open file object.
    if isinstance(path, str):
        file = open(path, mode)
    else:
        file = path
    return file
def save(self, path: Union[str, IO]):
    state = self.__getstate__()
    # Note: the with-block closes the handle even when the caller passed one in.
    with open_file(path, 'wb') as outfile:
        pickle.dump(state, outfile)
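Round-trip usage sketch; `model` is a hypothetical instance of the class owning save(), which is assumed to implement __getstate__. When a handle is passed instead of a path, save() closes it on exit:

# Hypothetical usage: save accepts either a path or an already-open handle.
import pickle

model.save('model.pkl')         # by path: open_file opens it in 'wb' mode
with open('model.pkl', 'rb') as fh:
    state = pickle.load(fh)     # read back the pickled __getstate__ data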