def test_selector_raises_timeout_error_on_interrupt_over_time(self):
    # A select() call that is still being interrupted after the requested
    # timeout has elapsed is expected to surface as OSError(ETIMEDOUT).
    # NOTE(review): clearing _DEFAULT_SELECTOR presumably forces selectors2 to
    # pick its implementation again after patching -- confirm.
    selectors2._DEFAULT_SELECTOR = None

    def slow_interrupting_select(*args, **kwargs):
        # Sleep longer than the 0.1s timeout used below, then report EINTR.
        time.sleep(0.2)
        interrupted = OSError()
        interrupted.errno = errno.EINTR
        raise interrupted

    fake_socket = mock.Mock()
    fake_socket.fileno.return_value = 1

    patch_select_module(self, select=slow_interrupting_select)
    selector = self.make_selector()
    selector.register(fake_socket, selectors2.EVENT_READ)

    caught = None
    try:
        selector.select(timeout=0.1)
    except OSError as exc:
        caught = exc
    if caught is None:
        self.fail('Didn\'t raise an OSError')
    self.assertEqual(caught.errno, errno.ETIMEDOUT)
# Python EINTR example source code
def _wait_for(fd, readable, writable, error, expiration):
    """Wait until ``_polling_backend`` reports ``fd`` ready or time runs out.

    ``readable``, ``writable`` and ``error`` are passed to
    ``_polling_backend`` unchanged.  ``expiration`` is compared against
    ``time.time()``; ``None`` means wait without a time limit.

    Raises ``dns.exception.Timeout`` when the expiration has already passed
    or when the backend returns a false value.  Errors other than ``EINTR``
    raised by the backend are propagated.
    """
    while True:
        if expiration is None:
            timeout = None
        else:
            timeout = expiration - time.time()
            if timeout <= 0.0:
                raise dns.exception.Timeout
        try:
            ready = _polling_backend(fd, readable, writable, error, timeout)
        except select_error as e:
            if e.args[0] != errno.EINTR:
                raise
            # Interrupted by a signal: go round again so the remaining time is
            # recomputed, instead of returning as if the descriptor were ready.
            continue
        if not ready:
            raise dns.exception.Timeout
        return
def _read(self, N):
# Starting with Python 3 open with buffering=0 returns a FileIO object.
# FileIO.read behaves like read(2) and not like fread(3) and thus we
# have to handle the case that read returns less data as requested here
# more carefully.
data = b("")
while len(data) < N:
try:
d = self.__file.read(N - len(data))
except IOError, e:
# read(2) has been interrupted by a signal; redo the read
if e.errno == errno.EINTR:
continue
raise
if d is None:
# __file is in non-blocking mode and no data is available
return data
if len(d) == 0:
# __file is in blocking mode and arrived at EOF
return data
data += d
return data
def _peek_unlocked(self, n=0):
want = min(n, self.buffer_size)
have = len(self._read_buf) - self._read_pos
if have < want or have <= 0:
to_read = self.buffer_size - have
while True:
try:
current = self.raw.read(to_read)
except IOError as e:
if e.errno != EINTR:
raise
continue
break
if current:
self._read_buf = self._read_buf[self._read_pos:] + current
self._read_pos = 0
return self._read_buf[self._read_pos:]
def _flush_unlocked(self):
if self.closed:
raise ValueError("flush of closed file")
while self._write_buf:
try:
n = self.raw.write(self._write_buf)
except BlockingIOError:
raise RuntimeError("self.raw should implement RawIOBase: it "
"should not raise BlockingIOError")
except IOError as e:
if e.errno != EINTR:
raise
continue
if n is None:
raise BlockingIOError(
errno.EAGAIN,
"write could not complete without blocking", 0)
if n > len(self._write_buf) or n < 0:
raise IOError("write() returned incorrect number of bytes")
del self._write_buf[:n]
def reset_input_buffer(self):
    """Clear input buffer, discarding all that is in the buffer.

    Polls the socket with a zero timeout and receives (and drops) data for
    as long as some is reported readable.  The loop stops when nothing is
    readable or when ``recv`` returns an empty result.

    Raises ``portNotOpenError`` if the port is not open and
    ``SerialException`` for receive errors other than the "would block" /
    interrupted family.
    """
    if not self.is_open:
        raise portNotOpenError
    ignorable = (errno.EAGAIN, errno.EALREADY, errno.EWOULDBLOCK,
                 errno.EINPROGRESS, errno.EINTR)
    # just use recv to remove input, while there is some
    ready = True
    while ready:
        ready, _, _ = select.select([self._socket], [], [], 0)
        try:
            if ready:
                # Only receive when select says data is there; otherwise a
                # blocking socket would hang here.  An empty result (peer
                # closed) is falsy and therefore ends the loop too.
                ready = self._socket.recv(4096)
        except OSError as e:
            # this is for Python 3.x where select.error is a subclass of
            # OSError ignore BlockingIOErrors and EINTR. other errors are shown
            # https://www.python.org/dev/peps/pep-0475.
            if e.errno not in ignorable:
                raise SerialException('read failed: {}'.format(e))
        except (select.error, socket.error) as e:
            # this is for Python 2.x
            # ignore BlockingIOErrors and EINTR. all errors are shown
            # see also http://www.python.org/dev/peps/pep-3151/#select
            if e.args[0] not in ignorable:
                raise SerialException('read failed: {}'.format(e))
def wait_on_children(self):
    """Reap child processes until ``self.running`` becomes false.

    Each child reported by ``os.wait()`` as exited or killed by a signal is
    removed and passed to the respawn check.  ``EINTR`` and ``ECHILD`` from
    ``os.wait()`` are ignored.  A keyboard interrupt sends SIGTERM to the
    process group and ends the loop; a SIGHUP interrupt triggers a reload.
    When the loop ends the listening socket is shut down and closed.
    """
    while self.running:
        try:
            child_pid, exit_status = os.wait()
            child_gone = (os.WIFEXITED(exit_status)
                          or os.WIFSIGNALED(exit_status))
            if child_gone:
                self._remove_children(child_pid)
                self._verify_and_respawn_children(child_pid, exit_status)
        except OSError as wait_error:
            if wait_error.errno in (errno.EINTR, errno.ECHILD):
                continue
            raise
        except KeyboardInterrupt:
            LOG.info(_LI('Caught keyboard interrupt. Exiting.'))
            os.killpg(0, signal.SIGTERM)
            break
        except exception.SIGHUPInterrupt:
            self.reload()
    eventlet.greenio.shutdown_safe(self.sock)
    self.sock.close()
    LOG.debug('Exited')
def poll(self, timeout):
    """Poll the registered descriptors and return ``(readables, [])``.

    ``timeout`` is multiplied by 1000 before being handed to the poller.
    ``readables`` holds the ``self.readables_fd_map`` entries whose
    descriptor was reported with ``self.READ``.  Descriptors reported with
    ``POLLNVAL`` are unregistered from the poller.  If the poll call is
    interrupted (``EINTR``) two empty lists are returned; any other
    ``select.error`` is propagated.
    """
    try:
        fds = self._poller.poll(timeout * 1000)
    except select.error as err:
        # Use .args rather than indexing the exception: on Python 3
        # select.error is OSError, which is not subscriptable.
        if err.args and err.args[0] == errno.EINTR:
            display.vvv(u"EINTR encountered in poll")
            return [], []
        raise
    readables = []
    for fd, eventmask in fds:
        if eventmask & select.POLLNVAL:
            # POLLNVAL means `fd` value is invalid, not open.
            self._poller.unregister(fd)
        elif eventmask & self.READ:
            if fd in self.readables_fd_map:
                readables.append(self.readables_fd_map[fd])
    return readables, []
def sleep(self):
    """Wait up to one second for the read end of ``self.PIPE``, then drain it.

    Returns as soon as the wait times out with nothing readable.  Otherwise
    single bytes are read from the pipe until a read returns nothing or
    raises.  ``EAGAIN`` and ``EINTR`` errors are ignored; a keyboard
    interrupt exits the process through ``sys.exit()``.
    """
    tolerated = [errno.EAGAIN, errno.EINTR]
    try:
        readable, _, _ = select.select([self.PIPE[0]], [], [], 1.0)
        if readable:
            while True:
                if not os.read(self.PIPE[0], 1):
                    break
    except select.error as exc:
        if exc.args[0] not in tolerated:
            raise
    except OSError as exc:
        if exc.errno not in tolerated:
            raise
    except KeyboardInterrupt:
        sys.exit()
def wait(self, timeout):
    """Notify, then wait up to ``timeout`` for ``self.wait_fds`` to be readable.

    Returns the list of readable descriptors, or ``None`` when none became
    readable.  If the wait is interrupted (``EINTR``) ``self.sockets`` is
    returned.  On ``EBADF`` ``self.sockets`` is returned when ``self.nr`` is
    negative, otherwise ``StopWaiting`` is raised.  Other select errors are
    propagated.
    """
    try:
        self.notify()
        readable = select.select(self.wait_fds, [], [], timeout)[0]
    except select.error as exc:
        code = exc.args[0]
        if code == errno.EINTR:
            return self.sockets
        if code != errno.EBADF:
            raise
        if self.nr < 0:
            return self.sockets
        raise StopWaiting
    if readable:
        return readable
    return None
def sleep(self):
    """Block for at most one second on the read end of ``self.PIPE``.

    If the pipe did not become readable the method simply returns.  If it
    did, the pipe is emptied one byte per read until a read comes back empty
    or fails.  Errors carrying ``EAGAIN`` or ``EINTR`` are swallowed, and a
    keyboard interrupt terminates via ``sys.exit()``.
    """
    try:
        readable = select.select([self.PIPE[0]], [], [], 1.0)[0]
        if readable:
            byte = os.read(self.PIPE[0], 1)
            while byte:
                byte = os.read(self.PIPE[0], 1)
    except select.error as exc:
        harmless = exc.args[0] in [errno.EAGAIN, errno.EINTR]
        if not harmless:
            raise
    except OSError as exc:
        harmless = exc.errno in [errno.EAGAIN, errno.EINTR]
        if not harmless:
            raise
    except KeyboardInterrupt:
        sys.exit()
def wait(self, timeout):
    """Notify, then wait up to ``timeout`` for ``self.wait_fds`` to be readable.

    When the read end of ``self.PIPE`` is among the readable descriptors one
    byte is consumed from it.  Returns the readable descriptors, or ``None``
    when none became readable.  An interrupted wait (``EINTR``) returns
    ``self.sockets``; ``EBADF`` returns ``self.sockets`` when ``self.nr`` is
    negative and raises ``StopWaiting`` otherwise.  Other select errors are
    propagated.
    """
    try:
        self.notify()
        readable = select.select(self.wait_fds, [], [], timeout)[0]
        if readable:
            # The pipe read stays inside the try: its errors go through the
            # same handler as errors from select itself.
            if self.PIPE[0] in readable:
                os.read(self.PIPE[0], 1)
            return readable
    except select.error as exc:
        code = exc.args[0]
        if code == errno.EINTR:
            return self.sockets
        if code != errno.EBADF:
            raise
        if self.nr < 0:
            return self.sockets
        raise StopWaiting
    return None
def _read_bytes(self, num_bytes):
self._sock.settimeout(self._read_timeout)
while True:
try:
data = self._rfile.read(num_bytes)
break
except (IOError, OSError) as e:
if e.errno == errno.EINTR:
continue
raise err.OperationalError(
2013,
"Lost connection to MySQL server during query (%s)" % (e,))
if len(data) < num_bytes:
raise err.OperationalError(
2013, "Lost connection to MySQL server during query")
return data
def readinto(self, b):
    """Receive up to len(b) bytes into the writable buffer *b*.

    Returns the number of bytes received.  ``None`` is returned when the
    receive fails with one of the errno values in ``_blocking_errnos``.
    A receive interrupted with ``EINTR`` is retried.
    If *b* is non-empty, a 0 return value indicates that the connection
    was shutdown at the other end.

    Once a timeout has been seen, every later call raises ``IOError``.
    """
    self._checkClosed()
    self._checkReadable()
    if self._timeout_occurred:
        raise IOError("cannot read from timed out object")
    while True:
        try:
            received = self._sock.recv_into(b)
        except timeout:
            # Remember the timeout so later reads are refused.
            self._timeout_occurred = True
            raise
        except error as exc:
            code = exc.args[0]
            if code == EINTR:
                continue
            if code not in _blocking_errnos:
                raise
            return None
        else:
            return received
def serve_forever():
    """Accept connections forever, handling each one in a forked child.

    Binds a listening TCP socket to ``SERVER_ADDRESS``, then loops: accept a
    connection, fork, let the child run ``handle_request`` and exit, and let
    the parent close its copy of the connection and accept again.  An
    ``accept`` interrupted by a signal (``EINTR``) is restarted; any other
    ``IOError`` from ``accept`` is propagated.
    """
    listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listen_socket.bind(SERVER_ADDRESS)
    listen_socket.listen(REQUEST_QUEUE_SIZE)
    print('Serving HTTP on port {port} ...'.format(port=PORT))

    # NOTE(review): grim_reaper is installed for SIGTSTP; a handler that reaps
    # forked children would normally be attached to SIGCHLD -- confirm.
    signal.signal(signal.SIGTSTP, grim_reaper)

    while True:
        try:
            client_connection, client_address = listen_socket.accept()
        except IOError as e:
            # Check errno directly: unpacking e.args fails with ValueError
            # whenever the exception does not carry exactly two arguments,
            # which would hide the real error.
            # restart 'accept' if it was interrupted
            if e.errno == errno.EINTR:
                continue
            raise

        pid = os.fork()
        if pid == 0:  # child
            listen_socket.close()  # close child copy
            handle_request(client_connection)
            client_connection.close()
            os._exit(0)
        else:  # parent
            client_connection.close()  # close parent copy and loop over
def Popen(*args, **kwargs):
    """
    Wrapper for subprocess.Popen() that provides thread-safety and works around
    other known bugs.

    All arguments are forwarded to ``subprocess.Popen`` and its result is
    returned.  The call runs while holding ``_popen_lock``, and ``os.read``
    is replaced for its duration with a version that retries on ``EINTR``.
    The original ``os.read`` is restored afterwards, whether or not the
    call raised.
    """
    # Python's subprocess module has a bug where it propagates the
    # exception to the caller when it gets interrupted trying to read the
    # status back from the child process, leaving the child process
    # effectively orphaned and registering a false failure. To work around
    # it, we temporarily replace os.read with a retrying version that
    # allows Popen to succeed in this case.
    class RetryFunc(object):
        """Callable that re-invokes ``func`` for as long as it fails with EINTR."""

        def __init__(self, func):
            self.func = func

        def __call__(self, *args, **kwargs):
            while True:
                try:
                    return self.func(*args, **kwargs)
                except OSError as e:
                    if e.errno != errno.EINTR:
                        raise

    _popen_lock.acquire()
    try:
        reader = RetryFunc(os.read)
        os.read = reader
        try:
            return subprocess.Popen(*args, **kwargs)
        finally:
            # ``reader`` is always bound here, so the restore cannot fail
            # with a NameError that would mask the original exception.
            os.read = reader.func
    finally:
        _popen_lock.release()
def _eintr_retry(func, *args):
"""restart a system call interrupted by EINTR"""
while True:
try:
return func(*args)
except OSError as e:
if e.args[0] != errno.EINTR:
raise
def _eintr_retry_call(func, *args):
while True:
try:
return func(*args)
except OSError, e:
if e.errno == errno.EINTR:
continue
raise
def wait(self, timeout=None):
    """
    Wait through 'timeout' (in seconds) for a notification, or forever
    if it is None. Return True if a notification was received or
    False otherwise.

    The wait is a ``select`` on ``self.pg``.  If it is interrupted by a
    signal (``EINTR``) it is resumed with only the time that is left, so
    the total wait does not exceed ``timeout``.  Other select errors are
    propagated.
    """
    import time

    # Prefer a monotonic clock where the interpreter provides one.
    clock = getattr(time, "monotonic", time.time)
    deadline = None
    if timeout is not None:
        timeout = float(timeout)
        deadline = clock() + timeout
    while True:
        try:
            selected = select.select([self.pg], [], [], timeout)
            break
        except select.error as ex:
            # Read the code from .args: unpacking the exception object
            # itself does not work on Python 3.
            err_no = ex.args[0] if ex.args else None
            if err_no != errno.EINTR:
                raise
            if deadline is not None:
                timeout = max(0.0, deadline - clock())
    if selected == ([], [], []):
        return False
    self.pg.poll()
    self.__capture_notifications()
    return len(self.pending_notifications) > 0
def read_socket_input(connection, socket_obj):
    """Receive from the socket and feed what was read to the connection.

    At most ``connection.needs_input`` bytes are requested from
    ``socket_obj``.  When that value is zero or negative it is returned
    unchanged without touching the socket.

    Returns the result of ``connection.process_input`` for the received
    data, 0 when the receive fails with EAGAIN, EWOULDBLOCK or EINTR, or
    ``Connection.EOS`` (after closing the connection's input and output)
    when the receive returns no data.  All other exceptions raised by the
    socket, including timeouts, are re-raised.
    """
    wanted = connection.needs_input
    if wanted <= 0:
        return wanted  # 0 or EOS

    try:
        sock_data = socket_obj.recv(wanted)
    except socket.timeout as e:
        LOG.debug("Socket timeout exception %s", str(e))
        raise  # caller must handle
    except socket.error as e:
        transient = (errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR)
        if e.errno in transient:
            # try again later
            return 0
        # otherwise, unrecoverable, caller must handle
        LOG.debug("Socket error exception %s", str(e))
        raise
    except Exception as e:  # beats me... assume fatal
        LOG.debug("unknown socket exception %s", str(e))
        raise  # caller must handle

    if len(sock_data) > 0:
        return connection.process_input(sock_data)

    LOG.debug("Socket closed")
    result = Connection.EOS
    connection.close_input()
    connection.close_output()
    return result