def readline(self, size=-1):
    """Return the next line of uncompressed bytes from the file.

    A trailing newline, when there is one, stays on the returned line.
    With a non-negative size at most that many bytes are read, so the
    line may come back incomplete. At EOF the result is b''.
    """
    self._check_can_read()
    if size < 0:
        # Fast path: the rest of the line already sits in the buffer.
        start = self._buffer_offset
        newline_at = self._buffer.find(b"\n", start)
        if newline_at >= 0:
            stop = newline_at + 1
            chunk = self._buffer[start:stop]
            self._buffer_offset = stop
            self._pos += len(chunk)
            return chunk
    # Otherwise let the generic implementation assemble the line.
    return io.BufferedIOBase.readline(self, size)
python类BufferedIOBase()的实例源码
def __init__(self, lsf_path: str, types: List[Type[pyimc.Message]] = None, make_index=True):
    """
    Set up a reader for an LSF file.

    :param lsf_path: Path of the LSF file; stored as ``fpath``.
    :param types: Optional list of pyimc message classes. When given, their ids
                  (looked up by class name) are stored in ``msg_types``; otherwise
                  ``msg_types`` is None.
    :param make_index: Stored as ``make_index``; when true, an index that speeds up
                       subsequent reads is meant to be created.
    """
    self.fpath = lsf_path
    self.make_index = make_index
    self.f = None  # type: io.BufferedIOBase
    self.idx = {}  # type: Dict[Union[int, str], List[int]]
    self.header = IMCHeader()  # Preallocate header buffer
    self.parser = pyimc.Parser()
    # Translate the requested message classes into their numeric ids.
    self.msg_types = (
        [pyimc.Factory.id_from_abbrev(cls.__name__) for cls in types]
        if types
        else None
    )
def _upload_media_py3(self, media_type, media_file, extension=''):
    """Post a media file to the WeChat media upload endpoint.

    :param media_type: value sent as the ``type`` query parameter
    :param media_file: an io.IOBase object carrying a ``name`` attribute,
                       or an io.BytesIO object
    :param extension: file extension; only consulted for io.BytesIO input
    :return: whatever ``self.request.post`` returns
    :raises ValueError: for an unsupported object or a disallowed extension
    """
    has_name = isinstance(media_file, io.IOBase) and hasattr(media_file, 'name')
    if not has_name and not isinstance(media_file, io.BytesIO):
        raise ValueError('Parameter media_file must be io.BufferedIOBase(open a file with \'rb\') or io.BytesIO object.')

    if has_name:
        # Named stream: the extension is whatever follows the last dot.
        filename = media_file.name
        extension = filename.rpartition('.')[2].lower()
        if not is_allowed_extension(extension):
            raise ValueError('Invalid file type.')
    else:
        # In-memory stream: the caller has to supply the extension.
        extension = extension.lower()
        if not is_allowed_extension(extension):
            raise ValueError('Please provide \'extension\' parameters when the type of \'media_file\' is \'io.BytesIO\'.')
        filename = 'temp.' + extension

    query = {'type': media_type}
    payload = {'media': (filename, media_file, convert_ext_to_mime(extension))}
    return self.request.post(
        url='https://api.weixin.qq.com/cgi-bin/media/upload',
        params=query,
        files=payload
    )
def _get_writable(stream_or_path, mode):
"""This method returns a tuple containing the stream and a flag to indicate
if the stream should be automatically closed.
The `stream_or_path` parameter is returned if it is an open writable stream.
Otherwise, it treats the `stream_or_path` parameter as a file path and
opens it with the given mode.
It is used by the svg and png methods to interpret the file parameter.
:type stream_or_path: str | io.BufferedIOBase
:type mode: str | unicode
:rtype: (io.BufferedIOBase, bool)
"""
is_stream = hasattr(stream_or_path, 'write')
if not is_stream:
# No stream provided, treat "stream_or_path" as path
stream_or_path = open(stream_or_path, mode)
return stream_or_path, not is_stream
def readline(self, size=-1):
    """Return the next line of uncompressed bytes from the file.

    A trailing newline, when there is one, stays on the returned line.
    With a non-negative size at most that many bytes are read, so the
    line may come back incomplete. At EOF the result is b''.
    """
    if not isinstance(size, int):
        # Accept integer-like objects through __index__.
        if hasattr(size, "__index__"):
            size = size.__index__()
        else:
            raise TypeError("Integer argument expected")
    with self._lock:
        self._check_can_read()
        if size < 0:
            # Fast path: the rest of the line already sits in the buffer.
            start = self._buffer_offset
            newline_at = self._buffer.find(b"\n", start)
            if newline_at >= 0:
                stop = newline_at + 1
                chunk = self._buffer[start:stop]
                self._buffer_offset = stop
                self._pos += len(chunk)
                return chunk
        # Otherwise let the generic implementation assemble the line.
        return io.BufferedIOBase.readline(self, size)
def _pipe_stdin(self, stdin):
if stdin is None or isinstance(stdin, io.FileIO):
return None
tsi = self._temp_stdin
bufsize = self.bufsize
if isinstance(stdin, io.BufferedIOBase):
buf = stdin.read(bufsize)
while len(buf) != 0:
tsi.write(buf)
tsi.flush()
buf = stdin.read(bufsize)
elif isinstance(stdin, (str, bytes)):
raw = stdin.encode() if isinstance(stdin, str) else stdin
for i in range((len(raw)//bufsize) + 1):
tsi.write(raw[i*bufsize:(i + 1)*bufsize])
tsi.flush()
else:
raise ValueError('stdin not understood {0!r}'.format(stdin))
baker.py 文件源码
项目:rank_biased_precision_implementation
作者: vatsal-sodha
项目源码
文件源码
阅读 68
收藏 0
点赞 0
评论 0
def write(fobj, content, convert=True):
    """
    Utility function used to write content to a file. Allow compatibility
    between Python versions.

    :param fobj: file-like object with a ``write`` method
    :param content: text or bytes to write
    :param convert: when true (the default), text is encoded as UTF-8
                    before being written to a binary stream on Python 3
    """
    # Decide whether fobj expects bytes. A textual ``mode`` attribute is
    # authoritative; a missing or non-text mode (some file objects expose
    # an integer there) falls back to the io class hierarchy.
    mode = getattr(fobj, 'mode', None)
    if isinstance(mode, str):
        binary = 'b' in mode
    else:
        binary = isinstance(fobj, io.BufferedIOBase)

    # Under Python 3 a binary stream needs bytes. Only text is encoded:
    # content that is already bytes is passed through unchanged instead
    # of being fed to bytes(), which rejects an encoding for non-str input.
    needs_encoding = (sys.version_info[:2] >= (3, 0) and convert and binary
                      and isinstance(content, str))
    if needs_encoding:  # pragma: no cover
        content = content.encode('utf-8')

    fobj.write(content)
def readline(self, size=-1):
    """Return the next line of uncompressed bytes from the file.

    A trailing newline, when there is one, stays on the returned line.
    With a non-negative size at most that many bytes are read, so the
    line may come back incomplete. At EOF the result is b''.
    """
    if not isinstance(size, int):
        # Accept integer-like objects through __index__.
        if hasattr(size, "__index__"):
            size = size.__index__()
        else:
            raise TypeError("Integer argument expected")
    with self._lock:
        self._check_can_read()
        if size < 0:
            # Fast path: the rest of the line already sits in the buffer.
            start = self._buffer_offset
            newline_at = self._buffer.find(b"\n", start)
            if newline_at >= 0:
                stop = newline_at + 1
                chunk = self._buffer[start:stop]
                self._buffer_offset = stop
                self._pos += len(chunk)
                return chunk
        # Otherwise let the generic implementation assemble the line.
        return io.BufferedIOBase.readline(self, size)
def readline(self, size=-1):
    """Return the next line of uncompressed bytes from the file.

    A trailing newline, when there is one, stays on the returned line.
    With a non-negative size at most that many bytes are read, so the
    line may come back incomplete. At EOF the result is b''.
    """
    self._check_can_read()
    if size < 0:
        # Fast path: the rest of the line already sits in the buffer.
        start = self._buffer_offset
        newline_at = self._buffer.find(b"\n", start)
        if newline_at >= 0:
            stop = newline_at + 1
            chunk = self._buffer[start:stop]
            self._buffer_offset = stop
            self._pos += len(chunk)
            return chunk
    # Otherwise let the generic implementation assemble the line.
    return io.BufferedIOBase.readline(self, size)
def __init__(self, buffer):
"""
:param buffer: Buffer
:type buffer: io.BufferedIOBase
"""
self.buffer = buffer
def _gettextwriter(out, encoding):
    """Wrap `out` in a text writer that encodes to `encoding`.

    When `out` is None, ``sys.stdout`` is used. A stand-in
    ``io.BufferedIOBase`` object is prepared for `out` and handed to
    ``_UnbufferedTextIOWrapper`` with the ``xmlcharrefreplace`` error
    handler and LF newlines.
    """
    if out is None:
        import sys
        out = sys.stdout

    if not isinstance(out, io.RawIOBase):
        # Objects outside the IOBase hierarchy only need to offer a
        # write method; graft it onto a bare buffered object.
        sink = io.BufferedIOBase()
        sink.writable = lambda: True
        sink.write = out.write
        try:
            # TextIOWrapper consults these methods to decide whether
            # a BOM (for UTF-16, etc.) should be added.
            sink.seekable = out.seekable
            sink.tell = out.tell
        except AttributeError:
            pass
    else:
        # NOTE(review): io.BufferedIOBase is the abstract base class;
        # confirm that writes really reach `out` in this branch.
        sink = io.BufferedIOBase(out)
        # Destroying the text wrapper must not close the original file.
        sink.close = lambda: None

    # Put the text layer on top of the binary writer.
    return _UnbufferedTextIOWrapper(sink, encoding=encoding,
                                    errors='xmlcharrefreplace',
                                    newline='\n')
def _gettextwriter(out, encoding):
    """Wrap `out` in a text writer that encodes to `encoding`.

    When `out` is None, ``sys.stdout`` is used. A stand-in
    ``io.BufferedIOBase`` object is prepared for `out` and handed to
    ``_UnbufferedTextIOWrapper`` with the ``xmlcharrefreplace`` error
    handler and LF newlines.
    """
    if out is None:
        import sys
        out = sys.stdout

    if not isinstance(out, io.RawIOBase):
        # Objects outside the IOBase hierarchy only need to offer a
        # write method; graft it onto a bare buffered object.
        sink = io.BufferedIOBase()
        sink.writable = lambda: True
        sink.write = out.write
        try:
            # TextIOWrapper consults these methods to decide whether
            # a BOM (for UTF-16, etc.) should be added.
            sink.seekable = out.seekable
            sink.tell = out.tell
        except AttributeError:
            pass
    else:
        # NOTE(review): io.BufferedIOBase is the abstract base class;
        # confirm that writes really reach `out` in this branch.
        sink = io.BufferedIOBase(out)
        # Destroying the text wrapper must not close the original file.
        sink.close = lambda: None

    # Put the text layer on top of the binary writer.
    return _UnbufferedTextIOWrapper(sink, encoding=encoding,
                                    errors='xmlcharrefreplace',
                                    newline='\n')
def _gettextwriter(out, encoding):
    """Wrap `out` in a text writer that encodes to `encoding`.

    When `out` is None, ``sys.stdout`` is used. A stand-in
    ``io.BufferedIOBase`` object is prepared for `out` and handed to
    ``_UnbufferedTextIOWrapper`` with the ``xmlcharrefreplace`` error
    handler and LF newlines.
    """
    if out is None:
        import sys
        out = sys.stdout

    if not isinstance(out, io.RawIOBase):
        # Objects outside the IOBase hierarchy only need to offer a
        # write method; graft it onto a bare buffered object.
        sink = io.BufferedIOBase()
        sink.writable = lambda: True
        sink.write = out.write
        try:
            # TextIOWrapper consults these methods to decide whether
            # a BOM (for UTF-16, etc.) should be added.
            sink.seekable = out.seekable
            sink.tell = out.tell
        except AttributeError:
            pass
    else:
        # NOTE(review): io.BufferedIOBase is the abstract base class;
        # confirm that writes really reach `out` in this branch.
        sink = io.BufferedIOBase(out)
        # Destroying the text wrapper must not close the original file.
        sink.close = lambda: None

    # Put the text layer on top of the binary writer.
    return _UnbufferedTextIOWrapper(sink, encoding=encoding,
                                    errors='xmlcharrefreplace',
                                    newline='\n')
def _gettextwriter(out, encoding):
    """Wrap `out` in a text writer that encodes to `encoding`.

    When `out` is None, ``sys.stdout`` is used. A stand-in
    ``io.BufferedIOBase`` object is prepared for `out` and handed to
    ``_UnbufferedTextIOWrapper`` with the ``xmlcharrefreplace`` error
    handler and LF newlines.
    """
    if out is None:
        import sys
        out = sys.stdout

    if not isinstance(out, io.RawIOBase):
        # Objects outside the IOBase hierarchy only need to offer a
        # write method; graft it onto a bare buffered object.
        sink = io.BufferedIOBase()
        sink.writable = lambda: True
        sink.write = out.write
        try:
            # TextIOWrapper consults these methods to decide whether
            # a BOM (for UTF-16, etc.) should be added.
            sink.seekable = out.seekable
            sink.tell = out.tell
        except AttributeError:
            pass
    else:
        # NOTE(review): io.BufferedIOBase is the abstract base class;
        # confirm that writes really reach `out` in this branch.
        sink = io.BufferedIOBase(out)
        # Destroying the text wrapper must not close the original file.
        sink.close = lambda: None

    # Put the text layer on top of the binary writer.
    return _UnbufferedTextIOWrapper(sink, encoding=encoding,
                                    errors='xmlcharrefreplace',
                                    newline='\n')
def _gettextwriter(out, encoding):
    """Wrap `out` in a text writer that encodes to `encoding`.

    When `out` is None, ``sys.stdout`` is used. A stand-in
    ``io.BufferedIOBase`` object is prepared for `out` and handed to
    ``_UnbufferedTextIOWrapper`` with the ``xmlcharrefreplace`` error
    handler and LF newlines.
    """
    if out is None:
        import sys
        out = sys.stdout

    if not isinstance(out, io.RawIOBase):
        # Objects outside the IOBase hierarchy only need to offer a
        # write method; graft it onto a bare buffered object.
        sink = io.BufferedIOBase()
        sink.writable = lambda: True
        sink.write = out.write
        try:
            # TextIOWrapper consults these methods to decide whether
            # a BOM (for UTF-16, etc.) should be added.
            sink.seekable = out.seekable
            sink.tell = out.tell
        except AttributeError:
            pass
    else:
        # NOTE(review): io.BufferedIOBase is the abstract base class;
        # confirm that writes really reach `out` in this branch.
        sink = io.BufferedIOBase(out)
        # Destroying the text wrapper must not close the original file.
        sink.close = lambda: None

    # Put the text layer on top of the binary writer.
    return _UnbufferedTextIOWrapper(sink, encoding=encoding,
                                    errors='xmlcharrefreplace',
                                    newline='\n')
def output(self, text, highlighter=None):
    """Write `text` plus a newline to the environment's output stream.

    :param text: the text to output
    :param highlighter: an optional function to colourize the text;
                        it is skipped when the output is piped
    """
    # Colourizing only makes sense when nothing is piping the output.
    if not self.has_pipe and highlighter:
        text = highlighter(text)
    payload = "{}\n".format(text)
    if isinstance(self.output_stream, io.BufferedIOBase):
        # Binary streams take bytes, so encode the line first.
        payload = payload.encode('utf-8')
    self.output_stream.write(payload)
    self.output_stream.flush()
def _gettextwriter(out, encoding):
    """Wrap `out` in a text writer that encodes to `encoding`.

    When `out` is None, ``sys.stdout`` is used. A stand-in
    ``io.BufferedIOBase`` object is prepared for `out` and handed to
    ``_UnbufferedTextIOWrapper`` with the ``xmlcharrefreplace`` error
    handler and LF newlines.
    """
    if out is None:
        import sys
        out = sys.stdout

    if not isinstance(out, io.RawIOBase):
        # Objects outside the IOBase hierarchy only need to offer a
        # write method; graft it onto a bare buffered object.
        sink = io.BufferedIOBase()
        sink.writable = lambda: True
        sink.write = out.write
        try:
            # TextIOWrapper consults these methods to decide whether
            # a BOM (for UTF-16, etc.) should be added.
            sink.seekable = out.seekable
            sink.tell = out.tell
        except AttributeError:
            pass
    else:
        # NOTE(review): io.BufferedIOBase is the abstract base class;
        # confirm that writes really reach `out` in this branch.
        sink = io.BufferedIOBase(out)
        # Destroying the text wrapper must not close the original file.
        sink.close = lambda: None

    # Put the text layer on top of the binary writer.
    return _UnbufferedTextIOWrapper(sink, encoding=encoding,
                                    errors='xmlcharrefreplace',
                                    newline='\n')
def _gettextwriter(out, encoding):
    """Wrap `out` in a text writer that encodes to `encoding`.

    When `out` is None, ``sys.stdout`` is used. A stand-in
    ``io.BufferedIOBase`` object is prepared for `out` and handed to
    ``_UnbufferedTextIOWrapper`` with the ``xmlcharrefreplace`` error
    handler and LF newlines.
    """
    if out is None:
        import sys
        out = sys.stdout

    if not isinstance(out, io.RawIOBase):
        # Objects outside the IOBase hierarchy only need to offer a
        # write method; graft it onto a bare buffered object.
        sink = io.BufferedIOBase()
        sink.writable = lambda: True
        sink.write = out.write
        try:
            # TextIOWrapper consults these methods to decide whether
            # a BOM (for UTF-16, etc.) should be added.
            sink.seekable = out.seekable
            sink.tell = out.tell
        except AttributeError:
            pass
    else:
        # NOTE(review): io.BufferedIOBase is the abstract base class;
        # confirm that writes really reach `out` in this branch.
        sink = io.BufferedIOBase(out)
        # Destroying the text wrapper must not close the original file.
        sink.close = lambda: None

    # Put the text layer on top of the binary writer.
    return _UnbufferedTextIOWrapper(sink, encoding=encoding,
                                    errors='xmlcharrefreplace',
                                    newline='\n')
def _gettextwriter(out, encoding):
    """Wrap `out` in a text writer that encodes to `encoding`.

    When `out` is None, ``sys.stdout`` is used. A stand-in
    ``io.BufferedIOBase`` object is prepared for `out` and handed to
    ``_UnbufferedTextIOWrapper`` with the ``xmlcharrefreplace`` error
    handler and LF newlines.
    """
    if out is None:
        import sys
        out = sys.stdout

    if not isinstance(out, io.RawIOBase):
        # Objects outside the IOBase hierarchy only need to offer a
        # write method; graft it onto a bare buffered object.
        sink = io.BufferedIOBase()
        sink.writable = lambda: True
        sink.write = out.write
        try:
            # TextIOWrapper consults these methods to decide whether
            # a BOM (for UTF-16, etc.) should be added.
            sink.seekable = out.seekable
            sink.tell = out.tell
        except AttributeError:
            pass
    else:
        # NOTE(review): io.BufferedIOBase is the abstract base class;
        # confirm that writes really reach `out` in this branch.
        sink = io.BufferedIOBase(out)
        # Destroying the text wrapper must not close the original file.
        sink.close = lambda: None

    # Put the text layer on top of the binary writer.
    return _UnbufferedTextIOWrapper(sink, encoding=encoding,
                                    errors='xmlcharrefreplace',
                                    newline='\n')
def test_io_buffered_by_default(self):
    """Check that the three Popen pipes are io.BufferedIOBase objects."""
    child = subprocess.Popen(
        [sys.executable, "-c", "import sys; sys.exit(0)"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    try:
        for pipe in (child.stdin, child.stdout, child.stderr):
            self.assertIsInstance(pipe, io.BufferedIOBase)
    finally:
        # Always release the pipes and reap the child process.
        for pipe in (child.stdin, child.stdout, child.stderr):
            pipe.close()
        child.wait()
def _gettextwriter(out, encoding):
if out is None:
import sys
return sys.stdout
if isinstance(out, io.TextIOBase):
# use a text writer as is
return out
# wrap a binary writer with TextIOWrapper
if isinstance(out, io.RawIOBase):
# Keep the original file open when the TextIOWrapper is
# destroyed
class _wrapper:
__class__ = out.__class__
def __getattr__(self, name):
return getattr(out, name)
buffer = _wrapper()
buffer.close = lambda: None
else:
# This is to handle passed objects that aren't in the
# IOBase hierarchy, but just have a write method
buffer = io.BufferedIOBase()
buffer.writable = lambda: True
buffer.write = out.write
try:
# TextIOWrapper uses this methods to determine
# if BOM (for UTF-16, etc) should be added
buffer.seekable = out.seekable
buffer.tell = out.tell
except AttributeError:
pass
return io.TextIOWrapper(buffer, encoding=encoding,
errors='xmlcharrefreplace',
newline='\n',
write_through=True)
def __init__(self, filename, mode="rb", compresslevel=9):
    """Prepare the object for reading ("rb") or writing ("wb").

    `filename` is either a path, which is opened here with `mode`, or
    an object exposing ``read`` or ``write``, which is used directly.
    `compresslevel` must be an integer between 1 and 9.

    Raises ValueError for a bad `compresslevel` or `mode`, and
    TypeError when `filename` is neither a path nor file-like.
    """
    # This lock must be recursive, so that BufferedIOBase's
    # readline(), readlines() and writelines() don't deadlock.
    self._lock = RLock()
    self._fp = None
    self._closefp = False
    self._mode = _MODE_CLOSED
    self._pos = 0
    self._size = -1

    level_ok = isinstance(compresslevel, int) and 1 <= compresslevel <= 9
    if not level_ok:
        raise ValueError("'compresslevel' must be an integer "
                         "between 1 and 9. You provided 'compresslevel={}'"
                         .format(compresslevel))

    # Set up the codec state that matches the requested direction.
    if mode == "rb":
        mode_code = _MODE_READ
        self._decompressor = zlib.decompressobj(self.wbits)
        self._buffer = b""
        self._buffer_offset = 0
    elif mode == "wb":
        mode_code = _MODE_WRITE
        self._compressor = zlib.compressobj(
            compresslevel, zlib.DEFLATED, self.wbits, zlib.DEF_MEM_LEVEL, 0)
    else:
        raise ValueError("Invalid mode: %r" % (mode,))

    if isinstance(filename, _basestring):
        # We opened the file, so we are responsible for closing it.
        self._fp = io.open(filename, mode)
        self._closefp = True
    elif hasattr(filename, "read") or hasattr(filename, "write"):
        self._fp = filename
    else:
        raise TypeError("filename must be a str or bytes object, "
                        "or a file")
    # Only leave the closed state once a file object is in place.
    self._mode = mode_code
def readinto(self, b):
    """Fill b with up to len(b) bytes.

    Gives back the number of bytes read (0 for EOF).
    """
    # Hold the instance lock while the generic implementation runs.
    with self._lock:
        count = io.BufferedIOBase.readinto(self, b)
    return count
def _gettextwriter(out, encoding):
    """Wrap `out` in a text writer that encodes to `encoding`.

    When `out` is None, ``sys.stdout`` is used. A stand-in
    ``io.BufferedIOBase`` object is prepared for `out` and handed to
    ``_UnbufferedTextIOWrapper`` with the ``xmlcharrefreplace`` error
    handler and LF newlines.
    """
    if out is None:
        import sys
        out = sys.stdout

    if not isinstance(out, io.RawIOBase):
        # Objects outside the IOBase hierarchy only need to offer a
        # write method; graft it onto a bare buffered object.
        sink = io.BufferedIOBase()
        sink.writable = lambda: True
        sink.write = out.write
        try:
            # TextIOWrapper consults these methods to decide whether
            # a BOM (for UTF-16, etc.) should be added.
            sink.seekable = out.seekable
            sink.tell = out.tell
        except AttributeError:
            pass
    else:
        # NOTE(review): io.BufferedIOBase is the abstract base class;
        # confirm that writes really reach `out` in this branch.
        sink = io.BufferedIOBase(out)
        # Destroying the text wrapper must not close the original file.
        sink.close = lambda: None

    # Put the text layer on top of the binary writer.
    return _UnbufferedTextIOWrapper(sink, encoding=encoding,
                                    errors='xmlcharrefreplace',
                                    newline='\n')
def is_fileobj(f):
    """Tell whether `f` is an instance of one of the io base classes
    (text, buffered, raw or the generic IOBase)."""
    io_bases = (io.TextIOBase, io.BufferedIOBase, io.RawIOBase, io.IOBase)
    return isinstance(f, io_bases)
def _gettextwriter(out, encoding):
    """Wrap `out` in a text writer that encodes to `encoding`.

    When `out` is None, ``sys.stdout`` is used. A stand-in
    ``io.BufferedIOBase`` object is prepared for `out` and handed to
    ``_UnbufferedTextIOWrapper`` with the ``xmlcharrefreplace`` error
    handler and LF newlines.
    """
    if out is None:
        import sys
        out = sys.stdout

    if not isinstance(out, io.RawIOBase):
        # Objects outside the IOBase hierarchy only need to offer a
        # write method; graft it onto a bare buffered object.
        sink = io.BufferedIOBase()
        sink.writable = lambda: True
        sink.write = out.write
        try:
            # TextIOWrapper consults these methods to decide whether
            # a BOM (for UTF-16, etc.) should be added.
            sink.seekable = out.seekable
            sink.tell = out.tell
        except AttributeError:
            pass
    else:
        # NOTE(review): io.BufferedIOBase is the abstract base class;
        # confirm that writes really reach `out` in this branch.
        sink = io.BufferedIOBase(out)
        # Destroying the text wrapper must not close the original file.
        sink.close = lambda: None

    # Put the text layer on top of the binary writer.
    return _UnbufferedTextIOWrapper(sink, encoding=encoding,
                                    errors='xmlcharrefreplace',
                                    newline='\n')
# PyPy: moved this class outside the function above
def readinto(self, b):
    """Fill b with up to len(b) bytes.

    Gives back the number of bytes read (0 for EOF).
    """
    # Hold the instance lock while the generic implementation runs.
    with self._lock:
        count = io.BufferedIOBase.readinto(self, b)
    return count
def readlines(self, size=-1):
    """Return a list of lines of uncompressed bytes from the file.

    size acts as a hint: once the combined length of the lines
    collected so far reaches or passes it, no further lines are read.
    """
    if not isinstance(size, int):
        # Accept integer-like objects through __index__.
        if hasattr(size, "__index__"):
            size = size.__index__()
        else:
            raise TypeError("Integer argument expected")
    # Hold the instance lock while the generic implementation runs.
    with self._lock:
        lines = io.BufferedIOBase.readlines(self, size)
    return lines
def writelines(self, seq):
    """Write each byte string produced by seq to the file.

    seq may be any iterable of byte strings; no line separators are
    inserted between them. The work is delegated to
    io.BufferedIOBase.writelines while the instance lock is held, and
    that call's result is returned.
    """
    with self._lock:
        outcome = io.BufferedIOBase.writelines(self, seq)
    return outcome
# Rewind the file to the beginning of the data stream.
def test_io_buffered_by_default(self):
    """Check that the three Popen pipes are io.BufferedIOBase objects."""
    child = subprocess.Popen(
        [sys.executable, "-c", "import sys; sys.exit(0)"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    try:
        for pipe in (child.stdin, child.stdout, child.stderr):
            self.assertIsInstance(pipe, io.BufferedIOBase)
    finally:
        # Always release the pipes and reap the child process.
        for pipe in (child.stdin, child.stdout, child.stderr):
            pipe.close()
        child.wait()