def test_job_pass_validation_options_string(self, mock_open):
    """Validation job passes when ``validation_options`` is a JSON string.

    A CSV resource is created with ``validation_options`` supplied as a
    JSON-encoded string (``headers`` and ``skip_rows`` keys).  After
    ``run_validation_job`` has run, the single ``Validation`` row stored
    for that resource must hold a report whose ``valid`` flag is ``True``.

    NOTE(review): ``mock_open`` is not used in the body; it is presumably
    injected by a patch decorator outside this view -- confirm.
    """
    options_json = '''{
"headers": 3,
"skip_rows": ["#"]
}'''
    csv_text = '''
a;b;c
#comment
1;2;3
'''

    # The buffer is filled and handed to the upload wrapper as-is (it is
    # not rewound after the write).
    upload_buffer = StringIO.StringIO()
    upload_buffer.write(csv_text)
    resource = factories.Resource(
        format='csv',
        upload=MockFieldStorage(upload_buffer, 'invalid.csv'),
        validation_options=options_json,
    )

    # While the job runs, io.open is patched to return this in-memory
    # stream holding the same CSV text.
    csv_stream = io.BufferedReader(io.BytesIO(csv_text))
    with mock.patch('io.open', return_value=csv_stream):
        run_validation_job(resource)

    # Exactly one Validation row is expected for the resource (.one()).
    stored = (
        Session.query(Validation)
        .filter(Validation.resource_id == resource['id'])
        .one()
    )
    assert_equals(stored.report['valid'], True)
评论列表
文章目录