python类fail()的实例源码

test_delta.py 文件源码 项目:ufodiff 作者: source-foundry 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_ufodiff_delta_class_instantiation_branch_with_ufo_filter():
    make_testing_branch()

    try:
        deltaobj = Delta('.', ['Font-Regular.ufo'], is_branch_test=True, compare_branch_name='testing_branch')
        assert deltaobj.is_commit_test is False
        assert deltaobj.is_branch_test is True
        assert deltaobj.compare_branch_name == "testing_branch"
        assert deltaobj.commit_number == "0"
        assert len(deltaobj.ufo_directory_list) == 1
        assert deltaobj.ufo_directory_list[0] == "Font-Regular.ufo"
    except Exception as e:
        delete_testing_branch()
        pytest.fail(e)

    delete_testing_branch()
test_functional.py 文件源码 项目:shopify_python 作者: Shopify 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def _runTest(self):
        self._linter.check([self._test_file.module])

        expected_messages, expected_text = self._get_expected()
        received_messages, received_text = self._get_received()

        if expected_messages != received_messages:
            msg = ['Wrong results for file "%s":' % (self._test_file.base)]
            missing, unexpected = multiset_difference(expected_messages,
                                                      received_messages)
            if missing:
                msg.append('\nExpected in testdata:')
                msg.extend(' %3d: %s' % msg for msg in sorted(missing))
            if unexpected:
                msg.append('\nUnexpected in testdata:')
                msg.extend(' %3d: %s' % msg for msg in sorted(unexpected))
            pytest.fail('\n'.join(msg))
        self._check_output_text(expected_messages, expected_text, received_text)
test_graph.py 文件源码 项目:robograph 作者: csparpa 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def test_connect():
    n1 = WithParameters(a=1, b=2)
    n2 = WithParameters(c=1, d=2)
    g = graph.Graph('testgraph', [n1, n2])
    not_included = WithParameters(e=1, f=2)

    # Errors when trying to link nodes coming from out of the graph
    with pytest.raises(exceptions.NodeConnectionError):
        g.connect(n1, not_included, 'blabla')
        pytest.fail()
    with pytest.raises(exceptions.NodeConnectionError):
        g.connect(not_included, n2, 'blabla')
        pytest.fail()

    # Now the correct procedure
    assert n2.output_label is None
    assert len(g.nxgraph.edges()) == 0
    g.connect(n1, n2, 'label')
    assert n2.output_label == 'label'
    assert len(g.nxgraph.edges()) == 1
pytester.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def pytest_runtest_item(self, item):
        lines1 = self.get_open_files()
        yield
        if hasattr(sys, "pypy_version_info"):
            gc.collect()
        lines2 = self.get_open_files()

        new_fds = set([t[0] for t in lines2]) - set([t[0] for t in lines1])
        leaked_files = [t for t in lines2 if t[0] in new_fds]
        if leaked_files:
            error = []
            error.append("***** %s FD leakage detected" % len(leaked_files))
            error.extend([str(f) for f in leaked_files])
            error.append("*** Before:")
            error.extend([str(f) for f in lines1])
            error.append("*** After:")
            error.extend([str(f) for f in lines2])
            error.append(error[0])
            error.append("*** function %s:%s: %s " % item.location)
            pytest.fail("\n".join(error), pytrace=False)


# XXX copied from execnet's conftest.py - needs to be merged
unittest.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _addexcinfo(self, rawexcinfo):
        # unwrap potential exception info (see twisted trial support below)
        rawexcinfo = getattr(rawexcinfo, '_rawexcinfo', rawexcinfo)
        try:
            excinfo = _pytest._code.ExceptionInfo(rawexcinfo)
        except TypeError:
            try:
                try:
                    l = traceback.format_exception(*rawexcinfo)
                    l.insert(0, "NOTE: Incompatible Exception Representation, "
                        "displaying natively:\n\n")
                    pytest.fail("".join(l), pytrace=False)
                except (pytest.fail.Exception, KeyboardInterrupt):
                    raise
                except:
                    pytest.fail("ERROR: Unknown Incompatible Exception "
                        "representation:\n%r" %(rawexcinfo,), pytrace=False)
            except KeyboardInterrupt:
                raise
            except pytest.fail.Exception:
                excinfo = _pytest._code.ExceptionInfo()
        self.__dict__.setdefault('_excinfo', []).append(excinfo)
tests.py 文件源码 项目:ubuntu-standalone-builder 作者: OddBloke 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_binary_hook_sequence_is_lower_than_030(
            self, write_cloud_config_in_memory, monkeypatch):
        # That's the lowest sequence of disks and we want to filter before that
        filter_template = '-- binary hook filter:{} --'
        hook_filter = 'some*glob'
        monkeypatch.setattr(
            generate_build_config,
            "BINARY_HOOK_FILTER_CONTENT", filter_template)
        output = write_cloud_config_in_memory(binary_hook_filter=hook_filter)
        cloud_config = yaml.load(output)
        expected_content = filter_template.format(hook_filter)
        for stanza in cloud_config['write_files']:
            content = base64.b64decode(stanza['content']).decode('utf-8')
            if content == expected_content:
                break
        else:
            pytest.fail('Binary hook filter not correctly included.')
        path = stanza['path'].rsplit('/')[-1]
        assert path < '030-some-file.binary'
test_vec.py 文件源码 项目:srctools 作者: TeamSpen210 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def assert_vec(vec, x, y, z, msg=''):
    """Asserts that Vec is equal to (x,y,z)."""
    # Don't show in pytest tracebacks.
    __tracebackhide__ = True

    # Ignore slight variations
    if not vec.x == pytest.approx(x):
        failed = 'x'
    elif not vec.y == pytest.approx(y):
        failed = 'y'
    elif not vec.z == pytest.approx(z):
        failed = 'z'
    else:
        # Success!
        return

    new_msg = "{!r} != ({}, {}, {})".format(vec, failed, x, y, z)
    if msg:
        new_msg += ': ' + msg
    pytest.fail(new_msg)
test_api.py 文件源码 项目:Dallinger 作者: Dallinger 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_collect(self):
        from dallinger.experiments import Bartlett1932
        exp = Bartlett1932()
        existing_uuid = "12345-12345-12345-12345"
        data = exp.collect(existing_uuid, recruiter=u'bots')
        assert isinstance(data, dallinger.data.Data)

        dataless_uuid = "ed9e7ddd-3e97-452d-9e34-fee5d432258e"
        dallinger.data.register(dataless_uuid, 'https://bogus-url.com/something')

        try:
            data = exp.collect(dataless_uuid, recruiter=u'bots')
        except RuntimeError:
            # This is expected for an already registered UUID with no accessible data
            pass
        else:
            pytest.fail('Did not raise RuntimeError for existing UUID')

        # In debug mode an unknown UUID fails
        unknown_uuid = "c85d5086-2fa7-4baf-9103-e142b9170cca"
        with pytest.raises(RuntimeError):
            data = exp.collect(unknown_uuid, mode=u'debug', recruiter=u'bots')
test_collection.py 文件源码 项目:aiomongo 作者: ZeoAlliance 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_index_2dsphere(self, test_db):
        assert 'geo_2dsphere' == await test_db.test.create_index([('geo', GEOSPHERE)])

        for dummy, info in (await test_db.test.index_information()).items():
            field, idx_type = info['key'][0]
            if field == 'geo' and idx_type == '2dsphere':
                break
        else:
            pytest.fail('2dsphere index not found.')

        poly = {'type': 'Polygon',
                'coordinates': [[[40, 5], [40, 6], [41, 6], [41, 5], [40, 5]]]}
        query = {'geo': {'$within': {'$geometry': poly}}}

        # This query will error without a 2dsphere index.
        async with test_db.test.find(query) as cursor:
            async for _ in cursor:
                pass
test_httperror.py 文件源码 项目:deb-python-falcon 作者: openstack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_forbidden(self, client, media_type):
        headers = {'Accept': media_type}

        expected_body = {
            'title': 'Request denied',
            'description': ('You do not have write permissions for this '
                            'queue.'),
            'link': {
                'text': 'Documentation related to this error',
                'href': 'http://example.com/api/rbac',
                'rel': 'help',
            },
        }

        response = client.simulate_post(path='/fail', headers=headers)

        assert response.status == falcon.HTTP_403
        assert response.json == expected_body
test_httperror.py 文件源码 项目:deb-python-falcon 作者: openstack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_epic_fail_json(self, client):
        headers = {'Accept': 'application/json'}

        expected_body = {
            'title': 'Internet crashed',
            'description': 'Catastrophic weather event due to climate change.',
            'code': 8733224,
            'link': {
                'text': 'Drill baby drill!',
                'href': 'http://example.com/api/climate',
                'rel': 'help',
            },
        }

        response = client.simulate_put('/fail', headers=headers)

        assert response.status == falcon.HTTP_792
        assert response.json == expected_body
test_httperror.py 文件源码 项目:deb-python-falcon 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_epic_fail_xml(self, client, media_type):
        headers = {'Accept': media_type}

        expected_body = ('<?xml version="1.0" encoding="UTF-8"?>' +
                         '<error>' +
                         '<title>Internet crashed</title>' +
                         '<description>' +
                         'Catastrophic weather event due to climate change.' +
                         '</description>' +
                         '<code>8733224</code>' +
                         '<link>' +
                         '<text>Drill baby drill!</text>' +
                         '<href>http://example.com/api/climate</href>' +
                         '<rel>help</rel>' +
                         '</link>' +
                         '</error>')

        response = client.simulate_put(path='/fail', headers=headers)

        assert response.status == falcon.HTTP_792
        try:
            et.fromstring(response.content.decode('utf-8'))
        except ValueError:
            pytest.fail()
        assert response.text == expected_body
test_context.py 文件源码 项目:aiotools 作者: achimnol 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_actxmgr_exception_nested():

    @aiotools.actxmgr
    async def simple_ctx(msg):
        yield msg

    try:
        async with simple_ctx('hello') as msg1:
            async with simple_ctx('world') as msg2:
                assert msg1 == 'hello'
                assert msg2 == 'world'
                raise IndexError('bomb1')
    except BaseException as exc:
        assert isinstance(exc, IndexError)
        assert 'bomb1' == exc.args[0]
    else:
        pytest.fail()
test_context.py 文件源码 项目:aiotools 作者: achimnol 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_actxmgr_exception_chained():

    @aiotools.actxmgr
    async def simple_ctx(msg):
        try:
            await asyncio.sleep(0)
            yield msg
        except Exception as e:
            await asyncio.sleep(0)
            # exception is chained
            raise ValueError('bomb2') from e

    try:
        async with simple_ctx('hello') as msg:
            assert msg == 'hello'
            raise IndexError('bomb1')
    except BaseException as exc:
        assert isinstance(exc, ValueError)
        assert 'bomb2' == exc.args[0]
        assert isinstance(exc.__cause__, IndexError)
        assert 'bomb1' == exc.__cause__.args[0]
    else:
        pytest.fail()
test_time.py 文件源码 项目:utils 作者: rapydo 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test():

    t = timestamp_from_string("423423423")

    d = datetime(1983, 6, 2, 17, 37, 3, tzinfo=pytz.utc)
    assert t == d

    try:
        t = timestamp_from_string("asd")
    except ValueError:
        pass
    else:
        pytest.fail("This call should raise a ValueError")

    t = timestamp_from_string("0")
    d = datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc)
    assert t == d

    t = timestamp_from_string("-1")
    d = datetime(1969, 12, 31, 23, 59, 59, tzinfo=pytz.utc)
    assert t == d
test_packing.py 文件源码 项目:utils 作者: rapydo 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test():

    packages = list_all()
    assert packages is not None
    assert isinstance(packages, list)
    assert len(packages) > 0

    found = False
    for pkg in packages:
        if pkg._key == "rapydo-utils":
            found = True

    if not found:
        pytest.fail("rapydo.utils not found in the list of packages")

    ver = check_version("rapydo-utils")
    assert ver is not None

    ver = check_version("rapydo-blabla")
    assert ver is None

    install("docker-compose")
test_repeatable.py 文件源码 项目:parglare 作者: igordejanovic 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_repeatable_one_zero_rr_conflicts():
    """
    Check that translations of B+ and B* don't produce R/R conflict.
    """
    grammar = """
    S: A B+ C;
    S: A B* D;
    A:; B:; C:; D:;
    """
    g = Grammar.from_string(grammar, _no_check_recognizers=True)

    # Check if parser construction raises exception
    try:
        Parser(g)
    except RRConflicts:
        pytest.fail("R/R conflicts not expected here.")
test_ssl.py 文件源码 项目:2FAssassin 作者: maxwellkoh 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_exceptions_in_server_bubble_up(self):
        """
        The callbacks thrown in the server callback bubble up to the caller.
        """
        class SentinelException(Exception):
            pass

        def server_callback(*args):
            raise SentinelException()

        def client_callback(*args):  # pragma: nocover
            pytest.fail("Should not be called")

        client = self._client_connection(callback=client_callback, data=None)
        server = self._server_connection(callback=server_callback, data=None)

        with pytest.raises(SentinelException):
            handshake_in_memory(client, server)
test_gateway.py 文件源码 项目:execnet 作者: pytest-dev 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_gateway_status_busy(self, gw):
        numchannels = gw.remote_status().numchannels
        ch1 = gw.remote_exec("channel.send(1); channel.receive()")
        ch2 = gw.remote_exec("channel.receive()")
        ch1.receive()
        status = gw.remote_status()
        assert status.numexecuting == 2  # number of active execution threads
        assert status.numchannels == numchannels + 2
        ch1.send(None)
        ch2.send(None)
        ch1.waitclose()
        ch2.waitclose()
        for i in range(10):
            status = gw.remote_status()
            if status.numexecuting == 0:
                break
        else:
            pytest.fail("did not get correct remote status")
        # race condition
        assert status.numchannels <= numchannels
test_gateway.py 文件源码 项目:execnet 作者: pytest-dev 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_popen_filetracing(self, testdir, monkeypatch, makegateway):
        tmpdir = testdir.tmpdir
        monkeypatch.setenv("TMP", tmpdir)
        monkeypatch.setenv("TEMP", tmpdir)  # windows
        monkeypatch.setenv('EXECNET_DEBUG', "1")
        gw = makegateway("popen")
        #  hack out the debuffilename
        fn = gw.remote_exec(
            "import execnet;channel.send(execnet.gateway_base.fn)"
        ).receive()
        slavefile = py.path.local(fn)
        assert slavefile.check()
        slave_line = "creating slavegateway"
        for line in slavefile.readlines():
            if slave_line in line:
                break
        else:
            py.test.fail("did not find %r in tracefile" % (slave_line,))
        gw.exit()
test_deployer.py 文件源码 项目:chalice 作者: aws 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_can_have_same_cors_configurations(self, sample_app):
        @sample_app.route('/cors', methods=['GET'], cors=True)
        def cors():
            pass

        @sample_app.route('/cors', methods=['PUT'], cors=True)
        def same_cors():
            pass

        try:
            validate_routes(sample_app.routes)
        except ValueError:
            pytest.fail(
                'A ValueError was unexpectedly thrown. Applications '
                'may have multiple view functions that share the same '
                'route and CORS configuration.'
            )
test_deployer.py 文件源码 项目:chalice 作者: aws 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_can_have_one_cors_configured_and_others_not(self, sample_app):
        @sample_app.route('/cors', methods=['GET'], cors=True)
        def cors():
            pass

        @sample_app.route('/cors', methods=['PUT'])
        def no_cors():
            pass

        try:
            validate_routes(sample_app.routes)
        except ValueError:
            pytest.fail(
                'A ValueError was unexpectedly thrown. Applications '
                'may have multiple view functions that share the same '
                'route but only one is configured for CORS.'
            )
ui_dcrp_shell.py 文件源码 项目:taf 作者: taf3 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_isis_topology(self, instance=None):
        """Get IS-IS topology table.

        Returns:
            list[dict]: List of dictionary with keys: vertex, type, metric, next_hop, interface, parent

        """
        if instance:
            try:
                topology_output = instance.cli_send_command('vtysh -c "show isis topology"').stdout
            except UICmdException as ex:
                self.class_logger.error(ex)
                pytest.fail('Failed to get IS-IS topology, node id {}.'.format(
                    instance.switch.id))
            table = list(self.parse_isis_table_topology(topology_output.strip().splitlines()[4:]))
            for record in table:
                record["interface"] = instance.name_to_portid_map.get(record["interface"])
            return table
        else:
            raise UIException("UI instance isn't specified.")
ui_dcrp_shell.py 文件源码 项目:taf 作者: taf3 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_node_hostname(self, instance=None):
        """Get node's hostname.

        Returns:
            str:  Hostname of node.

        """
        if instance:
            try:
                hostname_output = instance.cli_send_command('hostname').stdout
            except UICmdException as ex:
                self.class_logger.error(ex)
                pytest.fail('Failed to get hostname, node id {}.'.format(
                    instance.switch.id))
            return hostname_output.strip()
        else:
            raise UIException("UI instance isn't specified.")
helpers.py 文件源码 项目:taf 作者: taf3 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def is_row_added_to_l2multicast_table(mac_address=None, port_id=None, vlan_id=1, switch_instance=None, result=True):
    """Check if row with specified parameters added to L2Multicast table.

    """
    # Need to wait until entry will be added to L2Multicast table.
    time.sleep(1)
    table = switch_instance.getprop_table("L2Multicast")
    is_entry_added = False
    if table:
        for row in table:
            if (row['macAddress'] == mac_address) and (row['portId'] == port_id) and (row['vlanId'] == vlan_id):
                is_entry_added = True
    if is_entry_added == result:
        return True
    else:
        if is_entry_added is False:
            pytest.fail("Entry is not added to L2Multicast table!")
        if is_entry_added is True:
            pytest.fail("Entry is added to L2Multicast table (should not be)!")
switch_ons.py 文件源码 项目:taf 作者: taf3 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def console_clear_config(self):
        """Clear device configuration using console connection

        """
        cmd = [
            "from xmlrpclib import ServerProxy", "rc = -1",
            "rc = ServerProxy('http://127.0.0.1:8081/RPC2').nb.clearConfig()",
            "print 'clearConfig() returnCode={0}.'.format(rc)"]
        command = "python -c \"" + "; ".join(cmd) + "\""

        output, err, _ = self.telnet.exec_command(command.encode("ascii"))
        if err:
            message = "Cannot perform clearConfig.\nCommand: %s.\nStdout: %s\nStdErr: %s" % (command, output, err)
            self.class_logger.error(message)
            self.db_corruption = True
            pytest.fail(message)
        else:
            if "returnCode=0." in output:
                self.class_logger.debug("ClearConfig finished. StdOut:\n%s" % (output, ))
            else:
                message = "ClearConfig failed. StdOut:\n%s" % (output, )
                self.class_logger.error(message)
                pytest.fail(message)
switch_general.py 文件源码 项目:taf 作者: taf3 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def check(self):
        """Check if switch is operational using waiton method.

        Notes:
            This mandatory method for all environment classes.

        """
        if not self.status:
            self.class_logger.info("Skip switch id:%s(%s) check because it's has Off status." % (self.id, self.name))
            return
        status = self.waiton()
        # Verify Ports table is not empty
        if self.ui.get_table_ports() == []:
            if self.opts.fail_ctrl == 'stop':
                self.class_logger.debug("Exit switch check. Ports table is empty!")
                pytest.exit('Ports table is empty!')
            else:
                self.class_logger.debug("Fail switch check. Ports table is empty!")
                pytest.fail('Ports table is empty!')
        return status
test_dependencies_ons.py 文件源码 项目:taf 作者: taf3 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_import_helpers_module():
    """Verify that all modules can be imported within 'helpers' module and classes objects can be created.

    """
    module_name = "helpers"
    try:
        # define test data
        def assertion_error_func():
            """Assertion error.

            """
            assert 0

        from testlib import helpers
        helpers.raises(AssertionError, "", assertion_error_func)
    except ImportError as err:
        pytest.fail("Import failure in '%s' module: %s" % (module_name, err))
test_dependencies_core.py 文件源码 项目:taf 作者: taf3 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_import_common3_module(monkeypatch):
    """Verify that all modules can be imported within 'common3' module and 'Cross'/'Environment' objects can be created.

    """

    def fake_get_conf(env_object, path_string):
        """Get config.

        """
        return {'env': []}

    module_name = "common3"
    try:
        from testlib import common3
        common3.Cross(None, None)

        # replace Environment _get_setup method
        monkeypatch.setattr(common3.Environment, "_get_setup", fake_get_conf)
        common3.Environment(FakeOpts())
    except ImportError as err:
        pytest.fail("Import failure in '%s' module: %s" % (module_name, err))
test_bake_project.py 文件源码 项目:cookiecutter-django-app 作者: edx 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_readme(cookies):
    """The generated README.rst file should pass some sanity checks and validate as a PyPI long description."""
    extra_context = {'repo_name': 'helloworld'}
    with bake_in_temp_dir(cookies, extra_context=extra_context) as result:

        readme_file = result.project.join('README.rst')
        readme_lines = [x.strip() for x in readme_file.readlines(cr=False)]
        assert 'helloworld' in readme_lines
        assert 'The full documentation is at https://helloworld.readthedocs.org.' in readme_lines
        setup_path = str(result.project.join('setup.py'))
        try:
            sh.python(setup_path, 'check', restructuredtext=True, strict=True)
        except sh.ErrorReturnCode as exc:
            pytest.fail(str(exc))


问题


面经


文章

微信
公众号

扫码关注公众号