python类raises()的实例源码

base_command_test.py 文件源码 项目:data_pipeline 作者: Yelp 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_process_source_and_namespace_args_name_error(
        self, command, parser, source_one_active
    ):
        """Check the error raised for a source name without a namespace.

        Builds args carrying only ``source_name`` (``source_id`` and
        ``namespace`` are ``None``) and expects ``FakeParserError`` whose
        first argument mentions that ``--namespace`` must be provided.
        """
        name_only_args = Args(
            source_id=None,
            source_name=source_one_active,
            namespace=None
        )
        with pytest.raises(FakeParserError) as excinfo:
            command.process_source_and_namespace_args(name_only_args, parser)
        assert excinfo
        error_args = excinfo.value.args
        assert error_args
        assert "--namespace must be provided" in error_args[0]
producer_test.py 文件源码 项目:data_pipeline 作者: Yelp 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_ensure_messages_published_fails_when_overpublished(
        self, topic, messages, producer, topic_offsets
    ):
        """Ensure-publishing fewer messages than were published must fail.

        All ``messages`` are published and flushed first; asking the producer
        to ensure only the first two is then expected to raise
        ``PublicationUnensurableError``. The producer module's ``logger`` is
        patched so the logged info can be checked afterwards.
        """
        for message in messages:
            producer.publish(message)
        producer.flush()

        with mock.patch.object(
            data_pipeline.producer,
            'logger'
        ) as mock_logger:
            with pytest.raises(PublicationUnensurableError):
                producer.ensure_messages_published(messages[:2], topic_offsets)

            # This check has to live outside the pytest.raises block: any
            # statement placed after the raising call inside that block is
            # skipped when the exception propagates, so it would never run.
            self._assert_logged_info_correct(
                mock_logger,
                len(messages),
                topic,
                topic_offsets,
                message_count=len(messages[:2])
            )
producer_test.py 文件源码 项目:data_pipeline 作者: Yelp 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def test_publish_fails_after_retry(self, message, producer):
        """Publishing must raise ``MaxRetryError`` and leave nothing published.

        The kafka client's ``send_produce_request`` is patched with a
        ``FailedPayloadsError`` side effect. After the expected failure, the
        test checks that no new messages were captured on the topic, that the
        patched request was called ``self.max_retry_count`` times, and that
        the topic-to-offset map reflects zero published messages.
        """
        # TODO(DATAPIPE-606|clin) investigate better way than mocking response
        with mock.patch.object(
            producer._kafka_producer.kafka_client,
            'send_produce_request',
            side_effect=[FailedPayloadsError]
        ) as mock_send_request, capture_new_messages(
            message.topic
        ) as get_messages:
            orig_topic_to_offset_map = self.get_orig_topic_to_offset_map(producer)

            # Only the publishing calls belong inside pytest.raises; anything
            # placed after the raising call within that block is skipped once
            # the exception propagates.
            with pytest.raises(MaxRetryError):
                producer.publish(message)
                producer.flush()

            messages = get_messages()
            assert len(messages) == 0
            assert mock_send_request.call_count == self.max_retry_count
            self.assert_new_topic_to_offset_map(
                producer,
                message.topic,
                orig_topic_to_offset_map,
                published_message_count=0
            )
test_move.py 文件源码 项目:transfert 作者: rbernand 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_move(tmpdir, storages):
    """Exercise ``transfert.actions.move`` across http, ftp, sftp and file.

    Moving from the http resource is expected to raise
    ``ReadOnlyResourceException``; the subsequent ftp -> sftp -> file moves
    are each expected to remove the source and create the destination.
    """
    local_target = tmpdir.join('alpha')
    http_res = Resource(storages['http']('/LICENSE'))
    file_res = Resource(storages['file'](local_target.strpath))
    sftp_res = Resource(storages['sftp']('/gamma'))
    ftp_res = Resource(storages['ftp']('/beta'))

    assert http_res.exists()
    delete_files(file_res, sftp_res, ftp_res)
    # None of the writable destinations may exist before the moves start.
    assert not any(res.exists() for res in (file_res, sftp_res, ftp_res))

    with pytest.raises(transfert.exceptions.ReadOnlyResourceException):
        transfert.actions.move(http_res, ftp_res, size=100)
    assert ftp_res.exists()
    assert http_res.exists()

    transfert.actions.move(ftp_res, sftp_res, size=100)
    assert not ftp_res.exists()
    assert sftp_res.exists()

    transfert.actions.move(sftp_res, file_res, size=100)
    assert not sftp_res.exists()
    assert file_res.exists()
test_unit.py 文件源码 项目:ln2sql 作者: FerreroJeremy 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def noTableFoundExceptionQueries():
    """Expect an exception for each sentence that names no known table."""
    database = DATABASE_PATH + 'ecole.sql'
    language = LANG_PATH + 'french.csv'
    sentences = (
        'Quel est le nom des reservation ?',  # No table name found in sentence!
    )
    for sentence in sentences:
        with pytest.raises(Exception):
            Ln2sql(database, language).get_query(sentence)
test_unit.py 文件源码 项目:ln2sql 作者: FerreroJeremy 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def noLinkForJoinFoundExceptionQueries():
    """Expect an exception for each sentence whose column cannot be joined."""
    database = DATABASE_PATH + 'ecole.sql'
    language = LANG_PATH + 'french.csv'
    sentences = (
        # There is at least column `matiere` that is unreachable from table `PROFESSEUR`!
        "Quel est le professeur qui enseigne la matière SVT ?",
        # There is at least column `salle` that is unreachable from table `ELEVE`!
        "compte les salle des élève",
    )
    for sentence in sentences:
        with pytest.raises(Exception):
            Ln2sql(database, language).get_query(sentence)
test_worst_report.py 文件源码 项目:django-performance-testing 作者: PaesslerAG 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_report_can_deal_with_single_anonymous_result_not_with_more():
    """One unnamed result is stored; two unnamed results raise ``TypeError``.

    The report state recorded by the first (valid) call must be unchanged
    after the rejected second call.
    """
    report = WorstReport()

    def assert_only_foo_with_value_nine():
        # Same two checks are made before and after the failing call.
        assert list(report.data.keys()) == ['foo']
        assert get_value_and_context(report, 'foo')[0] == 9

    report.handle_results_collected(
        signal=None, sender=WithId('foo'), results=[9], context={})
    assert_only_foo_with_value_nine()

    with pytest.raises(TypeError) as excinfo:
        report.handle_results_collected(
            signal=None, sender=WithId('foo'), results=[1, 2], context={})
    assert 'Duplicate result name(s): \'\'' == str(excinfo.value)
    assert_only_foo_with_value_nine()
test_timelimit.py 文件源码 项目:django-performance-testing 作者: PaesslerAG 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_can_limit_elapsed_seconds(seconds):
    """Ticking frozen time inside ``TimeLimit(total=0)`` must violate the limit."""
    expected_msg = \
        'Too many ({}) total elapsed seconds (limit: 0)'.format(seconds)
    with freeze_time('2016-09-22 15:57:01') as frozen_time, \
            pytest.raises(LimitViolationError) as excinfo, \
            TimeLimit(total=0):
        frozen_time.tick(timedelta(seconds=seconds))
    assert excinfo.value.base_error_msg == expected_msg
test_core_infrastructure.py 文件源码 项目:django-performance-testing 作者: PaesslerAG 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_signals_all_listeners_reports_first_failure(self, collector_cls):
        """A failing listener's error surfaces, yet every listener is called.

        Three listeners are connected to ``results_collected``; the middle
        one raises. Leaving the collector context must raise that exception,
        mention the failing handler's name in its message, and both mock
        listeners must have been called. Listeners are always disconnected.
        """
        collector = collector_cls()

        class MyException(Exception):
            pass

        def second_handler_that_will_fail(*a, **kw):
            raise MyException('foo')

        first_attached_handler = Mock(return_value=None)
        last_attached_handler = Mock(return_value=None)
        handlers = (
            first_attached_handler,
            second_handler_that_will_fail,
            last_attached_handler,
        )

        for handler in handlers:
            results_collected.connect(handler)
        try:
            with pytest.raises(MyException) as excinfo:
                with collector:
                    pass
            error_text = str(excinfo.value)
            assert error_text.endswith('foo')
            assert 'second_handler_that_will_fail' in error_text
            assert first_attached_handler.called
            assert last_attached_handler.called
        finally:
            for handler in handlers:
                results_collected.disconnect(handler)
test_core_infrastructure.py 文件源码 项目:django-performance-testing 作者: PaesslerAG 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_signal_handler_error_doesnt_hide_orig_error(self, collector_cls):
        """An error raised in the collected code wins over a handler error.

        The signal handler is a mock that raises; the body of the collector
        context raises as well. The exception seen by the test must be the
        one from the body, and the handler must still have been called.
        """
        collector = collector_cls()
        broken_handler = Mock(side_effect=Exception('handler error'))
        results_collected.connect(broken_handler)
        try:
            with pytest.raises(Exception) as excinfo:
                with collector:
                    raise Exception('actual code error')
            surfaced_message = str(excinfo.value)
            assert surfaced_message == 'actual code error'
            assert broken_handler.called
        finally:
            results_collected.disconnect(broken_handler)
test_core_infrastructure.py 文件源码 项目:django-performance-testing 作者: PaesslerAG 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_when_above_the_limit_there_is_an_error(
            self, limit_cls_and_name, limit, value):
        """A result above the limit raises ``LimitViolationError``.

        The raised error must carry the limit object, the offending result,
        the actual value as a string, and an empty (falsy) context.
        """
        limit_cls, name = limit_cls_and_name
        assert limit < value, 'test pre-req'
        limit_kwargs = {name: limit}
        limit_obj = limit_cls(**limit_kwargs)
        result = NameValueResult(name, value)
        with pytest.raises(LimitViolationError) as excinfo:
            limit_obj.handle_results(results=[result], context=None)
        violation = excinfo.value
        assert violation.limit_obj == limit_obj
        assert violation.result == result
        assert violation.actual == str(value)
        assert not violation.context
test_core_infrastructure.py 文件源码 项目:django-performance-testing 作者: PaesslerAG 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_without_collector_id_cannot_be_settings_based(self, limit_cls):
        """``settings_based=True`` without a collector id raises ``TypeError``."""
        expected_message = \
            'Can only be settings based when collector_id is provided.'
        with pytest.raises(TypeError) as excinfo:
            limit_cls(settings_based=True)
        assert expected_message == str(excinfo.value)
test_template_limit_blocks.py 文件源码 项目:django-performance-testing 作者: PaesslerAG 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_limits_can_be_used_as_template_tags(sample_template):
    """Rendering a too-slow value in the sample template violates the limit.

    Expects ``LimitViolationError`` reporting an actual value of ``'5.0'``
    against a limit of ``3``.
    """
    with freeze_time('2016-11-02 10:33:00') as frozen_time:
        slow_context = template.context.Context({
            'slow_rendering': SlowRendering(
                frozen_time=frozen_time, render_in_seconds=5),
        })
        with pytest.raises(LimitViolationError) as excinfo:
            # to get debug info (only reached when render does not raise)
            print(sample_template.render(slow_context))
        violation = excinfo.value
        assert violation.actual == '5.0'
        assert violation.limit == 3
test_core_constructing_a_registry.py 文件源码 项目:django-performance-testing 作者: PaesslerAG 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_cannot_construct_from_list_of_duplicates(dotted_paths):
    """Building the registry from ``dotted_paths`` raises ``DuplicateNamesError``."""
    expected_error = DuplicateNamesError
    with pytest.raises(expected_error):
        UniqueNamedClassRegistry(dotted_paths)
test_wrapper.py 文件源码 项目:django-performance-testing 作者: PaesslerAG 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_fails_when_class_has_no_such_method_as_to_wrap(wrapper):
    """Wrapping a missing method raises ``AttributeError`` in both modes.

    The check is made first with ``is_cls_method=False`` and then with
    ``is_cls_method=True``.
    """
    for as_cls_method in (False, True):
        with pytest.raises(AttributeError):
            wrapper(
                cls_to_wrap=object,
                method_to_wrap_name='no_such_method',
                context_manager=None,
                is_cls_method=as_cls_method,
            )
test_querycount_limits.py 文件源码 项目:django-performance-testing 作者: PaesslerAG 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_integration_test_with_db(db):
    """Three ``Group`` ORM calls under ``QueryBatchLimit(total=2)`` must fail.

    The raised ``LimitViolationError`` is expected to expose the entered
    context, the limit object, the limit value and the limit name.
    """
    with pytest.raises(LimitViolationError) as excinfo, \
            override_current_context() as ctx, \
            QueryBatchLimit(total=2) as limit:
        ctx.enter(key='some', value='context')
        list(Group.objects.all())
        Group.objects.update(name='bar')
        Group.objects.create(name='group')
    violation = excinfo.value
    assert violation.context == {'some': ['context']}
    assert violation.limit_obj == limit
    assert violation.limit == 2
    assert violation.name == 'total'
test_integrates_with_template_rendering.py 文件源码 项目:django-performance-testing 作者: PaesslerAG 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_has_support_for_number_of_queries_in_templates(db, settings):
    """A zero-query limit on ``Template.render`` is enforced via settings.

    Rendering a template with a ``Group`` queryset in its context is
    expected to raise ``LimitViolationError`` whose context names the
    template.
    """
    zero_queries = {'queries': {'total': 0}}
    settings.PERFORMANCE_LIMITS = {'Template.render': zero_queries}

    group_names_template = loader.get_template('all-group-names.markdown')
    with pytest.raises(LimitViolationError) as excinfo:
        group_names_template.render(context={'groups': Group.objects.all()})

    expected_context = {'template': ['all-group-names.markdown']}
    assert excinfo.value.context == expected_context
test_core_context.py 文件源码 项目:django-performance-testing 作者: PaesslerAG 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_exiting_a_not_entered_context_is_an_error(self):
        """``Context.exit`` raises ``ValueError`` on key or value mismatch.

        First an exit with a key that was never entered is checked, then an
        exit whose value differs from the value that was entered.
        """
        ctx = Context()

        # Case 1: the key was never entered.
        key_mismatch_msg = \
            'cannot exit not entered context - key {!r} mismatch'.format(
                'no such key')
        with pytest.raises(ValueError) as excinfo:
            ctx.exit(key='no such key', value='some value')
        assert str(excinfo.value) == key_mismatch_msg

        # Case 2: the key was entered, but with a different value.
        value_mismatch_msg = \
            'cannot exit not entered context - value mismatch ' \
            '(exit: {!r}, enter: {!r})'.format(
                'exit value', 'enter value')
        ctx.enter(key='key', value='enter value')
        with pytest.raises(ValueError) as excinfo:
            ctx.exit(key='key', value='exit value')
        assert str(excinfo.value) == value_mismatch_msg
test_core_service.py 文件源码 项目:flash_services 作者: textbook 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_required_config(config):
    """``Fake.from_config`` raises ``TypeError`` naming the missing keys."""
    with pytest.raises(TypeError) as excinfo:
        Fake.from_config(**config)
    message = excinfo.value.args[0]
    for expected_fragment in ('missing required config keys', 'from Fake'):
        assert expected_fragment in message


问题


面经


文章

微信
公众号

扫码关注公众号