def configure(self, options, conf):
super(NoseSQLAlchemy, self).configure(options, conf)
plugin_base.pre_begin(options)
plugin_base.set_coverage_flag(options.enable_plugin_coverage)
plugin_base.set_skip_test(nose.SkipTest)
python类SkipTest()的实例源码
def skip(reason):
"""
Unconditionally skip a test.
"""
def decorator(test_item):
if not (isinstance(test_item, type) and issubclass(test_item, TestCase)):
@functools.wraps(test_item)
def skip_wrapper(*args, **kwargs):
raise SkipTest(reason)
test_item = skip_wrapper
test_item.__unittest_skip__ = True
test_item.__unittest_skip_why__ = reason
return test_item
return decorator
def setUp(self):
super(BaseZMQTestCase, self).setUp()
if self.green and not have_gevent:
raise SkipTest("requires gevent")
self.context = self.Context.instance()
self.sockets = []
def ping_pong_json(self, s1, s2, o):
if jsonapi.jsonmod is None:
raise SkipTest("No json library")
s1.send_json(o)
o2 = s2.recv_json()
s2.send_json(o2)
o3 = s1.recv_json()
return o3
def assertRaisesErrno(self, errno, func, *args, **kwargs):
if errno == zmq.EAGAIN:
raise SkipTest("Skipping because we're green.")
try:
func(*args, **kwargs)
except zmq.ZMQError:
e = sys.exc_info()[1]
self.assertEqual(e.errno, errno, "wrong error raised, expected '%s' \
got '%s'" % (zmq.ZMQError(errno), zmq.ZMQError(e.errno)))
else:
self.fail("Function did not raise any error")
def skip_green(self):
raise SkipTest("Skipping because we are green")
def setUp(self):
if not socks:
raise nose.SkipTest('socks module unavailable')
if not subprocess:
raise nose.SkipTest('subprocess module unavailable')
# start a short-lived miniserver so we can get a likely port
# for the proxy
self.httpd, self.proxyport = miniserver.start_server(
miniserver.ThisDirHandler)
self.httpd.shutdown()
self.httpd, self.port = miniserver.start_server(
miniserver.ThisDirHandler)
self.pidfile = tempfile.mktemp()
self.logfile = tempfile.mktemp()
fd, self.conffile = tempfile.mkstemp()
f = os.fdopen(fd, 'w')
our_cfg = tinyproxy_cfg % {'user': os.getlogin(),
'pidfile': self.pidfile,
'port': self.proxyport,
'logfile': self.logfile}
f.write(our_cfg)
f.close()
try:
# TODO use subprocess.check_call when 2.4 is dropped
ret = subprocess.call(['tinyproxy', '-c', self.conffile])
self.assertEqual(0, ret)
except OSError, e:
if e.errno == errno.ENOENT:
raise nose.SkipTest('tinyproxy not available')
raise
def configure(self, options, conf):
super(NoseSQLAlchemy, self).configure(options, conf)
plugin_base.pre_begin(options)
plugin_base.set_coverage_flag(options.enable_plugin_coverage)
plugin_base.set_skip_test(nose.SkipTest)
def test_plugin_load():
try:
# attempt to load dependencies of plugins
import pldns
except ImportError:
raise nose.SkipTest
expected_names = set(['TFO', 'ECN', 'DSCP', 'UDPZero', 'UDPOpts', 'DNSResolv', 'H2', 'EvilBit'])
names = set()
for plugin in pathspider.cmd.measure.plugins:
assert issubclass(plugin, pathspider.base.Spider)
names.add(plugin.__name__)
assert names == expected_names
def test_issue_23():
if not IN_TRAVIS:
raise SkipTest()
pe.get_book(url="https://github.com/pyexcel/pyexcel-ods/raw/master/tests/fixtures/white_space.ods", library='pyexcel-odsr'); # flake8: noqa
def setup_class(cls):
if p.toolkit.check_ckan_version(max_version='2.4.99'):
raise nose.SkipTest()
super(TestTranslations, cls).setup_class()
def setup_class(cls):
if not tests.is_datastore_supported():
raise nose.SkipTest("Datastore not supported")
p.load('datastore')
ctd.CreateTestData.create()
cls.sysadmin_user = model.User.get('testsysadmin')
cls.normal_user = model.User.get('annafan')
resource = model.Package.get('annakarenina').resources[0]
cls.data = dict(
resource_id=resource.id,
force=True,
fields=[
{'id': 'id'},
{'id': 'date', 'type':'date'},
{'id': 'x'},
{'id': 'y'},
{'id': 'z'},
{'id': 'country'},
{'id': 'title'},
{'id': 'lat'},
{'id': 'lon'}
],
records=[
{'id': 0, 'date': '2011-01-01', 'x': 1, 'y': 2, 'z': 3, 'country': 'DE', 'title': 'first 99', 'lat':52.56, 'lon':13.40},
{'id': 1, 'date': '2011-02-02', 'x': 2, 'y': 4, 'z': 24, 'country': 'UK', 'title': 'second', 'lat':54.97, 'lon':-1.60},
{'id': 2, 'date': '2011-03-03', 'x': 3, 'y': 6, 'z': 9, 'country': 'US', 'title': 'third', 'lat':40.00, 'lon':-75.5},
{'id': 3, 'date': '2011-04-04', 'x': 4, 'y': 8, 'z': 6, 'country': 'UK', 'title': 'fourth', 'lat':57.27, 'lon':-6.20},
{'id': 4, 'date': '2011-05-04', 'x': 5, 'y': 10, 'z': 15, 'country': 'UK', 'title': 'fifth', 'lat':51.58, 'lon':0},
{'id': 5, 'date': '2011-06-02', 'x': 6, 'y': 12, 'z': 18, 'country': 'DE', 'title': 'sixth 53.56', 'lat':51.04, 'lon':7.9}
]
)
postparams = '%s=1' % json.dumps(cls.data)
auth = {'Authorization': str(cls.normal_user.apikey)}
res = cls.app.post('/api/action/datastore_create', params=postparams,
extra_environ=auth)
res_dict = json.loads(res.body)
assert res_dict['success'] is True
def test_pg_version_check(self):
if not tests.is_datastore_supported():
raise nose.SkipTest("Datastore not supported")
engine = db._get_engine(
{'connection_url': config['sqlalchemy.url']})
connection = engine.connect()
assert db._pg_version_is_at_least(connection, '8.0')
assert not db._pg_version_is_at_least(connection, '10.0')
def setup_class(cls):
wsgiapp = middleware.make_app(config['global_conf'], **config)
cls.app = paste.fixture.TestApp(wsgiapp)
if not tests.is_datastore_supported():
raise nose.SkipTest("Datastore not supported")
p.load('datastore')
ctd.CreateTestData.create()
cls.sysadmin_user = model.User.get('testsysadmin')
cls.normal_user = model.User.get('annafan')
resource = model.Package.get('annakarenina').resources[0]
cls.data = {
'resource_id': resource.id,
'force': True,
'aliases': 'books',
'fields': [{'id': u'b\xfck', 'type': 'text'},
{'id': 'author', 'type': 'text'},
{'id': 'published'},
{'id': u'characters', u'type': u'_text'},
{'id': 'random_letters', 'type': 'text[]'}],
'records': [{u'b\xfck': 'annakarenina',
'author': 'tolstoy',
'published': '2005-03-01',
'nested': ['b', {'moo': 'moo'}],
u'characters': [u'Princess Anna', u'Sergius'],
'random_letters': ['a', 'e', 'x']},
{u'b\xfck': 'warandpeace', 'author': 'tolstoy',
'nested': {'a': 'b'}, 'random_letters': []}]
}
postparams = '%s=1' % json.dumps(cls.data)
auth = {'Authorization': str(cls.sysadmin_user.apikey)}
res = cls.app.post('/api/action/datastore_create', params=postparams,
extra_environ=auth)
res_dict = json.loads(res.body)
assert res_dict['success'] is True
engine = db._get_engine({
'connection_url': config['ckan.datastore.write_url']})
cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
def setup_class(cls):
if not tests.is_datastore_supported():
raise nose.SkipTest("Datastore not supported")
p.load('datastore')
ctd.CreateTestData.create()
cls.sysadmin_user = model.User.get('testsysadmin')
cls.normal_user = model.User.get('annafan')
set_url_type(
model.Package.get('annakarenina').resources, cls.sysadmin_user)
resource = model.Package.get('annakarenina').resources[0]
cls.data = {
'resource_id': resource.id,
'fields': [{'id': u'b\xfck', 'type': 'text'},
{'id': 'author', 'type': 'text'},
{'id': 'nested', 'type': 'json'},
{'id': 'characters', 'type': 'text[]'},
{'id': 'published'}],
'primary_key': u'b\xfck',
'records': [{u'b\xfck': 'annakarenina', 'author': 'tolstoy',
'published': '2005-03-01', 'nested': ['b', {'moo': 'moo'}]},
{u'b\xfck': 'warandpeace', 'author': 'tolstoy',
'nested': {'a':'b'}}
]
}
postparams = '%s=1' % json.dumps(cls.data)
auth = {'Authorization': str(cls.sysadmin_user.apikey)}
res = cls.app.post('/api/action/datastore_create', params=postparams,
extra_environ=auth)
res_dict = json.loads(res.body)
assert res_dict['success'] is True
engine = db._get_engine(
{'connection_url': config['ckan.datastore.write_url']})
cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
def setup_class(cls):
if not tests.is_datastore_supported():
raise nose.SkipTest("Datastore not supported")
p.load('datastore')
ctd.CreateTestData.create()
cls.sysadmin_user = model.User.get('testsysadmin')
cls.normal_user = model.User.get('annafan')
set_url_type(
model.Package.get('annakarenina').resources, cls.sysadmin_user)
resource = model.Package.get('annakarenina').resources[0]
cls.data = {
'resource_id': resource.id,
'fields': [{'id': u'b\xfck', 'type': 'text'},
{'id': 'author', 'type': 'text'},
{'id': 'nested', 'type': 'json'},
{'id': 'characters', 'type': 'text[]'},
{'id': 'published'}],
'primary_key': u'b\xfck',
'records': [{u'b\xfck': 'annakarenina', 'author': 'tolstoy',
'published': '2005-03-01', 'nested': ['b', {'moo': 'moo'}]},
{u'b\xfck': 'warandpeace', 'author': 'tolstoy',
'nested': {'a':'b'}}
]
}
postparams = '%s=1' % json.dumps(cls.data)
auth = {'Authorization': str(cls.sysadmin_user.apikey)}
res = cls.app.post('/api/action/datastore_create', params=postparams,
extra_environ=auth)
res_dict = json.loads(res.body)
assert res_dict['success'] is True
engine = db._get_engine(
{'connection_url': config['ckan.datastore.write_url']})
cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
def setup_class(cls):
if not tests.is_datastore_supported():
raise nose.SkipTest("Datastore not supported")
p.load('datastore')
ctd.CreateTestData.create()
cls.sysadmin_user = model.User.get('testsysadmin')
cls.normal_user = model.User.get('annafan')
set_url_type(
model.Package.get('annakarenina').resources, cls.sysadmin_user)
resource = model.Package.get('annakarenina').resources[0]
hhguide = u"hitchhiker's guide to the galaxy"
cls.data = {
'resource_id': resource.id,
'fields': [{'id': u'b\xfck', 'type': 'text'},
{'id': 'author', 'type': 'text'},
{'id': 'nested', 'type': 'json'},
{'id': 'characters', 'type': 'text[]'},
{'id': 'published'}],
'primary_key': u'b\xfck',
'records': [{u'b\xfck': 'annakarenina', 'author': 'tolstoy',
'published': '2005-03-01', 'nested': ['b', {'moo': 'moo'}]},
{u'b\xfck': 'warandpeace', 'author': 'tolstoy',
'nested': {'a':'b'}},
{'author': 'adams',
'characters': ['Arthur Dent', 'Marvin'],
'nested': {'foo': 'bar'},
u'b\xfck': hhguide}
]
}
postparams = '%s=1' % json.dumps(cls.data)
auth = {'Authorization': str(cls.sysadmin_user.apikey)}
res = cls.app.post('/api/action/datastore_create', params=postparams,
extra_environ=auth)
res_dict = json.loads(res.body)
assert res_dict['success'] is True
engine = db._get_engine(
{'connection_url': config['ckan.datastore.write_url']})
cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
def setup_class(cls):
if not is_datastore_supported():
raise nose.SkipTest("Datastore not supported")
plugin = p.load('datastore')
if plugin.legacy_mode:
# make sure we undo adding the plugin
p.unload('datastore')
raise nose.SkipTest("Info is not supported in legacy mode")
def setup_class(cls):
wsgiapp = middleware.make_app(config['global_conf'], **config)
cls.app = paste.fixture.TestApp(wsgiapp)
if not tests.is_datastore_supported():
raise nose.SkipTest("Datastore not supported")
p.load('datastore')
ctd.CreateTestData.create()
cls.sysadmin_user = model.User.get('testsysadmin')
cls.normal_user = model.User.get('annafan')
engine = db._get_engine(
{'connection_url': config['ckan.datastore.write_url']})
cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
set_url_type(
model.Package.get('annakarenina').resources, cls.sysadmin_user)
def setup_class(cls):
wsgiapp = middleware.make_app(config['global_conf'], **config)
cls.app = paste.fixture.TestApp(wsgiapp)
if not tests.is_datastore_supported():
raise nose.SkipTest("Datastore not supported")
p.load('datastore')
p.load('datapusher')
ctd.CreateTestData.create()
cls.sysadmin_user = model.User.get('testsysadmin')
cls.normal_user = model.User.get('annafan')
engine = db._get_engine(
{'connection_url': config['ckan.datastore.write_url']})
cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
set_url_type(
model.Package.get('annakarenina').resources, cls.sysadmin_user)