def flush(self):
if not self.out_buffer:
return
try:
if not self.pollout.poll(0):
if sendfail_cnt >= sendfail_msg:
print 'signalk socket failed to send', sendfail_cnt
self.sendfail_msg *= 10
self.sendfail_cnt += 1
return
t0 = time.time()
count = self.socket.send(self.out_buffer)
t1 = time.time()
if t1-t0 > .1:
print 'socket send took too long!?!?', t1-t0
if count < 0:
print 'socket send error', count
self.socket.close()
self.out_buffer = self.out_buffer[count:]
except:
self.socket.close()
python类send()的实例源码
def sock_sendall(self, sock, data):
"""Send data to the socket.
The socket must be connected to a remote socket. This method continues
to send data from data until either all data has been sent or an
error occurs. None is returned on success. On error, an exception is
raised, and there is no way to determine how much data, if any, was
successfully processed by the receiving end of the connection.
This method is a coroutine.
"""
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
fut = futures.Future(loop=self)
if data:
self._sock_sendall(fut, False, sock, data)
else:
fut.set_result(None)
return fut
def _sock_sendall(self, fut, registered, sock, data):
fd = sock.fileno()
if registered:
self.remove_writer(fd)
if fut.cancelled():
return
try:
n = sock.send(data)
except (BlockingIOError, InterruptedError):
n = 0
except Exception as exc:
fut.set_exception(exc)
return
if n == len(data):
fut.set_result(None)
else:
if n:
data = data[n:]
self.add_writer(fd, self._sock_sendall, fut, True, sock, data)
def _write_ready(self):
assert self._buffer, 'Data should not be empty'
try:
n = self._sock.send(self._buffer)
except (BlockingIOError, InterruptedError):
pass
except Exception as exc:
self._loop.remove_writer(self._sock_fd)
self._buffer.clear()
self._fatal_error(exc, 'Fatal write error on socket transport')
else:
if n:
del self._buffer[:n]
self._maybe_resume_protocol() # May append to buffer.
if not self._buffer:
self._loop.remove_writer(self._sock_fd)
if self._closing:
self._call_connection_lost(None)
elif self._eof:
self._sock.shutdown(socket.SHUT_WR)
def write(self, data):
if not isinstance(data, (bytes, bytearray, memoryview)):
raise TypeError('data argument must be byte-ish (%r)',
type(data))
if not data:
return
if self._conn_lost:
if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
logger.warning('socket.send() raised exception.')
self._conn_lost += 1
return
if not self._buffer:
self._loop.add_writer(self._sock_fd, self._write_ready)
# Add it to the buffer.
self._buffer.extend(data)
self._maybe_pause_protocol()
def _sendto_ready(self):
while self._buffer:
data, addr = self._buffer.popleft()
try:
if self._address:
self._sock.send(data)
else:
self._sock.sendto(data, addr)
except (BlockingIOError, InterruptedError):
self._buffer.appendleft((data, addr)) # Try again later.
break
except OSError as exc:
self._protocol.error_received(exc)
return
except Exception as exc:
self._fatal_error(exc,
'Fatal write error on datagram transport')
return
self._maybe_resume_protocol() # May append to buffer.
if not self._buffer:
self._loop.remove_writer(self._sock_fd)
if self._closing:
self._call_connection_lost(None)
def test_loop_writing_err(self, m_log):
err = self.loop._proactor.send.side_effect = OSError()
tr = self.socket_transport()
tr._fatal_error = mock.Mock()
tr._buffer = [b'da', b'ta']
tr._loop_writing()
tr._fatal_error.assert_called_with(
err,
'Fatal write error on pipe transport')
tr._conn_lost = 1
tr.write(b'data')
tr.write(b'data')
tr.write(b'data')
tr.write(b'data')
tr.write(b'data')
self.assertEqual(tr._buffer, None)
m_log.warning.assert_called_with('socket.send() raised exception.')
def test_pause_writing_2write(self):
tr = self.pause_writing_transport(high=4)
# first short write, the buffer is not full (3 <= 4)
fut1 = asyncio.Future(loop=self.loop)
self.loop._proactor.send.return_value = fut1
tr.write(b'123')
self.loop._run_once()
self.assertEqual(tr.get_write_buffer_size(), 3)
self.assertFalse(self.protocol.pause_writing.called)
# fill the buffer, must pause writing (6 > 4)
tr.write(b'abc')
self.loop._run_once()
self.assertEqual(tr.get_write_buffer_size(), 6)
self.assertTrue(self.protocol.pause_writing.called)
def test_pause_writing_3write(self):
tr = self.pause_writing_transport(high=4)
# first short write, the buffer is not full (1 <= 4)
fut = asyncio.Future(loop=self.loop)
self.loop._proactor.send.return_value = fut
tr.write(b'1')
self.loop._run_once()
self.assertEqual(tr.get_write_buffer_size(), 1)
self.assertFalse(self.protocol.pause_writing.called)
# second short write, the buffer is not full (3 <= 4)
tr.write(b'23')
self.loop._run_once()
self.assertEqual(tr.get_write_buffer_size(), 3)
self.assertFalse(self.protocol.pause_writing.called)
# fill the buffer, must pause writing (6 > 4)
tr.write(b'abc')
self.loop._run_once()
self.assertEqual(tr.get_write_buffer_size(), 6)
self.assertTrue(self.protocol.pause_writing.called)
def test_write_exception(self, m_log):
err = self.sock.send.side_effect = OSError()
data = b'data'
transport = self.socket_transport()
transport._fatal_error = mock.Mock()
transport.write(data)
transport._fatal_error.assert_called_with(
err,
'Fatal write error on socket transport')
transport._conn_lost = 1
self.sock.reset_mock()
transport.write(data)
self.assertFalse(self.sock.send.called)
self.assertEqual(transport._conn_lost, 2)
transport.write(data)
transport.write(data)
transport.write(data)
transport.write(data)
m_log.warning.assert_called_with('socket.send() raised exception.')
def test_sendto_exception(self, m_log):
data = b'data'
err = self.sock.sendto.side_effect = RuntimeError()
transport = self.datagram_transport()
transport._fatal_error = mock.Mock()
transport.sendto(data, ())
self.assertTrue(transport._fatal_error.called)
transport._fatal_error.assert_called_with(
err,
'Fatal write error on datagram transport')
transport._conn_lost = 1
transport._address = ('123',)
transport.sendto(data)
transport.sendto(data)
transport.sendto(data)
transport.sendto(data)
transport.sendto(data)
m_log.warning.assert_called_with('socket.send() raised exception.')
def socketOperation(socket, sendMessage, receive = True):
try:
socket.send(sendMessage.encode('ascii'))
except IOError as errmsg:
print('socket', socket, ' sending error: ', errmsg)
return Exceptions['SOCKET_ERROR']
if receive:
try:
responseData = socket.recv(BUFSIZ)
except IOError as errmsg:
print('socket', socket, ' receving error: ', errmsg)
return Exceptions['SOCKET_ERROR']
return responseData.decode('ascii')
#
# functions for blocking socket to send and recv message
# with timeout option, return Exception['TIMEOUT'] if timeout
# para: timeout (type-> seconds)
# will return timeout exception if timeout occurs
#
def socketOperationTimeout(socket, sendMessage, timeout):
readList = [socket]
try:
socket.send(sendMessage.encode('ascii'))
except OSError as errmsg:
print('socket sending error: ', errmsg)
return Exceptions['SOCKET_ERROR']
# realize timeout feature by select
available = select(readList, [], [], timeout)
if available:
sockfd = readList[0]
try:
responseData = sockfd.recv(BUFSIZ)
return responseData.decode('ascii')
except OSError as errmsg:
print('socket receving error: ', errmsg)
return Exceptions['SOCKET_ERROR']
else:
return Exceptions['TIMEOUT']
# abstraction for checking exit status
# must be inside of stateLock
def socketOperationTimeout(socket, sendMessage, timeout):
readList = [socket]
try:
socket.send(sendMessage.encode('ascii'))
except OSError as errmsg:
print('socket sending error: ', errmsg)
return Exceptions['SOCKET_ERROR']
readable, writeable, exceptions = select(readList, [], [], timeout)
if readable:
sockfd = readable[0]
try:
responseData = sockfd.recv(BUFSIZ)
return responseData.decode('ascii')
except OSError as errmsg:
print('socket receving error: ', errmsg)
return Exceptions['SOCKET_ERROR']
else:
return Exceptions['TIMEOUT']
#
# functions for facilitation threads of keep alive procedure
# resend 'JOIN' request ever 20 seconds after successfully joining
#
def sock_sendall(self, sock, data):
"""Send data to the socket.
The socket must be connected to a remote socket. This method continues
to send data from data until either all data has been sent or an
error occurs. None is returned on success. On error, an exception is
raised, and there is no way to determine how much data, if any, was
successfully processed by the receiving end of the connection.
This method is a coroutine.
"""
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
fut = self.create_future()
if data:
self._sock_sendall(fut, False, sock, data)
else:
fut.set_result(None)
return fut
def _sock_sendall(self, fut, registered, sock, data):
fd = sock.fileno()
if registered:
self.remove_writer(fd)
if fut.cancelled():
return
try:
n = sock.send(data)
except (BlockingIOError, InterruptedError):
n = 0
except Exception as exc:
fut.set_exception(exc)
return
if n == len(data):
fut.set_result(None)
else:
if n:
data = data[n:]
self.add_writer(fd, self._sock_sendall, fut, True, sock, data)
def _write_ready(self):
assert self._buffer, 'Data should not be empty'
if self._conn_lost:
return
try:
n = self._sock.send(self._buffer)
except (BlockingIOError, InterruptedError):
pass
except Exception as exc:
self._loop._remove_writer(self._sock_fd)
self._buffer.clear()
self._fatal_error(exc, 'Fatal write error on socket transport')
else:
if n:
del self._buffer[:n]
self._maybe_resume_protocol() # May append to buffer.
if not self._buffer:
self._loop._remove_writer(self._sock_fd)
if self._closing:
self._call_connection_lost(None)
elif self._eof:
self._sock.shutdown(socket.SHUT_WR)
def write(self, data):
if not isinstance(data, (bytes, bytearray, memoryview)):
raise TypeError('data argument must be a bytes-like object, '
'not %r' % type(data).__name__)
if not data:
return
if self._conn_lost:
if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
logger.warning('socket.send() raised exception.')
self._conn_lost += 1
return
if not self._buffer:
self._loop._add_writer(self._sock_fd, self._write_ready)
# Add it to the buffer.
self._buffer.extend(data)
self._maybe_pause_protocol()
def _sendto_ready(self):
while self._buffer:
data, addr = self._buffer.popleft()
try:
if self._address:
self._sock.send(data)
else:
self._sock.sendto(data, addr)
except (BlockingIOError, InterruptedError):
self._buffer.appendleft((data, addr)) # Try again later.
break
except OSError as exc:
self._protocol.error_received(exc)
return
except Exception as exc:
self._fatal_error(exc,
'Fatal write error on datagram transport')
return
self._maybe_resume_protocol() # May append to buffer.
if not self._buffer:
self._loop._remove_writer(self._sock_fd)
if self._closing:
self._call_connection_lost(None)
def write_to_fd(self, data):
return self.socket.send(data)
def write_to_fd(self, data):
try:
return self.socket.send(data)
except ssl.SSLError as e:
if e.args[0] == ssl.SSL_ERROR_WANT_WRITE:
# In Python 3.5+, SSLSocket.send raises a WANT_WRITE error if
# the socket is not writeable; we need to transform this into
# an EWOULDBLOCK socket.error or a zero return value,
# either of which will be recognized by the caller of this
# method. Prior to Python 3.5, an unwriteable socket would
# simply return 0 bytes written.
return 0
raise
def write_to_fd(self, data):
return self.socket.send(data)
def write_to_fd(self, data):
try:
return self.socket.send(data)
except ssl.SSLError as e:
if e.args[0] == ssl.SSL_ERROR_WANT_WRITE:
# In Python 3.5+, SSLSocket.send raises a WANT_WRITE error if
# the socket is not writeable; we need to transform this into
# an EWOULDBLOCK socket.error or a zero return value,
# either of which will be recognized by the caller of this
# method. Prior to Python 3.5, an unwriteable socket would
# simply return 0 bytes written.
return 0
raise
def write_to_fd(self, data):
return self.socket.send(data)
def send(self, data):
self.out_buffer += data
if len(self.out_buffer) > 65536:
self.out_buffer = data
print 'overflow in signalk socket'
def send(self, data):
self.out_buffer += data
def flush(self):
if not len(self.out_buffer):
return
try:
count = self.socket.send(self.out_buffer)
self.out_buffer = self.out_buffer[count:]
except:
self.socket.close()
def ListValues(self, socket):
msg = {}
for value in self.values:
t = self.values[value].type()
if type(t) == type(''):
t = {'type' : t}
msg[value] = t
socket.send(kjson.dumps(msg) + '\n')
def HandleRequest(self, socket, request):
data = kjson.loads(request)
if data['method'] == 'list':
self.ListValues(socket)
else:
name = data['name']
if not name in self.values:
socket.send('invalid request: unknown value: ' + name + '\n')
else:
self.HandleNamedRequest(socket, data)
def PollSockets(self):
events = self.poller.poll(0)
while events:
event = events.pop()
fd, flag = event
socket = self.fd_to_socket[fd]
if socket == self.server_socket:
connection, address = socket.accept()
if len(self.sockets) == max_connections:
print 'max connections reached!!!', len(self.sockets)
self.RemoveSocket(self.sockets[0]) # dump first socket??
socket = LineBufferedNonBlockingSocket(connection)
self.sockets.append(socket)
fd = socket.socket.fileno()
# print 'new client', address, fd
self.fd_to_socket[fd] = socket
self.poller.register(fd, select.POLLIN)
elif flag & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
self.RemoveSocket(socket)
elif flag & select.POLLIN:
if not socket.recv():
self.RemoveSocket(socket)
while True:
line = socket.readline()
if not line:
break
try:
self.HandleRequest(socket, line)
except:
print 'invalid request from socket', line
socket.send('invalid request: ' + line + '\n')
# flush all sockets
for socket in self.sockets:
socket.flush()