def on_connection_factory(execute_cmd, base_dispatcher):
    """Build the coroutine that serves a single client connection.

    Args:
        execute_cmd: Callable invoked with each raw command received from
            the client. Its return value (the intent) is handed to
            ``asyncio_perform`` together with the composed dispatcher.
        base_dispatcher: Dispatcher placed first in the
            ``ComposedDispatcher`` built for every connection, ahead of
            the per-client dispatcher.

    Returns:
        The ``on_connection(reader, writer)`` coroutine function.
    """

    def _release(task):
        # Cancel a task that is still pending; for a finished one, read its
        # exception so asyncio does not later report
        # "Task exception was never retrieved".
        if not task.done():
            task.cancel()
        elif not task.cancelled():
            task.exception()

    async def on_connection(reader, writer):
        """Serve one connection until it stops or is closed.

        Alternates between forwarding queued pushed events to the client
        and answering the commands it sends. Returns when ``context.recv()``
        yields a falsy value or when ``ConnectionClosed`` is raised.
        """
        context = ClientConnectionContext(reader, writer)
        client_dispatcher = client_dispatcher_factory(context)
        dispatcher = ComposedDispatcher([base_dispatcher, client_dispatcher])
        context.logger.info('Connection started')
        # Wait for two things:
        # - User's command (incoming request)
        # - Event subscribed by user (pushed to client requests)
        # Note user's command should have been replied before sending an
        # event notification
        get_event = asyncio.ensure_future(context.queued_pushed_events.get())
        get_cmd = asyncio.ensure_future(context.recv())
        try:
            while True:
                done, _ = await asyncio.wait(
                    (get_event, get_cmd),
                    return_when=asyncio.FIRST_COMPLETED)
                if get_event in done:
                    payload = get_event.result()
                    # Wrap in a 1-tuple so a tuple payload cannot break the
                    # %-formatting.
                    context.logger.debug('Got event: %s' % (payload,))
                    await context.send(payload)
                    # Restart watch on incoming notifications
                    get_event = asyncio.ensure_future(
                        context.queued_pushed_events.get())
                else:
                    # Only get_cmd can have completed here. If both finished
                    # together, the command is picked up on the next loop
                    # turn because the finished task makes wait() return
                    # immediately.
                    raw_cmd = get_cmd.result()
                    if not raw_cmd:
                        context.logger.debug('Connection stopped')
                        return
                    context.logger.debug('Received: %r' % (raw_cmd,))
                    intent = execute_cmd(raw_cmd)
                    raw_resp = await asyncio_perform(dispatcher, intent)
                    context.logger.debug('Replied: %r' % (raw_resp,))
                    await context.send(raw_resp)
                    # Restart watch on incoming messages
                    get_cmd = asyncio.ensure_future(context.recv())
        except ConnectionClosed:
            context.logger.info('Connection closed')
        finally:
            _release(get_event)
            _release(get_cmd)

    return on_connection
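# NOTE(review): stray web-page navigation labels ("Comment list",
# "Table of contents") were pasted here; they are not part of this module.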