test_harvester.py 文件源码

python
阅读 21 收藏 0 点赞 0 评论 0

项目:dati-ckan-docker 作者: italia 项目源码 文件源码
def test_harvest_before_download_errors_get_stored(self):

        plugin = p.get_plugin('test_rdf_harvester')

        source_url = 'http://return.errors'

        # Mock the GET request to get the file
        httpretty.register_uri(httpretty.GET, source_url,
                               body=self.rdf_content,
                               content_type=self.rdf_content_type)

        # The harvester will try to do a HEAD request first so we need to mock
        # this as well
        httpretty.register_uri(httpretty.HEAD, source_url,
                               status=405,
                               content_type=self.rdf_content_type)

        harvest_source = self._create_harvest_source(source_url)
        self._create_harvest_job(harvest_source['id'])
        self._run_jobs(harvest_source['id'])
        self._gather_queue(1)

        eq_(plugin.calls['before_download'], 1)

        # Run the jobs to mark the previous one as Finished
        self._run_jobs()

        # Check that the file was not requested
        assert 'return.errors' not in httpretty.last_request().headers['host']

        # Get the harvest source with the udpated status
        harvest_source = h.call_action('harvest_source_show',
                                       id=harvest_source['id'])

        last_job_status = harvest_source['status']['last_job']

        eq_('Error 1', last_job_status['gather_error_summary'][0][0])
        eq_('Error 2', last_job_status['gather_error_summary'][1][0])
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号