test_weibo_harvester.py 文件源码

python
阅读 20 收藏 0 点赞 0 评论 0

项目:sfm-weibo-harvester 作者: gwu-libraries 项目源码 文件源码
def setUp(self):
        self.exchange = Exchange(EXCHANGE, type="topic")
        self.result_queue = Queue(name="result_queue", routing_key="harvest.status.weibo.*", exchange=self.exchange,
                                  durable=True)
        self.web_harvest_queue = Queue(name="web_harvest_queue", routing_key="harvest.start.web",
                                       exchange=self.exchange)
        self.warc_created_queue = Queue(name="warc_created_queue", routing_key="warc_created", exchange=self.exchange)
        weibo_harvester_queue = Queue(name="weibo_harvester", exchange=self.exchange)
        with self._create_connection() as connection:
            self.result_queue(connection).declare()
            self.result_queue(connection).purge()
            self.web_harvest_queue(connection).declare()
            self.web_harvest_queue(connection).purge()
            self.warc_created_queue(connection).declare()
            self.warc_created_queue(connection).purge()
            # avoid raise NOT_FOUND error 404
            weibo_harvester_queue(connection).declare()
            weibo_harvester_queue(connection).purge()

        self.path = None
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号