test_jobs.py 文件源码

python
阅读 29 收藏 0 点赞 0 评论 0

项目:ckanext-validation 作者: frictionlessdata 项目源码 文件源码
def test_job_local_paths_are_hidden(self, mock_open):
        """Check the report source is a URL ending in the uploaded file name.

        Uploads an invalid CSV (every data row has one cell more than the
        two-column header) as a resource, runs the validation job with
        ``io.open`` patched to return an in-memory copy of the same data,
        and then inspects the ``Validation`` record stored for the resource.

        ``mock_open`` is not used in the body; presumably it is injected by
        a patch decorator that is outside this view -- confirm against the
        full file.
        """
        # 1010 data rows, so the 1000 row(s) limit warning asserted at the
        # end of the test can be emitted.
        csv_text = 'id,type\n' + '1,a,\n' * 1010

        upload_buffer = StringIO.StringIO()
        upload_buffer.write(csv_text)

        resource = factories.Resource(
            format='csv',
            upload=MockFieldStorage(upload_buffer, 'invalid.csv'))

        # Any io.open call made while the job runs gets this stream back.
        patched_stream = io.BufferedReader(io.BytesIO(csv_text))
        with mock.patch('io.open', return_value=patched_stream):
            run_validation_job(resource)

        query = Session.query(Validation)
        validation = query.filter(
            Validation.resource_id == resource['id']).one()
        report = validation.report

        # The reported source must look like a URL to the uploaded file.
        table_source = report['tables'][0]['source']
        assert table_source.startswith('http')
        assert table_source.endswith('invalid.csv')

        assert_equals(
            report['warnings'][0],
            'Table inspection has reached 1000 row(s) limit')
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号