async def process(cls, server: ni_abc.ServerHost,
                  request: web.Request, client: aiohttp.ClientSession) -> "Host":
    """Turn an incoming GitHub webhook request into an instance of ``cls``.

    This is a coroutine: it awaits the request body and, for newly opened
    pull requests, pauses briefly before returning.  ``cls`` is the class
    that gets instantiated (any ``@classmethod`` decorator lies outside the
    lines shown here).

    Args:
        server: Supplies the webhook secret via ``contrib_secret()`` and is
            passed through to the constructed instance.
        request: The incoming HTTP request carrying the webhook delivery.
        client: HTTP client session passed through to the constructed
            instance.

    Returns:
        ``cls(server, client, <PullRequestEvent member>, event.data)`` for
        "opened", "synchronize", and relevant "unlabeled" actions.

    Raises:
        ni_abc.ResponseExit: With ``200 OK`` for a ping event; with
            ``204 No Content`` when the action is not in
            ``cls._useful_actions`` or when the removed label does not
            start with ``LABEL_PREFIX``.
        TypeError: For any event other than "ping"/"pull_request", or for
            an action that passed the ``_useful_actions`` filter but has
            no handling here.
    """
    # NOTE(review): from_http() is given the shared secret, presumably so
    # the delivery signature is verified while parsing -- confirm against
    # the gidgethub documentation.
    event = sansio.Event.from_http(request.headers,
                                   await request.read(),
                                   secret=server.contrib_secret())
    if event.event == "ping":
        # A ping event; nothing to do.
        # https://developer.github.com/webhooks/#ping-event
        raise ni_abc.ResponseExit(status=http.HTTPStatus.OK)
    if event.event != "pull_request":
        # Only happens if GitHub is misconfigured to send the wrong events.
        raise TypeError(f"don't know how to handle a {event.event!r} event")

    # Only read the action once we know this is a pull request event.
    action = event.data['action']
    if action not in cls._useful_actions:
        raise ni_abc.ResponseExit(status=http.HTTPStatus.NO_CONTENT)

    if action in {PullRequestEvent.opened.value,
                  PullRequestEvent.synchronize.value}:
        if action == PullRequestEvent.opened.value:
            # GitHub is eventually consistent, so add a delay to wait for
            # the API to digest the new pull request.
            await asyncio.sleep(1)
        return cls(server, client, PullRequestEvent(action), event.data)

    if action == PullRequestEvent.unlabeled.value:
        label = event.data['label']['name']
        # Only labels carrying the project's prefix are of interest.
        if not label.startswith(LABEL_PREFIX):
            raise ni_abc.ResponseExit(status=http.HTTPStatus.NO_CONTENT)
        return cls(server, client, PullRequestEvent.unlabeled, event.data)

    # Should never happen.
    raise TypeError(  # pragma: no cover
        f"don't know how to handle a {action!r} action")
评论列表
文章目录