python类patch()的实例源码

test_jwt_apns_client.py 文件源码 项目:jwt_apns_client 作者: Mobelux 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_make_provider_token_calls_jwt_encode_with_correct_args(self):
        """
        Test that APNSConnection.make_provider_token() calls jwt.encode() with the correct
        arguments.  jwt.encode() returns different results each time even with the same data passed in
        so we cannot just test for the expected return value.
        """
        issued_at = time.time()
        connection = jwt_apns_client.APNSConnection(
            team_id='TEAMID',
            apns_key_id='KEYID',
            apns_key_path=self.KEY_FILE_PATH)

        with mock.patch('jwt_apns_client.utils.jwt.encode') as mock_encode:
            connection.make_provider_token(issued_at=issued_at)
            mock_encode.assert_called_with(
                {
                    'iss': connection.team_id,
                    'iat': issued_at
                },
                connection.secret,
                algorithm=connection.algorithm,
                headers=connection.get_token_headers())
test_checkers.py 文件源码 项目:django-heartbeat 作者: pbs 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_redis_status(self, mock_redis):
        setattr(settings, 'CACHEOPS_REDIS', {'host': 'foo', 'port': 1337})
        mock_redis.StrictRedis.return_value.ping.return_value = 'PONG'
        mock_redis.StrictRedis.return_value.info.return_value = {
            'redis_version': '1.0.0'}
        status = redis_status.check(request=None)
        assert status['ping'] == 'PONG'
        assert status['version'] == '1.0.0'

    # @mock.patch('heartbeat.checkers.redis_status.redis')
    # def test_redis_connection_error(self, mock_redis):
    #     setattr(settings, 'CACHEOPS_REDIS', {'host': 'foo', 'port': 1337})
    #     mock_ping = mock_redis.StrictRedis.return_value.ping
    #     mock_ping.side_effect = ConnectionError('foo')
    #     status = redis.check(request=None)
    #     assert status['error'] == 'foo', status
test_checkers.py 文件源码 项目:django-heartbeat 作者: pbs 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_db_version(self):
        import django
        if django.VERSION >= (1, 7):
            cursor = 'django.db.backends.utils.CursorWrapper'
        else:
            cursor = 'django.db.backends.util.CursorWrapper'
        with mock.patch(cursor) as mock_cursor:
            mock_cursor.return_value.fetchone.return_value = ['1.0.0']
            dbs = {
                'default': {
                    'ENGINE': 'django.db.backends.sqlite3',
                    'NAME': 'foo'
                }
            }
            setattr(settings, 'DATABASES', dbs)
            dbs = databases.check(request=None)
        assert len(dbs) == 1
        assert dbs[0]['version'] == '1.0.0'
test_runscheduler.py 文件源码 项目:tts-bug-bounty-dashboard 作者: 18F 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def call_runscheduler(loops=1, mock_call_command=None):
    ctx = {'sleep_count': 0}

    def fake_sleep(seconds):
        ctx['sleep_count'] += 1
        if ctx['sleep_count'] > loops:
            raise KeyboardInterrupt()

    if mock_call_command is None:
        mock_call_command = mock.MagicMock()

    with mock.patch.object(runscheduler, 'call_command', mock_call_command):
        with mock.patch.object(runscheduler, 'logger') as mock_logger:
            with mock.patch('time.sleep', fake_sleep):
                with pytest.raises(KeyboardInterrupt):
                    call_command('runscheduler')
            return mock_call_command, mock_logger
write_tensorboard_test.py 文件源码 项目:cxflow-tensorflow 作者: Cognexa 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_args(self):
        """Test WriteTensorBoard argument handling and ``SummaryWriter`` creation."""
        model = self.get_model()

        with self.assertRaises(AssertionError):
            _ = WriteTensorBoard(output_dir=self.tmpdir, model=42)
        with self.assertRaises(AssertionError):
            _ = WriteTensorBoard(output_dir=self.tmpdir, model=model, on_missing_variable='not-recognized')
        with self.assertRaises(AssertionError):
            _ = WriteTensorBoard(output_dir=self.tmpdir, model=model, on_unknown_type='not-recognized')

        with mock.patch('tensorflow.summary.FileWriter', autospec=True) as mocked_writer:
            _ = WriteTensorBoard(output_dir=self.tmpdir, model=model, flush_secs=42, visualize_graph=True)
            mocked_writer.assert_called_with(logdir=self.tmpdir, flush_secs=42, graph=model.graph)

            _ = WriteTensorBoard(output_dir=self.tmpdir, model=model)
            mocked_writer.assert_called_with(logdir=self.tmpdir, flush_secs=10, graph=None)
write_tensorboard_test.py 文件源码 项目:cxflow-tensorflow 作者: Cognexa 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_missing_variable(self):
        """Test if ``WriteTensorBoard`` handles missing image variables as expected."""
        bad_epoch_data = {'valid': {}}

        with mock.patch.dict('sys.modules', **{'cv2': cv2_mock}):
            # test ignore
            hook = WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model(), image_variables=['plot'],
                                    on_missing_variable='ignore')
            with LogCapture(level=logging.INFO) as log_capture:
                hook.after_epoch(42, bad_epoch_data)
            log_capture.check()

            # test warn
            warn_hook = WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model(), image_variables=['plot'],
                                         on_missing_variable='warn')
            with LogCapture(level=logging.INFO) as log_capture2:
                warn_hook.after_epoch(42, bad_epoch_data)
            log_capture2.check(('root', 'WARNING', '`plot` not found in epoch data.'))

            # test error
            raise_hook = WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model(), image_variables=['plot'],
                                          on_missing_variable='error')
            with self.assertRaises(KeyError):
                raise_hook.after_epoch(42, bad_epoch_data)
test_latexipy.py 文件源码 项目:latexipy 作者: masasin 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_params_dict_after_font_size(self):
        with patch('matplotlib.rcParams.update') as mock_update, \
                patch('matplotlib.pyplot.switch_backend') as mock_switch:
            old_params = dict(plt.rcParams)
            with lp.temp_params(font_size=10, params_dict={
                    'axes.labelsize': 12,
                    'legend.fontsize': 12,
                    }):
                called_with = mock_update.call_args[0][0]
                assert called_with['font.size'] == 10
                assert called_with['axes.labelsize'] == 12
                assert called_with['axes.titlesize'] == 10
                assert called_with['legend.fontsize'] == 12
                assert called_with['xtick.labelsize'] == 10
                assert called_with['ytick.labelsize'] == 10

            mock_update.assert_called_with(old_params)
test_latexipy.py 文件源码 项目:latexipy 作者: masasin 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_parameters_passed_custom_kwargs(self):
        params = inspect.signature(lp.figure).parameters

        with patch('matplotlib.figure.Figure.set_size_inches'), \
                patch('latexipy._latexipy.save_figure') as mock_save_figure:
            with lp.figure('filename', directory='directory', exts='exts',
                           mkdir='mkdir'):
                pass

            mock_save_figure.assert_called_once_with(
                filename='filename',
                directory='directory',
                exts='exts',
                mkdir='mkdir',
                from_context_manager=True,
            )
utils.py 文件源码 项目:pip-update-requirements 作者: alanhamlett 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def setUp(self):
        # disable logging while testing
        logging.disable(logging.CRITICAL)

        self.patched = {}
        if hasattr(self, 'patch_these'):
            for patch_this in self.patch_these:
                namespace = patch_this[0] if isinstance(patch_this, (list, set)) else patch_this

                patcher = mock.patch(namespace)
                mocked = patcher.start()
                mocked.reset_mock()
                self.patched[namespace] = mocked

                if isinstance(patch_this, (list, set)) and len(patch_this) > 0:
                    retval = patch_this[1]
                    if callable(retval):
                        retval = retval()
                    mocked.return_value = retval
test_aws_sns.py 文件源码 项目:xavier 作者: bepress 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_publish_sns_event():

    TEST_ARN = 'arn:abc'
    TEST_MESSAGE = "message"
    with patch('xavier.aws.sns.send_sns_message') as mock_send_sns_message:
        mock_send_sns_message.return_value = {"MessageId": "1234"}

        message_publisher = publish_sns_message(TEST_ARN)

        message_publisher(TEST_MESSAGE)

        mock_send_sns_message.assert_called_once_with(TopicArn=TEST_ARN, Message=TEST_MESSAGE)

    with patch('xavier.aws.sns.send_sns_message') as mock_send_sns_message:
        mock_send_sns_message.return_value = None

        message_publisher = publish_sns_message(TEST_ARN)
        with pytest.raises(Exception):
            message_publisher(TEST_MESSAGE)

        mock_send_sns_message.assert_called_once_with(TopicArn=TEST_ARN, Message=TEST_MESSAGE)
test_create_jobs.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_yml_invalid(mocker, SETTINGS):
    GITHUB = mocker.patch('jenkins_epo.extensions.core.GITHUB')
    from jenkins_epo.extensions.core import YamlExtension

    ext = YamlExtension('ext', Mock())
    ext.current = ext.bot.current
    ext.current.yaml = {}
    ext.current.errors = []

    GITHUB.fetch_file_contents = CoroutineMock(return_value="{INVALID")

    head = ext.current.head
    head.repository.url = 'https://github.com/owner/repo.git'
    head.repository.jobs = []

    yield from ext.run()

    assert GITHUB.fetch_file_contents.mock_calls
    assert ext.current.errors
test_create_jobs.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_yml_found(mocker, SETTINGS):
    GITHUB = mocker.patch('jenkins_epo.extensions.core.GITHUB')
    Job = mocker.patch('jenkins_epo.extensions.core.Job')
    from jenkins_epo.extensions.core import YamlExtension

    Job.jobs_filter = ['*', '-skip']
    SETTINGS.update(YamlExtension.SETTINGS)

    ext = YamlExtension('ext', Mock())
    ext.current = ext.bot.current
    ext.current.yaml = {'job': dict()}

    GITHUB.fetch_file_contents = CoroutineMock(
        return_value="job: command\nskip: command",
    )

    head = ext.current.head
    head.repository.url = 'https://github.com/owner/repo.git'
    head.repository.jobs = {}

    yield from ext.run()

    assert GITHUB.fetch_file_contents.mock_calls
    assert 'job' in ext.current.job_specs
    assert 'skip' not in ext.current.job_specs
test_commit_log.py 文件源码 项目:gitnet 作者: networks-lab 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_default(self):
        """Does the default method print the proper information?"""
        with patch('sys.stdout', new=StringIO()) as fake_out:
            self.my_log.describe(mode="default")
            output = fake_out.getvalue()
            self.assertIn("Log containing 4 records from local git created at ", output)
            self.assertIn("\nOrigin:", output)
            self.assertNotIn("Filters:", output)
            self.assertIn("\nNumber of authors: 4\n", output)
            self.assertIn("\nNumber of files: 7\n", output)
            self.assertIn("\nMost common email address domains:", output)
            self.assertIn("\n\t @gmail.com [4 users]\n", output)
            self.assertIn("\nDate range: 2016-05-20 09:19:20-04:00 to 2016-05-26 11:21:03-04:00\n", output)
            self.assertIn("\nChange distribution summary:\n", output)
            self.assertIn("\n\t Files changed: Mean = 2.75, SD = 0.829\n", output)
            self.assertIn("\n\t Line insertions: Mean = 2.75, SD = 0.829\n", output)
            self.assertIn("\n\t Line deletions: Mean = nan, SD = nan\n", output)
            self.assertIn("\nNumber of merges: 0\n", output)
            self.assertIn("\nNumber of parsing errors: 0\n", output)
test_commit_log.py 文件源码 项目:gitnet 作者: networks-lab 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_not_default(self):
        """ Does a non-default method print the proper information?
            Note: At this point, default is the only setting so they end up being the same."""
        with patch('sys.stdout', new=StringIO()) as fake_out:
            self.my_log.describe(mode="not default")
            output = fake_out.getvalue()
            self.assertIn("Log containing 4 records from local git created at ", output)
            self.assertIn("\nOrigin:", output)
            self.assertNotIn("Filters:", output)
            self.assertIn("\nNumber of authors: 4\n", output)
            self.assertIn("\nNumber of files: 7\n", output)
            self.assertIn("\nMost common email address domains:", output)
            self.assertIn("\n\t @gmail.com [4 users]\n", output)
            self.assertIn("\nDate range: 2016-05-20 09:19:20-04:00 to 2016-05-26 11:21:03-04:00\n", output)
            self.assertIn("\nChange distribution summary:\n", output)
            self.assertIn("\n\t Files changed: Mean = 2.75, SD = 0.829\n", output)
            self.assertIn("\n\t Line insertions: Mean = 2.75, SD = 0.829\n", output)
            self.assertIn("\n\t Line deletions: Mean = nan, SD = nan\n", output)
            self.assertIn("\nNumber of merges: 0\n", output)
            self.assertIn("\nNumber of parsing errors: 0\n", output)
test_commit_log.py 文件源码 项目:gitnet 作者: networks-lab 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_whole(self):
        """Is the entire output as expected?"""
        with patch('sys.stdout', new=StringIO()) as fake_out:
            self.my_log.describe()
            out = fake_out.getvalue()

            self.assertRegex(out, "Log containing 4 records from local git created at ....-..-.. ..:..:..\.......\.\n"
                                  "Origin:  .*\n"
                                  "Number of authors: 4\n"
                                  "Number of files: 7\n"
                                  "Most common email address domains:\n"
                                  "\t @gmail.com \[4 users\]\n"
                                  "Date range: 2016-05-20 09:19:20-04:00 to 2016-05-26 11:21:03-04:00\n"
                                  "Change distribution summary:\n"
                                  "\t Files changed: Mean = 2.75, SD = 0.829\n"
                                  "\t Line insertions: Mean = 2.75, SD = 0.829\n"
                                  "\t Line deletions: Mean = nan, SD = nan\n"
                                  "Number of merges: 0\n"
                                  "Number of parsing errors: 0\n")
test_commit_log.py 文件源码 项目:gitnet 作者: networks-lab 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_exclude(self):
        """Does exclude prevent statistics from being printed?"""
        with patch('sys.stdout', new=StringIO()) as fake_out:
            self.my_log.describe(exclude=['merges', 'errors', 'files', 'summary', 'changes', 'path', 'filters',
                                          'authors', 'dates', 'emails'])
            output = fake_out.getvalue()
            self.assertNotIn("Log containing 4 records from local git created at ", output)
            self.assertNotIn("\nOrigin:", output)
            self.assertNotIn("Filters:", output)
            self.assertNotIn("\nNumber of authors: 4\n", output)
            self.assertNotIn("\nNumber of files: 7\n", output)
            self.assertNotIn("\nMost common email address domains:", output)
            self.assertNotIn("\n\t @gmail.com [4 users]\n", output)
            self.assertNotIn("\nDate range: 2016-05-20 09:19:20-04:00 to 2016-05-26 11:21:03-04:00\n", output)
            self.assertNotIn("\nChange distribution summary:\n", output)
            self.assertNotIn("\n\t Files changed: Mean = 2.75, SD = 0.829\n", output)
            self.assertNotIn("\n\t Line insertions: Mean = 2.75, SD = 0.829\n", output)
            self.assertNotIn("\n\t Line deletions: Mean = nan, SD = nan\n", output)
            self.assertNotIn("\nNumber of merges: 0\n", output)
            self.assertNotIn("\nNumber of parsing errors: 0\n", output)
            self.assertEqual(output, "")
test_zone_records.py 文件源码 项目:zinc 作者: PressLabs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_update_record_values(api_client, zone, boto_client):
    G(m.Zone)

    record_data = {
        'values': ['1.2.3.4']
    }
    response = api_client.patch(
        '/zones/{}/records/{}'.format(zone.id, hash_test_record(zone)),
        data=record_data)

    assert response.data == {
        **get_test_record(zone),
        **record_data
    }
    assert aws_strip_ns_and_soa(
        boto_client.list_resource_record_sets(HostedZoneId=zone.r53_zone.id), zone.root
    ) == sorted([
        record_data_to_aws({
            **get_test_record(zone),
            **record_data
        }, zone.root)
    ], key=aws_sort_key)
test_zone_records.py 文件源码 项目:zinc 作者: PressLabs 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_update_record_ttl(api_client, zone, boto_client):
    G(m.Zone)

    record_data = {
        'ttl': 580
    }
    response = api_client.patch(
        '/zones/{}/records/{}'.format(zone.id, hash_test_record(zone)),
        data=record_data
    )

    assert response.data == {
        **get_test_record(zone),
        **record_data
    }
    assert aws_strip_ns_and_soa(
        boto_client.list_resource_record_sets(HostedZoneId=zone.r53_zone.id), zone.root
    ) == sorted([
        record_data_to_aws({
            **get_test_record(zone),
            **record_data
        }, zone.root)
    ], key=aws_sort_key)
test_consumers.py 文件源码 项目:sauna 作者: NicolasLM 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_get_checks_as_dict(self):
        foo = ServiceCheck(timestamp=42, hostname='server1',
                           name='foo', status=0, output='foo out')
        bar = ServiceCheck(timestamp=42, hostname='server1',
                           name='bar', status=1, output='bar out')
        with mock.patch.dict('sauna.check_results', foo=foo, bar=bar):
            self.assertDictEqual(base.AsyncConsumer.get_checks_as_dict(), {
                'foo': {
                    'status': 'OK',
                    'code': 0,
                    'timestamp': 42,
                    'output': 'foo out'
                },
                'bar': {
                    'status': 'WARNING',
                    'code': 1,
                    'timestamp': 42,
                    'output': 'bar out'
                }
            })
test_pa_start_django_webapp_with_virtualenv.py 文件源码 项目:helper_scripts 作者: pythonanywhere 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_calls_all_stuff_in_right_order(self):
        with patch('scripts.pa_start_django_webapp_with_virtualenv.DjangoProject') as mock_DjangoProject:
            main('www.domain.com', 'django.version', 'python.version', nuke='nuke option')
        assert mock_DjangoProject.call_args == call('www.domain.com')
        assert mock_DjangoProject.return_value.method_calls == [
            call.sanity_checks(nuke='nuke option'),
            call.create_virtualenv('python.version', 'django.version', nuke='nuke option'),
            call.run_startproject(nuke='nuke option'),
            call.find_django_files(),
            call.update_settings_file(),
            call.run_collectstatic(),
            call.create_webapp(nuke='nuke option'),
            call.add_static_file_mappings(),
            call.update_wsgi_file(),
            call.webapp.reload(),
        ]
test_pa_start_django_webapp_with_virtualenv.py 文件源码 项目:helper_scripts 作者: pythonanywhere 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_actually_creates_django_project_in_virtualenv_with_hacked_settings_and_static_files(
        self, fake_home, virtualenvs_folder, api_token
    ):

        with patch('scripts.pa_start_django_webapp_with_virtualenv.DjangoProject.update_wsgi_file'):
            with patch('pythonanywhere.api.call_api'):
                main('mydomain.com', '1.9.2', '2.7', nuke=False)

        django_version = subprocess.check_output([
            virtualenvs_folder / 'mydomain.com/bin/python',
            '-c'
            'import django; print(django.get_version())'
        ]).decode().strip()
        assert django_version == '1.9.2'

        with open(fake_home / 'mydomain.com/mysite/settings.py') as f:
            lines = f.read().split('\n')
        assert "MEDIA_ROOT = os.path.join(BASE_DIR, 'media')" in lines
        assert "ALLOWED_HOSTS = ['mydomain.com']" in lines

        assert 'base.css' in os.listdir(fake_home / 'mydomain.com/static/admin/css')
test_cli.py 文件源码 项目:mistletoe 作者: miyuchina 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_interactive(self, mock_print, mock_markdown,
                         mock_print_heading, mock_import_readline):
        def MockInputFactory(return_values):
            _counter = -1
            def mock_input(prompt=''):
                nonlocal _counter
                _counter += 1
                if _counter < len(return_values):
                    return return_values[_counter]
                elif _counter == len(return_values):
                    raise EOFError
                else:
                    raise KeyboardInterrupt
            return mock_input

        return_values = ['foo', 'bar', 'baz']
        with patch('builtins.input', MockInputFactory(return_values)):
            cli.interactive(sentinel.RendererCls)

        mock_import_readline.assert_called_with()
        mock_print_heading.assert_called_with(sentinel.RendererCls)
        mock_markdown.assert_called_with(['foo\n', 'bar\n', 'baz\n'],
                sentinel.RendererCls)
        calls = [call('\nrendered text', end=''), call('\nExiting.')]
        mock_print.assert_has_calls(calls)
tests.py 文件源码 项目:django-webpack 作者: csinchok 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_munge_config(self):

        def mock_find(path):
            return os.path.join('/some/path/static/', path)

        with mock.patch('webpack.conf.find', new=mock_find) as find:

            munged = get_munged_config(WEBPACK_CONFIG)

        expected_output = WEBPACK_CONFIG_OUTPUT.format(
            url=settings.STATIC_URL,
            root=settings.STATIC_ROOT
        )
        self.assertEqual(
            munged,
            expected_output
        )
release_test.py 文件源码 项目:release-script 作者: mitodl 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_init_working_dir():
    """init_working_dir should initialize a valid git repo, and clean up after itself"""
    repo_url = "https://github.com/mitodl/release-script.git"
    access_token = 'fake_access_token'
    with patch('release.check_call', autospec=True) as check_call_mock, init_working_dir(
        access_token, repo_url,
    ) as other_directory:
        assert os.path.exists(other_directory)
    assert not os.path.exists(other_directory)

    calls = check_call_mock.call_args_list
    assert [call[0][0] for call in calls] == [
        ['git', 'init'],
        ['git', 'remote', 'add', 'origin', url_with_access_token(access_token, repo_url)],
        ['git', 'fetch'],
        ['git', 'checkout', '-t', 'origin/master'],
    ]
conftest.py 文件源码 项目:antenna 作者: mozilla-services 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def randommock():
    """Returns a contextmanager that mocks random.random() at a specific value

    Usage::

        def test_something(randommock):
            with randommock(0.55):
                # test stuff...

    """
    @contextlib.contextmanager
    def _randommock(value):
        with mock.patch('random.random') as mock_random:
            mock_random.return_value = value
            yield

    return _randommock
test_base_events.py 文件源码 项目:annotated-py-asyncio 作者: hhstore 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_set_exc_handler_broken(self):
        def run_loop():
            def zero_error():
                1/0
            self.loop.call_soon(zero_error)
            self.loop._run_once()

        def handler(loop, context):
            raise AttributeError('spam')

        self.loop._process_events = mock.Mock()

        self.loop.set_exception_handler(handler)

        with mock.patch('asyncio.base_events.logger') as log:
            run_loop()
            log.error.assert_called_with(
                test_utils.MockPattern(
                    'Unhandled error in exception handler'),
                exc_info=(AttributeError, MOCK_ANY, MOCK_ANY))
test_base_events.py 文件源码 项目:annotated-py-asyncio 作者: hhstore 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_create_connection_timeout(self, m_socket):
        # Ensure that the socket is closed on timeout
        sock = mock.Mock()
        m_socket.socket.return_value = sock

        def getaddrinfo(*args, **kw):
            fut = asyncio.Future(loop=self.loop)
            addr = (socket.AF_INET, socket.SOCK_STREAM, 0, '',
                    ('127.0.0.1', 80))
            fut.set_result([addr])
            return fut
        self.loop.getaddrinfo = getaddrinfo

        with mock.patch.object(self.loop, 'sock_connect',
                               side_effect=asyncio.TimeoutError):
            coro = self.loop.create_connection(MyProto, '127.0.0.1', 80)
            with self.assertRaises(asyncio.TimeoutError):
                self.loop.run_until_complete(coro)
            self.assertTrue(sock.close.called)
test_unix_events.py 文件源码 项目:annotated-py-asyncio 作者: hhstore 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def setUp(self):
        self.loop = self.new_test_loop()
        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
        self.pipe = mock.Mock(spec_set=io.RawIOBase)
        self.pipe.fileno.return_value = 5

        blocking_patcher = mock.patch('asyncio.unix_events._set_nonblocking')
        blocking_patcher.start()
        self.addCleanup(blocking_patcher.stop)

        fstat_patcher = mock.patch('os.fstat')
        m_fstat = fstat_patcher.start()
        st = mock.Mock()
        st.st_mode = stat.S_IFIFO
        m_fstat.return_value = st
        self.addCleanup(fstat_patcher.stop)
test_unix_events.py 文件源码 项目:annotated-py-asyncio 作者: hhstore 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def setUp(self):
        self.loop = self.new_test_loop()
        self.protocol = test_utils.make_test_protocol(asyncio.BaseProtocol)
        self.pipe = mock.Mock(spec_set=io.RawIOBase)
        self.pipe.fileno.return_value = 5

        blocking_patcher = mock.patch('asyncio.unix_events._set_nonblocking')
        blocking_patcher.start()
        self.addCleanup(blocking_patcher.stop)

        fstat_patcher = mock.patch('os.fstat')
        m_fstat = fstat_patcher.start()
        st = mock.Mock()
        st.st_mode = stat.S_IFSOCK
        m_fstat.return_value = st
        self.addCleanup(fstat_patcher.stop)
test_unix_events.py 文件源码 项目:annotated-py-asyncio 作者: hhstore 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def waitpid_mocks(func):
        def wrapped_func(self):
            def patch(target, wrapper):
                return mock.patch(target, wraps=wrapper,
                                  new_callable=mock.Mock)

            with patch('os.WTERMSIG', self.WTERMSIG) as m_WTERMSIG, \
                 patch('os.WEXITSTATUS', self.WEXITSTATUS) as m_WEXITSTATUS, \
                 patch('os.WIFSIGNALED', self.WIFSIGNALED) as m_WIFSIGNALED, \
                 patch('os.WIFEXITED', self.WIFEXITED) as m_WIFEXITED, \
                 patch('os.waitpid', self.waitpid) as m_waitpid:
                func(self, WaitPidMocks(m_waitpid,
                                        m_WIFEXITED, m_WIFSIGNALED,
                                        m_WEXITSTATUS, m_WTERMSIG,
                                        ))
        return wrapped_func


问题


面经


文章

微信
公众号

扫码关注公众号