def test_ufodiff_delta_class_instantiation_branch_with_ufo_filter():
make_testing_branch()
try:
deltaobj = Delta('.', ['Font-Regular.ufo'], is_branch_test=True, compare_branch_name='testing_branch')
assert deltaobj.is_commit_test is False
assert deltaobj.is_branch_test is True
assert deltaobj.compare_branch_name == "testing_branch"
assert deltaobj.commit_number == "0"
assert len(deltaobj.ufo_directory_list) == 1
assert deltaobj.ufo_directory_list[0] == "Font-Regular.ufo"
except Exception as e:
delete_testing_branch()
pytest.fail(e)
delete_testing_branch()
python类fail()的实例源码
def _runTest(self):
self._linter.check([self._test_file.module])
expected_messages, expected_text = self._get_expected()
received_messages, received_text = self._get_received()
if expected_messages != received_messages:
msg = ['Wrong results for file "%s":' % (self._test_file.base)]
missing, unexpected = multiset_difference(expected_messages,
received_messages)
if missing:
msg.append('\nExpected in testdata:')
msg.extend(' %3d: %s' % msg for msg in sorted(missing))
if unexpected:
msg.append('\nUnexpected in testdata:')
msg.extend(' %3d: %s' % msg for msg in sorted(unexpected))
pytest.fail('\n'.join(msg))
self._check_output_text(expected_messages, expected_text, received_text)
def test_connect():
n1 = WithParameters(a=1, b=2)
n2 = WithParameters(c=1, d=2)
g = graph.Graph('testgraph', [n1, n2])
not_included = WithParameters(e=1, f=2)
# Errors when trying to link nodes coming from out of the graph
with pytest.raises(exceptions.NodeConnectionError):
g.connect(n1, not_included, 'blabla')
pytest.fail()
with pytest.raises(exceptions.NodeConnectionError):
g.connect(not_included, n2, 'blabla')
pytest.fail()
# Now the correct procedure
assert n2.output_label is None
assert len(g.nxgraph.edges()) == 0
g.connect(n1, n2, 'label')
assert n2.output_label == 'label'
assert len(g.nxgraph.edges()) == 1
def pytest_runtest_item(self, item):
lines1 = self.get_open_files()
yield
if hasattr(sys, "pypy_version_info"):
gc.collect()
lines2 = self.get_open_files()
new_fds = set([t[0] for t in lines2]) - set([t[0] for t in lines1])
leaked_files = [t for t in lines2 if t[0] in new_fds]
if leaked_files:
error = []
error.append("***** %s FD leakage detected" % len(leaked_files))
error.extend([str(f) for f in leaked_files])
error.append("*** Before:")
error.extend([str(f) for f in lines1])
error.append("*** After:")
error.extend([str(f) for f in lines2])
error.append(error[0])
error.append("*** function %s:%s: %s " % item.location)
pytest.fail("\n".join(error), pytrace=False)
# XXX copied from execnet's conftest.py - needs to be merged
def _addexcinfo(self, rawexcinfo):
# unwrap potential exception info (see twisted trial support below)
rawexcinfo = getattr(rawexcinfo, '_rawexcinfo', rawexcinfo)
try:
excinfo = _pytest._code.ExceptionInfo(rawexcinfo)
except TypeError:
try:
try:
l = traceback.format_exception(*rawexcinfo)
l.insert(0, "NOTE: Incompatible Exception Representation, "
"displaying natively:\n\n")
pytest.fail("".join(l), pytrace=False)
except (pytest.fail.Exception, KeyboardInterrupt):
raise
except:
pytest.fail("ERROR: Unknown Incompatible Exception "
"representation:\n%r" %(rawexcinfo,), pytrace=False)
except KeyboardInterrupt:
raise
except pytest.fail.Exception:
excinfo = _pytest._code.ExceptionInfo()
self.__dict__.setdefault('_excinfo', []).append(excinfo)
def test_binary_hook_sequence_is_lower_than_030(
self, write_cloud_config_in_memory, monkeypatch):
# That's the lowest sequence of disks and we want to filter before that
filter_template = '-- binary hook filter:{} --'
hook_filter = 'some*glob'
monkeypatch.setattr(
generate_build_config,
"BINARY_HOOK_FILTER_CONTENT", filter_template)
output = write_cloud_config_in_memory(binary_hook_filter=hook_filter)
cloud_config = yaml.load(output)
expected_content = filter_template.format(hook_filter)
for stanza in cloud_config['write_files']:
content = base64.b64decode(stanza['content']).decode('utf-8')
if content == expected_content:
break
else:
pytest.fail('Binary hook filter not correctly included.')
path = stanza['path'].rsplit('/')[-1]
assert path < '030-some-file.binary'
def assert_vec(vec, x, y, z, msg=''):
"""Asserts that Vec is equal to (x,y,z)."""
# Don't show in pytest tracebacks.
__tracebackhide__ = True
# Ignore slight variations
if not vec.x == pytest.approx(x):
failed = 'x'
elif not vec.y == pytest.approx(y):
failed = 'y'
elif not vec.z == pytest.approx(z):
failed = 'z'
else:
# Success!
return
new_msg = "{!r} != ({}, {}, {})".format(vec, failed, x, y, z)
if msg:
new_msg += ': ' + msg
pytest.fail(new_msg)
def test_collect(self):
from dallinger.experiments import Bartlett1932
exp = Bartlett1932()
existing_uuid = "12345-12345-12345-12345"
data = exp.collect(existing_uuid, recruiter=u'bots')
assert isinstance(data, dallinger.data.Data)
dataless_uuid = "ed9e7ddd-3e97-452d-9e34-fee5d432258e"
dallinger.data.register(dataless_uuid, 'https://bogus-url.com/something')
try:
data = exp.collect(dataless_uuid, recruiter=u'bots')
except RuntimeError:
# This is expected for an already registered UUID with no accessible data
pass
else:
pytest.fail('Did not raise RuntimeError for existing UUID')
# In debug mode an unknown UUID fails
unknown_uuid = "c85d5086-2fa7-4baf-9103-e142b9170cca"
with pytest.raises(RuntimeError):
data = exp.collect(unknown_uuid, mode=u'debug', recruiter=u'bots')
def test_index_2dsphere(self, test_db):
assert 'geo_2dsphere' == await test_db.test.create_index([('geo', GEOSPHERE)])
for dummy, info in (await test_db.test.index_information()).items():
field, idx_type = info['key'][0]
if field == 'geo' and idx_type == '2dsphere':
break
else:
pytest.fail('2dsphere index not found.')
poly = {'type': 'Polygon',
'coordinates': [[[40, 5], [40, 6], [41, 6], [41, 5], [40, 5]]]}
query = {'geo': {'$within': {'$geometry': poly}}}
# This query will error without a 2dsphere index.
async with test_db.test.find(query) as cursor:
async for _ in cursor:
pass
def test_forbidden(self, client, media_type):
headers = {'Accept': media_type}
expected_body = {
'title': 'Request denied',
'description': ('You do not have write permissions for this '
'queue.'),
'link': {
'text': 'Documentation related to this error',
'href': 'http://example.com/api/rbac',
'rel': 'help',
},
}
response = client.simulate_post(path='/fail', headers=headers)
assert response.status == falcon.HTTP_403
assert response.json == expected_body
def test_epic_fail_json(self, client):
headers = {'Accept': 'application/json'}
expected_body = {
'title': 'Internet crashed',
'description': 'Catastrophic weather event due to climate change.',
'code': 8733224,
'link': {
'text': 'Drill baby drill!',
'href': 'http://example.com/api/climate',
'rel': 'help',
},
}
response = client.simulate_put('/fail', headers=headers)
assert response.status == falcon.HTTP_792
assert response.json == expected_body
def test_epic_fail_xml(self, client, media_type):
headers = {'Accept': media_type}
expected_body = ('<?xml version="1.0" encoding="UTF-8"?>' +
'<error>' +
'<title>Internet crashed</title>' +
'<description>' +
'Catastrophic weather event due to climate change.' +
'</description>' +
'<code>8733224</code>' +
'<link>' +
'<text>Drill baby drill!</text>' +
'<href>http://example.com/api/climate</href>' +
'<rel>help</rel>' +
'</link>' +
'</error>')
response = client.simulate_put(path='/fail', headers=headers)
assert response.status == falcon.HTTP_792
try:
et.fromstring(response.content.decode('utf-8'))
except ValueError:
pytest.fail()
assert response.text == expected_body
def test_actxmgr_exception_nested():
@aiotools.actxmgr
async def simple_ctx(msg):
yield msg
try:
async with simple_ctx('hello') as msg1:
async with simple_ctx('world') as msg2:
assert msg1 == 'hello'
assert msg2 == 'world'
raise IndexError('bomb1')
except BaseException as exc:
assert isinstance(exc, IndexError)
assert 'bomb1' == exc.args[0]
else:
pytest.fail()
def test_actxmgr_exception_chained():
@aiotools.actxmgr
async def simple_ctx(msg):
try:
await asyncio.sleep(0)
yield msg
except Exception as e:
await asyncio.sleep(0)
# exception is chained
raise ValueError('bomb2') from e
try:
async with simple_ctx('hello') as msg:
assert msg == 'hello'
raise IndexError('bomb1')
except BaseException as exc:
assert isinstance(exc, ValueError)
assert 'bomb2' == exc.args[0]
assert isinstance(exc.__cause__, IndexError)
assert 'bomb1' == exc.__cause__.args[0]
else:
pytest.fail()
def test():
t = timestamp_from_string("423423423")
d = datetime(1983, 6, 2, 17, 37, 3, tzinfo=pytz.utc)
assert t == d
try:
t = timestamp_from_string("asd")
except ValueError:
pass
else:
pytest.fail("This call should raise a ValueError")
t = timestamp_from_string("0")
d = datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc)
assert t == d
t = timestamp_from_string("-1")
d = datetime(1969, 12, 31, 23, 59, 59, tzinfo=pytz.utc)
assert t == d
def test():
packages = list_all()
assert packages is not None
assert isinstance(packages, list)
assert len(packages) > 0
found = False
for pkg in packages:
if pkg._key == "rapydo-utils":
found = True
if not found:
pytest.fail("rapydo.utils not found in the list of packages")
ver = check_version("rapydo-utils")
assert ver is not None
ver = check_version("rapydo-blabla")
assert ver is None
install("docker-compose")
def test_repeatable_one_zero_rr_conflicts():
"""
Check that translations of B+ and B* don't produce R/R conflict.
"""
grammar = """
S: A B+ C;
S: A B* D;
A:; B:; C:; D:;
"""
g = Grammar.from_string(grammar, _no_check_recognizers=True)
# Check if parser construction raises exception
try:
Parser(g)
except RRConflicts:
pytest.fail("R/R conflicts not expected here.")
def test_exceptions_in_server_bubble_up(self):
"""
The callbacks thrown in the server callback bubble up to the caller.
"""
class SentinelException(Exception):
pass
def server_callback(*args):
raise SentinelException()
def client_callback(*args): # pragma: nocover
pytest.fail("Should not be called")
client = self._client_connection(callback=client_callback, data=None)
server = self._server_connection(callback=server_callback, data=None)
with pytest.raises(SentinelException):
handshake_in_memory(client, server)
def test_gateway_status_busy(self, gw):
numchannels = gw.remote_status().numchannels
ch1 = gw.remote_exec("channel.send(1); channel.receive()")
ch2 = gw.remote_exec("channel.receive()")
ch1.receive()
status = gw.remote_status()
assert status.numexecuting == 2 # number of active execution threads
assert status.numchannels == numchannels + 2
ch1.send(None)
ch2.send(None)
ch1.waitclose()
ch2.waitclose()
for i in range(10):
status = gw.remote_status()
if status.numexecuting == 0:
break
else:
pytest.fail("did not get correct remote status")
# race condition
assert status.numchannels <= numchannels
def test_popen_filetracing(self, testdir, monkeypatch, makegateway):
tmpdir = testdir.tmpdir
monkeypatch.setenv("TMP", tmpdir)
monkeypatch.setenv("TEMP", tmpdir) # windows
monkeypatch.setenv('EXECNET_DEBUG', "1")
gw = makegateway("popen")
# hack out the debuffilename
fn = gw.remote_exec(
"import execnet;channel.send(execnet.gateway_base.fn)"
).receive()
slavefile = py.path.local(fn)
assert slavefile.check()
slave_line = "creating slavegateway"
for line in slavefile.readlines():
if slave_line in line:
break
else:
py.test.fail("did not find %r in tracefile" % (slave_line,))
gw.exit()
def test_can_have_same_cors_configurations(self, sample_app):
@sample_app.route('/cors', methods=['GET'], cors=True)
def cors():
pass
@sample_app.route('/cors', methods=['PUT'], cors=True)
def same_cors():
pass
try:
validate_routes(sample_app.routes)
except ValueError:
pytest.fail(
'A ValueError was unexpectedly thrown. Applications '
'may have multiple view functions that share the same '
'route and CORS configuration.'
)
def test_can_have_one_cors_configured_and_others_not(self, sample_app):
@sample_app.route('/cors', methods=['GET'], cors=True)
def cors():
pass
@sample_app.route('/cors', methods=['PUT'])
def no_cors():
pass
try:
validate_routes(sample_app.routes)
except ValueError:
pytest.fail(
'A ValueError was unexpectedly thrown. Applications '
'may have multiple view functions that share the same '
'route but only one is configured for CORS.'
)
def get_isis_topology(self, instance=None):
"""Get IS-IS topology table.
Returns:
list[dict]: List of dictionary with keys: vertex, type, metric, next_hop, interface, parent
"""
if instance:
try:
topology_output = instance.cli_send_command('vtysh -c "show isis topology"').stdout
except UICmdException as ex:
self.class_logger.error(ex)
pytest.fail('Failed to get IS-IS topology, node id {}.'.format(
instance.switch.id))
table = list(self.parse_isis_table_topology(topology_output.strip().splitlines()[4:]))
for record in table:
record["interface"] = instance.name_to_portid_map.get(record["interface"])
return table
else:
raise UIException("UI instance isn't specified.")
def get_node_hostname(self, instance=None):
"""Get node's hostname.
Returns:
str: Hostname of node.
"""
if instance:
try:
hostname_output = instance.cli_send_command('hostname').stdout
except UICmdException as ex:
self.class_logger.error(ex)
pytest.fail('Failed to get hostname, node id {}.'.format(
instance.switch.id))
return hostname_output.strip()
else:
raise UIException("UI instance isn't specified.")
def is_row_added_to_l2multicast_table(mac_address=None, port_id=None, vlan_id=1, switch_instance=None, result=True):
"""Check if row with specified parameters added to L2Multicast table.
"""
# Need to wait until entry will be added to L2Multicast table.
time.sleep(1)
table = switch_instance.getprop_table("L2Multicast")
is_entry_added = False
if table:
for row in table:
if (row['macAddress'] == mac_address) and (row['portId'] == port_id) and (row['vlanId'] == vlan_id):
is_entry_added = True
if is_entry_added == result:
return True
else:
if is_entry_added is False:
pytest.fail("Entry is not added to L2Multicast table!")
if is_entry_added is True:
pytest.fail("Entry is added to L2Multicast table (should not be)!")
def console_clear_config(self):
"""Clear device configuration using console connection
"""
cmd = [
"from xmlrpclib import ServerProxy", "rc = -1",
"rc = ServerProxy('http://127.0.0.1:8081/RPC2').nb.clearConfig()",
"print 'clearConfig() returnCode={0}.'.format(rc)"]
command = "python -c \"" + "; ".join(cmd) + "\""
output, err, _ = self.telnet.exec_command(command.encode("ascii"))
if err:
message = "Cannot perform clearConfig.\nCommand: %s.\nStdout: %s\nStdErr: %s" % (command, output, err)
self.class_logger.error(message)
self.db_corruption = True
pytest.fail(message)
else:
if "returnCode=0." in output:
self.class_logger.debug("ClearConfig finished. StdOut:\n%s" % (output, ))
else:
message = "ClearConfig failed. StdOut:\n%s" % (output, )
self.class_logger.error(message)
pytest.fail(message)
def check(self):
"""Check if switch is operational using waiton method.
Notes:
This mandatory method for all environment classes.
"""
if not self.status:
self.class_logger.info("Skip switch id:%s(%s) check because it's has Off status." % (self.id, self.name))
return
status = self.waiton()
# Verify Ports table is not empty
if self.ui.get_table_ports() == []:
if self.opts.fail_ctrl == 'stop':
self.class_logger.debug("Exit switch check. Ports table is empty!")
pytest.exit('Ports table is empty!')
else:
self.class_logger.debug("Fail switch check. Ports table is empty!")
pytest.fail('Ports table is empty!')
return status
def test_import_helpers_module():
"""Verify that all modules can be imported within 'helpers' module and classes objects can be created.
"""
module_name = "helpers"
try:
# define test data
def assertion_error_func():
"""Assertion error.
"""
assert 0
from testlib import helpers
helpers.raises(AssertionError, "", assertion_error_func)
except ImportError as err:
pytest.fail("Import failure in '%s' module: %s" % (module_name, err))
def test_import_common3_module(monkeypatch):
"""Verify that all modules can be imported within 'common3' module and 'Cross'/'Environment' objects can be created.
"""
def fake_get_conf(env_object, path_string):
"""Get config.
"""
return {'env': []}
module_name = "common3"
try:
from testlib import common3
common3.Cross(None, None)
# replace Environment _get_setup method
monkeypatch.setattr(common3.Environment, "_get_setup", fake_get_conf)
common3.Environment(FakeOpts())
except ImportError as err:
pytest.fail("Import failure in '%s' module: %s" % (module_name, err))
def test_readme(cookies):
"""The generated README.rst file should pass some sanity checks and validate as a PyPI long description."""
extra_context = {'repo_name': 'helloworld'}
with bake_in_temp_dir(cookies, extra_context=extra_context) as result:
readme_file = result.project.join('README.rst')
readme_lines = [x.strip() for x in readme_file.readlines(cr=False)]
assert 'helloworld' in readme_lines
assert 'The full documentation is at https://helloworld.readthedocs.org.' in readme_lines
setup_path = str(result.project.join('setup.py'))
try:
sh.python(setup_path, 'check', restructuredtext=True, strict=True)
except sh.ErrorReturnCode as exc:
pytest.fail(str(exc))