test_harvester.py 文件源码

python
阅读 22 收藏 0 点赞 0 评论 0

项目:dati-ckan-docker 作者: italia 项目源码 文件源码
def test_harvest_after_download_empty_content_stops_gather_stage(self):
        """Harvest a mocked RDF source and expect a finished job adding nothing.

        The source URL is served through httpretty: GET answers with the
        body ``'return.empty.content'`` and HEAD answers with a 405. After
        the gather stage the test plugin's ``after_download`` hook must have
        been called exactly once, and the last job of the harvest source
        must be reported as ``Finished`` with zero added datasets.

        NOTE(review): the test name suggests the test plugin turns this
        particular download into empty content, which halts the gather
        stage -- confirm against the ``test_rdf_harvester`` plugin.
        """
        rdf_harvester = p.get_plugin('test_rdf_harvester')

        url = 'http://return.empty.content'
        rdf_type = self.rdf_content_type

        # Serve the remote file for the GET request
        httpretty.register_uri(
            httpretty.GET, url,
            body='return.empty.content', content_type=rdf_type)

        # A HEAD request is attempted by the harvester before the GET, so
        # that verb has to be mocked as well
        httpretty.register_uri(
            httpretty.HEAD, url,
            status=405, content_type=rdf_type)

        source = self._create_harvest_source(url)
        source_id = source['id']

        self._create_harvest_job(source_id)
        self._run_jobs(source_id)
        self._gather_queue(1)

        eq_(rdf_harvester.calls['after_download'], 1)

        # A second run flags the previous job as Finished
        self._run_jobs()

        # The mocked file must actually have been requested
        requested_host = httpretty.last_request().headers['host']
        assert 'return.empty.content' in requested_host

        # Reload the harvest source to read its updated status
        refreshed = h.call_action('harvest_source_show', id=source_id)
        last_job = refreshed['status']['last_job']

        eq_(last_job['status'], 'Finished')
        eq_(last_job['stats']['added'], 0)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号