def test_search_timeline(self):
    """Publish a ``weibo_timeline`` harvest request and check the replies.

    The request is sent with routing key
    ``harvest.start.weibo.weibo_timeline``. The test then reads, in order:
    two status messages and one result message from ``self.result_queue``,
    one message from ``self.web_harvest_queue`` and one message from
    ``self.warc_created_queue``, asserting on each as it arrives.
    """
    expected_id = "test:3"
    self.path = "/sfm-data/collection_set/test_collection/test_3"

    harvest_options = {
        "web_resources": True,
        "image_sizes": ["Thumbnail", "Medium", "Large"],
    }
    request = {
        "id": expected_id,
        "type": "weibo_timeline",
        "path": self.path,
        "credentials": {"access_token": tests.WEIBO_ACCESS_TOKEN},
        "collection_set": {"id": "test_collection_set"},
        "collection": {"id": "test_collection"},
        "options": harvest_options,
    }

    with self._create_connection() as conn:
        # Send the harvest request through the exchange bound to this connection.
        publisher = Producer(conn, exchange=self.exchange(conn))
        publisher.publish(request, routing_key="harvest.start.weibo.weibo_timeline")

        # First status update: must echo our id and report "running".
        first_status = self._wait_for_message(self.result_queue, conn)
        self.assertEqual(expected_id, first_status["id"])
        self.assertEqual(STATUS_RUNNING, first_status["status"])

        # A second "running" update is expected before the final result.
        second_status = self._wait_for_message(self.result_queue, conn)
        self.assertEqual(STATUS_RUNNING, second_status["status"])

        # Final result: same id, success status, and a truthy "weibos" stat
        # recorded under today's ISO date.
        outcome = self._wait_for_message(self.result_queue, conn)
        self.assertEqual(expected_id, outcome["id"])
        self.assertEqual(STATUS_SUCCESS, outcome["status"])
        todays_stats = outcome["stats"][date.today().isoformat()]
        self.assertTrue(todays_stats["weibos"])

        # The web harvest message must carry at least one seed.
        web_request = self._wait_for_message(self.web_harvest_queue, conn)
        self.assertTrue(len(web_request["seeds"]))

        # The WARC-created message must point at a file that exists on disk.
        warc_created = self._wait_for_message(self.warc_created_queue, conn)
        warc_path = warc_created["warc"]["path"]
        self.assertTrue(os.path.isfile(warc_path))
test_weibo_harvester.py 文件源码
python
阅读 21
收藏 0
点赞 0
评论 0
评论列表
文章目录