def setUp(self):
    """Build the fixtures shared by the tests of this case.

    Creates a test loop, a test protocol based on ``asyncio.Protocol`` and
    a mock pipe whose ``fileno()`` returns 5. ``os.fstat`` is patched so
    that it reports ``stat.S_IFIFO`` as the file mode.
    """
    self.loop = self.new_test_loop()
    self.protocol = test_utils.make_test_protocol(asyncio.Protocol)

    # Stand-in for the pipe object; only its descriptor number is set up.
    pipe = mock.Mock(spec_set=io.RawIOBase)
    pipe.fileno.return_value = 5
    self.pipe = pipe

    # Patch out asyncio.unix_events._set_nonblocking for the test duration.
    nonblocking_patch = mock.patch('asyncio.unix_events._set_nonblocking')
    nonblocking_patch.start()
    self.addCleanup(nonblocking_patch.stop)

    # os.fstat() returns an object whose st_mode is S_IFIFO.
    fstat_patch = mock.patch('os.fstat')
    fake_fstat = fstat_patch.start()
    fake_fstat.return_value = mock.Mock(st_mode=stat.S_IFIFO)
    self.addCleanup(fstat_patch.stop)
python类RawIOBase()的实例源码
def setUp(self):
    """Build the fixtures shared by the tests of this case.

    Creates a test loop, a test protocol based on ``asyncio.BaseProtocol``
    and a mock pipe whose ``fileno()`` returns 5. ``os.fstat`` is patched
    so that it reports ``stat.S_IFSOCK`` as the file mode.
    """
    self.loop = self.new_test_loop()
    self.protocol = test_utils.make_test_protocol(asyncio.BaseProtocol)

    # Stand-in for the pipe object; only its descriptor number is set up.
    pipe = mock.Mock(spec_set=io.RawIOBase)
    pipe.fileno.return_value = 5
    self.pipe = pipe

    # Patch out asyncio.unix_events._set_nonblocking for the test duration.
    nonblocking_patch = mock.patch('asyncio.unix_events._set_nonblocking')
    nonblocking_patch.start()
    self.addCleanup(nonblocking_patch.stop)

    # os.fstat() returns an object whose st_mode is S_IFSOCK.
    fstat_patch = mock.patch('os.fstat')
    fake_fstat = fstat_patch.start()
    fake_fstat.return_value = mock.Mock(st_mode=stat.S_IFSOCK)
    self.addCleanup(fstat_patch.stop)
def wait_until_ready(self, channel:RawIOBase, timeout=60):
    """
    Poll the device until it acknowledges that it is ready for ymodem.

    Pending input is drained first with ``readline()``. A single space
    (b' ') is then written repeatedly and the first byte of each reply is
    compared with ``LightYModemProtocol.ack``; the method returns once two
    ACK replies have been counted (they need not be consecutive).

    :param channel: the stream connected to the device
    :param timeout: maximum number of seconds to keep draining/polling, or
        None to poll without a time limit
    :return: None
    :raises TimeoutError: if the device is not ready within ``timeout``
        seconds. The limit is only checked between channel operations, so a
        blocking ``read``/``readline`` on the channel can still exceed it.
    """
    import time  # local import: only this method needs a clock

    required_acks = 2
    deadline = None if timeout is None else time.monotonic() + timeout

    def check_deadline():
        # Previously ``timeout`` was ignored and the loops could spin forever.
        if deadline is not None and time.monotonic() > deadline:
            raise TimeoutError(
                "Device did not become ready for ymodem within %s seconds" % timeout)

    while channel.readline():  # flush any existing data
        check_deadline()

    success_count = 0
    while success_count < required_acks:
        check_deadline()
        channel.write(b' ')
        result = channel.read()
        if result and result[0] == LightYModemProtocol.ack:
            success_count += 1
def __init__(self, fileno, mode='r', closefd=True):
    """Wrap the file descriptor *fileno* and register hub watchers for it.

    :param fileno: the file descriptor to wrap; it is passed to
        ``make_nonblocking`` before any watcher is created
    :param mode: a string; readable if it contains 'r', writable if it
        contains 'w'
    :param closefd: stored as ``_closefd`` for later use when closing
    """
    RawIOBase.__init__(self)  # Python 2: pylint:disable=no-member,non-parent-init-called
    self._closed = False
    self._closefd = closefd
    self._fileno = fileno
    make_nonblocking(fileno)

    wants_read = 'r' in mode
    wants_write = 'w' in mode
    self._readable = wants_read
    self._writable = wants_write

    hub = get_hub()
    self.hub = hub
    new_watcher = hub.loop.io
    # Second argument is the event mask: 1 for the read watcher, 2 for the
    # write watcher (presumably the loop's READ/WRITE flags - confirm).
    if wants_read:
        self._read_event = new_watcher(fileno, 1)
    if wants_write:
        self._write_event = new_watcher(fileno, 2)

    self._seekable = None
def close(self):
    """Flush, mark the object closed and release its watchers and descriptor.

    Calling this on an already closed object does nothing. The descriptor
    itself is only closed (and ``_fileno`` reset to None) when the object
    was created with a true ``closefd``.
    """
    if not self._closed:
        # Flush while the object still counts as open.
        self.flush()
        self._closed = True

        # Cancel pending waits on whichever watchers exist for this mode.
        if self._readable:
            self.hub.cancel_wait(self._read_event, cancel_wait_ex)
        if self._writable:
            self.hub.cancel_wait(self._write_event, cancel_wait_ex)

        if self._closefd:
            descriptor, self._fileno = self._fileno, None
            os.close(descriptor)
# RawIOBase provides a 'read' method that will call readall() if
# the `size` was missing or -1 and otherwise call readinto(). We
# want to take advantage of this to avoid single byte reads when
# possible. This is highlighted by a bug in BufferedIOReader that
# calls read() in a loop when its readall() method is invoked
# (this was fixed in Python 3.3). See
# https://github.com/gevent/gevent/issues/675
def setUp(self):
    """Build the fixtures shared by the tests of this case.

    Creates a test loop, a test protocol based on ``asyncio.Protocol`` and
    a mock pipe whose ``fileno()`` returns 5. ``os.fstat`` is patched so
    that it reports ``stat.S_IFIFO`` as the file mode.
    """
    self.loop = self.new_test_loop()
    self.protocol = test_utils.make_test_protocol(asyncio.Protocol)

    # Stand-in for the pipe object; only its descriptor number is set up.
    pipe = mock.Mock(spec_set=io.RawIOBase)
    pipe.fileno.return_value = 5
    self.pipe = pipe

    # Patch out asyncio.unix_events._set_nonblocking for the test duration.
    nonblocking_patch = mock.patch('asyncio.unix_events._set_nonblocking')
    nonblocking_patch.start()
    self.addCleanup(nonblocking_patch.stop)

    # os.fstat() returns an object whose st_mode is S_IFIFO.
    fstat_patch = mock.patch('os.fstat')
    fake_fstat = fstat_patch.start()
    fake_fstat.return_value = mock.Mock(st_mode=stat.S_IFIFO)
    self.addCleanup(fstat_patch.stop)
def setUp(self):
    """Build the fixtures shared by the tests of this case.

    Creates a test loop, a test protocol based on ``asyncio.BaseProtocol``
    and a mock pipe whose ``fileno()`` returns 5. ``os.fstat`` is patched
    so that it reports ``stat.S_IFSOCK`` as the file mode.
    """
    self.loop = self.new_test_loop()
    self.protocol = test_utils.make_test_protocol(asyncio.BaseProtocol)

    # Stand-in for the pipe object; only its descriptor number is set up.
    pipe = mock.Mock(spec_set=io.RawIOBase)
    pipe.fileno.return_value = 5
    self.pipe = pipe

    # Patch out asyncio.unix_events._set_nonblocking for the test duration.
    nonblocking_patch = mock.patch('asyncio.unix_events._set_nonblocking')
    nonblocking_patch.start()
    self.addCleanup(nonblocking_patch.stop)

    # os.fstat() returns an object whose st_mode is S_IFSOCK.
    fstat_patch = mock.patch('os.fstat')
    fake_fstat = fstat_patch.start()
    fake_fstat.return_value = mock.Mock(st_mode=stat.S_IFSOCK)
    self.addCleanup(fstat_patch.stop)
def __init__(self, fileno, mode='r', closefd=True):
    """Wrap the file descriptor *fileno* and register hub watchers for it.

    :param fileno: the file descriptor to wrap; it is passed to
        ``make_nonblocking`` before any watcher is created
    :param mode: a string; readable if it contains 'r', writable if it
        contains 'w'
    :param closefd: stored as ``_closefd`` for later use when closing
    """
    RawIOBase.__init__(self)  # Python 2: pylint:disable=no-member,non-parent-init-called
    self._closed = False
    self._closefd = closefd
    self._fileno = fileno
    make_nonblocking(fileno)

    wants_read = 'r' in mode
    wants_write = 'w' in mode
    self._readable = wants_read
    self._writable = wants_write

    hub = get_hub()
    self.hub = hub
    new_watcher = hub.loop.io
    # Second argument is the event mask: 1 for the read watcher, 2 for the
    # write watcher (presumably the loop's READ/WRITE flags - confirm).
    if wants_read:
        self._read_event = new_watcher(fileno, 1)
    if wants_write:
        self._write_event = new_watcher(fileno, 2)

    self._seekable = None
def close(self):
    """Flush, mark the object closed and release its watchers and descriptor.

    Calling this on an already closed object does nothing. The descriptor
    itself is only closed (and ``_fileno`` reset to None) when the object
    was created with a true ``closefd``.
    """
    if not self._closed:
        # Flush while the object still counts as open.
        self.flush()
        self._closed = True

        # Cancel pending waits on whichever watchers exist for this mode.
        if self._readable:
            self.hub.cancel_wait(self._read_event, cancel_wait_ex)
        if self._writable:
            self.hub.cancel_wait(self._write_event, cancel_wait_ex)

        if self._closefd:
            descriptor, self._fileno = self._fileno, None
            os.close(descriptor)
# RawIOBase provides a 'read' method that will call readall() if
# the `size` was missing or -1 and otherwise call readinto(). We
# want to take advantage of this to avoid single byte reads when
# possible. This is highlighted by a bug in BufferedIOReader that
# calls read() in a loop when its readall() method is invoked
# (this was fixed in Python 3.3). See
# https://github.com/gevent/gevent/issues/675
def setUp(self):
    """Build the fixtures shared by the tests of this case.

    Creates a test loop, a test protocol based on ``asyncio.Protocol`` and
    a mock pipe whose ``fileno()`` returns 5. ``os.fstat`` is patched so
    that it reports ``stat.S_IFIFO`` as the file mode.
    """
    self.loop = self.new_test_loop()
    self.protocol = test_utils.make_test_protocol(asyncio.Protocol)

    # Stand-in for the pipe object; only its descriptor number is set up.
    pipe = mock.Mock(spec_set=io.RawIOBase)
    pipe.fileno.return_value = 5
    self.pipe = pipe

    # Patch out asyncio.unix_events._set_nonblocking for the test duration.
    nonblocking_patch = mock.patch('asyncio.unix_events._set_nonblocking')
    nonblocking_patch.start()
    self.addCleanup(nonblocking_patch.stop)

    # os.fstat() returns an object whose st_mode is S_IFIFO.
    fstat_patch = mock.patch('os.fstat')
    fake_fstat = fstat_patch.start()
    fake_fstat.return_value = mock.Mock(st_mode=stat.S_IFIFO)
    self.addCleanup(fstat_patch.stop)
def setUp(self):
    """Build the fixtures shared by the tests of this case.

    Creates a test loop, a test protocol based on ``asyncio.BaseProtocol``
    and a mock pipe whose ``fileno()`` returns 5. ``os.fstat`` is patched
    so that it reports ``stat.S_IFSOCK`` as the file mode.
    """
    self.loop = self.new_test_loop()
    self.protocol = test_utils.make_test_protocol(asyncio.BaseProtocol)

    # Stand-in for the pipe object; only its descriptor number is set up.
    pipe = mock.Mock(spec_set=io.RawIOBase)
    pipe.fileno.return_value = 5
    self.pipe = pipe

    # Patch out asyncio.unix_events._set_nonblocking for the test duration.
    nonblocking_patch = mock.patch('asyncio.unix_events._set_nonblocking')
    nonblocking_patch.start()
    self.addCleanup(nonblocking_patch.stop)

    # os.fstat() returns an object whose st_mode is S_IFSOCK.
    fstat_patch = mock.patch('os.fstat')
    fake_fstat = fstat_patch.start()
    fake_fstat.return_value = mock.Mock(st_mode=stat.S_IFSOCK)
    self.addCleanup(fstat_patch.stop)
def isatty(self):
    """Report this stream as a terminal.

    The ``io.RawIOBase`` implementation is still invoked, but its answer
    is discarded; only an exception raised by it can change the outcome.
    """
    _ignored = io.RawIOBase.isatty(self)
    return True
def _gettextwriter(out, encoding):
    """Return a text writer that encodes its output to *out*.

    :param out: the destination. ``None`` selects ``sys.stdout``. An
        ``io.RawIOBase`` instance is wrapped through a forwarding proxy;
        any other object only needs a ``write`` method (``seekable`` and
        ``tell`` are used too when present).
    :param encoding: the encoding handed to the text wrapper
    :return: an ``_UnbufferedTextIOWrapper`` created with
        ``errors='xmlcharrefreplace'`` and ``newline='\\n'``
    """
    if out is None:
        import sys
        out = sys.stdout

    if isinstance(out, io.RawIOBase):
        # io.BufferedIOBase is an abstract base that does not wrap the
        # stream handed to it, so nothing written would ever reach ``out``.
        # Use a thin proxy that forwards every attribute to the raw stream.
        class _RawStreamProxy(object):
            __class__ = out.__class__

            def __getattr__(self, name):
                return getattr(out, name)

        buffer = _RawStreamProxy()
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses these methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    # wrap a binary writer with TextIOWrapper
    return _UnbufferedTextIOWrapper(buffer, encoding=encoding,
                                    errors='xmlcharrefreplace',
                                    newline='\n')
def isatty(self):
    """Report this stream as a terminal.

    The ``io.RawIOBase`` implementation is still invoked, but its answer
    is discarded; only an exception raised by it can change the outcome.
    """
    _ignored = io.RawIOBase.isatty(self)
    return True
def isatty(self):
    """Report this stream as a terminal.

    The ``io.RawIOBase`` implementation is still invoked, but its answer
    is discarded; only an exception raised by it can change the outcome.
    """
    _ignored = io.RawIOBase.isatty(self)
    return True
def _gettextwriter(out, encoding):
    """Return a text writer that encodes its output to *out*.

    :param out: the destination. ``None`` selects ``sys.stdout``. An
        ``io.RawIOBase`` instance is wrapped through a forwarding proxy;
        any other object only needs a ``write`` method (``seekable`` and
        ``tell`` are used too when present).
    :param encoding: the encoding handed to the text wrapper
    :return: an ``_UnbufferedTextIOWrapper`` created with
        ``errors='xmlcharrefreplace'`` and ``newline='\\n'``
    """
    if out is None:
        import sys
        out = sys.stdout

    if isinstance(out, io.RawIOBase):
        # io.BufferedIOBase is an abstract base that does not wrap the
        # stream handed to it, so nothing written would ever reach ``out``.
        # Use a thin proxy that forwards every attribute to the raw stream.
        class _RawStreamProxy(object):
            __class__ = out.__class__

            def __getattr__(self, name):
                return getattr(out, name)

        buffer = _RawStreamProxy()
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses these methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    # wrap a binary writer with TextIOWrapper
    return _UnbufferedTextIOWrapper(buffer, encoding=encoding,
                                    errors='xmlcharrefreplace',
                                    newline='\n')
def __init__(self, sock, mode):
    """Initialise the raw I/O object on top of *sock*.

    :param sock: the socket object, stored as ``_sock``
    :param mode: one of "r", "w", "rw", "rb", "wb" or "rwb"; a "b" is
        appended to the stored mode when it is missing
    :raises ValueError: if *mode* is not one of the accepted values
    """
    accepted_modes = ("r", "w", "rw", "rb", "wb", "rwb")
    if mode not in accepted_modes:
        raise ValueError("invalid mode: %r" % mode)
    io.RawIOBase.__init__(self)
    self._sock = sock

    # The stored mode always carries the binary flag.
    binary_mode = mode if "b" in mode else mode + "b"
    self._mode = binary_mode
    self._reading = "r" in binary_mode
    self._writing = "w" in binary_mode
    self._timeout_occurred = False
def close(self):
    """Close this object once; later calls are no-ops.

    The underlying socket is not closed directly here: it is only told,
    through ``_decref_socketios()``, that this object no longer uses it,
    and the reference to it is then dropped.
    """
    if not self.closed:
        io.RawIOBase.close(self)
        sock = self._sock
        sock._decref_socketios()
        self._sock = None
def __init__(self, sock, mode):
    """Initialise the raw I/O object on top of *sock*.

    :param sock: the socket object, stored as ``_sock``
    :param mode: one of "r", "w", "rw", "rb", "wb" or "rwb"; a "b" is
        appended to the stored mode when it is missing
    :raises ValueError: if *mode* is not one of the accepted values
    """
    accepted_modes = ("r", "w", "rw", "rb", "wb", "rwb")
    if mode not in accepted_modes:
        raise ValueError("invalid mode: %r" % mode)
    io.RawIOBase.__init__(self)
    self._sock = sock

    # The stored mode always carries the binary flag.
    binary_mode = mode if "b" in mode else mode + "b"
    self._mode = binary_mode
    self._reading = "r" in binary_mode
    self._writing = "w" in binary_mode
    self._timeout_occurred = False
def close(self):
    """Close this object once; later calls are no-ops.

    The underlying socket is not closed directly here: it is only told,
    through ``_decref_socketios()``, that this object no longer uses it,
    and the reference to it is then dropped.
    """
    if not self.closed:
        io.RawIOBase.close(self)
        sock = self._sock
        sock._decref_socketios()
        self._sock = None
def transfer(self, file, channel:RawIOBase, progress:ProgressSpan):
    """
    Transfers a single file to the device and closes the session, so the device places the data in the
    target location.

    The file size is measured by seeking to the end, and ``progress`` is
    initialised to the range 0..size. A filename header is sent, then
    packets are sent for as long as each response equals
    ``LightYModemClient.ack``. ``file`` is always closed before this method
    returns or raises.

    :param file: the seekable file to transfer via ymodem; closed by this method
    :param channel: the ymodem endpoint
    :param progress: notification of progress
    :return: True when the final response was ``LightYModemClient.eot``
    :raises LightYModemException: if the final response is anything else
    """
    try:
        self.seq = 0
        self.channel = channel
        self.progress = progress

        # Measure the payload, then rewind to the start for sending.
        file.seek(0, os.SEEK_END)
        size = file.tell()
        file.seek(0, os.SEEK_SET)

        progress.min = 0
        progress.max = size
        progress.update(0)

        response = self.send_filename_header(self.default_file_name, size)
        while response == LightYModemClient.ack:
            response = self.send_packet(file)
    finally:
        # Previously the file stayed open when any step above raised.
        file.close()

    if response != LightYModemClient.eot:
        raise LightYModemException("Unable to transfer the file to the device. Code=%d" % response)
    self._send_close()
    return True
def isatty(self):
    """Report this stream as a terminal.

    The ``io.RawIOBase`` implementation is still invoked, but its answer
    is discarded; only an exception raised by it can change the outcome.
    """
    _ignored = io.RawIOBase.isatty(self)
    return True
def _gettextwriter(out, encoding):
    """Return a text writer that encodes its output to *out*.

    :param out: the destination. ``None`` selects ``sys.stdout``. An
        ``io.RawIOBase`` instance is wrapped through a forwarding proxy;
        any other object only needs a ``write`` method (``seekable`` and
        ``tell`` are used too when present).
    :param encoding: the encoding handed to the text wrapper
    :return: an ``_UnbufferedTextIOWrapper`` created with
        ``errors='xmlcharrefreplace'`` and ``newline='\\n'``
    """
    if out is None:
        import sys
        out = sys.stdout

    if isinstance(out, io.RawIOBase):
        # io.BufferedIOBase is an abstract base that does not wrap the
        # stream handed to it, so nothing written would ever reach ``out``.
        # Use a thin proxy that forwards every attribute to the raw stream.
        class _RawStreamProxy(object):
            __class__ = out.__class__

            def __getattr__(self, name):
                return getattr(out, name)

        buffer = _RawStreamProxy()
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses these methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    # wrap a binary writer with TextIOWrapper
    return _UnbufferedTextIOWrapper(buffer, encoding=encoding,
                                    errors='xmlcharrefreplace',
                                    newline='\n')
def _gettextwriter(out, encoding):
    """Return a text writer that encodes its output to *out*.

    :param out: the destination. ``None`` selects ``sys.stdout``. An
        ``io.RawIOBase`` instance is wrapped through a forwarding proxy;
        any other object only needs a ``write`` method (``seekable`` and
        ``tell`` are used too when present).
    :param encoding: the encoding handed to the text wrapper
    :return: an ``_UnbufferedTextIOWrapper`` created with
        ``errors='xmlcharrefreplace'`` and ``newline='\\n'``
    """
    if out is None:
        import sys
        out = sys.stdout

    if isinstance(out, io.RawIOBase):
        # io.BufferedIOBase is an abstract base that does not wrap the
        # stream handed to it, so nothing written would ever reach ``out``.
        # Use a thin proxy that forwards every attribute to the raw stream.
        class _RawStreamProxy(object):
            __class__ = out.__class__

            def __getattr__(self, name):
                return getattr(out, name)

        buffer = _RawStreamProxy()
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses these methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    # wrap a binary writer with TextIOWrapper
    return _UnbufferedTextIOWrapper(buffer, encoding=encoding,
                                    errors='xmlcharrefreplace',
                                    newline='\n')
def readall(self):
    """Return whatever ``self.read()`` yields when called without a size."""
    # Delegate to read() instead of inheriting RawIOBase's readall().
    remaining = self.read()
    return remaining
def __init__(self, sock, mode):
    """Initialise the raw I/O object on top of *sock*.

    :param sock: the socket object, stored as ``_sock``
    :param mode: one of "r", "w", "rw", "rb", "wb" or "rwb"; a "b" is
        appended to the stored mode when it is missing
    :raises ValueError: if *mode* is not one of the accepted values
    """
    accepted_modes = ("r", "w", "rw", "rb", "wb", "rwb")
    if mode not in accepted_modes:
        raise ValueError("invalid mode: %r" % mode)
    io.RawIOBase.__init__(self)
    self._sock = sock

    # The stored mode always carries the binary flag.
    binary_mode = mode if "b" in mode else mode + "b"
    self._mode = binary_mode
    self._reading = "r" in binary_mode
    self._writing = "w" in binary_mode
    self._timeout_occurred = False
def close(self):
    """Close this object once; later calls are no-ops.

    The underlying socket is not closed directly here: it is only told,
    through ``_decref_socketios()``, that this object no longer uses it,
    and the reference to it is then dropped.
    """
    if not self.closed:
        io.RawIOBase.close(self)
        sock = self._sock
        sock._decref_socketios()
        self._sock = None
def test_unsupported_not_forwarded():
    """An attribute that exists only on the wrapped file is not exposed.

    The attribute is reachable through ``.wrapped`` on the object returned
    by ``trio.wrap_file`` but raises AttributeError on that object itself.
    """
    class FakeFile(io.RawIOBase):
        def unsupported_attr(self):  # pragma: no cover
            pass

    raw_file = FakeFile()
    wrapper = trio.wrap_file(raw_file)

    # The underlying object still has the attribute...
    assert hasattr(wrapper.wrapped, 'unsupported_attr')

    # ...but looking it up on the wrapper fails.
    with pytest.raises(AttributeError):
        getattr(wrapper, 'unsupported_attr')
def _gettextwriter(out, encoding):
    """Return a text writer that encodes its output to *out*.

    :param out: the destination. ``None`` selects ``sys.stdout``. An
        ``io.RawIOBase`` instance is wrapped through a forwarding proxy;
        any other object only needs a ``write`` method (``seekable`` and
        ``tell`` are used too when present).
    :param encoding: the encoding handed to the text wrapper
    :return: an ``_UnbufferedTextIOWrapper`` created with
        ``errors='xmlcharrefreplace'`` and ``newline='\\n'``
    """
    if out is None:
        import sys
        out = sys.stdout

    if isinstance(out, io.RawIOBase):
        # io.BufferedIOBase is an abstract base that does not wrap the
        # stream handed to it, so nothing written would ever reach ``out``.
        # Use a thin proxy that forwards every attribute to the raw stream.
        class _RawStreamProxy(object):
            __class__ = out.__class__

            def __getattr__(self, name):
                return getattr(out, name)

        buffer = _RawStreamProxy()
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses these methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    # wrap a binary writer with TextIOWrapper
    return _UnbufferedTextIOWrapper(buffer, encoding=encoding,
                                    errors='xmlcharrefreplace',
                                    newline='\n')
def isatty(self):
    """Report this stream as a terminal.

    The ``io.RawIOBase`` implementation is still invoked, but its answer
    is discarded; only an exception raised by it can change the outcome.
    """
    _ignored = io.RawIOBase.isatty(self)
    return True