def __init__(self, ws: web.WebSocketResponse, sid: str):
    """Bind this object to a websocket response and a session id.

    The response and the id are stored privately, an empty
    ``asyncio.Queue`` is created, and the instance starts out not
    finished and without a default timeout.

    :param ws: websocket response this object wraps
    :param sid: identifier stored as ``_id``
    """
    # Public state first, then the private handles.
    self.default_timeout = None
    self.finished = False
    self._queue = asyncio.Queue()
    self._id, self._ws = sid, ws
python类WebSocketResponse()的实例源码
async def ws_handler(self, request):
    """Serve a single websocket client from handshake to disconnect.

    The session cookie named by ``self.config["cookie_name"]`` is resolved
    to a login and a stored token. When both exist, the ``admin`` and
    ``operator`` flags are derived from ``self.acl`` (by login and by the
    organisations returned from ``user/orgs``) and an ``authOk`` message is
    sent, followed by ``userTasks`` if the user has events. Every client
    then receives one ``taskUpdate`` per task and may call ``_ws_<name>``
    methods by sending ``[name, args]`` JSON arrays, subject to
    ``self._check_acl``.

    Written against the aiohttp >= 3.0 API, where ``send_str`` is a
    coroutine and must be awaited.

    :param request: incoming request to upgrade to a websocket
    :returns: the ``web.WebSocketResponse`` once the connection has ended
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    client = None
    login = None
    admin = False
    operator = False

    try:
        sid = request.cookies.get(self.config["cookie_name"])
        if sid:
            login = self.sessions.get(sid)
            if login:
                # A session may exist without a stored token; decode only
                # when there is something to decode.
                raw_token = self.tokens.get(login)
                if raw_token:
                    client = github.Client(raw_token.decode("ascii"))

        # NOTE(review): nesting reconstructed from a flattened source; the
        # user payload is assumed to belong to the authenticated path because
        # `login` is only decoded to str here.
        if client:
            login = login.decode("utf8")
            if login in self.acl["admin"]["users"]:
                admin = True
            if login in self.acl["operator"]["users"]:
                operator = True
            orgs = await client.get("user/orgs")
            for org in orgs:
                if org["login"] in self.acl["admin"]["orgs"]:
                    admin = True
                if org["login"] in self.acl["operator"]["orgs"]:
                    operator = True
            user = {"login": login, "admin": admin, "operator": operator}
            self._ws_user_map[ws] = user
            await ws.send_str(json.dumps(["authOk", user]))
            user_events = self._user_events[login]
            if user_events:
                await ws.send_str(
                    json.dumps(["userTasks", [e.id for e in user_events]]))

        for event in self.root.get_tasks():
            await ws.send_str(json.dumps(["taskUpdate", event.to_dict()]))

        self.connections.append(ws)
        async for msg in ws:
            if msg.type != aiohttp.WSMsgType.TEXT:
                LOG.debug("websocket msg %s %s", msg.type, msg)
                continue

            # Client input is untrusted: a frame that is not a two-item JSON
            # array with a string method name is logged and skipped instead
            # of tearing down the connection.
            try:
                method_name, args = json.loads(msg.data)
            except (ValueError, TypeError):
                method_name = None
            if not isinstance(method_name, str):
                LOG.info("Malformed websocket message %r", msg.data)
                continue

            allowed = await self._check_acl(method_name, client, operator, admin)
            if not allowed:
                await ws.send_str(json.dumps(["accessDenied", method_name]))
                continue

            method = getattr(self, "_ws_" + method_name, None)
            if method:
                ret = await method(ws, *args)
                if ret is not None:
                    await ws.send_str(ret)
            else:
                LOG.info("Unknown websocket method %s", method_name)
    finally:
        # Unauthenticated clients never enter the user map, and a failure
        # before registration leaves `connections` untouched, so both
        # removals have to tolerate a missing entry.
        self._ws_user_map.pop(ws, None)
        if ws in self.connections:
            self.connections.remove(ws)
        self._disconnect_ws_console(ws)
    return ws
async def websocket_handler(request):
    """Handle one livereload browser connection.

    The socket is kept in ``request.app[WS]`` for as long as the connection
    lives. Text frames are decoded as JSON and dispatched on their
    ``command`` field: ``hello`` is answered with the server handshake (or
    the socket is closed when the client does not offer protocol 7) and
    ``info`` is logged. Anything else is reported as an error.

    Written against the aiohttp >= 3.0 API: ``send_str`` and ``close`` are
    coroutines, and message kinds are read from ``msg.type`` /
    ``aiohttp.WSMsgType``.

    :param request: incoming request to upgrade to a websocket
    :returns: the ``web.WebSocketResponse`` once the connection has ended
    """
    ws = web.WebSocketResponse()
    request.app[WS].append(ws)
    try:
        await ws.prepare(request)
        # Maps a numeric message type to its name, for the error log below.
        ws_type_lookup = {
            member.value: name
            for name, member in aiohttp.WSMsgType.__members__.items()
        }
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                try:
                    data = json.loads(msg.data)
                except json.JSONDecodeError as e:
                    logger.error('JSON decode error: %s', str(e))
                else:
                    command = data['command']
                    if command == 'hello':
                        if 'http://livereload.com/protocols/official-7' not in data['protocols']:
                            logger.error('live reload protocol 7 not supported by client %s', msg.data)
                            # close() is a coroutine; without the await the
                            # closing handshake would never be sent.
                            await ws.close()
                        else:
                            handshake = {
                                'command': 'hello',
                                'protocols': [
                                    'http://livereload.com/protocols/official-7',
                                ],
                                'serverName': 'livereload-aiohttp',
                            }
                            await ws.send_str(json.dumps(handshake))
                    elif command == 'info':
                        logger.info('browser connected at %s', data['url'])
                        logger.debug('browser plugins: %s', data['plugins'])
                    else:
                        logger.error('Unknown ws message %s', msg.data)
            elif msg.type == aiohttp.WSMsgType.ERROR:
                logger.error('ws connection closed with exception %s', ws.exception())
            else:
                logger.error('unknown websocket message type %s, data: %s',
                             ws_type_lookup[msg.type], msg.data)
    finally:
        # TODO gracefully close websocket connections on app shutdown
        # Runs on errors and cancellation too, so the registry never keeps a
        # dead socket.
        logger.debug('browser disconnected')
        request.app[WS].remove(ws)
    return ws
def _ws_handler(self, request):
    """
    Handle websocket connections.

    This includes:
        * new connections
        * closed connections
        * messages

    A client whose session is new is sent a 401 payload and the socket is
    closed. Otherwise the socket is added to ``self.websockets``, the
    ``on_ws_connect`` callbacks run, each text message is passed to the
    ``on_ws_message`` callbacks, and on exit the socket is removed and the
    ``on_ws_disconnect`` callbacks run.

    This is a generator-based coroutine (``yield from``), kept in that form
    so existing callers are unaffected.

    :param request: incoming request to upgrade to a websocket
    :returns: the ``web.WebSocketResponse`` once the connection has ended
    """
    websocket = web.WebSocketResponse()
    yield from websocket.prepare(request)

    session = yield from get_session(request)
    if session.new:
        logger.debug('websocket: not logged in')
        # send_str returns an awaitable on newer aiohttp releases and may
        # return None on older ones; drive it only when there is one.
        pending = websocket.send_str(json.dumps({'status': 401, 'text': "Unauthorized"}))
        if pending is not None:
            yield from pending
        # close() is a coroutine; calling it without `yield from` never
        # actually closes the socket.
        yield from websocket.close()
        return websocket

    self.websockets.append(websocket)
    try:
        for func in self.on_ws_connect:
            yield from func(websocket, session)

        while True:
            msg = yield from websocket.receive()
            if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSED):
                logger.debug('websocket closed')
                break

            logger.debug("websocket got: %s", msg)
            if msg.type == WSMsgType.TEXT:
                for func in self.on_ws_message:
                    yield from func(websocket, session, msg.data)
            elif msg.type == WSMsgType.ERROR:
                logger.debug('websocket closed with exception %s', websocket.exception())

            # NOTE(review): the flattened source does not show whether this
            # pause sat at loop level or inside the ERROR branch; loop level
            # is assumed.
            yield from asyncio.sleep(0.1)
    finally:
        # Deregister and notify even when a callback raised or the task was
        # cancelled, so the socket list and the disconnect hooks stay in sync.
        self.websockets.remove(websocket)
        for func in self.on_ws_disconnect:
            yield from func(session)
    return websocket
### JRPC protocol ###
async def websocket_handler(request):
    """Handle one livereload browser connection, registered by page url.

    Text frames are decoded as JSON and dispatched on ``command``:
    ``hello`` is answered with the server handshake (or the socket is closed
    when protocol 7 is not offered), and ``info`` registers ``(ws, url)`` in
    ``request.app[WS]``, where ``url`` is ``'/'`` plus the fourth
    ``'/'``-separated part of the reported url. The registration is removed
    when the connection ends.

    Written against the aiohttp >= 3.0 API: ``send_str`` and ``close`` are
    coroutines, and the message kind is read from ``msg.type``.

    :param request: incoming request to upgrade to a websocket
    :returns: the ``web.WebSocketResponse`` once the connection has ended
    """
    ws = web.WebSocketResponse(timeout=0.01)
    url = None
    await ws.prepare(request)
    try:
        async for msg in ws:
            if msg.type == WSMsgType.TEXT:
                try:
                    data = json.loads(msg.data)
                except json.JSONDecodeError as e:
                    aux_logger.error('JSON decode error: %s', str(e))
                else:
                    command = data['command']
                    if command == 'hello':
                        if 'http://livereload.com/protocols/official-7' not in data['protocols']:
                            aux_logger.error('live reload protocol 7 not supported by client %s', msg.data)
                            # close() is a coroutine and must be awaited to
                            # take effect.
                            await ws.close()
                        else:
                            handshake = {
                                'command': 'hello',
                                'protocols': [
                                    'http://livereload.com/protocols/official-7',
                                ],
                                'serverName': 'livereload-aiohttp',
                            }
                            await ws.send_str(json.dumps(handshake))
                    elif command == 'info':
                        aux_logger.debug('browser connected: %s', data)
                        new_url = '/' + data['url'].split('/', 3)[-1]
                        # A repeated 'info' replaces the earlier entry;
                        # otherwise only the last one would be removed on
                        # disconnect.
                        if url is not None:
                            request.app[WS].remove((ws, url))
                        url = new_url
                        request.app[WS].append((ws, url))
                    else:
                        aux_logger.error('Unknown ws message %s', msg.data)
            elif msg.type == WSMsgType.ERROR:
                aux_logger.error('ws connection closed with exception %s', ws.exception())
            else:
                aux_logger.error('unknown websocket message type %s, data: %s',
                                 WS_TYPE_LOOKUP[msg.type], msg.data)
    finally:
        # Runs on errors and cancellation too, so a registered pair is never
        # left behind.
        if url is None:
            aux_logger.warning('browser disconnected, appears no websocket connection was made')
        else:
            aux_logger.debug('browser disconnected')
            request.app[WS].remove((ws, url))
    return ws
async def _handle(self, request: BaseRequest):
    """Serve one websocket client speaking ``[rid, command, data]`` frames.

    The socket is added to ``self.connections`` for its lifetime. The
    literal text ``ws.close`` closes the socket. Every other text frame must
    be a JSON array of request id, command and payload; each callback
    registered under that command in ``self._on_message`` is awaited with
    ``(ws, send_json, data)``, and a non-``None`` return value is forwarded
    as ``send_json(*ret)``. Replies are sent as ``[rid, {'code', 'data'}]``.

    :param request: incoming request to upgrade to a websocket
    :returns: the ``web.WebSocketResponse`` once the connection has ended
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    ws.request = request
    self.connections.add(ws)
    wsid = ws.headers['Sec-Websocket-Accept']
    logger.debug('websocket connected: %r, %d user(s) online', wsid, len(self.connections))

    def make_replier(rid, command):
        """Return a reply coroutine bound to one request id and command."""
        async def send_json(code, data=NotImplemented):
            # NotImplemented marks "no payload given"; the text registered
            # for the code is looked up instead.
            if data is NotImplemented:
                data = RETCODE.txt_cn.get(code)
            val = {'code': code, 'data': data}
            logger.info('websocket reply %r - %s: %r', command, val, wsid)
            await ws.send_json([rid, val])
        return send_json

    try:
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                if msg.data == 'ws.close':
                    await ws.close()
                    continue

                try:
                    # request id, command, data
                    rid, command, data = json.loads(msg.data)
                    if isinstance(command, (list, dict)):
                        # Unhashable values cannot be looked up in
                        # _on_message.
                        raise TypeError('command must be a scalar')
                except (ValueError, TypeError):
                    # ValueError covers invalid JSON and a wrong item count,
                    # TypeError covers a payload that is not a sequence.
                    logger.error('websocket command parse failed %s: %r', msg.data, wsid)
                    continue

                send_json = make_replier(rid, command)
                if command in self._on_message:
                    logger.info('websocket command %r - %s: %r', command, data, wsid)
                    for callback in self._on_message[command]:
                        ret = await callback(ws, send_json, data)
                        if ret is not None:
                            await send_json(*ret)
                else:
                    logger.info('websocket command not found %s: %r', command, wsid)
            elif msg.type == aiohttp.WSMsgType.ERROR:
                logger.debug('websocket connection closed with exception %s: %r', ws.exception(), wsid)
                break
    finally:
        # Deregister and notify even when a callback raised or the task was
        # cancelled.
        self.connections.discard(ws)
        await self.on_close(ws)
        logger.debug('websocket connection closed: %r, %d user(s) online', wsid, len(self.connections))
    return ws
async def websocket_handler(request):
    """Handle one livereload browser connection, registered by page url.

    Text frames are decoded as JSON and dispatched on ``command``:
    ``hello`` is answered with the server handshake (or the socket is closed
    when protocol 7 is not offered), and ``info`` registers ``(ws, url)`` in
    ``request.app[WS]``, where ``url`` is the fourth ``'/'``-separated part
    of the reported url. The registration is removed when the connection
    ends.

    Written against the aiohttp >= 3.0 API: ``send_str`` and ``close`` are
    coroutines, and message kinds are read from ``msg.type`` /
    ``WSMsgType``.

    :param request: incoming request to upgrade to a websocket
    :returns: the ``web.WebSocketResponse`` once the connection has ended
    """
    # Imported locally because only the older MsgType alias is known to be
    # in scope at module level.
    from aiohttp import WSMsgType

    ws = web.WebSocketResponse()
    url = None
    await ws.prepare(request)
    # Maps a numeric message type to its name, for the error log below.
    ws_type_lookup = {
        member.value: name for name, member in WSMsgType.__members__.items()
    }
    try:
        async for msg in ws:
            if msg.type == WSMsgType.TEXT:
                try:
                    data = json.loads(msg.data)
                except json.JSONDecodeError as e:
                    aux_logger.error('JSON decode error: %s', str(e))
                else:
                    command = data['command']
                    if command == 'hello':
                        if 'http://livereload.com/protocols/official-7' not in data['protocols']:
                            aux_logger.error('live reload protocol 7 not supported by client %s', msg.data)
                            # close() is a coroutine and must be awaited to
                            # take effect.
                            await ws.close()
                        else:
                            handshake = {
                                'command': 'hello',
                                'protocols': [
                                    'http://livereload.com/protocols/official-7',
                                ],
                                'serverName': 'livereload-aiohttp',
                            }
                            await ws.send_str(json.dumps(handshake))
                    elif command == 'info':
                        aux_logger.debug('browser connected: %s', data)
                        new_url = data['url'].split('/', 3)[-1]
                        # A repeated 'info' replaces the earlier entry;
                        # otherwise only the last one would be removed on
                        # disconnect.
                        if url is not None:
                            request.app[WS].remove((ws, url))
                        url = new_url
                        request.app[WS].append((ws, url))
                    else:
                        aux_logger.error('Unknown ws message %s', msg.data)
            elif msg.type == WSMsgType.ERROR:
                aux_logger.error('ws connection closed with exception %s', ws.exception())
            else:
                aux_logger.error('unknown websocket message type %s, data: %s',
                                 ws_type_lookup[msg.type], msg.data)
    finally:
        aux_logger.debug('browser disconnected')
        # Compare against None: the derived url is '' for a page at the site
        # root, and a truthiness test would leave that entry registered
        # forever.
        if url is not None:
            request.app[WS].remove((ws, url))
    return ws