async def run_server():
    """Start both routers and block until a control packet arrives.

    ``client_router.run()`` and ``slave_router.run()`` are scheduled as
    background futures on the current event loop. A ROUTER socket is then
    bound to ``CONTROL_ROUTER_ADDR``; receiving any multipart message on it
    ends this coroutine.

    This must be a coroutine function: it awaits the control socket and its
    result is handed to ``loop.run_until_complete``.
    """
    asyncio.ensure_future(client_router.run())
    asyncio.ensure_future(slave_router.run())

    # Terminate the server when a control packet is received on the control
    # socket; the content of the packet is not inspected.
    control_router = context.socket(zmq.ROUTER)
    try:
        control_router.bind(CONTROL_ROUTER_ADDR)
        await control_router.recv_multipart()
    finally:
        # Release the control socket even if bind/recv fails or is cancelled.
        control_router.close()
python类asyncio()的实例源码
def dispatch_msg(self, header, body=b''):
    """Schedule a ``[header, b'', body]`` multipart send on ``self._router``.

    The send is wrapped in a coroutine and handed to
    ``asyncio.ensure_future``; this method returns without waiting for it.

    :param header: first frame of the outgoing message.
    :param body: last frame of the outgoing message (default: empty bytes).
    """
    frames = [header, b'', body]

    async def _send(payload):
        print("_dispatch_msg("+str(payload)+")")
        # NOTE (original author): unclear why the server does not receive
        # this message.
        await self._router.send_multipart(payload)
        # Reaching this line only shows that the awaited send returned.
        print("_dispatch_msg finish")

    asyncio.ensure_future(_send(frames))
def dispatch_msg(self, addr, header, body=b''):
    """Schedule an ``[addr, header, b'', body]`` send on ``self._router``.

    The send is wrapped in a coroutine and handed to
    ``asyncio.ensure_future``; this method returns without waiting for it.

    :param addr: first frame of the outgoing message.
    :param header: second frame of the outgoing message.
    :param body: last frame of the outgoing message (default: empty bytes).
    """
    frames = [addr, header, b'', body]

    async def _send(payload):
        await self._router.send_multipart(payload)

    asyncio.ensure_future(_send(frames))
async def run_server():
    """Start the master connection and worker router, then wait for shutdown.

    ``master_conn.run()`` and ``worker_router.run()`` are scheduled as
    background futures on the current event loop. A ROUTER socket is then
    bound to ``CONTROL_ROUTER_ADDR``; receiving any multipart message on it
    ends this coroutine.

    This must be a coroutine function: it awaits the control socket and its
    result is handed to ``loop.run_until_complete``.
    """
    asyncio.ensure_future(master_conn.run())
    asyncio.ensure_future(worker_router.run())

    # Terminate the server when a control packet is received on the control
    # socket; the content of the packet is not inspected.
    control_router = context.socket(zmq.ROUTER)
    try:
        control_router.bind(CONTROL_ROUTER_ADDR)
        await control_router.recv_multipart()
    finally:
        # Release the control socket even if bind/recv fails or is cancelled.
        control_router.close()
def main(MASTER_ADDR, WORKER_ROUTER_ADDR, slave_addr, control_router_addr):
    """Populate the module-level state and run the server until it stops.

    Stores the slave and control-router addresses in module globals, installs
    a ``ZMQEventLoop`` as the current asyncio loop, creates the ``Context``,
    ``MasterConnection``, ``WorkerRouter`` and ``WorkerManager`` globals, and
    runs ``run_server()`` to completion. A ``KeyboardInterrupt`` prints a
    notice and exits the process with status 0.

    :param MASTER_ADDR: address passed to ``MasterConnection``.
    :param WORKER_ROUTER_ADDR: address passed to ``WorkerRouter``.
    :param slave_addr: value stored in the ``SLAVE_ADDR`` global.
    :param control_router_addr: value stored in ``CONTROL_ROUTER_ADDR``.
    """
    global context, master_conn, worker_router, worker_manager
    global SLAVE_ADDR, CONTROL_ROUTER_ADDR

    SLAVE_ADDR = slave_addr
    CONTROL_ROUTER_ADDR = control_router_addr

    try:
        event_loop = ZMQEventLoop()
        asyncio.set_event_loop(event_loop)

        context = Context()
        master_conn = MasterConnection(context, MASTER_ADDR)
        worker_router = WorkerRouter(context, WORKER_ROUTER_ADDR)
        worker_manager = WorkerManager()

        # Left disabled by the original author:
        # event_loop.set_default_executor(ProcessPoolExecutor())
        event_loop.run_until_complete(run_server())
    except KeyboardInterrupt:
        print('\nFinished (interrupted)')
        sys.exit(0)
def dispatch_msg(self, header, body=b''):
    """Schedule a ``[header, b'', body]`` multipart send on ``self._router``.

    The send is wrapped in a coroutine and handed to
    ``asyncio.ensure_future``; this method returns without waiting for it.

    :param header: first frame of the outgoing message.
    :param body: last frame of the outgoing message (default: empty bytes).
    """
    frames = [header, b'', body]

    async def _send(payload):
        print("_dispatch_msg("+str(payload)+")")
        # NOTE (original author): unclear why the server does not receive
        # this message.
        await self._router.send_multipart(payload)
        # Reaching this line only shows that the awaited send returned.
        print("_dispatch_msg finish")

    asyncio.ensure_future(_send(frames))
async def run(self):
    """Simulate ``TaskSimulator.NUM_TASKS`` tasks with a random gap after each.

    For every index a task is built with ``self._make_task()``, registered
    via ``task_manager.add_task`` and handed to ``self._process_task``. The
    coroutine then sleeps for ``randint(TASK_GAP_MIN_SECONDS,
    TASK_GAP_MAX_SECONDS)`` seconds before moving on.

    This must be a coroutine function because it awaits ``asyncio.sleep``.
    """
    for idx in range(TaskSimulator.NUM_TASKS):
        print("[*] Simulate Task #{0}".format(idx))
        task = self._make_task()
        task_manager.add_task(task)
        self._process_task(task)

        # Yield to the event loop between tasks instead of blocking it.
        gap_seconds = randint(TaskSimulator.TASK_GAP_MIN_SECONDS,
                              TaskSimulator.TASK_GAP_MAX_SECONDS)
        await asyncio.sleep(gap_seconds)
def main():
    """Install a ``ZMQEventLoop`` and run ``run_server()`` to completion.

    A ``KeyboardInterrupt`` raised while setting up or running the loop
    prints a notice and exits the process with status 0.
    """
    try:
        event_loop = ZMQEventLoop()
        asyncio.set_event_loop(event_loop)
        server = run_server()
        event_loop.run_until_complete(server)
    except KeyboardInterrupt:
        print('\nFinished (interrupted)')
        sys.exit(0)
def _dispatch_msg_async(msg):
    """Schedule an asynchronous send of ``msg`` on ``self._router``.

    ``self`` is not a parameter here; it has to be provided by the
    surrounding scope. The send is scheduled with ``asyncio.ensure_future``
    and is not awaited by this function.
    """
    async def _send(frames):
        await self._router.send_multipart(frames)

    pending_send = _send(msg)
    asyncio.ensure_future(pending_send)
def _dispatch_msg_async(msg):
    """Schedule an asynchronous send of ``msg`` on ``self._router``.

    ``self`` is not a parameter here; it has to be provided by the
    surrounding scope. The send is scheduled with ``asyncio.ensure_future``
    and is not awaited by this function.
    """
    async def _send(frames):
        await self._router.send_multipart(frames)

    pending_send = _send(msg)
    asyncio.ensure_future(pending_send)
def main(client_router_addr, slave_router_addr):
    """Install a ``ZMQEventLoop`` and run ``run_master`` to completion.

    A fresh ``Context`` is created and passed to ``run_master`` together with
    both addresses. A ``KeyboardInterrupt`` prints a notice and exits the
    process with status 0.

    :param client_router_addr: forwarded to ``run_master`` as its second argument.
    :param slave_router_addr: forwarded to ``run_master`` as its third argument.
    """
    try:
        event_loop = ZMQEventLoop()
        asyncio.set_event_loop(event_loop)
        zmq_context = Context()
        master = run_master(zmq_context, client_router_addr, slave_router_addr)
        event_loop.run_until_complete(master)
    except KeyboardInterrupt:
        print('\nFinished (interrupted)')
        sys.exit(0)
def _dispatch_msg_async(msg):
    """Schedule an asynchronous send of ``msg`` on ``self._router``.

    ``self`` is not a parameter here; it has to be provided by the
    surrounding scope. The send is scheduled with ``asyncio.ensure_future``
    and is not awaited by this function.
    """
    async def _send(frames):
        await self._router.send_multipart(frames)

    pending_send = _send(msg)
    asyncio.ensure_future(pending_send)
async def run_worker(context: Context, slave_addr, serialized_data: bytes):
    """Run the slave connection and the deserialized task until both finish.

    :param context: passed to ``SlaveConnection`` and to ``do_task``.
    :param slave_addr: address passed to ``SlaveConnection``.
    :param serialized_data: bytes decoded with ``TaskInformation.from_bytes``.

    This is a coroutine function so that its result can be passed to
    ``loop.run_until_complete``.
    """
    slave_conn = SlaveConnection(context, slave_addr, SlaveMessageHandler())
    # Decode before scheduling anything so a bad payload fails early.
    task_info = TaskInformation.from_bytes(serialized_data)

    # The wait must be awaited: otherwise this coroutine returns immediately
    # and the caller stops the loop before either future has run.
    await asyncio.wait([
        asyncio.ensure_future(slave_conn.run()),
        asyncio.ensure_future(do_task(context, task_info)),
    ])
def main(slave_addr, serialized_data: bytes):
    """Install a ``ZMQEventLoop`` and run ``run_worker`` to completion.

    A fresh ``Context`` is created and passed to ``run_worker`` together with
    the slave address and the serialized payload. A ``KeyboardInterrupt``
    prints a notice and exits the process with status 0.

    :param slave_addr: forwarded to ``run_worker``.
    :param serialized_data: forwarded to ``run_worker`` unchanged.
    """
    try:
        event_loop = ZMQEventLoop()
        asyncio.set_event_loop(event_loop)
        zmq_context = Context()
        worker = run_worker(zmq_context, slave_addr, serialized_data)
        event_loop.run_until_complete(worker)
    except KeyboardInterrupt:
        print('\nFinished (interrupted)')
        sys.exit(0)
async def _do_sleep_task(sleep_task: SleepTask):
    """Suspend for ``sleep_task.job.seconds`` seconds via ``asyncio.sleep``.

    This must be a coroutine function because it awaits ``asyncio.sleep``.
    """
    await asyncio.sleep(sleep_task.job.seconds)
def _dispatch_msg_async(msg):
    """Schedule an asynchronous send of ``msg`` on ``self._router``.

    ``self`` is not a parameter here; it has to be provided by the
    surrounding scope. The send is scheduled with ``asyncio.ensure_future``
    and is not awaited by this function.
    """
    async def _send(frames):
        await self._router.send_multipart(frames)

    pending_send = _send(msg)
    asyncio.ensure_future(pending_send)
def _dispatch_msg_async(msg):
    """Schedule an asynchronous send of ``msg`` on ``self._router``.

    ``self`` is not a parameter here; it has to be provided by the
    surrounding scope. The send is scheduled with ``asyncio.ensure_future``
    and is not awaited by this function.
    """
    async def _send(frames):
        await self._router.send_multipart(frames)

    pending_send = _send(msg)
    asyncio.ensure_future(pending_send)
async def run_master(context: Context, master_addr, worker_router_addr, worker_file_name):
    """Run the master connection, worker router and worker polling together.

    :param context: passed to ``MasterConnection`` and ``WorkerRouter``.
    :param master_addr: address passed to ``MasterConnection``.
    :param worker_router_addr: address passed to ``WorkerRouter``.
    :param worker_file_name: passed to ``WorkerCreator``.

    This is a coroutine function so that its result can be passed to
    ``loop.run_until_complete``.
    """
    master_conn = MasterConnection(context, master_addr, MasterMessageHandler())
    worker_router = WorkerRouter(context, worker_router_addr, WorkerMessageHandler())
    # The instance is intentionally not kept here.
    # NOTE(review): presumably WorkerCreator registers itself on construction - confirm.
    WorkerCreator(worker_file_name)

    # The wait must be awaited: otherwise this coroutine returns immediately
    # and the caller stops the loop before any of the futures has run.
    await asyncio.wait([
        asyncio.ensure_future(master_conn.run()),
        asyncio.ensure_future(worker_router.run()),
        asyncio.ensure_future(run_polling_workers()),
    ])
def main(master_addr, worker_router_addr):
    """Install a ``ZMQEventLoop`` and run ``run_master`` to completion.

    A fresh ``Context`` is created and passed to ``run_master`` together with
    both addresses. A ``KeyboardInterrupt`` prints a notice and exits the
    process with status 0.

    :param master_addr: forwarded to ``run_master`` as its second argument.
    :param worker_router_addr: forwarded to ``run_master`` as its third argument.
    """
    try:
        event_loop = ZMQEventLoop()
        asyncio.set_event_loop(event_loop)
        zmq_context = Context()
        # NOTE(review): the run_master defined alongside this function also
        # takes a worker_file_name argument that is not supplied here -
        # confirm which signature is current.
        master = run_master(zmq_context, master_addr, worker_router_addr)
        event_loop.run_until_complete(master)
    except KeyboardInterrupt:
        print('\nFinished (interrupted)')
        sys.exit(0)
def get_message(self):
    """Schedule ``self._get_message()`` on the event loop once it is set.

    Blocks on ``self._condition`` until ``self._event_loop`` is not ``None``,
    then submits the coroutine to that loop from the calling thread.

    :return message: concurrent.futures.Future
    """
    def _loop_is_set():
        return self._event_loop is not None

    with self._condition:
        self._condition.wait_for(_loop_is_set)

    pending = self._get_message()
    return asyncio.run_coroutine_threadsafe(pending, self._event_loop)