python类Mocker()的实例源码

client_test.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_write_points_string(self):
        """Write points given as a JSON string and verify the posted body.

        The POST to the series endpoint is mocked; the body of the last
        captured request must decode to ``self.dummy_points``.
        """
        series_url = "http://localhost:8086/db/db/series"
        with requests_mock.Mocker() as mocker:
            mocker.register_uri(requests_mock.POST, series_url)

            client = InfluxDBClient(database='db')
            # Hand the points over as a serialized string, not a list.
            payload = str(json.dumps(self.dummy_points))
            client.write_points(payload)

            sent_points = json.loads(mocker.last_request.body)
            self.assertListEqual(sent_points, self.dummy_points)
client_test.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_write_points_batch_multiple_series(self):
        """Write two series with ``batch_size=3`` and inspect the mocked POSTs.

        Five requests are expected in total, and the fifth must carry the
        last two points of the "bar" series.
        """
        foo_series = {
            "points": [
                ["1", 1, 1.0], ["2", 2, 2.0], ["3", 3, 3.0],
                ["4", 4, 4.0], ["5", 5, 5.0],
            ],
            "name": "foo",
            "columns": ["val1", "val2", "val3"],
        }
        bar_series = {
            "points": [
                ["1", 1, 1.0], ["2", 2, 2.0], ["3", 3, 3.0],
                ["4", 4, 4.0], ["5", 5, 5.0], ["6", 6, 6.0],
                ["7", 7, 7.0], ["8", 8, 8.0],
            ],
            "name": "bar",
            "columns": ["val1", "val2", "val3"],
        }
        dummy_points = [foo_series, bar_series]

        expected_last_body = [
            {
                'points': [['7', 7, 7.0], ['8', 8, 8.0]],
                'name': 'bar',
                'columns': ['val1', 'val2', 'val3'],
            },
        ]

        with requests_mock.Mocker() as mocker:
            mocker.register_uri(
                requests_mock.POST,
                "http://localhost:8086/db/db/series",
            )
            client = InfluxDBClient(
                'localhost', 8086, 'username', 'password', 'db')
            client.write_points(data=dummy_points, batch_size=3)

        # The mocker still exposes its recorded history after the context
        # has exited.
        self.assertEqual(mocker.call_count, 5)
        fifth_body = mocker.request_history[4].json()
        self.assertEqual(expected_last_body, fifth_body)
client_test.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_get_series_list(self):
        """List series through a mocked GET and check the extracted names."""
        client = InfluxDBClient(database='db')

        with requests_mock.Mocker() as mocker:
            # Canned server reply: one result set with two series rows.
            canned_reply = (
                '[{"name":"list_series_result","columns":'
                '["time","name"],"points":[[0,"foo"],[0,"bar"]]}]'
            )

            mocker.register_uri(
                requests_mock.GET,
                "http://localhost:8086/db/db/series",
                text=canned_reply,
            )

            series_names = client.get_list_series()
            self.assertListEqual(series_names, ['foo', 'bar'])
client_test.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_add_cluster_admin(self):
        """Add a cluster admin and verify the JSON body of the mocked POST."""
        expected_body = {'name': 'paul', 'password': 'laup'}

        with requests_mock.Mocker() as mocker:
            mocker.register_uri(
                requests_mock.POST,
                "http://localhost:8086/cluster_admins",
            )

            client = InfluxDBClient(database='db')
            client.add_cluster_admin(new_username='paul', new_password='laup')

            posted_body = json.loads(mocker.last_request.body)
            self.assertDictEqual(posted_body, expected_body)
client_test.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_update_cluster_admin_password(self):
        """Update a cluster admin's password and verify the posted JSON body."""
        admin_url = "http://localhost:8086/cluster_admins/paul"

        with requests_mock.Mocker() as mocker:
            mocker.register_uri(requests_mock.POST, admin_url)

            client = InfluxDBClient(database='db')
            client.update_cluster_admin_password(
                username='paul', new_password='laup')

            posted_body = json.loads(mocker.last_request.body)
            self.assertDictEqual(posted_body, {'password': 'laup'})
client_test.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_get_database_users(self):
        """Fetch database users via a mocked GET and compare to the canned JSON."""
        client = InfluxDBClient(
            'localhost', 8086, 'username', 'password', 'db')

        canned_users = (
            '[{"name":"paul","isAdmin":false,"writeTo":".*","readFrom":".*"},'
            '{"name":"bobby","isAdmin":false,"writeTo":".*","readFrom":".*"}]'
        )

        with requests_mock.Mocker() as mocker:
            mocker.register_uri(
                requests_mock.GET,
                "http://localhost:8086/db/db/users",
                text=canned_users,
            )
            users = client.get_database_users()

        # The client result must equal the decoded canned response.
        self.assertEqual(json.loads(canned_users), users)
client_test.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_add_database_user(self):
        """Add a database user with permissions and verify the posted JSON."""
        expected_body = {
            'writeTo': '.*',
            'password': 'laup',
            'readFrom': '.*',
            'name': 'paul',
        }

        with requests_mock.Mocker() as mocker:
            mocker.register_uri(
                requests_mock.POST,
                "http://localhost:8086/db/db/users",
            )
            client = InfluxDBClient(database='db')
            client.add_database_user(
                new_username='paul',
                new_password='laup',
                permissions=('.*', '.*'),
            )

            posted_body = json.loads(mocker.last_request.body)
            self.assertDictEqual(posted_body, expected_body)
client_test.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_alter_database_user_password(self):
        """Alter only a user's password and verify the posted JSON body."""
        user_url = "http://localhost:8086/db/db/users/paul"

        with requests_mock.Mocker() as mocker:
            mocker.register_uri(requests_mock.POST, user_url)

            client = InfluxDBClient(database='db')
            client.alter_database_user(username='paul', password='n3wp4ss!')

            posted_body = json.loads(mocker.last_request.body)
            self.assertDictEqual(posted_body, {'password': 'n3wp4ss!'})
client_test.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_alter_database_user_permissions(self):
        """Alter only a user's permissions and verify the posted JSON body."""
        user_url = "http://localhost:8086/db/db/users/paul"
        expected_body = {'readFrom': '^$', 'writeTo': '.*'}

        with requests_mock.Mocker() as mocker:
            mocker.register_uri(requests_mock.POST, user_url)

            client = InfluxDBClient(database='db')
            client.alter_database_user(
                username='paul', permissions=('^$', '.*'))

            posted_body = json.loads(mocker.last_request.body)
            self.assertDictEqual(posted_body, expected_body)
client_test.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_alter_database_user_password_and_permissions(self):
        """Alter a user's password and permissions together; check the body."""
        user_url = "http://localhost:8086/db/db/users/paul"
        expected_body = {
            'password': 'n3wp4ss!',
            'readFrom': '^$',
            'writeTo': '.*',
        }

        with requests_mock.Mocker() as mocker:
            mocker.register_uri(requests_mock.POST, user_url)

            client = InfluxDBClient(database='db')
            client.alter_database_user(
                username='paul',
                password='n3wp4ss!',
                permissions=('^$', '.*'),
            )

            posted_body = json.loads(mocker.last_request.body)
            self.assertDictEqual(posted_body, expected_body)
test_session.py 文件源码 项目:python-zhmcclient 作者: zhmcclient 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_logoff(self):
        """Test Session.logoff() (and also Session.is_logon()).

        Logs on against the mocked server, logs off, and checks that the
        session id, the underlying session object and the session header
        are gone afterwards.
        """
        with requests_mock.Mocker() as mocker:
            self.mock_server_1(mocker)

            # The session starts out logged off; log on before the test.
            session = Session('fake-host', 'fake-userid', 'fake-pw')
            session.logon()
            assert session.is_logon()

            # The code to be tested:
            session.logoff()

            assert session.session_id is None
            assert session.session is None
            assert 'X-API-Session' not in session.headers
            assert len(session.headers) == 2

            assert not session.is_logon()
test_discussion_topic.py 文件源码 项目:canvasapi 作者: ucfopen 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def setUp(self):
        """Build the Canvas client and fetch course/group discussion topics.

        All requests are served by mocked URIs registered for the duration
        of this method.
        """
        self.canvas = Canvas(settings.BASE_URL, settings.API_KEY)

        with requests_mock.Mocker() as mocker:
            register_uris(
                {
                    'course': ['get_by_id', 'get_discussion_topic'],
                    'group': ['get_by_id', 'get_discussion_topic'],
                },
                mocker,
            )

            self.course = self.canvas.get_course(1)
            self.group = self.canvas.get_group(1)
            self.discussion_topic = self.course.get_discussion_topic(1)
            self.discussion_topic_group = self.group.get_discussion_topic(1)

    # __str__()
test_discussion_topic.py 文件源码 项目:canvasapi 作者: ucfopen 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def setUp(self):
        """Build the Canvas client and fetch topics plus their first entries.

        All requests are served by mocked URIs registered for the duration
        of this method.
        """
        self.canvas = Canvas(settings.BASE_URL, settings.API_KEY)

        with requests_mock.Mocker() as mocker:
            register_uris(
                {
                    'course': ['get_by_id', 'get_discussion_topic'],
                    'discussion_topic': [
                        'list_entries_single',
                        'list_entries_single_group',
                    ],
                    'group': ['get_by_id', 'get_discussion_topic'],
                },
                mocker,
            )

            self.course = self.canvas.get_course(1)
            self.group = self.canvas.get_group(1)
            self.discussion_topic = self.course.get_discussion_topic(1)
            self.discussion_topic_group = self.group.get_discussion_topic(1)

            # Keep the first entry of each listing for the tests to use.
            course_entries = self.discussion_topic.list_entries([1])
            self.discussion_entry = course_entries[0]
            group_entries = self.discussion_topic_group.list_entries([1])
            self.discussion_entry_group = group_entries[0]

    # __str__()
test_page.py 文件源码 项目:canvasapi 作者: ucfopen 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def setUp(self):
        """Build the Canvas client and fetch a page from a course and a group.

        All requests are served by mocked URIs registered for the duration
        of this method.
        """
        self.canvas = Canvas(settings.BASE_URL, settings.API_KEY)

        with requests_mock.Mocker() as mocker:
            register_uris(
                {
                    'course': ['get_by_id'],
                    'group': ['get_by_id', 'pages_get_page'],
                    'page': ['get_page'],
                },
                mocker,
            )

            self.course = self.canvas.get_course(1)
            self.group = self.canvas.get_group(1)
            self.page_course = self.course.get_page('my-url')
            self.page_group = self.group.get_page('my-url')

    # __str__()
test_page.py 文件源码 项目:canvasapi 作者: ucfopen 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def setUp(self):
        """Build the Canvas client and fetch pages plus one revision of each.

        All requests are served by mocked URIs registered for the duration
        of this method.
        """
        self.canvas = Canvas(settings.BASE_URL, settings.API_KEY)

        with requests_mock.Mocker() as mocker:
            register_uris(
                {
                    'course': ['get_by_id', 'get_page'],
                    'group': ['get_by_id', 'pages_get_page'],
                    'page': [
                        'get_latest_rev_by_id',
                        'get_latest_rev_by_id_group',
                    ],
                },
                mocker,
            )

            self.course = self.canvas.get_course(1)
            self.group = self.canvas.get_group(1)
            self.page_course = self.course.get_page('my-url')
            self.page_group = self.group.get_page('my-url')
            self.revision = self.page_course.get_revision_by_id(2)
            self.group_revision = self.page_group.get_revision_by_id(2)

    # __str__()
test_submission.py 文件源码 项目:canvasapi 作者: ucfopen 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def setUp(self):
        """Build the Canvas client and fetch a course and a section submission.

        All requests are served by mocked URIs registered for the duration
        of this method.
        """
        self.canvas = Canvas(settings.BASE_URL, settings.API_KEY)

        with requests_mock.Mocker() as mocker:
            requires = {
                'course': ['get_by_id'],
                'section': ['get_by_id'],
                'submission': ['get_by_id_course', 'get_by_id_section'],
            }
            register_uris(requires, mocker)

            self.course = self.canvas.get_course(1)
            self.submission_course = self.course.get_submission(1, 1)
            self.section = self.canvas.get_section(1)
            self.submission_section = self.section.get_submission(1, 1)

    # __str__()
test_course.py 文件源码 项目:canvasapi 作者: ucfopen 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def setUp(self):
        """Build the Canvas client and fetch a course, page, quiz and user.

        All requests are served by mocked URIs registered for the duration
        of this method.
        """
        self.canvas = Canvas(settings.BASE_URL, settings.API_KEY)

        with requests_mock.Mocker() as mocker:
            register_uris(
                {
                    'course': ['get_by_id', 'get_page'],
                    'quiz': ['get_by_id'],
                    'user': ['get_by_id'],
                },
                mocker,
            )

            self.course = self.canvas.get_course(1)
            self.page = self.course.get_page('my-url')
            self.quiz = self.course.get_quiz(1)
            self.user = self.canvas.get_user(1)

    # __str__()
test_tab.py 文件源码 项目:canvasapi 作者: ucfopen 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def setUp(self):
        """Build the Canvas client and keep the first tab of course 1.

        All requests are served by mocked URIs registered for the duration
        of this method.
        """
        self.canvas = Canvas(settings.BASE_URL, settings.API_KEY)

        with requests_mock.Mocker() as mocker:
            requires = {'course': ['get_by_id', 'list_tabs']}
            register_uris(requires, mocker)

            self.course = self.canvas.get_course(1)

            # Materialise the whole listing, then keep its first element.
            all_tabs = list(self.course.list_tabs())
            self.tab = all_tabs[0]

    # __str__()
test_progress.py 文件源码 项目:canvasapi 作者: ucfopen 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def setUp(self):
        """Build the Canvas client and obtain the result of assign_members().

        A group category is created on course 1 and ``assign_members()`` is
        called on it; all requests are served by mocked URIs registered for
        the duration of this method.
        """
        self.canvas = Canvas(settings.BASE_URL, settings.API_KEY)

        with requests_mock.Mocker() as mocker:
            register_uris(
                {
                    'course': ['get_by_id', 'create_group_category'],
                    'group': ['category_assign_members_false'],
                },
                mocker,
            )

            self.course = self.canvas.get_course(1)
            self.group_category = self.course.create_group_category(
                "Test String")
            self.progress = self.group_category.assign_members()

    # __str__()
test_loadurl.py 文件源码 项目:cjworkbench 作者: CJWorkbench 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_load_xlsx(self):
        """Fetch an XLSX file from a mocked URL and check both outcomes.

        A valid workbook must render as the table pandas reads from the same
        fixture file; a body that is not a workbook must leave the module in
        the ERROR state.
        """
        url = 'http://test.com/the.xlsx'
        xlsx_content_type = ('application/vnd.openxmlformats-officedocument'
                             '.spreadsheetml.sheet')
        self.url_pval.set_value(url)
        self.url_pval.save()

        # Read the fixture inside a context manager so the file handle is
        # closed right away instead of leaking until garbage collection.
        with open(mock_xslx_path, "rb") as xlsx_file:
            xlsx_bytes = xlsx_file.read()
        xlsx_table = pd.read_excel(mock_xslx_path)

        # success case
        with requests_mock.Mocker() as m:
            m.get(url, content=xlsx_bytes,
                  headers={'content-type': xlsx_content_type})
            self.press_fetch_button()
            response = self.get_render()
            self.assertEqual(response.content, make_render_json(xlsx_table))

        # malformed file should put module in error state
        with requests_mock.Mocker() as m:
            m.get(url, content=b"there's just no way this is xlsx",
                  headers={'content-type': xlsx_content_type})
            self.press_fetch_button()
            # Reload the module from the database before checking its status.
            self.wfmodule.refresh_from_db()
            self.assertEqual(self.wfmodule.status, WfModule.ERROR)


问题


面经


文章

微信
公众号

扫码关注公众号