def thread_loop(self, context, pipe):
poller = zmq.Poller()
ipc_pub = zmq_tools.Msg_Dispatcher(context, self.g_pool.ipc_push_url)
poller.register(pipe, zmq.POLLIN)
remote_socket = None
while True:
items = dict(poller.poll())
if pipe in items:
cmd = pipe.recv_string()
if cmd == 'Exit':
break
elif cmd == 'Bind':
new_url = pipe.recv_string()
if remote_socket:
poller.unregister(remote_socket)
remote_socket.close(linger=0)
try:
remote_socket = context.socket(zmq.REP)
remote_socket.bind(new_url)
except zmq.ZMQError as e:
remote_socket = None
pipe.send_string("Error", flags=zmq.SNDMORE)
pipe.send_string("Could not bind to Socket: {}. Reason: {}".format(new_url, e))
else:
pipe.send_string("Bind OK", flags=zmq.SNDMORE)
# `.last_endpoint` is already of type `bytes`
pipe.send(remote_socket.last_endpoint.replace(b"tcp://", b""))
poller.register(remote_socket, zmq.POLLIN)
if remote_socket in items:
self.on_recv(remote_socket, ipc_pub)
self.thread_pipe = None
python类send()的实例源码
def on_notify(self, notification):
"""send simple string messages to control application functions.
Emits notifications:
``recording.should_start``
``recording.should_stop``
``calibration.should_start``
``calibration.should_stop``
Any other notification received though the reqrepl port.
"""
pass
def VerifyKey(socket, secret):
sha1 = hashlib.sha1()
sha1.update(str(secret).encode())
privateKey = sha1.hexdigest()
message = b'I am making a note here, huge success!'
message += b'\x00' * 10
iv = b'\x00' * AES.block_size
cipher = AES.new(key=privateKey[:16], mode=AES.MODE_CBC, IV=iv)
ciphertext = cipher.encrypt(message)
socket.send(ciphertext)
def DHExchangeClient(serverHost, port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((serverHost, port))
# Usual parameters as recommended by NIST.
pHex = ('ffffffffffffffffc90fdaa22168c234c4c6628b80dc1cd129024e088a67cc74020bbea63b139b22514a08798e340'
'4ddef9519b3cd3a431b302b0a6df25f14374fe1356d6d51c245e485b576625e7ec6f44c42e9a637ed6b0bff5cb6f40'
'6b7edee386bfb5a899fa5ae9f24117c4b1fe649286651ece45b3dc2007cb8a163bf0598da48361c55d39a69163fa8f'
'd24cf5f83655d23dca3ad961c62f356208552bb9ed529077096966d670c354e4abc9804f1746c08ca237327ffffffffffffffff')
p = int(pHex, 16)
g = 2
a = random.randint(0, p - 1)
A = dh.modexp(g, a, p)
B = 0
# Sends the message in the predefined format.
message = 'BEGIN\n%s\n%s\n%s\nEND' % (str(p), str(g), str(A))
sock.send(message.encode())
# Gets a similar message back from the server. See the comments
# on dh_server which has a similar loop.
exchange = b''
while b'D' not in exchange:
exchange += sock.recv(100)
exchange = exchange.decode()
pieces = exchange.split('\n')
B = int(pieces[1])
secret = dh.modexp(B, a, p)
print('My secret is', str(secret))
VerifyKey(sock, secret)
sock.close()
def write(self, data):
if not isinstance(data, (bytes, bytearray, memoryview)):
raise TypeError('data argument must be byte-ish (%r)',
type(data))
if self._eof_written:
raise RuntimeError('write_eof() already called')
if not data:
return
if self._conn_lost:
if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
logger.warning('socket.send() raised exception.')
self._conn_lost += 1
return
# Observable states:
# 1. IDLE: _write_fut and _buffer both None
# 2. WRITING: _write_fut set; _buffer None
# 3. BACKED UP: _write_fut set; _buffer a bytearray
# We always copy the data, so the caller can't modify it
# while we're still waiting for the I/O to happen.
if self._write_fut is None: # IDLE -> WRITING
assert self._buffer is None
# Pass a copy, except if it's already immutable.
self._loop_writing(data=bytes(data))
elif not self._buffer: # WRITING -> BACKED UP
# Make a mutable copy which we can extend.
self._buffer = bytearray(data)
self._maybe_pause_protocol()
else: # BACKED UP
# Append to buffer (also copies).
self._buffer.extend(data)
self._maybe_pause_protocol()
def _loop_writing(self, f=None, data=None):
try:
assert f is self._write_fut
self._write_fut = None
self._pending_write = 0
if f:
f.result()
if data is None:
data = self._buffer
self._buffer = None
if not data:
if self._closing:
self._loop.call_soon(self._call_connection_lost, None)
if self._eof_written:
self._sock.shutdown(socket.SHUT_WR)
# Now that we've reduced the buffer size, tell the
# protocol to resume writing if it was paused. Note that
# we do this last since the callback is called immediately
# and it may add more data to the buffer (even causing the
# protocol to be paused again).
self._maybe_resume_protocol()
else:
self._write_fut = self._loop._proactor.send(self._sock, data)
if not self._write_fut.done():
assert self._pending_write == 0
self._pending_write = len(data)
self._write_fut.add_done_callback(self._loop_writing)
self._maybe_pause_protocol()
else:
self._write_fut.add_done_callback(self._loop_writing)
except ConnectionResetError as exc:
self._force_close(exc)
except OSError as exc:
self._fatal_error(exc, 'Fatal write error on pipe transport')
def sock_sendall(self, sock, data):
return self._proactor.send(sock, data)
def _write_to_self(self):
self._csock.send(b'\0')
def _write_to_self(self):
# This may be called from a different thread, possibly after
# _close_self_pipe() has been called or even while it is
# running. Guard for self._csock being None or closed. When
# a socket is closed, send() raises OSError (with errno set to
# EBADF, but let's not rely on the exact error code).
csock = self._csock
if csock is not None:
try:
csock.send(b'\0')
except OSError:
if self._debug:
logger.debug("Fail to write a null byte into the "
"self-pipe socket",
exc_info=True)
def sock_accept(self, sock):
"""Accept a connection.
The socket must be bound to an address and listening for connections.
The return value is a pair (conn, address) where conn is a new socket
object usable to send and receive data on the connection, and address
is the address bound to the socket on the other end of the connection.
This method is a coroutine.
"""
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
fut = futures.Future(loop=self)
self._sock_accept(fut, False, sock)
return fut
def write(self, data):
if not isinstance(data, (bytes, bytearray, memoryview)):
raise TypeError('data argument must be byte-ish (%r)',
type(data))
if self._eof:
raise RuntimeError('Cannot call write() after write_eof()')
if not data:
return
if self._conn_lost:
if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
logger.warning('socket.send() raised exception.')
self._conn_lost += 1
return
if not self._buffer:
# Optimization: try to send now.
try:
n = self._sock.send(data)
except (BlockingIOError, InterruptedError):
pass
except Exception as exc:
self._fatal_error(exc, 'Fatal write error on socket transport')
return
else:
data = data[n:]
if not data:
return
# Not all was written; register write handler.
self._loop.add_writer(self._sock_fd, self._write_ready)
# Add it to the buffer.
self._buffer.extend(data)
self._maybe_pause_protocol()
def sendto(self, data, addr=None):
if not isinstance(data, (bytes, bytearray, memoryview)):
raise TypeError('data argument must be byte-ish (%r)',
type(data))
if not data:
return
if self._address and addr not in (None, self._address):
raise ValueError('Invalid address: must be None or %s' %
(self._address,))
if self._conn_lost and self._address:
if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
logger.warning('socket.send() raised exception.')
self._conn_lost += 1
return
if not self._buffer:
# Attempt to send it right away first.
try:
if self._address:
self._sock.send(data)
else:
self._sock.sendto(data, addr)
return
except (BlockingIOError, InterruptedError):
self._loop.add_writer(self._sock_fd, self._sendto_ready)
except OSError as exc:
self._protocol.error_received(exc)
return
except Exception as exc:
self._fatal_error(exc,
'Fatal write error on datagram transport')
return
# Ensure that what we buffer is immutable.
self._buffer.append((bytes(data), addr))
self._maybe_pause_protocol()
def test_loop_writing(self):
tr = self.socket_transport()
tr._buffer = bytearray(b'data')
tr._loop_writing()
self.loop._proactor.send.assert_called_with(self.sock, b'data')
self.loop._proactor.send.return_value.add_done_callback.\
assert_called_with(tr._loop_writing)
def test_write_eof_buffer_write_pipe(self):
tr = _ProactorWritePipeTransport(self.loop, self.sock, self.protocol)
f = asyncio.Future(loop=self.loop)
tr._loop._proactor.send.return_value = f
tr.write(b'data')
tr.write_eof()
self.assertTrue(tr._closing)
self.assertFalse(self.sock.shutdown.called)
tr._loop._proactor.send.assert_called_with(self.sock, b'data')
f.set_result(4)
self.loop._run_once()
self.loop._run_once()
self.assertTrue(self.sock.close.called)
tr.close()
def test_pause_resume_writing(self):
tr = self.pause_writing_transport(high=4)
# write a large chunk, must pause writing
fut = asyncio.Future(loop=self.loop)
self.loop._proactor.send.return_value = fut
tr.write(b'large data')
self.loop._run_once()
self.assertTrue(self.protocol.pause_writing.called)
# flush the buffer
fut.set_result(None)
self.loop._run_once()
self.assertEqual(tr.get_write_buffer_size(), 0)
self.assertTrue(self.protocol.resume_writing.called)
def test_dont_pause_writing(self):
tr = self.pause_writing_transport(high=4)
# write a large chunk which completes immedialty,
# it should not pause writing
fut = asyncio.Future(loop=self.loop)
fut.set_result(None)
self.loop._proactor.send.return_value = fut
tr.write(b'very large data')
self.loop._run_once()
self.assertEqual(tr.get_write_buffer_size(), 0)
self.assertFalse(self.protocol.pause_writing.called)
def test_write_to_self(self):
self.loop._write_to_self()
self.csock.send.assert_called_with(b'\0')
def test_write_to_self_tryagain(self):
self.loop._csock.send.side_effect = BlockingIOError
with test_utils.disable_logger():
self.assertIsNone(self.loop._write_to_self())
def test_write_to_self_exception(self):
# _write_to_self() swallows OSError
self.loop._csock.send.side_effect = RuntimeError()
self.assertRaises(RuntimeError, self.loop._write_to_self)
def test__sock_sendall_canceled_fut(self):
sock = mock.Mock()
f = asyncio.Future(loop=self.loop)
f.cancel()
self.loop._sock_sendall(f, False, sock, b'data')
self.assertFalse(sock.send.called)