def ws_handler(self, request):
ws = web.WebSocketResponse()
await ws.prepare(request)
sid = request.cookies.get(self.config["cookie_name"])
client = None
admin = False
operator = False
login = None
if sid:
login = self.sessions.get(sid)
if login:
token = self.tokens.get(login).decode("ascii")
if token:
client = github.Client(token)
if client:
login = login.decode("utf8")
if login in self.acl["admin"]["users"]:
admin = True
if login in self.acl["operator"]["users"]:
operator = True
orgs = await client.get("user/orgs")
for org in orgs:
if org["login"] in self.acl["admin"]["orgs"]:
admin = True
if org["login"] in self.acl["operator"]["orgs"]:
operator = True
user = {"login": login, "admin": admin, "operator": operator}
self._ws_user_map[ws] = user
ws.send_str(json.dumps(["authOk", user]))
user_events = self._user_events[login]
if user_events:
ws.send_str(json.dumps(["userTasks", [e.id for e in user_events]]))
for event in self.root.get_tasks():
ws.send_str(json.dumps(["taskUpdate", event.to_dict()]))
self.connections.append(ws)
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
method_name, args = json.loads(msg.data)
if not (await self._check_acl(method_name, client, operator, admin)):
ws.send_str(json.dumps(["accessDenied", method_name]))
continue
method = getattr(self, "_ws_" + method_name, None)
if method:
ret = await method(ws, *args)
if ret is not None:
ws.send_str(ret)
else:
LOG.info("Unknown websocket method %s", method_name)
else:
LOG.debug("websocket msg %s %s", msg.type, msg)
self._ws_user_map.pop(ws)
self.connections.remove(ws)
self._disconnect_ws_console(ws)
return ws
评论列表
文章目录