def test_poll(self):
@asyncio.coroutine
def test():
a, b = self.create_bound_pair(zmq.PUSH, zmq.PULL)
f = b.poll(timeout=0)
yield from asyncio.sleep(0)
self.assertEqual(f.result(), 0)
f = b.poll(timeout=1)
assert not f.done()
evt = yield from f
self.assertEqual(evt, 0)
f = b.poll(timeout=1000)
assert not f.done()
yield from a.send_multipart([b'hi', b'there'])
evt = yield from f
self.assertEqual(evt, zmq.POLLIN)
recvd = yield from b.recv_multipart()
self.assertEqual(recvd, [b'hi', b'there'])
self.loop.run_until_complete(test())
python类asyncio()的实例源码
def can_connect(self, server, client):
"""Check if client can connect to server using tcp transport"""
@asyncio.coroutine
def go():
result = False
iface = 'tcp://127.0.0.1'
port = server.bind_to_random_port(iface)
client.connect("%s:%i" % (iface, port))
msg = [b"Hello World"]
yield from server.send_multipart(msg)
if (yield from client.poll(1000)):
rcvd_msg = yield from client.recv_multipart()
self.assertEqual(rcvd_msg, msg)
result = True
return result
return self.loop.run_until_complete(go())
def run(self):
self._loop = zmq.asyncio.ZMQEventLoop()
asyncio.set_event_loop(self._loop)
self.context = zmq.asyncio.Context()
self.status_sock = self.context.socket(zmq.ROUTER)
self.data_sock = self.context.socket(zmq.PUB)
self.status_sock.bind("tcp://*:%s" % self.status_port)
self.data_sock.bind("tcp://*:%s" % self.data_port)
self.poller = zmq.asyncio.Poller()
self.poller.register(self.status_sock, zmq.POLLIN)
self._loop.create_task(self.poll_sockets())
try:
self._loop.run_forever()
finally:
self.status_sock.close()
self.data_sock.close()
self.context.destroy()
def __init__(self, loop, logger, config):
print("test")
self.loop = loop
self.log = logger
self.config = config
self.zmq_url = config["BITCOIND"]["zeromq"]
self.zmqContext = zmq.asyncio.Context()
self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
self.MYSQL_CONFIG = config["MYSQL"]
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
# self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx")
# self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock")
# self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtx")
self.zmqSubSocket.connect(self.zmq_url)
print(self.zmq_url)
self.loop.create_task(self.init_db())
self.loop.create_task(self.handle())
self.loop.create_task(self.rpctest())
# self.loop.create_task(self.mysqltest())
def handle(self) :
msg = await self.zmqSubSocket.recv_multipart()
topic = msg[0]
body = msg[1]
sequence = "Unknown"
if len(msg[-1]) == 4:
msgSequence = struct.unpack('<I', msg[-1])[-1]
sequence = str(msgSequence)
if topic == b"hashblock":
print('- HASH BLOCK ('+sequence+') -')
print(binascii.hexlify(body))
elif topic == b"hashtx":
print('- HASH TX ('+sequence+') -')
print(binascii.hexlify(body))
elif topic == b"rawblock":
print('- RAW BLOCK HEADER ('+sequence+') -')
print(binascii.hexlify(body))
elif topic == b"rawtx":
self.log.debug("new tx")
self.loop.create_task(self.handle_tx(body))
# print('- RAW TX ('+sequence+') -')
# print(binascii.hexlify(body))
# schedule ourselves to receive the next message
asyncio.ensure_future(self.handle())
def __init__(self, bind_address, linger=-1, poll_timeout=2, loop=None):
self.bind_address = bind_address
self.loop = loop
self.context = zmq.asyncio.Context()
self.poll_timeout = poll_timeout
self.socket = self.context.socket(zmq.ROUTER)
self.socket.setsockopt(zmq.LINGER, linger)
self.in_poller = zmq.asyncio.Poller()
self.in_poller.register(self.socket, zmq.POLLIN)
log.info('Bound to: ' + self.bind_address)
self.socket.bind(self.bind_address)
self._kill = False
def run():
print("Getting ready for hello world client. Ctrl-C to exit.\n")
socket = Ctx.socket(zmq.REP)
socket.bind(Url)
while True:
# Wait for next request from client
message = await socket.recv()
print("Received request: {}".format(message))
# Do some "work"
await asyncio.sleep(1)
# Send reply back to client
message = message.decode('utf-8')
message = '{}, world'.format(message)
message = message.encode('utf-8')
print("Sending reply: {}".format(message))
await socket.send(message)
def start(self, slave_addr, task):
self._task = task
def _start(id, slave_addr, task):
from multiprocessing import Process
import multiprocessing
#multiprocessing.set_start_method('spawn')
Process(target=_worker_main, args=(id, slave_addr, task)).start()
from concurrent.futures import ProcessPoolExecutor
print("[Worker {0}] Create".format(self.id))
_start(self.id, slave_addr, task)
#executor = ProcessPoolExecutor()
#loop = asyncio.get_event_loop()
#asyncio.ensure_future(loop.run_in_executor(ProcessPoolExecutor(), _worker_main, self.id, slave_addr, task))
#asyncio.ensure_future(_start(self.id, slave_addr, task))
#yield from asyncio.sleep(10)
print("***")
def _monitor_disconnects(self):
"""Monitors the client socket for disconnects
"""
yield from self._monitor_sock.recv_multipart()
self._sock.disable_monitor()
self._monitor_sock.disconnect(self._monitor_fd)
self._monitor_sock.close(linger=0)
self._monitor_sock = None
self._sock.disconnect(self._url)
self._ready_event.clear()
LOGGER.debug("monitor socket received disconnect event")
for future in self._futures.future_values():
future.set_result(FutureError())
tasks = list(asyncio.Task.all_tasks(self._event_loop))
for task in tasks:
task.cancel()
self._event_loop.stop()
self._send_queue = None
self._recv_queue = None
def put_message(self, message):
"""
:param message: protobuf generated validator_pb2.Message
"""
if not self._ready_event.is_set():
return
with self._condition:
self._condition.wait_for(
lambda: self._event_loop is not None
and self._send_queue is not None
)
asyncio.run_coroutine_threadsafe(
self._put_message(message),
self._event_loop)
def _send(self, ident, message):
"""
(asyncio coroutine) Send the message and wait for a response.
:param message (sawtooth_sdk.protobuf.Message)
:param ident (str) the identity of the zmq.DEALER to send to
"""
LOGGER.debug(
"Sending %s(%s) to %s",
str(to_protobuf_class(message.message_type).__name__),
str(message.message_type),
str(ident)
)
return await self._socket.send_multipart([
ident,
message.SerializeToString()
])
def start(self):
"""Starts receiving messages on the underlying socket and passes them
to the message router.
"""
self._is_running = True
while self._is_running:
try:
zmq_msg = await self._socket.recv_multipart()
message = Message()
message.ParseFromString(zmq_msg[-1])
await self._msg_router.route_msg(message)
except DecodeError as e:
LOGGER.warning('Unable to decode: %s', e)
except zmq.ZMQError as e:
LOGGER.warning('Unable to receive: %s', e)
return
except asyncio.CancelledError:
self._is_running = False
def send(self, message_type, message_content, timeout=None):
correlation_id = uuid.uuid4().hex
self._msg_router.expect_reply(correlation_id)
message = Message(
correlation_id=correlation_id,
content=message_content,
message_type=message_type)
try:
await self._socket.send_multipart([message.SerializeToString()])
except asyncio.CancelledError:
raise
return await self._msg_router.await_reply(correlation_id,
timeout=timeout)
def _receive_message(self):
"""
Internal coroutine for receiving messages
"""
while True:
try:
if self._socket.getsockopt(zmq.TYPE) == zmq.ROUTER:
zmq_identity, msg_bytes = \
yield from self._socket.recv_multipart()
self._received_from_identity(zmq_identity)
self._dispatcher_queue.put_nowait(
(zmq_identity, msg_bytes))
else:
msg_bytes = yield from self._socket.recv()
self._last_message_time = time.time()
self._dispatcher_queue.put_nowait((None, msg_bytes))
except CancelledError:
# The concurrent.futures.CancelledError is caught by asyncio
# when the Task associated with the coroutine is cancelled.
# The raise is required to stop this component.
raise
except Exception as e: # pylint: disable=broad-except
LOGGER.exception("Received a message on address %s that "
"caused an error: %s", self._address, e)
def __init__(self, context=None, loop=None):
super().__init__(context)
self.loop = loop or asyncio.get_event_loop()
self.__poller = None
self.__task = None
def start(self):
"""Start ZAP authentication"""
super().start()
self.__poller = Poller()
self.__poller.register(self.zap_socket, zmq.POLLIN)
self.__task = asyncio.async(self.__handle_zap())
def setUp(self):
if asyncio is None:
raise SkipTest()
self.loop = zaio.ZMQEventLoop()
asyncio.set_event_loop(self.loop)
super(TestAsyncIOSocket, self).setUp()
def test_recv_multipart(self):
@asyncio.coroutine
def test():
a, b = self.create_bound_pair(zmq.PUSH, zmq.PULL)
f = b.recv_multipart()
assert not f.done()
yield from a.send(b'hi')
recvd = yield from f
self.assertEqual(recvd, [b'hi'])
self.loop.run_until_complete(test())
def test_recv(self):
@asyncio.coroutine
def test():
a, b = self.create_bound_pair(zmq.PUSH, zmq.PULL)
f1 = b.recv()
f2 = b.recv()
assert not f1.done()
assert not f2.done()
yield from a.send_multipart([b'hi', b'there'])
recvd = yield from f2
assert f1.done()
self.assertEqual(f1.result(), b'hi')
self.assertEqual(recvd, b'there')
self.loop.run_until_complete(test())
def test_aiohttp(self):
try:
import aiohttp
except ImportError:
raise SkipTest("Requires aiohttp")
from aiohttp import web
zmq.asyncio.install()
@asyncio.coroutine
def echo(request):
print(request.path)
return web.Response(body=str(request).encode('utf8'))
@asyncio.coroutine
def server(loop):
app = web.Application(loop=loop)
app.router.add_route('GET', '/', echo)
srv = yield from loop.create_server(app.make_handler(),
'127.0.0.1', 8080)
print("Server started at http://127.0.0.1:8080")
return srv
@asyncio.coroutine
def client():
push, pull = self.create_bound_pair(zmq.PUSH, zmq.PULL)
res = yield from aiohttp.request('GET', 'http://127.0.0.1:8080/')
text = yield from res.text()
yield from push.send(text.encode('utf8'))
rcvd = yield from pull.recv()
self.assertEqual(rcvd.decode('utf8'), text)
loop = asyncio.get_event_loop()
loop.run_until_complete(server(loop))
print("servered")
loop.run_until_complete(client())
def shortDescription(self):
"""Rewrite doc strings from TestThreadAuthentication from
'threaded' to 'asyncio'.
"""
doc = self._testMethodDoc
if doc:
doc = doc.split("\n")[0].strip()
if doc.startswith('threaded auth'):
doc = doc.replace('threaded auth', 'asyncio auth')
return doc
def setUp(self):
if asyncio is None:
raise SkipTest()
self.loop = zaio.ZMQEventLoop()
asyncio.set_event_loop(self.loop)
super().setUp()
def poll_sockets(self):
while not self.stopped:
evts = dict(await self.poller.poll(50))
if self.status_sock in evts and evts[self.status_sock] == zmq.POLLIN:
ident, msg = await self.status_sock.recv_multipart()
if msg == b"WHATSUP":
await self.status_sock.send_multipart([ident, b"HI!", json.dumps(self.plot_desc).encode('utf8')])
await asyncio.sleep(0.010)
def stop(self):
self.send("irrelevant", np.array([]), msg="done")
self.stopped = True
pending = asyncio.Task.all_tasks(loop=self._loop)
self._loop.stop()
time.sleep(1)
for task in pending:
task.cancel()
try:
self._loop.run_until_complete(task)
except asyncio.CancelledError:
pass
self._loop.close()
def ping_loop(ctx,
ping_interval,
cycle_time,
initial_ping_timeout,
ping_retries,
backoff,
loop,
inventory_router_url):
"""
:param ctx:
:param ping_interval:
:param cycle_time:
:param initial_ping_timeout:
:param ping_retries:
:param backoff:
:param loop:
:param inventory_router_url:
:return:
"""
# load the queue
inventory_client = InventoryClient(inventory_router_url)
while True:
if stop_ping_loop:
log.info('Stopping ping loop')
break
log.debug('Looking for work')
now = time.time()
for mercury_id, data in list(active_state.items()): # copy to list because the list length could change
# out from under us
if now - data['last_ping'] > ping_interval and not data['pinging']:
log.debug('Scheduling ping for {}'.format(mercury_id))
active_state[mercury_id]['pinging'] = True
asyncio.ensure_future(ping(data, ctx, initial_ping_timeout, ping_retries, backoff, inventory_client),
loop=loop)
await asyncio.sleep(cycle_time)
def get_ctx_and_connect_req_socket(zmq_url):
"""Creates a ZMQ context and a REQ socket.
:param zmq_url: URL for the socket to connect to.
:returns: A tuple containing the ZMQ context and the socket.
"""
ctx = zmq.asyncio.Context()
# noinspection PyUnresolvedReferences
socket = ctx.socket(zmq.REQ)
log.debug('Connection to: {}'.format(zmq_url))
socket.connect(zmq_url)
return ctx, socket
def __init__(self, zmq_url, linger=-1, response_timeout=0):
self.zmq_url = zmq_url
self.ctx, self.socket = get_ctx_and_connect_req_socket(self.zmq_url)
self.socket.setsockopt(zmq.LINGER, linger)
self.poller = zmq.asyncio.Poller()
self.poller.register(self.socket, flags=zmq.POLLIN)
self.response_timeout = response_timeout
def main():
try:
loop = ZMQEventLoop()
asyncio.set_event_loop(loop)
loop.run_until_complete(run())
except KeyboardInterrupt:
print('\nFinished (interrupted)')
sys.exit(0)
def dispatch_msg(self, addr, header, body = b''):
async def _dispatch_msg(msg):
await self._router.send_multipart(msg)
msg = [addr, header, b'', body]
asyncio.ensure_future(_dispatch_msg(msg))
def dispatch_msg(self, addr, header, body = b''):
async def _dispatch_msg(msg):
await self._router.send_multipart(msg)
msg = [addr, header, b'', body]
asyncio.ensure_future(_dispatch_msg(msg))