def download_test_files_if_not_present(self):
    """Fetch the data files this IO test needs, skipping when unavailable.

    The files named in ``self.files_to_download`` are fetched into
    ``self.local_test_dir`` from ``url_for_tests + self.shortname``
    (``url_for_tests`` is a module-level global of this file).

    Raises:
        unittest.SkipTest: if ``self.use_network`` is false, or if creating
            the directories or downloading raises ``IOError``.
    """
    # NOTE: this used to be a ``'''...''' % self.ioclass.__name__`` expression.
    # Python evaluates and discards such an expression instead of storing it
    # as __doc__, so it has been turned into a real (static) docstring.
    if not self.use_network:
        raise unittest.SkipTest("Requires download of data from the web")

    url = url_for_tests + self.shortname
    try:
        make_all_directories(self.files_to_download, self.local_test_dir)
        download_test_file(self.files_to_download,
                           self.local_test_dir, url)
    except IOError as exc:
        # An IOError here is reported as a skip rather than a test failure.
        raise unittest.SkipTest(exc)
python类SkipTest()的实例源码
def test_paged_result_handling(self):
    """Check that slicing the full object set yields both created rows
    while the session's ``default_fetch_size`` is patched to 1.

    Raises unittest.SkipTest when PROTOCOL_VERSION is below 2.
    """
    if PROTOCOL_VERSION < 2:
        skip_reason = (
            "Paging requires native protocol 2+, "
            "currently using: {0}"
        ).format(PROTOCOL_VERSION)
        raise unittest.SkipTest(skip_reason)

    # addresses #225
    class PagingTest(Model):
        id = columns.Integer(primary_key=True)
        val = columns.Integer()

    sync_table(PagingTest)
    for key in (1, 2):
        PagingTest.create(id=key, val=key)

    session = get_session()
    # Patch the fetch size down to 1 for the duration of the query.
    with mock.patch.object(session, 'default_fetch_size', 1):
        rows = PagingTest.objects()[:]
        assert len(rows) == 2
test_failure_types.py 文件源码
项目:deb-python-cassandra-driver
作者: openstack
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def setUp(self):
    """Open a cluster session for the test, or skip on old protocols.

    Raises unittest.SkipTest when PROTOCOL_VERSION is below 4.  Otherwise
    a connection is first attempted with ProtocolVersion.MAX_SUPPORTED
    (beta versions allowed); if that raises NoHostAvailable the
    connection is retried with PROTOCOL_VERSION and ``self.support_v5``
    is set to False.
    """
    self.support_v5 = True
    if PROTOCOL_VERSION < 4:
        reason = (
            "Native protocol 4,0+ is required for custom payloads, "
            "currently using %r" % (PROTOCOL_VERSION,)
        )
        raise unittest.SkipTest(reason)

    try:
        self.cluster = Cluster(
            protocol_version=ProtocolVersion.MAX_SUPPORTED,
            allow_beta_protocol_version=True
        )
        self.session = self.cluster.connect()
    except NoHostAvailable:
        log.info("Protocol Version 5 not supported,")
        self.cluster = Cluster(protocol_version=PROTOCOL_VERSION)
        self.session = self.cluster.connect()
        self.support_v5 = False

    self.nodes_currently_failing = []
    # Unpacking requires get_cluster() to expose exactly three nodes.
    cluster_nodes = get_cluster().nodes.values()
    self.node1, self.node2, self.node3 = cluster_nodes
def setUp(self):
    """Connect to the cluster and create the ``test3rf.lwt`` table.

    Raises unittest.SkipTest when PROTOCOL_VERSION is below 2.
    """
    if PROTOCOL_VERSION < 2:
        reason = (
            "Protocol 2.0+ is required for Lightweight transactions, "
            "currently testing against %r" % (PROTOCOL_VERSION,)
        )
        raise unittest.SkipTest(reason)

    self.cluster = Cluster(protocol_version=PROTOCOL_VERSION)
    self.session = self.cluster.connect()

    # The statement text is kept exactly as it is sent to the server.
    create_lwt_table = '''
CREATE TABLE test3rf.lwt (
k int PRIMARY KEY,
v int )'''
    self.session.execute(create_lwt_table)
def test_cql_compatibility(self):
    """Check ``is_cql_compatible`` on a compact table's metadata.

    Raises unittest.SkipTest when CASS_SERVER_VERSION is (3, 0) or newer.
    """
    if CASS_SERVER_VERSION >= (3, 0):
        raise unittest.SkipTest("cql compatibility does not apply Cassandra 3.0+")

    # having more than one non-PK column is okay if there aren't any
    # clustering columns
    ddl = self.make_create_statement(["a"], [], ["b", "c", "d"], compact=True)
    self.session.execute(ddl)
    meta = self.get_table_metadata()

    partition_names = [col.name for col in meta.partition_key]
    self.assertEqual([u'a'], partition_names)
    self.assertEqual([], meta.clustering_key)
    self.assertEqual([u'a', u'b', u'c', u'd'], sorted(meta.columns.keys()))
    self.assertTrue(meta.is_cql_compatible)

    # ... but if there are clustering columns, it's not CQL compatible.
    # This is a hacky way to simulate having clustering columns.
    meta.clustering_key = ["foo", "bar"]
    for fake_column in ("foo", "bar"):
        meta.columns[fake_column] = None
    self.assertFalse(meta.is_cql_compatible)
def test_replicas(self):
    """
    Ensure cluster.metadata.get_replicas return correctly when not attached to keyspace

    Before connecting, the replica lookup must return an empty list; after
    connecting to ``test3rf`` it must return at least one host, and the
    first host must report datacenter ``dc1`` and rack ``r1``.

    Raises unittest.SkipTest when the murmur3 extension is not available.
    """
    if murmur3 is None:
        raise unittest.SkipTest('the murmur3 extension is not available')

    cluster = Cluster(protocol_version=PROTOCOL_VERSION)
    # try/finally so the cluster is shut down even when an assertion fails;
    # previously a failing assertion skipped the shutdown call.
    try:
        self.assertEqual(cluster.metadata.get_replicas('test3rf', 'key'), [])

        cluster.connect('test3rf')

        # Look the replicas up once and reuse the materialized list.
        replicas = list(cluster.metadata.get_replicas('test3rf', six.b('key')))
        self.assertNotEqual(replicas, [])
        host = replicas[0]
        self.assertEqual(host.datacenter, 'dc1')
        self.assertEqual(host.rack, 'r1')
    finally:
        cluster.shutdown()
def test_paged_result_handling(self):
    """Check that ``find_all`` returns both created rows while the
    session's ``default_fetch_size`` is patched to 1.

    Raises unittest.SkipTest when PROTOCOL_VERSION is below 2.
    """
    if PROTOCOL_VERSION < 2:
        raise unittest.SkipTest(
            "Paging requires native protocol 2+, "
            "currently using: {0}".format(PROTOCOL_VERSION)
        )

    # addresses #225
    class PagingTest(Model):
        id = columns.Integer(primary_key=True)
        val = columns.Integer()

    sync_table(self.conn, PagingTest)
    # try/finally so the table is dropped even when the body fails;
    # previously a failing assertion left the table behind.
    try:
        PagingTest.create(self.conn, id=1, val=1)
        PagingTest.create(self.conn, id=2, val=2)
        with mock.patch.object(self.conn.session, 'default_fetch_size', 1):
            results = PagingTest.objects().find_all(self.conn)
            assert len(results) == 2
    finally:
        drop_table(self.conn, PagingTest)
def test_skiptest_in_setupclass(self):
    """A SkipTest raised in setUpClass must run no tests, record no
    errors, and record exactly one skip labelled with the class.
    """
    class Test(unittest2.TestCase):
        @classmethod
        def setUpClass(cls):
            raise unittest2.SkipTest('foo')

        def test_one(self):
            pass

        def test_two(self):
            pass

    result = self.runTests(Test)
    self.assertEqual(result.testsRun, 0)
    self.assertEqual(len(result.errors), 0)
    self.assertEqual(len(result.skipped), 1)

    skipped_entry = result.skipped[0][0]
    # __qualname__ is used when the class has it, __name__ otherwise.
    class_label = getattr(Test, '__qualname__', Test.__name__)
    expected = 'setUpClass (%s.%s)' % (__name__, class_label)
    self.assertEqual(str(skipped_entry), expected)
def test_skiptest_in_setupmodule(self):
    """A SkipTest raised in setUpModule must run no tests, record no
    errors, and record exactly one skip labelled ``setUpModule (Module)``.
    """
    class Test(unittest2.TestCase):
        def test_one(self):
            pass

        def test_two(self):
            pass

    class Module(object):
        @staticmethod
        def setUpModule():
            raise unittest2.SkipTest('foo')

    # Make the runner resolve Test's module to the fake Module class.
    Test.__module__ = 'Module'
    sys.modules['Module'] = Module
    # Remove the fake entry afterwards so it does not stay in sys.modules;
    # the None default keeps the cleanup from raising if it is already gone.
    self.addCleanup(sys.modules.pop, 'Module', None)

    result = self.runTests(Test)
    self.assertEqual(result.testsRun, 0)
    self.assertEqual(len(result.errors), 0)
    self.assertEqual(len(result.skipped), 1)
    skipped = result.skipped[0][0]
    self.assertEqual(str(skipped), 'setUpModule (Module)')
def test_discover_with_init_module_that_raises_SkipTest_on_import(self):
    """Discovery over a package whose import raises SkipTest must yield
    one skipped test, import only ``my_package``, and stay picklable.
    """
    vfs = {
        abspath('/foo'): ['my_package'],
        abspath('/foo/my_package'): ['__init__.py', 'test_module.py'],
    }
    self.setup_import_issue_package_tests(vfs)
    imported_names = []

    def _get_module_from_name(name):
        # Record the import attempt, then behave like a skipping module.
        imported_names.append(name)
        raise unittest.SkipTest('skipperoo')

    loader = unittest.TestLoader()
    loader._get_module_from_name = _get_module_from_name
    suite = loader.discover(abspath('/foo'))

    self.assertIn(abspath('/foo'), sys.path)
    self.assertEqual(suite.countTestCases(), 1)

    outcome = unittest.TestResult()
    suite.run(outcome)
    self.assertEqual(len(outcome.skipped), 1)
    self.assertEqual(outcome.testsRun, 1)
    self.assertEqual(imported_names, ['my_package'])

    # Check picklability
    for proto in range(pickle.HIGHEST_PROTOCOL + 1):
        pickle.loads(pickle.dumps(suite, proto))
def assert_garbage_collect_test_after_run(self, TestSuiteClass):
    """Assert that a suite of type *TestSuiteClass* drops its reference
    to a test once it has run, so the test can be garbage collected.

    Raises unittest.SkipTest when ``unittest.BaseTestSuite._cleanup`` is
    false.
    """
    if not unittest.BaseTestSuite._cleanup:
        raise unittest.SkipTest("Suite cleanup is disabled")

    class Foo(unittest.TestCase):
        def test_nothing(self):
            pass

    case = Foo('test_nothing')
    case_ref = weakref.ref(case)

    # The suite gets its own strong reference via the weakref.
    suite = TestSuiteClass([case_ref()])
    suite.run(unittest.TestResult())

    del case
    # for the benefit of non-reference counting implementations
    gc.collect()

    self.assertEqual(suite._tests, [None])
    self.assertIsNone(case_ref())
def testSocketAuthInstallPlugin(self):
    """Install the socket auth plugin, run the socket auth test, uninstall.

    The first attempt uses ``install plugin auth_socket ...``.  If any
    statement in that attempt (install or test) raises
    ``pymysql.err.InternalError``, a second attempt uses
    ``install soname 'auth_socket'``.  If that raises InternalError too,
    unittest2.SkipTest is raised.  The plugin is uninstalled afterwards
    whenever ``TestAuthentication.socket_found`` is true.
    """
    # needs plugin. lets install it.
    cursor = self.connections[0].cursor()
    try:
        cursor.execute("install plugin auth_socket soname 'auth_socket.so'")
        TestAuthentication.socket_found = True
        self.socket_plugin_name = 'auth_socket'
        self.realtestSocketAuth()
    except pymysql.err.InternalError:
        try:
            cursor.execute("install soname 'auth_socket'")
            TestAuthentication.socket_found = True
            self.socket_plugin_name = 'unix_socket'
            self.realtestSocketAuth()
        except pymysql.err.InternalError:
            TestAuthentication.socket_found = False
            raise unittest2.SkipTest("we couldn't install the socket plugin")
    finally:
        if TestAuthentication.socket_found:
            uninstall_sql = "uninstall plugin %s" % self.socket_plugin_name
            cursor.execute(uninstall_sql)
def test_json(self):
    """Round-trip a JSON document through a JSON column and through
    ``CAST(... AS JSON)``, comparing the parsed values.

    Raises SkipTest when ``self.mysql_server_is(conn, (5, 7, 0))`` is
    false.
    """
    connect_kwargs = self.databases[0].copy()
    connect_kwargs["charset"] = "utf8mb4"
    conn = pymysql.connect(**connect_kwargs)
    if not self.mysql_server_is(conn, (5, 7, 0)):
        raise SkipTest("JSON type is not supported on MySQL <= 5.6")

    create_sql = """\
create table test_json (
id int not null,
json JSON not null,
primary key (id)
);"""
    self.safe_create_table(conn, "test_json", create_sql)

    cursor = conn.cursor()
    # NOTE(review): the "?????" payload looks like non-ASCII text that was
    # lost in a re-encoding; kept exactly as found -- confirm upstream.
    json_str = u'{"hello": "?????"}'
    expected = json.loads(json_str)

    cursor.execute("INSERT INTO test_json (id, `json`) values (42, %s)", (json_str,))
    cursor.execute("SELECT `json` from `test_json` WHERE `id`=42")
    stored = cursor.fetchone()[0]
    self.assertEqual(json.loads(stored), expected)

    cursor.execute("SELECT CAST(%s AS JSON) AS x", (json_str,))
    casted = cursor.fetchone()[0]
    self.assertEqual(json.loads(casted), expected)
def testSocketAuthInstallPlugin(self):
    """Install the socket auth plugin, run the socket auth test, uninstall.

    The first attempt uses ``install plugin auth_socket ...``.  If any
    statement in that attempt (install or test) raises
    ``pymysql.err.InternalError``, a second attempt uses
    ``install soname 'auth_socket'``.  If that raises InternalError too,
    unittest2.SkipTest is raised.  The plugin is uninstalled afterwards
    whenever ``TestAuthentication.socket_found`` is true.
    """
    # needs plugin. lets install it.
    cursor = self.connections[0].cursor()
    try:
        cursor.execute("install plugin auth_socket soname 'auth_socket.so'")
        TestAuthentication.socket_found = True
        self.socket_plugin_name = 'auth_socket'
        self.realtestSocketAuth()
    except pymysql.err.InternalError:
        try:
            cursor.execute("install soname 'auth_socket'")
            TestAuthentication.socket_found = True
            self.socket_plugin_name = 'unix_socket'
            self.realtestSocketAuth()
        except pymysql.err.InternalError:
            TestAuthentication.socket_found = False
            raise unittest2.SkipTest("we couldn't install the socket plugin")
    finally:
        if TestAuthentication.socket_found:
            uninstall_sql = "uninstall plugin %s" % self.socket_plugin_name
            cursor.execute(uninstall_sql)
def test_json(self):
    """Round-trip a JSON document through a JSON column and through
    ``CAST(... AS JSON)``, comparing the parsed values.

    Raises SkipTest when ``self.mysql_server_is(conn, (5, 7, 0))`` is
    false.
    """
    connect_kwargs = self.databases[0].copy()
    connect_kwargs["charset"] = "utf8mb4"
    conn = pymysql.connect(**connect_kwargs)
    if not self.mysql_server_is(conn, (5, 7, 0)):
        raise SkipTest("JSON type is not supported on MySQL <= 5.6")

    create_sql = """\
create table test_json (
id int not null,
json JSON not null,
primary key (id)
);"""
    self.safe_create_table(conn, "test_json", create_sql)

    cursor = conn.cursor()
    # NOTE(review): the "?????" payload looks like non-ASCII text that was
    # lost in a re-encoding; kept exactly as found -- confirm upstream.
    json_str = u'{"hello": "?????"}'
    expected = json.loads(json_str)

    cursor.execute("INSERT INTO test_json (id, `json`) values (42, %s)", (json_str,))
    cursor.execute("SELECT `json` from `test_json` WHERE `id`=42")
    stored = cursor.fetchone()[0]
    self.assertEqual(json.loads(stored), expected)

    cursor.execute("SELECT CAST(%s AS JSON) AS x", (json_str,))
    casted = cursor.fetchone()[0]
    self.assertEqual(json.loads(casted), expected)
def requires(resource, msg=None):
    """Raise if the specified resource is not available.

    Returns None when the resource may be used.

    Raises:
        unittest.SkipTest: if *resource* is ``'gui'`` and
            ``_is_gui_available()`` is false.
        ResourceDenied: if the caller's module is not ``__main__`` and
            ``is_resource_enabled(resource)`` is false.  *msg* overrides
            the default message.
    """
    if resource == 'gui' and not _is_gui_available():
        raise unittest.SkipTest("Cannot use the 'gui' resource")

    # see if the caller's module is __main__ - if so, treat as if
    # the resource was set.  The frame lookup must stay in this function:
    # depth 1 is only the caller when measured from here.
    caller_module = sys._getframe(1).f_globals.get("__name__")
    if caller_module == "__main__":
        return

    if is_resource_enabled(resource):
        return
    if msg is None:
        msg = "Use of the %r resource not enabled" % resource
    raise ResourceDenied(msg)
def bigaddrspacetest(f):
    """Decorator for tests that fill the address space.

    The returned wrapper calls ``f(self)`` only when ``max_memuse`` is at
    least ``MAX_Py_ssize_t``; otherwise it raises unittest.SkipTest with a
    message saying how much memory is needed.  The wrapper keeps the
    decorated function's ``__name__`` and ``__doc__``.
    """
    # Local import: keeps this block self-contained.
    import functools

    # functools.wraps preserves the test's name and docstring, which were
    # previously replaced by those of ``wrapper``.
    @functools.wraps(f)
    def wrapper(self):
        if max_memuse >= MAX_Py_ssize_t:
            return f(self)
        if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
            raise unittest.SkipTest(
                "not enough memory: try a 32-bit build instead")
        raise unittest.SkipTest(
            "not enough memory: %.1fG minimum needed"
            % (MAX_Py_ssize_t / (1024 ** 3)))
    return wrapper
#=======================================================================
# unittest integration.
def testSocketAuthInstallPlugin(self):
    """Install the socket auth plugin, run the socket auth test, uninstall.

    The first attempt uses ``install plugin auth_socket ...``.  If any
    statement in that attempt (install or test) raises
    ``pymysql.err.InternalError``, a second attempt uses
    ``install soname 'auth_socket'``.  If that raises InternalError too,
    unittest2.SkipTest is raised.  The plugin is uninstalled afterwards
    whenever ``TestAuthentication.socket_found`` is true.
    """
    # needs plugin. lets install it.
    cursor = self.connections[0].cursor()
    try:
        cursor.execute("install plugin auth_socket soname 'auth_socket.so'")
        TestAuthentication.socket_found = True
        self.socket_plugin_name = 'auth_socket'
        self.realtestSocketAuth()
    except pymysql.err.InternalError:
        try:
            cursor.execute("install soname 'auth_socket'")
            TestAuthentication.socket_found = True
            self.socket_plugin_name = 'unix_socket'
            self.realtestSocketAuth()
        except pymysql.err.InternalError:
            TestAuthentication.socket_found = False
            raise unittest2.SkipTest("we couldn't install the socket plugin")
    finally:
        if TestAuthentication.socket_found:
            uninstall_sql = "uninstall plugin %s" % self.socket_plugin_name
            cursor.execute(uninstall_sql)
def test_json(self):
    """Round-trip a JSON document through a JSON column and through
    ``CAST(... AS JSON)``, comparing the parsed values.

    Raises SkipTest when ``self.mysql_server_is(conn, (5, 7, 0))`` is
    false.
    """
    connect_kwargs = self.databases[0].copy()
    connect_kwargs["charset"] = "utf8mb4"
    conn = pymysql.connect(**connect_kwargs)
    if not self.mysql_server_is(conn, (5, 7, 0)):
        raise SkipTest("JSON type is not supported on MySQL <= 5.6")

    create_sql = """\
create table test_json (
id int not null,
json JSON not null,
primary key (id)
);"""
    self.safe_create_table(conn, "test_json", create_sql)

    cursor = conn.cursor()
    # NOTE(review): the "?????" payload looks like non-ASCII text that was
    # lost in a re-encoding; kept exactly as found -- confirm upstream.
    json_str = u'{"hello": "?????"}'
    expected = json.loads(json_str)

    cursor.execute("INSERT INTO test_json (id, `json`) values (42, %s)", (json_str,))
    cursor.execute("SELECT `json` from `test_json` WHERE `id`=42")
    stored = cursor.fetchone()[0]
    self.assertEqual(json.loads(stored), expected)

    cursor.execute("SELECT CAST(%s AS JSON) AS x", (json_str,))
    casted = cursor.fetchone()[0]
    self.assertEqual(json.loads(casted), expected)
def requires(resource, msg=None):
    """Raise if the specified resource is not available.

    Returns None when the resource may be used.

    Raises:
        unittest.SkipTest: if *resource* is ``'gui'`` and
            ``_is_gui_available()`` is false.
        ResourceDenied: if the caller's module is not ``__main__`` and
            ``is_resource_enabled(resource)`` is false.  *msg* overrides
            the default message.
    """
    if resource == 'gui' and not _is_gui_available():
        raise unittest.SkipTest("Cannot use the 'gui' resource")

    # see if the caller's module is __main__ - if so, treat as if
    # the resource was set.  The frame lookup must stay in this function:
    # depth 1 is only the caller when measured from here.
    caller_module = sys._getframe(1).f_globals.get("__name__")
    if caller_module == "__main__":
        return

    if is_resource_enabled(resource):
        return
    if msg is None:
        msg = "Use of the %r resource not enabled" % resource
    raise ResourceDenied(msg)
def bigaddrspacetest(f):
    """Decorator for tests that fill the address space.

    The returned wrapper calls ``f(self)`` only when ``max_memuse`` is at
    least ``MAX_Py_ssize_t``; otherwise it raises unittest.SkipTest with a
    message saying how much memory is needed.  The wrapper keeps the
    decorated function's ``__name__`` and ``__doc__``.
    """
    # Local import: keeps this block self-contained.
    import functools

    # functools.wraps preserves the test's name and docstring, which were
    # previously replaced by those of ``wrapper``.
    @functools.wraps(f)
    def wrapper(self):
        if max_memuse >= MAX_Py_ssize_t:
            return f(self)
        if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
            raise unittest.SkipTest(
                "not enough memory: try a 32-bit build instead")
        raise unittest.SkipTest(
            "not enough memory: %.1fG minimum needed"
            % (MAX_Py_ssize_t / (1024 ** 3)))
    return wrapper
#=======================================================================
# unittest integration.
def requires(resource, msg=None):
    """Raise if the specified resource is not available.

    Returns None when the resource may be used.

    Raises:
        unittest.SkipTest: if *resource* is ``'gui'`` and
            ``_is_gui_available()`` is false.
        ResourceDenied: if the caller's module is not ``__main__`` and
            ``is_resource_enabled(resource)`` is false.  *msg* overrides
            the default message.
    """
    if resource == 'gui' and not _is_gui_available():
        raise unittest.SkipTest("Cannot use the 'gui' resource")

    # see if the caller's module is __main__ - if so, treat as if
    # the resource was set.  The frame lookup must stay in this function:
    # depth 1 is only the caller when measured from here.
    caller_module = sys._getframe(1).f_globals.get("__name__")
    if caller_module == "__main__":
        return

    if is_resource_enabled(resource):
        return
    if msg is None:
        msg = "Use of the %r resource not enabled" % resource
    raise ResourceDenied(msg)
def bigaddrspacetest(f):
    """Decorator for tests that fill the address space.

    The returned wrapper calls ``f(self)`` only when ``max_memuse`` is at
    least ``MAX_Py_ssize_t``; otherwise it raises unittest.SkipTest with a
    message saying how much memory is needed.  The wrapper keeps the
    decorated function's ``__name__`` and ``__doc__``.
    """
    # Local import: keeps this block self-contained.
    import functools

    # functools.wraps preserves the test's name and docstring, which were
    # previously replaced by those of ``wrapper``.
    @functools.wraps(f)
    def wrapper(self):
        if max_memuse >= MAX_Py_ssize_t:
            return f(self)
        if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
            raise unittest.SkipTest(
                "not enough memory: try a 32-bit build instead")
        raise unittest.SkipTest(
            "not enough memory: %.1fG minimum needed"
            % (MAX_Py_ssize_t / (1024 ** 3)))
    return wrapper
#=======================================================================
# unittest integration.
def get_test_client(nowait=False, **kwargs):
    """Build an Elasticsearch client and wait for the cluster to respond.

    Client options start from ``{'timeout': 30}``, optionally gain a
    ``connection_class`` named by the TEST_ES_CONNECTION environment
    variable, and are then overridden by *kwargs*.  The server comes from
    TEST_ES_SERVER (an empty dict when unset).

    The health check is tried once when *nowait* is true and up to 100
    times otherwise, sleeping 0.1 s after each ConnectionError.

    Returns the client once a health check succeeds.
    Raises SkipTest when every attempt ends in a ConnectionError.
    """
    # construct kwargs from the environment
    options = {'timeout': 30}
    if 'TEST_ES_CONNECTION' in os.environ:
        from elasticsearch import connection
        options['connection_class'] = getattr(
            connection, os.environ['TEST_ES_CONNECTION'])
    options.update(kwargs)

    server = os.environ.get('TEST_ES_SERVER', {})
    client = Elasticsearch([server], **options)

    # wait for yellow status
    attempts = 1 if nowait else 100
    for _ in range(attempts):
        try:
            client.cluster.health(wait_for_status='yellow')
        except ConnectionError:
            time.sleep(.1)
        else:
            return client

    # timeout
    raise SkipTest("Elasticsearch failed to start.")
def testSocketAuthInstallPlugin(self):
    """Install the socket auth plugin, run the socket auth test, uninstall.

    The first attempt uses ``install plugin auth_socket ...``.  If any
    statement in that attempt (install or test) raises
    ``pymysql.err.InternalError``, a second attempt uses
    ``install soname 'auth_socket'``.  If that raises InternalError too,
    unittest2.SkipTest is raised.  The plugin is uninstalled afterwards
    whenever ``TestAuthentication.socket_found`` is true.
    """
    # needs plugin. lets install it.
    cursor = self.connections[0].cursor()
    try:
        cursor.execute("install plugin auth_socket soname 'auth_socket.so'")
        TestAuthentication.socket_found = True
        self.socket_plugin_name = 'auth_socket'
        self.realtestSocketAuth()
    except pymysql.err.InternalError:
        try:
            cursor.execute("install soname 'auth_socket'")
            TestAuthentication.socket_found = True
            self.socket_plugin_name = 'unix_socket'
            self.realtestSocketAuth()
        except pymysql.err.InternalError:
            TestAuthentication.socket_found = False
            raise unittest2.SkipTest("we couldn't install the socket plugin")
    finally:
        if TestAuthentication.socket_found:
            uninstall_sql = "uninstall plugin %s" % self.socket_plugin_name
            cursor.execute(uninstall_sql)
def testSocketAuthInstallPlugin(self):
    """Install the socket auth plugin, run the socket auth test, uninstall.

    The first attempt uses ``install plugin auth_socket ...``.  If any
    statement in that attempt (install or test) raises
    ``pymysql.err.InternalError``, a second attempt uses
    ``install soname 'auth_socket'``.  If that raises InternalError too,
    unittest2.SkipTest is raised.  The plugin is uninstalled afterwards
    whenever ``TestAuthentication.socket_found`` is true.
    """
    # needs plugin. lets install it.
    cursor = self.connections[0].cursor()
    try:
        cursor.execute("install plugin auth_socket soname 'auth_socket.so'")
        TestAuthentication.socket_found = True
        self.socket_plugin_name = 'auth_socket'
        self.realtestSocketAuth()
    except pymysql.err.InternalError:
        try:
            cursor.execute("install soname 'auth_socket'")
            TestAuthentication.socket_found = True
            self.socket_plugin_name = 'unix_socket'
            self.realtestSocketAuth()
        except pymysql.err.InternalError:
            TestAuthentication.socket_found = False
            raise unittest2.SkipTest("we couldn't install the socket plugin")
    finally:
        if TestAuthentication.socket_found:
            uninstall_sql = "uninstall plugin %s" % self.socket_plugin_name
            cursor.execute(uninstall_sql)
test_connection.py 文件源码
项目:rekall-agent-server
作者: rekall-innovations
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def testSocketAuthInstallPlugin(self):
    """Install the socket auth plugin, run the socket auth test, uninstall.

    The first attempt uses ``install plugin auth_socket ...``.  If any
    statement in that attempt (install or test) raises
    ``pymysql.err.InternalError``, a second attempt uses
    ``install soname 'auth_socket'``.  If that raises InternalError too,
    unittest2.SkipTest is raised.  The plugin is uninstalled afterwards
    whenever ``TestAuthentication.socket_found`` is true.
    """
    # needs plugin. lets install it.
    cursor = self.connections[0].cursor()
    try:
        cursor.execute("install plugin auth_socket soname 'auth_socket.so'")
        TestAuthentication.socket_found = True
        self.socket_plugin_name = 'auth_socket'
        self.realtestSocketAuth()
    except pymysql.err.InternalError:
        try:
            cursor.execute("install soname 'auth_socket'")
            TestAuthentication.socket_found = True
            self.socket_plugin_name = 'unix_socket'
            self.realtestSocketAuth()
        except pymysql.err.InternalError:
            TestAuthentication.socket_found = False
            raise unittest2.SkipTest("we couldn't install the socket plugin")
    finally:
        if TestAuthentication.socket_found:
            uninstall_sql = "uninstall plugin %s" % self.socket_plugin_name
            cursor.execute(uninstall_sql)
def test_json(self):
    """Round-trip a JSON document through a JSON column and through
    ``CAST(... AS JSON)``, comparing the parsed values.

    Raises SkipTest when ``self.mysql_server_is(conn, (5, 7, 0))`` is
    false.
    """
    connect_kwargs = self.databases[0].copy()
    connect_kwargs["charset"] = "utf8mb4"
    conn = pymysql.connect(**connect_kwargs)
    if not self.mysql_server_is(conn, (5, 7, 0)):
        raise SkipTest("JSON type is not supported on MySQL <= 5.6")

    create_sql = """\
create table test_json (
id int not null,
json JSON not null,
primary key (id)
);"""
    self.safe_create_table(conn, "test_json", create_sql)

    cursor = conn.cursor()
    # NOTE(review): the "?????" payload looks like non-ASCII text that was
    # lost in a re-encoding; kept exactly as found -- confirm upstream.
    json_str = u'{"hello": "?????"}'
    expected = json.loads(json_str)

    cursor.execute("INSERT INTO test_json (id, `json`) values (42, %s)", (json_str,))
    cursor.execute("SELECT `json` from `test_json` WHERE `id`=42")
    stored = cursor.fetchone()[0]
    self.assertEqual(json.loads(stored), expected)

    cursor.execute("SELECT CAST(%s AS JSON) AS x", (json_str,))
    casted = cursor.fetchone()[0]
    self.assertEqual(json.loads(casted), expected)
def requires(resource, msg=None):
    """Raise if the specified resource is not available.

    Returns None when the resource may be used.

    Raises:
        unittest.SkipTest: if *resource* is ``'gui'`` and
            ``_is_gui_available()`` is false.
        ResourceDenied: if the caller's module is not ``__main__`` and
            ``is_resource_enabled(resource)`` is false.  *msg* overrides
            the default message.
    """
    if resource == 'gui' and not _is_gui_available():
        raise unittest.SkipTest("Cannot use the 'gui' resource")

    # see if the caller's module is __main__ - if so, treat as if
    # the resource was set.  The frame lookup must stay in this function:
    # depth 1 is only the caller when measured from here.
    caller_module = sys._getframe(1).f_globals.get("__name__")
    if caller_module == "__main__":
        return

    if is_resource_enabled(resource):
        return
    if msg is None:
        msg = "Use of the %r resource not enabled" % resource
    raise ResourceDenied(msg)
def bigaddrspacetest(f):
    """Decorator for tests that fill the address space.

    The returned wrapper calls ``f(self)`` only when ``max_memuse`` is at
    least ``MAX_Py_ssize_t``; otherwise it raises unittest.SkipTest with a
    message saying how much memory is needed.  The wrapper keeps the
    decorated function's ``__name__`` and ``__doc__``.
    """
    # Local import: keeps this block self-contained.
    import functools

    # functools.wraps preserves the test's name and docstring, which were
    # previously replaced by those of ``wrapper``.
    @functools.wraps(f)
    def wrapper(self):
        if max_memuse >= MAX_Py_ssize_t:
            return f(self)
        if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
            raise unittest.SkipTest(
                "not enough memory: try a 32-bit build instead")
        raise unittest.SkipTest(
            "not enough memory: %.1fG minimum needed"
            % (MAX_Py_ssize_t / (1024 ** 3)))
    return wrapper
#=======================================================================
# unittest integration.