def setup_class(cls):
    """Prepare the shared fixtures for this test class.

    Builds the WSGI test application, loads the ``datastore`` plugin,
    creates the standard test data, posts a sample table to the
    ``datastore_create`` action and binds a scoped SQLAlchemy session to
    the engine obtained for ``ckan.datastore.write_url``.

    Raises:
        nose.SkipTest: if ``tests.is_datastore_supported()`` is false.
    """
    # The app is built before the support check, so ``cls.app`` is set
    # even when the class ends up being skipped.
    cls.app = paste.fixture.TestApp(
        middleware.make_app(config['global_conf'], **config))

    datastore_available = tests.is_datastore_supported()
    if not datastore_available:
        raise nose.SkipTest("Datastore not supported")

    p.load('datastore')
    ctd.CreateTestData.create()

    cls.sysadmin_user = model.User.get('testsysadmin')
    cls.normal_user = model.User.get('annafan')

    package = model.Package.get('annakarenina')
    resource = package.resources[0]

    # Column declarations; 'published' is declared without a type.
    field_specs = [
        {'id': u'b\xfck', 'type': 'text'},
        {'id': 'author', 'type': 'text'},
        {'id': 'published'},
        {'id': u'characters', u'type': u'_text'},
        {'id': 'random_letters', 'type': 'text[]'},
    ]

    # Rows to insert; both carry a 'nested' key that is not listed in
    # ``field_specs``.
    first_row = {
        u'b\xfck': 'annakarenina',
        'author': 'tolstoy',
        'published': '2005-03-01',
        'nested': ['b', {'moo': 'moo'}],
        u'characters': [u'Princess Anna', u'Sergius'],
        'random_letters': ['a', 'e', 'x'],
    }
    second_row = {
        u'b\xfck': 'warandpeace',
        'author': 'tolstoy',
        'nested': {'a': 'b'},
        'random_letters': [],
    }

    cls.data = {
        'resource_id': resource.id,
        'force': True,
        'aliases': 'books',
        'fields': field_specs,
        'records': [first_row, second_row],
    }

    # The request body is the JSON document followed by '=1'.
    postparams = '%s=1' % json.dumps(cls.data)
    auth = {'Authorization': str(cls.sysadmin_user.apikey)}
    response = cls.app.post(
        '/api/action/datastore_create',
        params=postparams,
        extra_environ=auth,
    )
    assert json.loads(response.body)['success'] is True

    engine_options = {'connection_url': config['ckan.datastore.write_url']}
    engine = db._get_engine(engine_options)
    session_factory = orm.sessionmaker(bind=engine)
    cls.Session = orm.scoped_session(session_factory)
评论列表
文章目录