python类PUT的实例源码

systesthelper.py 文件源码 项目:drift 作者: dgnorth 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _setup_mocking(self):
        def _mock_callback(request):
            method = request.method.lower()
            url = request.path_url
            handler = getattr(self.app, method)
            r = handler(
                url,
                data=request.body,
                headers=dict(request.headers)
            )
            return (r.status_code, r.headers, r.data)

        pattern = re.compile("{}/(.*)".format(self.host))
        methods = [
            responses.GET,
            responses.POST,
            responses.PUT,
            responses.DELETE,
            responses.PATCH,
        ]
        for method in methods:
            responses.add_callback(
                method, pattern,
                callback=_mock_callback
            )
mock_server.py 文件源码 项目:ros-interop 作者: mcgill-robotics 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def set_put_object_response(self, id, object_, user=1, code=200):
        """Sets mock PUT /api/odlcs/<id> response.

        Args:
            id (int): Target ID.
            object_ (dict): Target data to update with.
            user (int): User number to respond with.
            code (int): Status code to respond with.
        """
        object_.update({"id": id, "user": user})
        content = json.dumps(object_)

        self.rsps.add(
            responses.PUT,
            "{}/api/odlcs/{:d}".format(self.url, id),
            status=code,
            body=content if code == 200 else "",
            content_type="application/json")
test_resource.py 文件源码 项目:genericclient-requests 作者: genericclient 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_resource_save(self):
        with responses.RequestsMock() as rsps:
            rsps.add(responses.GET, MOCK_API_URL + '/users/1', json={
                'id': 1,
                'username': 'user1',
                'group': 'watchers',
            })

            user1 = generic_client.users.get(id=1)
            self.assertEqual(user1.username, 'user1')

        with responses.RequestsMock() as rsps:
            rsps.add(responses.PUT, MOCK_API_URL + '/users/1', json={
                'id': 1,
                'username': 'user1',
                'group': 'admins',
            })

            user1.group = 'admins'
            user1.save()
test_resource.py 文件源码 项目:genericclient-requests 作者: genericclient 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_resource_save_uuid(self):
        with responses.RequestsMock() as rsps:
            rsps.add(responses.GET, MOCK_API_URL + '/users/1', json={
                'uuid': 1,
                'username': 'user1',
                'group': 'watchers',
            })

            user1 = generic_client.users.get(uuid=1)
            self.assertEqual(user1.username, 'user1')

        with responses.RequestsMock() as rsps:
            rsps.add(responses.PUT, MOCK_API_URL + '/users/1', json={
                'uuid': 1,
                'username': 'user1',
                'group': 'admins',
            })

            user1.group = 'admins'
            user1.save()
test_resource_trailing_slash.py 文件源码 项目:genericclient-requests 作者: genericclient 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_resource_save(self):
        with responses.RequestsMock() as rsps:
            rsps.add(responses.GET, MOCK_API_URL + '/users/1/', json={
                'id': 1,
                'username': 'user1',
                'group': 'watchers',
            })

            user1 = generic_client.users.get(id=1)
            self.assertEqual(user1.username, 'user1')

        with responses.RequestsMock() as rsps:
            rsps.add(responses.PUT, MOCK_API_URL + '/users/1/', json={
                'id': 1,
                'username': 'user1',
                'group': 'admins',
            })

            user1.group = 'admins'
            user1.save()
test_client.py 文件源码 项目:dpm-py 作者: oki-archive 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_publish_invalid(self):
        # GIVEN datapackage that can be treated as valid by the dpm
        self.valid_dp = datapackage.DataPackage({
                "name": "some-datapackage",
                "resources": [
                    { "name": "some-resource", "path": "./data/some_data.csv", }
                ]
            },
            default_base_path='.')
        patch('dpm.client.DataPackage', lambda *a: self.valid_dp).start()
        patch('dpm.client.exists', lambda *a: True).start()

        # AND the server that accepts any user
        responses.add(
                responses.POST, 'http://127.0.0.1:5000/api/auth/token',
                json={'token': 'blabla'},
                status=200)
        # AND server rejects any datapackage as invalid
        responses.add(
                responses.PUT, 'http://127.0.0.1:5000/api/package/user/some-datapackage',
                json={'message': 'invalid datapackage json'},
                status=400)

        # AND the client
        client = Client(dp1_path, self.config)

        # WHEN publish() is invoked
        try:
            result = client.publish()
        except Exception as e:
            result = e

        # THEN HTTPStatusError should be raised
        assert isinstance(result, HTTPStatusError)
        # AND 'invalid datapackage json' should be printed to stdout
        self.assertRegexpMatches(str(result), 'invalid datapackage json')
test_modifying.py 文件源码 项目:slims-python-api 作者: genohm 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_add(self):
        slims = Slims("testSlims", "http://localhost:9999", "admin", "admin")
        responses.add(
            responses.PUT,
            'http://localhost:9999/rest/Content',
            json={"entities": [{
                "pk": 1,
                "tableName": "Content",
                "columns": []
            }]},
            content_type='application/json',
        )

        added = slims.add("Content", {"test": "foo"})
        self.assertIsInstance(added, Record)
test_heal.py 文件源码 项目:edx-video-pipeline 作者: edx 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_heal(self):
        val_response = {
            'courses': [{u'WestonHS/PFLC1x/3T2015': None}],
            'encoded_videos': [{
                'url': 'https://testurl.mp4',
                'file_size': 8499040,
                'bitrate': 131,
                'profile': 'mobile_low',
            }]
        }
        responses.add(
            responses.POST,
            CONFIG_DATA['val_token_url'],
            '{"access_token": "1234567890"}',
            status=200
        )
        responses.add(
            responses.GET,
            build_url(CONFIG_DATA['val_api_url'], self.video_id),
            body=json.dumps(val_response),
            content_type='application/json',
            status=200
        )
        responses.add(
            responses.PUT,
            build_url(CONFIG_DATA['val_api_url'], self.video_id),
            status=200
        )

        heal = VedaHeal()
        heal.discovery()
test_push.py 文件源码 项目:quilt 作者: quiltdata 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_push(self):
        mydir = os.path.dirname(__file__)
        build_path = os.path.join(mydir, './build_simple.yml')
        command.build('foo/bar', build_path)

        pkg_obj = store.PackageStore.find_package('foo', 'bar')
        pkg_hash = pkg_obj.get_hash()
        assert pkg_hash
        contents = pkg_obj.get_contents()

        all_hashes = set(find_object_hashes(contents))
        upload_urls = {
            blob_hash: dict(
                head="https://example.com/head/{owner}/{hash}".format(owner='foo', hash=blob_hash),
                put="https://example.com/put/{owner}/{hash}".format(owner='foo', hash=blob_hash)
            ) for blob_hash in all_hashes
        }

        # We will push the package twice, so we're mocking all responses twice.

        for blob_hash in all_hashes:
            urls = upload_urls[blob_hash]

            # First time the package is pushed, s3 HEAD 404s, and we get a PUT.
            self.requests_mock.add(responses.HEAD, urls['head'], status=404)
            self.requests_mock.add(responses.PUT, urls['put'])

            # Second time, s3 HEAD succeeds, and we're not expecting a PUT.
            self.requests_mock.add(responses.HEAD, urls['head'])

        self._mock_put_package('foo/bar', pkg_hash, upload_urls)
        self._mock_put_tag('foo/bar', 'latest')

        self._mock_put_package('foo/bar', pkg_hash, upload_urls)
        self._mock_put_tag('foo/bar', 'latest')

        # Push a new package.
        command.push('foo/bar')

        # Push it again; this time, we're verifying that there are no s3 uploads.
        command.push('foo/bar')
test_push.py 文件源码 项目:quilt 作者: quiltdata 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _mock_put_package(self, package, pkg_hash, upload_urls):
        pkg_url = '%s/api/package/%s/%s' % (command.get_registry_url(), package, pkg_hash)
        # Dry run, then the real thing.
        self.requests_mock.add(responses.PUT, pkg_url, json.dumps(dict(upload_urls=upload_urls)))
        self.requests_mock.add(responses.PUT, pkg_url, json.dumps(dict()))
test_push.py 文件源码 项目:quilt 作者: quiltdata 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _mock_put_tag(self, package, tag):
        tag_url = '%s/api/tag/%s/%s' % (command.get_registry_url(), package, tag)
        self.requests_mock.add(responses.PUT, tag_url, json.dumps(dict()))
test_command.py 文件源码 项目:quilt 作者: quiltdata 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_version_add_confirmed(self, mock_input, mock_match_hash):
        registry_url = command.get_registry_url()
        mock_input.return_value = 'y'
        mock_match_hash.return_value = 'fabc123'

        # Response content is not checked by version_add, so
        # status ok and URL verification are enough
        self.requests_mock.add(
            responses.PUT,
            registry_url + "/api/version/user/test/2.9.12",
            status=200,
        )

        command.version_add('user/test', '2.9.12', 'fabc123')
test_install.py 文件源码 项目:quilt 作者: quiltdata 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_short_hashes(self):
        """
        Test various functions that use short hashes
        """
        table_data, table_hash = self.make_table_data()
        file_data, file_hash = self.make_file_data()
        contents, contents_hash = self.make_contents(table=table_hash, file=file_hash)

        self._mock_log('foo/bar', contents_hash)
        self._mock_tag('foo/bar', 'mytag', contents_hash[0:6], cmd=responses.PUT)
        command.tag_add('foo/bar', 'mytag', contents_hash[0:6])

        self._mock_version('foo/bar', '1.0', contents_hash[0:6], cmd=responses.PUT)
        command.version_add('foo/bar', '1.0', contents_hash[0:6], force=True)
__init__.py 文件源码 项目:pytest-vts 作者: bhodorog 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def setup_recording(self, **kwargs):
        _logger.info("recording ...")
        self.responses.reset()
        all_requests_re = re.compile("http.*")
        methods = (responses.GET, responses.POST, responses.PUT,
                   responses.PATCH, responses.DELETE, responses.HEAD,
                   responses.OPTIONS)
        for http_method in methods:
            self.responses.add_callback(
                http_method, all_requests_re,
                match_querystring=False,
                callback=self.record())


问题


面经


文章

微信
公众号

扫码关注公众号