def test_poll(self, config, trivial_message):
    """Check that ``poll()`` drains a queued message and fires the callback.

    A message is placed on the first SQS queue found, the listener is
    polled once, and the test then asserts that the queue reports zero
    messages and that the registered callback was invoked.

    Args:
        config: mapping indexed by repository name; the entry for
            ``"tdsmith/test_repo"`` is expanded into keyword arguments
            for ``snooze.RepositoryListener``.
        trivial_message: body of the message sent to the queue.
    """
    # Flag flipped by the callback; lives on the test instance so the
    # nested function can set it without ``nonlocal``.
    self._test_poll_was_polled = False

    def my_callback(event, message):
        # The arguments are not inspected; only the invocation matters.
        self._test_poll_was_polled = True

    def approximate_message_count(queue):
        # SQS reports this attribute as a string, hence the int() conversion.
        return int(queue.attributes["ApproximateNumberOfMessages"])

    # Register a mocked response for the GitHub hooks endpoint.
    # NOTE(review): presumably requested while the listener is constructed;
    # confirm against RepositoryListener.
    responses.add(
        responses.POST,
        "https://api.github.com/repos/tdsmith/test_repo/hooks",
    )
    repo_listener = snooze.RepositoryListener(
        events=snooze.LISTEN_EVENTS,
        callbacks=[my_callback],
        **config["tdsmith/test_repo"]
    )

    # NOTE(review): assumes at least one queue exists in us-west-2 by this
    # point (created by the listener or a fixture); verify.
    sqs_resource = boto3.resource("sqs", region_name="us-west-2")
    available_queues = list(sqs_resource.queues.all())
    target_queue = available_queues[0]

    target_queue.send_message(MessageBody=trivial_message)
    assert approximate_message_count(target_queue) > 0

    repo_listener.poll()

    # Refresh the cached attributes before re-reading the message count.
    target_queue.reload()
    assert approximate_message_count(target_queue) == 0
    assert self._test_poll_was_polled
test_repository_listener.py 文件源码
python
阅读 24
收藏 0
点赞 0
评论 0
评论列表
文章目录