def webhook(url, payload=None, oid=None):
    """Post ``payload`` as JSON to a webhook URL and record the outcome.

    The response text and status code (or a connection failure) are stored
    on a ``Webhook`` record through ``webhook_repo``.  Afterwards, if the
    project is published, the recorded status code is not 200 and ``ADMINS``
    is configured, a failure e-mail is sent; if ``SSE`` is configured the
    record is published on the project's channel.

    Args:
        url: Target URL.  A falsy value is handled like a connection error.
        payload: Mapping serialised with ``json.dumps``.  It must contain
            ``'project_id'`` and, when ``SSE`` is enabled,
            ``'project_short_name'``.
        oid: Id of an existing webhook record to re-use and update.  When
            falsy, a new ``Webhook`` is created and saved.

    Returns:
        The webhook record holding the response text and status code.

    Raises:
        Any exception other than ``requests.exceptions.ConnectionError``
        raised while posting or persisting propagates to the caller.
    """
    import json

    from flask import current_app
    from readability.readability import Document
    from pybossa.core import sentinel, webhook_repo, project_repo

    # Bound up front so the ``finally`` clause can tell how far the setup
    # got.  Without this, an early failure (e.g. ``payload`` is None) would
    # be masked by an UnboundLocalError raised from ``finally``.
    project = None
    hook = None
    try:
        project = project_repo.get(payload['project_id'])
        headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
        if oid:
            hook = webhook_repo.get(oid)
        else:
            hook = Webhook(project_id=payload['project_id'],
                           payload=payload)
        if url:
            # NOTE(review): no timeout is passed here; consider adding one
            # together with handling for requests' timeout exceptions.
            response = requests.post(url, data=json.dumps(payload),
                                     headers=headers)
            # Keep only the readable summary of the response body.
            hook.response = Document(response.text).summary()
            hook.response_status_code = response.status_code
        else:
            # A missing URL is routed through the same failure path below.
            raise requests.exceptions.ConnectionError('Not URL')
        if oid:
            webhook_repo.update(hook)
            hook = webhook_repo.get(oid)
        else:
            webhook_repo.save(hook)
    except requests.exceptions.ConnectionError:
        hook.response = 'Connection Error'
        hook.response_status_code = None
        # NOTE(review): ``save`` is used even when ``oid`` was given, unlike
        # the success path which calls ``update`` -- confirm this is intended.
        webhook_repo.save(hook)
    finally:
        # Notifications need a webhook record; skip them when the failure
        # happened before one existed so the original exception surfaces.
        if hook is not None:
            if (project is not None
                    and project.published
                    and hook.response_status_code != 200
                    and current_app.config.get('ADMINS')):
                subject = "Broken: %s webhook failed" % project.name
                body = 'Sorry, but the webhook failed'
                mail_dict = dict(recipients=current_app.config.get('ADMINS'),
                                 subject=subject, body=body,
                                 html=hook.response)
                send_mail(mail_dict)
            if current_app.config.get('SSE'):
                publish_channel(sentinel, payload['project_short_name'],
                                data=hook.dictize(), type='webhook',
                                private=True)
    return hook
# Web-page residue, not part of the code: "Comment list" / "Table of contents".