def setup_class(cls):
if not tests.is_datastore_supported():
raise nose.SkipTest("Datastore not supported")
p.load('datastore')
ctd.CreateTestData.create()
cls.sysadmin_user = model.User.get('testsysadmin')
cls.normal_user = model.User.get('annafan')
set_url_type(
model.Package.get('annakarenina').resources, cls.sysadmin_user)
resource = model.Package.get('annakarenina').resources[0]
cls.data = {
'resource_id': resource.id,
'fields': [{'id': u'b\xfck', 'type': 'text'},
{'id': 'author', 'type': 'text'},
{'id': 'nested', 'type': 'json'},
{'id': 'characters', 'type': 'text[]'},
{'id': 'published'}],
'primary_key': u'b\xfck',
'records': [{u'b\xfck': 'annakarenina', 'author': 'tolstoy',
'published': '2005-03-01', 'nested': ['b', {'moo': 'moo'}]},
{u'b\xfck': 'warandpeace', 'author': 'tolstoy',
'nested': {'a':'b'}}
]
}
postparams = '%s=1' % json.dumps(cls.data)
auth = {'Authorization': str(cls.sysadmin_user.apikey)}
res = cls.app.post('/api/action/datastore_create', params=postparams,
extra_environ=auth)
res_dict = json.loads(res.body)
assert res_dict['success'] is True
engine = db._get_engine(
{'connection_url': config['ckan.datastore.write_url']})
cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
评论列表
文章目录