python类fail()的实例源码

test_bake_project.py 文件源码 项目:cookiecutter-django-app 作者: edx 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def check_quality(result):
    """Run quality tests on the given generated output."""
    for dirpath, _dirnames, filenames in os.walk(str(result.project)):
        pylintrc = str(result.project.join('pylintrc'))
        for filename in filenames:
            name = os.path.join(dirpath, filename)
            if not name.endswith('.py'):
                continue
            try:
                sh.pylint(name, rcfile=pylintrc)
                sh.pylint(name, py3k=True)
                sh.pycodestyle(name)
                if filename != 'setup.py':
                    sh.pydocstyle(name)
                sh.isort(name, check_only=True)
            except sh.ErrorReturnCode as exc:
                pytest.fail(str(exc))

    tox_ini = result.project.join('tox.ini')
    docs_build_dir = result.project.join('docs/_build')
    try:
        # Sanity check the generated Makefile
        sh.make('help')
        # quality check docs
        sh.doc8(result.project.join("README.rst"), ignore_path=docs_build_dir, config=tox_ini)
        sh.doc8(result.project.join("docs"), ignore_path=docs_build_dir, config=tox_ini)
    except sh.ErrorReturnCode as exc:
        pytest.fail(str(exc))
test_iam_valid_json.py 文件源码 项目:foremast 作者: gogoair 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_all_iam_templates(template_name):
    """Verify all IAM templates render as proper JSON."""
    *_, service_json = template_name.split('/')
    service, *_ = service_json.split('.')

    items = ['resource1', 'resource2']

    if service == 'rds-db':
        items = {
            'resource1': 'user1',
            'resource2': 'user2',
        }

    try:
        rendered = render_policy_template(
            account_number='',
            app='coreforrest',
            env='dev',
            group='forrest',
            items=items,
            pipeline_settings={
                'lambda': {
                    'vpc_enabled': False,
                },
            },
            region='us-east-1',
            service=service)
    except json.decoder.JSONDecodeError:
        pytest.fail('Bad template: {0}'.format(template_name), pytrace=False)

    assert isinstance(rendered, list)
test_kubey.py 文件源码 项目:kubey 作者: bradrf 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _release(self, name):
        attr = getattr(self, '_orig_' + name)
        setattr(subprocess, name, attr)
        delattr(self, '_orig_' + name)
        responder = self.responders[name]
        if not responder.is_satisfied:
            pytest.fail('Unsatisfied responder: %s' % responder)
test_delta.py 文件源码 项目:ufodiff 作者: source-foundry 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_ufodiff_delta_class_instantiation_commit():
    try:
        deltaobj = Delta('.', [], is_commit_test=True, commit_number='2')
        assert deltaobj.is_commit_test is True
        assert deltaobj.is_branch_test is False
        assert deltaobj.commit_number == "2"
        assert deltaobj.compare_branch_name is None
        assert len(deltaobj.ufo_directory_list) == 0
    except Exception as e:
        pytest.fail(e)
test_delta.py 文件源码 项目:ufodiff 作者: source-foundry 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_ufodiff_delta_class_instantiation_branch():
    make_testing_branch()

    try:
        deltaobj = Delta('.', [], is_branch_test=True, compare_branch_name='testing_branch')
        assert deltaobj.is_commit_test is False
        assert deltaobj.is_branch_test is True
        assert deltaobj.compare_branch_name == "testing_branch"
        assert deltaobj.commit_number == "0"
        assert len(deltaobj.ufo_directory_list) == 0
    except Exception as e:
        delete_testing_branch()
        pytest.fail(e)

    delete_testing_branch()
test_delta.py 文件源码 项目:ufodiff 作者: source-foundry 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_ufodiff_delta_class_instantiation_commit_with_ufo_filter():
    try:
        deltaobj = Delta('.', ['Font-Regular.ufo'], is_commit_test=True, commit_number='2')
        assert deltaobj.is_commit_test is True
        assert deltaobj.is_branch_test is False
        assert deltaobj.commit_number == "2"
        assert deltaobj.compare_branch_name is None
        assert len(deltaobj.ufo_directory_list) == 1
        assert deltaobj.ufo_directory_list[0] == "Font-Regular.ufo"
    except Exception as e:
        pytest.fail(e)
conftest.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def error_on_ResourceWarning():
    """This fixture captures ResourceWarning's and reports an "error"
    describing the file handles left open.

    This is shown regardless of how successful the test was, if a test fails
    and leaves files open then those files will be reported.  Ideally, even
    those files should be closed properly after a test failure or exception.

    Since only Python 3 and PyPy3 have ResourceWarning's, this context will
    have no effect when running tests on Python 2 or PyPy.

    Because of autouse=True, this function will be automatically enabled for
    all test_* functions in this module.

    This code is primarily based on the examples found here:
    https://stackoverflow.com/questions/24717027/convert-python-3-resourcewarnings-into-exception
    """
    try:
        ResourceWarning
    except NameError:
        # Python 2, PyPy
        yield
        return
    # Python 3, PyPy3
    with warnings.catch_warnings(record=True) as caught:
        warnings.resetwarnings() # clear all filters
        warnings.simplefilter('ignore') # ignore all
        warnings.simplefilter('always', ResourceWarning) # add filter
        yield # run tests in this context
        gc.collect() # run garbage collection (for pypy3)
        if not caught:
            return
        pytest.fail('The following file descriptors were not closed properly:\n' +
                    '\n'.join((str(warning.message) for warning in caught)),
                    pytrace=False)
helpers.py 文件源码 项目:integration 作者: mendersoftware 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def yocto_id_from_ext4(self, filename):
        try:
            cmd = "debugfs -R 'cat %s' %s| sed -n 's/^%s=//p'" % (self.artifact_info_file, filename, self.artifact_prefix)
            output = subprocess.check_output(cmd, shell=True).strip()
            logger.info("Running: " + cmd + " returned: " + output)
            return output

        except subprocess.CalledProcessError:
            pytest.fail("Unable to read: %s, is it a broken image?" % (filename))

        except Exception, e:
            pytest.fail("Unexpected error trying to read ext4 image: %s, error: %s" % (filename, str(e)))
helpers.py 文件源码 项目:integration 作者: mendersoftware 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def artifact_id_randomize(self, install_image, device_type="vexpress-qemu", specific_image_id=None):

        if specific_image_id:
            imageid = specific_image_id
        else:
            imageid = "mender-%s" % str(random.randint(0, 99999999))

        config_file = r"""%s=%s""" % (self.artifact_prefix, imageid)
        tfile = tempfile.NamedTemporaryFile(delete=False)
        tfile.write(config_file)
        tfile.close()

        try:
            cmd = "debugfs -w -R 'rm %s' %s" % (self.artifact_info_file, install_image)
            logger.info("Running: " + cmd)
            output = subprocess.check_output(cmd, shell=True).strip()
            logger.info("Returned: " + output)

            cmd = ("printf 'cd %s\nwrite %s %s\n' | debugfs -w %s"
                   % (os.path.dirname(self.artifact_info_file),
                      tfile.name, os.path.basename(self.artifact_info_file), install_image))
            logger.info("Running: " + cmd)
            output = subprocess.check_output(cmd, shell=True).strip()
            logger.info("Returned: " + output)

        except subprocess.CalledProcessError:
            pytest.fail("Trying to modify ext4 image failed, probably because it's not a valid image.")

        except Exception, e:
            pytest.fail("Unexpted error trying to modify ext4 image: %s, error: %s" % (install_image, str(e)))

        finally:
            os.remove(tfile.name)

        return imageid
helpers.py 文件源码 项目:integration 作者: mendersoftware 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def verify_reboot_performed(self, max_wait=60*30):
        logger.info("waiting for system to reboot")

        successful_connections = 0
        timeout = time.time() + max_wait

        while time.time() <= timeout:
            try:
                with settings(warn_only=True, abort_exception=FabricFatalException):
                    time.sleep(1)
                    if exists(self.tfile):
                        logger.debug("temp. file still exists, device hasn't rebooted.")
                        continue
                    else:
                        logger.debug("temp. file no longer exists, device has rebooted.")
                        successful_connections += 1

                    # try connecting 10 times before returning
                    if successful_connections <= 9:
                        continue
                    return

            except (BaseException):
                logger.debug("system exit was caught, this is probably because SSH connectivity is broken while the system is rebooting")
                continue

        if time.time() > timeout:
            pytest.fail("Device never rebooted!")
common_update.py 文件源码 项目:integration 作者: mendersoftware 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def common_update_procedure(install_image,
                            regenerate_image_id=True,
                            device_type="vexpress-qemu",
                            broken_image=False,
                            verify_status=True,
                            signed=False,
                            devices=None,
                            scripts=[]):

    with artifact_lock:
        if broken_image:
            artifact_id = "broken_image_" + str(random.randint(0, 999999))
        elif regenerate_image_id:
            artifact_id = Helpers.artifact_id_randomize(install_image)
            logger.debug("randomized image id: " + artifact_id)
        else:
            artifact_id = Helpers.yocto_id_from_ext4(install_image)

        # create atrifact
        with tempfile.NamedTemporaryFile() as artifact_file:
            created_artifact = image.make_artifact(install_image, device_type, artifact_id, artifact_file, signed=signed, scripts=scripts)

            if created_artifact:
                deploy.upload_image(created_artifact)
                if devices is None:
                    devices = list(set([device["device_id"] for device in adm.get_devices_status("accepted")]))
                deployment_id = deploy.trigger_deployment(name="New valid update",
                                                          artifact_name=artifact_id,
                                                          devices=devices)
            else:
                logger.warn("failed to create artifact")
                pytest.fail("error creating artifact")

    # wait until deployment is in correct state
    if verify_status:
        deploy.check_expected_status("inprogress", deployment_id)

    return deployment_id, artifact_id
test_security.py 文件源码 项目:integration 作者: mendersoftware 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def test_ssl_only(self):
        """ make sure we are not exposing any non-ssl connections in production environment """
        done = False
        sleep_time = 2
        # start production environment
        subprocess.call(["./production_test_env.py", "--start"])

        # get all exposed ports from docker

        for _ in range(3):
            exposed_hosts = subprocess.check_output("docker ps | grep %s | grep -o -E '0.0.0.0:[0-9]*'" % ("testprod"), shell=True)

            try:
                for host in exposed_hosts.split():
                    with contextlib.closing(ssl.wrap_socket(socket.socket())) as sock:
                        logging.info("%s: connect to host with TLS" % host)
                        host, port = host.split(":")
                        sock.connect((host, int(port)))
                        done = True
            except:
                sleep_time *= 2
                time.sleep(sleep_time)
                continue

            if done:
                break

        # tear down production env
        subprocess.call(["./production_test_env.py", "--kill"])

        if not done:
            pytest.fail("failed to connect to production env. using SSL")
test_security.py 文件源码 项目:integration 作者: mendersoftware 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_token_token_expiration(self):
        """ verify that an expired token is handled correctly (client gets a new, valid one)
            and that deployments are still recieved by the client
        """

        if not env.host_string:
            execute(self.test_token_token_expiration,
                    hosts=get_mender_clients())
            return

        timeout_time = int(time.time()) + 60
        while int(time.time()) < timeout_time:
            with quiet():
                output = run("journalctl -u mender -l --no-pager | grep \"received new authorization data\"")
                time.sleep(1)

            if output.return_code == 0:
                logging.info("mender logs indicate new authorization data available")
                break

        if timeout_time <= int(time.time()):
            pytest.fail("timed out waiting for download retries")


        # this call verifies that the deployment process goes into an "inprogress" state
        # which is only possible when the client has a valid token.
        common_update_procedure(install_image=conftest.get_valid_image())
test_multi_tenancy.py 文件源码 项目:integration 作者: mendersoftware 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def mender_log_contains_aborted_string(self, mender_client_container="mender-client"):
        expected_string = "deployment aborted at the backend"

        for _ in range(60*5):
            with settings(hide('everything'), warn_only=True):
                out = run("journalctl -u mender | grep \"%s\"" % expected_string)
                if out.succeeded:
                    return
                else:
                    time.sleep(2)

        pytest.fail("deployment never aborted.")
test_multi_tenancy.py 文件源码 项目:integration 作者: mendersoftware 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def perform_update(self, mender_client_container="mender-client", fail=False):

        if fail:
            execute(update_image_failed,
                    hosts=get_mender_client_by_container_name(mender_client_container))
        else:
            execute(update_image_successful,
                    install_image=conftest.get_valid_image(),
                    skip_reboot_verification=True,
                    hosts=get_mender_client_by_container_name(mender_client_container))
common_setup.py 文件源码 项目:integration 作者: mendersoftware 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def wait_for_containers(expected_containers, defined_in):
    for _ in range(60 * 5):
        out = subprocess.check_output("docker-compose -p %s %s ps -q" % (conftest.docker_compose_instance, "-f " + " -f ".join(defined_in)), shell=True)
        if len(out.split()) == expected_containers:
            time.sleep(60)
            return
        else:
            time.sleep(1)

    pytest.fail("timeout: %d containers not running for docker-compose project: %s" % (expected_containers, conftest.docker_compose_instance))
utilities.py 文件源码 项目:nucypher-kms 作者: nucypher 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def get_ursula_by_id(self, ursula_id):
        try:
            ursula = self._ursulas[ursula_id]
        except KeyError:
            pytest.fail("No Ursula with ID {}".format(ursula_id))
        return ursula
test_network_actors.py 文件源码 项目:nucypher-kms 作者: nucypher 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def test_all_ursulas_know_about_all_other_ursulas(ursulas):
    """
    Once launched, all Ursulas know about - and can help locate - all other Ursulas in the network.
    """
    ignorance = []
    for acounter, announcing_ursula in enumerate(blockchain_client._ursulas_on_blockchain):
        for counter, propagating_ursula in enumerate(ursulas):
            if not digest(announcing_ursula) in propagating_ursula.server.storage:
                ignorance.append((counter, acounter))
    if ignorance:
        pytest.fail(str(["{} didn't know about {}".format(counter, acounter) for counter, acounter in ignorance]))
test_graph.py 文件源码 项目:robograph 作者: csparpa 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_execute_fails_with_graphs_with_isles():
    g = make_a_graph_with_isles()
    with pytest.raises(exceptions.GraphExecutionError):
        g.execute()
        pytest.fail()


问题


面经


文章

微信
公众号

扫码关注公众号