python类object()的实例源码

tests.py 文件源码 项目:openbare 作者: openbare 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_checkout_exception(self):
        self.c.login(username=self.user.username, password='str0ngpa$$w0rd')

        # Test check out lendable raises exception
        with patch.object(Lendable,
                          'checkout',
                          side_effect=Exception('Checkout Failed!')):
            response = self.c.get(reverse('library:checkout',
                                          args=['lendable']),
                                  follow=True)

        # Confirm error message displayed
        message = list(response.context['messages'])[0].message
        self.assertEqual(message, 'Checkout Failed!')

        # Confirm lendable not created
        self.assertEqual(Lendable.all_types.count(), 0)
tests.py 文件源码 项目:openbare 作者: openbare 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_set_username(self):
        """Test set username method."""
        # Test unicode converts to ascii J?hn to John
        with patch.object(AmazonAccountUtils,
                          'iam_user_exists',
                          return_value=False):
            self.aws_account._set_username()
            self.assertEqual(self.aws_account.username, 'John')

        # Test random username generated if iam user exists
        with patch.object(AmazonAccountUtils,
                          'iam_user_exists',
                          return_value=True):
            self.aws_account._set_username()
            self.assertNotEqual(self.aws_account.username, 'John')
            self.assertEqual(len(self.aws_account.username), 20)

        # Test random username is generated when regex invalid
        # after conversion. ??? converts to ???.
        self.user.username = '???'
        self.aws_account._set_username()
        self.assertNotEqual(self.aws_account.username, '???')
        self.assertEqual(len(self.aws_account.username), 20)
test_controllers.py 文件源码 项目:bitstore 作者: datahq 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def setUp(self):

        # Cleanup
        self.addCleanup(patch.stopall)

        # Request patch
        self.request = patch.object(module, 'request').start()
        # Various patches
        self.services = patch.object(module, 'services').start()

        self.original_config = dict(module.config)
        module.config['STORAGE_BUCKET_NAME'] = self.bucket = 'buckbuck'
        module.config['STORAGE_ACCESS_KEY_ID'] = ''
        module.config['STORAGE_SECRET_ACCESS_KEY'] = ''
        module.config['ACCESS_KEY_EXPIRES_IN'] = ''
        module.config['STORAGE_PATH_PATTERN'] = '{owner}/{dataset}/{path}'
        self.s3 = boto3.client('s3')
common.py 文件源码 项目:scarlett_os 作者: bossjones 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def async_test_scarlett_os(loop):
    """Return a ScarlettOS object pointing at test config dir."""
    # loop._thread_ident = threading.get_ident()

    ss = s.ScarlettSystem(loop)

    ss.config.location_name = 'test scarlett'
    ss.config.config_dir = get_test_config_dir()
    ss.config.latitude = 32.87336
    ss.config.longitude = -117.22743
    ss.config.elevation = 0
    ss.config.time_zone = date_utility.get_time_zone('US/Pacific')
    ss.config.units = METRIC_SYSTEM
    ss.config.skip_pip = True

    # if 'custom_automations.test' not in loader.AVAILABLE_COMPONENTS:
    #     yield from loop.run_in_executor(None, loader.prepare, ss)

    ss.state = s.CoreState.running

    return ss
test_stale_issues.py 文件源码 项目:astropy-bot 作者: astropy 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_close_disabled(self):

        # Second case: time is beyond close deadline, and there is no comment yet
        # but the global option to allow closing has not been enabled. Since there
        # is no comment, the warning gets posted (rather than the 'epilogue')

        self.get_issues.return_value = ['123']
        self.get_label_added_date.return_value = now() - 34443
        self.find_comments.return_value = []

        with app.app_context():
            with patch.object(app, 'stale_issue_close', False):
                process_issues('repo', 'installation')

        assert self.submit_comment.call_count == 1
        expected = ISSUE_CLOSE_WARNING.format(pasttime='9 hours ago', futuretime='5 hours')
        self.submit_comment.assert_called_with(expected)
        assert self.close.call_count == 0
test_stale_pull_requests.py 文件源码 项目:astropy-bot 作者: astropy 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def setup_method(self, method):

        self.patch_repo_config = patch.object(RepoHandler, 'get_config_value')
        self.patch_open_pull_requests = patch.object(RepoHandler, 'open_pull_requests')
        self.patch_labels = patch.object(PullRequestHandler, 'labels', new_callable=PropertyMock)
        self.patch_last_commit_date = patch.object(PullRequestHandler, 'last_commit_date', new_callable=PropertyMock)
        self.patch_find_comments = patch.object(PullRequestHandler, 'find_comments')
        self.patch_submit_comment = patch.object(PullRequestHandler, 'submit_comment')
        self.patch_close = patch.object(PullRequestHandler, 'close')

        self.autoclose_stale = self.patch_repo_config.start()
        self.open_pull_requests = self.patch_open_pull_requests.start()
        self.labels = self.patch_labels.start()
        self.last_commit_date = self.patch_last_commit_date.start()
        self.find_comments = self.patch_find_comments.start()
        self.submit_comment = self.patch_submit_comment.start()
        self.close = self.patch_close.start()
test_stale_pull_requests.py 文件源码 项目:astropy-bot 作者: astropy 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_close_disabled(self):

        # Time is beyond close deadline, and there is no comment yet but the
        # global option to allow closing has not been enabled. Since there is no
        # comment, the warning gets posted (rather than the 'epilogue')

        self.open_pull_requests.return_value = ['123']
        self.last_commit_date.return_value = now() - 241
        self.find_comments.return_value = []

        with app.app_context():
            with patch.object(app, 'stale_pull_requests_close', False):
                process_pull_requests('repo', 'installation')

        assert self.submit_comment.call_count == 1
        expected = PULL_REQUESTS_CLOSE_WARNING.format(pasttime='4 minutes', futuretime='20 seconds')
        self.submit_comment.assert_called_with(expected)
        assert self.close.call_count == 0
test_requests.py 文件源码 项目:peony-twitter 作者: odrling 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_request_error_handler(api_path, _error_handler=True):
    client = api_path._client
    with patch.object(client, 'request', side_effect=dummy) as client_request:
        with patch.object(client, 'error_handler',
                          side_effect=dummy_error_handler) as error_handler:
            client.loop.run_until_complete(
                requests.Request(api_path, 'get',
                                 _error_handling=_error_handler,
                                 test=1, _test=2)
            )
            assert client_request.called_with(method='get',
                                              url=url,
                                              skip_params=False,
                                              test=2,
                                              params={'test': 1})
            assert error_handler.called is _error_handler
test_utils.py 文件源码 项目:peony-twitter 作者: odrling 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_error_handler_service_unavailable():
    global tries
    tries = 4

    async def service_unavailable(**kwargs):
        global tries
        tries -= 1

        if tries > 0:
            response = MockResponse(status=503)
            raise await exceptions.throw(response)
        else:
            return MockResponse()

    with pytest.raises(exceptions.ServiceUnavailable):
        with patch.object(asyncio, 'sleep', side_effect=dummy) as sleep:
            await utils.error_handler(service_unavailable)()
            assert sleep.called

    await utils.error_handler(service_unavailable)()
test_client.py 文件源码 项目:peony-twitter 作者: odrling 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_streaming_apis(dummy_client):
    with patch.object(dummy_client, 'request', side_effect=dummy) as request:
        await dummy_client.api.test.get()
        assert request.called

    with patch.object(dummy_client, 'stream_request') as request:
        dummy_client.stream.test.get()
        assert request.called

    client = BasePeonyClient("", "", streaming_apis={'api'})
    with patch.object(client, 'stream_request') as request:
        client.api.test.get()
        assert request.called

    with patch.object(client, 'request', side_effect=dummy) as request:
        await client.stream.test.get()
        assert request.called
test_client.py 文件源码 项目:peony-twitter 作者: odrling 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_request_proxy(dummy_client):
    class RaiseProxy:

        def __init__(self, *args, proxy=None, **kwargs):
            raise RuntimeError(proxy)

    async with aiohttp.ClientSession() as session:
        with patch.object(session, 'request', side_effect=RaiseProxy):
            try:
                await dummy_client.request(method='get',
                                           url="http://hello.com",
                                           proxy="http://some.proxy.com",
                                           session=session,
                                           future=asyncio.Future())
            except RuntimeError as e:
                assert str(e) == "http://some.proxy.com"
test_client.py 文件源码 项目:peony-twitter 作者: odrling 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_chunked_upload_fail(dummy_peony_client, medias):
    media = medias['video']
    media_data = await media.download()

    chunk_size = 1024**2

    dummy_request = DummyRequest(dummy_peony_client, media, chunk_size, True)

    with patch.object(dummy_peony_client, 'request',
                      side_effect=dummy_request):
        with patch.object(asyncio, 'sleep', side_effect=dummy) as sleep:
            with pytest.raises(peony.exceptions.MediaProcessingError):
                await dummy_peony_client.upload_media(
                    media_data, chunk_size=chunk_size, chunked=True
                )
                sleep.assert_called_with(5)
test_client.py 文件源码 项目:peony-twitter 作者: odrling 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_upload_from_url(dummy_peony_client, medias, media_request, url):
    async def dummy_get(get_url):
        assert get_url == url
        return media_request

    async def dummy_request(url, method, future, data=None, skip_params=None):
        assert url == dummy_peony_client.upload.media.upload.url()
        assert method.lower() == 'post'
        assert data['media'] == media_request.content
        assert skip_params
        future.set_result(None)

    with patch.object(dummy_peony_client, '_session') as session:
        session.get = dummy_get
        with patch.object(dummy_peony_client, 'request',
                          side_effect=dummy_request):
            await dummy_peony_client.upload_media(url)
test_stream.py 文件源码 项目:peony-twitter 作者: odrling 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_stream_reconnection_disconnection(stream):
    async def dummy(*args, **kwargs):
        pass

    turn = -1

    with patch.object(stream, '_connect', side_effect=response_disconnection):
        with patch.object(peony.stream.asyncio, 'sleep', side_effect=dummy):
            async for data in stream:
                assert stream._state == DISCONNECTION
                turn += 1

                if turn == 0:
                    assert data == {'connected': True}
                elif turn % 2 == 1:
                    timeout = DISCONNECTION_TIMEOUT * (turn + 1) / 2

                    if timeout > MAX_DISCONNECTION_TIMEOUT:
                        actual = data['reconnecting_in']
                        assert actual == MAX_DISCONNECTION_TIMEOUT
                        break

                    assert data == {'reconnecting_in': timeout, 'error': None}
                else:
                    assert data == {'stream_restart': True}
test_stream.py 文件源码 项目:peony-twitter 作者: odrling 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_stream_reconnection_reconnect(stream):
    async def dummy(*args, **kwargs):
        pass

    turn = -1

    with patch.object(stream, '_connect', side_effect=response_reconnection):
        with patch.object(peony.stream.asyncio, 'sleep', side_effect=dummy):
            async for data in stream:
                assert stream._state == RECONNECTION
                turn += 1

                if turn == 0:
                    assert data == {'connected': True}
                elif turn % 2 == 1:
                    timeout = RECONNECTION_TIMEOUT * 2**(turn // 2)

                    if timeout > MAX_RECONNECTION_TIMEOUT:
                        actual = data['reconnecting_in']
                        assert actual == MAX_RECONNECTION_TIMEOUT
                        break

                    assert data == {'reconnecting_in': timeout, 'error': None}
                else:
                    assert data == {'stream_restart': True}
test_stream.py 文件源码 项目:peony-twitter 作者: odrling 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_stream_reconnection_enhance_your_calm(stream):
    async def dummy(*args, **kwargs):
        pass

    turn = -1

    with patch.object(stream, '_connect', side_effect=response_calm):
        with patch.object(peony.stream.asyncio, 'sleep', side_effect=dummy):
            async for data in stream:
                assert stream._state == ENHANCE_YOUR_CALM
                turn += 1

                if turn >= 100:
                    break

                if turn == 0:
                    assert data == {'connected': True}
                elif turn % 2 == 1:
                    timeout = ENHANCE_YOUR_CALM_TIMEOUT * 2**(turn // 2)
                    assert data == {'reconnecting_in': timeout, 'error': None}
                else:
                    assert data == {'stream_restart': True}
test_stream.py 文件源码 项目:peony-twitter 作者: odrling 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_stream_cancel(event_loop):
    async def cancel(task):
        await asyncio.sleep(0.001)
        task.cancel()

    async def test_stream_iterations(stream):
        while True:
            await test_stream_iteration(stream)

    with aiohttp.ClientSession(loop=event_loop) as session:
        client = peony.client.BasePeonyClient("", "", session=session)
        context = peony.stream.StreamResponse(method='GET',
                                              url="http://whatever.com",
                                              client=client)

        with context as stream:
            with patch.object(stream, '_connect',
                              side_effect=stream_content):
                coro = test_stream_iterations(stream)
                task = event_loop.create_task(coro)
                cancel_task = event_loop.create_task(cancel(task))

                with aiohttp.Timeout(1):
                    await asyncio.wait([task, cancel_task])
test_oauth.py 文件源码 项目:peony-twitter 作者: odrling 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def post(self, _data=None, _headers=None, **kwargs):
        self.count += 1

        if _data is not None:
            with patch.object(oauth.aiohttp.payload, 'BytesPayload',
                              side_effect=dummy_func):
                assert _data._gen_form_urlencoded() == b"access_token=abc"

        if _headers is not None:
            key = "1234567890:0987654321"
            auth = base64.b64encode(key.encode('utf-8')).decode('utf-8')
            assert _headers['Authorization'] == 'Basic ' + auth

        # This is needed to run `test_oauth2_concurrent_refreshes`
        # without that, refresh tasks would be executed sequentially
        # In a sense it is a simulation of a request being fetched
        await asyncio.sleep(0.001)
        return {'access_token': "abc"}
serializers_test.py 文件源码 项目:micromasters 作者: mitodl 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_full_program_user_serialization_with__no_passed_course(self):
        """
        Tests that full ProgramEnrollment serialization works as expected when user
        has no passed courses.
        """
        with patch.object(MMTrack, 'count_courses_passed', return_value=0):
            self.profile.refresh_from_db()
            program = self.program_enrollment.program
            expected_result = {
                'id': program.id,
                'enrollments': self.serialized_enrollments,
                'grade_average': 75,
                'is_learner': True,
                'num_courses_passed': 0,
                'total_courses': 1
            }
            serialized_enrollments = UserProgramSearchSerializer.serialize(self.program_enrollment)
            assert serialized_enrollments == expected_result
test_bindings.py 文件源码 项目:channels-api 作者: linuxlewis 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_create(self):
        """Integration that asserts routing a message to the create channel.

        Asserts response is correct and an object is created.
        """
        json_content = self._send_and_consume("websocket.receive", self._build_message("testmodel", {
            'action': 'create',
            'pk': None,
            'request_id': 'client-request-id',
            'data': {'name': 'some-thing'}
        }))
        # it should create an object
        self.assertEqual(TestModel.objects.count(), 1)

        expected = {
            'action': 'create',
            'data': TestModelSerializer(TestModel.objects.first()).data,
            'errors': [],
            'request_id': 'client-request-id',
            'response_status': 201
        }
        # it should respond with the serializer.data
        self.assertEqual(json_content['payload'], expected)
test_bindings.py 文件源码 项目:channels-api 作者: linuxlewis 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_create_failure(self):
        """Integration that asserts error handling of a message to the create channel."""

        json_content = self._send_and_consume('websocket.receive', self._build_message("testmodel", {
            'action': 'create',
            'pk': None,
            'request_id': 'client-request-id',
            'data': {},
        }))
        # it should not create an object
        self.assertEqual(TestModel.objects.count(), 0)

        expected = {
            'action': 'create',
            'data': None,
            'request_id': 'client-request-id',
            'errors': [{'name': ['This field is required.']}],
            'response_status': 400
        }
        # it should respond with an error
        self.assertEqual(json_content['payload'], expected)
test_bindings.py 文件源码 项目:channels-api 作者: linuxlewis 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_bad_permission_reply(self):
        mock_has_perm = Mock()
        mock_has_perm.return_value = False
        with patch.object(TestModelResourceBinding, 'has_permission', mock_has_perm):
            json_content = self._send_and_consume('websocket.receive', self._build_message('testmodel',{
                'action': 'named_detail',
                'pk': 546,
                'data': {},
                'request_id': 'client-request-id'
            }))

            expected = {
                'action': 'named_detail',
                'errors': ['Permission Denied'],
                'data': None,
                'response_status': 401,
                'request_id': 'client-request-id'
            }

            self.assertEqual(json_content['payload'], expected)
            self.assertEqual(mock_has_perm.called, True)
test_bindings.py 文件源码 项目:channels-api 作者: linuxlewis 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_is_authenticated_permission(self):

        with patch.object(TestModelResourceBinding, 'permission_classes', (IsAuthenticated,)):
            content = {
                'action': 'test_list',
                'pk': None,
                'data': {},
                'request_id': 'client-request-id',
            }
            json_content = self._send_and_consume('websocket.receive', self._build_message('testmodel', content))
            # It should block the request
            self.assertEqual(json_content['payload']['response_status'], 401)

            user = User.objects.create(username="testuser", password="123")
            self.client.force_login(user)
            self.client._session_cookie = True

            json_content = self._send_and_consume('websocket.receive', self._build_message('testmodel', content))

            self.assertEqual(json_content['payload']['response_status'], 200)
test_base.py 文件源码 项目:aioautomatic 作者: armills 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_parent_inheritance(aiohttp_session):
    """Test the first api object creation."""
    mock_kwargs = {
        'arg1': 10,
        'arg2': 20,
    }
    child_kwargs = {
        'arg3': 30,
    }
    parent = base.BaseApiObject(None, request_kwargs=mock_kwargs,
                                client_session=aiohttp_session)
    child = base.BaseApiObject(parent, request_kwargs=child_kwargs)
    assert child.client_session == aiohttp_session
    assert child.loop == aiohttp_session.loop
    assert child.request_kwargs == {
        'arg1': 10,
        'arg2': 20,
        'arg3': 30,
    }
test_base.py 文件源码 项目:aioautomatic 作者: armills 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_result_list(session):
    """Test the result object."""
    obj = base.ResultList(parent=session, item_class=MockDataObject, resp={
        "_metadata": {
            "count": 2,
            "next": None,
            "previous": None,
            },
        "results": [
            {
                "attr1": "value1",
                "attr2": "value2",
            }, {
                "attr1": "value3"
            }],
        })
    next_list = session.loop.run_until_complete(obj.get_next())
    previous_list = session.loop.run_until_complete(obj.get_previous())
    assert obj.next is None
    assert obj.previous is None
    assert next_list is None
    assert previous_list is None
    assert len(obj) == 2
    assert sorted([item.attr1 for item in obj]) == sorted(["value1", "value3"])
test_event.py 文件源码 项目:python-moira-client 作者: moira-alert 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_fetch_by_trigger(self):
        client = Client(self.api_url)
        contact_manager = EventManager(client)

        trigger_id = '1'
        trigger = Trigger(client, 'Name', ['tag'], ['target'], 0, 1, id=trigger_id)

        with patch.object(client, 'get', return_value={'list': []}) as get_mock:
            contact_manager.fetch_by_trigger(trigger)

        self.assertTrue(get_mock.called)

        expected_request_data = {
            'p': 0,
            'size': MAX_FETCH_LIMIT
        }

        get_mock.assert_called_with('event/' + trigger_id, params=expected_request_data)
test_trigger.py 文件源码 项目:python-moira-client 作者: moira-alert 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_fetch_by_id(self):
        client = Client(self.api_url)
        trigger_manager = TriggerManager(client)

        trigger_id = '1'

        state = {
            'state': 'OK',
            'trigger_id': trigger_id
            }

        trigger = {
            'id': trigger_id,
            'name': 'trigger_name',
            'tags': ['tag'],
            'targets': ['pattern'],
            'warn_value': 0,
            'error_value': 1
            }

        with patch.object(client, 'get', side_effect=[state, trigger]) as get_mock:
            trigger = trigger_manager.fetch_by_id(trigger_id)

            self.assertTrue(get_mock.called)
            self.assertEqual(trigger_id, trigger.id)
test_contact.py 文件源码 项目:python-moira-client 作者: moira-alert 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_add_bad_response(self):
        client = Client(self.api_url)
        contact_manager = ContactManager(client)

        contact_value = '#channel'

        with patch.object(client, 'put', return_value={}) as put_mock, \
                patch.object(client, 'get', return_value={'contacts': []}) as get_mock:
            with self.assertRaises(ResponseStructureError):
                contact_manager.add(contact_value, CONTACT_SLACK)

        self.assertTrue(put_mock.called)
        self.assertTrue(get_mock.called)
        expected_request_data = {
            'value': contact_value,
            'type': CONTACT_SLACK
        }
        put_mock.assert_called_with('contact', json=expected_request_data)
test_client.py 文件源码 项目:python-moira-client 作者: moira-alert 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_get_invalid_response(self):

        def get(url, params, **kwargs):
            return FakeResponse()

        response = FakeResponse()

        with patch.object(requests, 'get', side_effects=get, return_value=response) as mock_get:
            test_path = 'test_path'

            client = Client(TEST_API_URL, AUTH_HEADERS)
            with self.assertRaises(InvalidJSONError):
                client.get(test_path)

        self.assertTrue(mock_get.called)
        expected_url_call = TEST_API_URL + '/' + test_path
        mock_get.assert_called_with(expected_url_call, headers=AUTH_HEADERS, auth=None)
test_client.py 文件源码 项目:python-moira-client 作者: moira-alert 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_put_invalid_response(self):
        test_data = {'test': 'test'}

        def put(url, data, **kwargs):
            return FakeResponse()

        response = FakeResponse()

        with patch.object(requests, 'put', side_effects=put, return_value=response) as mock_put:
            test_path = 'test_path'

            client = Client(TEST_API_URL, AUTH_HEADERS)
            with self.assertRaises(InvalidJSONError):
                client.put(test_path, data=test_data)

        self.assertTrue(mock_put.called)
        expected_url_call = TEST_API_URL + '/' + test_path
        mock_put.assert_called_with(expected_url_call, data=test_data, headers=AUTH_HEADERS, auth=None)


问题


面经


文章

微信
公众号

扫码关注公众号