python类json_decode()的实例源码

web_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_decode_argument(self):
        """Fetch the decode_arg endpoints and compare the decoded JSON replies.

        Requests that pass an ``encoding`` argument are expected to report
        unicode values; the request without one is expected to report the
        raw bytes as hex.
        """
        # Every one of these requests should yield the same decoded payload.
        unicode_urls = (
            "/decode_arg/%C3%A9?foo=%C3%A9&encoding=utf-8",
            "/decode_arg/%E9?foo=%E9&encoding=latin1",
            "/decode_arg_kw/%E9?foo=%E9&encoding=latin1",
        )
        for req_url in unicode_urls:
            reply = self.fetch(req_url)
            reply.rethrow()
            decoded = json_decode(reply.body)
            expected = {
                u('path'): [u('unicode'), u('\u00e9')],
                u('query'): [u('unicode'), u('\u00e9')],
            }
            self.assertEqual(decoded, expected)

        # No encoding argument this time: the reply should describe bytes.
        reply = self.fetch("/decode_arg/%C3%A9?foo=%C3%A9")
        reply.rethrow()
        decoded = json_decode(reply.body)
        expected = {
            u('path'): [u('bytes'), u('c3a9')],
            u('query'): [u('bytes'), u('c3a9')],
        }
        self.assertEqual(decoded, expected)
httpserver_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_multipart_form(self):
        """Post a multipart form through raw_fetch and check each decoded field."""
        # Encoding is the tricky part here: header values are latin1, while
        # bodies may be anything (utf8 is what we use by default).
        request_head = [
            b"POST /multipart HTTP/1.0",
            b"Content-Type: multipart/form-data; boundary=1234567890",
            b"X-Header-encoding-test: \xe9",
        ]
        form_parts = [
            b"Content-Disposition: form-data; name=argument",
            b"",
            u("\u00e1").encode("utf-8"),
            b"--1234567890",
            u('Content-Disposition: form-data; name="files"; filename="\u00f3"').encode("utf8"),
            b"",
            u("\u00fa").encode("utf-8"),
            b"--1234567890--",
            b"",
        ]
        raw_reply = self.raw_fetch(request_head, b"\r\n".join(form_parts))
        fields = json_decode(raw_reply)
        self.assertEqual(u("\u00e9"), fields["header"])
        self.assertEqual(u("\u00e1"), fields["argument"])
        self.assertEqual(u("\u00f3"), fields["filename"])
        self.assertEqual(u("\u00fa"), fields["filebody"])
httpserver_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_chunked_request_body(self):
        """Write a hand-built chunked POST to the stream and check the reply."""
        # AsyncHTTPClient cannot generate chunked requests (and they are not
        # widely supported), but HTTPServer will read them, so the raw bytes
        # are written to the stream directly.
        chunked_request = b"""\
POST /echo HTTP/1.1
Transfer-Encoding: chunked
Content-Type: application/x-www-form-urlencoded

4
foo=
3
bar
0

""".replace(b"\n", b"\r\n")
        self.stream.write(chunked_request)
        read_stream_body(self.stream, self.stop)
        _headers, reply_body = self.wait()
        expected = {u('foo'): [u('bar')]}
        self.assertEqual(json_decode(reply_body), expected)
web_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_decode_argument(self):
        """Fetch the decode_arg endpoints and compare the decoded JSON replies.

        Requests that pass an ``encoding`` argument are expected to report
        unicode values; the request without one is expected to report the
        raw bytes as hex.
        """
        # Every one of these requests should yield the same decoded payload.
        unicode_urls = (
            "/decode_arg/%C3%A9?foo=%C3%A9&encoding=utf-8",
            "/decode_arg/%E9?foo=%E9&encoding=latin1",
            "/decode_arg_kw/%E9?foo=%E9&encoding=latin1",
        )
        for req_url in unicode_urls:
            reply = self.fetch(req_url)
            reply.rethrow()
            decoded = json_decode(reply.body)
            expected = {
                u('path'): [u('unicode'), u('\u00e9')],
                u('query'): [u('unicode'), u('\u00e9')],
            }
            self.assertEqual(decoded, expected)

        # No encoding argument this time: the reply should describe bytes.
        reply = self.fetch("/decode_arg/%C3%A9?foo=%C3%A9")
        reply.rethrow()
        decoded = json_decode(reply.body)
        expected = {
            u('path'): [u('bytes'), u('c3a9')],
            u('query'): [u('bytes'), u('c3a9')],
        }
        self.assertEqual(decoded, expected)
web_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_get_argument(self):
        """Check single and plural argument lookups for query and body input."""
        query_only_cases = (
            ("/get_argument?foo=bar", b"bar"),
            ("/get_argument?foo=", b""),
            ("/get_argument", b"default"),
        )
        for url, expected_body in query_only_cases:
            reply = self.fetch(url)
            self.assertEqual(reply.body, expected_body)

        # Now combine query and body arguments.
        # The singular accessor lets the body value win over the query value.
        form_body = urllib_parse.urlencode({"foo": "hello"})
        reply = self.fetch("/get_argument?foo=bar", method="POST",
                           body=form_body)
        self.assertEqual(reply.body, b"hello")
        # The plural accessors merge both sources.
        reply = self.fetch("/get_arguments?foo=bar", method="POST",
                           body=form_body)
        merged = json_decode(reply.body)
        self.assertEqual(merged, {
            'default': ['bar', 'hello'],
            'query': ['bar'],
            'body': ['hello'],
        })
web_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_decode_argument(self):
        """Fetch the decode_arg endpoints and compare the decoded JSON replies.

        Requests that pass an ``encoding`` argument are expected to report
        unicode values; the request without one is expected to report the
        raw bytes as hex.
        """
        # Every one of these requests should yield the same decoded payload.
        unicode_urls = (
            "/decode_arg/%C3%A9?foo=%C3%A9&encoding=utf-8",
            "/decode_arg/%E9?foo=%E9&encoding=latin1",
            "/decode_arg_kw/%E9?foo=%E9&encoding=latin1",
        )
        for req_url in unicode_urls:
            reply = self.fetch(req_url)
            reply.rethrow()
            decoded = json_decode(reply.body)
            expected = {
                u('path'): [u('unicode'), u('\u00e9')],
                u('query'): [u('unicode'), u('\u00e9')],
            }
            self.assertEqual(decoded, expected)

        # No encoding argument this time: the reply should describe bytes.
        reply = self.fetch("/decode_arg/%C3%A9?foo=%C3%A9")
        reply.rethrow()
        decoded = json_decode(reply.body)
        expected = {
            u('path'): [u('bytes'), u('c3a9')],
            u('query'): [u('bytes'), u('c3a9')],
        }
        self.assertEqual(decoded, expected)
web_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_get_argument(self):
        """Check single and plural argument lookups for query and body input."""
        query_only_cases = (
            ("/get_argument?foo=bar", b"bar"),
            ("/get_argument?foo=", b""),
            ("/get_argument", b"default"),
        )
        for url, expected_body in query_only_cases:
            reply = self.fetch(url)
            self.assertEqual(reply.body, expected_body)

        # Now combine query and body arguments.
        # The singular accessor lets the body value win over the query value.
        form_body = urllib_parse.urlencode({"foo": "hello"})
        reply = self.fetch("/get_argument?foo=bar", method="POST",
                           body=form_body)
        self.assertEqual(reply.body, b"hello")
        # The plural accessors merge both sources.
        reply = self.fetch("/get_arguments?foo=bar", method="POST",
                           body=form_body)
        merged = json_decode(reply.body)
        self.assertEqual(merged, {
            'default': ['bar', 'hello'],
            'query': ['bar'],
            'body': ['hello'],
        })
httpserver_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_multipart_form(self):
        """Post a multipart form through raw_fetch and check each decoded field."""
        # Encoding is the tricky part here: header values are latin1, while
        # bodies may be anything (utf8 is what we use by default).
        request_head = [
            b"POST /multipart HTTP/1.0",
            b"Content-Type: multipart/form-data; boundary=1234567890",
            b"X-Header-encoding-test: \xe9",
        ]
        form_parts = [
            b"Content-Disposition: form-data; name=argument",
            b"",
            u("\u00e1").encode("utf-8"),
            b"--1234567890",
            u('Content-Disposition: form-data; name="files"; filename="\u00f3"').encode("utf8"),
            b"",
            u("\u00fa").encode("utf-8"),
            b"--1234567890--",
            b"",
        ]
        raw_reply = self.raw_fetch(request_head, b"\r\n".join(form_parts))
        fields = json_decode(raw_reply)
        self.assertEqual(u("\u00e9"), fields["header"])
        self.assertEqual(u("\u00e1"), fields["argument"])
        self.assertEqual(u("\u00f3"), fields["filename"])
        self.assertEqual(u("\u00fa"), fields["filebody"])
httpserver_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_chunked_request_body(self):
        """Write a hand-built chunked POST to the stream and check the reply."""
        # AsyncHTTPClient cannot generate chunked requests (and they are not
        # widely supported), but HTTPServer will read them, so the raw bytes
        # are written to the stream directly.
        chunked_request = b"""\
POST /echo HTTP/1.1
Transfer-Encoding: chunked
Content-Type: application/x-www-form-urlencoded

4
foo=
3
bar
0

""".replace(b"\n", b"\r\n")
        self.stream.write(chunked_request)
        read_stream_body(self.stream, self.stop)
        _headers, reply_body = self.wait()
        expected = {u('foo'): [u('bar')]}
        self.assertEqual(json_decode(reply_body), expected)
web_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_get_argument(self):
        """Check single and plural argument lookups for query and body input."""
        query_only_cases = (
            ("/get_argument?foo=bar", b"bar"),
            ("/get_argument?foo=", b""),
            ("/get_argument", b"default"),
        )
        for url, expected_body in query_only_cases:
            reply = self.fetch(url)
            self.assertEqual(reply.body, expected_body)

        # Now combine query and body arguments.
        # The singular accessor lets the body value win over the query value.
        form_body = urllib_parse.urlencode({"foo": "hello"})
        reply = self.fetch("/get_argument?foo=bar", method="POST",
                           body=form_body)
        self.assertEqual(reply.body, b"hello")
        # The plural accessors merge both sources.
        reply = self.fetch("/get_arguments?foo=bar", method="POST",
                           body=form_body)
        merged = json_decode(reply.body)
        self.assertEqual(merged, {
            'default': ['bar', 'hello'],
            'query': ['bar'],
            'body': ['hello'],
        })
test_search.py 文件源码 项目:toshi-id-service 作者: toshiapp 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
async def test_username_query(self):
        """Search users by username and check the hit and miss result counts.

        Inserts one user, then queries ``/search/user`` with a prefix of the
        stored username (one result expected) and with an unrelated string
        (no results expected).

        Declared ``async`` because the body uses ``await`` and ``async with``,
        which are a SyntaxError inside a plain ``def``.
        NOTE(review): the decorator that drives this coroutine in the test
        runner is not visible in this excerpt.
        """
        username = "bobsmith"
        positive_query = 'bobsm'
        negative_query = 'nancy'

        async with self.pool.acquire() as con:
            await con.execute("INSERT INTO users (username, toshi_id) VALUES ($1, $2)", username, TEST_ADDRESS)

        for query, expected_hits in ((positive_query, 1), (negative_query, 0)):
            resp = await self.fetch("/search/user?query={}".format(query), method="GET")
            self.assertEqual(resp.code, 200)
            body = json_decode(resp.body)
            self.assertEqual(len(body['results']), expected_hits)

            # ensure we got a tracking event (its first element is asserted
            # to be None for these requests)
            self.assertEqual((await self.next_tracking_event())[0], None)
test_search.py 文件源码 项目:toshi-id-service 作者: toshiapp 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
async def test_username_query_sql_inject_attampt(self):
        """Check that an SQL-injection style query matches nothing and deletes nothing.

        Inserts one user, searches with a URL-quoted string containing a
        ``delete from users`` statement, and then verifies that the search
        returned no results and that the users table still holds one row.

        Declared ``async`` because the body uses ``await`` and ``async with``,
        which are a SyntaxError inside a plain ``def``.
        NOTE(review): the decorator that drives this coroutine in the test
        runner is not visible in this excerpt.
        """
        username = "bobsmith"
        inject_attempt = quote_plus("x'; delete from users; select * from users")

        async with self.pool.acquire() as con:
            await con.execute("INSERT INTO users (username, toshi_id) VALUES ($1, $2)", username, TEST_ADDRESS)

        resp = await self.fetch("/search/user?query={}".format(inject_attempt), method="GET")
        self.assertEqual(resp.code, 200)
        body = json_decode(resp.body)
        self.assertEqual(len(body['results']), 0)

        # The inserted row must still be there after the injection attempt.
        async with self.pool.acquire() as con:
            row = await con.fetchrow("SELECT COUNT(*) AS count FROM users")

        self.assertEqual(row['count'], 1)
test_search.py 文件源码 项目:toshi-id-service 作者: toshiapp 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
async def test_underscore_username_query(self):
        """Check user search behaviour for usernames that contain underscores.

        Inserts four users, then asserts that each positive query returns
        exactly one result and each negative query returns none.

        Declared ``async`` because the body uses ``await`` and ``async with``,
        which are a SyntaxError inside a plain ``def``.
        NOTE(review): the decorator that drives this coroutine in the test
        runner is not visible in this excerpt.
        """
        insert_user = "INSERT INTO users (username, name, toshi_id) VALUES ($1, $2, $3)"
        users = (
            ("wager_weight", "Wager Weight", "0x0000000000000000000000000000000000000001"),
            ("bob_smith", "Robert", "0x0000000000000000000000000000000000000002"),
            ("bob_jack", "Jackie", "0x0000000000000000000000000000000000000003"),
            ("user1234", "user1234", "0x0000000000000000000000000000000000000004"),
        )

        async with self.pool.acquire() as con:
            for user_row in users:
                await con.execute(insert_user, *user_row)

        for positive_query in ["wager", "wager_we", "wager_weight", "bob_smi"]:

            resp = await self.fetch("/search/user?query={}".format(positive_query), method="GET")
            self.assertEqual(resp.code, 200)
            body = json_decode(resp.body)
            self.assertEqual(len(body['results']), 1, "Failed to get match for search query: {}".format(positive_query))

        for negative_query in ["wager_foo", "bob_bar", "1234", "bobsmith"]:

            resp = await self.fetch("/search/user?query={}".format(negative_query), method="GET")
            self.assertEqual(resp.code, 200)
            body = json_decode(resp.body)
            self.assertEqual(len(body['results']), 0, "got unexpected match for search query: {}".format(negative_query))
test_search.py 文件源码 项目:toshi-id-service 作者: toshiapp 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
async def test_inactive_username_query(self):
        """Check that a user inserted with ``active = false`` is not searchable
        until a signed PUT to ``/user`` has been made.

        The first search is expected to return no results; after the signed
        update the same search is expected to return one.

        Declared ``async`` because the body uses ``await`` and ``async with``,
        which are a SyntaxError inside a plain ``def``.
        NOTE(review): the decorator that drives this coroutine in the test
        runner is not visible in this excerpt.
        """
        username = "bobsmith"
        positive_query = 'bobsm'
        search_url = "/search/user?query={}".format(positive_query)

        async with self.pool.acquire() as con:
            await con.execute("INSERT INTO users (username, toshi_id, active) VALUES ($1, $2, false)", username, TEST_ADDRESS)

        resp = await self.fetch(search_url, method="GET")
        self.assertEqual(resp.code, 200)
        body = json_decode(resp.body)
        self.assertEqual(len(body['results']), 0)

        resp = await self.fetch_signed("/user", signing_key=TEST_PRIVATE_KEY, method="PUT", body={
            "payment_address": TEST_ADDRESS
        })
        self.assertResponseCodeEqual(resp, 200)

        # Same query as before; it is now expected to find the user.
        resp = await self.fetch(search_url, method="GET")
        self.assertEqual(resp.code, 200)
        body = json_decode(resp.body)
        self.assertEqual(len(body['results']), 1)
test_user.py 文件源码 项目:toshi-id-service 作者: toshiapp 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
async def test_create_user(self):
        """Create a user with a signed POST to ``/user`` and verify the stored row.

        Asserts that the response carries the expected ``toshi_id``, that the
        database row exists with ``is_app`` false and a non-null username,
        and that a tracking event keyed by the encoded address was emitted.

        Declared ``async`` because the body uses ``await`` and ``async with``,
        which are a SyntaxError inside a plain ``def``.
        NOTE(review): the decorator that drives this coroutine in the test
        runner is not visible in this excerpt.
        """
        resp = await self.fetch("/timestamp")
        self.assertEqual(resp.code, 200)

        resp = await self.fetch_signed("/user", signing_key=TEST_PRIVATE_KEY, method="POST",
                                       body={'payment_address': TEST_PAYMENT_ADDRESS})

        self.assertResponseCodeEqual(resp, 200)

        body = json_decode(resp.body)

        self.assertEqual(body['toshi_id'], TEST_ADDRESS)

        async with self.pool.acquire() as con:
            row = await con.fetchrow("SELECT * FROM users WHERE toshi_id = $1", TEST_ADDRESS)

        self.assertIsNotNone(row)
        self.assertFalse(row['is_app'])

        self.assertIsNotNone(row['username'])

        # ensure we got a tracking event
        self.assertEqual((await self.next_tracking_event())[0], encode_id(TEST_ADDRESS))
test_user.py 文件源码 项目:toshi-id-service 作者: toshiapp 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
async def test_create_app_user(self):
        """Create an app user with a signed POST to ``/user`` and verify the row.

        Same flow as creating a regular user, but the request body sets
        ``is_app`` and the stored row is asserted to have ``is_app`` true.

        Declared ``async`` because the body uses ``await`` and ``async with``,
        which are a SyntaxError inside a plain ``def``.
        NOTE(review): the decorator that drives this coroutine in the test
        runner is not visible in this excerpt.
        """
        resp = await self.fetch("/timestamp")
        self.assertEqual(resp.code, 200)

        resp = await self.fetch_signed("/user", signing_key=TEST_PRIVATE_KEY, method="POST",
                                       body={"is_app": True, 'payment_address': TEST_PAYMENT_ADDRESS})

        self.assertResponseCodeEqual(resp, 200)

        body = json_decode(resp.body)

        self.assertEqual(body['toshi_id'], TEST_ADDRESS)

        async with self.pool.acquire() as con:
            row = await con.fetchrow("SELECT * FROM users WHERE toshi_id = $1", TEST_ADDRESS)

        self.assertIsNotNone(row)
        self.assertTrue(row['is_app'])

        # ensure we got a tracking event
        self.assertEqual((await self.next_tracking_event())[0], encode_id(TEST_ADDRESS))
test_categories.py 文件源码 项目:toshi-id-service 作者: toshiapp 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
async def test_set_unknown_categories_fails(self):
        """Check that setting unknown categories is rejected and stores nothing.

        Sends a signed PUT to ``/user`` with two category values that are
        expected to be invalid, asserts a 400 response, then fetches the user
        and asserts its ``categories`` list is empty.

        Declared ``async`` because the body uses ``await`` and ``async with``,
        which are a SyntaxError inside a plain ``def``.
        NOTE(review): the decorator that drives this coroutine in the test
        runner is not visible in this excerpt.
        """
        username = "toshibot"
        name = "ToshiBot"

        categories = await self.setup_categories()

        async with self.pool.acquire() as con:
            await con.execute("INSERT INTO users (username, toshi_id, name, is_app, is_public) VALUES ($1, $2, $3, true, true)",
                              username, TEST_ADDRESS, name)

        # presumably categories[-1][0] is the highest existing category id,
        # so adding 10 yields an id that does not exist -- TODO confirm
        # against setup_categories()
        resp = await self.fetch_signed("/user", signing_key=TEST_PRIVATE_KEY, method="PUT", body={
            "categories": [categories[-1][0] + 10, 'badcat']
        })
        self.assertResponseCodeEqual(resp, 400)

        resp = await self.fetch("/user/{}".format(TEST_ADDRESS))
        self.assertResponseCodeEqual(resp, 200)
        body = json_decode(resp.body)
        self.assertIn("categories", body)
        self.assertEqual(len(body['categories']), 0)
test_apps_search.py 文件源码 项目:toshi-id-service 作者: toshiapp 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
async def test_app_underscore_username_query(self):
        """Check app search behaviour for usernames that contain underscores.

        Inserts four public app users, then asserts that each positive query
        against ``/search/apps`` returns exactly one result and each negative
        query returns none.

        Declared ``async`` because the body uses ``await`` and ``async with``,
        which are a SyntaxError inside a plain ``def``.
        NOTE(review): the decorator that drives this coroutine in the test
        runner is not visible in this excerpt.
        """
        insert_app = "INSERT INTO users (username, name, toshi_id, is_app, is_public) VALUES ($1, $2, $3, true, true)"
        apps = (
            ("wager_weight", "Wager Weight", "0x0000000000000000000000000000000000000001"),
            ("bob_smith", "Robert", "0x0000000000000000000000000000000000000002"),
            ("bob_jack", "Jackie", "0x0000000000000000000000000000000000000003"),
            ("user1234", "user1234", "0x0000000000000000000000000000000000000004"),
        )

        async with self.pool.acquire() as con:
            for app_row in apps:
                await con.execute(insert_app, *app_row)

        for positive_query in ["wager", "wager_we", "wager_weight", "bob_smi"]:

            resp = await self.fetch("/search/apps?query={}".format(positive_query), method="GET")
            self.assertEqual(resp.code, 200)
            body = json_decode(resp.body)
            self.assertEqual(len(body['results']), 1, "Failed to get match for search query: {}".format(positive_query))

        for negative_query in ["wager_foo", "bob_bar", "1234", "bobsmith"]:

            resp = await self.fetch("/search/apps?query={}".format(negative_query), method="GET")
            self.assertEqual(resp.code, 200)
            body = json_decode(resp.body)
            self.assertEqual(len(body['results']), 0, "got unexpected match for search query: {}".format(negative_query))
plugin.py 文件源码 项目:globibot 作者: best-coloc-ever 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
async def fetch_definitions(self, term):
        """Request definitions for *term* and return the ``list`` entry of the reply.

        Builds a request to ``UrbanDictionary.DEFINE_URL`` with *term* as a
        query parameter and the instance's API key in the ``X-Mashape-Key``
        header, awaits the response, and decodes its body as JSON.

        Declared ``async`` because the body uses ``await``, which is a
        SyntaxError inside a plain ``def``.

        :param term: value sent as the ``term`` query parameter.
        :returns: ``data['list']`` from the decoded response.
        :raises KeyError: if the decoded response has no ``list`` key.
        """
        url = url_concat(UrbanDictionary.DEFINE_URL, dict(term=term))
        request = HTTPRequest(
            url=url,
            headers={
                'X-Mashape-Key': self.api_key,
                'Accept': 'text/plain',
            },
        )
        # Tornado's fetch returns a Tornado future; convert it so that it can
        # be awaited as an asyncio future.
        tornado_future = self.client.fetch(request)
        response = await to_asyncio_future(tornado_future)

        data = json_decode(response.body)

        return data['list']
plugin.py 文件源码 项目:globibot 作者: best-coloc-ever 项目源码 文件源码 阅读 144 收藏 0 点赞 0 评论 0
async def access_token(self, code, state):
        """Exchange an authorization *code* for an access token.

        POSTs a form-encoded payload (client id/secret, grant type, redirect
        URI, *code* and *state*) to ``Twitch.TOKEN_URL`` and returns the
        ``access_token`` field of the JSON reply.

        Declared ``async`` because the body uses ``await``, which is a
        SyntaxError inside a plain ``def``.

        :param code: authorization code sent as the ``code`` field.
        :param state: value sent as the ``state`` field.
        :returns: ``data['access_token']`` from the decoded response.
        :raises KeyError: if the decoded response has no ``access_token`` key.
        """
        client = AsyncHTTPClient()

        payload = (
            ('client_id', self.client_id),
            ('client_secret', self.client_secret),
            ('grant_type', 'authorization_code'),
            ('redirect_uri', Twitch.REDIRECT_URI),
            ('code', code),
            ('state', state),
        )

        request = HTTPRequest(
            url=Twitch.TOKEN_URL,
            method='POST',
            body=urlencode(payload),
        )
        # Tornado's fetch returns a Tornado future; convert it so that it can
        # be awaited as an asyncio future.
        tornado_future = client.fetch(request)
        response = await to_asyncio_future(tornado_future)

        data = json_decode(response.body)
        return data['access_token']
test_apps_search.py 文件源码 项目:toshi-directory-service 作者: toshiapp 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
async def test_username_query(self):
        """Search apps by name and check the hit and miss result counts.

        Inserts one app named "TokenBot" plus its manifest row, then queries
        ``/search/apps`` with 'enb' (one app expected) and with an unrelated
        string (no apps expected).

        Declared ``async`` because the body uses ``await`` and ``async with``,
        which are a SyntaxError inside a plain ``def``.
        NOTE(review): the decorator that drives this coroutine in the test
        runner is not visible in this excerpt.
        """
        username = "TokenBot"
        positive_query = 'enb'
        negative_query = 'TickleFight'

        async with self.pool.acquire() as con:
            await con.execute("INSERT INTO apps (name, token_id) VALUES ($1, $2)", username, TEST_ADDRESS)
            await con.execute("INSERT INTO sofa_manifests (token_id, payment_address) VALUES ($1, $2)", TEST_ADDRESS, TEST_ADDRESS)

        for query, expected_hits in ((positive_query, 1), (negative_query, 0)):
            resp = await self.fetch("/search/apps?query={}".format(query), method="GET")
            self.assertEqual(resp.code, 200)
            body = json_decode(resp.body)
            self.assertEqual(len(body['apps']), expected_hits)
term_utils.py 文件源码 项目:django-gateone 作者: jimmy201602 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def save_term_settings(term, location, session, settings):
    """
    Persist *settings* for the given *term* at *location* into the
    'term_settings.json' file that lives in the *session* directory under
    ``<BASE_DIR>/sessions``.

    Does nothing (returns `None`) when *session* is falsy.  If the file
    already exists its contents are merged in before it is rewritten.
    """
    if not session:
        # No session means this is only a viewer of a broadcast terminal
        return
    term_key = str(term) # JSON object keys have to be strings
    merged = RUDict()
    merged[location] = {term_key: settings}
    settings_path = os.path.join(
        getsettings('BASE_DIR'), 'sessions', session, 'term_settings.json')
    if os.path.exists(settings_path):
        # Pull in whatever is already on disk, then re-apply the new values
        # so they take precedence over the stored ones.
        with io.open(settings_path, encoding='utf-8') as existing:
            stored = json_decode(existing.read())
        merged.update(stored)
        merged[location][term_key].update(settings)
    with io.open(settings_path, 'w', encoding='utf-8') as out:
        out.write(json_encode(merged))
logviewer.py 文件源码 项目:django-gateone 作者: jimmy201602 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_log_metadata(golog_path):
    """
    Read the metadata stored in the log file at *golog_path* and return it
    as a dict.  An empty dict is returned when the file is empty, when its
    first frame cannot be read, or when that frame does not carry JSON.
    """
    if os.path.getsize(golog_path) == 0:
        return {} # Empty file, nothing to read
    try:
        frame, _distance = retrieve_first_frame(golog_path)
    except IOError:
        # The log is unreadable...  It is probably still being written to
        return {}
    # The first 14 characters of the frame are skipped
    # (presumably a fixed-width frame prefix -- TODO confirm).
    payload = frame[14:]
    if payload.startswith('{'):
        # Looks like JSON, so this frame holds the metadata
        return json_decode(payload)
    return {}
server.py 文件源码 项目:django-gateone 作者: jimmy201602 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_current_user(self):
        """
        Mostly identical to the function of the same name in MainHandler.  The
        difference being that when API authentication is enabled the WebSocket
        will expect and perform its own auth of the client.

        Returns the user dict decoded from the 'gateone_user' entry of the
        HTTP session.  When that entry is missing, returns an ANONYMOUS user
        with a freshly generated session ID if the 'auth' setting is falsy,
        or `None` otherwise.
        """
        expiration = self.settings.get('auth_timeout', "14d")
        # Need the expiration in days (which is a bit silly but whatever):
        # NOTE(review): the converted value is not used below now that the
        # secure-cookie lookup is commented out.
        expiration = (
            float(total_seconds(convert_to_timedelta(expiration)))
            / float(86400))
        # Previous cookie-based lookup, kept for reference:
        #user_json = self.get_secure_cookie(
            #"gateone_user", max_age_days=expiration)
        user_json = self.message.http_session.get('gateone_user',None)
        # Example value (from an earlier debug print):
        # {"upn": "ANONYMOUS", "session": "<session id>"}
        if not user_json:
            if not self.settings['auth']:
                # This can happen if the user's browser isn't allowing
                # persistent cookies (e.g. incognito mode)
                return {'upn': 'ANONYMOUS', 'session': generate_session_id()}
            return None
        user = json_decode(user_json)
        #user['ip_address'] = self.request.remote_ip
        return user
app_terminal.py 文件源码 项目:django-gateone 作者: jimmy201602 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def clear_term_settings(self, term):
        """
        Removes any settings associated with the given *term* in the user's
        term_settings.json file (in their session directory).

        Does nothing if the settings file does not exist.  Otherwise the file
        is read, the entry for *term* at the current location is deleted, the
        file is rewritten and the "terminal:clear_term_settings" event is
        triggered with *term*.
        """
        term = str(term)
        self.term_log.debug("clear_term_settings(%s)" % term)
        term_settings = RUDict()
        # Seed an empty entry for this term before merging in the file
        # contents (presumably so the `del` below cannot fail when the file
        # has no entry for it -- depends on RUDict.update(); TODO confirm).
        term_settings[self.ws.location] = {term: {}}
        session_dir = os.path.join(self.settings['session_dir'], self.ws.session)
        settings_path = os.path.join(session_dir, 'term_settings.json')
        if not os.path.exists(settings_path):
            return # Nothing to do
        # The file is known to exist at this point, so read the existing
        # settings straight away and then update them.
        with io.open(settings_path, encoding='utf-8') as f:
            term_settings.update(json_decode(f.read()))
        del term_settings[self.ws.location][term]
        with io.open(settings_path, 'w', encoding='utf-8') as f:
            f.write(json_encode(term_settings))
        self.trigger("terminal:clear_term_settings", term)

    #@require(authenticated(), policies('terminal'))
httpserver_test.py 文件源码 项目:My-Web-Server-Framework-With-Python2.7 作者: syjsu 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_multipart_form(self):
        """Post a multipart form through raw_fetch and check each decoded field."""
        # Encoding is the tricky part here: header values are latin1, while
        # bodies may be anything (utf8 is what we use by default).
        request_head = [
            b"POST /multipart HTTP/1.0",
            b"Content-Type: multipart/form-data; boundary=1234567890",
            b"X-Header-encoding-test: \xe9",
        ]
        form_parts = [
            b"Content-Disposition: form-data; name=argument",
            b"",
            u("\u00e1").encode("utf-8"),
            b"--1234567890",
            u('Content-Disposition: form-data; name="files"; filename="\u00f3"').encode("utf8"),
            b"",
            u("\u00fa").encode("utf-8"),
            b"--1234567890--",
            b"",
        ]
        raw_reply = self.raw_fetch(request_head, b"\r\n".join(form_parts))
        fields = json_decode(raw_reply)
        self.assertEqual(u("\u00e9"), fields["header"])
        self.assertEqual(u("\u00e1"), fields["argument"])
        self.assertEqual(u("\u00f3"), fields["filename"])
        self.assertEqual(u("\u00fa"), fields["filebody"])
httpserver_test.py 文件源码 项目:My-Web-Server-Framework-With-Python2.7 作者: syjsu 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_chunked_request_body(self):
        """Write a hand-built chunked POST to the stream and check the reply."""
        # AsyncHTTPClient cannot generate chunked requests (and they are not
        # widely supported), but HTTPServer will read them, so the raw bytes
        # are written to the stream directly.
        chunked_request = b"""\
POST /echo HTTP/1.1
Transfer-Encoding: chunked
Content-Type: application/x-www-form-urlencoded

4
foo=
3
bar
0

""".replace(b"\n", b"\r\n")
        self.stream.write(chunked_request)
        read_stream_body(self.stream, self.stop)
        _headers, reply_body = self.wait()
        expected = {u('foo'): [u('bar')]}
        self.assertEqual(json_decode(reply_body), expected)
web_test.py 文件源码 项目:My-Web-Server-Framework-With-Python2.7 作者: syjsu 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_decode_argument(self):
        """Fetch the decode_arg endpoints and compare the decoded JSON replies.

        Requests that pass an ``encoding`` argument are expected to report
        unicode values; the request without one is expected to report the
        raw bytes as hex.
        """
        # Every one of these requests should yield the same decoded payload.
        unicode_urls = (
            "/decode_arg/%C3%A9?foo=%C3%A9&encoding=utf-8",
            "/decode_arg/%E9?foo=%E9&encoding=latin1",
            "/decode_arg_kw/%E9?foo=%E9&encoding=latin1",
        )
        for req_url in unicode_urls:
            reply = self.fetch(req_url)
            reply.rethrow()
            decoded = json_decode(reply.body)
            expected = {
                u('path'): [u('unicode'), u('\u00e9')],
                u('query'): [u('unicode'), u('\u00e9')],
            }
            self.assertEqual(decoded, expected)

        # No encoding argument this time: the reply should describe bytes.
        reply = self.fetch("/decode_arg/%C3%A9?foo=%C3%A9")
        reply.rethrow()
        decoded = json_decode(reply.body)
        expected = {
            u('path'): [u('bytes'), u('c3a9')],
            u('query'): [u('bytes'), u('c3a9')],
        }
        self.assertEqual(decoded, expected)
web_test.py 文件源码 项目:My-Web-Server-Framework-With-Python2.7 作者: syjsu 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_get_argument(self):
        """Check single and plural argument lookups for query and body input."""
        query_only_cases = (
            ("/get_argument?foo=bar", b"bar"),
            ("/get_argument?foo=", b""),
            ("/get_argument", b"default"),
        )
        for url, expected_body in query_only_cases:
            reply = self.fetch(url)
            self.assertEqual(reply.body, expected_body)

        # Now combine query and body arguments.
        # The singular accessor lets the body value win over the query value.
        form_body = urllib_parse.urlencode({"foo": "hello"})
        reply = self.fetch("/get_argument?foo=bar", method="POST",
                           body=form_body)
        self.assertEqual(reply.body, b"hello")
        # The plural accessors merge both sources.
        reply = self.fetch("/get_arguments?foo=bar", method="POST",
                           body=form_body)
        merged = json_decode(reply.body)
        self.assertEqual(merged, {
            'default': ['bar', 'hello'],
            'query': ['bar'],
            'body': ['hello'],
        })
graphql_handler.py 文件源码 项目:react-tornado-graphql-example 作者: yatsu 项目源码 文件源码 阅读 107 收藏 0 点赞 0 评论 0
def graphql_request(self):
        """Return the body of the current request decoded from JSON."""
        raw_body = self.request.body
        return json_decode(raw_body)


问题


面经


文章

微信
公众号

扫码关注公众号